복붙노트

[SCALA] 스파크 : 트랜스 DataFrame를 집계하지 않고

SCALA

스파크 : 트랜스 DataFrame를 집계하지 않고

나는 온라인 질문의 수를 살펴 보았다,하지만 그들은 내가 달성하기 위해 노력하고있어 할 것 같지 않습니다.

나는 스칼라와 아파치 불꽃 2.0.2을 사용하고 있습니다.

나는 dataframe 있습니다 :

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

나는에 트랜스 할

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

나는 피벗 ()를 사용하여 시도했다 그러나 나는 정답을 가져올 수 없습니다. 내 발 {X} 열을 통해 반복, 아래에 따라 각 피봇 결국, 그러나 이것은 매우 느린 것으로 증명된다.

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

그런 다음 첫 dataframe에 발 {X}의 각 반복에 노동 조합 ()를 사용하여.

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+

나는 집계 데이터에 원하지 않는 트랜스의보다 효율적인 방법이 있나요?

감사 :)

해결법

  1. ==============================

    1.불행하게도 어떤 경우 경우가 없다 :

    불행하게도 어떤 경우 경우가 없다 :

    당신은 DataFrame이 불꽃에 구현 행의 분산 모음입니다 각 행은 하나의 노드에 저장, 가공하는 것을 기억해야합니다.

    당신은 피벗으로 DataFrame에 전위를 표현할 수있다 :

    val kv = explode(array(df.columns.tail.map { 
      c => struct(lit(c).alias("k"), col(c).alias("v")) 
    }: _*))
    
    df
      .withColumn("kv", kv)
      .select($"segment_id", $"kv.k", $"kv.v")
      .groupBy($"k")
      .pivot("segment_id")
      .agg(first($"v"))
      .orderBy($"k")
      .withColumnRenamed("k", "vals")
    

    그러나 그것은 단지 더 실용적인 응용 프로그램과 장난감 코드입니다. 실제로 그렇지 더 나은 데이터를 수집보다 :

    val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
      case Array(h, t @ _*) => {
        (h.map(_.toString), t.map(_.collect { case x: Int => x }))
      }
    }
    
    val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
    val schema = StructType(
      StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
    )
    
    spark.createDataFrame(sc.parallelize(rows), schema)
    

    들어 DataFrame 같이 정의 :

    val df = Seq(
      (1, 100, 0, 0, 0, 0, 0),
      (2, 0, 50, 0, 0, 20, 0),
      (3, 0, 0, 0, 0, 0, 0),
      (4, 0, 0, 0, 0, 0, 0)
    ).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
    

    모두 당신은 당신이 원하는 결과를 줄 것이다 :

    +----+---+---+---+---+
    |vals|  1|  2|  3|  4|
    +----+---+---+---+---+
    |val1|100|  0|  0|  0|
    |val2|  0| 50|  0|  0|
    |val3|  0|  0|  0|  0|
    |val4|  0|  0|  0|  0|
    |val5|  0| 20|  0|  0|
    |val6|  0|  0|  0|  0|
    +----+---+---+---+---+
    

    그 존재는 분산 된 데이터 구조에 대한 효율적인 트랜스 포지션을 필요로하는 경우 당신이 다른 곳에서보고해야했다. 두 차원에 걸쳐 데이터를 분산 할 수 있고 전치 수있는 핵심 CoordinateMatrix 및 BlockMatrix을 포함한 구조의 숫자가있다.

  2. ==============================

    2.이것은 완벽한 해결책이 될 것이다.

    이것은 완벽한 해결책이 될 것이다.

    val seq = Seq((1,100,0,0,0,0,0),(2,0,50,0,0,20,0),(3,0,0,0,0,0,0),(4,0,0,0,0,0,0))
    val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
    df1.show()
    
    val schema = df1.schema
    
    val df2 = df1.flatMap(row => {
      val metric = row.getInt(0)
      (1 until row.size).map(i => {
        (metric, schema(i).name, row.getInt(i))
      })
    })
    
    val df3 = df2.toDF("metric", "vals", "value")
    df3.show()
    import org.apache.spark.sql.functions._
    
    val df4 = df3.groupBy("vals").pivot("metric").agg(first("value"))
    df4.show()
    
  3. from https://stackoverflow.com/questions/40892459/spark-transpose-dataframe-without-aggregating by cc-by-sa and MIT license