[HADOOP] RDD가 아닌 Spark 프로그램에서 일반 텍스트 HDFS (또는 로컬) 파일을 쓸 수 있습니까?
HADOOPRDD가 아닌 Spark 프로그램에서 일반 텍스트 HDFS (또는 로컬) 파일을 쓸 수 있습니까?
Spark 프로그램 (스칼라에서)과 SparkContext가 있습니다. 나는 RDD의 saveAsTextFile 파일을 쓰고있다. 내 로컬 컴퓨터에서 로컬 파일 경로를 사용할 수 있으며 로컬 파일 시스템에서 작동합니다. 내 클러스터에서는 HDFS와 함께 작동합니다.
또한 다른 임의의 파일을 처리 결과로 쓰고 싶습니다. 로컬 컴퓨터에서 파일을 일반 파일로 쓰고 있지만 클러스터에서 HDFS로 이동하려고합니다.
SparkContext는 몇 가지 파일 관련 메서드가있는 것 같지만 출력이 아닌 입력으로 간주됩니다.
어떻게해야합니까?
해결법
-
==============================
1.marios 및 kostya 덕분에 Spark에서 HDFS로 텍스트 파일을 작성하는 몇 가지 단계가 있습니다.
marios 및 kostya 덕분에 Spark에서 HDFS로 텍스트 파일을 작성하는 몇 가지 단계가 있습니다.
// Hadoop Config is accessible from SparkContext val fs = FileSystem.get(sparkContext.hadoopConfiguration); // Output file can be created from file system. val output = fs.create(new Path(filename)); // But BufferedOutputStream must be used to output an actual text file. val os = BufferedOutputStream(output) os.write("Hello World".getBytes("UTF-8")) os.close()
제안 된 FSDataOutputStream은 Java 직렬화 된 객체 출력 스트림이며 텍스트 출력 스트림이 아닙니다. writeUTF 메서드는 plaint 텍스트를 쓰는 것처럼 보이지만 실제로는 여분의 바이트를 포함하는 이진 직렬화 형식입니다.
-
==============================
2.Spark 2.0을 사용하여 나에게 가장 잘 맞는 것은 다음과 같다.
Spark 2.0을 사용하여 나에게 가장 잘 맞는 것은 다음과 같다.
val path = new Path("hdfs://namenode:8020/some/folder/myfile.txt") val conf = new Configuration(spark.sparkContext.hadoopConfiguration) conf.setInt("dfs.blocksize", 16 * 1024 * 1024) // 16MB HDFS Block Size val fs = path.getFileSystem(conf) if (fs.exists(path)) fs.delete(path, true) val out = new BufferedOutputStream(fs.create(path))) val txt = "Some text to output" out.write(txt.getBytes("UTF-8")) out.flush() out.close() fs.close()
-
==============================
3.HDFS API (hadoop-hdfs.jar)를 사용하면 HDFS 경로에 대한 InputStream / OutputStream을 만들고 일반 java.io 클래스를 사용하여 파일에서 읽고 쓸 수 있습니다. 예 :
HDFS API (hadoop-hdfs.jar)를 사용하면 HDFS 경로에 대한 InputStream / OutputStream을 만들고 일반 java.io 클래스를 사용하여 파일에서 읽고 쓸 수 있습니다. 예 :
URI uri = URI.create (“hdfs://host:port/file path”); Configuration conf = new Configuration(); FileSystem file = FileSystem.get(uri, conf); FSDataInputStream in = file.open(new Path(uri));
이 코드는 로컬 파일에서도 작동합니다 (hdfs : //를 file : //로 변경).
-
==============================
4.HDFS에 파일을 쓰는 한 가지 간단한 방법은 SequenceFiles를 사용하는 것입니다. 여기에서는 Spark에서 제공하지 않는 원시 Hadoop API를 사용합니다.
HDFS에 파일을 쓰는 한 가지 간단한 방법은 SequenceFiles를 사용하는 것입니다. 여기에서는 Spark에서 제공하지 않는 원시 Hadoop API를 사용합니다.
여기 간단한 스 니펫 (Scala에서)이있다 :
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.io._ val conf = new Configuration() // Hadoop configuration val sfwriter = SequenceFile.createWriter(conf, SequenceFile.Writer.file(new Path("hdfs://nn1.example.com/file1")), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(Text.class)) val lw = new LongWritable() val txt = new Text() lw.set(12) text.set("hello") sfwriter.append(lw, txt) sfwriter.close() ...
키가없는 경우 NullWritable.class를 대신 사용할 수 있습니다.
SequenceFile.Writer.keyClass(NullWritable.class) sfwriter.append(NullWritable.get(), new Text("12345"));
from https://stackoverflow.com/questions/32952121/can-i-write-a-plain-text-hdfs-or-local-file-from-a-spark-program-not-from-an by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hive에 저장된 데이터에 대해 돼지 쿼리 실행 (0) | 2019.06.15 |
---|---|
[HADOOP] 하나의지도에 여러 줄의 텍스트 제공 (0) | 2019.06.15 |
[HADOOP] (하둡) MapReduce - 체인 작업 - JobControl이 멈추지 않습니다 (0) | 2019.06.15 |
[HADOOP] 먼저 실행되는 것은 파티셔너 또는 결합기입니까? (0) | 2019.06.15 |
[HADOOP] MapReduceBase 및 Mapper가 더 이상 사용되지 않습니다. (0) | 2019.06.15 |