[HADOOP] 스파크 / 스칼라에서 RDD를 데이터 프레임으로 변환
HADOOP스파크 / 스칼라에서 RDD를 데이터 프레임으로 변환
RDD는 Array [Array [String]] 형식으로 만들어지며 다음 값을가집니다.
val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))
스키마를 사용하여 dataFrame을 만들고 싶습니다.
val schemaString = "callId oCallId callTime duration calltype swId"
다음 단계:
scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)
다음 오류를 제공합니다.
console:45: error: overloaded method value createDataFrame with alternatives:
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],
org.apache.spark.sql.types.StructType)
val calDF = sqlContext.createDataFrame(rowRDD, schema)
해결법
-
==============================
1.스파크 셸에 붙여 넣기 만하면됩니다.
스파크 셸에 붙여 넣기 만하면됩니다.
val a = Array( Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")) val rdd = sc.makeRDD(a) case class X(callId: String, oCallId: String, callTime: String, duration: String, calltype: String, swId: String)
그런 다음 RDD 위로 map ()하여 case 클래스의 인스턴스를 만든 다음 toDF ()를 사용하여 DataFrame을 만듭니다.
scala> val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF() df: org.apache.spark.sql.DataFrame = [callId: string, oCallId: string, callTime: string, duration: string, calltype: string, swId: string]
이것은 사례 클래스의 스키마를 유추합니다.
그런 다음 진행할 수 있습니다.
scala> df.printSchema() root |-- callId: string (nullable = true) |-- oCallId: string (nullable = true) |-- callTime: string (nullable = true) |-- duration: string (nullable = true) |-- calltype: string (nullable = true) |-- swId: string (nullable = true) scala> df.show() +----------+-------+-------------------+--------+--------+----+ | callId|oCallId| callTime|duration|calltype|swId| +----------+-------+-------------------+--------+--------+----+ |4580056797| 0|2015-07-29 10:38:42| 0| 1| 1| |4580056797| 0|2015-07-29 10:38:42| 0| 1| 1| +----------+-------+-------------------+--------+--------+----+
스파크 셸이 아닌 일반 프로그램에서 toDF ()를 사용하려면 여기에서 인용하십시오.
-
==============================
2.먼저 배열을 행으로 변환 한 다음 스키마를 정의해야합니다. 나는 너희들의 밭 대부분이 길다는 가정을했다.
먼저 배열을 행으로 변환 한 다음 스키마를 정의해야합니다. 나는 너희들의 밭 대부분이 길다는 가정을했다.
val rdd: RDD[Array[String]] = ??? val rows: RDD[Row] = rdd map { case Array(callId, oCallId, callTime, duration, swId) => Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong) } object schema { val callId = StructField("callId", LongType) val oCallId = StructField("oCallId", StringType) val callTime = StructField("callTime", StringType) val duration = StructField("duration", LongType) val swId = StructField("swId", LongType) val struct = StructType(Array(callId, oCallId, callTime, duration, swId)) } sqlContext.createDataFrame(rows, schema.struct)
-
==============================
3.필자는 스파크 가이드처럼 스키마가 다음과 같이 있다고 가정합니다.
필자는 스파크 가이드처럼 스키마가 다음과 같이 있다고 가정합니다.
val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
createDataFrame의 서명을 살펴보면 두 번째 인수로 StructType을 수락하는 구문 (Scala의 경우)이 있습니다.
따라서 첫 번째 인수로 RDD [행]을 허용합니다. rowRDD에있는 것은 RDD [Array [String]]이므로 불일치가 있습니다.
RDD [Array [String]]가 필요합니까?
그렇지 않으면 다음을 사용하여 데이터 프레임을 만들 수 있습니다.
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim))
-
==============================
4.스파크 1.6.1 및 스칼라 2.10 사용
스파크 1.6.1 및 스칼라 2.10 사용
같은 오류 오류가 발생했습니다 : 오버로드 된 메서드 값 createDataFrame 대안 :
나에게있어서, gotcha는 createDataFrame의 서명이었고, 나는 val rdd : List [Row]를 사용하려했지만 실패했다. 왜냐하면 java.util.List [org.apache.spark.sql.Row]와 scala.collection.immutable.List [org.apache.spark.sql.Row]는 동일하지 않기 때문입니다.
내가 발견 한 실제 해결책은 목록 [Array [String]]을 통해 val rdd : Array [Array [String]]을 RDD [Row]로 변환하는 것입니다. 문서에서 무엇에 가장 가깝습니다.
import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType,StructField,StringType}; val sqlContext = new org.apache.spark.sql.SQLContext(sc) val rdd_original : Array[Array[String]] = Array( Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")) val rdd : List[Array[String]] = rdd_original.toList val schemaString = "callId oCallId callTime duration calltype swId" // Generate the schema based on the string of schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD to Rows. val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier // val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works val df = sqlContext.createDataFrame(sc.parallelize(rowRDD:List[Row]), schema) df.show
from https://stackoverflow.com/questions/33127970/convert-rdd-to-dataframe-in-spark-scala by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] YARN 클라이언트 모드에서 spark-shell으로 ClosedChannelException이 발생하는 이유는 무엇입니까? (0) | 2019.07.06 |
---|---|
[HADOOP] Hadoop 및 Amazon Web Services [닫힘] (0) | 2019.07.06 |
[HADOOP] OS / X에서 Hadoop 기본 라이브러리를 찾을 수 없음 (0) | 2019.07.06 |
[HADOOP] Hadoop 기계 학습 / 데이터 마이닝 프로젝트 아이디어? [닫은] (0) | 2019.07.06 |
[HADOOP] mapper (Hadoop)에서 MATLAB 코드를 사용하는 방법은 무엇입니까? (0) | 2019.07.06 |