복붙노트

[HADOOP] StreamExecutionEnvironment를 사용하여 S3 싱크를 사용하여 S3에 쓸 수 없음-Apache Flink 1.1.4

HADOOP

StreamExecutionEnvironment를 사용하여 S3 싱크를 사용하여 S3에 쓸 수 없음-Apache Flink 1.1.4

Kafka 주제에서 데이터를 읽고 해당 데이터를 S3 버킷에 쓰는 간단한 Apache Flink 프로젝트를 만들었습니다. 프로젝트를 실행할 때 오류가 발생하지 않으며 Kafka 주제에서 각 메시지를 성공적으로 읽지 만 S3 버킷에는 아무것도 기록되지 않습니다. 오류가 없으므로 진행중인 작업을 시도하고 디버그하기가 어렵습니다. 아래는 내 프로젝트와 구성입니다. 이것은 StreamExecutionEnviornment를 사용하는 경우에만 발생합니다. 정규 배치 ExecutionEnviornment를 사용하여 S3로 생산하려고하면 작동합니다.

S3 테스트 자바 프로그램

public class S3Test {

public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

    if(parameterTool.getNumberOfParameters() < 4) {
        System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.getConfig().setGlobalJobParameters(parameterTool); //make parameters available in the web interface

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer09<String>(
                    parameterTool.getRequired("kafka.topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));


    // write kafka stream to standard out.
    //messageStream.print();
    String id = UUID.randomUUID().toString();
    messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

    env.execute("Write to S3 Example");
}
}

pom.khml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>

    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

core-site.xml (Hadoop 구성)

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
</property>

<property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
 large results prior to transmitting them to S3. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
    <name>fs.s3a.access.key</name>
    <value>***************</value>
</property>

<!-- set your AWS access key -->
<property>
    <name>fs.s3a.secret.key</name>
    <value>****************</value>
</property>

</configuration>

해결법

  1. ==============================

    1.Flink를 통해 Kafka 주제에서 S3로 지속하려면 RollingSink를 사용해야합니다. RollingSink는 Bucketer를 사용하여 부품 파일이 저장 될 디렉토리의 이름을 지정합니다. DateTime은 기본 Bucketer이지만 사용자 정의 Bucketer도 만들 수 있습니다. 최대 배치 크기에 도달 할 때마다 부품 파일이 저장되고 닫히고 새 부품 파일이 작성됩니다. 아래 코드는 작동합니다.

    Flink를 통해 Kafka 주제에서 S3로 지속하려면 RollingSink를 사용해야합니다. RollingSink는 Bucketer를 사용하여 부품 파일이 저장 될 디렉토리의 이름을 지정합니다. DateTime은 기본 Bucketer이지만 사용자 정의 Bucketer도 만들 수 있습니다. 최대 배치 크기에 도달 할 때마다 부품 파일이 저장되고 닫히고 새 부품 파일이 작성됩니다. 아래 코드는 작동합니다.

    public class TestRollingSink {
    
        public static void main(String[] args){
            Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml");
    
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.socketTextStream("localhost", 9092);
    
        DataStream<String> parsed = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));
    
        env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);
    
        RollingSink<String> sink = new RollingSink<String>("s3://flink-test/"+"TEST");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(200);
        sink.setPendingPrefix("file-");
        sink.setPendingSuffix(".txt");
        parsed.print();
        parsed.addSink(sink).setParallelism(1);
    
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    }

  2. ==============================

    2.IAM 권한-S3 버킷에 쓰려고하는 역할을 확인하십시오.

    IAM 권한-S3 버킷에 쓰려고하는 역할을 확인하십시오.

  3. ==============================

    3.디버그 정보를 얻는 데 도움이되는 간단한 방법은 kafka 데이터를 수신해야하는 s3 버킷에 대한 로깅을 설정하는 것입니다. 이것은 s3 관점에서 오류의 원인을 결정하는 데 도움이되는 추가 정보를 제공합니다.

    디버그 정보를 얻는 데 도움이되는 간단한 방법은 kafka 데이터를 수신해야하는 s3 버킷에 대한 로깅을 설정하는 것입니다. 이것은 s3 관점에서 오류의 원인을 결정하는 데 도움이되는 추가 정보를 제공합니다.

    http://docs.aws.amazon.com/AmazonS3/latest/UG/ManagingBucketLogging.html

  4. from https://stackoverflow.com/questions/41473343/unable-to-write-to-s3-using-s3-sink-using-streamexecutionenvironment-apache-fl by cc-by-sa and MIT license