[HADOOP] 하이브 쿼리가있는 Hadoop의 연속 레코드 간의 차이점 계산
HADOOP하이브 쿼리가있는 Hadoop의 연속 레코드 간의 차이점 계산
하이브 테이블에 고객 호출 데이터가 있습니다. 단순성을 위해 두 개의 열이 있습니다. 첫 번째 열은 고객 ID를 보유하고 두 번째 열은 호출의 시간 소인 (유닉스 시간 소인)을 보유합니다.
이 표를 쿼리하여 각 고객에 대한 모든 통화를 찾을 수 있습니다.
SELECT * FROM mytable SORT BY customer_id, call_time;
결과는 다음과 같습니다.
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
각 고객에 대해 두 번째 호출부터 두 번의 연속 호출 사이의 시간 간격을 반환하는 하이브 쿼리를 만들 수 있습니까? 위의 예에서 쿼리는 다음을 반환해야합니다.
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
SQL 솔루션에서 솔루션을 적용하려고했지만 Hive 제한 사항이 남아 있습니다. FROM에서만 하위 쿼리를 받아들이고 조인은 등식 만 포함해야합니다.
고맙습니다.
EDIT1
하이브 UDF 함수를 사용하려고했습니다.
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
"델타"라는 이름으로 사용하십시오.
그러나 MAP 시간에 사용 중이라고 (로그 및 결과에서) 보입니다. 2 가지 문제가 발생합니다.
첫째, 테이블 데이터는이 기능을 사용하기 전에 고객 ID 및 타임 스탬프별로 정렬해야합니다. 검색어 :
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
내 기능을 사용하면 오랫동안 정렬 부분이 REDUCE 시간에 수행되기 때문에 작동하지 않습니다.
함수를 사용하기 전에 테이블 데이터를 정렬 할 수 있지만, 오버 헤드가 있으므로 피하기를 희망하기 때문에이 점에 만족하지 않습니다.
둘째 : 분산 된 Hadoop 구성의 경우, 데이터는 사용 가능한 작업 추적자들로 나뉘어집니다. 따라서 각 매퍼마다 하나씩이 함수가 여러 개있을 것으로 예상되므로 동일한 고객 데이터를 두 매퍼간에 분할 할 수 있습니다. 이 경우 고객 전화가 끊깁니다. 이는 허용되지 않습니다.
이 문제를 해결하는 방법을 모르겠습니다. DISTRIBUTE BY는 특정 값을 가진 모든 데이터가 동일한 감속기로 보내지도록 보장합니다 (따라서 SORT가 예상대로 작동 함을 보장합니다). 매퍼에 대해 비슷한 것이 있으면 누구라도 알 수 있습니까?
다음으로 libjack의 제안에 따라 축소 스크립트를 사용할 계획입니다. 이 "계산"은 다른 하이브 쿼리 사이에 필요하므로 Balaswamy vaddeman이 제안한 것처럼 다른 도구로 이동하기 전에 하이브가 제공하는 모든 기능을 시험해보고 싶습니다.
첫 번째 :
필자는 맞춤 스크립트 솔루션을 조사하기 시작했습니다. 하지만 Programming Hive book (이 장에서는 사용자 정의 스크립트를 설명 함)의 14 장 첫 페이지에서 다음 단락을 발견했습니다.
따라서 커스텀 스크립트는 효율성 측면에서 최고의 솔루션이 아닙니다.
하지만 UDF 기능을 어떻게 유지해야합니까?하지만 분산 된 Hadoop 구성에서 예상대로 작동하는지 확인하십시오. 언어 수동 UDF 위키 페이지의 UDF 내부 섹션에서이 질문에 대한 답을 찾았습니다. 내 쿼리를 작성하는 경우 :
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
REDUCE 시간에 실행되며 DISTRIBUTE BY 및 SORT BY 구문을 사용하면 동일한 고객의 모든 레코드가 동일한 축소기에 의해 호출 순서대로 처리됩니다.
그래서 위의 UDF와이 쿼리 구조가 내 문제를 해결합니다.
(링크를 추가하지 않아서 미안하지만, 평판 포인트가 충분하지 않아서 링크를 추가 할 수 없습니다.)
해결법
-
==============================
1.그것은 오래된 질문이지만, 미래의 참고 문헌으로 나는 또 다른 명제를 적었습니다.
그것은 오래된 질문이지만, 미래의 참고 문헌으로 나는 또 다른 명제를 적었습니다.
하이브 창 함수를 사용하면 쿼리의 이전 / 다음 값을 사용할 수 있습니다.
비슷한 코드 쿼리는 다음과 같습니다.
SELECT customer_id, LAG (call_time, 1, 0) 이상 (PARTITION BY customer_id ORDER BY call_time) - call_time FROM mytable;
-
==============================
2.Java 또는 Python과 같은 다른 프로그래밍 언어와 함께 명시 적 MAP-REDUCE를 사용할 수 있습니다. 지도 {cutomer_id, call_time}에서 내보내고 감속기에서 {customer_id, list {time_stamp}}를 얻을 수 있으며 감속기에서이 시간 스탬프를 정렬하고 데이터를 처리 할 수 있습니다.
Java 또는 Python과 같은 다른 프로그래밍 언어와 함께 명시 적 MAP-REDUCE를 사용할 수 있습니다. 지도 {cutomer_id, call_time}에서 내보내고 감속기에서 {customer_id, list {time_stamp}}를 얻을 수 있으며 감속기에서이 시간 스탬프를 정렬하고 데이터를 처리 할 수 있습니다.
-
==============================
3.어쩌면 누군가 비슷한 요구 사항을 만났을 것입니다. 해결책은 다음과 같습니다.
어쩌면 누군가 비슷한 요구 사항을 만났을 것입니다. 해결책은 다음과 같습니다.
1) 사용자 지정 함수 만들기 :
package com.example; // imports (they depend on the hive version) @Description(name = "delta", value = "_FUNC_(customer id column, call time column) " + "- computes the time passed between two succesive records from the same customer. " + "It generates 3 columns: first contains the customer id, second contains call time " + "and third contains the time passed from the previous call. This function returns only " + "the records that have a previous call from the same customer (requirements are not applicable " + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") public class DeltaComputerUDTF extends GenericUDTF { private static final int NUM_COLS = 3; private Text[] retCols; // array of returned column values private ObjectInspector[] inputOIs; // input ObjectInspectors private String prevCustomerId; private Long prevCallTime; @Override public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { if (ois.length != 2) { throw new UDFArgumentException( "There must be 2 arguments: customer Id column name and call time column name"); } inputOIs = ois; // construct the output column data holders retCols = new Text[NUM_COLS]; for (int i = 0; i < NUM_COLS; ++i) { retCols[i] = new Text(); } // construct output object inspector List<String> fieldNames = new ArrayList<String>(NUM_COLS); List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); for (int i = 0; i < NUM_COLS; ++i) { // column name can be anything since it will be named by UDTF as clause fieldNames.add("c" + i); // all returned type will be Text fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); } return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] args) throws HiveException { String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); if (customerId.equals(prevCustomerId)) { retCols[0].set(customerId); retCols[1].set(callTime.toString()); retCols[2].set(new Long(callTime - prevCallTime).toString()); forward(retCols); } // Store the current customer data, for the next line prevCustomerId = customerId; prevCallTime = callTime; } @Override public void close() throws HiveException { // TODO Auto-generated method stub } }
2)이 함수를 포함하는 jar 파일을 만듭니다. jarname이 myjar.jar라고 가정합니다.
3) 항아리를 하이브로 기계에 복사하십시오. / tmp에 있다고 가정하십시오.
4) Hive 내부에 사용자 정의 함수를 정의하십시오.
ADD JAR /tmp/myjar.jar; CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';
5) 쿼리를 실행합니다 :
SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
비고 :
from https://stackoverflow.com/questions/14648201/compute-differences-between-succesive-records-in-hadoop-with-hive-queries by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 어떻게 hadoop 순차 파일을 읽는가? (0) | 2019.06.22 |
---|---|
[HADOOP] 지도 뒤섞기를위한 셔플 및 정렬 (0) | 2019.06.22 |
[HADOOP] Hive를 사용하여 날짜 차이를 몇 분 만에 얻는 방법 (0) | 2019.06.22 |
[HADOOP] 자바에서 hdfs 폴더 삭제 (0) | 2019.06.22 |
[HADOOP] Hadoop에 JAVA_Home이 설정되지 않았습니다. (0) | 2019.06.22 |