복붙노트

[HADOOP] Apache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까?

HADOOP

Apache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까?

Apache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까? 예를 들어 수천 개의 PDF 인보이스가 있는데 그 중 일부 데이터를 읽고 그에 대한 분석을 수행하려고합니다. 비정형 데이터를 처리하기 위해 수행해야 할 단계는 무엇입니까?

해결법

  1. ==============================

    1.예, 그렇습니다. sparkContext.binaryFiles를 사용하여 파일을 이진 형식으로로드 한 다음 map을 사용하여 값을 다른 형식으로 매핑합니다 (예 : Apache Tika 또는 Apache POI로 이진 구문 분석).

    예, 그렇습니다. sparkContext.binaryFiles를 사용하여 파일을 이진 형식으로로드 한 다음 map을 사용하여 값을 다른 형식으로 매핑합니다 (예 : Apache Tika 또는 Apache POI로 이진 구문 분석).

    의사 코드 :

    val rawFile = sparkContext.binaryFiles(...
    val ready = rawFile.map ( here parsing with other framework
    

    무엇이 중요한지, 파싱은 이전에 내 답변에서 언급 한 다른 프레임 워크로 수행되어야합니다. Map는 InputStream를 인수로 가져옵니다.

  2. ==============================

    2.입력 파일에 사용자 지정 암호 해독 알고리즘을 사용해야하는 경우가있었습니다. 스칼라 또는 파이썬에서이 코드를 다시 작성하기를 원하지 않았습니다. Python-Spark 코드는 다음과 같습니다.

    입력 파일에 사용자 지정 암호 해독 알고리즘을 사용해야하는 경우가있었습니다. 스칼라 또는 파이썬에서이 코드를 다시 작성하기를 원하지 않았습니다. Python-Spark 코드는 다음과 같습니다.

    from pyspark import SparkContext, SparkConf, HiveContext, AccumulatorParam
    
    def decryptUncompressAndParseFile(filePathAndContents):
        '''each line of the file becomes an RDD record'''
        global acc_errCount, acc_errLog
        proc = subprocess.Popen(['custom_decrypt_program','--decrypt'], 
                 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (unzippedData, err) = proc.communicate(input=filePathAndContents[1])
        if len(err) > 0:  # problem reading the file
            acc_errCount.add(1)
            acc_errLog.add('Error: '+str(err)+' in file: '+filePathAndContents[0]+
                ', on host: '+ socket.gethostname()+' return code:'+str(returnCode))
            return []  # this is okay with flatMap
        records   = list()
        iterLines = iter(unzippedData.splitlines())
        for line in iterLines:
            #sys.stderr.write('Line: '+str(line)+'\n')
            values = [x.strip() for x in line.split('|')]
            ...
            records.append( (... extract data as appropriate from values into this tuple ...) )
        return records
    
    class StringAccumulator(AccumulatorParam):
        ''' custom accumulator to holds strings '''
        def zero(self,initValue=""):
            return initValue
        def addInPlace(self,str1,str2):
            return str1.strip()+'\n'+str2.strip()
    
    def main():
        ...
        global acc_errCount, acc_errLog
        acc_errCount  = sc.accumulator(0)
        acc_errLog    = sc.accumulator('',StringAccumulator())
        binaryFileTup = sc.binaryFiles(args.inputDir)
        # use flatMap instead of map, to handle corrupt files
        linesRdd = binaryFileTup.flatMap(decryptUncompressAndParseFile, True)
        df = sqlContext.createDataFrame(linesRdd, ourSchema())
        df.registerTempTable("dataTable")
        ...
    

    사용자 정의 문자열 누적 기는 손상된 입력 파일을 식별하는 데 매우 유용했습니다.

  3. from https://stackoverflow.com/questions/44890381/is-it-possible-to-read-pdf-audio-video-filesunstructured-data-using-apache-spa by cc-by-sa and MIT license