[SCALA] 스파크 : 트랜스 DataFrame를 집계하지 않고
SCALA스파크 : 트랜스 DataFrame를 집계하지 않고
나는 온라인 질문의 수를 살펴 보았다,하지만 그들은 내가 달성하기 위해 노력하고있어 할 것 같지 않습니다.
나는 스칼라와 아파치 불꽃 2.0.2을 사용하고 있습니다.
나는 dataframe 있습니다 :
+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
| 1| 100| 0| 0| 0| 0| 0|
| 2| 0| 50| 0| 0| 20| 0|
| 3| 0| 0| 0| 0| 0| 0|
| 4| 0| 0| 0| 0| 0| 0|
+----------+-----+----+----+----+----+----+
나는에 트랜스 할
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+-----+----+----+----+
나는 피벗 ()를 사용하여 시도했다 그러나 나는 정답을 가져올 수 없습니다. 내 발 {X} 열을 통해 반복, 아래에 따라 각 피봇 결국, 그러나 이것은 매우 느린 것으로 증명된다.
val d = df.select('segment_id, 'val1)
+----------+-----+
|segment_id| val1|
+----------+-----+
| 1| 100|
| 2| 0|
| 3| 0|
| 4| 0|
+----------+-----+
d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
+----+-----+----+----+----+
그런 다음 첫 dataframe에 발 {X}의 각 반복에 노동 조합 ()를 사용하여.
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val2| 0| 50| 0| 0|
+----+-----+----+----+----+
나는 집계 데이터에 원하지 않는 트랜스의보다 효율적인 방법이 있나요?
감사 :)
해결법
-
==============================
1.불행하게도 어떤 경우 경우가 없다 :
불행하게도 어떤 경우 경우가 없다 :
당신은 DataFrame이 불꽃에 구현 행의 분산 모음입니다 각 행은 하나의 노드에 저장, 가공하는 것을 기억해야합니다.
당신은 피벗으로 DataFrame에 전위를 표현할 수있다 :
val kv = explode(array(df.columns.tail.map { c => struct(lit(c).alias("k"), col(c).alias("v")) }: _*)) df .withColumn("kv", kv) .select($"segment_id", $"kv.k", $"kv.v") .groupBy($"k") .pivot("segment_id") .agg(first($"v")) .orderBy($"k") .withColumnRenamed("k", "vals")
그러나 그것은 단지 더 실용적인 응용 프로그램과 장난감 코드입니다. 실제로 그렇지 더 나은 데이터를 수집보다 :
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match { case Array(h, t @ _*) => { (h.map(_.toString), t.map(_.collect { case x: Int => x })) } } val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) } val schema = StructType( StructField("vals", StringType) +: header.map(StructField(_, IntegerType)) ) spark.createDataFrame(sc.parallelize(rows), schema)
들어 DataFrame 같이 정의 :
val df = Seq( (1, 100, 0, 0, 0, 0, 0), (2, 0, 50, 0, 0, 20, 0), (3, 0, 0, 0, 0, 0, 0), (4, 0, 0, 0, 0, 0, 0) ).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
모두 당신은 당신이 원하는 결과를 줄 것이다 :
+----+---+---+---+---+ |vals| 1| 2| 3| 4| +----+---+---+---+---+ |val1|100| 0| 0| 0| |val2| 0| 50| 0| 0| |val3| 0| 0| 0| 0| |val4| 0| 0| 0| 0| |val5| 0| 20| 0| 0| |val6| 0| 0| 0| 0| +----+---+---+---+---+
그 존재는 분산 된 데이터 구조에 대한 효율적인 트랜스 포지션을 필요로하는 경우 당신이 다른 곳에서보고해야했다. 두 차원에 걸쳐 데이터를 분산 할 수 있고 전치 수있는 핵심 CoordinateMatrix 및 BlockMatrix을 포함한 구조의 숫자가있다.
-
==============================
2.이것은 완벽한 해결책이 될 것이다.
이것은 완벽한 해결책이 될 것이다.
val seq = Seq((1,100,0,0,0,0,0),(2,0,50,0,0,20,0),(3,0,0,0,0,0,0),(4,0,0,0,0,0,0)) val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6") df1.show() val schema = df1.schema val df2 = df1.flatMap(row => { val metric = row.getInt(0) (1 until row.size).map(i => { (metric, schema(i).name, row.getInt(i)) }) }) val df3 = df2.toDF("metric", "vals", "value") df3.show() import org.apache.spark.sql.functions._ val df4 = df3.groupBy("vals").pivot("metric").agg(first("value")) df4.show()
from https://stackoverflow.com/questions/40892459/spark-transpose-dataframe-without-aggregating by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 자바에서 스칼라 싱글 톤 객체를 사용할 수 있습니까? (0) | 2019.11.10 |
---|---|
[SCALA] 왜이 REPL에서 같은 이름의 변수를 선언 할 수있다? (0) | 2019.11.10 |
[SCALA] 어떻게 스파크 dataframe에 목록 / 배열에서 새 열을 추가하는 아파치 불꽃 (0) | 2019.11.10 |
[SCALA] 위 유형 경계에서 작동하지 않는 추상 형식을 무시 케이크 패턴 (0) | 2019.11.10 |
[SCALA] 평균으로 누락 된 값을 교체 - 불꽃 Dataframe을 (0) | 2019.11.10 |