복붙노트

[SCALA] 스칼라에서 임의의 값을 기존 DataFrame에 새 열을 추가하는 방법에 대한

SCALA

스칼라에서 임의의 값을 기존 DataFrame에 새 열을 추가하는 방법에 대한

나는 마루 파일로 dataframe을하고 난 어떤 임의의 데이터를 새 열을 추가해야하지만, 본인은 각각 그 임의의 데이터를 다른이 필요합니다. 이건 내 실제 코드와 불꽃의 현재 버전은 1.5.1-CDH-5.5.2입니다 :

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random
import org.apache.spark.sql.functions.udf
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

이 코드로,이 데이터를 가지고 :

scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+

그것은, UDF를 myNextPositiveNumber 한 번만 호출된다처럼되지 보인다?

최신 정보 하나의 고유 한 값이, 확인 :

scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...

+-----------+                                                                   
|myNewColumn|
+-----------+
|889488717D |
+-----------+

내가 무슨 일을하고 있습니까?

2 업데이트 : 마지막으로, @의 도움으로이 코드를 user6910411 :

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random

import org.apache.spark.sql.functions.udf

val accum = sc.accumulator(1)

def myNextPositiveNumber():String = {
   accum+=1
   accum.value.toString.concat("D")
}

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

myNewDF.select("myNewColumn").count

// 63385686

업데이트 3

실제 코드는 다음과 같이 데이터를 생성한다 :

scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D         |
|2D         |
|2D         |
|2D         |
|2D         |
+-----------+
only showing top 5 rows

는 UDF 함수를 한 번만 호출 것, 그렇지 보인다? 그 열에서 새로운 임의의 요소가 필요합니다.

user6910411 @ 업데이트 4

내가 ID를 향상이 실제 코드를 가지고 있지만 마지막 문자을 연결하지 않는 것은, 그것은 이상한입니다. 이건 내 코드입니다 :

import org.apache.spark.sql.functions.udf


val mydf = sqlContext.read.parquet("some.parquet")

mydf.cache

def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))

scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0          |
|1          |
|2          |
|3          |
|4          |
+-----------+

내가 좋아하는 뭔가가 필요 :

+-----------+
|myNewColumn|
+-----------+
|1D         |
|2D         |
|3D         |
|4D         |
+-----------+

해결법

  1. ==============================

    1.스파크> = 2.3

    스파크> = 2.3

    asNondeterministic 방법을 사용하여 몇 가지 최적화를 비활성화 할 수 있습니다 :

    import org.apache.spark.sql.expressions.UserDefinedFunction
    
    val f: UserDefinedFunction = ???
    val fNonDeterministic: UserDefinedFunction = f.asNondeterministic
    

    이 옵션을 사용하기 전에 보증을 이해하도록하십시오.

    스파크 <2.3

    UDF로 전달 함수는 상수에 의해 대체 될 수 있고, null의 함수 호출 (SPARK-20,586의 가능한 예외) 결정되어야한다. 당신은 내장 함수에 임의의 숫자 사용을 생성하려면 :

    예를 들면 요구 된 분포를 얻기 위해, 출력 변환 :

    (rand * Integer.MAX_VALUE).cast("bigint").cast("string")
    
  2. ==============================

    2.당신은 임의의 값을 생성하기 위해 monotonically_increasing_id를 사용할 수있다.

    당신은 임의의 값을 생성하기 위해 monotonically_increasing_id를 사용할 수있다.

    그럼 당신은 monotonically_increasing_id 기본적으로 긴 반환으로 String으로 캐스팅 후 모든 문자열을 추가하기 위해 UDF를 정의 할 수 있습니다.

    scala> var df = Seq(("Ron"), ("John"), ("Steve"), ("Brawn"), ("Rock"), ("Rick")).toDF("names")
    +-----+
    |names|
    +-----+
    |  Ron|
    | John|
    |Steve|
    |Brawn|
    | Rock|
    | Rick|
    +-----+
    
    scala> val appendD = spark.sqlContext.udf.register("appendD", (s: String) => s.concat("D"))
    
    scala> df = df.withColumn("ID",monotonically_increasing_id).selectExpr("names","cast(ID as String) ID").withColumn("ID",appendD($"ID"))
    +-----+---+
    |names| ID|
    +-----+---+
    |  Ron| 0D|
    | John| 1D|
    |Steve| 2D|
    |Brawn| 3D|
    | Rock| 4D|
    | Rick| 5D|
    +-----+---+
    
  3. from https://stackoverflow.com/questions/42367464/about-how-to-add-a-new-column-to-an-existing-dataframe-with-random-values-in-sca by cc-by-sa and MIT license