[HADOOP] StreamExecutionEnvironment를 사용하여 S3 싱크를 사용하여 S3에 쓸 수 없음-Apache Flink 1.1.4
HADOOPStreamExecutionEnvironment를 사용하여 S3 싱크를 사용하여 S3에 쓸 수 없음-Apache Flink 1.1.4
Kafka 주제에서 데이터를 읽고 해당 데이터를 S3 버킷에 쓰는 간단한 Apache Flink 프로젝트를 만들었습니다. 프로젝트를 실행할 때 오류가 발생하지 않으며 Kafka 주제에서 각 메시지를 성공적으로 읽지 만 S3 버킷에는 아무것도 기록되지 않습니다. 오류가 없으므로 진행중인 작업을 시도하고 디버그하기가 어렵습니다. 아래는 내 프로젝트와 구성입니다. 이것은 StreamExecutionEnviornment를 사용하는 경우에만 발생합니다. 정규 배치 ExecutionEnviornment를 사용하여 S3로 생산하려고하면 작동합니다.
S3 테스트 자바 프로그램
public class S3Test {
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
if(parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); //make parameters available in the web interface
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<String>(
parameterTool.getRequired("kafka.topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
// write kafka stream to standard out.
//messageStream.print();
String id = UUID.randomUUID().toString();
messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);
env.execute("Write to S3 Example");
}
}
pom.khml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.5</version>
</dependency>
<!-- Apache Kafka Dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
core-site.xml (Hadoop 구성)
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.access.key</name>
<value>***************</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3a.secret.key</name>
<value>****************</value>
</property>
</configuration>
해결법
-
==============================
1.Flink를 통해 Kafka 주제에서 S3로 지속하려면 RollingSink를 사용해야합니다. RollingSink는 Bucketer를 사용하여 부품 파일이 저장 될 디렉토리의 이름을 지정합니다. DateTime은 기본 Bucketer이지만 사용자 정의 Bucketer도 만들 수 있습니다. 최대 배치 크기에 도달 할 때마다 부품 파일이 저장되고 닫히고 새 부품 파일이 작성됩니다. 아래 코드는 작동합니다.
Flink를 통해 Kafka 주제에서 S3로 지속하려면 RollingSink를 사용해야합니다. RollingSink는 Bucketer를 사용하여 부품 파일이 저장 될 디렉토리의 이름을 지정합니다. DateTime은 기본 Bucketer이지만 사용자 정의 Bucketer도 만들 수 있습니다. 최대 배치 크기에 도달 할 때마다 부품 파일이 저장되고 닫히고 새 부품 파일이 작성됩니다. 아래 코드는 작동합니다.
public class TestRollingSink { public static void main(String[] args){ Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml"); final ParameterTool parameterTool = ParameterTool.fromMap(configs); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); env.getConfig().setGlobalJobParameters(parameterTool); env.socketTextStream("localhost", 9092); DataStream<String> parsed = env .addSource(new FlinkKafkaConsumer09<String>( parameterTool.getRequired("kafka.topic"), new SimpleStringSchema(), parameterTool.getProperties())); env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE); RollingSink<String> sink = new RollingSink<String>("s3://flink-test/"+"TEST"); sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); sink.setWriter(new StringWriter<String>()); sink.setBatchSize(200); sink.setPendingPrefix("file-"); sink.setPendingSuffix(".txt"); parsed.print(); parsed.addSink(sink).setParallelism(1); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } }
}
-
==============================
2.IAM 권한-S3 버킷에 쓰려고하는 역할을 확인하십시오.
IAM 권한-S3 버킷에 쓰려고하는 역할을 확인하십시오.
-
==============================
3.디버그 정보를 얻는 데 도움이되는 간단한 방법은 kafka 데이터를 수신해야하는 s3 버킷에 대한 로깅을 설정하는 것입니다. 이것은 s3 관점에서 오류의 원인을 결정하는 데 도움이되는 추가 정보를 제공합니다.
디버그 정보를 얻는 데 도움이되는 간단한 방법은 kafka 데이터를 수신해야하는 s3 버킷에 대한 로깅을 설정하는 것입니다. 이것은 s3 관점에서 오류의 원인을 결정하는 데 도움이되는 추가 정보를 제공합니다.
http://docs.aws.amazon.com/AmazonS3/latest/UG/ManagingBucketLogging.html
from https://stackoverflow.com/questions/41473343/unable-to-write-to-s3-using-s3-sink-using-streamexecutionenvironment-apache-fl by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] HBase completebulkload가 예외를 반환 함 (0) | 2019.09.16 |
---|---|
[HADOOP] pydoop없이 HDFS Python의 모든 파일 나열 (0) | 2019.09.16 |
[HADOOP] HRegionServer는“마스터에게 가동 중임을 알리는 오류”를 표시합니다. 소켓 예외 표시 : 잘못된 인수 (0) | 2019.09.16 |
[HADOOP] 스파크 작업 오류 GC 오버 헤드 한도를 초과 함 [중복] (0) | 2019.09.16 |
[HADOOP] Hadoop 파일의 분산 캐싱에서 예외를 찾을 수 없음 (0) | 2019.09.16 |