[HADOOP] 직렬화에서 Avro NullPointerException이있는 MRUnit
HADOOP직렬화에서 Avro NullPointerException이있는 MRUnit
MRUnit을 사용하여 Hadoop .mapreduce Avro 작업을 테스트하려고합니다. 아래에서 볼 수 있듯이 NullPointerException을 받고 있습니다. 나는 pom과 소스 코드의 일부를 첨부했다. 어떤 도움을 주시면 감사하겠습니다.
감사
내가 얻는 오류는 다음과 같습니다.
java.lang.NullPointerException
at org.apache.hadoop.mrunit.internal.io.Serialization.copy(Serialization.java:73)
at org.apache.hadoop.mrunit.internal.io.Serialization.copy(Serialization.java:91)
at org.apache.hadoop.mrunit.internal.io.Serialization.copyWithConf(Serialization.java:104)
at org.apache.hadoop.mrunit.TestDriver.copy(TestDriver.java:608)
at org.apache.hadoop.mrunit.MapDriverBase.setInputKey(MapDriverBase.java:64)
at org.apache.hadoop.mrunit.MapDriverBase.setInput(MapDriverBase.java:104)
at org.apache.hadoop.mrunit.MapDriverBase.withInput(MapDriverBase.java:218)
at org.lab41.project.mapreduce.ParseMetadataAsTextIntoAvroTest.testMap(ParseMetadataAsTextIntoAvroTest.java:115)
.....
pom snippet :
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>0.9.0-incubating</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
<avro.version>1.7.4</avro.version>
<hadoop.version>2.0.0-mr1-cdh4.1.3</hadoop.version>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${avro.version}</version>
<classifier>hadoop2</classifier>
</dependency>
다음은 시험 발췌입니다.
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.lab41.project.domain.DataRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParseMetadataAsTextIntoAvroTest {
Logger logger = LoggerFactory
.getLogger(ParseMetadataAsTextIntoAvroTest.class);
private MapDriver<LongWritable, Text, AvroKey<Long>, AvroValue<DataRecord>> mapDriver;
@BeforeClass
public static void setUpClass() {
}
@AfterClass
public static void tearDownClass() {
}
@Before
public void setUp() throws IOException {
ParseMetadataAsTextIntoAvroMapper mapper = new ParseMetadataAsTextIntoAvroMapper();
mapDriver = new MapDriver<LongWritable, Text, AvroKey<Long>, AvroValue<DataRecord>>();
mapDriver.setMapper(mapper);
mapDriver.getConfiguration().setStrings("io.serializations", new String[]{
AvroSerialization.class.getName()
});
}
@Test
public void testMap() throws ParseException, IOException {
Text testInputText = new Text(test0);
DataRecord record = new DataRecord();
….
AvroKey<Long> expectedPivot = new AvroKey<Long>(1L);
AvroValue<DataRecord> expectedRecord = new AvroValue<DataRecord>(record);
mapDriver.withInput(new Pair<LongWritable, Text>(new LongWritable(1), testInputText));
mapDriver.withOutput(new Pair<AvroKey<Long>, AvroValue<DataRecord>>(expectedPivot, expectedRecord));
mapDriver.runTest();
}
}
해결법
-
==============================
1.이것을 작동 시키려면 AvroSerializatio를 기본 serailization에 추가하십시오. 또한 AvroSerializationn을 구성해야합니다.
이것을 작동 시키려면 AvroSerializatio를 기본 serailization에 추가하십시오. 또한 AvroSerializationn을 구성해야합니다.
@Before public void setUp() throws IOException { ParseMetadataAsTextIntoAvroMapper mapper = new ParseMetadataAsTextIntoAvroMapper(); mapDriver = new MapDriver<LongWritable, Text, AvroKey<Long>, AvroValue<NetworkRecord>>(); mapDriver.setMapper(mapper); //Copy over the default io.serializations. If you don't do this then you will //not be able to deserialize the inputs to the mapper String[] strings = mapDriver.getConfiguration().getStrings("io.serializations"); String[] newStrings = new String[strings.length +1]; System.arraycopy( strings, 0, newStrings, 0, strings.length ); newStrings[newStrings.length-1] = AvroSerialization.class.getName(); //Now you have to configure AvroSerialization by sepecifying the key //writer Schema and the value writer schema. mapDriver.getConfiguration().setStrings("io.serializations", newStrings); mapDriver.getConfiguration().setStrings("avro.serialization.key.writer.schema", Schema.create(Schema.Type.LONG).toString(true)); mapDriver.getConfiguration().setStrings("avro.serialization.value.writer.schema", NetworkRecord.SCHEMA$.toString(true)); }
-
==============================
2.이것은 또한 더 짧고 명확한 코드의 장점으로 문제를 해결합니다.
이것은 또한 더 짧고 명확한 코드의 장점으로 문제를 해결합니다.
MapDriver driver = MapDriver.newMapDriver(your mapper); Configuration conf = driver.getConfiguration(); AvroSerialization.addToConfiguration(conf); AvroSerialization.setKeyWriterSchema(conf, your schema); AvroSerialization.setKeyReaderSchema(conf, your schema); Job job = new Job(conf); job.set... your job settings; AvroJob.set... your avro job settings;
mrunit의 버그 일 수 있으며, io.serializations 권한을 설정하지 마십시오. 대신 job.setInputFormatClass (AvroKeyInputFormat.class)에 의해 설정되어 있어야합니다.
-
==============================
3.기본 직렬화에 AvroSerialization을 추가하고 AvroSerialization을 구성해야합니다.
기본 직렬화에 AvroSerialization을 추가하고 AvroSerialization을 구성해야합니다.
@Before public void setUp() throws IOException { ParseMetadataAsTextIntoAvroMapper mapper = new ParseMetadataAsTextIntoAvroMapper(); mapDriver = new MapDriver<LongWritable, Text, AvroKey<Long>, AvroValue<NetworkRecord>>(); mapDriver.setMapper(mapper); Configuration configuration = mapDriver.getConfiguration(); // Add AvroSerialization to the configuration // (copy over the default serializations for deserializing the mapper inputs) String[] serializations = configuration.getStrings(CommonConfigurationKeysPublic.IO_SERIALIZATIONS_KEY); String[] newSerializations = Arrays.copyOf(serializations, serializations.length + 1); newSerializations[serializations.length] = AvroSerialization.class.getName(); configuration.setStrings(CommonConfigurationKeysPublic.IO_SERIALIZATIONS_KEY, newSerializations); //Configure AvroSerialization by specifying the key writer and value writer schemas AvroSerialization.setKeyWriterSchema(configuration, Schema.create(Schema.Type.LONG)); AvroSerialization.setValueWriterSchema(configuration, NetworkRecord.SCHEMA$) }
-
==============================
4.여기에 답변 : https://issues.apache.org/jira/browse/MRUNIT-181 구체적으로 : https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+with+Avro
여기에 답변 : https://issues.apache.org/jira/browse/MRUNIT-181 구체적으로 : https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+with+Avro
from https://stackoverflow.com/questions/15230482/mrunit-with-avro-nullpointerexception-in-serialization by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] PL / SQL을 하이브 QL로 변환 (0) | 2019.06.11 |
---|---|
[HADOOP] saveAsTextFile을 사용하여 NullPointerException을 발생시킵니다. (0) | 2019.06.11 |
[HADOOP] Hadoop MapReduce에서 Map 출력 값으로 Object를 어떻게 설정합니까? (0) | 2019.06.11 |
[HADOOP] 로컬로 스파크 작업을 실행할 때 "Scheme : gs에 대한 파일 시스템 없음" (0) | 2019.06.11 |
[HADOOP] 데이터를 HDFS로 복사 할 때 createBlockOutputStream의 예외 (0) | 2019.06.11 |