[HADOOP] foreachPartition과 같은 RDD 메서드 / 클로저 내에서 SparkContext hadoop 구성 사용
HADOOPforeachPartition과 같은 RDD 메서드 / 클로저 내에서 SparkContext hadoop 구성 사용
Spark를 사용하여 많은 파일을 읽고 자세히 설명하고 모든 파일을 시퀀스 파일로 저장합니다. 내가 원하는 것은 파티션 당 하나의 시퀀스 파일을 갖는 것이 었습니다.
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
.setMaster("local[2]")
.set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
if(!imageByteRDD.isEmpty())
imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
@Override
public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
throws Exception {
[°°°SOME STUFF°°°]
SequenceFile.Writer writer = SequenceFile.createWriter(
jsc.hadoopConfiguration(),
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way"
아무도 RDD 클로저 내에서 Hadoop 구성 객체를 사용하는 방법을 알고 있습니까?
해결법
-
==============================
1.여기서 문제는 Hadoop 구성이 직렬화 가능으로 태그되지 않았기 때문에 Spark가이를 RDD로 가져 오지 않습니다. 그것들은 쓰기 가능으로 표시되어 있으므로 Hadoop의 직렬화 메커니즘은 마샬링 및 마샬링 해제 할 수 있지만 Spark는 직접 작동하지 않습니다.
여기서 문제는 Hadoop 구성이 직렬화 가능으로 태그되지 않았기 때문에 Spark가이를 RDD로 가져 오지 않습니다. 그것들은 쓰기 가능으로 표시되어 있으므로 Hadoop의 직렬화 메커니즘은 마샬링 및 마샬링 해제 할 수 있지만 Spark는 직접 작동하지 않습니다.
두 가지 장기 수정 옵션은
Hadoop conf를 직렬화 가능하게 만드는 데 대한 주요 반대 의견은 없습니다. 쓰기 가능한 IO 호출에 위임하고 모든 키 / 값 쌍을 반복하는 사용자 정의 ser / deser 메소드를 구현 한 경우. 하둡 커미터라고합니다.
업데이트 : Hadoop 구성의 내용을 마샬링하는 serlializable 클래스를 만드는 코드는 다음과 같습니다. val ser = new ConfSerDeser (hadoopConf); RDD에서 ser.get ()으로 참조하십시오.
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.hadoop.conf.Configuration /** * Class to make Hadoop configurations serializable; uses the * `Writeable` operations to do this. * Note: this only serializes the explicitly set values, not any set * in site/default or other XML resources. * @param conf */ class ConfigSerDeser(var conf: Configuration) extends Serializable { def this() { this(new Configuration()) } def get(): Configuration = conf private def writeObject (out: java.io.ObjectOutputStream): Unit = { conf.write(out) } private def readObject (in: java.io.ObjectInputStream): Unit = { conf = new Configuration() conf.readFields(in) } private def readObjectNoData(): Unit = { conf = new Configuration() } }
누군가가 모든 Writeable 클래스에 대해 일반적인 것을 만드는 것은 비교적 간단합니다. 생성자에 클래스 이름을 제공하고 역 직렬화 중에 쓰기 가능 인스턴스를 작성하는 데 사용해야합니다.
-
==============================
2.@Steve의 답변에 따르면 이것은 Java 구현입니다.
@Steve의 답변에 따르면 이것은 Java 구현입니다.
import java.io.Serializable; import java.io.IOException; import org.apache.hadoop.conf.Configuration; public class SerializableHadoopConfiguration implements Serializable { Configuration conf; public SerializableHadoopConfiguration(Configuration hadoopConf) { this.conf = hadoopConf; if (this.conf == null) { this.conf = new Configuration(); } } public SerializableHadoopConfiguration() { this.conf = new Configuration(); } public Configuration get() { return this.conf; } private void writeObject(java.io.ObjectOutputStream out) throws IOException { this.conf.write(out); } private void readObject(java.io.ObjectInputStream in) throws IOException { this.conf = new Configuration(); this.conf.readFields(in); } }
-
==============================
3.그것은 할 수없는 것처럼 보입니다, 그래서 여기 제가 사용하는 코드가 있습니다 :
그것은 할 수없는 것처럼 보입니다, 그래서 여기 제가 사용하는 코드가 있습니다 :
final hdfsNameNodePath = "hdfs://quickstart.cloudera:8080"; JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath); if(!imageByteRDD.isEmpty()) imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() { @Override public void call(Iterator<Tuple2<String, PortableDataStream>> arg0) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNodePath); //the string above should be passed as argument SequenceFile.Writer writer = SequenceFile.createWriter( conf, SequenceFile.Writer.file([***ETCETERA...
from https://stackoverflow.com/questions/38224132/use-sparkcontext-hadoop-configuration-within-rdd-methods-closures-like-foreachp by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] localhost : 오류 : 데이터 노드 프로세스의 우선 순위를 설정할 수 없습니다 32156 (0) | 2019.08.07 |
---|---|
[HADOOP] 왜 YARN에서 일을하기 위해 32 개 이상의 코어를 요청할 수 있습니까? (0) | 2019.08.07 |
[HADOOP] 색조 파일 브라우저가 작동하지 않습니다. (0) | 2019.08.06 |
[HADOOP] Tomcat 서버를 종료 할 때 org.apache.hadoop.util.ShutdownHookManager를로드 할 수 없습니다. (0) | 2019.08.06 |
[HADOOP] 하둡 설치 : 네임 노드를 시작할 수 없습니다 (0) | 2019.08.06 |