- Flume Overview
- Flume Architecture
- Source/Channel/Sink Types
- Installing Flume on CentOS
- Prerequisites
- Installation
- Verifying the Service
- Flume Manual
- Flume conf Overview
- Flume Source
- Flume Channel
- Flume Sink
- Connecting Agents with Avro
- Monitor
- Flume Agent Examples
- exec -> memory -> avro
- avro -> memory -> file_roll
- exec -> memory -> null
- Flume Development
- Log Data Structure
- Technical Support
- Flume Configuration
- References
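This page summarizes Flume, an open-source tool for efficiently collecting large volumes of streaming data (such as logs) in a distributed environment.
- Homepage : http://flume.apache.org/index.html
- Download : http://flume.apache.org/download.html
- License : Apache 2.0
- Platform : Java
Flume Overview
An open-source tool for efficiently collecting large volumes of streaming data (such as logs) in a distributed environment.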
Flume Architecture
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="30%" align="center" valign="middle" style="background-color:#eee;"|Source |width="70%" align="left" valign="middle"|
- Collects data.
|- |align="center" valign="middle" style="background-color:#eee;"|Interceptor |align="left" valign="middle"| - Modifies or drops collected data.
- Types (insert) : Timestamp, Host, Static, UUID
- Types (transform/drop) : Morphline, Regex Filtering, Regex Extractor
|- |align="center" valign="middle" style="background-color:#eee;"|Channel Selector |align="left" valign="middle"| - Selects which Channel(s) a Source delivers to.
- Types : Replicating (Default), Multiplexing, Custom
|- |align="center" valign="middle" style="background-color:#eee;"|Channel |align="left" valign="middle"| - The conduit that carries data from a Source to a Sink.
|- |align="center" valign="middle" style="background-color:#eee;"|Sink |align="left" valign="middle"| - Stores or forwards data.
|- |align="center" valign="middle" style="background-color:#eee;"|Sink Processor |align="left" valign="middle"| - Chooses among multiple candidate Sinks.
- Types : Default, Failover, Load balancing, Custom
- Sink Group : manages several Sinks as a single group
|}
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="30%" align="center" valign="middle" style="background-color:#eee;"|Serializer / Deserializer |width="70%" align="left" valign="middle"|
- Serializer : Body Text, Avro Event
- Deserializer : LINE, AVRO, BlobDeserializer
|- |align="center" valign="middle" style="background-color:#eee;"|Handler |align="left" valign="middle"| - JSONHandler, BlobHandler
|- |align="center" valign="middle" style="background-color:#eee;"|Encryption |align="left" valign="middle"|
|- |align="center" valign="middle" style="background-color:#eee;"| |align="left" valign="middle"| - Log4J Appender, Load Balancing Log4J Appender
|- |align="center" valign="middle" style="background-color:#eee;"|Reporting |align="left" valign="middle"| - Ganglia, JSON, Custom
|- |align="center" valign="middle" style="background-color:#eee;"|Plugin |align="left" valign="middle"| - $FLUME_HOME/plugins.d/<plugin-name>/
- $FLUME_HOME/plugins.d/<plugin-name>/lib/~.jar
- $FLUME_HOME/plugins.d/<plugin-name>/libext/~.jar
- $FLUME_HOME/plugins.d/<plugin-name>/native/~.so
|}
Source/Channel/Sink Types
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="35%" align="center" valign="middle" style="background-color:#eee;"|Source |width="30%" align="center" valign="middle" style="background-color:#eee;"|Channel
|width="35%" align="center" valign="middle" style="background-color:#eee;"|Sink |- |align="left" valign="middle"|Avro : collects via the Avro protocol |align="left" valign="middle"|Memory : uses memory
|align="left" valign="middle"|Avro : sends via the Avro protocol |- |align="left" valign="middle"|Thrift : collects via the Thrift protocol |align="left" valign="middle"|JDBC : uses a database
|align="left" valign="middle"|Thrift : sends via the Thrift protocol |- |align="left" valign="middle"| Syslog : collects via the Syslog protocol
- Syslog TCP, Multiport Syslog TCP
- Syslog UDP
|align="left" valign="middle"|File : uses files |align="left" valign="middle"|IRC : sends via the IRC protocol |- |align="left" valign="middle"|HTTP : collects via the HTTP protocol |align="left" valign="middle"| |align="left" valign="middle"|ElasticSearch : stores in Elasticsearch |- |align="left" valign="middle"| JMS : collects via JMS - supports pluggable converters
|align="left" valign="middle"| |align="left" valign="middle"|MorphlineSolr : stores in Solr |- |align="left" valign="middle"|NetCat : collects TCP/IP data |align="left" valign="middle"| |align="left" valign="middle"|HDFS : stores in HDFS |- |align="left" valign="middle"|Exec : collects the output of a Linux command |align="left" valign="middle"| |align="left" valign="middle"| HBase : stores in HBase - HBaseSink, AsyncHBaseSink
|- |align="left" valign="middle"|Spooling Directory : collects files newly added to a directory |align="left" valign="middle"| |align="left" valign="middle"|Logger : logs events for testing or debugging |- |align="left" valign="middle"|Sequence Generator : generates events numbered from 0 in increments of 1 |align="left" valign="middle"| |align="left" valign="middle"|File Roll : stores in files |- |align="left" valign="middle"| Legacy : collects data from earlier Flume versions - Avro Legacy, Thrift legacy
|align="left" valign="middle"| |align="left" valign="middle"|Null : discards the events it receives |- |align="left" valign="middle"|Custom |align="left" valign="middle"|Custom |align="left" valign="middle"|Custom |}
Installing Flume on CentOS
Prerequisites
Installation
- Download Apache Flume and set up its configuration files.
wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
tar zxvf apache-flume-1.4.0-bin.tar.gz
chown -R root:root apache-flume-1.4.0-bin
mv apache-flume-1.4.0-bin /nas/appl/flume
cd /nas/appl/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.sh.template flume-env.sh
chmod 755 flume-env.sh
- vi /nas/appl/flume/conf/log4j.properties
flume.log.dir=/nas/appl/flume/logs
- vi ~/.bashrc
export FLUME_HOME=/nas/appl/flume
export PATH=$PATH:$FLUME_HOME/bin
- Check the version
flume-ng version
- After installation, the following Avro jar files are present.
lib/avro-1.7.2.jar
lib/avro-ipc-1.7.2.jar
Verifying the Service
- Create the storage directories
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
mkdir /nas/appl/flume/storage/file
- vi conf/flume-conf.properties
agent_001.sources=source-001
agent_001.sources.source-001.channels=channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties
agent_001.channels=channel-001
agent_001.channels.channel-001.type = file
agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data
#agent_001.channels.channel-001.type = memory
#agent_001.channels.channel-001.capacity = 1000
#agent_001.channels.channel-001.transactionCapacity = 100
agent_001.sinks=sink-001
agent_001.sinks.sink-001.channel=channel-001
agent_001.sinks.sink-001.type = file_roll
agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
- Run the agent
flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- Verify that the agent is working
- Open http://localhost:41414/ in a browser; the metrics are served as JSON at http://localhost:41414/metrics.
- Check /nas/appl/flume/logs/flume.log for errors.
- Check that files have been created in /nas/appl/flume/storage/file/.
Flume Manual
Flume conf Overview
- Flume help
flume-ng help
- Run an agent
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- avro-client
flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
- vi /nas/appl/flume/conf/flume-conf.properties
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001
agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
- Flume Channel Selector
agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
- Flume Interceptor
- Interceptor : host, timestamp, static, regex_filter, regex_extractor
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent_001.sources.$source.interceptors.i1.preserveExisting = false
agent_001.sources.$source.interceptors.i1.hostHeader = hostname
agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
- regex_extractor interceptor
agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
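As a concrete illustration of the extractor settings above, the sketch below adds the explicit interceptor type and notes which headers would result. The source name `source-001` is borrowed from the sample agent earlier on this page, and the sample event body is the one the Flume User Guide uses for this interceptor; treat it as an illustration rather than a tested configuration.

```properties
# Sketch: regex_extractor copies regex capture groups into event headers.
agent_001.sources.source-001.interceptors = i1
agent_001.sources.source-001.interceptors.i1.type = regex_extractor
# Backslashes are doubled because this is a Java properties file.
agent_001.sources.source-001.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.source-001.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.source-001.interceptors.i1.serializers.s1.name = one
agent_001.sources.source-001.interceptors.i1.serializers.s2.name = two
agent_001.sources.source-001.interceptors.i1.serializers.s3.name = three
# For an event body of "1:2:3.4foobar5" the body is left unchanged and the
# headers one=1, two=2, three=3 are added.
```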
- Event Serializers
- serializer : text, avro_event
agent_001.sinks.$sink.sink.serializer = text
agent_001.sinks.$sink.sink.serializer.appendNewline = false
- Flume Sink Processor
agent_001.sinkgroups.$group.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
- failover sink processor
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002
agent_001.sinkgroups.group001.processor.type = failover
agent_001.sinkgroups.group001.processor.priority.sink001 = 5
agent_001.sinkgroups.group001.processor.priority.sink002 = 10
agent_001.sinkgroups.group001.processor.maxpenalty = 10000
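A failover group sends events to the highest-priority sink that is still alive. When the goal is to spread load across collectors instead, a load-balancing group is the usual alternative. The following is a minimal sketch, assuming two Avro sinks that drain the same channel; the host names `collector-a.example.com` and `collector-b.example.com` are hypothetical.

```properties
agent_001.sinks = sink001 sink002
agent_001.sinks.sink001.type = avro
agent_001.sinks.sink001.channel = channel-001
agent_001.sinks.sink001.hostname = collector-a.example.com
agent_001.sinks.sink001.port = 4545
agent_001.sinks.sink002.type = avro
agent_001.sinks.sink002.channel = channel-001
agent_001.sinks.sink002.hostname = collector-b.example.com
agent_001.sinks.sink002.port = 4545

agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002
agent_001.sinkgroups.group001.processor.type = load_balance
# round_robin is the default selector; random is the other built-in choice.
agent_001.sinkgroups.group001.processor.selector = round_robin
# Temporarily skip a sink that has failed instead of retrying it immediately.
agent_001.sinkgroups.group001.processor.backoff = true
```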
Flume Source
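- Source types
- exec : runs a command; cat reads a file's contents, tail reads lines as they are appended
- hdfs : not a built-in source type in Flume 1.4 (HDFS is supported as a sink)
- avro, thrift (memory is a channel type, not a source type)
- jms, spooldir, netcat, seq, http
- syslogtcp, multiport_syslogtcp, syslogudp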
- org.apache.flume.source.avroLegacy.AvroLegacySource
- org.apache.flume.source.thriftLegacy.ThriftLegacySource
- org.apache.flume.source.scribe.ScribeSource
- custom source
agent_001.sources.$source.type = com.jopenbusiness.flume.source001
- hdfs source (not built into Flume 1.4; these settings would only apply to a custom source)
agent_001.sources.$source.type = hdfs
agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
- exec source
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = ~
agent_001.sources.$source.shell = /bin/bash -c
agent_001.sources.$source.restartThrottle = 10000
agent_001.sources.$source.restart = false
agent_001.sources.$source.logStdErr = false
agent_001.sources.$source.batchSize = 1000
agent_001.sources.$source.selector.type = replicating or multiplexing
agent_001.sources.$source.selector.* = ~
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.* = ~
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = tail -f ~
agent_001.sources.$source.logStdErr = true
agent_001.sources.$source.restart = true
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
agent_001.sources.$source.shell = /bin/bash -c
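The exec source offers no delivery guarantee: if the agent or the spawned command dies, events can be lost. For files that are complete once written, the spooling directory source (spooldir in the list above) is generally the more reliable choice. A minimal sketch follows; the spool directory path is an assumption that simply extends the storage layout used on this page.

```properties
agent_001.sources.$source.type = spooldir
# Hypothetical directory; files placed here must not be modified afterwards.
agent_001.sources.$source.spoolDir = /nas/appl/flume/storage/spool
# Add a header carrying the absolute path of the originating file.
agent_001.sources.$source.fileHeader = true
```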
- avro source
agent_001.sources.$source.type = avro
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 10000
- thrift source
agent_001.sources.$source.type = thrift
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 4141
- memory (a channel type, not a source type; the properties below are memory channel settings)
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
agent_001.channels.$channel.transactionCapacity = 100
- multiplexing selector
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header =
agent_001.sources.$source.selector.mapping. =
agent_001.sources.$source.selector.mapping. =
agent_001.sources.$source.selector.mapping. =
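The header name and mapping keys above are left blank in the original. Purely as an illustration, the sketch below routes on a hypothetical `severity` header (the field name is borrowed from the log data structure section of this page); the channel names `channel-error` and `channel-other` are also hypothetical.

```properties
agent_001.sources.source-001.channels = channel-error channel-other
agent_001.sources.source-001.selector.type = multiplexing
# Route on the value of the "severity" event header, which is assumed to be
# set upstream (for example by an interceptor).
agent_001.sources.source-001.selector.header = severity
agent_001.sources.source-001.selector.mapping.ERROR = channel-error
agent_001.sources.source-001.selector.mapping.WARNING = channel-error channel-other
# Events whose header value matches no mapping go to the default channel(s).
agent_001.sources.source-001.selector.default = channel-other
```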
Flume Channel
- Channel types
- memory, file
- jdbc
- org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
- org.apache.flume.channel.PseudoTxnMemoryChannel
- Custom channel
agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
- memory channel
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
- file channel
- mkdir /nas/appl/flume/storage
- mkdir /nas/appl/flume/storage/checkpoint
- mkdir /nas/appl/flume/storage/data
agent_001.channels.$channel.type = file
agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data
Flume Sink
- Sink types
- hdfs
- file_roll
- logger, avro, thrift, irc, null
- hbase, asynchbase
- org.apache.flume.sink.elasticsearch.ElasticSearchSink
- Custom sink
agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
- hdfs sink
- Requires the Hadoop jar files on the classpath
agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path = hdfs://namenode:9000/user/flume/weblog/%y%m%d
agent_001.sinks.$sink.hdfs.filePrefix = http-
agent_001.sinks.$sink.hdfs.fileSuffix = .log
agent_001.sinks.$sink.hdfs.round = true
agent_001.sinks.$sink.hdfs.roundValue = 10
agent_001.sinks.$sink.hdfs.roundUnit = minute
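Escape sequences such as %y%m%d in hdfs.path, and the hdfs.round settings, are resolved from each event's timestamp header, so events reaching this sink need that header. One common way to supply it is a timestamp interceptor on the source that feeds the sink. A minimal sketch using the placeholders from this page; the interceptor name `ts` is arbitrary.

```properties
# Stamp each event with a "timestamp" header at collection time.
agent_001.sources.$source.interceptors = ts
agent_001.sources.$source.interceptors.ts.type = timestamp

agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path = hdfs://namenode:9000/user/flume/weblog/%y%m%d
# Write plain event bodies instead of the default SequenceFile format.
agent_001.sinks.$sink.hdfs.fileType = DataStream
```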
- file_roll sink
- mkdir /nas/appl/flume/storage
- mkdir /nas/appl/flume/storage/file
agent_001.sinks.$sink.type = file_roll
agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
- avro sink
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = localhost
agent_001.sinks.$sink.port = 4545
- logger sink
agent_001.sinks.$sink.type = logger
Connecting Agents with Avro
- Connecting agent_001 -> agent_002
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = hostForAgent002
agent_001.sinks.$sink.port = 41414
agent_001.sinks.$sink.batch-size = 100
agent_001.sinks.$sink.runner.type = polling
agent_001.sinks.$sink.runner.polling.interval = 10
#--- The collector binds the port.
agent_002.sources.$source.type = avro
agent_002.sources.$source.bind = hostForAgent002
agent_002.sources.$source.port = 41414
Monitor
- ganglia reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
- JSON Reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- Custom reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Flume Agent Examples
exec -> memory -> avro
- flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info |width="80%" align="left" valign="middle"| - Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec
|- |align="center" valign="middle" style="background-color:#eee;"|Source
(exec) |align="left" valign="middle"|
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
agent_001.sources.source-001.restartThrottle = 1000
agent_001.sources.source-001.restart = true
agent_001.sources.source-001.logStdErr = false
agent_001.sources.source-001.batchSize = 100
- restartThrottle : time to wait (ms) before restarting the command after an abnormal exit
- batchSize : number of events per transaction
|- |align="center" valign="middle" style="background-color:#eee;"|Channel
(memory) |align="left" valign="middle"|
agent_001.channels = channel-001
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapacity = 100
agent_001.channels.channel-001.keep-alive = 3
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
agent_001.channels.channel-001.byteCapacity = 24960000
- capacity : maximum number of events the channel can hold (Count * 100)
- transactionCapacity : maximum number of events received or sent per transaction
- byteCapacity : maximum number of bytes the channel can hold (capacity * Size * 2)
|- |align="center" valign="middle" style="background-color:#eee;"|Sink
(avro) |align="left" valign="middle"|
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = avro
agent_001.sinks.sink-001.hostname = 192.168.56.201
agent_001.sinks.sink-001.port = 4141
agent_001.sinks.sink-001.batch-size = 100
agent_001.sinks.sink-001.connect-timeout = 20000
agent_001.sinks.sink-001.request-timeout = 20000
- batch-size : number of events per transaction
|}
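The sizing rules noted for the memory channel above (capacity = Count * 100, byteCapacity = capacity * Size * 2) can be reapplied to other workloads. The following is a worked sketch for a hypothetical workload of 200-byte events at 5,000 events per second; these numbers are illustrative and are not taken from the original measurement.

```properties
# Hypothetical workload: Size = 200 bytes, Count = 5,000 events/sec
#   throughput   = 200 * 5,000       = 1,000,000 bytes/sec
#   capacity     = 5,000 * 100       = 500,000 events
#   byteCapacity = 500,000 * 200 * 2 = 200,000,000 bytes
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 500000
agent_001.channels.channel-001.transactionCapacity = 100
agent_001.channels.channel-001.byteCapacity = 200000000
# Share of byteCapacity set aside as a buffer for event headers.
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
```

byteCapacity counts event bodies only, which is why byteCapacityBufferPercentage reserves headroom for headers. The JVM heap configured in flume-env.sh also has to be large enough to hold the channel.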
avro -> memory -> file_roll
- flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info |width="80%" align="left" valign="middle"| - Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec
|- |align="center" valign="middle" style="background-color:#eee;"|Source
(avro) |align="left" valign="middle"|
agent_002.sources = source-002
agent_002.sources.source-002.channels = channel-002
agent_002.sources.source-002.type = avro
agent_002.sources.source-002.bind = 0.0.0.0
agent_002.sources.source-002.port = 4141
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory)
|align="left" valign="middle"|Omitted
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
(file_roll)
|align="left" valign="middle"|
agent_002.sinks = sink-002
agent_002.sinks.sink-002.channel = channel-002
agent_002.sinks.sink-002.type = file_roll
agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
agent_002.sinks.sink-002.sink.rollInterval = 30
agent_002.sinks.sink-002.batchSize = 100
- batchSize : number of events per transaction
|}
exec -> memory -> null
- flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info |width="80%" align="left" valign="middle"| - Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec
|- |align="center" valign="middle" style="background-color:#eee;"|Source
(exec) |align="left" valign="middle"|Omitted |- |align="center" valign="middle" style="background-color:#eee;"|Channel
(memory) |align="left" valign="middle"|Omitted |- |align="center" valign="middle" style="background-color:#eee;"|Sink
(null) |align="left" valign="middle"|
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = null
agent_001.sinks.sink-001.batchSize = 100
- batchSize : number of events per transaction
|}
Flume Development
- Flume Library
- lib/flume-ng-configuration-1.4.0.jar
- lib/flume-ng-sdk-1.4.0.jar
- lib/flume-ng-core-1.4.0.jar
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" |- |width="30%" align="center" valign="middle" style="background-color:#eee;"|Source |width="70%" align="left" valign="middle"|
- org.apache.flume.Source
- Implementation : org.apache.flume.source.ExecSource
|- |align="center" valign="middle" style="background-color:#eee;"|Interceptor |align="left" valign="middle"| - org.apache.flume.interceptor.Interceptor
- Implementation : org.apache.flume.interceptor.TimestampInterceptor$Builder
agent_001.sources.source-001.interceptors = ConversionInterceptor
agent_001.sources.source-001.interceptors.ConversionInterceptor.type = com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
|- |align="center" valign="middle" style="background-color:#eee;"|Channel Selector |align="left" valign="middle"|
org.apache.flume.ChannelSelector
Implementation : org.apache.flume.channel.MultiplexingChannelSelector
|- |align="center" valign="middle" style="background-color:#eee;"|Channel |align="left" valign="middle"|org.apache.flume.Channel
Implementation : org.apache.flume.channel.MemoryChannel
|- |align="center" valign="middle" style="background-color:#eee;"|Sink |align="left" valign="middle"|org.apache.flume.Sink
Implementation : org.apache.flume.sink.LoggerSink
|- |align="center" valign="middle" style="background-color:#eee;"|Sink Processor |align="left" valign="middle"|org.apache.flume.SinkProcessor
Implementation : org.apache.flume.sink.LoadBalancingSinkProcessor
|}
Component
- Event
- Source
- Sink
- Channel
- Source and Sink Runners
- Agent
- Configuration Provider
- Client
Physical node
Logical node
Log Data Structure
- Log data
- ymd string : year, month, day
- hms string : hour, minute, second
- severity string : severity (ERROR, WARNING, INFO, ...)
- server string
- process_id int
- message string
Technical Support
- Error : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
- Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
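This error generally means that a single transaction tried to put more events than the memory channel's transactionCapacity allows (100 is the default, which matches the message). Likely causes are a source batch size larger than transactionCapacity, or a misspelled transactionCapacity key that leaves the default in effect. A minimal sketch of a consistent configuration follows; the value 1000 is an assumption to be tuned for the actual workload.

```properties
# Keep transactionCapacity >= the largest batch any source or sink uses,
# and capacity >= transactionCapacity.
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapacity = 1000
agent_001.sources.source-001.batchSize = 1000
agent_001.sinks.sink-001.batch-size = 1000
```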
Flume Configuration
References
Streaming data into Apache HBase using Apache Flume, 2012.12
http://blog.sematext.com/2011/07/28/flume-and-hbase-integration/