복붙노트

[SCALA] 어떻게 UDF에서 사용자 정의 변압기를 만드는 방법?

SCALA

어떻게 UDF에서 사용자 정의 변압기를 만드는 방법?

내가 만들고 사용자 지정 단계와 파이프 라인을 구하려고했다. 나는 UDF를 사용하여 내 DataFrame에 열을 추가해야합니다. UDF 또는 변압기에 유사한 조치를 변환 할 수 있다면 따라서 궁금 해서요?

내 사용자 정의 UDF의이 같은 외모와 나는 사용자 정의 변압기와 같은 UDF를 사용하여 작업을 수행하는 방법을 알고 싶습니다.

def getFeatures(n: String) = {
    val NUMBER_FEATURES = 4  
    val name = n.split(" +")(0).toLowerCase
    ((1 to NUMBER_FEATURES)
         .filter(size => size <= name.length)
         .map(size => name.substring(name.length - size)))
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))

해결법

  1. ==============================

    1.그것은 완벽한 기능을 갖춘 솔루션이 아니라 당신은 다음과 같이 시작할 수 있습니다 :

    그것은 완벽한 기능을 갖춘 솔루션이 아니라 당신은 다음과 같이 시작할 수 있습니다 :

    import org.apache.spark.ml.{UnaryTransformer}
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
    
    class NGramTokenizer(override val uid: String)
      extends UnaryTransformer[String, Seq[String], NGramTokenizer]  {
    
      def this() = this(Identifiable.randomUID("ngramtokenizer"))
    
      override protected def createTransformFunc: String => Seq[String] = {
        getFeatures _
      }
    
      override protected def validateInputType(inputType: DataType): Unit = {
        require(inputType == StringType)
      }
    
      override protected def outputDataType: DataType = {
        new ArrayType(StringType, true)
      }
    }
    

    빠른 확인:

    val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
    val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
    
    transformer.transform(df).show
    // +---+------+------------------+
    // |  k|     v|                vs|
    // +---+------+------------------+
    // |  1|abcdef|[f, ef, def, cdef]|
    // |  2|foobar|[r, ar, bar, obar]|
    // +---+------+------------------+
    

    당신은 이런 일에 일반화를 시도 할 수 있습니다 :

    import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
    import scala.reflect.runtime.universe._
    
    class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
      override val uid: String,
      f: T => U
    ) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]]  {
    
      override protected def createTransformFunc: T => U = f
    
      override protected def validateInputType(inputType: DataType): Unit = 
        require(inputType == schemaFor[T].dataType)
    
      override protected def outputDataType: DataType = schemaFor[U].dataType
    }
    
    val transformer = new UnaryUDFTransformer("featurize", getFeatures)
      .setInputCol("v")
      .setOutputCol("vs")
    

    당신은 UDF되지 랩 기능을 사용하려면 직접 변압기를 확장해야하고 재정의 방법을 변환합니다. 오히려 까다로울 수 있도록 유용한 클래스의 불행하게도 대부분은 비공개입니다.

    또는 당신은 UDF를 등록 할 수 있습니다 :

    spark.udf.register("getFeatures", getFeatures _)
    

    및 사용 SQLTransformer

    import org.apache.spark.ml.feature.SQLTransformer
    
    val transformer = new SQLTransformer()
      .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
    
    transformer.transform(df).show
    // +---+------+------------------+
    // |  k|     v|                vs|
    // +---+------+------------------+
    // |  1|abcdef|[f, ef, def, cdef]|
    // |  2|foobar|[r, ar, bar, obar]|
    // +---+------+------------------+
    
  2. ==============================

    2.내가 처음 변압기 및 UnaryTransformer 초록을 확장하려하지만 내 응용 프로그램이 DefaultParamsWriteable.As 문제에 관련이있을 수 예를 도달 할 수없는 존재에 문제가 발생,이 예제를 따라 다음은 UDF와 같은 간단한 용어 정규화를 만들었습니다. 내 목표는 일반적인 용어로 대체하는 패턴과 세트에 대한 용어와 일치하는 것입니다. 예를 들면 :

    내가 처음 변압기 및 UnaryTransformer 초록을 확장하려하지만 내 응용 프로그램이 DefaultParamsWriteable.As 문제에 관련이있을 수 예를 도달 할 수없는 존재에 문제가 발생,이 예제를 따라 다음은 UDF와 같은 간단한 용어 정규화를 만들었습니다. 내 목표는 일반적인 용어로 대체하는 패턴과 세트에 대한 용어와 일치하는 것입니다. 예를 들면 :

    "\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"
    

    이것은 클래스

    import scala.util.matching.Regex
    
    class TermNormalizer(normMap: Map[Any, String]) {
      val normalizationMap = normMap
    
      def normalizeTerms(terms: Seq[String]): Seq[String] = {
        var termsUpdated = terms
        for ((term, idx) <- termsUpdated.view.zipWithIndex) {
          for (normalizer <- normalizationMap.keys: Iterable[Any]) {
            normalizer match {
              case (regex: Regex) =>
                if (!regex.findFirstIn(term).isEmpty) termsUpdated = 
                  termsUpdated.updated(idx, normalizationMap(regex))
              case (set: Set[String]) =>
                if (set.contains(term)) termsUpdated = 
                  termsUpdated.updated(idx, normalizationMap(set))
            }
          }
        }
        termsUpdated
      }
    }
    

    나는이 같이 사용할 수 :

    val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
      "spark".r -> "sparky", "cool".r -> "neat", 
      Set("123", "456") -> "set1",
      Set("789", "10") -> "set2")
    
    val testTermNormalizer = new TermNormalizer(testMap)
    val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))
    
    val trainingTest = sqlContext.createDataFrame(Seq(
      (0L, "spark is cool 123", 1.0),
      (1L, "adsjkfadfk akjdsfhad 456", 0.0),
      (2L, "spark rocks my socks 789 10", 1.0),
      (3L, "hadoop is cool 10", 0.0)
    )).toDF("id", "text", "label")
    
    val testTokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    
    val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
    println(tokenizedTrainingTest
      .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
    

    지금은 좀 더 가까이 질문을 읽어, 당신이 LOL 이런 식으로 일을 방지하는 방법을 묻는 것 같은 소리가 난다. 어쨌든, 난 여전히 미래의 경우 누군가에 게재됩니다은 기능 같은 변압기 틱을 적용 할 수있는 쉬운 방법을 찾고있다

  3. ==============================

    3.당신은뿐만 아니라 변압기를 쓰기 가능하게하려면, 당신은 당신의 선택의 공개 패키지에 sharedParams 라이브러리에 HasInputCol로 특성을 다시 구현하고 변압기 또한 지속을 만들기 위해 DefaultParamsWritable 특성으로 사용할 수 있습니다.

    당신은뿐만 아니라 변압기를 쓰기 가능하게하려면, 당신은 당신의 선택의 공개 패키지에 sharedParams 라이브러리에 HasInputCol로 특성을 다시 구현하고 변압기 또한 지속을 만들기 위해 DefaultParamsWritable 특성으로 사용할 수 있습니다.

    당신은 또한 자신의 패키지에 PARAMS의 병렬 세트를 유지의 종류 장소 스파크 코어 ml의 패키지 내부 코드의 일부하지만 당신에게 것을 방지 할 수 있습니다이 방법. 이 밤은 정말 문제는 좀처럼 변경하지 주어진다.

    하지만 그 사람들이 직접 외부 클래스에서 사람들을 사용할 수 있도록 일반적인 sharedParams 중 일부는 ml의 공공 대신 개인을 만들 수 요청이 여기에 자신의 JIRA 보드의 버그를 추적 할.

  4. from https://stackoverflow.com/questions/35180527/how-to-create-a-custom-transformer-from-a-udf by cc-by-sa and MIT license