[HADOOP] MapReduce 작업에서 Hive에 파티션 추가
HADOOPMapReduce 작업에서 Hive에 파티션 추가
저는 Hive와 MapReduce를 처음 사용하며 귀하의 답변에 진심으로 감사하고 올바른 접근 방식을 제공 할 것입니다.
hdfs / data / logs /에 외부 위치가있는 날짜 및 원본 서버로 파티션 된 하이브에서 외부 테이블 로그를 정의했습니다. 이 로그 파일을 가져 와서 나누고 위에서 언급 한 폴더 아래에 저장하는 MapReduce 작업이 있습니다. 처럼
"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...
MapReduce 작업에서 Hive의 테이블 로그에 파티션을 추가하고 싶습니다. 나는 두 가지 접근법을 안다
접근법 2의 경우 나는 옵션이 아닌 INSERT OVERWRITE의 예 만 봅니다. 작업 종료 후 이러한 새 파티션을 테이블에 추가하는 방법이 있습니까?
해결법
-
==============================
1.Map / Reduce 작업 내에서이 작업을 수행하려면 Hadoop에서 새 프로젝트 인 Apache HCatalog를 사용하는 것이 좋습니다.
Map / Reduce 작업 내에서이 작업을 수행하려면 Hadoop에서 새 프로젝트 인 Apache HCatalog를 사용하는 것이 좋습니다.
HCatalog는 실제로 HDFS 위에 추상화 계층이므로 Hive, Pig 또는 M / R 등의 표준화 된 방식으로 출력을 작성할 수 있습니다. 이것이 당신을 위해 그림에 나오는 곳은 출력 형식 HCatOutputFormat을 사용하여 Map / Reduce 작업에서 Hive로 데이터를 직접로드 할 수 있다는 것입니다. 아래는 공식 웹 사이트에서 가져온 예입니다.
(a = 1, b = 1)에 대한 특정 파티션을 작성하는 현재 코드 예제는 다음과 같습니다.
Map<String, String> partitionValues = new HashMap<String, String>(); partitionValues.put("a", "1"); partitionValues.put("b", "1"); HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues); HCatOutputFormat.setOutput(job, info);
그리고 여러 파티션에 쓰려면 위의 각 작업으로 별도의 작업을 시작해야합니다.
HCatalog와 함께 동적 파티션을 사용할 수도 있습니다.이 경우 동일한 작업에서 원하는만큼의 파티션을로드 할 수 있습니다!
위에 제공된 웹 사이트에서 HCatalog를 자세히 읽어 보는 것이 좋습니다. 필요한 경우 자세한 정보를 제공해야합니다.
-
==============================
2.실제로는 상황보다 조금 더 복잡합니다. 공식 출처 (현재)에 문서화되어 있지 않기 때문에 불행한 일이며, 알아내는 데 며칠이 걸립니다.
실제로는 상황보다 조금 더 복잡합니다. 공식 출처 (현재)에 문서화되어 있지 않기 때문에 불행한 일이며, 알아내는 데 며칠이 걸립니다.
HCatalog Mapreduce 작업이 동적 파티션에 쓰는 작업을 수행하려면 다음을 수행해야 함을 발견했습니다.
내 작업 (보통 리듀서)의 레코드 작성 단계에서 동적 파티션 (HCat Field Schema)을 HCat Schema 객체에 수동으로 추가해야합니다.
문제는 HCatOutputFormat.getTableSchema (config)가 실제로 분할 된 필드를 반환하지 않는다는 것입니다. 수동으로 추가해야합니다
HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null); HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null); schema.append(hfs1); schema.append(hfs2);
-
==============================
3.HCatalog를 사용하여 한 작업에서 동적 파티셔닝으로 여러 테이블에 쓰는 코드는 다음과 같습니다.이 코드는 Hadoop 2.5.0, Hive 0.13.1에서 테스트되었습니다.
HCatalog를 사용하여 한 작업에서 동적 파티셔닝으로 여러 테이블에 쓰는 코드는 다음과 같습니다.이 코드는 Hadoop 2.5.0, Hive 0.13.1에서 테스트되었습니다.
// ... Job setup, InputFormatClass, etc ... String dbName = null; String[] tables = {"table0", "table1"}; job.setOutputFormatClass(MultiOutputFormat.class); MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); List<String> partitions = new ArrayList<String>(); partitions.add(0, "partition0"); partitions.add(1, "partition1"); HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null); HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null); for (String table : tables) { configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class); OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null); outputJobInfo.setDynamicPartitioningKeys(partitions); HCatOutputFormat.setOutput( configurer.getJob(table), outputJobInfo ); HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration()); schema.append(partition0); schema.append(partition1); HCatOutputFormat.setSchema( configurer.getJob(table), schema ); } configurer.configure(); return job.waitForCompletion(true) ? 0 : 1;
매퍼 :
public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { HCatRecord record = new DefaultHCatRecord(3); // Including partitions record.set(0, value.toString()); // partitions must be set after non-partition fields record.set(1, "0"); // partition0=0 record.set(2, "1"); // partition1=1 MultiOutputFormat.write("table0", null, record, context); MultiOutputFormat.write("table1", null, record, context); } }
from https://stackoverflow.com/questions/14286256/adding-partitions-to-hive-from-a-mapreduce-job by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하둡 청크 크기 대 분할 대 블록 크기 (0) | 2019.08.09 |
---|---|
[HADOOP] 스트리밍 작업을 위해 Hadoop 노드에 R 패키지를 임시로 설치 (0) | 2019.08.09 |
[HADOOP] Apache Sqoop과 Flume을 서로 바꿔 사용할 수 있습니까? (0) | 2019.08.09 |
[HADOOP] Combiner 또는 Partitioner를 누가 먼저 실행할 수 있습니까? (0) | 2019.08.09 |
[HADOOP] HTTP 콜백으로 외부 클라이언트가 Oozie 워크 플로우에 알리는 방법 (0) | 2019.08.09 |