[HADOOP] 아파치 스파크로 수백만 개의 작은 s3 파일을 처리하는 법
HADOOP아파치 스파크로 수백만 개의 작은 s3 파일을 처리하는 법
그래서이 문제는 나를 괴롭히는 결과를 낳았고, s3로 불꽃처럼 느껴지기 시작했습니다.이 특정 작업에 적합한 도구가 아닙니다. 기본적으로 s3 버킷에 수백만 개의 작은 파일이 있습니다. 필연적으로 들어갈 수없는 이유로 이러한 파일을 통합 할 수 없습니다 (하나는 암호화 된 고유 한 사본입니다). 이 질문과 비슷한 질문을 보았습니다. 모든 단일 솔루션이 좋은 결과를 내지 못했습니다. 내가 시도한 첫 번째 일은 와일드 카드였습니다.
sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();
참고 : 파일을 처리하는 데 걸리는 시간에 대해 더 많은 디버깅이 필요했습니다. 이 작업은 10 개가 넘는 인스턴스로 거의 하루 종일 걸렸지 만 목록의 맨 아래에 오류가 표시되어 실패했습니다. 나는이 링크를 찾았는데, 기본적으로 이것이 최적이 아니라고 말했다 : https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from- s3-my.html
그런 다음, 지금은 찾을 수없는 다른 솔루션을 시도하기로 결정했습니다.이 솔루션은 모든 경로를로드 한 다음 모든 rdd를 결합합니다.
ObjectListing objectListing = s3Client.listObjects(bucket);
List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
//initializes objectListing
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
while(objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
if (tempMeta.size() > 5000) {
rdds.addAll(tempMeta);
tempMeta = new ArrayList<>();
}
}
if (!tempMeta.isEmpty()){
rdds.addAll(tempMeta);
}
return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));
그렇다면, emrfs-site config를 다음과 같이 설정해도 :
{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.retryPolicyType": "fixed",
"fs.s3.consistent.retryPeriodSeconds": "15",
"fs.s3.consistent.retryCount": "20",
"fs.s3.enableServerSideEncryption": "true",
"fs.s3.consistent": "false"
}
}
작업을 실행할 때마다 6 시간 이내에이 오류가 발생합니다.
17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond
먼저, s3의 불꽃이있는 작은 파일을 사용하는 방법이 있습니까? 솔루션이 차선책인지 상관 없어요. 그냥 노력하고 싶습니다. 스파크 스트리밍을 시도해 보았습니다. 내부 파일이 모든 파일을로드하는 것과 조금 다릅니다. 그런 다음 fileStream을 사용하고 newFiles를 false로 설정합니다. 그런 다음 일괄 처리 할 수 있습니다. 그러나 스파크 스트리밍이 제작 된 것이 아니기 때문에 그 길로 갈등을 겪고 있습니다.
참고로, 나는 수백만 개의 작은 파일을 hdfs에 생성하고 같은 작업을 시도해 보았고 한 시간 내에 완료되었습니다. 이것은 내가 s3 특유의 것 같은 기분이 들게 만듭니다. 또한, 나는 s3a를 사용하고 있으며, 보통 s3을 사용하지는 않습니다.
해결법
-
==============================
1.아마존 EMR을 사용하고 있다면 s3 : // URLs; s3a : //는 ASF 릴리스 용입니다.
아마존 EMR을 사용하고 있다면 s3 : // URLs; s3a : //는 ASF 릴리스 용입니다.
가장 큰 문제는 s3에서 디렉토리 트리를 나열하는 데 걸리는 시간, 특히 재귀 트리 워크입니다. spark 코드는 목록 파일과 파일을 나열하는 것이 비용이 적게 드는 빠른 파일 시스템을 가정합니다. 실제로 각 작업에는 재사용 된 HTTP / 1.1 연결에서도 1-4 개의 HTTPS 요청이 필요합니다. 너무 느려서 로그에서 멈춤을 볼 수 있습니다.
이것이 정말로 상처받는 부분은 딜레이가 많이 발생하는 프론트 파티셔닝이므로 무릎을 꿇고 일하는 일련의 작업입니다.
S3a 2 단계 작업의 일환으로 Hadoop 2.8에 포함 된 S3a에 대한 트래킹에 대한 속도 향상은 있지만, //*.txt 형식의 와일드 카드 스캔은 속도 향상을 얻지 못할 것입니다. 내 추천은 깊은 나무에서 얕은 곳으로, 어쩌면 같은 디렉토리에있는 모든 곳으로 이동할 수 있도록 디렉토리 구조를 평평하게하려는 것입니다. 따라서 5000 항목 당 1 개의 HTTP 요청을 처리해야합니다. .
많은 작은 파일들이 스토리지를 다 사용하는 HDFS를 포함하여 꽤 비싸다는 것을 명심하십시오. 특별한 집합 형식 인 HAR 파일이 있습니다. HAR 파일은 hadoop, hive 및 spark가 모두 파일 자체 내에서 작동 할 수 있다는 점을 제외하고 tar 파일과 같습니다. 실제 성능 테스트 수치는 보지 못했지만 도움이 될 것입니다.
from https://stackoverflow.com/questions/42286463/how-to-handle-millions-of-smaller-s3-files-with-apache-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 매퍼 (또는 감속기) 내부에서 MR 작업을 중단하는 방법 (0) | 2019.06.29 |
---|---|
[HADOOP] 매퍼 수를 제한하는 방법 (0) | 2019.06.29 |
[HADOOP] hbase 스캐너에서 결과 정렬 (0) | 2019.06.29 |
[HADOOP] 하나의 JVM에서 여러 맵 작업을 실행할 수 있습니까? (0) | 2019.06.28 |
[HADOOP] Python Hadoop이 Windows에서 스트리밍 중이며, 유효한 Win32 응용 프로그램이 아닙니다. (0) | 2019.06.28 |