복붙노트

[SQL] 스파크 DataFrame의 GROUPBY를 사용할 때 어떻게 다른 열을 얻으려면?

SQL

스파크 DataFrame의 GROUPBY를 사용할 때 어떻게 다른 열을 얻으려면?

때이 같은 DataFrame GROUPBY을 사용합니다 :

df.groupBy(df("age")).agg(Map("id"->"count"))

난 단지 열 "세"와 "수 (ID)"와 DataFrame을 얻을 것이다, 그러나 안양에서 "이름"과 같은 다른 많은 열이 있습니다.

모두에서는, 내가 MySQL을 같이 결과를 얻으려면,

사용이 스파크에 GROUPBY 때 어떻게해야합니까?

해결법

  1. ==============================

    1.일반적으로 긴 이야기의 짧은 원래 테이블 집계 결과에 가입해야합니다. 집계 쿼리에 추가 열을 허용하지 않는 주요 데이터베이스 (PostgreSQL을, 오라클, MS SQL 서버)의 대부분으로 1999 년 대회 : 스파크 SQL 같은 사전-SQL을 따른다.

    일반적으로 긴 이야기의 짧은 원래 테이블 집계 결과에 가입해야합니다. 집계 쿼리에 추가 열을 허용하지 않는 주요 데이터베이스 (PostgreSQL을, 오라클, MS SQL 서버)의 대부분으로 1999 년 대회 : 스파크 SQL 같은 사전-SQL을 따른다.

    집계 결과와 같은 집계를 위해 잘 정의 된 행동이 쿼리의 유형을 지원하는 시스템으로 변화하는 경향이되지 않기 때문에 당신은 첫 번째 또는 마지막과 같은 임의의 집합을 사용하여 추가 열을 단지를 포함 할 수 있습니다.

    하지만 상황에 따라 꽤 비싼 수있는 경우에 당신은 윈도우 기능 선택 및 후속 사용 AGG 교체 할 수 있습니다.

  2. ==============================

    2.사용 기능에 가입하는 GROUPBY을 수행 한 후 모든 열을 얻을 수있는 한 가지 방법이다.

    사용 기능에 가입하는 GROUPBY을 수행 한 후 모든 열을 얻을 수있는 한 가지 방법이다.

    feature_group = ['name', 'age']
    data_counts = df.groupBy(feature_group).count().alias("counts")
    data_joined = df.join(data_counts, feature_group)
    

    data_joined 이제 카운트 값을 포함하여 모든 열을해야합니다.

  3. ==============================

    3.어쩌면이 솔루션은 도움 것이다.

    어쩌면이 솔루션은 도움 것이다.

    from pyspark.sql import SQLContext
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
        name_list = [(101, 'abc', 24), (102, 'cde', 24), (103, 'efg', 22), (104, 'ghi', 21),
                     (105, 'ijk', 20), (106, 'klm', 19), (107, 'mno', 18), (108, 'pqr', 18),
                     (109, 'rst', 26), (110, 'tuv', 27), (111, 'pqr', 18), (112, 'rst', 28), (113, 'tuv', 29)]
    
    age_w = Window.partitionBy("age")
    name_age_df = sqlContext.createDataFrame(name_list, ['id', 'name', 'age'])
    
    name_age_count_df = name_age_df.withColumn("count", F.count("id").over(age_w)).orderBy("count")
    name_age_count_df.show()
    
    +---+----+---+-----+
    | id|name|age|count|
    +---+----+---+-----+
    |109| rst| 26|    1|
    |113| tuv| 29|    1|
    |110| tuv| 27|    1|
    |106| klm| 19|    1|
    |103| efg| 22|    1|
    |104| ghi| 21|    1|
    |105| ijk| 20|    1|
    |112| rst| 28|    1|
    |101| abc| 24|    2|
    |102| cde| 24|    2|
    |107| mno| 18|    3|
    |111| pqr| 18|    3|
    |108| pqr| 18|    3|
    +---+----+---+-----+
    
  4. ==============================

    4.집계 기능은 그룹 내의 지정된 열의 행의 값을 감소시킨다. 다른 행 값을 유지하려면 각 값에서 비롯되는 행을 지정 감소 로직을 구현해야합니다. 예를 들어 시대의 최대 값과 첫 번째 행의 모든 ​​값을 유지합니다. 당신이 UDAF을 사용할 수 있습니다이를 위해 그룹 내에서 행을 줄이기 위해 (사용자가 집계 함수를 정의).

    집계 기능은 그룹 내의 지정된 열의 행의 값을 감소시킨다. 다른 행 값을 유지하려면 각 값에서 비롯되는 행을 지정 감소 로직을 구현해야합니다. 예를 들어 시대의 최대 값과 첫 번째 행의 모든 ​​값을 유지합니다. 당신이 UDAF을 사용할 수 있습니다이를 위해 그룹 내에서 행을 줄이기 위해 (사용자가 집계 함수를 정의).

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    
    
    object AggregateKeepingRowJob {
    
      def main (args: Array[String]): Unit = {
    
        val sparkSession = SparkSession
          .builder()
          .appName(this.getClass.getName.replace("$", ""))
          .master("local")
          .getOrCreate()
    
        val sc = sparkSession.sparkContext
        sc.setLogLevel("ERROR")
    
        import sparkSession.sqlContext.implicits._
    
        val rawDf = Seq(
          (1L, "Moe",  "Slap",  2.0, 18),
          (2L, "Larry",  "Spank",  3.0, 15),
          (3L, "Curly",  "Twist", 5.0, 15),
          (4L, "Laurel", "Whimper", 3.0, 15),
          (5L, "Hardy", "Laugh", 6.0, 15),
          (6L, "Charley",  "Ignore",   5.0, 5)
        ).toDF("id", "name", "requisite", "money", "age")
    
        rawDf.show(false)
        rawDf.printSchema
    
        val maxAgeUdaf = new KeepRowWithMaxAge
    
        val aggDf = rawDf
          .groupBy("age")
          .agg(
            count("id"),
            max(col("money")),
            maxAgeUdaf(
              col("id"),
              col("name"),
              col("requisite"),
              col("money"),
              col("age")).as("KeepRowWithMaxAge")
          )
    
        aggDf.printSchema
        aggDf.show(false)
    
      }
    
    
    }
    

    외적 :

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    
    class KeepRowWithMaxAmt extends UserDefinedAggregateFunction {
    // This is the input fields for your aggregate function.
    override def inputSchema: org.apache.spark.sql.types.StructType =
      StructType(
        StructField("store", StringType) ::
        StructField("prod", StringType) ::
        StructField("amt", DoubleType) ::
        StructField("units", IntegerType) :: Nil
      )
    
    // This is the internal fields you keep for computing your aggregate.
    override def bufferSchema: StructType = StructType(
      StructField("store", StringType) ::
      StructField("prod", StringType) ::
      StructField("amt", DoubleType) ::
      StructField("units", IntegerType) :: Nil
    )
    
    
    // This is the output type of your aggregation function.
    override def dataType: DataType =
      StructType((Array(
        StructField("store", StringType),
        StructField("prod", StringType),
        StructField("amt", DoubleType),
        StructField("units", IntegerType)
      )))
    
    override def deterministic: Boolean = true
    
    // This is the initial value for your buffer schema.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = ""
      buffer(1) = ""
      buffer(2) = 0.0
      buffer(3) = 0
    }
    
    // This is how to update your buffer schema given an input.
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    
      val amt = buffer.getAs[Double](2)
      val candidateAmt = input.getAs[Double](2)
    
      amt match {
        case a if a < candidateAmt =>
          buffer(0) = input.getAs[String](0)
          buffer(1) = input.getAs[String](1)
          buffer(2) = input.getAs[Double](2)
          buffer(3) = input.getAs[Int](3)
        case _ =>
      }
    }
    
    // This is how to merge two objects with the bufferSchema type.
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    
      buffer1(0) = buffer2.getAs[String](0)
      buffer1(1) = buffer2.getAs[String](1)
      buffer1(2) = buffer2.getAs[Double](2)
      buffer1(3) = buffer2.getAs[Int](3)
    }
    
    // This is where you output the final value, given the final value of your bufferSchema.
    override def evaluate(buffer: Row): Any = {
      buffer
    }
    }
    
  5. ==============================

    5.나는에 걸쳐 온 것을 여기에 예 스파크 워크샵

    나는에 걸쳐 온 것을 여기에 예 스파크 워크샵

    val populationDF = spark.read
                    .option("infer-schema", "true")
                    .option("header", "true")
                    .format("csv").load("file:///databricks/driver/population.csv")
                    .select('name, regexp_replace(col("population"), "\\s", "").cast("integer").as("population"))
    

    발 maxPopulationDF = populationDF.agg (최대 ( '인구)이 .as ( "populationmax"))

    populationDF.join(maxPopulationDF,populationDF.col("population") === maxPopulationDF.col("populationmax")).select('name, 'populationmax).show()
    
  6. ==============================

    6.당신은 집계 함수는 행을 줄이고, 따라서 당신이 감소 기능을 원하는 이름 행의 지정해야한다는 것을 기억해야합니다. 당신이 그룹의 모든 행을 유지하려는 경우리스트로를 수집 할 수 있습니다 (경고!이 폭발 또는 왜곡 된 파티션의 원인이 될 수 있습니다.) 그런 다음 내 예를 들어 돈, 당신의 기준에 따라이를 줄이기 위해 UDF (사용자 정의 함수)를 사용할 수 있습니다. 그리고 또 다른 UDF와 단일 감소 된 행에서 열을 확장합니다. 나는 가정이 답변의 목적을 위해 당신은 가장 돈이 사람의 이름을 유지하고 싶습니다.

    당신은 집계 함수는 행을 줄이고, 따라서 당신이 감소 기능을 원하는 이름 행의 지정해야한다는 것을 기억해야합니다. 당신이 그룹의 모든 행을 유지하려는 경우리스트로를 수집 할 수 있습니다 (경고!이 폭발 또는 왜곡 된 파티션의 원인이 될 수 있습니다.) 그런 다음 내 예를 들어 돈, 당신의 기준에 따라이를 줄이기 위해 UDF (사용자 정의 함수)를 사용할 수 있습니다. 그리고 또 다른 UDF와 단일 감소 된 행에서 열을 확장합니다. 나는 가정이 답변의 목적을 위해 당신은 가장 돈이 사람의 이름을 유지하고 싶습니다.

    import org.apache.spark.sql._
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.StringType
    
    import scala.collection.mutable
    
    
    object TestJob3 {
    
    def main (args: Array[String]): Unit = {
    
    val sparkSession = SparkSession
      .builder()
      .appName(this.getClass.getName.replace("$", ""))
      .master("local")
      .getOrCreate()
    
    val sc = sparkSession.sparkContext
    
    import sparkSession.sqlContext.implicits._
    
    val rawDf = Seq(
      (1, "Moe",  "Slap",  2.0, 18),
      (2, "Larry",  "Spank",  3.0, 15),
      (3, "Curly",  "Twist", 5.0, 15),
      (4, "Laurel", "Whimper", 3.0, 9),
      (5, "Hardy", "Laugh", 6.0, 18),
      (6, "Charley",  "Ignore",   5.0, 5)
    ).toDF("id", "name", "requisite", "money", "age")
    
    rawDf.show(false)
    rawDf.printSchema
    
    val rawSchema = rawDf.schema
    
    val fUdf = udf(reduceByMoney, rawSchema)
    
    val nameUdf = udf(extractName, StringType)
    
    val aggDf = rawDf
      .groupBy("age")
      .agg(
        count(struct("*")).as("count"),
        max(col("money")),
        collect_list(struct("*")).as("horizontal")
      )
      .withColumn("short", fUdf($"horizontal"))
      .withColumn("name", nameUdf($"short"))
      .drop("horizontal")
    
    aggDf.printSchema
    
    aggDf.show(false)
    
    }
    
    def reduceByMoney= (x: Any) => {
    
    val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]]
    
    val red = d.reduce((r1, r2) => {
    
      val money1 = r1.getAs[Double]("money")
      val money2 = r2.getAs[Double]("money")
    
      val r3 = money1 match {
        case a if a >= money2 =>
          r1
        case _ =>
          r2
      }
    
      r3
    })
    
    red
    }
    
    def extractName = (x: Any) => {
    
      val d = x.asInstanceOf[GenericRowWithSchema]
    
      d.getAs[String]("name")
    }
    }
    

    여기 출력은

    +---+-----+----------+----------------------------+-------+
    |age|count|max(money)|short                       |name   |
    +---+-----+----------+----------------------------+-------+
    |5  |1    |5.0       |[6, Charley, Ignore, 5.0, 5]|Charley|
    |15 |2    |5.0       |[3, Curly, Twist, 5.0, 15]  |Curly  |
    |9  |1    |3.0       |[4, Laurel, Whimper, 3.0, 9]|Laurel |
    |18 |2    |6.0       |[5, Hardy, Laugh, 6.0, 18]  |Hardy  |
    +---+-----+----------+----------------------------+-------+
    
  7. ==============================

    7.당신은 다음과 같이 할 수 있습니다 :

    당신은 다음과 같이 할 수 있습니다 :

    샘플 데이터 :

    name    age id
    abc     24  1001
    cde     24  1002
    efg     22  1003
    ghi     21  1004
    ijk     20  1005
    klm     19  1006
    mno     18  1007
    pqr     18  1008
    rst     26  1009
    tuv     27  1010
    pqr     18  1012
    rst     28  1013
    tuv     29  1011
    

    산출:

        +----+---+-----+
        |name|age|count|
        +----+---+-----+
        | efg| 22|    1|
        | tuv| 29|    1|
        | rst| 28|    1|
        | klm| 19|    1|
        | pqr| 18|    2|
        | cde| 24|    1|
        | tuv| 27|    1|
        | ijk| 20|    1|
        | abc| 24|    1|
        | mno| 18|    1|
        | ghi| 21|    1|
        | rst| 26|    1|
        +----+---+-----+
    
  8. from https://stackoverflow.com/questions/34409875/how-to-get-other-columns-when-using-spark-dataframe-groupby by cc-by-sa and MIT license