복붙노트

[SCALA] 어떻게 스파크의 분류에 대한 정확한 데이터 프레임을 만들 수 있습니다 ML

SCALA

어떻게 스파크의 분류에 대한 정확한 데이터 프레임을 만들 수 있습니다 ML

나는 스파크 ML API를 사용하여 임의 숲 분류를 실행하려고하지만 파이프 라인에 올바른 데이터 프레임 입력을 만드는 문제가 있어요.

여기에 샘플 데이터는 다음과 같습니다

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

라벨 salaryRange을 포함한 다른 기능 범주 동안 나이와 hours_per_week는 정수 (문자열)

이 같은 스파크 CSV 라이브러리에 의해 수행 할 수 있습니다 (이 sample.csv를 호출 할 수 있습니다)이 CSV 파일을로드 :

val data = sqlContext.csvFile("/home/dusan/sample.csv")

우리가 변화 int로 "세"와 "hours_per_week"필요하므로 기본적으로 모든 열은 문자열로 가져옵니다 :

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

그냥 스키마 지금의 모양을 확인합니다 :

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

그럼 교차 검증 및 파이프 라인을 설정할 수 있습니다 :

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

이 줄을 실행할 때까지 오류 쇼 :

val cmModel = cv.fit(dataFixed)

예외 : IllegalArgumentException 필드는 "기능"이 존재하지 않습니다.

그러나 그것은 내가 예측 (기능) 등 4 열뿐만 아니라 하나를 가지고, RandomForestClassifier에서 설정 라벨 열 및 기능을 열 수 있습니다.

이 레이블이과 열이 제대로 구성되어 있습니다, 그래서 어떻게 데이터 프레임을 구성해야합니까?

여기에 귀하의 편의를 위해 전체 코드입니다 :

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}


object SampleClassification {

  def main(args: Array[String]): Unit = {

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
    val toInt    = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))


    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    //java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed) 
  }

}

도와 주셔서 감사합니다!

해결법

  1. ==============================

    1.당신은 단순히 당신이 아래 표와 같이 유형 VectorUDF의 당신의 dataframe에서 "기능"열이 있는지 확인해야합니다

    당신은 단순히 당신이 아래 표와 같이 유형 VectorUDF의 당신의 dataframe에서 "기능"열이 있는지 확인해야합니다

    scala> val df2 = dataFixed.withColumnRenamed("age", "features")
    df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]
    
    scala> val cmModel = cv.fit(df2) 
    java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
        at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
        at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
        at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
        at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
        at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
        at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
        at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
        at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
        at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
    

    EDIT1

    기본적으로 인스턴스 레이블에 대한 특징 벡터와 "라벨"에 대한 데이터 프레임 "기능"에서 두 개의 필드가있을 필요가있다. 인스턴스 유형을 두 번이어야합니다.

    벡터 형식의 "기능"필드를 만들려면 먼저 아래 표와 같은 UDF를 만들 :

    val toVec4    = udf[Vector, Int, Int, String, String] { (a,b,c,d) => 
      val e3 = c match {
        case "hs-grad" => 0
        case "bachelors" => 1
        case "masters" => 2
      }
      val e4 = d match {case "male" => 0 case "female" => 1}
      Vectors.dense(a, b, e3, e4) 
    }
    

    아래 그림과 같이 이제 또 다른 UDF를 만듭니다 "라벨"필드를 인코딩합니다 :

    val encodeLabel    = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )
    

    이제 우리는이 두 UDF를 사용하여 원래의 dataframe 변환 :

    val df = dataFixed.withColumn(
      "features",
      toVec4(
        dataFixed("age"),
        dataFixed("hours_per_week"),
        dataFixed("education"),
        dataFixed("sex")
      )
    ).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")
    

    단지 특징 및 레이블 추가 열 /가있을 수 있음을 참고는 dataframe에 존재하는 필드하지만,이 경우 내가 선택한 :

    scala> df.show()
    +-------------------+-----+
    |           features|label|
    +-------------------+-----+
    |[38.0,40.0,0.0,0.0]|  0.0|
    |[28.0,40.0,1.0,1.0]|  0.0|
    |[52.0,45.0,0.0,0.0]|  1.0|
    |[31.0,50.0,2.0,1.0]|  1.0|
    |[42.0,40.0,1.0,0.0]|  1.0|
    +-------------------+-----+
    

    지금은 최대 개까지 당신은 그것이 작동되도록하는 학습 알고리즘에 대한 올바른 매개 변수를 설정합니다.

  2. ==============================

    2.스파크 1.4, 당신은 변압기 org.apache.spark.ml.feature.VectorAssembler를 사용할 수 있습니다. 그냥 당신이 기능되고 싶어 열 이름을 제공합니다.

    스파크 1.4, 당신은 변압기 org.apache.spark.ml.feature.VectorAssembler를 사용할 수 있습니다. 그냥 당신이 기능되고 싶어 열 이름을 제공합니다.

    val assembler = new VectorAssembler()
      .setInputCols(Array("col1", "col2", "col3"))
      .setOutputCol("features")
    

    당신의 파이프 라인에 추가합니다.

  3. ==============================

    3.mllib에 스파크 문서에 따르면 - 임의 나무, 당신이 기능을 사용하고 포인트가 labeledpoint해야한다는 매핑 정의해야 나에게 보인다.

    mllib에 스파크 문서에 따르면 - 임의 나무, 당신이 기능을 사용하고 포인트가 labeledpoint해야한다는 매핑 정의해야 나에게 보인다.

    이 예측하고있는 것이 특징이다 표기 열 알고리즘을 말할 것이다.

    https://spark.apache.org/docs/latest/mllib-decision-tree.html

  4. from https://stackoverflow.com/questions/31028806/how-to-create-correct-data-frame-for-classification-in-spark-ml by cc-by-sa and MIT license