복붙노트

[HADOOP] UDF에 전달하기 위해 가방을 절단

HADOOP

UDF에 전달하기 위해 가방을 절단

Hadoop 클러스터에서 Pig를 사용하면이 프로젝트에서 계속 작업하면서 필드를 정기적으로 추가하는 거대한 튜플이 많이 있습니다. 여러 UDF는 다양한 필드를 사용합니다. 각 튜플의 일부 필드에서 UDF를 호출하고 결과를 해당 특정 튜플에 다시 연결할 수 있기를 원합니다. 고유 ID를 사용하여 레코드를 다시 연결하기 위해 조인을 수행하면 수십억 개의 레코드가 영원히 걸립니다.

GENERATE 문 내 에서이 작업을 수행 할 수있는 방법이 있어야한다고 생각하지만 올바른 구문을 찾을 수 없습니다.

아이디어를 얻기 위해 Python UDF를 사용하는 장난감 코드가 있습니다.

Register 'jumper.py' using jython as myfuncs;

jumps = LOAD 'jumps.csv' USING PigStorage(',')
   AS (jumper:int, attempt:int, distance:double, location:chararray);

byJumper = GROUP jumps by jumper;

sigmas = FOREACH byJumper GENERATE
    jumps.jumper, jumps.attempt, jumps.distance, jumps.location,
    myfuncs.conv2sigma(jumps.distance);

rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');

이것은 내가 기대하는 형태의 튜플이 아닌 각 튜플에 단일 필드가있는 튜플 백을 생성합니다.

입력 데이터는

각 점프에 대해 점퍼가 평균에서 몇 개의 표준 편차 (시그마)를 생성했는지 확인한 다음 나중에 위치별로 시그마를 상관시켜 점퍼가 가장 잘 수행되는 위치를 확인합니다. 각 개인의 평균 및 표준 편차를 계산 한 다음 각 점프의 '시그마'를 계산하고 새 시그마 필드가 첨부 된 데이터를 저장해야합니다.

질문은 ~이야:

나는 여러 가지 방법으로 FLATTEN을 시도했으며 엄청난 양의 교차 제품 만 얻습니다. 점퍼를 가져 와서 트리플을 시도하고 출력 한 다음 JOIN을 수행하도록 UDF를 변경할 수 있지만 실제 환경에서는이 솔루션이 데이터 세트의 크기 때문에 비실용적입니다.

집에서 시도해보고 싶은 경우 지원 코드와 데이터는 다음과 같습니다.

jumper.py : (빠르고 신중하지 않은 구현-여기서 중요한 것은 가방 입력을 받아서 각 입력 튜플에 해당하는 하나의 출력 튜플로 가방 출력을 생성한다는 것입니다)

#!/usr/local/bin/python
# we're forced to use python 2.5.2 :-(

from math import sqrt

@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
    s = 0.0
    n = 0
    dd = []
    print('conv2sigma input bag:')
    print(bag)
    for word in bag:
        d = float(word[0])
        dd.append(d)
        n += 1
        s += d
    a = s / n
    ss = 0
    for d in dd:
        ss += (d-a)**2
    sd = sqrt(ss)
    outputBag = []
    for d in dd:
        outputBag.append( ( (d-a)/sd, ) )
    print('conv2sigma output bag:')
    print(outputBag)
    return outputBag

입력 파일이 jumps.csv :

0,0,5,a
0,1,6,b
0,2,7,c
0,3,5,a
0,4,8,c
0,5,7,b
0,6,6,b
0,7,7,c
0,8,5,a
1,0,6,a
1,1,5,a
1,2,7,b
1,3,4,a
1,4,5,a
1,5,7,b
1,6,8,c
1,7,9,c
1,8,5,a
1,9,4,a
1,10,5,a
1,11,6,b
1,12,8,c
1,13,8,b
2,0,7,b
2,1,5,a
2,2,6,b
2,3,5,a
2,4,7,c
2,5,5,a
2,6,6,c
2,7,5,a
2,8,7,b
2,9,5,a
2,10,6,b

작성된대로 출력이 생성됩니다.

