[SCALA] 어떻게 UDF에서 사용자 정의 변압기를 만드는 방법?
SCALA어떻게 UDF에서 사용자 정의 변압기를 만드는 방법?
내가 만들고 사용자 지정 단계와 파이프 라인을 구하려고했다. 나는 UDF를 사용하여 내 DataFrame에 열을 추가해야합니다. UDF 또는 변압기에 유사한 조치를 변환 할 수 있다면 따라서 궁금 해서요?
내 사용자 정의 UDF의이 같은 외모와 나는 사용자 정의 변압기와 같은 UDF를 사용하여 작업을 수행하는 방법을 알고 싶습니다.
def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
해결법
-
==============================
1.그것은 완벽한 기능을 갖춘 솔루션이 아니라 당신은 다음과 같이 시작할 수 있습니다 :
그것은 완벽한 기능을 갖춘 솔루션이 아니라 당신은 다음과 같이 시작할 수 있습니다 :
import org.apache.spark.ml.{UnaryTransformer} import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql.types.{ArrayType, DataType, StringType} class NGramTokenizer(override val uid: String) extends UnaryTransformer[String, Seq[String], NGramTokenizer] { def this() = this(Identifiable.randomUID("ngramtokenizer")) override protected def createTransformFunc: String => Seq[String] = { getFeatures _ } override protected def validateInputType(inputType: DataType): Unit = { require(inputType == StringType) } override protected def outputDataType: DataType = { new ArrayType(StringType, true) } }
빠른 확인:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v") val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs") transformer.transform(df).show // +---+------+------------------+ // | k| v| vs| // +---+------+------------------+ // | 1|abcdef|[f, ef, def, cdef]| // | 2|foobar|[r, ar, bar, obar]| // +---+------+------------------+
당신은 이런 일에 일반화를 시도 할 수 있습니다 :
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor import scala.reflect.runtime.universe._ class UnaryUDFTransformer[T : TypeTag, U : TypeTag]( override val uid: String, f: T => U ) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] { override protected def createTransformFunc: T => U = f override protected def validateInputType(inputType: DataType): Unit = require(inputType == schemaFor[T].dataType) override protected def outputDataType: DataType = schemaFor[U].dataType } val transformer = new UnaryUDFTransformer("featurize", getFeatures) .setInputCol("v") .setOutputCol("vs")
당신은 UDF되지 랩 기능을 사용하려면 직접 변압기를 확장해야하고 재정의 방법을 변환합니다. 오히려 까다로울 수 있도록 유용한 클래스의 불행하게도 대부분은 비공개입니다.
또는 당신은 UDF를 등록 할 수 있습니다 :
spark.udf.register("getFeatures", getFeatures _)
및 사용 SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer val transformer = new SQLTransformer() .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__") transformer.transform(df).show // +---+------+------------------+ // | k| v| vs| // +---+------+------------------+ // | 1|abcdef|[f, ef, def, cdef]| // | 2|foobar|[r, ar, bar, obar]| // +---+------+------------------+
-
==============================
2.내가 처음 변압기 및 UnaryTransformer 초록을 확장하려하지만 내 응용 프로그램이 DefaultParamsWriteable.As 문제에 관련이있을 수 예를 도달 할 수없는 존재에 문제가 발생,이 예제를 따라 다음은 UDF와 같은 간단한 용어 정규화를 만들었습니다. 내 목표는 일반적인 용어로 대체하는 패턴과 세트에 대한 용어와 일치하는 것입니다. 예를 들면 :
내가 처음 변압기 및 UnaryTransformer 초록을 확장하려하지만 내 응용 프로그램이 DefaultParamsWriteable.As 문제에 관련이있을 수 예를 도달 할 수없는 존재에 문제가 발생,이 예제를 따라 다음은 UDF와 같은 간단한 용어 정규화를 만들었습니다. 내 목표는 일반적인 용어로 대체하는 패턴과 세트에 대한 용어와 일치하는 것입니다. 예를 들면 :
"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"
이것은 클래스
import scala.util.matching.Regex class TermNormalizer(normMap: Map[Any, String]) { val normalizationMap = normMap def normalizeTerms(terms: Seq[String]): Seq[String] = { var termsUpdated = terms for ((term, idx) <- termsUpdated.view.zipWithIndex) { for (normalizer <- normalizationMap.keys: Iterable[Any]) { normalizer match { case (regex: Regex) => if (!regex.findFirstIn(term).isEmpty) termsUpdated = termsUpdated.updated(idx, normalizationMap(regex)) case (set: Set[String]) => if (set.contains(term)) termsUpdated = termsUpdated.updated(idx, normalizationMap(set)) } } } termsUpdated } }
나는이 같이 사용할 수 :
val testMap: Map[Any, String] = Map("hadoop".r -> "elephant", "spark".r -> "sparky", "cool".r -> "neat", Set("123", "456") -> "set1", Set("789", "10") -> "set2") val testTermNormalizer = new TermNormalizer(testMap) val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String])) val trainingTest = sqlContext.createDataFrame(Seq( (0L, "spark is cool 123", 1.0), (1L, "adsjkfadfk akjdsfhad 456", 0.0), (2L, "spark rocks my socks 789 10", 1.0), (3L, "hadoop is cool 10", 0.0) )).toDF("id", "text", "label") val testTokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words") val tokenizedTrainingTest = testTokenizer.transform(trainingTest) println(tokenizedTrainingTest .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
지금은 좀 더 가까이 질문을 읽어, 당신이 LOL 이런 식으로 일을 방지하는 방법을 묻는 것 같은 소리가 난다. 어쨌든, 난 여전히 미래의 경우 누군가에 게재됩니다은 기능 같은 변압기 틱을 적용 할 수있는 쉬운 방법을 찾고있다
-
==============================
3.당신은뿐만 아니라 변압기를 쓰기 가능하게하려면, 당신은 당신의 선택의 공개 패키지에 sharedParams 라이브러리에 HasInputCol로 특성을 다시 구현하고 변압기 또한 지속을 만들기 위해 DefaultParamsWritable 특성으로 사용할 수 있습니다.
당신은뿐만 아니라 변압기를 쓰기 가능하게하려면, 당신은 당신의 선택의 공개 패키지에 sharedParams 라이브러리에 HasInputCol로 특성을 다시 구현하고 변압기 또한 지속을 만들기 위해 DefaultParamsWritable 특성으로 사용할 수 있습니다.
당신은 또한 자신의 패키지에 PARAMS의 병렬 세트를 유지의 종류 장소 스파크 코어 ml의 패키지 내부 코드의 일부하지만 당신에게 것을 방지 할 수 있습니다이 방법. 이 밤은 정말 문제는 좀처럼 변경하지 주어진다.
하지만 그 사람들이 직접 외부 클래스에서 사람들을 사용할 수 있도록 일반적인 sharedParams 중 일부는 ml의 공공 대신 개인을 만들 수 요청이 여기에 자신의 JIRA 보드의 버그를 추적 할.
from https://stackoverflow.com/questions/35180527/how-to-create-a-custom-transformer-from-a-udf by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 왜 scaladoc 방법 서명이 잘못입니까? (0) | 2019.11.23 |
---|---|
[SCALA] 스칼라 스크립트와 응용 프로그램 사이의 차이 (0) | 2019.11.23 |
[SCALA] 스칼라 방법은 다른의 각 요소와 함께 반복자의 각 요소를 결합? (0) | 2019.11.23 |
[SCALA] log4j에에서 스파크 로그에서 Logback에서 응용 프로그램 로그를 분리 (0) | 2019.11.23 |
[SCALA] 서열로서 문자열 [숯] 패턴 매칭 (0) | 2019.11.23 |