복붙노트

[PYTHON] 다른 변수를 기반으로 주문을 보존하여 collect_list

PYTHON

다른 변수를 기반으로 주문을 보존하여 collect_list

기존 열 집합에 groupby 집계를 사용하여 Pyspark 목록의 새 열을 만들려고합니다. 입력 데이터 프레임의 예는 다음과 같습니다.

------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10 
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100   
2  |2014-03-11   | 500
2  |2014-04-15   | 1500

예상되는 출력은 다음과 같습니다.

id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]

목록 내의 값은 날짜별로 정렬됩니다.

나는 다음과 같이 collect_list를 사용하여 시도했다.

from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))

그러나 collect_list는 집계 전에 날짜별로 입력 데이터 프레임을 정렬하더라도 주문을 보장하지 않습니다.

누군가가 두 번째 (날짜) 변수를 기반으로 주문을 보존하여 집계를 수행하는 방법을 도울 수 있습니까?

해결법

  1. ==============================

    1.날짜와 값을 모두 목록으로 수집하는 경우 date 및 udf에 따라 결과 열을 정렬 한 다음 결과의 값만 유지할 수 있습니다.

    날짜와 값을 모두 목록으로 수집하는 경우 date 및 udf에 따라 결과 열을 정렬 한 다음 결과의 값만 유지할 수 있습니다.

    import operator
    import pyspark.sql.functions as F
    
    # create list column
    grouped_df = input_df.groupby("id") \
                   .agg(F.collect_list(F.struct("date", "value")) \
                   .alias("list_col"))
    
    # define udf
    def sorter(l):
      res = sorted(l, key=operator.itemgetter(0))
      return [item[1] for item in res]
    
    sort_udf = F.udf(sorter)
    
    # test
    grouped_df.select("id", sort_udf("list_col") \
      .alias("sorted_list")) \
      .show(truncate = False)
    +---+----------------+
    |id |sorted_list     |
    +---+----------------+
    |1  |[10, 5, 15, 20] |
    |2  |[100, 500, 1500]|
    +---+----------------+
    
  2. ==============================

    2.

    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    w = Window.partitionBy('id').orderBy('date')
    
    sorted_list_df = input_df.withColumn(
                'sorted_list', F.collect_list('value').over(w)
            )\
            .groupBy('id')\
            .agg(F.max('sorted_list').alias('sorted_list'))
    

    사용자가 제공 한 창 예제는 실제로 어떤 일이 일어나고 있는지 설명하지 않아서 내가 해부하라고합니다.

    아시다시피 collect_list를 groupBy와 함께 사용하면 순서가 지정되지 않은 값 목록이 생성됩니다. 이는 데이터가 분할되는 방식에 따라 그룹에서 행을 찾으면 바로 목록에 값을 추가하기 때문입니다. 명령은 Spark이 집행자에 대한 집계를 계획하는 방법에 따라 다릅니다.

    Window 함수를 사용하면 상황을 제어하고 행을 특정 값으로 그룹화하여 결과 그룹 각각에 대해 작업을 수행 할 수 있습니다.

    w = Window.partitionBy('id').orderBy('date')
    

    Window의 범위를 정의한 후에는 "동일한 ID로 날짜별로 정렬 된"행을 사용하여이를 통해 작업을 수행 할 수 있습니다.이 경우 collect_list :

    F.collect_list('value').over(w)
    

    이 시점에서 정렬 된 값 목록을 날짜별로 정렬 한 새 열 sorted_list를 만들었지 만 ID 당 중복 행이 남아 있습니다. 그룹화하려는 중복 행을 잘라내어 각 그룹에 최대 값을 유지하려면 다음을 수행하십시오.

    .groupBy('id')\
    .agg(F.max('sorted_list').alias('sorted_list'))
    
  3. ==============================

    3.각 id에 대해 정렬이 완료되도록하려면 sortWithinPartitions를 사용할 수 있습니다.

    각 id에 대해 정렬이 완료되도록하려면 sortWithinPartitions를 사용할 수 있습니다.

    from pyspark.sql import functions as F
    ordered_df = (
        input_df
            .repartition(input_df.id)
            .sortWithinPartitions(['date'])
    
    
    )
    grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
    
  4. ==============================

    4.질문은 PySpark에 대한 것이지만 Scala Spark에도 도움이 될 것입니다. 여기있어:

    질문은 PySpark에 대한 것이지만 Scala Spark에도 도움이 될 것입니다. 여기있어:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import java.sql.Date
    import java.time.LocalDate
    
    val spark: SparkSession = ...
    
    implicit val dateOrdering: Ordering[Date] = Ordering.by(_.toLocalDate.toEpochDay)
    
    // Out test data set
    val data: Seq[(Int, Date, Int)] = Seq(
      (1, Date.valueOf(LocalDate.parse("2014-01-03")), 10),
      (1, Date.valueOf(LocalDate.parse("2014-01-04")), 5),
      (1, Date.valueOf(LocalDate.parse("2014-01-05")), 15),
      (1, Date.valueOf(LocalDate.parse("2014-01-06")), 20),
      (2, Date.valueOf(LocalDate.parse("2014-02-10")), 100),
      (2, Date.valueOf(LocalDate.parse("2014-02-11")), 500),
      (2, Date.valueOf(LocalDate.parse("2014-02-15")), 1500)
    )
    
    // Create dataframe
    val df: DataFrame = spark.createDataFrame(data)
      .toDF("id", "date", "value")
    df.show()
    
    // Group by id and aggregate date and value to new column date_value
    val grouped = df.groupBy(col("id"))
      .agg(collect_list(struct("date", "value")) as "date_value")
    grouped.show()
    grouped.printSchema()
    // +---+--------------------+
    // | id|          date_value|
    // +---+--------------------+
    // |  1|[[2014-01-03,10],...|
    // |  2|[[2014-02-10,100]...|
    // +---+--------------------+
    
    // udf to extract data from Row, sort by needed column (date) and return value
    val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
      rows.map { case Row(date: Date, value: Int) => (date, value) }
        .sortBy { case (date, value) => date }
        .map { case (date, value) => value }
    })
    
    // Select id and value_list
    val result = grouped.select(col("id"), sortUdf(col("date_value")).alias("value_list"))
    result.show()
    // +---+----------------+
    // | id|      value_list|
    // +---+----------------+
    // |  1| [10, 5, 15, 20]|
    // |  2|[100, 500, 1500]|
    // +---+----------------+
    
  5. from https://stackoverflow.com/questions/46580253/collect-list-by-preserving-order-based-on-another-variable by cc-by-sa and MIT license