복붙노트

[HADOOP] 스파크 스트리밍 : 마이크로 배치 병렬 실행

HADOOP

스파크 스트리밍 : 마이크로 배치 병렬 실행

우리는 카프카에서 스파크 스트리밍으로 데이터를 받고 있습니다. Spark Streaming에서 실행이 시작된 후에는 하나의 배치 만 실행되고 나머지 배치는 ​​Kafka에서 대기열에 들어가기 시작합니다.

여러 실행 프로그램, 코어, 역 압력 및 기타 구성을 사용하여 여러 구성을 시도했지만 지금까지 아무 것도 작동하지 않았습니다. 대기중인 메시지가 많으며 한 번에 하나의 마이크로 배치 만 처리되고 나머지는 대기열에 남아 있습니다.

우리는 최대한의 병렬 처리를 원합니다. 따라서 충분한 리소스를 사용할 수 있으므로 마이크로 배치가 대기열에 저장되지 않습니다. 그렇다면 리소스를 최대한 활용하여 시간을 줄일 수 있습니다.

// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

    // Decode each binary message and generate JSON array
        JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(byte[] asn1Data) throws Exception {
                if(asn1Data.length > 0) {
                    try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
                            Writer writer = new StringWriter(); ) {


                        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
                        GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

                        byte[] buffer = new byte[1024];
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                        int len;
                        while((len = gzipInputStream.read(buffer)) != -1) {
                            byteArrayOutputStream.write(buffer, 0, len);
                        }


                        return new String(byteArrayOutputStream.toByteArray());


                    } catch (Exception e) {
//                      
                        producer.flush();

                        throw e;
                    }
                } 

                return null;
            }
        });




// publish generated json gzip to kafka 
        cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
                //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
                if(!jsonRdd4DF.isEmpty()) {
                    //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
                    Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);   

                    SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
                    airMainJsonProcessor.processAIRData(json, sparkSession);
                }

            }               
        });

        getJavaStreamingContext().start();
        getJavaStreamingContext().awaitTermination();
        getJavaStreamingContext().stop();

우리가 사용하고있는 기술 :

HDFS  2.7.1.2.5 
YARN + MapReduce2  2.7.1.2.5 
ZooKeeper  3.4.6.2.5 
Ambari Infra  0.1.0 
Ambari Metrics  0.1.0 
Kafka  0.10.0.2.5 
Knox  0.9.0.2.5 
Ranger  0.6.0.2.5 
Ranger KMS  0.6.0.2.5 
SmartSense  1.3.0.0-1
Spark2  2.0.x.2.5 

차이 실험에서 얻은 통계 :

num_executors=6
executor_memory=8g
executor_cores=12

100 파일 처리 시간 48 분

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 파일 처리 시간 8 분

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 파일 처리 시간 7 분

spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12

100 파일 처리 시간 10 분

우리가 대기열에 들어갈 수있는 최대량을 어떻게 처리 할 수 ​​있는지 조언 해주십시오.

