[SCALA] org.apache.spark.sql.Dataset에서 java.lang.NullPointerException이 :에 의해 발생
SCALAorg.apache.spark.sql.Dataset에서 java.lang.NullPointerException이 :에 의해 발생
아래 난 내 코드를 제공합니다. 나는 DataFrame prodRows 반복하고 각 product_PK 위해 나는 prodRows에서 product_PKs의 일부 일치하는 하위 목록을 찾을 수 있습니다.
numRecProducts = 10
var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
prodRows.foreach{ row : Row =>
val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
val gender = row.get(row.fieldIndex("gender_PK")).toString
val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
var productList: Array[(Long, Int)] = Array()
if (!selection.rdd.isEmpty()) {
productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
}
listOfProducts = listOfProducts + (product_PK -> productList)
}
내가 그것을 실행하면, 그것은 나에게 다음과 같은 오류를 제공합니다. 선택은 몇 가지 반복 빈 것 같습니다. 그러나, 나는이 오류를 처리 할 수있는 방법을 이해하지 않습니다
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
그것은 무엇을 의미합니까이며 어떻게 처리 할 수 있습니까?
해결법
-
==============================
1.당신은 스파크의 DataFrame / RDD 변환 중 하나에 전달하는 기능 내에서 스파크의 "드라이버 측"추상화 (RDDs, DataFrames, 데이터 집합, SparkSession ...)에 액세스 할 수 없습니다. 이러한 기능 내에서 드라이버 측 가변 객체를 업데이트 할 수 없습니다.
당신은 스파크의 DataFrame / RDD 변환 중 하나에 전달하는 기능 내에서 스파크의 "드라이버 측"추상화 (RDDs, DataFrames, 데이터 집합, SparkSession ...)에 액세스 할 수 없습니다. 이러한 기능 내에서 드라이버 측 가변 객체를 업데이트 할 수 없습니다.
귀하의 경우에는 - 당신은 DataFrame.foreach에 전달되는 함수 내에서 prodRows 및 선택 (모두 DataFrames 있습니다) 사용하려는. 당신은 또한 같은 함수 내에서 listOfProducts (로컬 드라이버 측 변수)를 업데이트하려고 노력하고 있습니다.
왜?
당신은이 문제를 어떻게 해결할 수 있습니까? 특히 DataFrames와, 불꽃과 함께 작업 할 때, 당신은 데이터에 "반복"을 피하려고, 대신 DataFrame의 선언적 작업을 사용해야합니다. 당신이 당신의 DataFrame의 각 레코드에 대한 또 다른 DataFrame의 데이터를 참조 할 때 대부분의 경우, 당신은 레코드가 두 DataFrames에서 데이터를 결합으로 새로운 DataFrame를 만들 조인을 사용하고자하는 것입니다.
이 특정한 경우에, 여기에 내가 제대로 결론을 관리하는 경우에 당신이 뭘 하려는지 않는다는 거의 비슷 솔루션입니다. 자세한 내용을 알아 내기 위해이를 사용하고 DataFrame 문서를 읽어보십시오 :
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import spark.implicits._ val numRecProducts = 10 val result = prodRows.as("left") // self-join by gender: .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX") // limit to 10 results per record: .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK"))) .filter($"rn" <= numRecProducts).drop($"rn") // group and collect_list to create products column: .groupBy($"left.product_PK" as "product_PK") .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
-
==============================
2.문제는 당신이 prodRows.foreach 내에서 prodRows에 액세스하려고 할 것입니다. 당신은 dataframes 만 드라이버에 존재, 변환 내 dataframe을 사용할 수 없습니다.
문제는 당신이 prodRows.foreach 내에서 prodRows에 액세스하려고 할 것입니다. 당신은 dataframes 만 드라이버에 존재, 변환 내 dataframe을 사용할 수 없습니다.
from https://stackoverflow.com/questions/47358177/caused-by-java-lang-nullpointerexception-at-org-apache-spark-sql-dataset by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라 스파크, listbuffer가 비어 있습니다 (0) | 2019.11.01 |
---|---|
[SCALA] 왜 스파크 코드는 NullPointerException이 만드는가? (0) | 2019.10.31 |
[SCALA] 왜 사용자 정의 케이스 클래스의 데이터 집합을 만들 때 "데이터 집합에 저장 유형에 대한 인코더를 찾을 수 없습니다"인가? (0) | 2019.10.31 |
[SCALA] 스칼라는 이상한 행동을 foreach는 (0) | 2019.10.31 |
[SCALA] 스칼라 형 ascriptions의 목적은 무엇인가? (0) | 2019.10.31 |