[SCALA] 스파크 : 조건부 dataframe에 열 추가
SCALA스파크 : 조건부 dataframe에 열 추가
내 입력 데이터를 취하려합니다 :
A B C
--------------
4 blah 2
2 3
56 foo 3
그리고 B는 빈 여부에 따라 마지막에 열을 추가 :
A B C D
--------------------
4 blah 2 1
2 3 0
56 foo 3 1
그때, 임시 테이블로 입력 dataframe를 등록 SQL 쿼리를 입력하여 쉽게이 작업을 수행 할 수 있습니다.
하지만 난 정말 스칼라 방법과 스칼라 내에서 SQL 쿼리를 입력하지 않아도이 작업을 수행하는 방법을 알고 싶습니다.
나는 .withColumn을 시도했지만, 내가 원하는 일을하는 것을 얻을 수 없습니다.
해결법
-
==============================
1.다음과 같은 경우에 기능을 withColumn보십시오 :
다음과 같은 경우에 기능을 withColumn보십시오 :
val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // for `toDF` and $"" import org.apache.spark.sql.functions._ // for `when` val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5))) .toDF("A", "B", "C") val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))
newDf.show () 방송
+---+----+---+---+ | A| B| C| D| +---+----+---+---+ | 4|blah| 2| 1| | 2| | 3| 0| | 56| foo| 3| 1| |100|null| 5| 0| +---+----+---+---+
I는 isNull에 케이스를 시험 (100, NULL, 5)의 행을 추가했다.
나는 스파크 1.6.0이 코드를 시도했지만으로, 그것은 1.4.0 이후 버전에서 작동 할 때의 코드에 댓글을 달았습니다.
-
==============================
2.내 나쁜, 나는 문제의 한 부분을 놓쳤다.
내 나쁜, 나는 문제의 한 부분을 놓쳤다.
최고의, 가장 깨끗한 방법은 UDF를 사용하는 것입니다. 코드 내에서 설명.
// create some example data...BY DataFrame // note, third record has an empty string case class Stuff(a:String,b:Int) val d= sc.parallelize(Seq( ("a",1),("b",2), ("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF // now the good stuff. import org.apache.spark.sql.functions.udf // function that returns 0 is string empty val func = udf( (s:String) => if(s.isEmpty) 0 else 1 ) // create new dataframe with added column named "notempty" val r = d.select( $"a", $"b", func($"a").as("notempty") ) scala> r.show +---+---+--------+ | a| b|notempty| +---+---+--------+ | a| 1| 1111| | b| 2| 1111| | | 3| 0| | d| 4| 1111| +---+---+--------+
-
==============================
3.어떻게 이런 일에 대해?
어떻게 이런 일에 대해?
val newDF = df.filter($"B" === "").take(1) match { case Array() => df case _ => df.withColumn("D", $"B" === "") }
테이크 (1)을 사용하여 최소한의 타격을가한다
from https://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 GROUPBY 후 수집에 값을 집계하는? (0) | 2019.11.05 |
---|---|
[SCALA] 어떻게 만들고 스칼라에서 다차원 배열을 사용 하는가? (0) | 2019.11.05 |
[SCALA] dataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다 (0) | 2019.11.05 |
[SCALA] 때 등호 스칼라 방법 선언에 서명을 사용 하는가? (0) | 2019.11.05 |
[SCALA] 하나를 사용하여 스칼라 코드에서 오류를 처리하는 (0) | 2019.11.05 |