[SCALA] MatchError 스파크 2.0 벡터 열에 액세스 할 때
SCALAMatchError 스파크 2.0 벡터 열에 액세스 할 때
내가 JSON 파일에 LDA 모델을 만들려고하고 있습니다.
JSON 파일과 스파크 컨텍스트를 만들기 :
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.getOrCreate()
val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")
df라는 표시하기 DataFrame를 표시해야합니다
display(df)
텍스트를 토큰 화
import org.apache.spark.ml.feature.RegexTokenizer
// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
.setPattern("[\\W_]+")
.setMinTokenLength(4) // Filter away tokens with length < 4
.setInputCol("text")
.setOutputCol("tokens")
// Tokenize document
val tokenized_df = tokenizer.transform(df)
이것은 tokenized_df를 표시한다
display(tokenized_df)
중지 단어를 가져옵니다
%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords
선택 사항 : TMP 폴더에 중지 단어를 복사
%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
모든 중지 단어를 수집
val stopwords = sc.textFile("/tmp/stopwords").collect()
중지 단어를 필터링
import org.apache.spark.ml.feature.StopWordsRemover
// Set params for StopWordsRemover
val remover = new StopWordsRemover()
.setStopWords(stopwords) // This parameter is optional
.setInputCol("tokens")
.setOutputCol("filtered")
// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)
중지 단어를 확인해야합니다 필터링 DF 표시가 제거되었다
display(filtered_df)
단어의 발생 빈도를 벡터화
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
.setInputCol("filtered")
.setOutputCol("features")
.fit(filtered_df)
벡터 기 확인
vectorizer.transform(filtered_df)
.select("id", "text","features","filtered").show()
이후 나는 LDA이 벡터화 피팅에 문제를보고하고있다. 내가 믿는 문제는 CountVectorizer 스파 스 벡터를주고 있지만, LDA는 밀도 벡터가 필요하다. 아직도 문제를 파악하려고합니다.
여기에지도를 변환 할 수없는 예외입니다.
import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
display(ldaDF)
예외 :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
어떤 문제를 던지고되지 LDA에 대한 작업 샘플이 있습니다
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF
val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)
유일한 차이점은 우리가 조밀 한 매트릭스가있는 두 번째 조각이다.
해결법
-
==============================
1.이 희소성과는 아무 상관이 없습니다. 스파크 2.0.0 이후 ML 변압기는 더 이상 o.a.s.mllib.linalg.VectorUDT하지만 o.a.s.ml.linalg.VectorUDT을 생성하지 않으며 o.a.s.ml.linalg.Vector의 서브 클래스에 로컬로 매핑됩니다. 다음은 스파크 2.0.0에 중단으로 이동 된 MLLib의 API와 호환되지 않습니다.
이 희소성과는 아무 상관이 없습니다. 스파크 2.0.0 이후 ML 변압기는 더 이상 o.a.s.mllib.linalg.VectorUDT하지만 o.a.s.ml.linalg.VectorUDT을 생성하지 않으며 o.a.s.ml.linalg.Vector의 서브 클래스에 로컬로 매핑됩니다. 다음은 스파크 2.0.0에 중단으로 이동 된 MLLib의 API와 호환되지 않습니다.
당신은 Vectors.fromML을 사용하여 "이전"에 사이에 변환 할 수 있습니다 :
import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.ml.linalg.{Vectors => NewVectors} OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0)) OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
그러나 당신이 이미 ML 변압기를 사용하는 경우 LDA의 ML 구현을 사용하는 것이 더 의미가.
편의를 위해 당신은 암시 적 변환을 사용할 수 있습니다 :
import scala.languageFeature.implicitConversions object VectorConversions { import org.apache.spark.mllib.{linalg => mllib} import org.apache.spark.ml.{linalg => ml} implicit def toNewVector(v: mllib.Vector) = v.asML implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v) }
-
==============================
2.나는 변경 :
나는 변경 :
val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) }
에:
val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) => (docId.toLong, Vectors.fromML(features)) }
그리고 그것은 마법처럼 일했다! 그것은 zero323 @ 쓴 것과 정렬됩니다.
수입 목록 :
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover} import org.apache.spark.ml.linalg.{Vector => MLVector} import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.{Row, SparkSession}
-
==============================
3.해결 방법은 매우 간단합니다 사람이 .. 아래 찾을 수있다
해결 방법은 매우 간단합니다 사람이 .. 아래 찾을 수있다
//import org.apache.spark.mllib.linalg.Vector import org.apache.spark.ml.linalg.Vector
from https://stackoverflow.com/questions/38818879/matcherror-while-accessing-vector-column-in-spark-2-0 by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 동적 작업을 입력 기능과 사용 방법에? (0) | 2019.11.04 |
---|---|
[SCALA] (왼쪽 / 오른쪽) 배, 줄이거 나 스캔? (0) | 2019.11.04 |
[SCALA] 한 인수 함수를 호출 할 때 왜 그리고 어떻게 스칼라 특별히 튜플을 치료한다? (0) | 2019.11.04 |
[SCALA] 스칼라는 최소한의 기능을 강조 (0) | 2019.11.04 |
[SCALA] 스파크 쉼표 목록, 어떻게 항아리의 디렉토리를 선언하는 원한다 --jars 인수를 스파크 제출? (0) | 2019.11.04 |