복붙노트

[SCALA] 스파크 DataFrame : 해 orderBy 후 GROUPBY 그 질서를 유지 하는가?

SCALA

스파크 DataFrame : 해 orderBy 후 GROUPBY 그 질서를 유지 하는가?

나는 다음과 같은 구조의 스파크 2.0 dataframe 예를 :

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

각 ID (하루의 시간마다 하나) 24 개 항목을 포함하고 해 orderBy 함수를 사용하여, ID에 의해 시간을 지시한다.

I는 그리 게이터 GROUP_CONCAT 만들었습니다 :

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

그것은 나를 마지막 dataframe을 얻기 위해 문자열로 열을 연결하는 데 도움이 :

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

.. 내 질문은 내가 ($ "아이디"$ "시간") example.orderBy을 할 경우 GROUPBY ( "ID") AGG를 ( ":"groupConcat ( "hourly_count"등을, 2))하다는 것을 그 보증을 수행하는 시간당 수는 각각의 버킷에서 제대로 주문할 수 있습니까?

나는이 반드시 RDDs (일종의 키로하고 그때까지 그룹은 반복 가능한을 주문하려면? 스파크 참조)의 경우에는 해당되지 않습니다 것을 읽을 수 있지만 아마도 DataFrames에 대해 서로 다른입니까?

그렇지 않은 경우, 나는 그것을 주위에 방법을 작동 할 수 있습니까?

