[HADOOP] UDF에 전달하기 위해 가방을 절단
HADOOPUDF에 전달하기 위해 가방을 절단
Hadoop 클러스터에서 Pig를 사용하면이 프로젝트에서 계속 작업하면서 필드를 정기적으로 추가하는 거대한 튜플이 많이 있습니다. 여러 UDF는 다양한 필드를 사용합니다. 각 튜플의 일부 필드에서 UDF를 호출하고 결과를 해당 특정 튜플에 다시 연결할 수 있기를 원합니다. 고유 ID를 사용하여 레코드를 다시 연결하기 위해 조인을 수행하면 수십억 개의 레코드가 영원히 걸립니다.
GENERATE 문 내 에서이 작업을 수행 할 수있는 방법이 있어야한다고 생각하지만 올바른 구문을 찾을 수 없습니다.
아이디어를 얻기 위해 Python UDF를 사용하는 장난감 코드가 있습니다.
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',')
AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas = FOREACH byJumper GENERATE
jumps.jumper, jumps.attempt, jumps.distance, jumps.location,
myfuncs.conv2sigma(jumps.distance);
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
이것은 내가 기대하는 형태의 튜플이 아닌 각 튜플에 단일 필드가있는 튜플 백을 생성합니다.
입력 데이터는
각 점프에 대해 점퍼가 평균에서 몇 개의 표준 편차 (시그마)를 생성했는지 확인한 다음 나중에 위치별로 시그마를 상관시켜 점퍼가 가장 잘 수행되는 위치를 확인합니다. 각 개인의 평균 및 표준 편차를 계산 한 다음 각 점프의 '시그마'를 계산하고 새 시그마 필드가 첨부 된 데이터를 저장해야합니다.
질문은 ~이야:
나는 여러 가지 방법으로 FLATTEN을 시도했으며 엄청난 양의 교차 제품 만 얻습니다. 점퍼를 가져 와서 트리플을 시도하고 출력 한 다음 JOIN을 수행하도록 UDF를 변경할 수 있지만 실제 환경에서는이 솔루션이 데이터 세트의 크기 때문에 비실용적입니다.
집에서 시도해보고 싶은 경우 지원 코드와 데이터는 다음과 같습니다.
jumper.py : (빠르고 신중하지 않은 구현-여기서 중요한 것은 가방 입력을 받아서 각 입력 튜플에 해당하는 하나의 출력 튜플로 가방 출력을 생성한다는 것입니다)
#!/usr/local/bin/python
# we're forced to use python 2.5.2 :-(
from math import sqrt
@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
s = 0.0
n = 0
dd = []
print('conv2sigma input bag:')
print(bag)
for word in bag:
d = float(word[0])
dd.append(d)
n += 1
s += d
a = s / n
ss = 0
for d in dd:
ss += (d-a)**2
sd = sqrt(ss)
outputBag = []
for d in dd:
outputBag.append( ( (d-a)/sd, ) )
print('conv2sigma output bag:')
print(outputBag)
return outputBag
입력 파일이 jumps.csv :
0,0,5,a
0,1,6,b
0,2,7,c
0,3,5,a
0,4,8,c
0,5,7,b
0,6,6,b
0,7,7,c
0,8,5,a
1,0,6,a
1,1,5,a
1,2,7,b
1,3,4,a
1,4,5,a
1,5,7,b
1,6,8,c
1,7,9,c
1,8,5,a
1,9,4,a
1,10,5,a
1,11,6,b
1,12,8,c
1,13,8,b
2,0,7,b
2,1,5,a
2,2,6,b
2,3,5,a
2,4,7,c
2,5,5,a
2,6,6,c
2,7,5,a
2,8,7,b
2,9,5,a
2,10,6,b
작성된대로 출력이 생성됩니다.
{(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)}
{(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)}
{(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}
각 출력 튜플은 백 모음이며 각 백은 한 필드에서 하나의 항목을 가진 튜플을 포함하며 이는 우리가 원하는 것이 아닙니다.
해결법
-
==============================
1.이 작업은 두 단계로 수행해야합니다. 각 점프에는 고유 한 시그마 값이 있으므로 각 시그마를 올바른 점프와 올바르게 연관 시키려면 ID를 시그마 계산 UDF에 전달한 다음 결과를 다시 결합 (나쁜 생각)해야합니다. 요약 통계를 먼저 (평균 및 표준 편차) 사용한 다음 나중에 시그마를 도출합니다. 방법은 다음과 같습니다.
이 작업은 두 단계로 수행해야합니다. 각 점프에는 고유 한 시그마 값이 있으므로 각 시그마를 올바른 점프와 올바르게 연관 시키려면 ID를 시그마 계산 UDF에 전달한 다음 결과를 다시 결합 (나쁜 생각)해야합니다. 요약 통계를 먼저 (평균 및 표준 편차) 사용한 다음 나중에 시그마를 도출합니다. 방법은 다음과 같습니다.
jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray); byJumper = GROUP jumps by jumper; jumperSummaries = FOREACH byJumper GENERATE group AS jumper, FLATTEN(jumps.(attempt, distance, location)), myfuncs.mean(jumps.distance) AS mean, myfunds.stddev(jumps.distance) AS stddev; sigmas = FOREACH jumperSummaries GENERATE jumper, attempt, distance, location, myfuncs.sigma(distance, mean, stddev) AS sigma;
FLATTEN은 모든 점프를 그룹 해제하고 원래 입력을 되돌려줍니다. 단, 모든 레코드는 해당 점퍼에 대한 평균 및 표준 편차를 복사 한 다음 각 점프마다 시그마를 계산하는 데 사용할 수 있습니다.
이것은 개념적으로 두 단계이지만 여전히 맵 축소 작업을 한 번만 수행합니다.
-
==============================
2.WinnieNicklaus의 답변과 비교하고 의견을 제시하기 위해 내가 생각해 낸 해결책은 다음과 같습니다.
WinnieNicklaus의 답변과 비교하고 의견을 제시하기 위해 내가 생각해 낸 해결책은 다음과 같습니다.
Register 'jumper.py' using jython as myfuncs; jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray); byJumper = GROUP jumps by jumper; sigmas0 = FOREACH byJumper GENERATE FLATTEN(jumps), FLATTEN(myfuncs.conv2sigma(jumps.(jumper,attempt,distance))); sigmas1 = FILTER sigmas0 BY jumper == s_id AND attempt == s_att; sigmas = FOREACH sigmas1 GENERATE jumper, attempt, distance, location, sigma; rmf sigmas STORE sigmas INTO 'sigmas' USING PigStorage(',');
첫 번째 FOREACH는 (잠재적으로 큰) 교차 곱 sigma0을 작성하고, 제품의 "잘못된"요소를 필터링하고 원하는 필드를 생성합니다. JOIN은 종종 이런 식으로 학술적으로 설명됩니다.
속도가 느릴 수 있습니다.
그러나 여전히 단일 Map-Reduce 작업이 발생하며 실제로는 빠른 것 같습니다.
나를위한 큰 승리는 내 UDF가 임의로 복잡한 작업을 수행하고 입력 데이터에 다시 연결된 많은 튜플을 임의로 반환 할 수 있다는 것입니다.
from https://stackoverflow.com/questions/21711584/cutting-down-bag-to-pass-to-udf by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Pig Latin의 STRSPLIT 및 REGEX_EXTRACT_ALL (0) | 2019.09.07 |
---|---|
[HADOOP] FATAL master.HMaster : 예기치 않은 상태 : .. 오프라인으로 전환 할 수 없습니다 (0) | 2019.09.07 |
[HADOOP] S3 병렬 읽기 및 쓰기 성능? (0) | 2019.09.07 |
[HADOOP] PIG : 열 이름에서 '::'를 제거하는 방법 (0) | 2019.09.07 |
[HADOOP] HDFS는 사용 가능한 블록을 어떻게 계산합니까? (0) | 2019.09.07 |