[SCALA] 스파크 dataframe에 열의 Null 허용 속성을 변경
SCALA스파크 dataframe에 열의 Null 허용 속성을 변경
나는 수동으로 몇 가지 테스트를위한 dataframe을 만드는거야. 를 작성하는 코드는 다음과 같습니다
case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
.createDataFrame(List(input(1110,0,1001,-10.00),
input(1111,1,1001,10.00),
input(1111,0,1002,10.00)))
이 같은 스키마 외모 그래서 :
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
나는 이러한 변수의 각각에 대해 '사실 널 (NULL) ='을 만들고 싶어. 어떻게 그 시작부터 선언하거나이 만들어 됐어요 후 새로운 dataframe에 전환 할 수 있습니까?
해결법
-
==============================
1.수입으로
수입으로
import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext}
당신이 사용할 수있는
/** * Set nullable property of column. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { // get schema val schema = df.schema // modify [[StructField] with name `cn` val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m) case y: StructField => y }) // apply new schema df.sqlContext.createDataFrame( df.rdd, newSchema ) }
직접.
또한 당신은 당신이 호출 할 수있는, 그런 (무엇 DataFrame에서 사용자 정의 메소드를 정의하는? 가장 좋은 방법은 내 SO 포스트 참조) 방법은 "내 라이브러리 포주"라이브러리 패턴을 통해 제공 할 수 있습니다
val df = .... val df2 = df.setNullableStateOfColumn( "id", true )
setNullableStateOfColumn의 약간의 수정 된 버전을 사용하여
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = { // get schema val schema = df.schema // modify [[StructField] with name `cn` val newSchema = StructType(schema.map { case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) }) // apply new schema df.sqlContext.createDataFrame( df.rdd, newSchema ) }
명시 적으로 스키마를 정의합니다. (사용 반사 더 일반적인하는 솔루션을 만들 수 있습니다)
configuredUnitTest("Stackoverflow.") { sparkContext => case class Input(id:Long, var1:Int, var2:Int, var3:Double) val sqlContext = new SQLContext(sparkContext) import sqlContext.implicits._ // use this to set the schema explicitly or // use refelection on the case class member to construct the schema val schema = StructType( Seq ( StructField( "id", LongType, true), StructField( "var1", IntegerType, true), StructField( "var2", IntegerType, true), StructField( "var3", DoubleType, true) )) val is: List[Input] = List( Input(1110, 0, 1001,-10.00), Input(1111, 1, 1001, 10.00), Input(1111, 0, 1002, 10.00) ) val rdd: RDD[Input] = sparkContext.parallelize( is ) val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3)) val inputDF = sqlContext.createDataFrame( rowRDD, schema ) inputDF.printSchema inputDF.show() }
-
==============================
2.이 늦은 대답이지만, 여기에 오는 사람들을위한 대체 솔루션을 제공하고 싶었다. 당신은 자동으로 코드에 다음과 같은 수정에 의해 처음부터 DataFrame 열 널 (NULL)을 만들 수 있습니다 :
이 늦은 대답이지만, 여기에 오는 사람들을위한 대체 솔루션을 제공하고 싶었다. 당신은 자동으로 코드에 다음과 같은 수정에 의해 처음부터 DataFrame 열 널 (NULL)을 만들 수 있습니다 :
case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double) val inputDF = sqlContext .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00), input(Some(1111),Some(1),1001,10.00), input(Some(1111),Some(0),1002,10.00))) inputDF.printSchema
이 얻을 것입니다 :
root |-- id: long (nullable = true) |-- var1: integer (nullable = true) |-- var2: integer (nullable = false) |-- var3: double (nullable = false) defined class input inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
만약 실제 입력 등의 일부를 이용하여 옵션 ([요소]) 또는 유료로 필드를 선언하면 본질적으로, 다음 필드는 널 수있다. 그렇지 않으면,이 필드는 null 허용되지 않습니다. 이게 도움이 되길 바란다!
-
==============================
3.대신 (널 널 = C, t, m) 케이스 StructField (c, t, _, m) ⇒ StructField 중 하나 (널 = 널) _.copy을 사용할 수있다. 그런 다음 전체 함수는 다음과 같이 쓸 수있다 :
대신 (널 널 = C, t, m) 케이스 StructField (c, t, _, m) ⇒ StructField 중 하나 (널 = 널) _.copy을 사용할 수있다. 그런 다음 전체 함수는 다음과 같이 쓸 수있다 :
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = { df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable)))) }
-
==============================
4.이 자리에서 dataframe을 변경해야하고, 재창조 불가능하면 또 다른 방법은, 당신은 이런 식으로 뭔가를 할 수 있습니다 :
이 자리에서 dataframe을 변경해야하고, 재창조 불가능하면 또 다른 방법은, 당신은 이런 식으로 뭔가를 할 수 있습니다 :
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
스파크는이 열이 널 (null)을 포함 할 수 있으며, Null 허용 여부가 true로 설정 될 것이라고 생각합니다. 또한, 옵션에 값을 래핑하는, UDF 사용할 수 있습니다. 심지어 경우 스트리밍 잘 작동합니다.
-
==============================
5.그냥 경우 클래스 대신 scala.Int의 java.lang.Integer의를 사용합니다.
그냥 경우 클래스 대신 scala.Int의 java.lang.Integer의를 사용합니다.
case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
-
==============================
6.덕분에 마틴 Senne. 그냥 약간의 추가. 내부 구조체 유형의 경우에는 다음과 같이 재귀 적으로 널 (NULL)로 설정해야 할 수도 있습니다 :
덕분에 마틴 Senne. 그냥 약간의 추가. 내부 구조체 유형의 경우에는 다음과 같이 재귀 적으로 널 (NULL)로 설정해야 할 수도 있습니다 :
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = { def set(st: StructType): StructType = { StructType(st.map { case StructField(name, dataType, _, metadata) => val newDataType = dataType match { case t: StructType => set(t) case _ => dataType } StructField(name, newDataType, nullable = nullable, metadata) }) } df.sqlContext.createDataFrame(df.rdd, set(df.schema)) }
from https://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 형식화가 스파크 데이터 세트와 스칼라에 참여 수행 (0) | 2019.11.11 |
---|---|
[SCALA] 스칼라 : 패턴 매칭의 짧은 형식 부울을 반환 (0) | 2019.11.11 |
[SCALA] 스칼라의 케이스 클래스에 대한 과부하 생성자? (0) | 2019.11.11 |
[SCALA] 스칼라에서, 무엇을 정확히 수행 '발 A : A = _'(밑줄) 평균? (0) | 2019.11.11 |
[SCALA] 어떻게 스파크 RDD / Dataframe 크기를 찾는 방법은? (0) | 2019.11.11 |