복붙노트

[HADOOP] PySpark : newAPIHadoopFile을 사용하여 여러 줄 레코드 텍스트 파일에서 읽고 매핑하고 줄입니다.

HADOOP

PySpark : newAPIHadoopFile을 사용하여 여러 줄 레코드 텍스트 파일에서 읽고 매핑하고 줄입니다.

나는이 게시물과 비슷한 종류의 문제를 해결하려고 노력 중이다. 내 원본 데이터는 여러 센서의 값 (관측치)을 포함하는 텍스트 파일입니다. 각 관찰은 타임 스탬프와 함께 주어 지지만 센서 이름은 한 번만 표시되며 각 줄에는 표시되지 않습니다. 그러나 하나의 파일에 여러 개의 센서가 있습니다.

Time    MHist::852-YF-007   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time    MHist::852-YF-008   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0

따라서 Hadoop을 구성하여 센서 정보가 제공된 행에서 파일을 분할하려고합니다. 그런 다음 해당 라인에서 센서 이름 (예 : 852-YF-007 및 852-YF-008)을 읽고 MapReduce를 사용하여 각 센서의 값을 적절히 읽습니다.

파이썬 (Jupyter Notebook)에서이 작업을 수행했습니다.

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)

sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())

sf.take(50)

출력은 다음과 같습니다.

[[u'::852-YF-007\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0'],
 [u'::852-YF-008\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0']]

내 질문은 센서 이름을 추출하고 그 센서에 대한 가치관이있는 방법을 추가로 처리하는 방법입니다. 다소 좋아한다.

852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines

그러면 라인 자체가 타임 스탬프와 값으로 나뉘어집니다. 그러나 저는 센서 이름을 줄에서 분리하는 것에 더 관심이 있습니다.

해결법

  1. ==============================

    1.개인적으로 나는하고자했다 :

    개인적으로 나는하고자했다 :

    이 모든 것은 물론 하나의 기능으로 수행 할 수 있습니다.

    import dateutil.parser
    
    def process(pair):
        _, content = pair
        clean = [x.strip() for x in content.strip().splitlines()]
        if not clean:
            return []
        k, vs = clean[0], clean[1:]
        for v in vs:
            try:
                ds, x = v.split("\t")
                yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
            except ValueError:
                pass
    
    sheet.flatMap(process)
    
  2. from https://stackoverflow.com/questions/38117391/pyspark-read-map-and-reduce-from-multiline-record-textfile-with-newapihadoopfi by cc-by-sa and MIT license