[HADOOP] pyspark로 HBase로 스트리밍하기
HADOOPpyspark로 HBase로 스트리밍하기
스칼라를 사용하여 Spark 스트리밍 (이 두 가지가 특히 유용했습니다)과 Java에 대한 정보가 있지만, PySpark로 정보를 전달할 수있는 정보가 부족한 것으로 보입니다. 그래서 내 질문은 :
현재 가지고있는 코드는 다음과 같습니다.
if __name__ == "__main__":
context = SparkContext(appName="PythonHBaseBulkLoader")
streamingContext = StreamingContext(context, 5)
stream = streamingContext.textFileStream("file:///test/input");
stream.foreachRDD(bulk_load)
streamingContext.start()
streamingContext.awaitTermination()
내가 도움이 필요한 것은 대량로드 기능입니다.
def bulk_load(rdd):
#???
이전에 많은 진도를 보였습니다 (여기 및 여기에 문서화 된 것처럼)
해결법
-
==============================
1.그래서 많은 시행 착오 끝에 나는 내가 제시 한 최선의 것을 제시한다. 그것은 잘 작동하고 성공적으로 일괄로드 데이터 (Puts 또는 HFiles 사용) 나는 그것이 최선의 방법이 아니므로 믿고 기꺼이 모든 의견 / 다른 답변을 환영합니다. 여기서는 데이터에 CSV를 사용한다고 가정합니다.
그래서 많은 시행 착오 끝에 나는 내가 제시 한 최선의 것을 제시한다. 그것은 잘 작동하고 성공적으로 일괄로드 데이터 (Puts 또는 HFiles 사용) 나는 그것이 최선의 방법이 아니므로 믿고 기꺼이 모든 의견 / 다른 답변을 환영합니다. 여기서는 데이터에 CSV를 사용한다고 가정합니다.
대량로드를위한 가장 쉬운 방법은 CSV에있는 각 셀에 대해 Put 요청을 생성하고 HBase에 큐에 넣기 만하면됩니다.
def bulk_load(rdd): #Your configuration will likely be different. Insert your own quorum and parent node and table name conf = {"hbase.zookeeper.qourum": "localhost:2181",\ "zookeeper.znode.parent": "/hbase-unsecure",\ "hbase.mapred.outputtable": "Test",\ "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\ "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\ "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" load_rdd = rdd.flatMap(lambda line: line.split("\n"))\#Split the input into individual lines .flatMap(csv_to_key_value)#Convert the CSV line to key value pairs load_rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
csv_to_key_value 함수는 마술이 일어나는 곳입니다 :
def csv_to_key_value(row): cols = row.split(",")#Split on commas. #Each cell is a tuple of (key, [key, column-family, column-descriptor, value]) #Works well for n>=1 columns result = ((cols[0], [cols[0], "f1", "c1", cols[1]]), (cols[0], [cols[0], "f2", "c2", cols[2]]), (cols[0], [cols[0], "f3", "c3", cols[3]])) return result
이전에 정의한 값 변환기는이 튜플을 HBase Puts로 변환합니다.
HFiles를 대량로드하는 것이 더 효율적입니다. 각 셀에 대한 Put 요청 대신 HFile이 직접 쓰여지고 RegionServer는 새로운 HFile을 가리 키도록 간단하게 전달됩니다. 이것은 Py4J를 사용할 것이기 때문에 파이썬 코드 전에 작은 자바 프로그램을 작성해야합니다 :
import py4j.GatewayServer; import org.apache.hadoop.hbase.*; public class GatewayApplication { public static void main(String[] args) { GatewayApplication app = new GatewayApplication(); GatewayServer server = new GatewayServer(app); server.start(); } }
이것을 컴파일하고 실행하십시오. 스트리밍이 진행되는 동안 실행 상태로 둡니다. 이제 다음과 같이 bulk_load를 업데이트하십시오.
def bulk_load(rdd): #The output class changes, everything else stays conf = {"hbase.zookeeper.qourum": "localhost:2181",\ "zookeeper.znode.parent": "/hbase-unsecure",\ "hbase.mapred.outputtable": "Test",\ "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\ "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\ "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"} keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" load_rdd = rdd.flatMap(lambda line: line.split("\n"))\ .flatMap(csv_to_key_value)\ .sortByKey(True) #Don't process empty RDDs if not load_rdd.isEmpty(): #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime, "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2", conf=conf, keyConverter=keyConv, valueConverter=valueConv) #The file has now been written, but HBase doesn't know about it #Get a link to Py4J gateway = JavaGateway() #Convert conf to a fully fledged Configuration type config = dict_to_conf(conf) #Set up our HTable htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test") #Set up our path path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime) #Get a bulk loader loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config) #Load the HFile loader.doBulkLoad(path, htable) else: print("Nothing to process")
마지막으로, 상당히 간단 dict_to_conf :
def dict_to_conf(conf): gateway = JavaGateway() config = gateway.jvm.org.apache.hadoop.conf.Configuration() keys = conf.keys() vals = conf.values() for i in range(len(keys)): config.set(keys[i], vals[i]) return config
보시다시피, HFiles를 사용한 대량로드는 Puts를 사용하는 것보다 복잡하지만 데이터로드에 따라 일단 작업을 시작하면 그만큼 가치가 있습니다.
마지막으로 HFile은받은 데이터가 어휘 순서로 쓰여질 것으로 기대합니다. 이것은 항상 "10"< "9"이후로 항상 보장되는 것은 아닙니다. 고유 한 키를 설계 한 경우 다음과 같이 쉽게 수정할 수 있습니다.
load_rdd = rdd.flatMap(lambda line: line.split("\n"))\ .flatMap(csv_to_key_value)\ .sortByKey(True)#Sort in ascending order
from https://stackoverflow.com/questions/35077986/streaming-to-hbase-with-pyspark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 오류 : 생성자 htable (구성 문자열)이 사용되지 않습니다. (0) | 2019.07.30 |
---|---|
[HADOOP] 복잡한 중첩 된 Json에 대한 하이브 (0) | 2019.07.30 |
[HADOOP] Hadoop에서 헤더가있는 파일 처리 (0) | 2019.07.30 |
[HADOOP] 람다 아키텍처 - 왜 배치 레이어 (0) | 2019.07.30 |
[HADOOP] getCacheFiles ()와 getLocalCacheFiles ()는 같은 것입니까? (0) | 2019.07.30 |