[HADOOP] mapreduce를 사용하여 HDFS에서 이미지 읽기
HADOOPmapreduce를 사용하여 HDFS에서 이미지 읽기
이 코드에서 나를 도우십시오. HDFS에서 이미지를 재 배열하려고합니다. WholeFileInputFormat을 사용하고 있습니다. WholeFileRecordreader와 함께. 컴파일 시간 오류가 없습니다. 그러나 코드는 런타임 오류를 제공합니다. 결과는 다음과 같습니다. 지정된 클래스 WholeFileInputFormat의 인스턴스를 만들 수 없습니다. map-reduce에서 hdfs의 입력으로 여러 이미지 파일을 읽는 방법에 대한 의견에 따라이 코드를 작성 했습니까? 이 코드에서 나를 도우십시오. 그것은 3 개의 클래스를 포함합니다. 어떻게 디버깅합니까? 아니면 다른 방법으로?
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class map2 extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<NullWritable, BytesWritable, Text, Text> {
private Text input_image = new Text();
private Text input_vector = new Text();
@Override
public void map(NullWritable key,BytesWritable value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
System.out.println("CorrelogramIndex Method:");
String featureString;
int MAXIMUM_DISTANCE = 16;
AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;
byte[] identifier=value.getBytes();
BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));
AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);
vd.extract(bimg);
featureString = vd.getStringRepresentation();
double[] bytearray = vd.getDoubleHistogram();
System.out.println("image: " + identifier + " " + featureString);
System.out.println(" ------------- ");
input_image.set(identifier);
input_vector.set(featureString);
output.collect(input_image, input_vector);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String out_vector = "";
while (values.hasNext()) {
out_vector += (values.next().toString());
}
output.collect(key, new Text(out_vector));
}
}
static int printUsage() {
System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), map2.class);
conf.setJobName("image_mapreduce");
conf.setInputFormat(WholeFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<>();
for (int i = 0; i < args.length; ++i) {
try {
switch (args[i]) {
case "-m":
conf.setNumMapTasks(Integer.parseInt(args[++i]));
break;
case "-r":
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
break;
default:
other_args.add(args[i]);
break;
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from "
+ args[i - 1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
+ other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new map2(), args);
System.exit(res);
}
}
-----------------------------------------------------------------------------------
//WholeFileInputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
public class WholeFileInputFormat<NullWritable, BytesWritable>
extends FileInputFormat<NullWritable, BytesWritable> {
// @Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
//@Override
public WholeFileRecordReader createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
@Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException;
}
-------------------------------------------------------------------------------
//WholeInputFileRecorder
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;
class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> { //recordreader
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getJobConf();
}
@Override
public boolean next(NullWritable k, BytesWritable v) throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable createKey() {
return NullWritable.get();
}
@Override
public BytesWritable createValue() {
return value;
}
@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public float getProgress() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
}
해결법
-
==============================
1.WholeFileInputFormat은 추상으로 정의됩니다. 어떻게 인스턴스를 만들려고합니까?
WholeFileInputFormat은 추상으로 정의됩니다. 어떻게 인스턴스를 만들려고합니까?
추상 구현을하지 않거나 구체적인 구현으로 서브 클래스 화하십시오.
from https://stackoverflow.com/questions/10885039/reading-images-from-hdfs-using-mapreduce by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Java API를 사용하여 Hive에서 테이블 속성 가져 오기 (0) | 2019.06.12 |
---|---|
[HADOOP] STRUCT를 사용한 간단한 Hive 쿼리의 구문 오류는 어디에 있습니까? (0) | 2019.06.12 |
[HADOOP] Flume HDFS 싱크는 작은 파일을 계속 롤링합니다. (0) | 2019.06.12 |
[HADOOP] Hive - Thrift - 이전 클라이언트의 readMessageBegin에 누락 된 버전이 있습니까? (0) | 2019.06.12 |
[HADOOP] Java에서 작동하지 않는 Kerberos 티켓 자동 갱신 (0) | 2019.06.12 |