Flume


This page summarizes Flume, an open-source system for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.

Flume Overview

An open-source system for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.

Flume Architecture

(Figures: Flume architecture diagrams)

Source
  • Collects data.
Interceptor
  • Modifies or drops collected data.
  • Insertion types : Timestamp, Host, Static, UUID
  • Transform/filter types : Morphline, Regex Filtering, Regex Extractor
Channel Selector
  • Chooses which Channel(s) a Source writes to.
  • Types : Replicating (default), Multiplexing, Custom
Channel
  • The conduit that passes data from Source to Sink.
Sink
  • Stores or forwards data.
Sink Processor
  • Selects among multiple Sinks.
  • Types : Default, Failover, Load Balancing, Custom
  • Sink Group : manages several Sinks as one group


Serializer / Deserializer
  • Serializer : Body Text, Avro Event
  • Deserializer : LINE, AVRO, BlobDeserializer
Handler
  • JSONHandler, BlobHandler
Encryption
Appender
  • Log4J Appender, Load Balancing Log4J Appender
Reporting
  • Ganglia, JSON, Custom
Plugin
  • $FLUME_HOME/plugins.d/<plugin-name>/
  • $FLUME_HOME/plugins.d/<plugin-name>/lib/*.jar
  • $FLUME_HOME/plugins.d/<plugin-name>/libext/*.jar
  • $FLUME_HOME/plugins.d/<plugin-name>/native/*.so
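The plugin layout above can be sketched in Python; the plugin name my-plugin and the /tmp/flume-demo home are placeholders, not values from this installation:

```python
import os

# FLUME_HOME location and the plugin name "my-plugin" are placeholders.
flume_home = "/tmp/flume-demo"
plugin = os.path.join(flume_home, "plugins.d", "my-plugin")

# Flume scans plugins.d/<name>/{lib,libext,native} at startup:
# plugin jars go in lib/, dependency jars in libext/, shared objects in native/.
for sub in ("lib", "libext", "native"):
    os.makedirs(os.path.join(plugin, sub), exist_ok=True)
```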

Source/Channel/Sink Types

Source
  • Avro : receives data via the Avro protocol
  • Thrift : receives data via the Thrift protocol
  • Syslog : receives data via the Syslog protocol (Syslog TCP, Multiport Syslog TCP, Syslog UDP)
  • HTTP : receives data via the HTTP protocol
  • JMS : receives data via the JMS protocol (pluggable converters supported)
  • NetCat : receives data over TCP/IP
  • Exec : collects via a Linux command
  • Spooling Directory : collects files newly added to a directory
  • Sequence Generator : generates events counting up from 0 by 1
  • Legacy : receives data from older Flume versions (Avro Legacy, Thrift Legacy)
  • Custom
Channel
  • Memory : in-memory
  • JDBC : database-backed
  • File : file-backed
  • Custom
Sink
  • Avro : sends via the Avro protocol
  • Thrift : sends via the Thrift protocol
  • IRC : sends via the IRC protocol
  • ElasticSearch : stores into Elasticsearch
  • MorphlineSolr : stores into Solr
  • HDFS : stores into HDFS
  • HBase : stores into HBase (HBaseSink, AsyncHBaseSink)
  • Logger : logs events for testing or debugging
  • File Roll : stores into files
  • Null : discards everything
  • Custom

Installing Flume on CentOS

Prerequisites

Installation

  • Download Apache Flume.
wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
tar zxvf apache-flume-1.4.0-bin.tar.gz
chown -R root:root apache-flume-1.4.0-bin
mv apache-flume-1.4.0-bin /nas/appl/flume

cd /nas/appl/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.sh.template flume-env.sh
chmod 755 flume-env.sh
  • vi /nas/appl/flume/conf/log4j.properties
flume.log.dir=/nas/appl/flume/logs
  • vi ~/.bashrc
export FLUME_HOME=/nas/appl/flume
export PATH=$PATH:$FLUME_HOME/bin
  • Check the version
flume-ng version
  • After installing Flume, the following Avro jar files are included.
lib/avro-1.7.2.jar
lib/avro-ipc-1.7.2.jar

Verifying the Service

  • Create the storage directories
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
mkdir /nas/appl/flume/storage/file
  • vi conf/flume-conf.properties
agent_001.sources=source-001
agent_001.sources.source-001.channels=channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties

agent_001.channels=channel-001
agent_001.channels.channel-001.type = file
agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data

#agent_001.channels.channel-001.type = memory
#agent_001.channels.channel-001.capacity = 1000
#agent_001.channels.channel-001.transactionCapacity = 100

agent_001.sinks=sink-001
agent_001.sinks.sink-001.channel=channel-001 
agent_001.sinks.sink-001.type = file_roll
agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
  • Run the agent
flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Verify the agent is working
  • Open http://localhost:41414/ and check the reported metrics.
  • Check the /nas/appl/flume/logs/flume.log log file for errors.
  • Check that files are being created under /nas/appl/flume/storage/file/.

Flume Manual

Flume conf Overview

  • Flume help
flume-ng help
  • Run an agent
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • avro-client
flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
  • vi /nas/appl/flume/conf/flume-conf.properties
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001

agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
  • Flume Channel Selector
agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
  • Flume Interceptor
  • Interceptor : host, timestamp, static, regex_filter, regex_extractor
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent_001.sources.$source.interceptors.i1.preserveExisting = false
agent_001.sources.$source.interceptors.i1.hostHeader = hostname
agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  • regex_extractor interceptor (capture groups become headers)
agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
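The interceptor itself runs inside Flume, but the regex and the serializer-name mapping can be sanity-checked outside it; this Python sketch mimics how capture group N is stored under the header named by serializer sN (the sample body is hypothetical):

```python
import re

# Same pattern as the Flume config: three digit groups separated by colons.
pattern = re.compile(r"(\d):(\d):(\d)")
serializer_names = ["one", "two", "three"]  # s1, s2, s3 in the config

def extract_headers(body: str) -> dict:
    """Mimic regex_extractor: map each capture group to its serializer name."""
    m = pattern.search(body)
    if m is None:
        return {}
    return dict(zip(serializer_names, m.groups()))

headers = extract_headers("1:2:3.400")  # hypothetical event body
```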
  • Event Serializers
  • serializer : text, avro_event
agent_001.sinks.$sink.sink.serializer = text
agent_001.sinks.$sink.sink.serializer.appendNewline = false
  • Flume Sink Processor
agent_001.sinkgroups.$group.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
  • failover sink processor
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002

agent_001.sinkgroups.group001.processor.type = failover
agent_001.sinkgroups.group001.processor.priority.sink001 = 5
agent_001.sinkgroups.group001.processor.priority.sink002 = 10
agent_001.sinkgroups.group001.processor.maxpenalty = 10000
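For comparison, a load-balancing processor over the same sink group might be configured as follows (a sketch based on the Flume user guide; round_robin is the default selector):

```properties
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002

agent_001.sinkgroups.group001.processor.type = load_balance
agent_001.sinkgroups.group001.processor.backoff = true
agent_001.sinkgroups.group001.processor.selector = round_robin
```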

Flume Source

  • Source types
  • exec : cat for full file contents, tail for newly appended lines
  • hdfs
  • avro, thrift, memory
  • jms, spooldir, netcat, seq, http,
  • syslogtcp, multiport_syslogtcp, syslogudp
  • org.apache.flume.source.avroLegacy.AvroLegacySource
  • org.apache.flume.source.thriftLegacy.ThriftLegacySource
  • org.apache.flume.source.scribe.ScribeSource
  • custom source
agent_001.sources.$source.type = com.jopenbusiness.flume.source001
  • hdfs source
agent_001.sources.$source.type = hdfs
agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
  • exec source
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = /bin/bash -c
agent_001.sources.$source.shell = /bin/bash -c
agent_001.sources.$source.restartThrottle = 10000
agent_001.sources.$source.restart = false
agent_001.sources.$source.logStdErr = false
agent_001.sources.$source.batchSize = 1000
agent_001.sources.$source.selector.type = replicating or multiplexing
agent_001.sources.$source.selector.* = ~
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.* = ~

agent_001.sources.$source.type = exec
agent_001.sources.$source.command = tail -f ~
agent_001.sources.$source.logStdErr = true
agent_001.sources.$source.restart = true

agent_001.sources.$source.type = exec
agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
agent_001.sources.$source.shell = /bin/bash -c
  • avro source
agent_001.sources.$source.type = avro
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 10000
  • thrift source
agent_001.sources.$source.type = thrift
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 4141
  • memory source
agent_001.sources.$source.type = memory
agent_001.sources.$source.capacity = 1000
agent_001.sources.$source.transactionCapacity = 100
  • multiplexing selector
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header = <someHeader>
agent_001.sources.$source.selector.mapping.<Value1> = <Channel1>
agent_001.sources.$source.selector.mapping.<Value2> = <Channel1> <Channel2>
agent_001.sources.$source.selector.mapping.<Value3> = <Channel2>
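As a concrete sketch, events carrying a hypothetical state header could be routed like this (the header name, values, and channel names are illustrative; default catches unmapped values):

```properties
agent_001.sources.source-001.selector.type = multiplexing
agent_001.sources.source-001.selector.header = state
agent_001.sources.source-001.selector.mapping.CA = channel-001
agent_001.sources.source-001.selector.mapping.NY = channel-002
agent_001.sources.source-001.selector.default = channel-001
```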

Flume Channel

  • Channel types
  • memory, file
  • jdbc
  • org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
  • org.apache.flume.channel.PseudoTxnMemoryChannel
  • Custom channel
agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
  • memory channel
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
  • file channel
  • mkdir /nas/appl/flume/storage
  • mkdir /nas/appl/flume/storage/checkpoint
  • mkdir /nas/appl/flume/storage/data
agent_001.channels.$channel.type = file
agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data

Flume Sink

  • Sink types
  • hdfs
  • file_roll
  • logger, avro, thrift, irc, null
  • hbase, asynchbase
  • org.apache.flume.sink.elasticsearch.ElasticSearchSink
  • Custom sink
agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
  • hdfs sink
  • Requires hadoop.jar
agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path= hdfs://namenode:9000/user/flume/weblog/%y%m%d
agent_001.sinks.$sink.hdfs.filePrefix = http-
agent_001.sinks.$sink.hdfs.fileSuffix = .log
agent_001.sinks.$sink.hdfs.round = true
agent_001.sinks.$sink.hdfs.roundValue = 10
agent_001.sinks.$sink.hdfs.roundUnit = minute
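The round settings above truncate the event timestamp before it is substituted into the path escapes; this Python sketch reproduces the 10-minute rounding (the timestamp value is illustrative):

```python
from datetime import datetime

def round_down_minutes(ts: datetime, round_value: int = 10) -> datetime:
    """Round a timestamp down to the nearest round_value minutes,
    as hdfs.round / roundValue / roundUnit = minute does for path escapes."""
    return ts.replace(minute=(ts.minute // round_value) * round_value,
                      second=0, microsecond=0)

# Illustrative event timestamp: 2013-06-12 11:54:34 rounds down to 11:50:00.
ts = round_down_minutes(datetime(2013, 6, 12, 11, 54, 34))

# The %y%m%d escapes in hdfs.path are then filled from the rounded time.
path = ts.strftime("hdfs://namenode:9000/user/flume/weblog/%y%m%d")
```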
  • file_roll sink
  • mkdir /nas/appl/flume/storage
  • mkdir /nas/appl/flume/storage/file
agent_001.sinks.$sink.type = file_roll
agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
  • avro sink
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = localhost
agent_001.sinks.$sink.port = 4545
  • logger sink
agent_001.sinks.$sink.type = logger

Connecting Agents via Avro

  • Connecting agent_001 -> agent_002
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = hostForAgent002
agent_001.sinks.$sink.port = 41414
agent_001.sinks.$sink.batch-size = 100
agent_001.sinks.$sink.runner.type = polling
agent_001.sinks.$sink.runner.polling.interval = 10

#--- The collector (agent_002) binds the port.
agent_002.sources.$source.type = avro
agent_002.sources.$source.bind = hostForAgent002
agent_002.sources.$source.port = 41414

Monitor

  • ganglia reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
  • JSON Reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Custom reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
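The JSON report served on the monitoring port maps TYPE.component-name keys to maps of counter strings; this Python sketch parses a hand-written sample payload in that shape (the metric values are illustrative, not from a live agent):

```python
import json

# Hypothetical sample of Flume's JSON monitoring output (shape only).
sample = '''{
  "CHANNEL.channel-001": {
    "Type": "CHANNEL",
    "ChannelCapacity": "1000",
    "ChannelSize": "42",
    "EventPutSuccessCount": "1200",
    "EventTakeSuccessCount": "1158"
  }
}'''

metrics = json.loads(sample)
channel = metrics["CHANNEL.channel-001"]
backlog = int(channel["ChannelSize"])  # events still waiting in the channel
```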

Flume Agent Examples

exec -> memory -> avro

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
Event profile
  • Size (52 bytes) * Count (2,400 events/sec) = 124,800 bytes/sec
Source
(exec)
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
agent_001.sources.source-001.restartThrottle = 1000
agent_001.sources.source-001.restart = true
agent_001.sources.source-001.logStdErr = false
agent_001.sources.source-001.batchSize = 100
  • restartThrottle : delay in milliseconds before restarting after an abnormal exit
  • batchSize : number of events per transaction
Channel
(memory)
agent_001.channels = channel-001
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapacity = 100
agent_001.channels.channel-001.keep-alive = 3
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
agent_001.channels.channel-001.byteCapacity = 24960000
  • capacity : maximum number of events the channel can hold (Count * 100)
  • transactionCapacity : maximum number of events received or sent per transaction
  • byteCapacity : maximum number of bytes the channel can hold (capacity * Size * 2)
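The sizing rules above can be checked against the event profile quoted for this agent (52-byte events at 2,400 events/second):

```python
# Sizing per the notes above: 52-byte events arriving at 2,400 events/second.
event_size_bytes = 52
events_per_second = 2400

# capacity = Count * 100 (roughly 100 seconds of buffering)
capacity = events_per_second * 100

# byteCapacity = capacity * Size * 2 (a 2x safety factor)
byte_capacity = capacity * event_size_bytes * 2
```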
Sink
(avro)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = avro
agent_001.sinks.sink-001.hostname = 192.168.56.201
agent_001.sinks.sink-001.port = 4141
agent_001.sinks.sink-001.batch-size = 100
agent_001.sinks.sink-001.connect-timeout = 20000
agent_001.sinks.sink-001.request-timeout = 20000
  • batch-size : number of events per transaction

avro -> memory -> file_roll

  • flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties
Event profile
  • Size (52 bytes) * Count (2,400 events/sec) = 124,800 bytes/sec
Source
(avro)
agent_002.sources = source-002
agent_002.sources.source-002.channels = channel-002
agent_002.sources.source-002.type = avro
agent_002.sources.source-002.bind = 0.0.0.0
agent_002.sources.source-002.port = 4141
Channel
(memory)
omitted
Sink
(file_roll)
agent_002.sinks = sink-002
agent_002.sinks.sink-002.channel = channel-002
agent_002.sinks.sink-002.type = file_roll
agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
agent_002.sinks.sink-002.sink.rollInterval = 30
agent_002.sinks.sink-002.batchSize = 100
  • batchSize : number of events per transaction

exec -> memory -> null

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
Event profile
  • Size (52 bytes) * Count (2,400 events/sec) = 124,800 bytes/sec
Source
(exec)
omitted
Channel
(memory)
omitted
Sink
(null)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = null
agent_001.sinks.sink-001.batchSize = 100
  • batchSize : number of events per transaction

Flume Development

  • Flume Library
  • lib/flume-ng-configuration-1.4.0.jar
  • lib/flume-ng-sdk-1.4.0.jar
  • lib/flume-ng-core-1.4.0.jar
Source
  • org.apache.flume.Source
  • Implementation : org.apache.flume.source.ExecSource
Interceptor
  • org.apache.flume.interceptor.Interceptor
  • Implementation : org.apache.flume.interceptor.TimestampInterceptor$Builder
agent_001.sources.source-001.interceptors = ConversionInterceptor
agent_001.sources.source-001.interceptors.ConversionInterceptor.type =
    com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
Channel Selector
  • org.apache.flume.ChannelSelector
  • Implementation : org.apache.flume.channel.MultiplexingChannelSelector
Channel
  • org.apache.flume.Channel
  • Implementation : org.apache.flume.channel.MemoryChannel
Sink
  • org.apache.flume.Sink
  • Implementation : org.apache.flume.sink.LoggerSink
Sink Processor
  • org.apache.flume.SinkProcessor
  • Implementation : org.apache.flume.sink.LoadBalancingSinkProcessor
  • Component
  • Event
  • Source
  • Sink
  • Channel
  • Source and Sink Runners
  • Agent
  • Configuration Provider
  • Client
  • Physical node
  • Logical node

(Figures: Flume development class diagrams)


Log Data Structure

  • Log data fields
  • ymd string : date (year, month, day)
  • hms string : time (hour, minute, second)
  • severity string : severity level (ERROR, WARNING, INFO, ...)
  • server string
  • process_id int
  • message string
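A minimal Python sketch of parsing one such log line into the structure above (the space-separated layout and the sample line are assumptions for illustration):

```python
from dataclasses import dataclass

@dataclass
class LogRecord:
    ymd: str        # date (year, month, day)
    hms: str        # time (hour, minute, second)
    severity: str   # ERROR, WARNING, INFO, ...
    server: str
    process_id: int
    message: str

def parse_log_line(line: str) -> LogRecord:
    """Split a space-separated log line into the fields listed above.
    The separator and field order are assumptions for illustration."""
    ymd, hms, severity, server, pid, message = line.split(" ", 5)
    return LogRecord(ymd, hms, severity, server, int(pid), message)

rec = parse_log_line("20130612 115434 ERROR web01 1234 connection refused")
```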

Troubleshooting

  • Error : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
  • Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
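Per the message, the put queue fills when the source outruns the sink for longer than one transaction; one mitigation sketch is raising the memory channel limits (values are illustrative and should be sized from your own event rate):

```properties
# Illustrative values only; size these from your own event rate.
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 10000
agent_001.channels.channel-001.transactionCapacity = 1000
```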
