복붙노트

[SCALA] 어떻게 스파크 DataFrame를 피벗?

SCALA

어떻게 스파크 DataFrame를 피벗?

나는 스파크 DataFrames을 사용하기 시작하고 난 여러 행 1 열 중 여러 열을 생성하는 데이터를 피벗 할 수 있어야합니다. 이 끓는에서 그것을위한 기능 내장 및 파이썬에서 판다에 생각하지만, 나는 새로운 스파크 Dataframe에 대한 아무것도 찾을 수있다.

나는이 작업을 수행하는 몇 가지 종류의 사용자 정의 함수를 작성할 수 있습니다 생각하지만 난 스파크와 초보자입니다, 특히 이후 시작하는 방법을 모르겠어요. 나는 누군가가 기능이나 스칼라에서 뭔가를 작성하는 방법에 대한 제안에 내장이 작업을 수행하는 방법을 알고, 그것은 대단히 감사합니다.

해결법

  1. ==============================

    1.데이비드 앤더슨 스파크에 의해 언급 한 바와 같이 버전 1.6 이후 피벗 기능을 제공합니다. 다음과 같이 일반 구문은 같습니다

    데이비드 앤더슨 스파크에 의해 언급 한 바와 같이 버전 1.6 이후 피벗 기능을 제공합니다. 다음과 같이 일반 구문은 같습니다

    df
      .groupBy(grouping_columns)
      .pivot(pivot_column, [values]) 
      .agg(aggregate_expressions)
    

    nycflights13 및 CSV 형식을 사용하여 사용 예 :

    파이썬 :

    from pyspark.sql.functions import avg
    
    flights = (sqlContext
        .read
        .format("csv")
        .options(inferSchema="true", header="true")
        .load("flights.csv")
        .na.drop())
    
    flights.registerTempTable("flights")
    sqlContext.cacheTable("flights")
    
    gexprs = ("origin", "dest", "carrier")
    aggexpr = avg("arr_delay")
    
    flights.count()
    ## 336776
    
    %timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
    ## 10 loops, best of 3: 1.03 s per loop
    

    규모 :

    val flights = sqlContext
      .read
      .format("csv")
      .options(Map("inferSchema" -> "true", "header" -> "true"))
      .load("flights.csv")
    
    flights
      .groupBy($"origin", $"dest", $"carrier")
      .pivot("hour")
      .agg(avg($"arr_delay"))
    

    자바:

    import static org.apache.spark.sql.functions.*;
    import org.apache.spark.sql.*;
    
    Dataset<Row> df = spark.read().format("csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load("flights.csv");
    
    df.groupBy(col("origin"), col("dest"), col("carrier"))
            .pivot("hour")
            .agg(avg(col("arr_delay")));
    

    R / SparkR :

    library(magrittr)
    
    flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)
    
    flights %>% 
      groupBy("origin", "dest", "carrier") %>% 
      pivot("hour") %>% 
      agg(avg(column("arr_delay")))
    

    R / sparklyr

    library(dplyr)
    
    flights <- spark_read_csv(sc, "flights", "flights.csv")
    
    avg.arr.delay <- function(gdf) {
       expr <- invoke_static(
          sc,
          "org.apache.spark.sql.functions",
          "avg",
          "arr_delay"
        )
        gdf %>% invoke("agg", expr, list())
    }
    
    flights %>% 
      sdf_pivot(origin + dest + carrier ~  hour, fun.aggregate=avg.arr.delay)
    

    SQL :

    스파크 SQL에서 해당 PIVOT 키워드는 버전 2.4부터 지원됩니다.

    CREATE TEMPORARY VIEW flights 
    USING csv 
    OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;
    
     SELECT * FROM (
       SELECT origin, dest, carrier, arr_delay, hour FROM flights
     ) PIVOT (
       avg(arr_delay)
       FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
                    13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
     );
    

    예 데이터 :

    "year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
    2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
    2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
    2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
    2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
    2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
    2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
    2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
    2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
    2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
    2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00
    

    성능 고려 사항 :

    일반적으로 회전은 비용이 많이 드는 작업이다.

    관련 질문 :

  2. ==============================

    2.나는 동적 SQL 쿼리를 생성하는 루프를 작성하여이를 극복했다. 내가 가진 말 :

    나는 동적 SQL 쿼리를 생성하는 루프를 작성하여이를 극복했다. 내가 가진 말 :

    id  tag  value
    1   US    50
    1   UK    100
    1   Can   125
    2   US    75
    2   UK    150
    2   Can   175
    

    그리고 나는 원한다:

    id  US  UK   Can
    1   50  100  125
    2   75  150  175
    

    나는 선회하고 내가 필요로하는 SQL 쿼리를 포함하는 문자열을 만들려는 값 목록을 만들 수 있습니다.

    val countries = List("US", "UK", "Can")
    val numCountries = countries.length - 1
    
    var query = "select *, "
    for (i <- 0 to numCountries-1) {
      query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "
    }
    query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"
    
    myDataFrame.registerTempTable("myTable")
    val myDF1 = sqlContext.sql(query)
    

    그때 집계를 수행하는 유사한 쿼리를 만들 수 있습니다. 아니 아주 우아한 해결책은하지만 작동 코드를 호출 할 때 또한 인수로 전달 될 수있는 값의 목록은 유연하다.

  3. ==============================

    3.피벗 연산자 스파크 dataframe의 API에 첨가하고, 1.6 스파크의 일부되었다.

    피벗 연산자 스파크 dataframe의 API에 첨가하고, 1.6 스파크의 일부되었다.

    자세한 내용은 https://github.com/apache/spark/pull/7841를 참조하십시오.

  4. ==============================

    4.나는 다음과 같은 단계를 dataframes를 사용하여 유사한 문제를 해결했다 :

    나는 다음과 같은 단계를 dataframes를 사용하여 유사한 문제를 해결했다 :

    값으로 '가치'를 가진 모든 국가에 대한 열을 만듭니다

    import org.apache.spark.sql.functions._
    val countries = List("US", "UK", "Can")
    val countryValue = udf{(countryToCheck: String, countryInRow: String, value: Long) =>
      if(countryToCheck == countryInRow) value else 0
    }
    val countryFuncs = countries.map{country => (dataFrame: DataFrame) => dataFrame.withColumn(country, countryValue(lit(country), df("tag"), df("value"))) }
    val dfWithCountries = Function.chain(countryFuncs)(df).drop("tag").drop("value")
    

    귀하의 dataframe 'dfWithCountries'는 다음과 같이 표시됩니다

    +--+--+---+---+
    |id|US| UK|Can|
    +--+--+---+---+
    | 1|50|  0|  0|
    | 1| 0|100|  0|
    | 1| 0|  0|125|
    | 2|75|  0|  0|
    | 2| 0|150|  0|
    | 2| 0|  0|175|
    +--+--+---+---+
    

    지금 당신은 당신의 원하는 결과에 대한 모든 값을 함께 요약 할 수 있습니다 :

    dfWithCountries.groupBy("id").sum(countries: _*).show
    

    결과:

    +--+-------+-------+--------+
    |id|SUM(US)|SUM(UK)|SUM(Can)|
    +--+-------+-------+--------+
    | 1|     50|    100|     125|
    | 2|     75|    150|     175|
    +--+-------+-------+--------+
    

    그래도 아주 우아한 해결책이 아니다. 나는 모든 컬럼에 추가 기능의 체인을 만들 수 있었다. 나는이 나라의 많은이있는 경우 또한, 나는 매우 넓은 제로의 많은 설정으로 설정 내 임시 데이터를 확장됩니다.

  5. ==============================

    5.처음에 나는 알 M의 솔루션을 채택했다. 나중에 같은 생각을 가져 갔고, 전치 기능으로이 기능을 다시 썼다.

    처음에 나는 알 M의 솔루션을 채택했다. 나중에 같은 생각을 가져 갔고, 전치 기능으로이 기능을 다시 썼다.

    이 방법은 키와 값 열을 이용하여 데이터 포맷의 열 어떤 DF 행을 이항

    입력에 대한 CSV

    id,tag,value
    1,US,50a
    1,UK,100
    1,Can,125
    2,US,75
    2,UK,150
    2,Can,175
    

    산출

    +--+---+---+---+
    |id| UK| US|Can|
    +--+---+---+---+
    | 2|150| 75|175|
    | 1|100|50a|125|
    +--+---+---+---+
    

    방법을 바꾸어 :

    def transpose(hc : HiveContext , df: DataFrame,compositeId: List[String], key: String, value: String) = {
    
    val distinctCols =   df.select(key).distinct.map { r => r(0) }.collect().toList
    
    val rdd = df.map { row =>
    (compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] },
    scala.collection.mutable.Map(row.getAs(key).asInstanceOf[Any] -> row.getAs(value).asInstanceOf[Any]))
    }
    val pairRdd = rdd.reduceByKey(_ ++ _)
    val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
    hc.createDataFrame(rowRdd, getSchema(df.schema, compositeId, (key, distinctCols)))
    
    }
    
    private def dynamicRow(r: (List[Any], scala.collection.mutable.Map[Any, Any]), colNames: List[Any]) = {
    val cols = colNames.collect { case col => r._2.getOrElse(col.toString(), null) }
    val array = r._1 ++ cols
    Row(array: _*)
    }
    
    private  def getSchema(srcSchema: StructType, idCols: List[String], distinctCols: (String, List[Any])): StructType = {
    val idSchema = idCols.map { idCol => srcSchema.apply(idCol) }
    val colSchema = srcSchema.apply(distinctCols._1)
    val colsSchema = distinctCols._2.map { col => StructField(col.asInstanceOf[String], colSchema.dataType, colSchema.nullable) }
    StructType(idSchema ++ colsSchema)
    }
    

    주요 조각

    import java.util.Date
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types.StructField
    
    
    ...
    ...
    def main(args: Array[String]): Unit = {
    
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        val dfdata1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true")
        .load("data.csv")
        dfdata1.show()  
        val dfOutput = transpose(new HiveContext(sc), dfdata1, List("id"), "tag", "value")
        dfOutput.show
    
    }
    
  6. ==============================

    6.간단하고 우아한 해결책이있다.

    간단하고 우아한 해결책이있다.

    scala> spark.sql("select * from k_tags limit 10").show()
    +---------------+-------------+------+
    |           imsi|         name| value|
    +---------------+-------------+------+
    |246021000000000|          age|    37|
    |246021000000000|       gender|Female|
    |246021000000000|         arpu|    22|
    |246021000000000|   DeviceType| Phone|
    |246021000000000|DataAllowance|   6GB|
    +---------------+-------------+------+
    
    scala> spark.sql("select * from k_tags limit 10").groupBy($"imsi").pivot("name").agg(min($"value")).show()
    +---------------+-------------+----------+---+----+------+
    |           imsi|DataAllowance|DeviceType|age|arpu|gender|
    +---------------+-------------+----------+---+----+------+
    |246021000000000|          6GB|     Phone| 37|  22|Female|
    |246021000000001|          1GB|     Phone| 72|  10|  Male|
    +---------------+-------------+----------+---+----+------+
    
  7. ==============================

    7.이 데이터 세트 / dataframe에 피벗 동작의 예를 많이하지만, 나는 SQL을 사용하여 많은 찾을 수 없습니다. 여기에 나를 위해 일한 예입니다.

    이 데이터 세트 / dataframe에 피벗 동작의 예를 많이하지만, 나는 SQL을 사용하여 많은 찾을 수 없습니다. 여기에 나를 위해 일한 예입니다.

    create or replace temporary view faang 
    as SELECT stock.date AS `Date`,
        stock.adj_close AS `Price`,
        stock.symbol as `Symbol` 
    FROM stock  
    WHERE (stock.symbol rlike '^(FB|AAPL|GOOG|AMZN)$') and year(date) > 2010;
    
    
    SELECT * from faang 
    
    PIVOT (max(price) for symbol in ('AAPL', 'FB', 'GOOG', 'AMZN')) order by date; 
    
    
  8. ==============================

    8.내장 스파크 피봇 기능은 비효율적이다. 벨로우즈 구현 2.4+ 스파크에서 작동 - 아이디어는지도를 집계하고 열로 값을 추출하는 것입니다. 유일한 제한은 피봇 열의 집합 함수 만 컬럼 (들)을 처리하지 않을 것이다.

    내장 스파크 피봇 기능은 비효율적이다. 벨로우즈 구현 2.4+ 스파크에서 작동 - 아이디어는지도를 집계하고 열로 값을 추출하는 것입니다. 유일한 제한은 피봇 열의 집합 함수 만 컬럼 (들)을 처리하지 않을 것이다.

    8M 테이블에서 그 기능이 내장 된 스파크 버전 40분 대, 3 secondes에 적용

    # pass an optional list of string to avoid computation of columns
    def pivot(df, group_by, key, aggFunction, levels=[]):
        if not levels:
            levels = [row[key] for row in df.filter(col(key).isNotNull()).groupBy(col(key)).agg(count(key)).select(key).collect()]
        return df.filter(col(key).isin(*levels) == True).groupBy(group_by).agg(map_from_entries(collect_list(struct(key, expr(aggFunction)))).alias("group_map")).select([group_by] + ["group_map." + l for l in levels])
    
    # Usage
    pivot(df, "id", "key", "value")
    pivot(df, "id", "key", "array(value)")
    
    // pass an optional list of string to avoid computation of columns
      def pivot(df: DataFrame, groupBy: Column, key: Column, aggFunct: String, _levels: List[String] = Nil): DataFrame = {
        val levels =
          if (_levels.isEmpty) df.filter(key.isNotNull).select(key).distinct().collect().map(row => row.getString(0)).toList
          else _levels
    
        df
          .filter(key.isInCollection(levels))
          .groupBy(groupBy)
          .agg(map_from_entries(collect_list(struct(key, expr(aggFunct)))).alias("group_map"))
          .select(groupBy.toString, levels.map(f => "group_map." + f): _*)
      }
    
    // Usage:
    pivot(df, col("id"), col("key"), "value")
    pivot(df, col("id"), col("key"), "array(value)")
    
  9. ==============================

    9.스파크는 스파크 DataFrame 회전식 개선을 제공하고 있습니다. 피벗 기능 1.6 버전을 점화하는 점화 DataFrame의 API에 첨가하고, 그 성능 문제를 가지고 즉 스파크 2.0 수정되었습니다되었습니다

    스파크는 스파크 DataFrame 회전식 개선을 제공하고 있습니다. 피벗 기능 1.6 버전을 점화하는 점화 DataFrame의 API에 첨가하고, 그 성능 문제를 가지고 즉 스파크 2.0 수정되었습니다되었습니다

    그러나, 당신은 낮은 버전을 사용하는 경우, 그 피봇 때문에 매우 비용이 많이 드는 작업이다 참고 다음과 같이 함수에 인수로 (알려진 경우) 컬럼 데이터를 제공 할 것을 권장한다.

    val countries = Seq("USA","China","Canada","Mexico")
    val pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
    pivotDF.show()
    

    이 피벗과 Unpivoting 스파크 DataFrame에 대한 자세한 설명하고있다

    해피 학습!

  10. from https://stackoverflow.com/questions/30244910/how-to-pivot-spark-dataframe by cc-by-sa and MIT license