해결법

  1. ==============================

    1.나는 똑같은 문제에 직면 해 있었고 그 문제를 해결하기 위해 몇 가지 시도를했으며 다음과 같은 결과를 얻었다.

    나는 똑같은 문제에 직면 해 있었고 그 문제를 해결하기 위해 몇 가지 시도를했으며 다음과 같은 결과를 얻었다.

    가장 먼저. Intuition은 Executor마다 하나의 일괄 처리를 처리해야하지만 한 번에 하나의 일괄 처리 만 처리되지만 작업과 작업은 병렬 처리됩니다.

    spark.streaming.concurrentjobs를 사용하면 여러 배치 처리를 수행 할 수 있지만 문서화되지 않았으며 몇 가지 수정 사항이 필요합니다. 문제 중 하나는 카프카 오프셋을 저장하는 것입니다. 이 매개 변수를 4와 4 개의 배치를 병렬로 처리한다고 가정합니다. 3 번째 배치가 4 번째 전에 완료되면 카프카 오프셋이 커밋됩니다. 이 매개 변수는 일괄 처리가 독립적 인 경우 매우 유용합니다.

    spark.default.parallelism은 때때로 이름을 평행하게 만드는 것으로 간주되기 때문입니다. 그러나 분산 된 셔플 작업의 이점은 그럴 것입니다. 다른 번호를 시도하고 이에 대한 최적의 번호를 찾으십시오. 처리 시간에는 상당한 차이가 있습니다. 그것은 당신의 작업에서 셔플 작업에 따라 다릅니다. 너무 높게 설정하면 성능이 저하됩니다. 실험 결과에서도 분명합니다.

    또 다른 옵션은 RDD에서 foreach 대신 foreachPartitionAsync를 사용하는 것입니다. 하지만 foreachPartitionAsync는 작업을 대기열에 넣었을 때 foreachPartition을 사용하는 것이 좋지만 일괄 처리는 처리되지만 작업은 여전히 ​​대기열에 있거나 처리 중일 것이라고 생각합니다. 나는 그 사용법을 제대로하지 못했을 수도 있습니다. 그러나 그것은 나의 3 가지 서비스에서 똑같이 행동했다.

    FAIR spark.scheduler.mode는 작업에 라운드 로빈 (round-robin) 작업 할당과 같이 많은 작업이있는 작업에 사용해야하며 큰 작업이 처리되는 동안 작은 작업으로 자원을 받기 시작할 수있는 기회를 제공합니다.

    배치 지속 시간 + 입력 크기를 조정하고 항상 처리 기간 아래로 유지하십시오. 그렇지 않으면 배치의 긴 백 로그가 표시됩니다.

    이것들은 나의 발견과 제안이다. 그러나 스트리밍을하기위한 설정과 방법이 너무나 많아서 종종 한 세트의 작업 만 다른 사람들을 위해 작동하지 않는다. Spark Streaming은 학습에 대한 모든 것입니다. 경험과 기대를 하나로 모아 최적의 구성으로 설정하십시오.

    희망이 도움이됩니다. 누군가가 합법적으로 일괄 처리 할 수있는 방법을 구체적으로 말할 수 있다면 큰 도움이 될 것입니다.

  2. ==============================

    2.이것이 스트림 처리에 관한 것입니다. 데이터를받은 순서대로 처리합니다. 도착한 것보다 느린 속도로 데이터를 처리하면 대기열에 들어갑니다. 또한 한 레코드의 처리가 여러 노드에서 갑자기 병렬화 될 것으로 예상하지 마십시오.

    이것이 스트림 처리에 관한 것입니다. 데이터를받은 순서대로 처리합니다. 도착한 것보다 느린 속도로 데이터를 처리하면 대기열에 들어갑니다. 또한 한 레코드의 처리가 여러 노드에서 갑자기 병렬화 될 것으로 예상하지 마십시오.

    스크린 샷을 보면 일괄 처리 시간이 10 초이고 제작자가 90 초 동안 100 개의 레코드를 게시 한 것으로 보입니다.

    17 레코드를 처리하기 위해 2 레코드와 70 레코드를 처리하는 데 36 초가 걸렸습니다. 분명히, 배치 당 오버 헤드가 있습니다. 이 종속성이 선형 적이면 하나의 미니 배치에 100 개의 모든 레코드를 처리하는 데 4 분의 1 초 밖에 걸리지 않으므로 레코드 소유자를 이기게됩니다.

    코드가 완전하지 않으므로 정확한 시간이 정확히 무엇인지 알기가 어렵습니다. 코드의 변환은 잘 보이지만 액션 (또는 후속 변환)이 실제 병목 현상 일 수 있습니다. 또한, 코드에서 언급되지 않은 producer.flush ()는 무엇입니까?

  3. ==============================

    3.나는 같은 문제에 직면했고 Scala Futures를 사용하여 해결했습니다.

    나는 같은 문제에 직면했고 Scala Futures를 사용하여 해결했습니다.

    다음은 링크를 사용하는 방법을 보여주는 링크입니다.

    또한 Scala Futures를 사용할 때 이것은 다음과 같은 코드입니다.

    messages.foreachRDD{ rdd =>
      val f = Future {
      //        sleep(100)
        val newRDD = rdd.map{message => 
                               val req_message = message.value()  
                               (message.value())
                            }
    
        println("Request messages: " + newRDD.count())         
        var resultrows = newRDD.collect()//.collectAsList() 
        processMessage(resultrows, mlFeatures: MLFeatures, conf)          
        println("Inside scala future")
        1          
      }
      f.onComplete {
        case Success(messages) => println("yay!")
        case Failure(exception) => println("On no!")
      }  
    }
    
  4. ==============================

    4.모든 세부 사항을 알지 못하더라도 말할 수 없지만 그런 문제를 해결하기위한 일반적인 조언은 매우 간단한 응용 프로그램 인 "Hello world"와 비슷합니다. 입력 스트림을 읽고 로그 파일에 데이터를 인쇄하면됩니다. 이 기능이 작동하면 문제가 애플리케이션에 있음을 증명하고 점차적으로 기능을 추가하여 범인을 찾습니다. 가장 간단한 앱이라도 작동하지 않는다면 구성이나 스파크 클러스터 자체의 문제를 알 수 있습니다. 희망이 도움이됩니다.

    모든 세부 사항을 알지 못하더라도 말할 수 없지만 그런 문제를 해결하기위한 일반적인 조언은 매우 간단한 응용 프로그램 인 "Hello world"와 비슷합니다. 입력 스트림을 읽고 로그 파일에 데이터를 인쇄하면됩니다. 이 기능이 작동하면 문제가 애플리케이션에 있음을 증명하고 점차적으로 기능을 추가하여 범인을 찾습니다. 가장 간단한 앱이라도 작동하지 않는다면 구성이나 스파크 클러스터 자체의 문제를 알 수 있습니다. 희망이 도움이됩니다.

  5. from https://stackoverflow.com/questions/45084775/spark-streaming-micro-batches-parallel-execution by cc-by-sa and MIT license