[SQL] 스파크 DataFrame의 GROUPBY를 사용할 때 어떻게 다른 열을 얻으려면?
SQL스파크 DataFrame의 GROUPBY를 사용할 때 어떻게 다른 열을 얻으려면?
때이 같은 DataFrame GROUPBY을 사용합니다 :
df.groupBy(df("age")).agg(Map("id"->"count"))
난 단지 열 "세"와 "수 (ID)"와 DataFrame을 얻을 것이다, 그러나 안양에서 "이름"과 같은 다른 많은 열이 있습니다.
모두에서는, 내가 MySQL을 같이 결과를 얻으려면,
사용이 스파크에 GROUPBY 때 어떻게해야합니까?
해결법
-
==============================
1.일반적으로 긴 이야기의 짧은 원래 테이블 집계 결과에 가입해야합니다. 집계 쿼리에 추가 열을 허용하지 않는 주요 데이터베이스 (PostgreSQL을, 오라클, MS SQL 서버)의 대부분으로 1999 년 대회 : 스파크 SQL 같은 사전-SQL을 따른다.
일반적으로 긴 이야기의 짧은 원래 테이블 집계 결과에 가입해야합니다. 집계 쿼리에 추가 열을 허용하지 않는 주요 데이터베이스 (PostgreSQL을, 오라클, MS SQL 서버)의 대부분으로 1999 년 대회 : 스파크 SQL 같은 사전-SQL을 따른다.
집계 결과와 같은 집계를 위해 잘 정의 된 행동이 쿼리의 유형을 지원하는 시스템으로 변화하는 경향이되지 않기 때문에 당신은 첫 번째 또는 마지막과 같은 임의의 집합을 사용하여 추가 열을 단지를 포함 할 수 있습니다.
하지만 상황에 따라 꽤 비싼 수있는 경우에 당신은 윈도우 기능 선택 및 후속 사용 AGG 교체 할 수 있습니다.
-
==============================
2.사용 기능에 가입하는 GROUPBY을 수행 한 후 모든 열을 얻을 수있는 한 가지 방법이다.
사용 기능에 가입하는 GROUPBY을 수행 한 후 모든 열을 얻을 수있는 한 가지 방법이다.
feature_group = ['name', 'age'] data_counts = df.groupBy(feature_group).count().alias("counts") data_joined = df.join(data_counts, feature_group)
data_joined 이제 카운트 값을 포함하여 모든 열을해야합니다.
-
==============================
3.어쩌면이 솔루션은 도움 것이다.
어쩌면이 솔루션은 도움 것이다.
from pyspark.sql import SQLContext from pyspark import SparkContext, SparkConf from pyspark.sql import functions as F from pyspark.sql import Window name_list = [(101, 'abc', 24), (102, 'cde', 24), (103, 'efg', 22), (104, 'ghi', 21), (105, 'ijk', 20), (106, 'klm', 19), (107, 'mno', 18), (108, 'pqr', 18), (109, 'rst', 26), (110, 'tuv', 27), (111, 'pqr', 18), (112, 'rst', 28), (113, 'tuv', 29)] age_w = Window.partitionBy("age") name_age_df = sqlContext.createDataFrame(name_list, ['id', 'name', 'age']) name_age_count_df = name_age_df.withColumn("count", F.count("id").over(age_w)).orderBy("count") name_age_count_df.show()
+---+----+---+-----+ | id|name|age|count| +---+----+---+-----+ |109| rst| 26| 1| |113| tuv| 29| 1| |110| tuv| 27| 1| |106| klm| 19| 1| |103| efg| 22| 1| |104| ghi| 21| 1| |105| ijk| 20| 1| |112| rst| 28| 1| |101| abc| 24| 2| |102| cde| 24| 2| |107| mno| 18| 3| |111| pqr| 18| 3| |108| pqr| 18| 3| +---+----+---+-----+
-
==============================
4.집계 기능은 그룹 내의 지정된 열의 행의 값을 감소시킨다. 다른 행 값을 유지하려면 각 값에서 비롯되는 행을 지정 감소 로직을 구현해야합니다. 예를 들어 시대의 최대 값과 첫 번째 행의 모든 값을 유지합니다. 당신이 UDAF을 사용할 수 있습니다이를 위해 그룹 내에서 행을 줄이기 위해 (사용자가 집계 함수를 정의).
집계 기능은 그룹 내의 지정된 열의 행의 값을 감소시킨다. 다른 행 값을 유지하려면 각 값에서 비롯되는 행을 지정 감소 로직을 구현해야합니다. 예를 들어 시대의 최대 값과 첫 번째 행의 모든 값을 유지합니다. 당신이 UDAF을 사용할 수 있습니다이를 위해 그룹 내에서 행을 줄이기 위해 (사용자가 집계 함수를 정의).
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ object AggregateKeepingRowJob { def main (args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName(this.getClass.getName.replace("$", "")) .master("local") .getOrCreate() val sc = sparkSession.sparkContext sc.setLogLevel("ERROR") import sparkSession.sqlContext.implicits._ val rawDf = Seq( (1L, "Moe", "Slap", 2.0, 18), (2L, "Larry", "Spank", 3.0, 15), (3L, "Curly", "Twist", 5.0, 15), (4L, "Laurel", "Whimper", 3.0, 15), (5L, "Hardy", "Laugh", 6.0, 15), (6L, "Charley", "Ignore", 5.0, 5) ).toDF("id", "name", "requisite", "money", "age") rawDf.show(false) rawDf.printSchema val maxAgeUdaf = new KeepRowWithMaxAge val aggDf = rawDf .groupBy("age") .agg( count("id"), max(col("money")), maxAgeUdaf( col("id"), col("name"), col("requisite"), col("money"), col("age")).as("KeepRowWithMaxAge") ) aggDf.printSchema aggDf.show(false) } }
외적 :
import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ class KeepRowWithMaxAmt extends UserDefinedAggregateFunction { // This is the input fields for your aggregate function. override def inputSchema: org.apache.spark.sql.types.StructType = StructType( StructField("store", StringType) :: StructField("prod", StringType) :: StructField("amt", DoubleType) :: StructField("units", IntegerType) :: Nil ) // This is the internal fields you keep for computing your aggregate. override def bufferSchema: StructType = StructType( StructField("store", StringType) :: StructField("prod", StringType) :: StructField("amt", DoubleType) :: StructField("units", IntegerType) :: Nil ) // This is the output type of your aggregation function. override def dataType: DataType = StructType((Array( StructField("store", StringType), StructField("prod", StringType), StructField("amt", DoubleType), StructField("units", IntegerType) ))) override def deterministic: Boolean = true // This is the initial value for your buffer schema. override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = "" buffer(1) = "" buffer(2) = 0.0 buffer(3) = 0 } // This is how to update your buffer schema given an input. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val amt = buffer.getAs[Double](2) val candidateAmt = input.getAs[Double](2) amt match { case a if a < candidateAmt => buffer(0) = input.getAs[String](0) buffer(1) = input.getAs[String](1) buffer(2) = input.getAs[Double](2) buffer(3) = input.getAs[Int](3) case _ => } } // This is how to merge two objects with the bufferSchema type. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer2.getAs[String](0) buffer1(1) = buffer2.getAs[String](1) buffer1(2) = buffer2.getAs[Double](2) buffer1(3) = buffer2.getAs[Int](3) } // This is where you output the final value, given the final value of your bufferSchema. override def evaluate(buffer: Row): Any = { buffer } }
-
==============================
5.나는에 걸쳐 온 것을 여기에 예 스파크 워크샵
나는에 걸쳐 온 것을 여기에 예 스파크 워크샵
val populationDF = spark.read .option("infer-schema", "true") .option("header", "true") .format("csv").load("file:///databricks/driver/population.csv") .select('name, regexp_replace(col("population"), "\\s", "").cast("integer").as("population"))
발 maxPopulationDF = populationDF.agg (최대 ( '인구)이 .as ( "populationmax"))
populationDF.join(maxPopulationDF,populationDF.col("population") === maxPopulationDF.col("populationmax")).select('name, 'populationmax).show()
-
==============================
6.당신은 집계 함수는 행을 줄이고, 따라서 당신이 감소 기능을 원하는 이름 행의 지정해야한다는 것을 기억해야합니다. 당신이 그룹의 모든 행을 유지하려는 경우리스트로를 수집 할 수 있습니다 (경고!이 폭발 또는 왜곡 된 파티션의 원인이 될 수 있습니다.) 그런 다음 내 예를 들어 돈, 당신의 기준에 따라이를 줄이기 위해 UDF (사용자 정의 함수)를 사용할 수 있습니다. 그리고 또 다른 UDF와 단일 감소 된 행에서 열을 확장합니다. 나는 가정이 답변의 목적을 위해 당신은 가장 돈이 사람의 이름을 유지하고 싶습니다.
당신은 집계 함수는 행을 줄이고, 따라서 당신이 감소 기능을 원하는 이름 행의 지정해야한다는 것을 기억해야합니다. 당신이 그룹의 모든 행을 유지하려는 경우리스트로를 수집 할 수 있습니다 (경고!이 폭발 또는 왜곡 된 파티션의 원인이 될 수 있습니다.) 그런 다음 내 예를 들어 돈, 당신의 기준에 따라이를 줄이기 위해 UDF (사용자 정의 함수)를 사용할 수 있습니다. 그리고 또 다른 UDF와 단일 감소 된 행에서 열을 확장합니다. 나는 가정이 답변의 목적을 위해 당신은 가장 돈이 사람의 이름을 유지하고 싶습니다.
import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType import scala.collection.mutable object TestJob3 { def main (args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName(this.getClass.getName.replace("$", "")) .master("local") .getOrCreate() val sc = sparkSession.sparkContext import sparkSession.sqlContext.implicits._ val rawDf = Seq( (1, "Moe", "Slap", 2.0, 18), (2, "Larry", "Spank", 3.0, 15), (3, "Curly", "Twist", 5.0, 15), (4, "Laurel", "Whimper", 3.0, 9), (5, "Hardy", "Laugh", 6.0, 18), (6, "Charley", "Ignore", 5.0, 5) ).toDF("id", "name", "requisite", "money", "age") rawDf.show(false) rawDf.printSchema val rawSchema = rawDf.schema val fUdf = udf(reduceByMoney, rawSchema) val nameUdf = udf(extractName, StringType) val aggDf = rawDf .groupBy("age") .agg( count(struct("*")).as("count"), max(col("money")), collect_list(struct("*")).as("horizontal") ) .withColumn("short", fUdf($"horizontal")) .withColumn("name", nameUdf($"short")) .drop("horizontal") aggDf.printSchema aggDf.show(false) } def reduceByMoney= (x: Any) => { val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]] val red = d.reduce((r1, r2) => { val money1 = r1.getAs[Double]("money") val money2 = r2.getAs[Double]("money") val r3 = money1 match { case a if a >= money2 => r1 case _ => r2 } r3 }) red } def extractName = (x: Any) => { val d = x.asInstanceOf[GenericRowWithSchema] d.getAs[String]("name") } }
여기 출력은
+---+-----+----------+----------------------------+-------+ |age|count|max(money)|short |name | +---+-----+----------+----------------------------+-------+ |5 |1 |5.0 |[6, Charley, Ignore, 5.0, 5]|Charley| |15 |2 |5.0 |[3, Curly, Twist, 5.0, 15] |Curly | |9 |1 |3.0 |[4, Laurel, Whimper, 3.0, 9]|Laurel | |18 |2 |6.0 |[5, Hardy, Laugh, 6.0, 18] |Hardy | +---+-----+----------+----------------------------+-------+
-
==============================
7.당신은 다음과 같이 할 수 있습니다 :
당신은 다음과 같이 할 수 있습니다 :
샘플 데이터 :
name age id abc 24 1001 cde 24 1002 efg 22 1003 ghi 21 1004 ijk 20 1005 klm 19 1006 mno 18 1007 pqr 18 1008 rst 26 1009 tuv 27 1010 pqr 18 1012 rst 28 1013 tuv 29 1011
산출:
+----+---+-----+ |name|age|count| +----+---+-----+ | efg| 22| 1| | tuv| 29| 1| | rst| 28| 1| | klm| 19| 1| | pqr| 18| 2| | cde| 24| 1| | tuv| 27| 1| | ijk| 20| 1| | abc| 24| 1| | mno| 18| 1| | ghi| 21| 1| | rst| 26| 1| +----+---+-----+
from https://stackoverflow.com/questions/34409875/how-to-get-other-columns-when-using-spark-dataframe-groupby by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] 전혀 다른 테이블에있는 행에 대응 한 하나 개의 테이블에 행을 찾는 방법 (0) | 2020.05.01 |
---|---|
[SQL] 대안은 텍스트 또는 NTEXT 데이터 형식에 교체하려면 (0) | 2020.05.01 |
[SQL] 는 "파티 모델"의 장점은 무엇 뒤에 원칙은, 그리고? (0) | 2020.04.30 |
[SQL] 어떻게 MS-SQL 서버의 별칭 열에 의해 그룹을 수행합니까? (0) | 2020.04.30 |
[SQL] SQL 서버 2008 - IF는 NOT INSERT ELSE UPDATE 존재 (0) | 2020.04.30 |