Flume
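This page summarizes Flume, an open-source tool for efficiently collecting large volumes of streaming data (such as logs) in a distributed environment.
- Download : http://flume.apache.org/download.html
- License : Apache 2.0
- Platform : Java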
Flume overview
An open-source tool for efficiently collecting large volumes of streaming data (such as logs) in a distributed environment.
Flume Architecture
- Source
- Interceptor
- Channel Selector
- Channel
- Sink
- Sink Processor
- Serializer / Deserializer
- Handler
- Encryption
- Appender
- Reporting
- Plugin
Source/Channel/Sink types
Source | Channel | Sink |
Avro : collects over the Avro protocol | Memory : uses memory | Avro : sends over the Avro protocol |
Thrift : collects over the Thrift protocol | JDBC : uses a database | Thrift : sends over the Thrift protocol |
Syslog : collects over the Syslog protocol | File : uses files | IRC : sends over the IRC protocol |
HTTP : collects over the HTTP protocol | | ElasticSearch : stores in Elasticsearch |
JMS : collects over the JMS protocol | | MorphlineSolr : stores in Solr |
NetCat : collects TCP/IP data | | HDFS : stores in HDFS |
Exec : collects the output of a Linux command | | HBase : stores in HBase |
Spooling Directory : collects files newly added to a directory | | Logger : logs events for testing or debugging |
Sequence Generator : generates events counting up from 0 in steps of 1 | | File Roll : stores in files |
Legacy : collects data from an earlier version of Flume | | Null : does nothing |
Custom | Custom | Custom |
Installing Flume on CentOS
Prerequisites
Installation
- Download and unpack Apache Flume.
wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
tar zxvf apache-flume-1.4.0-bin.tar.gz
chown -R root:root apache-flume-1.4.0-bin
mv apache-flume-1.4.0-bin /nas/appl/flume
cd /nas/appl/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.sh.template flume-env.sh
chmod 755 flume-env.sh
- vi /nas/appl/flume/conf/log4j.properties
flume.log.dir=/nas/appl/flume/logs
- vi ~/.bashrc
export FLUME_HOME=/nas/appl/flume
export PATH=$PATH:$FLUME_HOME/bin
- Check the version
flume-ng version
- After installation, the Avro jar files are present as follows.
lib/avro-1.7.2.jar
lib/avro-ipc-1.7.2.jar
Verifying the service
- Create the storage directories
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
mkdir /nas/appl/flume/storage/file
- vi conf/flume-conf.properties
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties
agent_001.channels = channel-001
agent_001.channels.channel-001.type = file
agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data
#agent_001.channels.channel-001.type = memory
#agent_001.channels.channel-001.capacity = 1000
#agent_001.channels.channel-001.transactionCapacity = 100
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = file_roll
agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
- Run the agent
flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- Verify that the agent is working
- Open http://localhost:41414/ in a browser.
- Check /nas/appl/flume/logs/flume.log for errors.
- Check that files have been created in /nas/appl/flume/storage/file/.
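Before starting an agent, it can help to check that every source and sink in flume-conf.properties points at a declared channel. The following is a minimal sketch, not part of Flume: the function names parse_properties and check_wiring are hypothetical, and the parser only handles simple key = value lines with # comments.

```python
def parse_properties(text):
    """Parse 'key = value' lines; blank lines and '#' comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems in the source/channel/sink wiring of one agent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    # A source may feed several channels (space-separated list).
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channels")
        for ch in used:
            if ch not in channels:
                problems.append(f"source {source} uses undeclared channel {ch}")
    # A sink reads from exactly one channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    return problems


SAMPLE = """
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001
agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
"""

if __name__ == "__main__":
    print(check_wiring(parse_properties(SAMPLE), "agent_001"))
```

An empty list means the wiring is consistent. Note the asymmetry the check relies on: a source uses the plural key channels, while a sink uses the singular key channel.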
Flume manual
Flume conf overview
- Flume help
flume-ng help
- Run an agent (the second form also enables HTTP monitoring on port 41414)
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
flume-ng agent -n $agent_name -c conf -f flume-conf.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- avro-client
flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
- vi /nas/appl/flume/conf/flume-conf.properties
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001
agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
- Flume Channel Selector
# selector.type is one of: replicating, multiplexing, or a custom class such as com.jopenbusiness.flume.selector001
agent_001.sources.$source.selector.type = replicating
- Flume Interceptor
- Interceptor : host, timestamp, static, regex_filter
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent_001.sources.$source.interceptors.i1.preserveExisting = false
agent_001.sources.$source.interceptors.i1.hostHeader = hostname
agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
- regex_extractor interceptor (the regex and serializers properties below belong to regex_extractor, not regex_filter)
agent_001.sources.$source.interceptors.i1.type = regex_extractor
agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
- Event Serializers
- serializer : text, avro_event
agent_001.sinks.$sink.sink.serializer = text
agent_001.sinks.$sink.sink.serializer.appendNewline = false
- Flume Sink Processor
# Sink processors are configured on a sink group, not on an individual sink.
# processor.type is one of: default, failover, load_balance, or a custom class such as com.jopenbusiness.flume.processor001
agent_001.sinkgroups.$group.processor.type = failover
- failover sink processor
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002
agent_001.sinkgroups.group001.processor.type = failover
agent_001.sinkgroups.group001.processor.priority.sink001 = 5
agent_001.sinkgroups.group001.processor.priority.sink002 = 10
agent_001.sinkgroups.group001.processor.maxpenalty = 10000
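The failover example above assigns priority 5 to sink001 and priority 10 to sink002. In Flume's failover processor the sink with the larger priority value is preferred, and a lower-priority sink takes over only while the preferred one is failing. The sketch below models just that selection rule; pick_sink is a hypothetical helper, and the penalty back-off controlled by maxpenalty is deliberately not modeled.

```python
def pick_sink(priorities, failed):
    """Return the live sink with the largest priority value, or None if all failed."""
    alive = {name: prio for name, prio in priorities.items() if name not in failed}
    if not alive:
        return None
    return max(alive, key=alive.get)


# Priorities copied from the failover example in this section.
PRIORITIES = {"sink001": 5, "sink002": 10}

if __name__ == "__main__":
    print(pick_sink(PRIORITIES, failed=set()))        # preferred sink
    print(pick_sink(PRIORITIES, failed={"sink002"}))  # fallback sink
```

With these values sink002 is the primary and sink001 is the standby, which is easy to get backwards when reading the numbers as a rank.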
Flume Source
- Source types
- exec : cat (contents of a file), tail (content appended to a file)
- hdfs
- avro, thrift, memory
- jms, spooldir, netcat, seq, http
- syslogtcp, multiport_syslogtcp, syslogudp
- org.apache.flume.source.avroLegacy.AvroLegacySource
- org.apache.flume.source.thriftLegacy.ThriftLegacySource
- org.apache.flume.source.scribe.ScribeSource
- custom source
agent_001.sources.$source.type = com.jopenbusiness.flume.source001
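- hdfs source
- Note: hdfs is not among the source types bundled with Flume 1.4.0, so the type below would have to be supplied by a custom source.
agent_001.sources.$source.type = hdfs
agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog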
- exec source
# Available properties
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = /bin/bash -c
agent_001.sources.$source.shell = /bin/bash -c
agent_001.sources.$source.restartThrottle = 10000
agent_001.sources.$source.restart = false
agent_001.sources.$source.logStdErr = false
agent_001.sources.$source.batchSize = 1000
# selector.type is replicating or multiplexing
agent_001.sources.$source.selector.type = replicating
agent_001.sources.$source.selector.* = ~
# interceptors takes a space-separated list of interceptor names
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.* = ~
# Example: follow a file with tail
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = tail -f ~
agent_001.sources.$source.logStdErr = true
agent_001.sources.$source.restart = true
# Example: run a shell loop over several files
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
agent_001.sources.$source.shell = /bin/bash -c
- avro source
agent_001.sources.$source.type = avro
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 10000
- thrift source
agent_001.sources.$source.type = thrift
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 4141
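- memory source
- Note: in stock Flume, memory is a channel type, and capacity and transactionCapacity are memory channel properties (see Flume Channel).
agent_001.sources.$source.type = memory
agent_001.sources.$source.capacity = 1000
agent_001.sources.$source.transactionCapacity = 100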
- multiplexing selector
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header = <someHeader>
agent_001.sources.$source.selector.mapping.<Value1> = <Channel1>
agent_001.sources.$source.selector.mapping.<Value2> = <Channel1> <Channel2>
agent_001.sources.$source.selector.mapping.<Value3> = <Channel2>
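The multiplexing selector routes each event by looking up the value of one header in the mapping table. The sketch below mirrors the mapping shown above as a plain dictionary. It illustrates the lookup only and is not Flume's implementation: select_channels and the default argument are hypothetical names (Flume's own fallback is configured with the selector.default property).

```python
def select_channels(headers, header_name, mapping, default=()):
    """Return the channels for an event, chosen by the value of one header."""
    value = headers.get(header_name)
    return list(mapping.get(value, default))


# Mirrors selector.mapping.<Value1..3> from the configuration above.
MAPPING = {
    "Value1": ["Channel1"],
    "Value2": ["Channel1", "Channel2"],
    "Value3": ["Channel2"],
}

if __name__ == "__main__":
    event_headers = {"someHeader": "Value2"}
    print(select_channels(event_headers, "someHeader", MAPPING))
```

An event whose header is missing or unmapped goes only to the default channels, so without a default it is routed nowhere in this model.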
Flume Channel
- Channel types
- memory, file
- jdbc
- org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
- org.apache.flume.channel.PseudoTxnMemoryChannel
- Custom channel
agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
- memory channel
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
- file channel
- mkdir /nas/appl/flume/storage
- mkdir /nas/appl/flume/storage/checkpoint
- mkdir /nas/appl/flume/storage/data
agent_001.channels.$channel.type = file
agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data
Flume Sink
- Sink types
- hdfs
- file_roll
- logger, avro, thrift, irc, null
- hbase, asynchbase
- org.apache.flume.sink.elasticsearch.ElasticSearchSink
- Custom sink
agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
- hdfs sink
- Requires hadoop.jar
agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path = hdfs://namenode:9000/user/flume/weblog/%y%m%d
agent_001.sinks.$sink.hdfs.filePrefix = http-
agent_001.sinks.$sink.hdfs.fileSuffix = .log
agent_001.sinks.$sink.hdfs.round = true
agent_001.sinks.$sink.hdfs.roundValue = 10
agent_001.sinks.$sink.hdfs.roundUnit = minute
- file_roll sink
- mkdir /nas/appl/flume/storage
- mkdir /nas/appl/flume/storage/file
agent_001.sinks.$sink.type = file_roll
agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
- avro sink
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = localhost
agent_001.sinks.$sink.port = 4545
- logger sink
agent_001.sinks.$sink.type = logger
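The hdfs sink example in this section builds its directory name from the escapes %y%m%d and rounds the event time down to 10 minutes. Flume resolves these escapes from the event's timestamp header, which is why the hdfs sink is usually paired with a timestamp interceptor. The sketch below reproduces the arithmetic under stated assumptions: round_down and resolve_path are hypothetical helpers, only %y, %m and %d are handled, and UTC is assumed, whereas the time zone Flume uses depends on its configuration.

```python
from datetime import datetime, timezone

UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600}


def round_down(ts_millis, round_value, round_unit):
    """Round a millisecond timestamp down to a multiple of round_value units."""
    step = round_value * UNIT_SECONDS[round_unit] * 1000
    return ts_millis - (ts_millis % step)


def resolve_path(template, ts_millis):
    """Expand %y, %m and %d in the template from the timestamp (UTC assumed)."""
    moment = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
    return moment.strftime(template)


if __name__ == "__main__":
    event_time = datetime(2013, 10, 3, 12, 37, 45, tzinfo=timezone.utc)
    ts = int(event_time.timestamp() * 1000)
    rounded = round_down(ts, 10, "minute")  # 12:37:45 becomes 12:30:00
    print(resolve_path("hdfs://namenode:9000/user/flume/weblog/%y%m%d", rounded))
```

With a day-level path such as this one, rounding changes the result only near midnight. It matters more when the path also contains hour or minute escapes.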
Connecting agents with Avro
- Connect agent001 -> agent002
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = hostForAgent002
agent_001.sinks.$sink.port = 41414
agent_001.sinks.$sink.batch-size = 100
agent_001.sinks.$sink.runner.type = polling
agent_001.sinks.$sink.runner.polling.interval = 10
# The collector (agent_002) binds the port.
agent_002.sources.$source.type = avro
agent_002.sources.$source.bind = hostForAgent002
agent_002.sources.$source.port = 41414
Monitor
- ganglia reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
- JSON Reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
- Custom reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
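With JSON reporting enabled, the agent serves its counters as JSON under the /metrics path of the monitoring port, with values encoded as strings. The sketch below reads that document and computes how full each channel is. The helper names and the sample values are hypothetical, and the counter names ChannelSize and ChannelCapacity are assumed to be present for channel components, so check them against the actual output of your agent.

```python
import json
from urllib.request import urlopen


def fetch_metrics(url="http://localhost:41414/metrics", timeout=5):
    """Download and decode the JSON metrics document from a running agent."""
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def channel_fill(metrics):
    """Map each CHANNEL component to its fill level in percent."""
    result = {}
    for name, values in metrics.items():
        if values.get("Type") != "CHANNEL":
            continue
        size = float(values["ChannelSize"])
        capacity = float(values["ChannelCapacity"])
        result[name] = 100.0 * size / capacity if capacity else 0.0
    return result


# Hypothetical sample in the shape of the JSON report (values are strings).
SAMPLE = {
    "CHANNEL.channel-001": {"Type": "CHANNEL", "ChannelSize": "250", "ChannelCapacity": "1000"},
    "SOURCE.source-001": {"Type": "SOURCE", "EventReceivedCount": "5000"},
}

if __name__ == "__main__":
    print(channel_fill(SAMPLE))
    # Against a live agent: print(channel_fill(fetch_metrics()))
```

A channel that stays close to 100 percent suggests the sink cannot keep up with the source.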
Flume agent examples
exec -> memory -> avro
- flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
- Event information
- Source (exec)
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
agent_001.sources.source-001.restartThrottle = 1000
agent_001.sources.source-001.restart = true
agent_001.sources.source-001.logStdErr = false
agent_001.sources.source-001.batchSize = 100
- Channel (memory)
agent_001.channels = channel-001
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapacity = 100
agent_001.channels.channel-001.keep-alive = 3
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
agent_001.channels.channel-001.byteCapacity = 24960000
- Sink (avro)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = avro
agent_001.sinks.sink-001.hostname = 192.168.56.201
agent_001.sinks.sink-001.port = 4141
agent_001.sinks.sink-001.batch-size = 100
agent_001.sinks.sink-001.connect-timeout = 20000
agent_001.sinks.sink-001.request-timeout = 20000
avro -> memory -> file_roll
- flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties
- Event information
- Source (avro)
agent_002.sources = source-002
agent_002.sources.source-002.channels = channel-002
agent_002.sources.source-002.type = avro
agent_002.sources.source-002.bind = 0.0.0.0
agent_002.sources.source-002.port = 4141
- Channel (memory) : omitted
- Sink (file_roll)
agent_002.sinks = sink-002
agent_002.sinks.sink-002.channel = channel-002
agent_002.sinks.sink-002.type = file_roll
agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
agent_002.sinks.sink-002.sink.rollInterval = 30
agent_002.sinks.sink-002.batchSize = 100
exec -> memory -> null
- flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
- Event information
- Source (exec) : omitted
- Channel (memory) : omitted
- Sink (null)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = null
agent_001.sinks.sink-001.batchSize = 100
Flume development
- Flume Library
- lib/flume-ng-configuration-1.4.0.jar
- lib/flume-ng-sdk-1.4.0.jar
- lib/flume-ng-core-1.4.0.jar
Source | |
Interceptor | agent_001.sources.source-001.interceptors = ConversionInterceptor |
 | agent_001.sources.source-001.interceptors.ConversionInterceptor.type = com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder |
Channel Selector | |
Channel | |
Sink | |
Sink Processor | |
- Component
- Event
- Source
- Sink
- Channel
- Source and Sink Runners
- Agent
- Configuration Provider
- Client
- Physical node
- Logical node
- References
Log data structure
- Log data
- ymd string : year, month, day
- hms string : hour, minute, second
- severity string : severity (ERROR, WARNING, INFO, ...)
- server string
- process_id int
- message string
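The field list above can be read as a record type. The sketch below models it as a dataclass and parses one line into it. The source does not say how the fields are delimited, so the tab separator, the sample values, and the names LogRecord and parse_line are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class LogRecord:
    ymd: str          # year, month, day
    hms: str          # hour, minute, second
    severity: str     # ERROR, WARNING, INFO, ...
    server: str
    process_id: int
    message: str


def parse_line(line, sep="\t"):
    """Split one log line into the six fields; the message may contain the separator."""
    ymd, hms, severity, server, pid, message = line.rstrip("\n").split(sep, 5)
    return LogRecord(ymd, hms, severity, server, int(pid), message)


if __name__ == "__main__":
    # Hypothetical sample line, tab-separated.
    print(parse_line("20131003\t123745\tERROR\tweb01\t4242\tdisk full"))
```

Splitting at most five times keeps any separator characters inside the message intact, since message is the last field.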
Troubleshooting
- Error : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
- Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
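The "Put queue for MemoryTransaction of capacity 100 full" message typically appears when a source hands the memory channel more events in one transaction than the channel's transactionCapacity allows, for example a batchSize of 1000 against the default transactionCapacity of 100. The sketch below flags that mismatch in an already parsed configuration. The function name and the dictionary input are hypothetical, and only sources with an explicit batchSize are checked.

```python
DEFAULT_TRANSACTION_CAPACITY = 100  # memory channel default when the property is unset


def oversized_batches(props, agent):
    """Return (source, channel, batchSize, transactionCapacity) for each mismatch."""
    findings = []
    for source in props.get(f"{agent}.sources", "").split():
        batch = props.get(f"{agent}.sources.{source}.batchSize")
        if batch is None:
            continue  # no explicit batchSize, nothing to compare
        for ch in props.get(f"{agent}.sources.{source}.channels", "").split():
            cap = int(props.get(f"{agent}.channels.{ch}.transactionCapacity",
                                DEFAULT_TRANSACTION_CAPACITY))
            if int(batch) > cap:
                findings.append((source, ch, int(batch), cap))
    return findings


if __name__ == "__main__":
    props = {
        "agent_001.sources": "source-001",
        "agent_001.sources.source-001.channels": "channel-001",
        "agent_001.sources.source-001.batchSize": "1000",
        "agent_001.channels.channel-001.type": "memory",
    }
    print(oversized_batches(props, "agent_001"))
```

Either lowering batchSize or raising transactionCapacity (and capacity with it) removes the mismatch. A misspelled property name is not applied by the channel, so the default of 100 remains in effect in that case.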
Flume configuration
- References