복붙노트

[HADOOP] 인터셉터를 사용하여 Flume에서 로그 파일 필터링하기

HADOOP

인터셉터를 사용하여 Flume에서 로그 파일 필터링하기

나는 HTTP 서버에 로그 파일을 작성한 다음 Flume을 사용하여 HDFS에로드합니다. 먼저 헤더 또는 본문에있는 데이터에 따라 데이터를 필터링하고 싶습니다. 나는 정규 표현식을 가진 인터셉터를 사용하여 이것을 할 수 있다고 읽었습니다. 누군가 내가 할 일을 정확히 설명 할 수 있습니까? Flume 코드를 대체하는 Java 코드를 작성해야합니까?

또한 데이터를 가져 와서 헤더에 따라 다른 싱크대로 보냅니다. 즉 소스 = 1은 싱크대 1에, 소스 2는 싱크 2로 이동합니다. 어떻게 수행됩니까?

고맙습니다,

시몬

해결법

  1. ==============================

    1.이벤트를 필터링하기 위해 Java 코드를 작성할 필요가 없습니다. Regex Filtering Interceptor를 사용하여 본문이 정규 표현식과 일치하는 이벤트를 필터링하십시오.

    이벤트를 필터링하기 위해 Java 코드를 작성할 필요가 없습니다. Regex Filtering Interceptor를 사용하여 본문이 정규 표현식과 일치하는 이벤트를 필터링하십시오.

    agent.sources.logs_source.interceptors = regex_filter_interceptor
    agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
    agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
    agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
    

    헤더를 기반으로 이벤트를 라우팅하려면 Multiplexing Channel Selector를 사용하십시오.

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.default = c4
    

    여기에 "state"= "CZ"헤더가있는 이벤트는 "state"= "US"- "c2"및 "c3"채널 "c1"로 이동하고 다른 모든 이벤트는 "c4"로 이동합니다.

    이 방법으로 헤더별로 이벤트를 필터링 할 수도 있습니다. 특정 헤더 값을 널 싱크 (Null Sink)를 가리키는 채널로 라우트하면됩니다.

  2. ==============================

    2.flume 채널 선택기를 사용하여 이벤트를 다른 대상으로 간단히 라우팅 할 수 있습니다. 또는 여러 개의 flume 에이전트를 함께 연결하여 복잡한 라우팅 기능을 구현할 수 있습니다. 그러나 연쇄 된 flume agent는 유지하기가 약간 어려워 질 것입니다 (자원 사용 및 flume 토폴로지). flume-ng router sink를 살펴볼 수 있습니다. 원하는 기능을 제공 할 수 있습니다.

    flume 채널 선택기를 사용하여 이벤트를 다른 대상으로 간단히 라우팅 할 수 있습니다. 또는 여러 개의 flume 에이전트를 함께 연결하여 복잡한 라우팅 기능을 구현할 수 있습니다. 그러나 연쇄 된 flume agent는 유지하기가 약간 어려워 질 것입니다 (자원 사용 및 flume 토폴로지). flume-ng router sink를 살펴볼 수 있습니다. 원하는 기능을 제공 할 수 있습니다.

    먼저, flume 인터셉터에 의해 이벤트 헤더에 특정 필드를 추가하십시오.

    a1.sources = r1 r2
    a1.channels = c1 c2
    a1.sources.r1.channels =  c1
    a1.sources.r1.type = seq
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = datacenter
    a1.sources.r1.interceptors.i1.value = NEW_YORK
    a1.sources.r2.channels =  c2
    a1.sources.r2.type = seq
    a1.sources.r2.interceptors = i2
    a1.sources.r2.interceptors.i2.type = static
    a1.sources.r2.interceptors.i2.key = datacenter
    a1.sources.r2.interceptors.i2.value = BERKELEY
    

    그런 다음 아래와 같이 flume 채널 선택기를 설정할 수 있습니다.

    a2.sources = r2
    a2.sources.channels = c1 c2 c3 c4
    a2.sources.r2.selector.type = multiplexing
    a2.sources.r2.selector.header = datacenter
    a2.sources.r2.selector.mapping.NEW_YORK = c1
    a2.sources.r2.selector.mapping.BERKELEY= c2 c3
    a2.sources.r2.selector.default = c4
    

    또는 다음과 같이 avro-router sink를 설정할 수 있습니다.

    agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
    agent.sinks.routerSink.hostname = test_host
    agent.sinks.routerSink.port = 34541
    agent.sinks.routerSink.channel = memoryChannel
    
    # Set sink name
    agent.sinks.routerSink.component.name = AvroRouterSink
    
    # Set header name for routing
    agent.sinks.routerSink.condition = datacenter
    
    # Set routing conditions
    agent.sinks.routerSink.conditions = east,west
    agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
    agent.sinks.routerSink.conditions.east.then.hostname = east_host
    agent.sinks.routerSink.conditions.east.then.port = 34542
    agent.sinks.routerSink.conditions.west.if = ^BERKELEY
    agent.sinks.routerSink.conditions.west.then.hostname = west_host
    agent.sinks.routerSink.conditions.west.then.port = 34543
    
  3. from https://stackoverflow.com/questions/17781508/filtering-log-files-in-flume-using-interceptors by cc-by-sa and MIT license