[HADOOP] 인터셉터를 사용하여 Flume에서 로그 파일 필터링하기
HADOOP인터셉터를 사용하여 Flume에서 로그 파일 필터링하기
나는 HTTP 서버에 로그 파일을 작성한 다음 Flume을 사용하여 HDFS에로드합니다. 먼저 헤더 또는 본문에있는 데이터에 따라 데이터를 필터링하고 싶습니다. 나는 정규 표현식을 가진 인터셉터를 사용하여 이것을 할 수 있다고 읽었습니다. 누군가 내가 할 일을 정확히 설명 할 수 있습니까? Flume 코드를 대체하는 Java 코드를 작성해야합니까?
또한 데이터를 가져 와서 헤더에 따라 다른 싱크대로 보냅니다. 즉 소스 = 1은 싱크대 1에, 소스 2는 싱크 2로 이동합니다. 어떻게 수행됩니까?
고맙습니다,
시몬
해결법
-
==============================
1.이벤트를 필터링하기 위해 Java 코드를 작성할 필요가 없습니다. Regex Filtering Interceptor를 사용하여 본문이 정규 표현식과 일치하는 이벤트를 필터링하십시오.
이벤트를 필터링하기 위해 Java 코드를 작성할 필요가 없습니다. Regex Filtering Interceptor를 사용하여 본문이 정규 표현식과 일치하는 이벤트를 필터링하십시오.
agent.sources.logs_source.interceptors = regex_filter_interceptor agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex> agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
헤더를 기반으로 이벤트를 라우팅하려면 Multiplexing Channel Selector를 사용하십시오.
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
여기에 "state"= "CZ"헤더가있는 이벤트는 "state"= "US"- "c2"및 "c3"채널 "c1"로 이동하고 다른 모든 이벤트는 "c4"로 이동합니다.
이 방법으로 헤더별로 이벤트를 필터링 할 수도 있습니다. 특정 헤더 값을 널 싱크 (Null Sink)를 가리키는 채널로 라우트하면됩니다.
-
==============================
2.flume 채널 선택기를 사용하여 이벤트를 다른 대상으로 간단히 라우팅 할 수 있습니다. 또는 여러 개의 flume 에이전트를 함께 연결하여 복잡한 라우팅 기능을 구현할 수 있습니다. 그러나 연쇄 된 flume agent는 유지하기가 약간 어려워 질 것입니다 (자원 사용 및 flume 토폴로지). flume-ng router sink를 살펴볼 수 있습니다. 원하는 기능을 제공 할 수 있습니다.
flume 채널 선택기를 사용하여 이벤트를 다른 대상으로 간단히 라우팅 할 수 있습니다. 또는 여러 개의 flume 에이전트를 함께 연결하여 복잡한 라우팅 기능을 구현할 수 있습니다. 그러나 연쇄 된 flume agent는 유지하기가 약간 어려워 질 것입니다 (자원 사용 및 flume 토폴로지). flume-ng router sink를 살펴볼 수 있습니다. 원하는 기능을 제공 할 수 있습니다.
먼저, flume 인터셉터에 의해 이벤트 헤더에 특정 필드를 추가하십시오.
a1.sources = r1 r2 a1.channels = c1 c2 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK a1.sources.r2.channels = c2 a1.sources.r2.type = seq a1.sources.r2.interceptors = i2 a1.sources.r2.interceptors.i2.type = static a1.sources.r2.interceptors.i2.key = datacenter a1.sources.r2.interceptors.i2.value = BERKELEY
그런 다음 아래와 같이 flume 채널 선택기를 설정할 수 있습니다.
a2.sources = r2 a2.sources.channels = c1 c2 c3 c4 a2.sources.r2.selector.type = multiplexing a2.sources.r2.selector.header = datacenter a2.sources.r2.selector.mapping.NEW_YORK = c1 a2.sources.r2.selector.mapping.BERKELEY= c2 c3 a2.sources.r2.selector.default = c4
또는 다음과 같이 avro-router sink를 설정할 수 있습니다.
agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink agent.sinks.routerSink.hostname = test_host agent.sinks.routerSink.port = 34541 agent.sinks.routerSink.channel = memoryChannel # Set sink name agent.sinks.routerSink.component.name = AvroRouterSink # Set header name for routing agent.sinks.routerSink.condition = datacenter # Set routing conditions agent.sinks.routerSink.conditions = east,west agent.sinks.routerSink.conditions.east.if = ^NEW_YORK agent.sinks.routerSink.conditions.east.then.hostname = east_host agent.sinks.routerSink.conditions.east.then.port = 34542 agent.sinks.routerSink.conditions.west.if = ^BERKELEY agent.sinks.routerSink.conditions.west.then.hostname = west_host agent.sinks.routerSink.conditions.west.then.port = 34543
from https://stackoverflow.com/questions/17781508/filtering-log-files-in-flume-using-interceptors by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 아마존 s3n URL의 일부로 AWS 액세스 키 ID 및 비밀 키를 지정하는 방법 (0) | 2019.06.16 |
---|---|
[HADOOP] 하이브 테이블에서 열 이름 가져 오기 (0) | 2019.06.16 |
[HADOOP] hadoop (또는 dfs 명령)을 실행할 때 더 이상 사용되지 않는 오류가 계속 나타나는 이유는 무엇입니까? (0) | 2019.06.16 |
[HADOOP] 각 Hadoop 매퍼가 읽을 기본 크기는 얼마입니까? (0) | 2019.06.16 |
[HADOOP] Hadoop Mapreduce 콘솔 출력에 대한 설명 (0) | 2019.06.16 |