복붙노트

[SCALA] 스파크 dataframe에 열의 Null 허용 속성을 변경

SCALA

스파크 dataframe에 열의 Null 허용 속성을 변경

나는 수동으로 몇 가지 테스트를위한 dataframe을 만드는거야. 를 작성하는 코드는 다음과 같습니다

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

이 같은 스키마 외모 그래서 :

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

나는 이러한 변수의 각각에 대해 '사실 널 (NULL) ='을 만들고 싶어. 어떻게 그 시작부터 선언하거나이 만들어 됐어요 후 새로운 dataframe에 전환 할 수 있습니까?

해결법

  1. ==============================

    1.수입으로

    수입으로

    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
    

    당신이 사용할 수있는

    /**
     * Set nullable property of column.
     * @param df source DataFrame
     * @param cn is the column name to change
     * @param nullable is the flag to set, such that the column is  either nullable or not
     */
    def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
    
      // get schema
      val schema = df.schema
      // modify [[StructField] with name `cn`
      val newSchema = StructType(schema.map {
        case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
        case y: StructField => y
      })
      // apply new schema
      df.sqlContext.createDataFrame( df.rdd, newSchema )
    }
    

    직접.

    또한 당신은 당신이 호출 할 수있는, 그런 (무엇 DataFrame에서 사용자 정의 메소드를 정의하는? 가장 좋은 방법은 내 SO 포스트 참조) 방법은 "내 라이브러리 포주"라이브러리 패턴을 통해 제공 할 수 있습니다

    val df = ....
    val df2 = df.setNullableStateOfColumn( "id", true )
    

    setNullableStateOfColumn의 약간의 수정 된 버전을 사용하여

    def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
      // get schema
      val schema = df.schema
      // modify [[StructField] with name `cn`
      val newSchema = StructType(schema.map {
        case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
      })
      // apply new schema
      df.sqlContext.createDataFrame( df.rdd, newSchema )
    }
    

    명시 적으로 스키마를 정의합니다. (사용 반사 더 일반적인하는 솔루션을 만들 수 있습니다)

    configuredUnitTest("Stackoverflow.") { sparkContext =>
    
      case class Input(id:Long, var1:Int, var2:Int, var3:Double)
    
      val sqlContext = new SQLContext(sparkContext)
      import sqlContext.implicits._
    
    
      // use this to set the schema explicitly or
      // use refelection on the case class member to construct the schema
      val schema = StructType( Seq (
        StructField( "id", LongType, true),
        StructField( "var1", IntegerType, true),
        StructField( "var2", IntegerType, true),
        StructField( "var3", DoubleType, true)
      ))
    
      val is: List[Input] = List(
        Input(1110, 0, 1001,-10.00),
        Input(1111, 1, 1001, 10.00),
        Input(1111, 0, 1002, 10.00)
      )
    
      val rdd: RDD[Input] =  sparkContext.parallelize( is )
      val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
      val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 
    
      inputDF.printSchema
      inputDF.show()
    }
    
  2. ==============================

    2.이 늦은 대답이지만, 여기에 오는 사람들을위한 대체 솔루션을 제공하고 싶었다. 당신은 자동으로 코드에 다음과 같은 수정에 의해 처음부터 DataFrame 열 널 (NULL)을 만들 수 있습니다 :

    이 늦은 대답이지만, 여기에 오는 사람들을위한 대체 솔루션을 제공하고 싶었다. 당신은 자동으로 코드에 다음과 같은 수정에 의해 처음부터 DataFrame 열 널 (NULL)을 만들 수 있습니다 :

    case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
    val inputDF = sqlContext
      .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
        input(Some(1111),Some(1),1001,10.00),
        input(Some(1111),Some(0),1002,10.00)))
    inputDF.printSchema
    

    이 얻을 것입니다 :

    root
     |-- id: long (nullable = true)
     |-- var1: integer (nullable = true)
     |-- var2: integer (nullable = false)
     |-- var3: double (nullable = false)
    
    defined class input
    inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
    

    만약 실제 입력 등의 일부를 이용하여 옵션 ([요소]) 또는 유료로 필드를 선언하면 본질적으로, 다음 필드는 널 수있다. 그렇지 않으면,이 필드는 null 허용되지 않습니다. 이게 도움이 되길 바란다!

  3. ==============================

    3.대신 (널 널 = C, t, m) 케이스 StructField (c, t, _, m) ⇒ StructField 중 하나 (널 = 널) _.copy을 사용할 수있다. 그런 다음 전체 함수는 다음과 같이 쓸 수있다 :

    대신 (널 널 = C, t, m) 케이스 StructField (c, t, _, m) ⇒ StructField 중 하나 (널 = 널) _.copy을 사용할 수있다. 그런 다음 전체 함수는 다음과 같이 쓸 수있다 :

    def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
      df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
    }
    
  4. ==============================

    4.이 자리에서 dataframe을 변경해야하고, 재창조 불가능하면 또 다른 방법은, 당신은 이런 식으로 뭔가를 할 수 있습니다 :

    이 자리에서 dataframe을 변경해야하고, 재창조 불가능하면 또 다른 방법은, 당신은 이런 식으로 뭔가를 할 수 있습니다 :

    .withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
    

    스파크는이 열이 널 (null)을 포함 할 수 있으며, Null 허용 여부가 true로 설정 될 것이라고 생각합니다. 또한, 옵션에 값을 래핑하는, UDF 사용할 수 있습니다. 심지어 경우 스트리밍 잘 작동합니다.

  5. ==============================

    5.그냥 경우 클래스 대신 scala.Int의 java.lang.Integer의를 사용합니다.

    그냥 경우 클래스 대신 scala.Int의 java.lang.Integer의를 사용합니다.

    case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
    
  6. ==============================

    6.덕분에 마틴 Senne. 그냥 약간의 추가. 내부 구조체 유형의 경우에는 다음과 같이 재귀 적으로 널 (NULL)로 설정해야 할 수도 있습니다 :

    덕분에 마틴 Senne. 그냥 약간의 추가. 내부 구조체 유형의 경우에는 다음과 같이 재귀 적으로 널 (NULL)로 설정해야 할 수도 있습니다 :

    def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
        def set(st: StructType): StructType = {
          StructType(st.map {
            case StructField(name, dataType, _, metadata) =>
              val newDataType = dataType match {
                case t: StructType => set(t)
                case _ => dataType
              }
              StructField(name, newDataType, nullable = nullable, metadata)
          })
        }
    
        df.sqlContext.createDataFrame(df.rdd, set(df.schema))
      }
    
  7. from https://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe by cc-by-sa and MIT license