복붙노트

[HADOOP] 로컬 MapReduce의 실행의 출력으로부터 다른 하둡 MapReduce의 스트리밍 출력

HADOOP

로컬 MapReduce의 실행의 출력으로부터 다른 하둡 MapReduce의 스트리밍 출력

나는 파이썬으로 작성된 간단한 맵리 듀스 작업을 실행하고 나는 내가 스크립트를 로컬로 테스트 할 때 나는 하둡에 작업을 실행할 때, 내가 밖으로 다음 다른를 얻을 것으로 나타났습니다. 내 입력은 가지 무언가이다 :

key1        val1
key1        val2
key1        val3
key1        val4
key2        val1
key2        val3
key2        val5
key3        val5
key4        val4

내 사상은 해당 키의리스트 (문자열 (); 키 1을 val2; val3 키 1, 키 2 .... 예컨대 VAL1 KEY1, 키 2)가 값의 사전을 생성한다. 그리고 사전에 각 값에 대해 나는 가능한 모든 키 쌍을 인쇄 할 수 있습니다. 그래서 내 매퍼의 출력은 다음과 같습니다

key1_key2   1   # obtained from val1
key1_key2   1   # obtained from val3
key1_key4   1   # obtained from val4
key2_key3   1   # obtained from val5

감속기는 동일한 키 쌍의 수를 카운트하고 카운트를 인쇄합니다. 내 매퍼 코드는 다음과 같습니다

val_dic = dict()
def print_dic(dic):
    for val, key_array in dic.iteritems():
        key_pair= ""
        i=0
        j=1
        for i in range(len(key_array)-1):
            for j in range(i+1,len(key_array)):
                key_pair = key_array[i]+"_"+key_array[j]
                print "{0}\t{1}".format(key_pair,"1")
for line in sys.stdin:  
    key, val = line.strip().split("\t")
    if (not val in val_dic):
        val_dic[val]=[]
    val_dic[val].append(key) 
print_dic(val_dic)

감속이 모두 동일 값을 계산한다 :

   current_pair = None
    current_count = 0 
    for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
        if current_pair == key_pair:
            current_count += count
        else:
            print "{0}\t{1}".format(current_pair,str(current_count))
            current_pair = key_pair
            current_count = count
    print "{0}\t{1}".format(current_pair,str(current_count))

내가 더 큰 데이터 세트에 하둡에 그것을 실행할 때 절반의 결과가 누락 된 것 같습니다. 나는 사용하여 로컬 컴퓨터에서 테스트 할 때     고양이 입력 | mapper.py | 종류 | reducer.py> 외 지역 입력이 reasonalbe 작은 경우, 그것을 잘 작동하지만, 더 큰 데이터 세트 (예를 들어, 1M 항목)에 로컬 출력 파일은 하둡의 맵리 듀스 작업을 실행에서 얻은 것보다 거의 두 배나 많은 항목이 있습니다. 코드에서 오류가 있습니까? 아니면 내가 뭔가를 놓친 거지? 어떤 도움을 매우 높이 평가된다.

