[HADOOP] 돼지에서 여러 튜플로 튜플 분할하기
HADOOP돼지에서 여러 튜플로 튜플 분할하기
단일 튜플에서 여러 개의 튜플을 생성하고 싶습니다. 내가 말하고 싶은 건: 나는 그 안에 다음 데이터가있는 파일이있다.
>> cat data
ID | ColumnName1:Value1 | ColumnName2:Value2
그래서 나는 다음 명령으로 그것을 적재한다.
grunt >> A = load '$data' using PigStorage('|');
grunt >> dump A;
(ID,ColumnName1:Value1,ColumnName2:Value2)
이제이 튜플을 두 개의 튜플로 나누기를 원합니다.
(ID, ColumnName1, Value1)
(ID, ColumnName2, Value2)
foreach와 함께 UDF를 사용하여 생성 할 수 있습니까? 다음과 같은 것이 있습니까?
grunt >> foreach A generate SOMEUDF(A)
편집하다:
입력 튜플 : (id1, column1, column2) 출력 : 두 개의 튜플 (id1, column1)과 (id2, column2)이므로 List 또는 Bag을 반환해야합니까?
public class SPLITTUPPLE extends EvalFunc <List<Tuple>>
{
public List<Tuple> exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
try{
// not sure how whether I can create tuples on my own. Looks like I should use TupleFactory.
// return list of tuples.
}catch(Exception e){
throw WrappedIOException.wrap("Caught exception processing input row ", e);
}
}
}
이 접근법이 맞습니까?
해결법
-
==============================
1.UDF를 작성하거나 내장 함수가있는 PIG 스크립트를 사용할 수 있습니다.
UDF를 작성하거나 내장 함수가있는 PIG 스크립트를 사용할 수 있습니다.
예 :
-- data should be chararray, PigStorage('|') return bytearray which will not work for this example inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); -- split by | and create a row so we can dereference it later splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; -- first column is id, rest is converted into a bag and flatten it to make rows id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; -- there will be records with (id, id), but id should not have ':' id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); dump final;
테스트 입력 :
1|c1:11:33|c2:12 234|c1:21|c2:22 33|c1:31|c2:32 345|c1:41|c2:42
산출
(1,c1,11:33) (1,c2,12) (234,c1,21) (234,c2,22) (33,c1,31) (33,c2,32) (345,c1,41) (345,c2,42)
도움이되기를 바랍니다.
건배.
-
==============================
2.다음은 UDF 버전입니다. 나는 가방을 돌려 보내는 것을 선호한다 :
다음은 UDF 버전입니다. 나는 가방을 돌려 보내는 것을 선호한다 :
import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; /** * Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag * {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} * * Default rows separator is '|' and key value separator is ':'. * In this implementation white spaces around separator characters are not removed. * ID can be made of any character (including sequence of white spaces). * @author * */ public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { private static final TupleFactory tupleFactory = TupleFactory.getInstance(); private static final BagFactory bagFactory = BagFactory.getInstance(); //Row separator character. Default is '|'. private String rowsSeparator; //Column value separator character. Default i private String columnValueSeparator; public TupleToBagColumnValuePairs() { this.rowsSeparator = "\\|"; this.columnValueSeparator = ":"; } public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { this.rowsSeparator = rowsSeparator; this.columnValueSeparator = keyValueSeparator; } /** * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) * @param outputBag Output tuples (id, column, value) are added to this bag * @param id * @param column * @param value * @throws ExecException */ protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { Tuple outputTuple = tupleFactory.newTuple(); outputTuple.append(id); outputTuple.append(column); outputTuple.append( value); outputBag.add(outputTuple); } /** * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value) * @param outputBag Output tuples (id, column, value) should be added to this bag * @param id * @param splitInputLine format column{separator}value, which start from index 1 * @throws ExecException */ protected void parseColumnValues(DataBag outputBag, String id, String[] splitInputLine) throws ExecException { for (int i = 1; i < splitInputLine.length; i++) { if (splitInputLine[i] != null) { int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); if (columnValueSplitIndex != -1) { String column = splitInputLine[i].substring(0, columnValueSplitIndex); String value = null; if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { value = splitInputLine[i].substring(columnValueSplitIndex + 1); } this.addTuple(outputBag, id, column, value); } else { String column = splitInputLine[i]; this.addTuple(outputBag, id, column, null); } } } } /** * input - contains only one field of type chararray, which will be split by '|' * All inputs that are: null or of length 0 are ignored. */ @Override public DataBag exec(Tuple input) throws IOException { if (input == null || input.size() != 1 || input.isNull(0)) { return null; } String inputLine = (String)input.get(0); String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) { String id = splitInputLine[0]; DataBag outputBag = bagFactory.newDefaultBag(); if (splitInputLine.length == 1) { // there is just an id in the line this.addTuple(outputBag, id, null, null); } else { this.parseColumnValues(outputBag, id, splitInputLine); } return outputBag; } return null; } @Override public Schema outputSchema(Schema input) { try { if (input.size() != 1) { throw new RuntimeException("Expected input to have only one field"); } Schema.FieldSchema inputFieldSchema = input.getField(0); if (inputFieldSchema.type != DataType.CHARARRAY) { throw new RuntimeException("Expected a CHARARRAY as input"); } Schema tupleSchema = new Schema(); tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); } catch (FrontendException exx) { throw new RuntimeException(exx); } } }
다음은 PIG에서 사용되는 방법입니다.
register 'path to the jar'; define IdColumnValue myPackage.TupleToBagColumnValuePairs(); inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); dump result;
가방으로 UDF를 작성하는 좋은 영감은 LinkedIn의 DataFu 소스 코드를 참조하십시오.
-
==============================
3.당신은 가방을 얻기 위해 STRSPLIT의 결과물에 TransposeTupleToBag (DataFu lib의 UDF)를 사용할 수 있고 원래의 열 당 별도의 행을 만들기 위해 가방을 평평하게 만듭니다.
당신은 가방을 얻기 위해 STRSPLIT의 결과물에 TransposeTupleToBag (DataFu lib의 UDF)를 사용할 수 있고 원래의 열 당 별도의 행을 만들기 위해 가방을 평평하게 만듭니다.
from https://stackoverflow.com/questions/11287362/splitting-a-tuple-into-multiple-tuples-in-pig by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop이 String이나 Integer 대신에 Text 나 IntWritable 같은 클래스를 필요로하는 이유는 무엇입니까? (0) | 2019.06.13 |
---|---|
[HADOOP] '지도 만'hadoop 작업을 작성하는 방법? (0) | 2019.06.13 |
[HADOOP] MapReduce2에서 vcores 및 메모리를 기반으로 컨테이너를 만드는 방법은 무엇입니까? (0) | 2019.06.13 |
[HADOOP] Hadoop mapreduce 작업에서 JVM 재사용 (0) | 2019.06.13 |
[HADOOP] Windows 서버의 Hadoop (0) | 2019.06.13 |