복붙노트

[HADOOP] 스파크 / 스칼라에서 RDD를 데이터 프레임으로 변환

HADOOP

스파크 / 스칼라에서 RDD를 데이터 프레임으로 변환

RDD는 Array [Array [String]] 형식으로 만들어지며 다음 값을가집니다.

val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

스키마를 사용하여 dataFrame을 만들고 싶습니다.

val schemaString = "callId oCallId callTime duration calltype swId"

다음 단계:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

다음 오류를 제공합니다.

console:45: error: overloaded method value createDataFrame with alternatives:
     (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],   
    org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

해결법

  1. ==============================

    1.스파크 셸에 붙여 넣기 만하면됩니다.

    스파크 셸에 붙여 넣기 만하면됩니다.

    val a = 
      Array(
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
    
    val rdd = sc.makeRDD(a)
    
    case class X(callId: String, oCallId: String, 
      callTime: String, duration: String, calltype: String, swId: String)
    

    그런 다음 RDD 위로 map ()하여 case 클래스의 인스턴스를 만든 다음 toDF ()를 사용하여 DataFrame을 만듭니다.

    scala> val df = rdd.map { 
      case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df: org.apache.spark.sql.DataFrame = 
      [callId: string, oCallId: string, callTime: string, 
        duration: string, calltype: string, swId: string]
    

    이것은 사례 클래스의 스키마를 유추합니다.

    그런 다음 진행할 수 있습니다.

    scala> df.printSchema()
    root
     |-- callId: string (nullable = true)
     |-- oCallId: string (nullable = true)
     |-- callTime: string (nullable = true)
     |-- duration: string (nullable = true)
     |-- calltype: string (nullable = true)
     |-- swId: string (nullable = true)
    
    scala> df.show()
    +----------+-------+-------------------+--------+--------+----+
    |    callId|oCallId|           callTime|duration|calltype|swId|
    +----------+-------+-------------------+--------+--------+----+
    |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
    |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
    +----------+-------+-------------------+--------+--------+----+
    

    스파크 셸이 아닌 일반 프로그램에서 toDF ()를 사용하려면 여기에서 인용하십시오.

  2. ==============================

    2.먼저 배열을 행으로 변환 한 다음 스키마를 정의해야합니다. 나는 너희들의 밭 대부분이 길다는 가정을했다.

    먼저 배열을 행으로 변환 한 다음 스키마를 정의해야합니다. 나는 너희들의 밭 대부분이 길다는 가정을했다.

        val rdd: RDD[Array[String]] = ???
        val rows: RDD[Row] = rdd map {
          case Array(callId, oCallId, callTime, duration, swId) =>
            Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong)
        }
    
        object schema {
          val callId = StructField("callId", LongType)
          val oCallId = StructField("oCallId", StringType)
          val callTime = StructField("callTime", StringType)
          val duration = StructField("duration", LongType)
          val swId = StructField("swId", LongType)
    
          val struct = StructType(Array(callId, oCallId, callTime, duration, swId))
        }
    
        sqlContext.createDataFrame(rows, schema.struct)
    
  3. ==============================

    3.필자는 스파크 가이드처럼 스키마가 다음과 같이 있다고 가정합니다.

    필자는 스파크 가이드처럼 스키마가 다음과 같이 있다고 가정합니다.

    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    

    createDataFrame의 서명을 살펴보면 두 번째 인수로 StructType을 수락하는 구문 (Scala의 경우)이 있습니다.

    따라서 첫 번째 인수로 RDD [행]을 허용합니다. rowRDD에있는 것은 RDD [Array [String]]이므로 불일치가 있습니다.

    RDD [Array [String]]가 필요합니까?

    그렇지 않으면 다음을 사용하여 데이터 프레임을 만들 수 있습니다.

    val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim))
    
  4. ==============================

    4.스파크 1.6.1 및 스칼라 2.10 사용

    스파크 1.6.1 및 스칼라 2.10 사용

    같은 오류 오류가 발생했습니다 : 오버로드 된 메서드 값 createDataFrame 대안 :

    나에게있어서, gotcha는 createDataFrame의 서명이었고, 나는 val rdd : List [Row]를 사용하려했지만 실패했다.  왜냐하면 java.util.List [org.apache.spark.sql.Row]와 scala.collection.immutable.List [org.apache.spark.sql.Row]는 동일하지 않기 때문입니다.

    내가 발견 한 실제 해결책은 목록 [Array [String]]을 통해 val rdd : Array [Array [String]]을 RDD [Row]로 변환하는 것입니다. 문서에서 무엇에 가장 가깝습니다.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType,StructField,StringType};
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    val rdd_original : Array[Array[String]] = Array(
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
    
    val rdd : List[Array[String]] = rdd_original.toList
    
    val schemaString = "callId oCallId callTime duration calltype swId"
    
    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    
    // Convert records of the RDD to Rows.
    val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
    // val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works
    
    val df = sqlContext.createDataFrame(sc.parallelize(rowRDD:List[Row]), schema)
    df.show
    
  5. from https://stackoverflow.com/questions/33127970/convert-rdd-to-dataframe-in-spark-scala by cc-by-sa and MIT license