[HADOOP] Apache Spark on YARN : 많은 수의 입력 데이터 파일 (스파크의 여러 입력 파일 결합)
HADOOPApache Spark on YARN : 많은 수의 입력 데이터 파일 (스파크의 여러 입력 파일 결합)
구현 모범 사례에 대한 도움이 필요합니다. 운영 환경은 다음과 같습니다.
테스트에서, glob 경로로 sc.textFile ()을 시작으로 Spark에서 160,000 개의 사후 처리 된 파일을 처리하려고 시도했지만 드라이버 프로세스의 OutOfMemory 예외로 인해 실패했습니다.
이러한 종류의 데이터를 처리하는 가장 좋은 방법은 무엇입니까? 후 처리 된 데이터를 저장하기 위해 일반 파일 대신 HBase를 사용해야합니까?
해결법
-
==============================
1.우리는 자신의 로더를 썼다. 그것은 HDFS의 작은 파일들로 우리의 문제를 해결했습니다. Hadoop CombineFileInputFormat을 사용합니다. 우리의 경우에는 매퍼 수를 100000에서 약 3000으로 줄였으며 작업 속도가 훨씬 빨랐습니다.
우리는 자신의 로더를 썼다. 그것은 HDFS의 작은 파일들로 우리의 문제를 해결했습니다. Hadoop CombineFileInputFormat을 사용합니다. 우리의 경우에는 매퍼 수를 100000에서 약 3000으로 줄였으며 작업 속도가 훨씬 빨랐습니다.
https://github.com/RetailRocket/SparkMultiTool
예:
import ru.retailrocket.spark.multitool.Loaders val sessions = Loaders.combineTextFile(sc, "file:///test/*") // or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") // where size is split size in Megabytes, delim - line break character println(sessions.count())
-
==============================
2.나는 많은 작은 파일들을 다루기 때문에 OOM을 얻는 이유가 확실합니다. 원하는 것은 입력 파일을 결합하여 파티션이 너무 많아지지 않도록하는 것입니다. 저는 약 10k 파티션으로 작업을 제한하려고합니다.
나는 많은 작은 파일들을 다루기 때문에 OOM을 얻는 이유가 확실합니다. 원하는 것은 입력 파일을 결합하여 파티션이 너무 많아지지 않도록하는 것입니다. 저는 약 10k 파티션으로 작업을 제한하려고합니다.
textFile 이후에는 .coalesce (10000, false)를 사용할 수 있습니다 ... 100 % 확신 할 수는 없지만 작업을 수행한지 얼마되지 않았으므로 알려 주시기 바랍니다. 그래서 시도해보십시오.
sc.textFile(path).coalesce(10000, false)
-
==============================
3.이것을 사용할 수 있습니다.
이것을 사용할 수 있습니다.
먼저 HDFS 또는 로컬 경로에 대해 S3 경로 / 버퍼 목록을 가져올 수 있습니다.
Amazon S3를 사용하려는 경우 :
import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala }
이제이 List 객체를 다음 코드 조각에 전달합니다. sc는 SQLContext의 객체입니다.
var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } }
이제 최종 통합 RDD, 즉 df
옵션, 그리고 당신은 또한 하나의 BigRDD에서 다시 파티션 할 수 있습니다
val files = sc.textFile(filename, 1).repartition(1)
다시 파티션하면 항상 작동합니다. D
from https://stackoverflow.com/questions/24623402/apache-spark-on-yarn-large-number-of-input-data-files-combine-multiple-input-f by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Avro 파일의 온보드 스키마를 사용하여 Spark에서 Avros를로드하려면 어떻게해야합니까? (0) | 2019.07.06 |
---|---|
[HADOOP] java.util.Map의 드롭 인 대체품 찾기 (0) | 2019.07.06 |
[HADOOP] Hadoop을 실행할 때 OutOfMemoryException을 피하는 방법? (0) | 2019.07.06 |
[HADOOP] HDFS에서 파일 전송 (0) | 2019.07.06 |
[HADOOP] 직업 부름의 차이점 (0) | 2019.07.06 |