{(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)}
{(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)}
{(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}

각 출력 튜플은 백 모음이며 각 백은 한 필드에서 하나의 항목을 가진 튜플을 포함하며 이는 우리가 원하는 것이 아닙니다.

해결법

  1. ==============================

    1.이 작업은 두 단계로 수행해야합니다. 각 점프에는 고유 한 시그마 값이 있으므로 각 시그마를 올바른 점프와 올바르게 연관 시키려면 ID를 시그마 계산 UDF에 전달한 다음 결과를 다시 결합 (나쁜 생각)해야합니다. 요약 통계를 먼저 (평균 및 표준 편차) 사용한 다음 나중에 시그마를 도출합니다. 방법은 다음과 같습니다.

    이 작업은 두 단계로 수행해야합니다. 각 점프에는 고유 한 시그마 값이 있으므로 각 시그마를 올바른 점프와 올바르게 연관 시키려면 ID를 시그마 계산 UDF에 전달한 다음 결과를 다시 결합 (나쁜 생각)해야합니다. 요약 통계를 먼저 (평균 및 표준 편차) 사용한 다음 나중에 시그마를 도출합니다. 방법은 다음과 같습니다.

    jumps = LOAD 'jumps.csv' USING PigStorage(',')
       AS (jumper:int, attempt:int, distance:double, location:chararray);
    
    byJumper = GROUP jumps by jumper;
    
    jumperSummaries =
        FOREACH byJumper
        GENERATE
            group AS jumper,
            FLATTEN(jumps.(attempt, distance, location)),
            myfuncs.mean(jumps.distance) AS mean,
            myfunds.stddev(jumps.distance) AS stddev;
    
    sigmas =
        FOREACH jumperSummaries
        GENERATE
            jumper,
            attempt,
            distance,
            location,
            myfuncs.sigma(distance, mean, stddev) AS sigma;
    

    FLATTEN은 모든 점프를 그룹 해제하고 원래 입력을 되돌려줍니다. 단, 모든 레코드는 해당 점퍼에 대한 평균 및 표준 편차를 복사 한 다음 각 점프마다 시그마를 계산하는 데 사용할 수 있습니다.

    이것은 개념적으로 두 단계이지만 여전히 맵 축소 작업을 한 번만 수행합니다.

  2. ==============================

    2.WinnieNicklaus의 답변과 비교하고 의견을 제시하기 위해 내가 생각해 낸 해결책은 다음과 같습니다.

    WinnieNicklaus의 답변과 비교하고 의견을 제시하기 위해 내가 생각해 낸 해결책은 다음과 같습니다.

    Register 'jumper.py' using jython as myfuncs;
    
    jumps = LOAD 'jumps.csv' USING PigStorage(',')
       AS (jumper:int, attempt:int, distance:double, location:chararray);
    
    byJumper = GROUP jumps by jumper;
    
    sigmas0 = FOREACH byJumper
        GENERATE
            FLATTEN(jumps),
            FLATTEN(myfuncs.conv2sigma(jumps.(jumper,attempt,distance)));
    
    sigmas1 = FILTER sigmas0 BY jumper == s_id AND attempt == s_att;
    
    sigmas = FOREACH sigmas1
            GENERATE jumper, attempt, distance, location, sigma;
    
    rmf sigmas
    STORE sigmas INTO 'sigmas' USING PigStorage(',');
    

    첫 번째 FOREACH는 (잠재적으로 큰) 교차 곱 sigma0을 작성하고, 제품의 "잘못된"요소를 필터링하고 원하는 필드를 생성합니다. JOIN은 종종 이런 식으로 학술적으로 설명됩니다.

    속도가 느릴 수 있습니다.

    그러나 여전히 단일 Map-Reduce 작업이 발생하며 실제로는 빠른 것 같습니다.

    나를위한 큰 승리는 내 UDF가 임의로 복잡한 작업을 수행하고 입력 데이터에 다시 연결된 많은 튜플을 임의로 반환 할 수 있다는 것입니다.

  3. from https://stackoverflow.com/questions/21711584/cutting-down-bag-to-pass-to-udf by cc-by-sa and MIT license