복붙노트

[HADOOP] 수 아파치 flume hdfs 싱크 동적 경로를 쓸 수 있습니까?

HADOOP

수 아파치 flume hdfs 싱크 동적 경로를 쓸 수 있습니까?

나는 아파시 용의자가 처음이다. 나는 json (http 소스)을 어떻게 얻을 수 있는지보고, 내용을 분석하여 hdfs의 동적 경로에 저장한다. 예 : json이 다음과 같은 경우 :

[{   
  "field1" : "value1",
  "field2" : "value2"
}]

hdfs 경로는 다음과 같습니다.  / some-default-root-path / value1 / value2 / some-value-name-file 저에게 그것을 가능하게하는 flume의 그런 윤곽 있는가?

다음은 현재 구성입니다 (http를 통해 json을 수락하고 타임 스탬프에 따라 경로에 저장합니다).

#flume.conf: http source, hdfs sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 9000
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

감사!

해결법

  1. ==============================

    1.해결책은 hdfs 싱크에 대한 flume 문서에 있습니다.

    해결책은 hdfs 싱크에 대한 flume 문서에 있습니다.

    다음은 수정 된 구성입니다.

    #flume.conf: http source, hdfs sink
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
    a1.sources.r1.port = 9000
    #a1.sources.r1.handler = org.apache.flume.http.JSONHandler
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/uri/events/%{field1}
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1  
    

    그리고 컬 :

    curl -X POST -d '[{  "headers" : {           "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"            },  "body" : "random_body"  }]' localhost:9000
    
  2. from https://stackoverflow.com/questions/14830147/can-apache-flume-hdfs-sink-accept-dynamic-path-to-write by cc-by-sa and MIT license