[HADOOP] Apache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까?
HADOOPApache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까?
Apache Spark을 사용하여 pdf / audio / video 파일 (구조화되지 않은 데이터)을 읽을 수 있습니까? 예를 들어 수천 개의 PDF 인보이스가 있는데 그 중 일부 데이터를 읽고 그에 대한 분석을 수행하려고합니다. 비정형 데이터를 처리하기 위해 수행해야 할 단계는 무엇입니까?
해결법
-
==============================
1.예, 그렇습니다. sparkContext.binaryFiles를 사용하여 파일을 이진 형식으로로드 한 다음 map을 사용하여 값을 다른 형식으로 매핑합니다 (예 : Apache Tika 또는 Apache POI로 이진 구문 분석).
예, 그렇습니다. sparkContext.binaryFiles를 사용하여 파일을 이진 형식으로로드 한 다음 map을 사용하여 값을 다른 형식으로 매핑합니다 (예 : Apache Tika 또는 Apache POI로 이진 구문 분석).
의사 코드 :
val rawFile = sparkContext.binaryFiles(... val ready = rawFile.map ( here parsing with other framework
무엇이 중요한지, 파싱은 이전에 내 답변에서 언급 한 다른 프레임 워크로 수행되어야합니다. Map는 InputStream를 인수로 가져옵니다.
-
==============================
2.입력 파일에 사용자 지정 암호 해독 알고리즘을 사용해야하는 경우가있었습니다. 스칼라 또는 파이썬에서이 코드를 다시 작성하기를 원하지 않았습니다. Python-Spark 코드는 다음과 같습니다.
입력 파일에 사용자 지정 암호 해독 알고리즘을 사용해야하는 경우가있었습니다. 스칼라 또는 파이썬에서이 코드를 다시 작성하기를 원하지 않았습니다. Python-Spark 코드는 다음과 같습니다.
from pyspark import SparkContext, SparkConf, HiveContext, AccumulatorParam def decryptUncompressAndParseFile(filePathAndContents): '''each line of the file becomes an RDD record''' global acc_errCount, acc_errLog proc = subprocess.Popen(['custom_decrypt_program','--decrypt'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (unzippedData, err) = proc.communicate(input=filePathAndContents[1]) if len(err) > 0: # problem reading the file acc_errCount.add(1) acc_errLog.add('Error: '+str(err)+' in file: '+filePathAndContents[0]+ ', on host: '+ socket.gethostname()+' return code:'+str(returnCode)) return [] # this is okay with flatMap records = list() iterLines = iter(unzippedData.splitlines()) for line in iterLines: #sys.stderr.write('Line: '+str(line)+'\n') values = [x.strip() for x in line.split('|')] ... records.append( (... extract data as appropriate from values into this tuple ...) ) return records class StringAccumulator(AccumulatorParam): ''' custom accumulator to holds strings ''' def zero(self,initValue=""): return initValue def addInPlace(self,str1,str2): return str1.strip()+'\n'+str2.strip() def main(): ... global acc_errCount, acc_errLog acc_errCount = sc.accumulator(0) acc_errLog = sc.accumulator('',StringAccumulator()) binaryFileTup = sc.binaryFiles(args.inputDir) # use flatMap instead of map, to handle corrupt files linesRdd = binaryFileTup.flatMap(decryptUncompressAndParseFile, True) df = sqlContext.createDataFrame(linesRdd, ourSchema()) df.registerTempTable("dataTable") ...
사용자 정의 문자열 누적 기는 손상된 입력 파일을 식별하는 데 매우 유용했습니다.
from https://stackoverflow.com/questions/44890381/is-it-possible-to-read-pdf-audio-video-filesunstructured-data-using-apache-spa by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 데이터를 HDFS로 복사 할 때 createBlockOutputStream의 예외 (0) | 2019.06.11 |
---|---|
[HADOOP] Spark / Python에서 누락 된 누락 값 전달 (0) | 2019.06.11 |
[HADOOP] 와일드 카드가있는 Hadoop HDFS 사본? (0) | 2019.06.11 |
[HADOOP] 돼지 :리스트에 변수 위에 루프를 쓸 수 있습니까? (0) | 2019.06.11 |
[HADOOP] Windows에서 Mapreduce (yarn)를 실행하는 동안 오류가 발생했습니다. (0) | 2019.06.11 |