[HADOOP] saveAsNewAPIHadoopFile 메소드에서 작동하지 않는 hdfs에 대한 스파크 쓰기
HADOOPsaveAsNewAPIHadoopFile 메소드에서 작동하지 않는 hdfs에 대한 스파크 쓰기
CDH 5.2.0에서 Spark 1.1.0을 사용하고 있으며 hdfs에서 읽고 쓸 수 있도록 노력하고 있습니다.
.text File 및 .saveAsTextFile이 이전 API를 호출하고 hdfs 버전과 호환되지 않는 것 같습니다.
def testHDFSReadOld(sc: SparkContext, readFile: String){
//THIS WILL FAIL WITH
//(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
//java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)
sc.textFile(readFile).take(2).foreach(println)
}
def testHDFSWriteOld(sc: SparkContext, writeFile: String){
//THIS WILL FAIL WITH
//(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
//java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)
sc.parallelize(List("THIS","ISCOOL")).saveAsTextFile(writeFile)
}
새로운 API 메소드로 이동하여 hdfs에서 읽기를 수정했습니다!
def testHDFSReadNew(sc: SparkContext, readFile: String){
//THIS WORKS
sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
classOf[Text],sc.hadoopConfiguration).map{
case (x:LongWritable, y: Text) => y.toString
}.take(2).foreach(println)
}
그래서 내가 진보하고있는 것처럼 보였습니다. 위와 같이 더 이상 쓰기 오류가 발생하지 않고 작동하는 것처럼 보입니다. 유일한 문제는 디렉토리의 외로운 SUCCESS 플래그 파일 외에는 아무것도 없다는 것입니다. 더 당황스럽게도, 로그는 데이터가 _temporary 디렉토리에 기록되고 있음을 보여주었습니다. 파일 커미터가 파일을 _temporary 디렉토리에서 출력 디렉토리로 이동해야한다는 것을 전혀 깨닫지 못한 것 같습니다.
def testHDFSWriteNew(sc: SparkContext, writeFile: String){
/*This will have an error message of:
INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2281f1b2
14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2281f1b2
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
However lately it hasn't even had errors, symptoms are no part files in the directory but a success flag is there
*/
val conf = sc.hadoopConfiguration
conf.set("mapreduce.task.files.preserve.failedtasks", "true")
conf.set("mapred.output.dir", writeFile)
sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
.saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf)
}
로컬로 실행하고 hdfs 경로를 지정하면 파일이 hdfs에서 잘 나타납니다. 이것은 Spark 독립형 클러스터에서 실행할 때만 발생합니다.
다음과 같이 직업을 제출합니다. spark-submit-배포 모드 클라이언트 --master spark : // sparkmaster --class driverclass driverjar
해결법
-
==============================
1.다음 코드로 시도해 볼 수 있습니까?
다음 코드로 시도해 볼 수 있습니까?
import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x))) nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile")
다음 코드도 저에게 효과적이었습니다.
val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x))) x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration)
[root @ sparkmaster ~] # hadoop fs -cat / data / nullwritable / *
15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
from https://stackoverflow.com/questions/27072911/spark-writing-to-hdfs-not-working-with-the-saveasnewapihadoopfile-method by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하이브 gzip 파일 압축 해제 (0) | 2019.08.08 |
---|---|
[HADOOP] 마루 파일을 복사하여 CSV로 변환하는 방법 (0) | 2019.08.08 |
[HADOOP] Cloudera 빠른 시작 도커에서 볼륨으로서의 HDFS (0) | 2019.08.08 |
[HADOOP] 레코드 리더 및 레코드 경계 (0) | 2019.08.07 |
[HADOOP] Spark에서 DynamoDB에 간단한 RDD 쓰기 (0) | 2019.08.07 |