복붙노트

[HADOOP] 하이브 쿼리가있는 Hadoop의 연속 레코드 간의 차이점 계산

HADOOP

하이브 쿼리가있는 Hadoop의 연속 레코드 간의 차이점 계산

하이브 테이블에 고객 호출 데이터가 있습니다. 단순성을 위해 두 개의 열이 있습니다. 첫 번째 열은 고객 ID를 보유하고 두 번째 열은 호출의 시간 소인 (유닉스 시간 소인)을 보유합니다.

이 표를 쿼리하여 각 고객에 대한 모든 통화를 찾을 수 있습니다.

SELECT * FROM mytable SORT BY customer_id, call_time;

결과는 다음과 같습니다.

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

각 고객에 대해 두 번째 호출부터 두 번의 연속 호출 사이의 시간 간격을 반환하는 하이브 쿼리를 만들 수 있습니까? 위의 예에서 쿼리는 다음을 반환해야합니다.

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

SQL 솔루션에서 솔루션을 적용하려고했지만 Hive 제한 사항이 남아 있습니다. FROM에서만 하위 쿼리를 받아들이고 조인은 등식 만 포함해야합니다.

고맙습니다.

EDIT1

하이브 UDF 함수를 사용하려고했습니다.

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

"델타"라는 이름으로 사용하십시오.

그러나 MAP 시간에 사용 중이라고 (로그 및 결과에서) 보입니다. 2 가지 문제가 발생합니다.

첫째, 테이블 데이터는이 기능을 사용하기 전에 고객 ID 및 타임 스탬프별로 정렬해야합니다. 검색어 :

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

내 기능을 사용하면 오랫동안 정렬 부분이 REDUCE 시간에 수행되기 때문에 작동하지 않습니다.

함수를 사용하기 전에 테이블 데이터를 정렬 할 수 있지만, 오버 헤드가 있으므로 피하기를 희망하기 때문에이 점에 만족하지 않습니다.

둘째 : 분산 된 Hadoop 구성의 경우, 데이터는 사용 가능한 작업 추적자들로 나뉘어집니다. 따라서 각 매퍼마다 하나씩이 함수가 여러 개있을 것으로 예상되므로 동일한 고객 데이터를 두 매퍼간에 분할 할 수 있습니다. 이 경우 고객 전화가 끊깁니다. 이는 허용되지 않습니다.

이 문제를 해결하는 방법을 모르겠습니다. DISTRIBUTE BY는 특정 값을 가진 모든 데이터가 동일한 감속기로 보내지도록 보장합니다 (따라서 SORT가 예상대로 작동 함을 보장합니다). 매퍼에 대해 비슷한 것이 있으면 누구라도 알 수 있습니까?

다음으로 libjack의 제안에 따라 축소 스크립트를 사용할 계획입니다. 이 "계산"은 다른 하이브 쿼리 사이에 필요하므로 Balaswamy vaddeman이 제안한 것처럼 다른 도구로 이동하기 전에 하이브가 제공하는 모든 기능을 시험해보고 싶습니다.

첫 번째 :

필자는 맞춤 스크립트 솔루션을 조사하기 시작했습니다. 하지만 Programming Hive book (이 장에서는 사용자 정의 스크립트를 설명 함)의 14 장 첫 페이지에서 다음 단락을 발견했습니다.

따라서 커스텀 스크립트는 효율성 측면에서 최고의 솔루션이 아닙니다.

하지만 UDF 기능을 어떻게 유지해야합니까?하지만 분산 된 Hadoop 구성에서 예상대로 작동하는지 확인하십시오. 언어 수동 UDF 위키 페이지의 UDF 내부 섹션에서이 질문에 대한 답을 찾았습니다. 내 쿼리를 작성하는 경우 :

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

REDUCE 시간에 실행되며 DISTRIBUTE BY 및 SORT BY 구문을 사용하면 동일한 고객의 모든 레코드가 동일한 축소기에 의해 호출 순서대로 처리됩니다.

그래서 위의 UDF와이 쿼리 구조가 내 문제를 해결합니다.

(링크를 추가하지 않아서 미안하지만, 평판 포인트가 충분하지 않아서 링크를 추가 할 수 없습니다.)

