복붙노트

[HADOOP] 키로 여러 출력에 쓰기 Spark - 하나의 Spark 작업

HADOOP

키로 여러 출력에 쓰기 Spark - 하나의 Spark 작업

단일 작업에서 Spark를 사용하여 키에 종속적 인 여러 출력에 어떻게 쓸 수 있습니까?

관련 항목 : 하나의 MapReduce 작업 인 하둡을 긁어서 키로 여러 출력 작성

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

cat 접두사 / 1이 맞는지 확인합니다.

a
b

cat 접두사 / 2는

c

편집 : 최근에 전체 수입, 포주 및 압축 코덱을 포함하는 새로운 답변을 추가했습니다. https://stackoverflow.com/a/46118044/1586965를 참조하십시오.이 답변은 이전 답변에 도움이 될 수 있습니다.

해결법

  1. ==============================

    1.Spark 1.4+를 사용하면 DataFrame API 덕분에 훨씬 쉽고 편리합니다. (DataFrames는 Spark 1.3에서 소개되었지만, 우리가 필요로하는 partitionBy ()는 1.4에서 소개되었습니다.)

    Spark 1.4+를 사용하면 DataFrame API 덕분에 훨씬 쉽고 편리합니다. (DataFrames는 Spark 1.3에서 소개되었지만, 우리가 필요로하는 partitionBy ()는 1.4에서 소개되었습니다.)

    RDD로 시작하는 경우 먼저 RDD로 변환해야합니다.

    val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
    val people_df = people_rdd.toDF("number", "name")
    

    파이썬에서이 동일한 코드는 다음과 같습니다.

    people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
    people_df = people_rdd.toDF(["number", "name"])
    

    DataFrame이 있으면 특정 키를 기반으로 여러 출력에 쓰기가 간단합니다. 더욱이 - DataFrame API의 아름다움입니다. 코드는 Python, Scala, Java 및 R에서 거의 동일합니다.

    people_df.write.partitionBy("number").text("people")
    

    원하는 경우 다른 출력 형식을 쉽게 사용할 수 있습니다.

    people_df.write.partitionBy("number").json("people-json")
    people_df.write.partitionBy("number").parquet("people-parquet")
    

    위의 각 예제에서 Spark는 DataFrame을 파티션 분할 한 각 키에 대한 하위 디렉토리를 만듭니다.

    people/
      _SUCCESS
      number=1/
        part-abcd
        part-efgh
      number=2/
        part-abcd
        part-efgh
    
  2. ==============================

    2.나는 이것을 확장 성있는 것으로 할 것이다.

    나는 이것을 확장 성있는 것으로 할 것이다.

    import org.apache.hadoop.io.NullWritable
    
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    
    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any = 
        NullWritable.get()
    
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
        key.asInstanceOf[String]
    }
    
    object Split {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Split" + args(1))
        val sc = new SparkContext(conf)
        sc.textFile("input/path")
        .map(a => (k, v)) // Your own implementation
        .partitionBy(new HashPartitioner(num))
        .saveAsHadoopFile("output/path", classOf[String], classOf[String],
          classOf[RDDMultipleTextOutputFormat])
        spark.stop()
      }
    }
    

    위와 비슷한 대답을 보았지만 실제로는 사용자 정의 파티션이 필요하지 않습니다. MultipleTextOutputFormat은 각 키에 대한 파일을 만듭니다. 동일한 키를 가진 여러 레코드가 같은 파티션에 속하게되면 좋습니다.

    새로운 HashPartitioner (num). num은 원하는 파티션 번호입니다. 많은 수의 다른 키가있는 경우 숫자를 크게 설정할 수 있습니다. 이 경우, 각 파티션은 너무 많은 hdfs 파일 핸들러를 열지 않습니다.

  3. ==============================

    3.잠재적으로 주어진 키에 대해 많은 값이있는 경우, 확장 가능한 솔루션은 파티션 당 키당 하나의 파일을 작성하는 것입니다. 불행히도 Spark에는 내장 된 지원이 없지만 뭔가를 쓸어 낼 수 있습니다.

    잠재적으로 주어진 키에 대해 많은 값이있는 경우, 확장 가능한 솔루션은 파티션 당 키당 하나의 파일을 작성하는 것입니다. 불행히도 Spark에는 내장 된 지원이 없지만 뭔가를 쓸어 낼 수 있습니다.

    sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
      .mapPartitionsWithIndex { (p, it) =>
        val outputs = new MultiWriter(p.toString)
        for ((k, v) <- it) {
          outputs.write(k.toString, v)
        }
        outputs.close
        Nil.iterator
      }
      .foreach((x: Nothing) => ()) // To trigger the job.
    
    // This one is Local, but you could write one for HDFS
    class MultiWriter(suffix: String) {
      private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
      def write(key: String, value: Any) = {
        if (!writers.contains(key)) {
          val f = new java.io.File("output/" + key + "/" + suffix)
          f.getParentFile.mkdirs
          writers(key) = new java.io.PrintWriter(f)
        }
        writers(key).println(value)
      }
      def close = writers.values.foreach(_.close)
    }
    

    (PrintWriter를 선택한 분산 파일 시스템 작업으로 대체하십시오.)

    이렇게하면 RDD를 한 번 통과하고 셔플을 수행하지 않습니다. 키 하나당 하나의 디렉토리를 제공하며 각 디렉토리 안에 여러 개의 파일이 있습니다.

  4. ==============================

    4.여기에는 요청 된 코덱, 필요한 가져 오기 및 포주가 포함됩니다.

    여기에는 요청 된 코덱, 필요한 가져 오기 및 포주가 포함됩니다.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    
    // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
    implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
      def writeAsMultiple(prefix: String, codec: String,
                          keyName: String = "key")
                         (implicit sqlContext: SQLContext): Unit = {
        import sqlContext.implicits._
    
        rdd.toDF(keyName, "_2").write.partitionBy(keyName)
        .format("text").option("codec", codec).save(prefix)
      }
    }
    
    val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
    

    OP와의 한 가지 미묘한 차이점은 디렉토리 이름에 = 접두어가 붙는 것입니다. 예 :

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
    

    줄 것입니다 :

    prefix/key=1/part-00000
    prefix/key=2/part-00000
    

    접두어 / my_number = 1 / part-00000은 a 및 b 행을 포함하고 / my_number = 2 / part-00000 접두사는 c 행을 포함합니다.

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
    

    줄 것입니다 :

    prefix/foo=1/part-00000
    prefix/foo=2/part-00000
    

    쪽모퉁이를 편집하는 방법은 분명해야합니다.

    마지막으로 Tuples를 사용하는 것이 더 좋은 Dataset의 예가 아래에 나와 있습니다.

    implicit class PimpedDataset[T](dataset: Dataset[T]) {
      def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
        dataset.write.partitionBy(field)
        .format("text").option("codec", codec).save(prefix)
      }
    }
    
  5. ==============================

    5.나는 비슷한 필요를 가지고 있고 길을 찾았습니다. 하지만 하나의 단점 (내 경우에는 문제가되지 않습니다)이 있습니다. 출력 파일 당 하나의 파티션으로 데이터를 다시 파티션해야합니다.

    나는 비슷한 필요를 가지고 있고 길을 찾았습니다. 하지만 하나의 단점 (내 경우에는 문제가되지 않습니다)이 있습니다. 출력 파일 당 하나의 파티션으로 데이터를 다시 파티션해야합니다.

    이 방법으로 파티션을 나누려면 일반적으로 작업에서 출력 할 파일 수를 미리 알고 각 키를 각 파티션에 매핑하는 함수를 찾아야합니다.

    먼저 MultipleTextOutputFormat 기반 클래스를 만듭니다.

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    
    class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
      override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
        key.toString
      }
      override protected def generateActualKey(key: T, value: V) = {
        null
      }
    }
    

    이 클래스를 사용하여 Spark는 파티션 (첫 번째 / 마지막 것 같음)에서 키를 가져오고이 키로 파일의 이름을 지정하므로 같은 파티션에 여러 키를 혼합하는 것은 좋지 않습니다.

    예를 들어, 사용자 정의 분할자가 필요합니다. 이것은 일을 할 것입니다 :

    import org.apache.spark.Partitioner
    
    class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
      def numPartitions = maxKey
    
      def getPartition(key: Any): Int = key match {
        case i: Int if i < maxKey => i
      }
    }
    

    이제 모든 것을합시다.

    val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))
    
    // You need to know the max number of partitions (files) beforehand
    // In this case we want one partition per key and we have 3 keys,
    // with the biggest key being 7, so 10 will be large enough
    val partitioner = new IdentityIntPartitioner(10)
    
    val prefix = "hdfs://.../prefix"
    
    val partitionedRDD = rdd.partitionBy(partitioner)
    
    partitionedRDD.saveAsHadoopFile(prefix,
        classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])
    

    이것은 접두사 (1, 2 및 7로 명명 됨) 아래에 3 개의 파일을 생성하여 모든 것을 한 번에 처리합니다.

    보시다시피,이 솔루션을 사용하려면 키에 대한 지식이 필요합니다.

    필자는 각 키 해시마다 하나의 출력 파일을 필요로하고 파일 수를 제어 할 수 있기 때문에 쉬웠다. 그래서 주식 HashPartitioner를 사용하여 트릭을 수행 할 수 있었다.

  6. ==============================

    6.나는 자바에서 같은 것을 필요로했다. Spang Java API 사용자에게 Zhang Zhan의 Scala 답변 번역 게시 :

    나는 자바에서 같은 것을 필요로했다. Spang Java API 사용자에게 Zhang Zhan의 Scala 답변 번역 게시 :

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    
    class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
    
        @Override
        protected String generateFileNameForKeyValue(A key, B value, String name) {
            return key.toString();
        }
    }
    
    public class Main {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Split Job")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
            sc.parallelize(Arrays.asList(strings))
                    // The first character of the string is the key
                    .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                    .saveAsHadoopFile("output/", String.class, String.class,
                            RDDMultipleTextOutputFormat.class);
            sc.stop();
        }
    }
    
  7. ==============================

    7.saveAsText () 및 saveAsHadoop (...)은 RDD 데이터, 특히 실행되는 PairRdd에서 데이터를 가져 오는 PairRDD.saveAsHadoopDataset 메서드에 의해 구현됩니다. 두 가지 가능한 옵션이 있습니다. 데이터 크기가 비교적 작 으면 RDD를 그룹화하고 각 컬렉션에서 새 RDD를 만들고 해당 RDD를 사용하여 데이터를 쓰는 방법으로 구현 시간을 절약 할 수 있습니다. 이 같은:

    saveAsText () 및 saveAsHadoop (...)은 RDD 데이터, 특히 실행되는 PairRdd에서 데이터를 가져 오는 PairRDD.saveAsHadoopDataset 메서드에 의해 구현됩니다. 두 가지 가능한 옵션이 있습니다. 데이터 크기가 비교적 작 으면 RDD를 그룹화하고 각 컬렉션에서 새 RDD를 만들고 해당 RDD를 사용하여 데이터를 쓰는 방법으로 구현 시간을 절약 할 수 있습니다. 이 같은:

    val byKey = dataRDD.groupByKey().collect()
    val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
    val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}
    

    v.toSeq에서 iterator의 materialization이 메모리에 맞지 않을 때 대용량 데이터 세트 b / c에서는 작동하지 않습니다.

    내가 본 다른 옵션과 실제로이 경우 권장할만한 옵션은 : 직접 hadoop / hdfs api를 호출하여 직접 롤백하십시오.

    다음은이 질문을 연구하면서 시작한 토론입니다. 다른 RDD에서 RDD를 만드는 방법은 무엇입니까?

  8. ==============================

    8.Hadoop HDFS에서 입력 파일을 키 (키당 1 개의 파일)를 기반으로 여러 파일로 분할 한 비슷한 사용 사례가있었습니다. 여기 스파크에 대한 나의 스칼라 코드가있다.

    Hadoop HDFS에서 입력 파일을 키 (키당 1 개의 파일)를 기반으로 여러 파일로 분할 한 비슷한 사용 사례가있었습니다. 여기 스파크에 대한 나의 스칼라 코드가있다.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    val hadoopconf = new Configuration();
    val fs = FileSystem.get(hadoopconf);
    
    @serializable object processGroup {
        def apply(groupName:String, records:Iterable[String]): Unit = {
            val outFileStream = fs.create(new Path("/output_dir/"+groupName))
            for( line <- records ) {
                    outFileStream.writeUTF(line+"\n")
                }
            outFileStream.close()
        }
    }
    val infile = sc.textFile("input_file")
    val dateGrouped = infile.groupBy( _.split(",")(0))
    dateGrouped.foreach( (x) => processGroup(x._1, x._2))
    

    키를 기반으로 레코드를 그룹화했습니다. 각 키의 값은 별도의 파일에 기록됩니다.

  9. ==============================

    9.파이썬 사용자를위한 좋은 소식은 다중 열이 있고 Css 형식으로 분할되지 않은 다른 모든 열을 저장하려는 경우 Nick Chammas의 제안과 같이 "텍스트"방법을 사용하면 실패합니다.

    파이썬 사용자를위한 좋은 소식은 다중 열이 있고 Css 형식으로 분할되지 않은 다른 모든 열을 저장하려는 경우 Nick Chammas의 제안과 같이 "텍스트"방법을 사용하면 실패합니다.

    people_df.write.partitionBy("number").text("people") 
    

    오류 메시지는 "AnalysisException : u'Text 데이터 소스는 하나의 열만 지원하며 두 개의 열이 있습니다. ''

    2.0.0의 spark (내 테스트 환경은 hdp spark 2.0.0입니다.) 패키지 "com.databricks.spark.csv"가 통합되었으며, 하나의 열로 분할 된 텍스트 파일을 저장할 수 있습니다. 예제는 다음과 같습니다.

    people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                                 (1,"2016-12-25", "alice"),
                                 (1,"2016-12-25", "tom"), 
                                 (1, "2016-12-25","bob"), 
                                 (2,"2016-12-26" ,"charlie")])
    df = people_rdd.toDF(["number", "date","name"])
    
    df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")
    
    [root@namenode people]# tree
    .
    ├── number=1
    │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
    ├── number=2
    │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
    └── _SUCCESS
    
    [root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
    2016-12-26,alice
    2016-12-25,alice
    2016-12-25,tom
    2016-12-25,bob
    [root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
    2016-12-26,charlie
    

    내 스파크 1.6.1 환경에서, 코드는 오류를 던지지 않았지만, 하나의 파일 만 생성되었습니다. 두 개의 폴더로 분할되지 않았습니다.

    희망이 도움이 될 수 있습니다.

  10. ==============================

    10.비슷한 사용 사례가있었습니다. MultipleTextOutputFormat 및 RecordWriter를 구현하는 두 개의 사용자 정의 클래스를 작성하여 Java로 해결했습니다.

    비슷한 사용 사례가있었습니다. MultipleTextOutputFormat 및 RecordWriter를 구현하는 두 개의 사용자 정의 클래스를 작성하여 Java로 해결했습니다.

    필자의 입력은 JavaPairRDD >이었고 키로 명명 된 파일에 모든 값을 저장했습니다.

    내 MultipleTextOutputFormat 구현을위한 코드는 다음과 같습니다.

    class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
    
        @Override
        protected String generateFileNameForKeyValue(K key, V value, String name) {
            return key.toString(); //The return will be used as file name
        }
    
        /** The following 4 functions are only for visibility purposes                 
        (they are used in the class MyRecordWriter) **/
        protected String generateLeafFileName(String name) {
            return super.generateLeafFileName(name);
        }
    
        protected V generateActualValue(K key, V value) {
            return super.generateActualValue(key, value);
        }
    
        protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
            return super.getInputFileBasedOutputFileName(job, name);
            }
    
        protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
            return super.getBaseRecordWriter(fs, job, name, arg3);
        }
    
        /** Use my custom RecordWriter **/
        @Override
        RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
        final String myName = this.generateLeafFileName(name);
            return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
        }
    } 
    

    다음은 내 RecordWriter 구현을위한 코드입니다.

    class MyRecordWriter<K, V> implements RecordWriter<K, V> {
    
        private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
        private final FileSystem fs;
        private final JobConf job;
        private final Progressable arg3;
        private String myName;
    
        TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
    
        MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
            this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
            this.fs = fs;
            this.job = job;
            this.arg3 = arg3;
            this.myName = myName;
        }
    
        @Override
        void write(K key, V value) throws IOException {
            String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
            String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
            Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
            RecordWriter rw = this.recordWriters.get(finalPath);
            if(rw == null) {
                rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
                this.recordWriters.put(finalPath, rw);
            }
            List<String> lines = (List<String>) actualValue;
            for (String line : lines) {
                rw.write(null, line);
            }
        }
    
        @Override
        void close(Reporter reporter) throws IOException {
            Iterator keys = this.recordWriters.keySet().iterator();
    
            while(keys.hasNext()) {
                RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
                rw.close(reporter);
            }
    
            this.recordWriters.clear();
        }
    }
    

    대부분의 코드는 FileOutputFormat과 완전히 동일합니다. 유일한 차이점은 몇 줄 밖에 없다는 것입니다.

    List<String> lines = (List<String>) actualValue;
    for (String line : lines) {
        rw.write(null, line);
    }
    

    이 줄을 사용하여 입력 목록 의 각 줄을 파일에 쓸 수있었습니다. write 함수의 첫 번째 인수는 각 행에서 키를 기록하지 않도록 null로 설정됩니다.

    끝내려면이 파일을 작성하기 위해이 호출 만하면됩니다.

    javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
    
  11. from https://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job by cc-by-sa and MIT license