복붙노트

[SCALA] 어떻게 DataFrame의 파티션을 정의?

SCALA

어떻게 DataFrame의 파티션을 정의?

나는 스파크 1.4.0에서 스파크 SQL 및 DataFrames를 사용하기 시작했습니다. 나는 스칼라, DataFrames에서 사용자 정의 파티션 프로그램을 정의하고자하지만,이 작업을 수행하는 방법을 확인할 수 없습니다.

내가 함께 일하고 있어요 데이터 테이블 중 하나는 다음 예 silimar, 계정에 의해, 트랜잭션의 목록이 포함되어 있습니다.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

처음에 적어도 계산의 대부분은 계정 내에서 거래 사이에 발생합니다. 그래서 계정의 모든 트랜잭션이 같은 스파크 파티션에 있도록 분할 된 데이터를 갖고 싶어한다.

하지만이 정의하는 방법을 확인할 수 없습니다. DataFrame 클래스를 생성 할 파티션의 수를 지정할 수 있습니다 '재분할 (INT)'라는 방법을 가지고 있습니다. 그러나 나는 RDD에 지정 될 수 같은 DataFrame에 대한 사용자 정의 파티션 프로그램을 정의 할 수있는 방법을 사용할 보이지 않아요.

원본 데이터는 마루에 저장된다. 나는 마루에 DataFrame를 작성할 때, 당신은에 의해 분할 열을 지정할 수 있습니다 것을 볼 수 없었기 때문에 아마도 나는 '계정'열하여 데이터의 분할 마루를 말할 수 있었다. 그러나이 계정의 수백만이 될 수 있고, 내가 제대로 마루를 이해하고있어 경우에 그 합리적인 해결책 같은 소리하지 않았다, 그래서 그것은 각 계정에 대해 별개의 디렉토리를 만들 것입니다.

계정에 대한 모든 데이터가 동일한 파티션에 있도록이 DataFrame를 분할 불꽃을 얻을 수있는 방법이 있나요?

