[HADOOP] Hadoop의 MultipleOutputFormat과 같은 여러 파일에 결과를 쓸 수 있습니까?
HADOOPHadoop의 MultipleOutputFormat과 같은 여러 파일에 결과를 쓸 수 있습니까?
Apache Flink의 DataSet API를 사용하고 있습니다. 여러 결과를 다른 파일에 쓰는 작업을 구현하고 싶습니다.
어떻게해야합니까?
해결법
-
==============================
1.필요에 따라 DataSet 프로그램에 많은 데이터 싱크를 추가 할 수 있습니다.
필요에 따라 DataSet 프로그램에 많은 데이터 싱크를 추가 할 수 있습니다.
예를 들면 다음과 같은 프로그램에서 :
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<String, Long, Long>> data = env.readFromCsv(...); // apply MapFunction and emit data.map(new YourMapper()).writeToText("/foo/bar"); // apply FilterFunction and emit data.filter(new YourFilter()).writeToCsv("/foo/bar2");
CSV 파일에서 DataSet 데이터를 읽습니다. 이 데이터는 두 가지 후속 변환에 제공됩니다.
원하는대로 다중 데이터 소스와 분기 및 병합 데이터 세트 (유니온, 조인, 그룹, 크로스 또는 브로드 캐스트 세트 사용)를 사용할 수도 있습니다.
-
==============================
2.Flink에서 HadoopOutputFormat API를 다음과 같이 사용할 수 있습니다.
Flink에서 HadoopOutputFormat API를 다음과 같이 사용할 수 있습니다.
class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] { override def generateActualKey(key: K, value: V): K = NullWritable.get().asInstanceOf[K] override def generateFileNameForKeyValue(key: K, value: V, name: String): String = key.asInstanceOf[String] }
다음과 같이 IteblogMultipleTextOutputFormat을 사용할 수 있습니다.
val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]() val jc = new JobConf() FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/")) val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc) val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"), ("B", "1"), ("B", "2"), ("C", "1"), ("D", "2"))) batch.output(format)
자세한 내용은 http : //www.iteblog.com/archives/1667에서 확인할 수 있습니다.
from https://stackoverflow.com/questions/37067959/can-flink-write-results-into-multiple-files-like-hadoops-multipleoutputformat by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] hbase 테이블로 TSV 파일 가져 오기 (0) | 2019.07.20 |
---|---|
[HADOOP] 관리자로 실행 중일 때도 Windows 오류 5 (액세스가 거부 됨)에서 Spark 실행 (0) | 2019.07.20 |
[HADOOP] spark jdbc에서 술어 사용하기 (0) | 2019.07.20 |
[HADOOP] Hadoop mapReduce HDFS에 값만 저장하는 방법 (0) | 2019.07.20 |
[HADOOP] 외부 hadoop 클러스터 인 Spark에서 H / A namenodes로 구성된 URI로 hdfs에 액세스하는 방법? (0) | 2019.07.20 |