[SCALA] 업데이트 된 행에 dataframe 행을지도하는 동안 인코더 오류
SCALA업데이트 된 행에 dataframe 행을지도하는 동안 인코더 오류
아래에 언급 한 바와 같이 내 코드에서 같은 일을하기 위해 노력하고있어 때
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
여기에서 위의 참조를 가지고있다 : 스칼라 : 나는 스칼라를 사용 Dataframe에 값을 대체 할 수있는 방법 그러나 나는 인코더 오류로 무엇입니까
참고 : 나는 스파크 2.0를 사용하고 있습니다!
해결법
-
==============================
1.여기에 예상치 못한 아무것도 없다. 당신은 스파크 1.x에서 작성되지 않은 더 이상 스파크 2.0에서 지원되는 코드를 사용하려는 :
여기에 예상치 못한 아무것도 없다. 당신은 스파크 1.x에서 작성되지 않은 더 이상 스파크 2.0에서 지원되는 코드를 사용하려는 :
솔직히 말해서 그것은 하나 1.x에서 훨씬 이해가되지 않았다. 버전의 독립 당신은 단순히 DataFrame API를 사용할 수 있습니다 :
import org.apache.spark.sql.functions.{when, lower} val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
당신이 정말로지도를 사용하려면 당신은 정적으로 입력 된 데이터 집합을 사용한다 :
import spark.implicits._ case class Record(year: Int, make: String, model: String) df.as[Record].map { case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S") case rec => rec }
또는 적어도 암시 인코더있을 것이다 객체를 반환 :
df.map { case Row(year: Int, make: String, model: String) => (year, if(make.toLowerCase == "tesla") "S" else make, model) }
일부 완전히 미친 이유 당신이 정말로 데이터 집합 [행]을 통해 매핑 할 경우 마지막으로 당신은 필요한 인코더를 제공해야합니다 :
import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.Row // Yup, it would be possible to reuse df.schema here val schema = StructType(Seq( StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType) )) val encoder = RowEncoder(schema) df.map { case Row(year, make: String, model) if make.toLowerCase == "tesla" => Row(year, "S", model) case row => row } (encoder)
-
==============================
2.dataframe 스키마 @ zero323에 의해 주어진 사전 대답 알려진 시나리오에 대한 솔루션입니다
dataframe 스키마 @ zero323에 의해 주어진 사전 대답 알려진 시나리오에 대한 솔루션입니다
하지만 동적 스키마 / 또는 일반적인 기능에 여러 dataframe 지나가는 시나리오 : 1.6.1에서 2.2.0에서 마이그레이션하는 동안 다음 코드는 우리를 위해 일했다
import org.apache.spark.sql.Row val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") val data = df.rdd.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) })
이 코드는 스파크의 버전을 모두 실행한다.
단점 : 최적화 제공 dataframe에 의해 스파크 / 데이터 세트의 API 못해 적용될.
from https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] (왜) 우리는 캐시를 호출하거나 RDD에 유지해야합니까 (0) | 2019.10.29 |
---|---|
[SCALA] 스칼라의 숨겨진 기능 (0) | 2019.10.29 |
[SCALA] 스칼라 2.8 컬렉션 라이브러리는 "역사에서 가장 긴 유서"의 경우인가? [닫은] (0) | 2019.10.29 |
[SCALA] 어떻게 스파크에서 CSV 파일에서 헤더를 건너 뛸 수 있습니까? (0) | 2019.10.29 |
[SCALA] 스칼라 어떤 JSON 라이브러리를 사용 하는가? [닫은] (0) | 2019.10.29 |