복붙노트

[HADOOP] pyspark로 HBase로 스트리밍하기

HADOOP

pyspark로 HBase로 스트리밍하기

스칼라를 사용하여 Spark 스트리밍 (이 두 가지가 특히 유용했습니다)과 Java에 대한 정보가 있지만, PySpark로 정보를 전달할 수있는 정보가 부족한 것으로 보입니다. 그래서 내 질문은 :

현재 가지고있는 코드는 다음과 같습니다.

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input");

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()

내가 도움이 필요한 것은 대량로드 기능입니다.

def bulk_load(rdd):
    #???

이전에 많은 진도를 보였습니다 (여기 및 여기에 문서화 된 것처럼)

해결법

  1. ==============================

    1.그래서 많은 시행 착오 끝에 나는 내가 제시 한 최선의 것을 제시한다. 그것은 잘 작동하고 성공적으로 일괄로드 데이터 (Puts 또는 HFiles 사용) 나는 그것이 최선의 방법이 아니므로 믿고 기꺼이 모든 의견 / 다른 답변을 환영합니다. 여기서는 데이터에 CSV를 사용한다고 가정합니다.

    그래서 많은 시행 착오 끝에 나는 내가 제시 한 최선의 것을 제시한다. 그것은 잘 작동하고 성공적으로 일괄로드 데이터 (Puts 또는 HFiles 사용) 나는 그것이 최선의 방법이 아니므로 믿고 기꺼이 모든 의견 / 다른 답변을 환영합니다. 여기서는 데이터에 CSV를 사용한다고 가정합니다.

    대량로드를위한 가장 쉬운 방법은 CSV에있는 각 셀에 대해 Put 요청을 생성하고 HBase에 큐에 넣기 만하면됩니다.

    def bulk_load(rdd):
        #Your configuration will likely be different. Insert your own quorum and parent node and table name
        conf = {"hbase.zookeeper.qourum": "localhost:2181",\
                "zookeeper.znode.parent": "/hbase-unsecure",\
                "hbase.mapred.outputtable": "Test",\
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\#Split the input into individual lines
                      .flatMap(csv_to_key_value)#Convert the CSV line to key value pairs
        load_rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
    

    csv_to_key_value 함수는 마술이 일어나는 곳입니다 :

    def csv_to_key_value(row):
        cols = row.split(",")#Split on commas.
        #Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
        #Works well for n>=1 columns
        result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
                  (cols[0], [cols[0], "f2", "c2", cols[2]]),
                  (cols[0], [cols[0], "f3", "c3", cols[3]]))
        return result
    

    이전에 정의한 값 변환기는이 튜플을 HBase Puts로 변환합니다.

    HFiles를 대량로드하는 것이 더 효율적입니다. 각 셀에 대한 Put 요청 대신 HFile이 직접 쓰여지고 RegionServer는 새로운 HFile을 가리 키도록 간단하게 전달됩니다. 이것은 Py4J를 사용할 것이기 때문에 파이썬 코드 전에 작은 자바 프로그램을 작성해야합니다 :

    import py4j.GatewayServer;
    import org.apache.hadoop.hbase.*;
    
    public class GatewayApplication {
    
        public static void main(String[] args)
        {
            GatewayApplication app = new GatewayApplication();
            GatewayServer server = new GatewayServer(app);
            server.start();
        }
    }
    

    이것을 컴파일하고 실행하십시오. 스트리밍이 진행되는 동안 실행 상태로 둡니다. 이제 다음과 같이 bulk_load를 업데이트하십시오.

    def bulk_load(rdd):
        #The output class changes, everything else stays
        conf = {"hbase.zookeeper.qourum": "localhost:2181",\
                "zookeeper.znode.parent": "/hbase-unsecure",\
                "hbase.mapred.outputtable": "Test",\
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"}
    
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                      .flatMap(csv_to_key_value)\
                      .sortByKey(True)
        #Don't process empty RDDs
        if not load_rdd.isEmpty():
            #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
            load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                            "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                            conf=conf,
                                            keyConverter=keyConv,
                                            valueConverter=valueConv)
            #The file has now been written, but HBase doesn't know about it
    
            #Get a link to Py4J
            gateway = JavaGateway()
            #Convert conf to a fully fledged Configuration type
            config = dict_to_conf(conf)
            #Set up our HTable
            htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
            #Set up our path
            path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
            #Get a bulk loader
            loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
            #Load the HFile
            loader.doBulkLoad(path, htable)
        else:
            print("Nothing to process")
    

    마지막으로, 상당히 간단 dict_to_conf :

    def dict_to_conf(conf):
        gateway = JavaGateway()
        config = gateway.jvm.org.apache.hadoop.conf.Configuration()
        keys = conf.keys()
        vals = conf.values()
        for i in range(len(keys)):
            config.set(keys[i], vals[i])
        return config
    

    보시다시피, HFiles를 사용한 대량로드는 Puts를 사용하는 것보다 복잡하지만 데이터로드에 따라 일단 작업을 시작하면 그만큼 가치가 있습니다.

    마지막으로 HFile은받은 데이터가 어휘 순서로 쓰여질 것으로 기대합니다. 이것은 항상 "10"< "9"이후로 항상 보장되는 것은 아닙니다. 고유 한 키를 설계 한 경우 다음과 같이 쉽게 수정할 수 있습니다.

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)#Sort in ascending order
    
  2. from https://stackoverflow.com/questions/35077986/streaming-to-hbase-with-pyspark by cc-by-sa and MIT license