복붙노트

[HADOOP] Apache Spark on YARN : 많은 수의 입력 데이터 파일 (스파크의 여러 입력 파일 결합)

HADOOP

Apache Spark on YARN : 많은 수의 입력 데이터 파일 (스파크의 여러 입력 파일 결합)

구현 모범 사례에 대한 도움이 필요합니다. 운영 환경은 다음과 같습니다.

테스트에서, glob 경로로 sc.textFile ()을 시작으로 Spark에서 160,000 개의 사후 처리 된 파일을 처리하려고 시도했지만 드라이버 프로세스의 OutOfMemory 예외로 인해 실패했습니다.

이러한 종류의 데이터를 처리하는 가장 좋은 방법은 무엇입니까? 후 처리 된 데이터를 저장하기 위해 일반 파일 대신 HBase를 사용해야합니까?

해결법

  1. ==============================

    1.우리는 자신의 로더를 썼다. 그것은 HDFS의 작은 파일들로 우리의 문제를 해결했습니다. Hadoop CombineFileInputFormat을 사용합니다. 우리의 경우에는 매퍼 수를 100000에서 약 3000으로 줄였으며 작업 속도가 훨씬 빨랐습니다.

    우리는 자신의 로더를 썼다. 그것은 HDFS의 작은 파일들로 우리의 문제를 해결했습니다. Hadoop CombineFileInputFormat을 사용합니다. 우리의 경우에는 매퍼 수를 100000에서 약 3000으로 줄였으며 작업 속도가 훨씬 빨랐습니다.

    https://github.com/RetailRocket/SparkMultiTool

    예:

    import ru.retailrocket.spark.multitool.Loaders 
    val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
    // or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
    // where size is split size in Megabytes, delim - line break character 
    println(sessions.count())
    
  2. ==============================

    2.나는 많은 작은 파일들을 다루기 때문에 OOM을 얻는 이유가 확실합니다. 원하는 것은 입력 파일을 결합하여 파티션이 너무 많아지지 않도록하는 것입니다. 저는 약 10k 파티션으로 작업을 제한하려고합니다.

    나는 많은 작은 파일들을 다루기 때문에 OOM을 얻는 이유가 확실합니다. 원하는 것은 입력 파일을 결합하여 파티션이 너무 많아지지 않도록하는 것입니다. 저는 약 10k 파티션으로 작업을 제한하려고합니다.

    textFile 이후에는 .coalesce (10000, false)를 사용할 수 있습니다 ... 100 % 확신 할 수는 없지만 작업을 수행한지 얼마되지 않았으므로 알려 주시기 바랍니다. 그래서 시도해보십시오.

    sc.textFile(path).coalesce(10000, false)
    
  3. ==============================

    3.이것을 사용할 수 있습니다.

    이것을 사용할 수 있습니다.

    먼저 HDFS 또는 로컬 경로에 대해 S3 경로 / 버퍼 목록을 가져올 수 있습니다.

    Amazon S3를 사용하려는 경우 :

    import scala.collection.JavaConverters._
    import java.util.ArrayList
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectListing
    import com.amazonaws.services.s3.model.S3ObjectSummary
    import com.amazonaws.services.s3.model.ListObjectsRequest
    
    def listFiles(s3_bucket:String, base_prefix : String) = {
        var files = new ArrayList[String]
    
        //S3 Client and List Object Request
        var s3Client = new AmazonS3Client();
        var objectListing: ObjectListing = null;
        var listObjectsRequest = new ListObjectsRequest();
    
        //Your S3 Bucket
        listObjectsRequest.setBucketName(s3_bucket)
    
        //Your Folder path or Prefix
        listObjectsRequest.setPrefix(base_prefix)
    
        //Adding s3:// to the paths and adding to a list
        do {
          objectListing = s3Client.listObjects(listObjectsRequest);
          for (objectSummary <- objectListing.getObjectSummaries().asScala) {
            files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
          }
          listObjectsRequest.setMarker(objectListing.getNextMarker());
        } while (objectListing.isTruncated());
    
        //Removing Base Directory Name
        files.remove(0)
    
        //Creating a Scala List for same
        files.asScala
      }
    

    이제이 List 객체를 다음 코드 조각에 전달합니다. sc는 SQLContext의 객체입니다.

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    이제 최종 통합 RDD, 즉 df

    옵션, 그리고 당신은 또한 하나의 BigRDD에서 다시 파티션 할 수 있습니다

    val files = sc.textFile(filename, 1).repartition(1)
    

    다시 파티션하면 항상 작동합니다. D

  4. from https://stackoverflow.com/questions/24623402/apache-spark-on-yarn-large-number-of-input-data-files-combine-multiple-input-f by cc-by-sa and MIT license