해결법

  1. ==============================

    1.스파크 22,614 범위의 분할을 공개한다.

    스파크 22,614 범위의 분할을 공개한다.

    val partitionedByRange = df.repartitionByRange(42, $"k")
    
    partitionedByRange.explain
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- Project [_1#2 AS k#5, _2#3 AS v#6]
    //    +- LocalRelation [_1#2, _2#3]
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- LocalRelation [k#5, v#6]
    // 
    // == Physical Plan ==
    // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
    // +- LocalTableScan [k#5, v#6]
    

    SPARK-22389은 데이터 소스 API v2의 외부 형식으로 분할을 제공합니다.

    불꽃에서> = 1.6 쿼리 및 캐싱 열로 분할을 사용할 수있다. 참조 : SPARK-11410 및 SPARK-4849을 다시 분할 방법을 사용하여 :

    val df = Seq(
      ("A", 1), ("B", 2), ("A", 3), ("C", 1)
    ).toDF("k", "v")
    
    val partitioned = df.repartition($"k")
    partitioned.explain
    
    // scala> df.repartition($"k").explain(true)
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Physical Plan ==
    // TungstenExchange hashpartitioning(k#7,200), None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- Scan PhysicalRDD[_1#5,_2#6]
    

    RDDs 달리 지금 같이 정의 분할기를 사용할 수 없다 (a.k.a DataFrame 데이터 세트 [행] 포함) 데이터 집합 스파크. 당신은 일반적으로 인공 분할 열을 생성하여 그것을 해결할 수 있지만, 그것은 당신에게 같은 유연성을 제공하지 않습니다.

    당신이 할 수있는 한 가지는 당신이 DataFrame를 만들기 전에 입력 데이터를 파티션 사전입니다

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.HashPartitioner
    
    val schema = StructType(Seq(
      StructField("x", StringType, false),
      StructField("y", LongType, false),
      StructField("z", DoubleType, false)
    ))
    
    val rdd = sc.parallelize(Seq(
      Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
      Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
    ))
    
    val partitioner = new HashPartitioner(5) 
    
    val partitioned = rdd.map(r => (r.getString(0), r))
      .partitionBy(partitioner)
      .values
    
    val df = sqlContext.createDataFrame(partitioned, schema)
    

    RDD에서 DataFrame 생성이 필요하기 때문에 단지 간단한지도 상 기존 파티션 구조가 유지되어야한다 * :

    assert(df.rdd.partitions == partitioned.partitions)
    

    기존 DataFrame을 다시 분할 할 수있는 같은 방법 :

    sqlContext.createDataFrame(
      df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
      df.schema
    )
    

    불가능하지 않습니다 같은 것 같습니다. 전혀 이해하면 문제는 남아있다. 나는 그것을하지 않는 대부분의 시간을 주장한다 :

    JDBC 소스를 사용하여 파티션 :

    JDBC 데이터 소스는 조건부 인수를 지원합니다. 다음과 같이 사용할 수 있습니다 :

    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
    

    이 술어 당 하나의 JDBC 파티션을 생성합니다. 세트가 해체되지 않은 개별 술어를 사용하여 만든 경우 결과 테이블에 중복을 볼 수 있음을 유의하십시오.

    DataFrameWriter에서 partitionBy 방법 :

    스파크 DataFrameWriter에 쓰기에 "파티션"데이터를 사용할 수 partitionBy 방법을 제공한다. 이 컬럼의 제공 세트를 사용하여 쓰기에 데이터를 분리

    val df = Seq(
      ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
    ).toDF("k", "v")
    
    df.write.partitionBy("k").json("/tmp/foo.json")
    

    이 술어는 키를 기반으로 쿼리에 대한 읽기를 아래로 밀어 수 있습니다 :

    val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
    df1.where($"k" === "bar")
    

    하지만 DataFrame.repartition에 해당하지 않습니다. 특히 집계에서 같은 :

    val cnts = df1.groupBy($"k").sum()
    

    여전히 TungstenExchange이 필요합니다 :

    cnts.explain
    
    // == Physical Plan ==
    // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
    // +- TungstenExchange hashpartitioning(k#90,200), None
    //    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
    //       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
    

    DataFrameWriter에 bucketBy 방법 (스파크> = 2.0) :

    bucketBy는 partitionBy 유사한 응용 프로그램을 가지고 있지만 그것은 단지 테이블 (saveAsTable) 사용할 수 있습니다. 정보를 버킷 조인 최적화하는 데 사용할 수 있습니다 :

    // Temporarily disable broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df.write.bucketBy(42, "k").saveAsTable("df1")
    val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
    df2.write.bucketBy(42, "k").saveAsTable("df2")
    
    // == Physical Plan ==
    // *Project [k#41, v#42, v2#47]
    // +- *SortMergeJoin [k#41], [k#46], Inner
    //    :- *Sort [k#41 ASC NULLS FIRST], false, 0
    //    :  +- *Project [k#41, v#42]
    //    :     +- *Filter isnotnull(k#41)
    //    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
    //    +- *Sort [k#46 ASC NULLS FIRST], false, 0
    //       +- *Project [k#46, v2#47]
    //          +- *Filter isnotnull(k#46)
    //             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
    

    * 파티션 레이아웃으로 난 단지 데이터 분포를 의미한다. 분할 RDD는 더 이상 파티션 프로그램이 없습니다. ** 더 일찍 프로젝션을 가정하지 않는다. 집계 컬럼의 작은 하위 집합을 포함하면 아마 아무런 이득이 없다.

  2. ==============================

    2.스파크 <1.6에서 당신은 HiveContext 아니라 당신이 HiveQL이 colX에 의해, 배포 사용할 수있는 평범한 구식는 SqlContext ... 작성하는 경우에 대한 colX에 의해 & CLUSTER (바로 가기 (N 이경 각각 겹치지 않는 X의 범위를 얻을 수 있도록) 예를 들어으로하고 정렬 기준)를 배포;

    스파크 <1.6에서 당신은 HiveContext 아니라 당신이 HiveQL이 colX에 의해, 배포 사용할 수있는 평범한 구식는 SqlContext ... 작성하는 경우에 대한 colX에 의해 & CLUSTER (바로 가기 (N 이경 각각 겹치지 않는 X의 범위를 얻을 수 있도록) 예를 들어으로하고 정렬 기준)를 배포;

    df.registerTempTable("partitionMe")
    hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
    

    이 스파크 DF API를에 맞는 방법을 잘하지 않습니다. 이 키워드가 정상는 SqlContext에서 지원되지 않습니다 (당신이 HiveContext를 사용하는 하이브 메타 저장소를 가질 필요가없는 주)

    편집 : 스파크 1.6은 고유 한 DataFrame의 API이있다

  3. ==============================

    3.그래서 대답의 일종으로 시작하려면 다음) - 당신은 할 수 없습니다

    그래서 대답의 일종으로 시작하려면 다음) - 당신은 할 수 없습니다

    나는 전문가가 아니지만, 지금까지의 내가 DataFrames 이해, 그들은 RDD 같지하며 DataFrame은 파티션 설정 같은 것은이 없습니다.

    일반적으로 DataFrame의 생각은 이러한 문제 자체를 처리 추상화의 다른 수준을 제공하는 것입니다. DataFrame의 쿼리는 더 RDDs에 작업에 번역 논리적 인 계획으로 변환됩니다. 당신이 제안한 분할은 아마 자동으로 적용됩니다 또는 이상이어야한다.

    당신이 최적의 작업의 어떤 종류를 제공 할 것 SparkSQL를 신뢰하지 않는 경우, 당신은 항상 코멘트에 제안 RDD [행]에 DataFrame을 변환 할 수 있습니다.

  4. ==============================

    4.에 의해 반환 된 DataFrame를 사용 :

    에 의해 반환 된 DataFrame를 사용 :

    yourDF.orderBy(account)
    

    이 단지 PairRDD에, DataFrame에 partitionBy를 사용하는 명시적인 방법은 없지만, 당신이 DataFrame을 정렬 할 때 LogicalPlan 그리고 당신이 각 계정에 대한 계산을해야 할 때 그 도움이 될 것입니다, 그것은 그것을 사용합니다.

    난 그냥 계정으로 분할하려는 dataframe에, 똑같은 문제를 만났다. 사용처럼 (난 당신이 말할 때, 당신은 규모와 성능을 원하는 "계정의 모든 트랜잭션이 같은 스파크 파티션에 있도록 분할 된 데이터를 갖고 싶어"가정,하지만 코드는에 의존하지 않는 mapPartitions () 등), 맞죠?

  5. ==============================

    5.나는이 사용 RDD을 할 수 있었다. 이 당신을 위해 수용 가능한 솔루션입니다하지만 나도 몰라. 당신은 RDD으로 DF 가능한 한 후에는 데이터의 사용자 정의 재분할을 수행 할 수 repartitionAndSortWithinPartitions을 적용 할 수 있습니다.

    나는이 사용 RDD을 할 수 있었다. 이 당신을 위해 수용 가능한 솔루션입니다하지만 나도 몰라. 당신은 RDD으로 DF 가능한 한 후에는 데이터의 사용자 정의 재분할을 수행 할 수 repartitionAndSortWithinPartitions을 적용 할 수 있습니다.

    저는 여기에 사용되는 샘플입니다 :

    class DatePartitioner(partitions: Int) extends Partitioner {
    
      override def getPartition(key: Any): Int = {
        val start_time: Long = key.asInstanceOf[Long]
        Objects.hash(Array(start_time)) % partitions
      }
    
      override def numPartitions: Int = partitions
    }
    
    myRDD
      .repartitionAndSortWithinPartitions(new DatePartitioner(24))
      .map { v => v._2 }
      .toDF()
      .write.mode(SaveMode.Overwrite)
    
  6. from https://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-dataframe by cc-by-sa and MIT license