[SCALA] 스칼라에서 임의의 값을 기존 DataFrame에 새 열을 추가하는 방법에 대한
SCALA스칼라에서 임의의 값을 기존 DataFrame에 새 열을 추가하는 방법에 대한
나는 마루 파일로 dataframe을하고 난 어떤 임의의 데이터를 새 열을 추가해야하지만, 본인은 각각 그 임의의 데이터를 다른이 필요합니다. 이건 내 실제 코드와 불꽃의 현재 버전은 1.5.1-CDH-5.5.2입니다 :
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
이 코드로,이 데이터를 가지고 :
scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+
그것은, UDF를 myNextPositiveNumber 한 번만 호출된다처럼되지 보인다?
최신 정보 하나의 고유 한 값이, 확인 :
scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...
+-----------+
|myNewColumn|
+-----------+
|889488717D |
+-----------+
내가 무슨 일을하고 있습니까?
2 업데이트 : 마지막으로, @의 도움으로이 코드를 user6910411 :
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
val accum = sc.accumulator(1)
def myNextPositiveNumber():String = {
accum+=1
accum.value.toString.concat("D")
}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
myNewDF.select("myNewColumn").count
// 63385686
업데이트 3
실제 코드는 다음과 같이 데이터를 생성한다 :
scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D |
|2D |
|2D |
|2D |
|2D |
+-----------+
only showing top 5 rows
는 UDF 함수를 한 번만 호출 것, 그렇지 보인다? 그 열에서 새로운 임의의 요소가 필요합니다.
user6910411 @ 업데이트 4
내가 ID를 향상이 실제 코드를 가지고 있지만 마지막 문자을 연결하지 않는 것은, 그것은 이상한입니다. 이건 내 코드입니다 :
import org.apache.spark.sql.functions.udf
val mydf = sqlContext.read.parquet("some.parquet")
mydf.cache
def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))
scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0 |
|1 |
|2 |
|3 |
|4 |
+-----------+
내가 좋아하는 뭔가가 필요 :
+-----------+
|myNewColumn|
+-----------+
|1D |
|2D |
|3D |
|4D |
+-----------+
해결법
-
==============================
1.스파크> = 2.3
스파크> = 2.3
asNondeterministic 방법을 사용하여 몇 가지 최적화를 비활성화 할 수 있습니다 :
import org.apache.spark.sql.expressions.UserDefinedFunction val f: UserDefinedFunction = ??? val fNonDeterministic: UserDefinedFunction = f.asNondeterministic
이 옵션을 사용하기 전에 보증을 이해하도록하십시오.
스파크 <2.3
UDF로 전달 함수는 상수에 의해 대체 될 수 있고, null의 함수 호출 (SPARK-20,586의 가능한 예외) 결정되어야한다. 당신은 내장 함수에 임의의 숫자 사용을 생성하려면 :
예를 들면 요구 된 분포를 얻기 위해, 출력 변환 :
(rand * Integer.MAX_VALUE).cast("bigint").cast("string")
-
==============================
2.당신은 임의의 값을 생성하기 위해 monotonically_increasing_id를 사용할 수있다.
당신은 임의의 값을 생성하기 위해 monotonically_increasing_id를 사용할 수있다.
그럼 당신은 monotonically_increasing_id 기본적으로 긴 반환으로 String으로 캐스팅 후 모든 문자열을 추가하기 위해 UDF를 정의 할 수 있습니다.
scala> var df = Seq(("Ron"), ("John"), ("Steve"), ("Brawn"), ("Rock"), ("Rick")).toDF("names") +-----+ |names| +-----+ | Ron| | John| |Steve| |Brawn| | Rock| | Rick| +-----+ scala> val appendD = spark.sqlContext.udf.register("appendD", (s: String) => s.concat("D")) scala> df = df.withColumn("ID",monotonically_increasing_id).selectExpr("names","cast(ID as String) ID").withColumn("ID",appendD($"ID")) +-----+---+ |names| ID| +-----+---+ | Ron| 0D| | John| 1D| |Steve| 2D| |Brawn| 3D| | Rock| 4D| | Rick| 5D| +-----+---+
from https://stackoverflow.com/questions/42367464/about-how-to-add-a-new-column-to-an-existing-dataframe-with-random-values-in-sca by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 언제 스칼라에 벡터를 선택해야합니까? (0) | 2019.11.01 |
---|---|
[SCALA] 인 IntelliJ에 build.gradle를 가져 오는 방법 (0) | 2019.11.01 |
[SCALA] 명명 된 인수에 밑줄 (0) | 2019.11.01 |
[SCALA] 스파크 구조화 스트리밍이 자동으로 현지 시간으로 타임 스탬프로 변환 (0) | 2019.11.01 |
[SCALA] 스칼라 데프 foo는 = {}와 데프 foo는 ()의 차이점은 = {} 무엇입니까? (0) | 2019.11.01 |