복붙노트

[SCALA] 업데이트 된 행에 dataframe 행을지도하는 동안 인코더 오류

SCALA

업데이트 된 행에 dataframe 행을지도하는 동안 인코더 오류

아래에 언급 한 바와 같이 내 코드에서 같은 일을하기 위해 노력하고있어 때

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

여기에서 위의 참조를 가지고있다 : 스칼라 : 나는 스칼라를 사용 Dataframe에 값을 대체 할 수있는 방법 그러나 나는 인코더 오류로 무엇입니까

참고 : 나는 스파크 2.0를 사용하고 있습니다!

해결법

  1. ==============================

    1.여기에 예상치 못한 아무것도 없다. 당신은 스파크 1.x에서 작성되지 않은 더 이상 스파크 2.0에서 지원되는 코드를 사용하려는 :

    여기에 예상치 못한 아무것도 없다. 당신은 스파크 1.x에서 작성되지 않은 더 이상 스파크 2.0에서 지원되는 코드를 사용하려는 :

    솔직히 말해서 그것은 하나 1.x에서 훨씬 이해가되지 않았다. 버전의 독립 당신은 단순히 DataFrame API를 사용할 수 있습니다 :

    import org.apache.spark.sql.functions.{when, lower}
    
    val df = Seq(
      (2012, "Tesla", "S"), (1997, "Ford", "E350"),
      (2015, "Chevy", "Volt")
    ).toDF("year", "make", "model")
    
    df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
    

    당신이 정말로지도를 사용하려면 당신은 정적으로 입력 된 데이터 집합을 사용한다 :

    import spark.implicits._
    
    case class Record(year: Int, make: String, model: String)
    
    df.as[Record].map {
      case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
      case rec => rec
    }
    

    또는 적어도 암시 인코더있을 것이다 객체를 반환 :

    df.map {
      case Row(year: Int, make: String, model: String) => 
        (year, if(make.toLowerCase == "tesla") "S" else make, model)
    }
    

    일부 완전히 미친 이유 당신이 정말로 데이터 집합 [행]을 통해 매핑 할 경우 마지막으로 당신은 필요한 인코더를 제공해야합니다 :

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    // Yup, it would be possible to reuse df.schema here
    val schema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("make", StringType),
      StructField("model", StringType)
    ))
    
    val encoder = RowEncoder(schema)
    
    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
        Row(year, "S", model)
      case row => row
    } (encoder)
    
  2. ==============================

    2.dataframe 스키마 @ zero323에 의해 주어진 사전 대답 알려진 시나리오에 대한 솔루션입니다

    dataframe 스키마 @ zero323에 의해 주어진 사전 대답 알려진 시나리오에 대한 솔루션입니다

    하지만 동적 스키마 / 또는 일반적인 기능에 여러 dataframe 지나가는 시나리오 : 1.6.1에서 2.2.0에서 마이그레이션하는 동안 다음 코드는 우리를 위해 일했다

    import org.apache.spark.sql.Row
    
    val df = Seq(
       (2012, "Tesla", "S"), (1997, "Ford", "E350"),
       (2015, "Chevy", "Volt")
     ).toDF("year", "make", "model")
    
    val data = df.rdd.map(row => {
      val row1 = row.getAs[String](1)
      val make = if (row1.toLowerCase == "tesla") "S" else row1
      Row(row(0),make,row(2))
    })
    

    이 코드는 스파크의 버전을 모두 실행한다.

    단점 : 최적화 제공 dataframe에 의해 스파크 / 데이터 세트의 API 못해 적용될.

  3. from https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row by cc-by-sa and MIT license