해결법

  1. ==============================

    1.그것은 오래된 질문이지만, 미래의 참고 문헌으로 나는 또 다른 명제를 적었습니다.

    그것은 오래된 질문이지만, 미래의 참고 문헌으로 나는 또 다른 명제를 적었습니다.

    하이브 창 함수를 사용하면 쿼리의 이전 / 다음 값을 사용할 수 있습니다.

    비슷한 코드 쿼리는 다음과 같습니다.

    SELECT customer_id, LAG (call_time, 1, 0) 이상 (PARTITION BY customer_id ORDER BY call_time) - call_time FROM mytable;

  2. ==============================

    2.Java 또는 Python과 같은 다른 프로그래밍 언어와 함께 명시 적 MAP-REDUCE를 사용할 수 있습니다. 지도 {cutomer_id, call_time}에서 내보내고 감속기에서 {customer_id, list {time_stamp}}를 얻을 수 있으며 감속기에서이 시간 스탬프를 정렬하고 데이터를 처리 할 수 ​​있습니다.

    Java 또는 Python과 같은 다른 프로그래밍 언어와 함께 명시 적 MAP-REDUCE를 사용할 수 있습니다. 지도 {cutomer_id, call_time}에서 내보내고 감속기에서 {customer_id, list {time_stamp}}를 얻을 수 있으며 감속기에서이 시간 스탬프를 정렬하고 데이터를 처리 할 수 ​​있습니다.

  3. ==============================

    3.어쩌면 누군가 비슷한 요구 사항을 만났을 것입니다. 해결책은 다음과 같습니다.

    어쩌면 누군가 비슷한 요구 사항을 만났을 것입니다. 해결책은 다음과 같습니다.

    1) 사용자 지정 함수 만들기 :

    package com.example;
    // imports (they depend on the hive version)
    @Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
        + "- computes the time passed between two succesive records from the same customer. "
        + "It generates 3 columns: first contains the customer id, second contains call time "
        + "and third contains the time passed from the previous call. This function returns only "
        + "the records that have a previous call from the same customer (requirements are not applicable "
        + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS"
        + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
        + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
    public class DeltaComputerUDTF extends GenericUDTF {
    private static final int NUM_COLS = 3;
    
    private Text[] retCols; // array of returned column values
    private ObjectInspector[] inputOIs; // input ObjectInspectors
    private String prevCustomerId;
    private Long prevCallTime;
    
    @Override
    public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
        if (ois.length != 2) {
            throw new UDFArgumentException(
                    "There must be 2 arguments: customer Id column name and call time column name");
        }
    
        inputOIs = ois;
    
        // construct the output column data holders
        retCols = new Text[NUM_COLS];
        for (int i = 0; i < NUM_COLS; ++i) {
            retCols[i] = new Text();
        }
    
        // construct output object inspector
        List<String> fieldNames = new ArrayList<String>(NUM_COLS);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
        for (int i = 0; i < NUM_COLS; ++i) {
            // column name can be anything since it will be named by UDTF as clause
            fieldNames.add("c" + i);
            // all returned type will be Text
            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        }
    
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    
    @Override
    public void process(Object[] args) throws HiveException {
        String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
        Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);
    
        if (customerId.equals(prevCustomerId)) {
            retCols[0].set(customerId);
            retCols[1].set(callTime.toString());
            retCols[2].set(new Long(callTime - prevCallTime).toString());
            forward(retCols);
        }
    
        // Store the current customer data, for the next line
        prevCustomerId = customerId;
        prevCallTime = callTime;
    }
    
    @Override
    public void close() throws HiveException {
        // TODO Auto-generated method stub
    
    }
    
    }
    

    2)이 함수를 포함하는 jar 파일을 만듭니다. jarname이 myjar.jar라고 가정합니다.

    3) 항아리를 하이브로 기계에 복사하십시오. / tmp에 있다고 가정하십시오.

    4) Hive 내부에 사용자 정의 함수를 정의하십시오.

    ADD JAR /tmp/myjar.jar;
    CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';
    

    5) 쿼리를 실행합니다 :

    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
      (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
    

    비고 :

  4. from https://stackoverflow.com/questions/14648201/compute-differences-between-succesive-records-in-hadoop-with-hive-queries by cc-by-sa and MIT license