[SCALA] 독립 RDD의 병렬로 여러 개의 파일을 처리
SCALA독립 RDD의 병렬로 여러 개의 파일을 처리
나는하여 그룹을 포함하여 작업의 특정 번호가 작은 (~ 300메가바이트 각) 파일의 수에 적용 할 수있는 시나리오가 있습니다. 동작은 다음과 같습니다 ..
df.groupBy (....). AGG (....)
이제 여러 파일에 그것을 처리하기 위해, 나는 작업에 단일 RDD 및 파티션을 생성하는 와일드 카드 그러나 "/**/*.csv"를 사용할 수 있습니다. 그러나, 운영보고, 그것은에 의해 그룹이며, 파일이 상호 배타적 인 경우 불필요 셔플을 많이 포함한다.
무엇을, 나는에 있습니다 내가 파일에 독립적 인 RDD 년대를 생성하고 독립적으로 작동 할 수있는 방법을 찾고 있어요.
해결법
-
==============================
1.그것은 더 전체 솔루션보다 생각 내가 아직 테스트하지 않았습니다.
그것은 더 전체 솔루션보다 생각 내가 아직 테스트하지 않았습니다.
당신은 함수로 데이터 처리 파이프 라인을 추출하여 시작할 수 있습니다.
def pipeline(f: String, n: Int) = { sqlContext .read .format("com.databricks.spark.csv") .option("header", "true") .load(f) .repartition(n) .groupBy(...) .agg(...) .cache // Cache so we can force computation later }
파일이 작은 경우에 당신은 하나의 파일에서 데이터를 맞고 셔플을 피하기 위해 가능한 파티션의 작은 번호를 사용하는 n 매개 변수를 조정할 수 있습니다. 그것은 당신이 동시성을 제한하고 있지만, 우리가 나중에 다시이 문제에 얻을 것이다 의미한다.
val n: Int = ???
다음으로 입력 파일의 목록을 얻을 수 있습니다. 이 단계는 데이터 소스에 따라 다르지만 대부분의 시간은 다소 간단하다 :
val files: Array[String] = ???
다음으로 파이프 라인 기능을 사용하여 목록 위에 매핑 할 수 있습니다 :
val rdds = files.map(f => pipeline(f, n))
우리는 하나의 파일 수준에서 동시성을 제한하기 때문에 우리는 여러 작업을 제출하여 보상합니다. 평가를 강제로 미래로 래핑하는 간단한 도우미를 추가 할 수 있습니다
import scala.concurrent._ import ExecutionContext.Implicits.global def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future { df.rdd.foreach(_ => ()) // Force computation df }
마지막으로 우리는 rdds에 도우미 위에 사용할 수 있습니다 :
val result = Future.sequence( rdds.map(rdd => pipelineToFuture(rdd)).toList )
귀하의 요구 사항에 따라 콜백을 onComplete를 추가하거나 결과를 수집하는 반응 스트림을 사용할 수 있습니다.
-
==============================
2.당신이 많은 파일을 가지고 있고, 각 파일이 작은 경우, 각 레코드는 전체 파일 인 RDD을 생성합니다 SparkContext.wholeTextFiles를 사용하여 시도해 볼 수도 있습니다 (당신은 내가 점화를위한 작은 계산 것이다 위 3백메가바이트 말).
당신이 많은 파일을 가지고 있고, 각 파일이 작은 경우, 각 레코드는 전체 파일 인 RDD을 생성합니다 SparkContext.wholeTextFiles를 사용하여 시도해 볼 수도 있습니다 (당신은 내가 점화를위한 작은 계산 것이다 위 3백메가바이트 말).
-
==============================
3.이 방법으로 우리는 평행 여러 RDD을 쓸 수 있습니다
이 방법으로 우리는 평행 여러 RDD을 쓸 수 있습니다
public class ParallelWriteSevice implements IApplicationEventListener { private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class); private static ExecutorService executorService=null; private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>(); public static void submit(Callable callable) { if(executorService==null) { executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this } futures.add(executorService.submit(callable)); } public static boolean isWriteSucess() { boolean writeFailureOccured = false; try { for (Future<Boolean> future : futures) { try { Boolean writeStatus = future.get(); if (writeStatus == false) { writeFailureOccured = true; } } catch (Exception e) { logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e); writeFailureOccured = true; } } } finally { resetFutures(); if (executorService != null) executorService.shutdown(); executorService = null; } return !writeFailureOccured; } private static void resetFutures() { logger.error("resetFutures called"); //futures.clear(); } }
from https://stackoverflow.com/questions/31912858/processing-multiple-files-as-independent-rdds-in-parallel by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 형태 보증 된 열거 유형을 모델링하기 위해? (0) | 2019.11.02 |
---|---|
[SCALA] "암시"스칼라 ID는 무엇입니까? (0) | 2019.11.02 |
[SCALA] 스칼라에서 인쇄 배열 (0) | 2019.11.02 |
[SCALA] 왜 스칼라 함수는 22 개 매개 변수로 제한된다? (0) | 2019.11.02 |
[SCALA] 어떻게 스파크 SQL에서 UDF를에 추가 매개 변수를 전달할 수 있습니다? (0) | 2019.11.02 |