복붙노트

[HADOOP] 돼지에서 여러 튜플로 튜플 분할하기

HADOOP

돼지에서 여러 튜플로 튜플 분할하기

단일 튜플에서 여러 개의 튜플을 생성하고 싶습니다. 내가 말하고 싶은 건: 나는 그 안에 다음 데이터가있는 파일이있다.

>> cat data
ID | ColumnName1:Value1 | ColumnName2:Value2

그래서 나는 다음 명령으로 그것을 적재한다.

grunt >> A = load '$data' using PigStorage('|');    
grunt >> dump A;    
(ID,ColumnName1:Value1,ColumnName2:Value2) 

이제이 튜플을 두 개의 튜플로 나누기를 원합니다.

(ID, ColumnName1, Value1)
(ID, ColumnName2, Value2)

foreach와 함께 UDF를 사용하여 생성 할 수 있습니까? 다음과 같은 것이 있습니까?

grunt >> foreach A generate SOMEUDF(A)

편집하다:

입력 튜플 : (id1, column1, column2) 출력 : 두 개의 튜플 (id1, column1)과 (id2, column2)이므로 List 또는 Bag을 반환해야합니까?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>>
{
    public List<Tuple> exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try{
            // not sure how whether I can create tuples on my own. Looks like I should use TupleFactory.
            // return list of tuples.
        }catch(Exception e){
            throw WrappedIOException.wrap("Caught exception processing input row ", e);
        }
    }
}

이 접근법이 맞습니까?

해결법

  1. ==============================

    1.UDF를 작성하거나 내장 함수가있는 PIG 스크립트를 사용할 수 있습니다.

    UDF를 작성하거나 내장 함수가있는 PIG 스크립트를 사용할 수 있습니다.

    예 :

    -- data should be chararray, PigStorage('|') return bytearray which will not work for this example
    inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);
    
    -- split by | and create a row so we can dereference it later
    splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ;
    
    -- first column is id, rest is converted into a bag and flatten it to make rows
    id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value;
    -- there will be records with (id, id), but id should not have ':'
    id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals;
    final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val);
    dump final;
    

    테스트 입력 :

    1|c1:11:33|c2:12
    234|c1:21|c2:22
    33|c1:31|c2:32
    345|c1:41|c2:42
    

    산출

    (1,c1,11:33)
    (1,c2,12)
    (234,c1,21)
    (234,c2,22)
    (33,c1,31)
    (33,c2,32)
    (345,c1,41)
    (345,c2,42)
    

    도움이되기를 바랍니다.

    건배.

  2. ==============================

    2.다음은 UDF 버전입니다. 나는 가방을 돌려 보내는 것을 선호한다 :

    다음은 UDF 버전입니다. 나는 가방을 돌려 보내는 것을 선호한다 :

    import java.io.IOException;
    
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    
    /**
     * Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
     * {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...}
     *  
     *  Default rows separator is '|' and key value separator is ':'. 
     *  In this implementation white spaces around separator characters are not removed.
     *  ID can be made of any character (including sequence of white spaces). 
     * @author 
     *
     */
    public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> {
    
        private static final TupleFactory tupleFactory = TupleFactory.getInstance();
        private static final BagFactory bagFactory = BagFactory.getInstance();
    
        //Row separator character. Default is '|'.
        private String rowsSeparator;
        //Column value separator character. Default i
        private String columnValueSeparator;
    
        public TupleToBagColumnValuePairs() {
            this.rowsSeparator = "\\|";
            this.columnValueSeparator = ":";
        }
    
        public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) {
            this.rowsSeparator = rowsSeparator;
            this.columnValueSeparator = keyValueSeparator;
        }
    
        /**
         * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray)
         * @param outputBag Output tuples (id, column, value) are added to this bag
         * @param id
         * @param column
         * @param value
         * @throws ExecException
         */
        protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException {
            Tuple outputTuple = tupleFactory.newTuple();
            outputTuple.append(id);
            outputTuple.append(column);
            outputTuple.append( value);
            outputBag.add(outputTuple);
        }
    
        /**
         * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value)
         * @param outputBag Output tuples (id, column, value) should be added to this bag
         * @param id 
         * @param splitInputLine format column{separator}value, which start from index 1
         * @throws ExecException
         */
        protected void parseColumnValues(DataBag outputBag, String id,
                String[] splitInputLine) throws ExecException {
            for (int i = 1; i < splitInputLine.length; i++) {
                if (splitInputLine[i] != null) {
                    int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator);
                    if (columnValueSplitIndex != -1) {
                        String column = splitInputLine[i].substring(0, columnValueSplitIndex);
                        String value = null;
                        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) {
                            value = splitInputLine[i].substring(columnValueSplitIndex + 1);
                        }
                        this.addTuple(outputBag, id, column, value);
                    } else {
                        String column = splitInputLine[i];
                        this.addTuple(outputBag, id, column, null);
                    }
                }
            }
        }
    
        /**
         * input - contains only one field of type chararray, which will be split by '|'
         * All inputs that are: null or of length 0 are ignored.
         */
        @Override
        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() != 1 || input.isNull(0)) {
                return null;
            }
    
            String inputLine = (String)input.get(0);
            String[] splitInputLine = inputLine.split(this.rowsSeparator, -1);
    
            if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) {
                String id = splitInputLine[0];
                DataBag outputBag = bagFactory.newDefaultBag();            
                if (splitInputLine.length == 1) { // there is just an id in the line
                    this.addTuple(outputBag, id, null, null);
                } else {
                    this.parseColumnValues(outputBag, id, splitInputLine);
                }
    
    
               return outputBag; 
            }
            return null;
        }
    
        @Override
        public Schema outputSchema(Schema input) {
            try {
                if (input.size() != 1) {
                    throw new RuntimeException("Expected input to have only one field");
                }
    
                Schema.FieldSchema inputFieldSchema = input.getField(0);
                if (inputFieldSchema.type != DataType.CHARARRAY) {
                    throw new RuntimeException("Expected a CHARARRAY as input");
                }
    
                Schema tupleSchema = new Schema();
                tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY));
                tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY));
                tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY));
    
                return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG));
            } catch (FrontendException exx) {
                throw new RuntimeException(exx);
            }
        }
    
    }
    

    다음은 PIG에서 사용되는 방법입니다.

    register 'path to the jar';
    define IdColumnValue myPackage.TupleToBagColumnValuePairs();
    
    inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);
    result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2);
    dump result;
    

    가방으로 UDF를 작성하는 좋은 영감은 LinkedIn의 DataFu 소스 코드를 참조하십시오.

  3. ==============================

    3.당신은 가방을 얻기 위해 STRSPLIT의 결과물에 TransposeTupleToBag (DataFu lib의 UDF)를 사용할 수 있고 원래의 열 당 별도의 행을 만들기 위해 가방을 평평하게 만듭니다.

    당신은 가방을 얻기 위해 STRSPLIT의 결과물에 TransposeTupleToBag (DataFu lib의 UDF)를 사용할 수 있고 원래의 열 당 별도의 행을 만들기 위해 가방을 평평하게 만듭니다.

  4. from https://stackoverflow.com/questions/11287362/splitting-a-tuple-into-multiple-tuples-in-pig by cc-by-sa and MIT license