[HADOOP] foreachPartition과 같은 RDD 메서드 / 클로저 내에서 SparkContext hadoop 구성 사용
HADOOPforeachPartition과 같은 RDD 메서드 / 클로저 내에서 SparkContext hadoop 구성 사용
Spark를 사용하여 많은 파일을 읽고 자세히 설명하고 모든 파일을 시퀀스 파일로 저장합니다. 내가 원하는 것은 파티션 당 하나의 시퀀스 파일을 갖는 것이 었습니다.
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
.set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
throws Exception {
[°°°SOME STUFF°°°]
SequenceFile.Writer writer = SequenceFile.createWriter(
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way"
아무도 RDD 클로저 내에서 Hadoop 구성 객체를 사용하는 방법을 알고 있습니까?
1.여기서 문제는 Hadoop 구성이 직렬화 가능으로 태그되지 않았기 때문에 Spark가이를 RDD로 가져 오지 않습니다. 그것들은 쓰기 가능으로 표시되어 있으므로 Hadoop의 직렬화 메커니즘은 마샬링 및 마샬링 해제 할 수 있지만 Spark는 직접 작동하지 않습니다.
여기서 문제는 Hadoop 구성이 직렬화 가능으로 태그되지 않았기 때문에 Spark가이를 RDD로 가져 오지 않습니다. 그것들은 쓰기 가능으로 표시되어 있으므로 Hadoop의 직렬화 메커니즘은 마샬링 및 마샬링 해제 할 수 있지만 Spark는 직접 작동하지 않습니다.
두 가지 장기 수정 옵션은
Hadoop conf를 직렬화 가능하게 만드는 데 대한 주요 반대 의견은 없습니다. 쓰기 가능한 IO 호출에 위임하고 모든 키 / 값 쌍을 반복하는 사용자 정의 ser / deser 메소드를 구현 한 경우. 하둡 커미터라고합니다.
업데이트 : Hadoop 구성의 내용을 마샬링하는 serlializable 클래스를 만드는 코드는 다음과 같습니다. val ser = new ConfSerDeser (hadoopConf); RDD에서 ser.get ()으로 참조하십시오.
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.hadoop.conf.Configuration /** * Class to make Hadoop configurations serializable; uses the * `Writeable` operations to do this. * Note: this only serializes the explicitly set values, not any set * in site/default or other XML resources. * @param conf */ class ConfigSerDeser(var conf: Configuration) extends Serializable { def this() { this(new Configuration()) } def get(): Configuration = conf private def writeObject (out: java.io.ObjectOutputStream): Unit = { conf.write(out) } private def readObject (in: java.io.ObjectInputStream): Unit = { conf = new Configuration() conf.readFields(in) } private def readObjectNoData(): Unit = { conf = new Configuration() } }
누군가가 모든 Writeable 클래스에 대해 일반적인 것을 만드는 것은 비교적 간단합니다. 생성자에 클래스 이름을 제공하고 역 직렬화 중에 쓰기 가능 인스턴스를 작성하는 데 사용해야합니다.
2.@Steve의 답변에 따르면 이것은 Java 구현입니다.
@Steve의 답변에 따르면 이것은 Java 구현입니다.
import java.io.Serializable; import java.io.IOException; import org.apache.hadoop.conf.Configuration; public class SerializableHadoopConfiguration implements Serializable { Configuration conf; public SerializableHadoopConfiguration(Configuration hadoopConf) { this.conf = hadoopConf; if (this.conf == null) { this.conf = new Configuration(); } } public SerializableHadoopConfiguration() { this.conf = new Configuration(); } public Configuration get() { return this.conf; } private void writeObject(java.io.ObjectOutputStream out) throws IOException { this.conf.write(out); } private void readObject(java.io.ObjectInputStream in) throws IOException { this.conf = new Configuration(); this.conf.readFields(in); } }
3.그것은 할 수없는 것처럼 보입니다, 그래서 여기 제가 사용하는 코드가 있습니다 :
그것은 할 수없는 것처럼 보입니다, 그래서 여기 제가 사용하는 코드가 있습니다 :
final hdfsNameNodePath = "hdfs://quickstart.cloudera:8080"; JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath); if(!imageByteRDD.isEmpty()) imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() { @Override public void call(Iterator<Tuple2<String, PortableDataStream>> arg0) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNodePath); //the string above should be passed as argument SequenceFile.Writer writer = SequenceFile.createWriter( conf, SequenceFile.Writer.file([***ETCETERA...
from https://stackoverflow.com/questions/38224132/use-sparkcontext-hadoop-configuration-within-rdd-methods-closures-like-foreachp by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] localhost : 오류 : 데이터 노드 프로세스의 우선 순위를 설정할 수 없습니다 32156 (0) | 2019.08.07 |
[HADOOP] 왜 YARN에서 일을하기 위해 32 개 이상의 코어를 요청할 수 있습니까? (0) | 2019.08.07 |
[HADOOP] 색조 파일 브라우저가 작동하지 않습니다. (0) | 2019.08.06 |
[HADOOP] Tomcat 서버를 종료 할 때 org.apache.hadoop.util.ShutdownHookManager를로드 할 수 없습니다. (0) | 2019.08.06 |
[HADOOP] 하둡 설치 : 네임 노드를 시작할 수 없습니다 (0) | 2019.08.06 |