복붙노트

[SCALA] 독립 RDD의 병렬로 여러 개의 파일을 처리

SCALA

독립 RDD의 병렬로 여러 개의 파일을 처리

나는하여 그룹을 포함하여 작업의 특정 번호가 작은 (~ 300메가바이트 각) 파일의 수에 적용 할 수있는 시나리오가 있습니다. 동작은 다음과 같습니다 ..

df.groupBy (....). AGG (....)

이제 여러 파일에 그것을 처리하기 위해, 나는 작업에 단일 RDD 및 파티션을 생성하는 와일드 카드 그러나 "/**/*.csv"를 사용할 수 있습니다. 그러나, 운영보고, 그것은에 의해 그룹이며, 파일이 상호 배타적 인 경우 불필요 셔플을 많이 포함한다.

무엇을, 나는에 있습니다 내가 파일에 독립적 인 RDD 년대를 생성하고 독립적으로 작동 할 수있는 방법을 찾고 있어요.

해결법

  1. ==============================

    1.그것은 더 전체 솔루션보다 생각 내가 아직 테스트하지 않았습니다.

    그것은 더 전체 솔루션보다 생각 내가 아직 테스트하지 않았습니다.

    당신은 함수로 데이터 처리 파이프 라인을 추출하여 시작할 수 있습니다.

    def pipeline(f: String, n: Int) = {
        sqlContext
            .read
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .load(f)
            .repartition(n)
            .groupBy(...)
            .agg(...)
            .cache // Cache so we can force computation later
    }
    

    파일이 작은 경우에 당신은 하나의 파일에서 데이터를 맞고 셔플을 피하기 위해 가능한 파티션의 작은 번호를 사용하는 n 매개 변수를 조정할 수 있습니다. 그것은 당신이 동시성을 제한하고 있지만, 우리가 나중에 다시이 문제에 얻을 것이다 의미한다.

    val n: Int = ??? 
    

    다음으로 입력 파일의 목록을 얻을 수 있습니다. 이 단계는 데이터 소스에 따라 다르지만 대부분의 시간은 다소 간단하다 :

    val files: Array[String] = ???
    

    다음으로 파이프 라인 기능을 사용하여 목록 위에 매핑 할 수 있습니다 :

    val rdds = files.map(f => pipeline(f, n))
    

    우리는 하나의 파일 수준에서 동시성을 제한하기 때문에 우리는 여러 작업을 제출하여 보상합니다. 평가를 강제로 미래로 래핑하는 간단한 도우미를 추가 할 수 있습니다

    import scala.concurrent._
    import ExecutionContext.Implicits.global
    
    def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
        df.rdd.foreach(_ => ()) // Force computation
        df
    }
    

    마지막으로 우리는 rdds에 도우미 위에 사용할 수 있습니다 :

    val result = Future.sequence(
       rdds.map(rdd => pipelineToFuture(rdd)).toList
    )
    

    귀하의 요구 사항에 따라 콜백을 onComplete를 추가하거나 결과를 수집하는 반응 스트림을 사용할 수 있습니다.

  2. ==============================

    2.당신이 많은 파일을 가지고 있고, 각 파일이 작은 경우, 각 레코드는 전체 파일 인 RDD을 생성합니다 SparkContext.wholeTextFiles를 사용하여 시도해 볼 수도 있습니다 (당신은 내가 점화를위한 작은 계산 것이다 위 3백메가바이트 말).

    당신이 많은 파일을 가지고 있고, 각 파일이 작은 경우, 각 레코드는 전체 파일 인 RDD을 생성합니다 SparkContext.wholeTextFiles를 사용하여 시도해 볼 수도 있습니다 (당신은 내가 점화를위한 작은 계산 것이다 위 3백메가바이트 말).

  3. ==============================

    3.이 방법으로 우리는 평행 여러 RDD을 쓸 수 있습니다

    이 방법으로 우리는 평행 여러 RDD을 쓸 수 있습니다

    public class ParallelWriteSevice implements IApplicationEventListener {
    
        private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);
    
        private static ExecutorService executorService=null;
        private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();
    
        public static void submit(Callable callable) {
            if(executorService==null)
            {
                executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
            }
    
            futures.add(executorService.submit(callable));
        }
    
        public static boolean isWriteSucess() {
            boolean writeFailureOccured = false;
            try {
                for (Future<Boolean> future : futures) {
                    try {
                        Boolean writeStatus = future.get();
                        if (writeStatus == false) {
                            writeFailureOccured = true;
                        }
                    } catch (Exception e) {
                        logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e);
                        writeFailureOccured = true;
                    }
                }
            } finally {
                resetFutures();         
                  if (executorService != null) 
                      executorService.shutdown();
                  executorService = null;
    
            }
            return !writeFailureOccured;
        }
    
        private static void resetFutures() {
                logger.error("resetFutures called");
                //futures.clear();
        }
    
    
    
    
    }
    
  4. from https://stackoverflow.com/questions/31912858/processing-multiple-files-as-independent-rdds-in-parallel by cc-by-sa and MIT license