[SCALA] 어떻게 데이터 집합에서 사용자 정의 개체를 저장하는 방법?
SCALA어떻게 데이터 집합에서 사용자 정의 개체를 저장하는 방법?
스파크 데이터 집합 소개에 따르면 :
사용자 지정 저장과 같은 오류를 다음에 데이터 집합 리드를 입력하려고 시도합니다 :
또는:
기존의 해결 방법이 있습니까?
이 질문은 커뮤니티 위키 답변에 대한 진입 점으로 존재합니다. 업데이트 / 질문과 답변을 모두 개선 주시기 바랍니다.
해결법
-
==============================
1.일이 지금에 내장 된 추가합니다 설정, 서열,지도, 날짜, 타임 스탬프 및 BigDecimal를위한 인코더 지원 더 나은 이후 2.2 / 2.3, 있지만이 대답은 여전히 유효하고 유익한입니다. 당신이 경우에만 클래스와 일반 스칼라 유형과 종류를 만들기에 충실하면 SQLImplicits 단지 암시 적으로 잘해야한다.
일이 지금에 내장 된 추가합니다 설정, 서열,지도, 날짜, 타임 스탬프 및 BigDecimal를위한 인코더 지원 더 나은 이후 2.2 / 2.3, 있지만이 대답은 여전히 유효하고 유익한입니다. 당신이 경우에만 클래스와 일반 스칼라 유형과 종류를 만들기에 충실하면 SQLImplicits 단지 암시 적으로 잘해야한다.
불행하게도, 거의 아무것도이 도움이 추가되지 않았습니다. Encoders.scala 또는 SQLImplicits.scala에서 @since 2.0.0을 검색하는 것은 대부분의 원시 타입 (케이스 클래스의 일부 조정)으로 할 일을 찾습니다. 그래서, 먼저 대답 : 현재 사용자 정의 클래스 인코더에 대한 진짜 좋은 지원이 없다. 길의 밖으로, 어떤 다음은 우리가 이제까지 우리는 현재 우리의 처분에 가지고있는 주어진 희망 수있는만큼 좋은 일을 할 몇 가지 트릭입니다. 선행 면책 조항으로이 완벽하게 작동하지 않습니다와 나는 모든 제한이 명확하고 솔직하기 위해 최선을 다하겠습니다.
데이터 집합을 만들고 싶어 할 때, 불꽃은 "일반적으로 SparkSession에서 implicits을 통해 자동으로 생성됩니다, 또는 정적 호출하여 명시 적으로 생성 할 수있다 (에 내부 스파크 SQL 표현에서 형 T의 JVM 개체를 변환) 인코더가 필요 (createDataset의 문서에서 가져온) 인코더 "메소드. 인코더는 양식 인코더 T는 인코딩하는 유형입니다 [T]를 걸릴 것입니다. 첫 번째 제안은 두 번째 제안은 명시 적으로 인코더 관련 기능 세트를 사용하여 암시 적 인코더에 전달하는 것입니다 (당신이 암시 인코더를 제공하는) 수입 spark.implicits._를 추가하는 것입니다.
정규 수업에 사용할 수있는 인코더는 너무 없다
import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
당신에게 다음과 같은 암시 적 관련 컴파일 시간 오류를 줄 것이다 :
방금 제품을 확장 몇 가지 클래스 위의 오류를 가져 오는 데 사용 어떤 유형의 포장 경우, 오류가 혼동 때문에, 런타임에 지연 도착
import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
잘 컴파일,하지만 함께 런타임에 실패
그 이유는 실제로 implicits 런타임에만 이루어진다와 인코더 스파크 (스칼라 relfection 통해)를 생성한다는 것이다. 내가 시도하는 경우이 경우, 컴파일시에 모든 스파크 검사가 가장 바깥 쪽 클래스 제품 (모든 경우 클래스가 수행하는) 확장 만 여전히하여 MyObj에 무엇을 해야할지하지 않습니다 런타임에 실현한다는 것이다 (같은 문제가 발생합니다 )하여 MyObj에 BARF 런타임 점화까지 대기 - 데이터 집합 [(INT,하여 MyObj)]을 확인한다. 이 고정되는 무서운 필요로하는 중앙 문제가 있습니다 :
모든 사람이 제안이 솔루션은 kryo 인코더를 사용하는 것입니다.
import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
하지만 이것은 빠른 꽤 지루한 가져옵니다. 특히 코드 등 그룹화, 결합, 데이터 셋의 모든 종류를 조작한다 당신은 추가 implicits의 무리를 건 드리는 끝날 경우. 그럼, 왜 그냥이 모두 자동으로 않는 것을 암시 할?
import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
그리고 지금, 그것은 내가 원하는 거의 모든 것을 할 수있는 것 같다 (아래의 예는 spark.implicits._ 자동으로 가져 스파크 쉘에서 작동하지 않습니다)
class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
또는 거의. 문제 kryo 리드를 사용하여 단지 평면 바이너리 객체로서 집합의 모든 행을 저장 촉발한다는 것이다. 충분하지만, 가입 등의 작업을 위해, 스파크 정말이 컬럼으로 분리 될 필요지도, 필터, foreach는하십시오. D2 또는 D3에 대한 스키마를 검사, 당신은 단지 하나의 바이너리 열이 참조 :
d2.printSchema // root // |-- value: binary (nullable = true)
그래서, (6.26.3 과부하 해상도 이상) 스칼라에서 implicits의 마법을 사용하여, 나는 나 자신에게 적어도 튜플을 위해, 가능한 한 좋은 일을 할 것입니다 implicits의 시리즈를 만들 수 있으며, 기존의 implicits 잘 작동합니다 :
import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these
그런 다음,이 implicits로 무장, 좀 열 이름 바꾸기이기는하지만, 직장 내 위의 예제를 만들 수 있습니다
class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
다른 사람이 놀러 원하는 경우 이름 "값"이 소개됩니다 곳이있다 - 나는 아직 이름을 변경하지 않고 기본적으로 예상 튜플 이름 (_1, _2, ...)를하는 방법을 알아 내지 못했다 튜플 이름이 일반적으로 추가되는 곳이다. 그러나 중요한 점은 지금은 멋진 구조화 된 스키마를 가지고 있다는 것입니다 :
d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true)
그래서, 요약,이 해결 방법 :
이 사람은 덜 쾌적하고 더 좋은 해결책이 없습니다. 그러나, 우리는 위의 튜플 솔루션을 지금, 나는 다른 답변에서 암시 적 변환 솔루션은 조금 덜 고통스러운 당신이 튜플에 더 복잡한 클래스를 변환 할 수도 있기 때문에 될 것 직감이있다. 그런 다음 데이터 세트를 생성 한 후, 당신은 아마 dataframe 접근 방식을 사용하여 열 이름을 바꿀 것입니다. 모두가 잘된다면 나는 지금 내 수업의 분야에 조인 수행 할 수 있기 때문에, 이것은 정말 개선이다. 난 그냥 불가능했을 것 하나 평면 진 kryo 시리얼을 사용했다면.
여기에 모든 것을 조금을 수행하는 예입니다 : 내가 유형 INT, java.util.UUID 및 설정 [문자열]의 필드가 클래스하여 MyObj 있습니다. 첫 번째는 자체 처리합니다. 두 번째, 내가 (UUID를 보통 무언가 때문에 나는에 가입 할 수 있습니다) 문자열로 저장하면 더 유용 할 것 kryo를 사용하여 직렬화 수 있지만. 세 번째는 정말 그냥 바이너리 컬럼에 속한다.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
지금, 나는이 기계를 사용하여 좋은 스키마와 데이터 집합을 만들 수 있습니다 :
val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded]
그리고 스키마 나에게 올바른 이름을 가진 내가에 가입 할 수 있습니다 첫 번째 두 두 가지로 I 열을 보여줍니다.
d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true)
-
==============================
2.관련 질문 :
관련 질문 :
-
==============================
3.당신은 모든 당신의 사용자 정의 유형 제대로 작동 ... 등 다음 UDTRegistration 및 사례 클래스, 튜플을 사용할 수 있습니다!
당신은 모든 당신의 사용자 정의 유형 제대로 작동 ... 등 다음 UDTRegistration 및 사례 클래스, 튜플을 사용할 수 있습니다!
사용자 정의 열거를 사용하고자하는 말 :
trait CustomEnum { def value:String } case object Foo extends CustomEnum { val value = "F" } case object Bar extends CustomEnum { val value = "B" } object CustomEnum { def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get }
이처럼 등록
// First define a UDT class for it: class CustomEnumUDT extends UserDefinedType[CustomEnum] { override def sqlType: DataType = org.apache.spark.sql.types.StringType override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // Note that this will be a UTF8String type override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString) override def userClass: Class[CustomEnum] = classOf[CustomEnum] } // Then Register the UDT Class! // NOTE: you have to put this file into the org.apache.spark package! UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
그것을 사용!
case class UsingCustomEnum(id:Int, en:CustomEnum) val seq = Seq( UsingCustomEnum(1, Foo), UsingCustomEnum(2, Bar), UsingCustomEnum(3, Foo) ).toDS() seq.filter(_.en == Foo).show() println(seq.collect())
당신이 다형성 기록을 사용하고자하는 말 :
trait CustomPoly case class FooPoly(id:Int) extends CustomPoly case class BarPoly(value:String, secondValue:Long) extends CustomPoly
...이 같은 사용을 :
case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show()
당신은 바이트에 이르기까지 모든 인코딩 사용자 정의 UDT 쓸 수 있습니다 (I 여기에 자바 직렬화를 사용하고 있습니다를하지만 악기 스파크의 Kryo 컨텍스트에 아마 더 낫다).
먼저 UDT 클래스를 정의 :
class CustomPolyUDT extends UserDefinedType[CustomPoly] { val kryo = new Kryo() override def sqlType: DataType = org.apache.spark.sql.types.BinaryType override def serialize(obj: CustomPoly): Any = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(obj) bos.toByteArray } override def deserialize(datum: Any): CustomPoly = { val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]]) val ois = new ObjectInputStream(bis) val obj = ois.readObject() obj.asInstanceOf[CustomPoly] } override def userClass: Class[CustomPoly] = classOf[CustomPoly] }
그런 다음 등록 :
// NOTE: The file you do this in has to be inside of the org.apache.spark package! UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
그럼 당신은 그것을 사용할 수 있습니다!
// As shown above: case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show()
-
==============================
4.인코더는 Spark2.0에서 더 많거나 적은 같은 작동합니다. 그리고 Kryo은 여전히 권장 직렬화 선택입니다.
인코더는 Spark2.0에서 더 많거나 적은 같은 작동합니다. 그리고 Kryo은 여전히 권장 직렬화 선택입니다.
당신은 불꽃 쉘 다음의 예를 볼 수 있습니다
scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old.
이제]에는 적절한 인코더 본 범위 없었다까지 우리 인 바이너리 값으로 인코딩되지 않도록 하였다. 우리가 Kryo 직렬화를 사용하여 일부 암시 인코더를 제공하지만 일단 그 변경됩니다.
// Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old.
-
==============================
5.자바 빈 클래스의 경우,이 유용 할 수 있습니다
자바 빈 클래스의 경우,이 유용 할 수 있습니다
import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
지금 당신은 단순히 사용자 정의 DataFrame로 dataFrame를 읽을 수 있습니다
dataFrame.as[MyClass]
이 사용자 정의 클래스 인코더 아닌 바이너리를 생성합니다.
-
==============================
6.내 예는 자바에있을 것입니다,하지만 난 그것을 스칼라 어려운 적응시키는 것으로 생각하지 않습니다.
내 예는 자바에있을 것입니다,하지만 난 그것을 스칼라 어려운 적응시키는 것으로 생각하지 않습니다.
나는 한 과일 간단한 자바 빈처럼 spark.createDataset 및 Encoders.bean를 사용하여 데이터 집합 <과일>에 RDD <과일> 변환 매우 성공적이었다.
1 단계 : 간단한 자바 빈을 만듭니다.
public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out }
DataBricks의 사람들이 자신의 인코더를 강화하기 전에 나는 필드로 원시 타입과 문자열과 함께 수업에 충실 것입니다. 중첩 된 객체와 클래스가있는 경우, 평평 모든 필드와 다른 간단한 자바 빈을 생성, 그래서 당신은 하나의 간단한에 복잡한 유형을 매핑 할 RDD 변환을 사용할 수 있습니다. 물론 그것은 약간의 추가 작업이지만, 나는 그것이 평평한 스키마 작업 성능에 많은 도움이 될 것입니다 상상한다.
2 단계 : RDD에서 데이터 세트를 가져옵니다
SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List<Fruit> fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList); RDD<Fruit> fruitRDD = fruitJavaRDD.rdd(); Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class); Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
그리고 짜잔! 비누 거품, 반복을 씻어.
-
==============================
7.내 상황에 나도 여기 내 대답을 넣을 수 있습니다 사람들을 위해.
내 상황에 나도 여기 내 대답을 넣을 수 있습니다 사람들을 위해.
구체적으로,
-
==============================
8.이미 주어진 제안뿐만 아니라, 내가 최근에 발견 된 또 다른 옵션은 형질 org.apache.spark.sql.catalyst.DefinedByConstructorParams 포함하여 사용자 정의 클래스를 선언 할 수 있다는 것입니다.
이미 주어진 제안뿐만 아니라, 내가 최근에 발견 된 또 다른 옵션은 형질 org.apache.spark.sql.catalyst.DefinedByConstructorParams 포함하여 사용자 정의 클래스를 선언 할 수 있다는 것입니다.
클래스가 ExpressionEncoder가 이해할 수있는 유형, 즉 원시 값과 표준 컬렉션을 사용하는 생성자가있는 경우이 작동합니다. 그것은 당신이 경우 클래스와 클래스를 선언 할 수없는 경우에 유용하게 사용할 수 있지만, 그것을이 데이터 집합에 포함 된 모든 시간을 인코딩하는 Kryo을 사용하지 않습니다.
예를 들어, 나는 바람 벡터를 포함하는 경우 클래스를 선언하고 싶었다. 즉 일반적으로 Kryo 것 처리 할 수있을 것입니다 유일한 인코더. 나는 바람 DenseVector 및 DefinedByConstructorParams을 확장 서브 클래스를 선언하는 경우 단, ExpressionEncoder 그것이 복식의 배열로 직렬화 할 수 있음을 이해.
내가 그것을 선언하는 방법은 다음과 같습니다
class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]
지금은 데이터 집합 (직접, 또는 제품의 일부로서) 간단한 ExpressionEncoder를 사용없이 Kryo에 SerializableDenseVector를 사용할 수 있습니다. 그냥 바람 DenseVector처럼 작동하지만, 배열 [더블]로 직렬화한다.
from https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라 2.8 브레이크 아웃 (0) | 2019.10.28 |
---|---|
[SCALA] 어떻게 스파크 DataFrames를 사용하여 JSON 데이터 열을 쿼리합니다? (0) | 2019.10.28 |
[SCALA] 복잡한 유형의 쿼리 스파크 SQL의 DataFrame (0) | 2019.10.28 |
[SCALA] 어디 스칼라 implicits 찾아합니까? (0) | 2019.10.28 |
[SCALA] 어떻게 스칼라의 타입 삭제 주위를받을 수 있나요? 또는, 왜 나는 내 컬렉션의 형식 매개 변수를 얻을 수 없다? (0) | 2019.10.28 |