복붙노트

[SCALA] 어떻게 스파크 SQL의 사용자 정의 유형에 대한 스키마를 정의?

SCALA

어떻게 스파크 SQL의 사용자 정의 유형에 대한 스키마를 정의?

다음 예제 코드는 dataframe에 일부의 경우 물건을 올려하려고합니다. 코드는 케이스 오브젝트 계층이 특성을 이용하는 경우, 클래스의 정의를 포함한다 :

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

코드를 실행했을 때, 나는 불행하게도 다음과 같은 예외가 발생할 :

java.lang.UnsupportedOperationException: Schema for type Some is not supported

열거에 대한 코드 :

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

미리 감사드립니다. 나는 가장 좋은 방법 대신 문자열을 사용하지 않는 것입니다 것을 희망한다.

해결법

  1. ==============================

    1.2.0.0+ 스파크 :

    2.0.0+ 스파크 :

    UserDefinedType은 스파크 2.0.0에서 비공개되었으며, 지금으로는 더 데이터 집합 친화적 인 대체가 없습니다.

    참조 : SPARK-14155 (스파크 2.0에서 숨기기 UserDefinedType를)

    대부분의 시간은 정적 데이터 세트가 대체 역할을 할 수 입력 대상 버전 2.4 다시 UDT API가 공개하는 보류 락스의 SPARK-7768이있다.

    데이터 집합에서 사용자 정의 개체를 저장하는 방법도 참조?

    스파크 <2.0.0

    나는 대답은 당신이 필요로하는지 심하게에 따라 달라집니다 것 같아요. UserDefinedType을 만들 수 있지만 DeveloperApi에 대한 액세스를 필요로하고 정확하게 간단하거나 잘 문서화되지 것 같습니다.

    import org.apache.spark.sql.types._
    
    @SQLUserDefinedType(udt = classOf[SomeUDT])
    sealed trait Some
    case object AType extends Some
    case object BType extends Some
    
    class SomeUDT extends UserDefinedType[Some] {
      override def sqlType: DataType = IntegerType
    
      override def serialize(obj: Any) = {
        obj match {
          case AType => 0
          case BType => 1
        }
      }
    
      override def deserialize(datum: Any): Some = {
        datum match {
          case 0 => AType
          case 1 => BType
        }
      }
    
      override def userClass: Class[Some] = classOf[Some]
    }
    

    당신은 아마 해시 코드를 무시하고뿐만 아니라 동일합니다.

    그 PySpark의 대응은 다음과 같이 할 수 있습니다 :

    from enum import Enum, unique
    from pyspark.sql.types import UserDefinedType, IntegerType
    
    class SomeUDT(UserDefinedType):
        @classmethod
        def sqlType(self):
            return IntegerType()
    
        @classmethod
        def module(cls):
            return cls.__module__
    
        @classmethod 
        def scalaUDT(cls): # Required in Spark < 1.5
            return 'net.zero323.enum.SomeUDT'
    
        def serialize(self, obj):
            return obj.value
    
        def deserialize(self, datum):
            return {x.value: x for x in Some}[datum]
    
    @unique
    class Some(Enum):
        __UDT__ = SomeUDT()
        AType = 0
        BType = 1
    

    스파크에서 <1.5 파이썬 UDT는 페어링 스칼라 UDT 필요하지만, 더 이상 1.5의 경우처럼 보일하지 않습니다.

    당신 같은 간단한 UDT (예 IntegerType 대신 전체 구조체에 대한) 간단한 형식을 사용할 수 있습니다하십시오.

  2. from https://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql by cc-by-sa and MIT license