해결법

  1. ==============================

    1.다른 사람들이 지적으로 해 orderBy 후 GROUPBY은 질서를 유지하지 않습니다. 시간 ID와 순서에 파티션 - 당신이 원하는 것은 윈도우 기능을 사용할 수 있습니다. 당신은이 이상 collect_list 그들이 누적 이동하기 때문에 다음 결과 목록의 최대 (최대) 걸릴 수 있습니다 (즉, 첫 번째 시간은 그 자체가 목록에있을 것이다, 두 번째 시간은이 개 목록의 요소 등이있을 것이다).

    다른 사람들이 지적으로 해 orderBy 후 GROUPBY은 질서를 유지하지 않습니다. 시간 ID와 순서에 파티션 - 당신이 원하는 것은 윈도우 기능을 사용할 수 있습니다. 당신은이 이상 collect_list 그들이 누적 이동하기 때문에 다음 결과 목록의 최대 (최대) 걸릴 수 있습니다 (즉, 첫 번째 시간은 그 자체가 목록에있을 것이다, 두 번째 시간은이 개 목록의 요소 등이있을 것이다).

    전체 예제 코드 :

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    import spark.implicits._
    
    val data = Seq(( "id1", 0, 12),
      ("id1", 1, 55),
      ("id1", 23, 44),
      ("id2", 0, 12),
      ("id2", 1, 89),
      ("id2", 23, 34)).toDF("id", "hour", "count")
    
        val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
        data.withColumn("collected", collect_list($"count")
                                                        .over(Window.partitionBy("id")
                                                                     .orderBy("hour")))
                .groupBy("id")
                .agg(max($"collected").as("collected"))
                .withColumn("hourly_count", mergeList($"collected"))
                .select("id", "hourly_count").show
    

    이것은 DataFrame 세계에서 우리를 유지합니다. 나는 또한 영업 이익은 사용 된 UDF 코드를 단순화.

    산출:

    +---+------------+
    | id|hourly_count|
    +---+------------+
    |id1|    12:55:44|
    |id2|    12:89:34|
    +---+------------+
    
  2. ==============================

    2.나는 순서가 항상 유지되지 않는 경우가 없습니다 : 때때로 네, 대부분 더.

    나는 순서가 항상 유지되지 않는 경우가 없습니다 : 때때로 네, 대부분 더.

    내 dataframe는 스파크 1.6에서 실행되는 200 개의 파티션이

    df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                                                      F.sort_array(F.collect_list(times)),
                                                      F.collect_list(times)
                                                               )
    

    순서를 확인하는 난의 반환 값을 비교

    F.sort_array(F.collect_list(times))
    

    F.collect_list(times)
    

    주는 예를 들어, (왼쪽 : sort_array (collect_list ()); 오른쪽 : collect_list ())

    2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
    2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
    2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
    2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
    2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
    2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
    2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
    2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
    2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
    2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
    2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
    2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
    2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
    2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
    

    오른쪽 열은 정렬 된 블록으로 구성하는 동안 왼쪽 열은 항상 정렬됩니다. 인출 ()의 서로 다른 실행을 위해, 우측 열의 블록의 순서는 다르다.

  3. ==============================

    3.당신은 (유사해야 스칼라 및 Python) 자바에서 구현을 주위에 작업 할 경우 :

    당신은 (유사해야 스칼라 및 Python) 자바에서 구현을 주위에 작업 할 경우 :

    example.orderBy(“hour”)
    .groupBy(“id”)
    .agg(functions.sort_array(
      functions.collect_list( 
         functions.struct(dataRow.col(“hour”),
                          dataRow.col(“count”))),false)
     .as(“hourly_count”));
    
  4. ==============================

    4.순서 또는 파티션의 수 및 데이터의 분포에 따라 동일하지 않을 수있다. 우리는 RDD 자체를 사용하여 해결할 수 있습니다.

    순서 또는 파티션의 수 및 데이터의 분포에 따라 동일하지 않을 수있다. 우리는 RDD 자체를 사용하여 해결할 수 있습니다.

    예를 들어 ::

    나는 파일에 샘플 데이터 아래를 저장 HDFS에로드.

    1,type1,300
    2,type1,100
    3,type2,400
    4,type2,500
    5,type1,400
    6,type3,560
    7,type2,200
    8,type3,800
    

    및 명령 아래 실행 :

    sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
    

    산출:

    Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
    

    즉, 우리가 세퍼레이터로서 "~"후 가격 정렬과 함께 ID를 연접 입력하여 데이터를 그룹화한다. 위의 명령은 다음과 같이 나눌 수 있습니다 :

    val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)
    
    val groupedData=validData.groupBy(_(1))  //group data rdds
    
    val sortedJoinedData=groupedData.mapValues(x=>{
       val list=x.toList
       val sortedList=list.sortBy(_(2))
       val idOnlyList=sortedList.map(_(0))
       idOnlyList.mkString("~")
    }
    )
    sortedJoinedData.collect()
    

    우리는 다음 명령을 사용하여 특정 그룹을 취할 수

    sortedJoinedData.filter(_._1=="type1").collect()
    

    산출:

    Array[(String, String)] = Array((type1,2~1~5))
    
  5. ==============================

    5.아니, GroupByKey에서 내 정렬하는 것은 반드시 유지하지만,이 하나 개의 노드에서 메모리에 재현하는 악명 어려운되지 않습니다. 로 이전에 일이 일어날 GroupByKey에서 위해 다시 분할 할 필요가있을 때 이런 일이 발생 가장 일반적인 방법입니다, 말했다. 나는 수동으로 정렬 한 후 다시 분할을 수행하여이 문제를 재현 할 수 있었다. 그럼 난 GroupByKey에서에 결과를 전달.

    아니, GroupByKey에서 내 정렬하는 것은 반드시 유지하지만,이 하나 개의 노드에서 메모리에 재현하는 악명 어려운되지 않습니다. 로 이전에 일이 일어날 GroupByKey에서 위해 다시 분할 할 필요가있을 때 이런 일이 발생 가장 일반적인 방법입니다, 말했다. 나는 수동으로 정렬 한 후 다시 분할을 수행하여이 문제를 재현 할 수 있었다. 그럼 난 GroupByKey에서에 결과를 전달.

    case class Numbered(num:Int, group:Int, otherData:Int)
    
    // configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number 
    
    val v =
      (1 to 100000)
        // Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
        .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
        // Be sure they are stored in a small number of partitions
        .repartition(2)
        .sort($"num")
        // Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
        .repartition(200)
        .groupByKey(_.group)
        .mapGroups {
          case (g, nums) =>
            nums             // all you need is .sortBy(_.num) here to fix the problem          
              .map(_.num)
              .mkString("~")
        }
        .collect()
    
    // Walk through the concatenated strings. If any number ahead 
    // is smaller than the number before it, you know that something
    // is out of order.
    v.zipWithIndex.map { case (r, i) =>
      r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
        if (next < prev) {
          println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
        }
        next
      }
    }
    
  6. ==============================

    6.짧은 대답은 예, 시간당 카운트 같은 순서를 유지하는 것입니다.

    짧은 대답은 예, 시간당 카운트 같은 순서를 유지하는 것입니다.

    일반화, 그것은 당신이 그런 종류의 당신 전에 그룹 중요합니다. 또한 정렬 그룹 + 실제로 정렬을하고자하는 열의와 동일해야합니다.

    예는 같은 것입니다 :

    employees
        .sort("company_id", "department_id", "employee_role")
        .groupBy("company_id", "department_id")
        .agg(Aggregators.groupConcat(":", 2) as "count_per_role")
    
  7. from https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order by cc-by-sa and MIT license