해결법

  1. ==============================

    1.귀하의 매퍼는 주어진 값을보고 모든 키 페어의 조합을 생성합니다.

    귀하의 매퍼는 주어진 값을보고 모든 키 페어의 조합을 생성합니다.

    맵 - 감소의 모델은 병렬 방식으로 황당 매퍼 프로세스, 입력의 각 레코드는 키 - 값 쌍을 방출한다는 것이다. 이 키 - 값 쌍에 레코드를 매핑합니다. 사실, 일반적인 네이티브 (자바) 매퍼는 한 번에 하나의 레코드 "를 참조하십시오"할 수 있으므로 스트리밍 매퍼가하는 방법을 작동하지 않을 수 있습니다.

    스트리밍 API를, 당신은 약간의 "속임수"로 분류하고 한 번에 전체 입력 분할을 처리 할 수 ​​있습니다 - 당신에게 주어진 파일의 전체 덩어리, 당신은 그 덩어리의 입력의 모든 레코드를 처리 할 수에 대한, 그리고 그것은 가능 단지 각각의 키 - 값 쌍을지도보다 다른 작업을 할 수 있습니다. 그러나 일반적으로는 전체 입력에 액세스 할 수 없습니다; 입력은 분할로 헤어 졌하고, 매퍼 각 스플릿를 얻을 수있다. 스플릿 전체 입력을 포함하는 경우, 당신은지도 상에있는 병렬이없는, 모든에서 하둡을 사용하는 이유가 없습니다.

    무엇 거의 확실하게 여기에서 일어나고있는 당신의 입력 파일이 두 개의 분할로 헤어 졌 도착한다는 것입니다, 당신의 분할에 당신이 입력의 모든 레코드가 없기 때문에 이제 더 이상 매퍼는 주어진 값에 해당하는 모든 키 쌍을 찾을 수 있습니다. 그래서 예를 들어, 대략 두 가지로 사용자가 제공 한 입력 파일을 깨는 모든 "key1s"하나, 그리고 다른 사람들과 일을 고려하십시오. 지도는-줄일 모든 입력에 로컬로 설정 실행 한 번에 당신이 기대하는 출력을 생성합니다 :

    $ cat input1 input2 | ./map.py | sort | ./reduce.py 
    None    0
    key1_key2   2
    key1_key4   1
    key2_key3   1
    

    그러나 하둡 워크 플로우는 다른 맵퍼가 상을 줄일 수 / 각 입력을 얻는다면, 그들은 단지 셔플에 결합하고 있다는 것입니다 :

    $ cat input1 | ./map.py > output1
    $ cat input2 | ./map.py > output2
    $ cat output1 output2 | sort | ./reduce.py 
    None    0
    key2_key3   1
    

    그래서 지금 당신은 결과를 놓치고있어. 이는 어떤 개인 매퍼가 모든 데이터를 볼려고하지를위한 하둡을 사용하는 것이 나을 어떤 경우에, 불가피하다.

    당신은지도는 단순히 방출 (값, 키) 쌍, 다음 감속기가 주어진 값을 함께 키를 모두 수집 한 후 카운트 모든 키 쌍을 생성하도록, 일을 리팩토링해야합니다. 그런 다음 다른지도-감소 단계는 카운트를해야 할 것입니다.

    그래서 당신은 map1.py과 reduce1.py이있을 것이다 :

    #!/usr/bin/env python 
    # map1.py
    
    import sys
    
    for line in sys.stdin:  
        line = line.strip()
        key, val = line.strip().split("\t")
        print val, "\t", key
    
    #!/usr/bin/env python
    # reduce1.py
    
    import sys
    
    def emit_keypairs(keylist):
        for i in range(len(keylist)-1):
            for j in range(i+1,len(keylist)):
                key_pair = keylist[i]+"_"+keylist[j]
                print "{0}\t{1}".format(key_pair,"1")
    
    current_word = None
    current_keylist = []
    
    for line in sys.stdin:
        line = line.strip()
        word, key = line.split('\t', 1)
    
        if current_word == word:
            current_keylist.append(key)
        else:
            if current_word:
                emit_keypairs(current_keylist)
            current_word = word
            current_keylist = [key]
    
    # do not forget to output the last word if needed!
    if current_word == word:
        emit_keypairs(current_keylist)
    

    사람들을 실행하고 기본적으로 그냥 출력의 단어 수를 실행합니다. 이 입력 파일을 분할에 강력한 될 것입니다 :

    $ cat input1 | ./map1.py > map1
    $ cat input2 | ./map1.py > map2
    $ cat map1 map2 | sort | ./reduce1.py 
    
    key1_key2   1
    key1_key2   1
    key1_key4   1
    key2_key3   1
    

    다음 단어 수와 다른지도-감소 단계는 예상되는 결과를 생성합니다.

  2. from https://stackoverflow.com/questions/23684315/hadoop-mapreduce-streaming-output-different-from-the-output-of-running-mapreduce by cc-by-sa and MIT license