"Flume"의 두 판 사이의 차이

오픈소스 비즈니스 컨설팅
둘러보기로 가기 검색하러 가기
 
(차이 없음)

2014년 6월 30일 (월) 15:55 기준 최신판

분산 환경에서 대량의 스트리밍 데이터(로그 등)를 효율적으로 수집하는 오픈소스인 Flume을 정리 합니다.

Flume 개요

분산 환경에서 대량의 스트리밍 데이터(로그 등)를 효율적으로 수집하는 오픈소스

Flume Architecture

FlumeArchitecture01.png FlumeArchitecture02.png

Source
  • 데이터를 수집 합니다.
Interceptor
  • 수집한 데이터를 변경 또는 삭제 합니다.
  • 종류 - 삽입 : Timestamp, Host, Static, UUID
  • 종류 - 변형/삭제 : Morphline, Regex Filtering, Regex Extractor
Channel Selector
  • Source에서 Channel로 연동시 Channel을 지정 합니다.
  • 종류 : Replicating (Default), Multiplexing, Custom
Channel
  • 데이터를 Source에서 Sink로 전달하는 통로
Sink
  • 데이터 저장, 전달 합니다.
Sink Processor
  • Sink할 대상을 다중 선택 합니다.
  • 종류 : Default, Failover, Loadbalancing, Custom
  • Sink Group : 여러개의 Sink를 하나의 그룹으로 관리


Serializer / Deserializer
  • Serializer : Body Text, Avro Event
  • Deserializer : LINE, AVRO, BlobDeserializer
Handler
  • JSONHandler, BlobHandler
Encryption
Appender
  • Log4J Appender, Load Balancing Log4J Appender
Reporting
  • Ganglia, JSON, Custom
Plugin
  • $FLUME_HOME/plugins.d/플러그인명/
  • $FLUME_HOME/plugins.d/플러그인명/lib/~.jar
  • $FLUME_HOME/plugins.d/플러그인명/libext/~.jar
  • $FLUME_HOME/plugins.d/플러그인명/native/~.so

Source/Channel/Sink 종류

Source Channel Sink
Avro : Avro 프로토콜로 수집 Memory : Memory 사용 Avro : Avro 프로토콜로 전송
Thrift : Thrift 프로토콜로 수집 JDBC : DB 사용 Thrift : Thrift 프로토콜로 전송

Syslog : Syslog 프로토콜로 수집

  • Syslog TCP, Multiport Syslog TCP
  • Syslog UDP
File : File 사용 IRC : IRC 프로토콜로 전송
HTTP : HTTP 프로토콜로 수집 ElasticSearch : Elastic에 저장

JMS : JMS 프로토콜로 수집

  • Pluggable converter 지원
MorphlineSolr : Solr에 저장
NetCat : TCP/IP 데이터 수집 HDFS : HDFS에 저장
Exec : Linux 명령어로 수집

HBase : HBase에 저장

  • HBaseSink, AsyncHBaseSink
Spooling Directory : 폴더에 신규로 추가된 파일 수집 Logger : 테스트 또는 디버깅을 위해 로깅
Sequence Generator : 0부터 1씩 증가하는 event 생성 File Roll : 파일로 저장

Legacy : 이전 버전의 Flume으로부터 데이터 수집

  • Avro Legacy, Thrift legacy
Null : 아무 일도 하지 않음
Custom Custom Custom

CentOS에서 Flume 설치

사전 준비 사항

설치

  • Apache Flume을 다운로드 합니다.
wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
tar zxvf apache-flume-1.4.0-bin.tar.gz
chown -R root:root apache-flume-1.4.0-bin
mv apache-flume-1.4.0-bin /nas/appl/flume

cd /nas/appl/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.sh.template flume-env.sh
chmod 755 flume-env.sh
  • vi /nas/appl/flume/conf/log4j.properties
flume.log.dir=/nas/appl/flume/logs
  • vi ~/.bashrc
export FLUME_HOME=/nas/appl/flume
export PATH=$PATH:$FLUME_HOME/bin
  • 버전 확인
flume-ng version
  • Flume을 설치하면 Avro jar 파일이 아래와 같이 존재 합니다.
lib/avro-1.7.2.jar
lib/avro-ipc-1.7.2.jar

서비스 확인

  • 파일 저장 폴더 생성
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
mkdir /nas/appl/flume/storage/file
  • vi conf/flume-conf.properties
agent_001.sources=source-001
agent_001.sources.source-001.channels=channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties

agent_001.channels=channel-001
agent_001.channels.channel-001.type = file
agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data

#agent_001.channels.channel-001.type = memory
#agent_001.channels.channel-001.capacity = 1000
#agent_001.channels.channel-001.transactionCapacity = 100

agent_001.sinks=sink-001
agent_001.sinks.sink-001.channel=channel-001 
agent_001.sinks.sink-001.type = file_roll
agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
  • Agent 실행
flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Agent 동작 확인
  • http://localhost:41414/ 사이트에 접속하여 확인 합니다.
  • /nas/appl/flume/logs/flume.log 로그 파일에 오류가 있는지 확인 합니다.
  • /nas/appl/flume/storage/file/ 폴더에 파일이 생성 되었는지 확인 합니다.

Flume 매뉴얼

Flume conf 개요

  • Flume 도움말
flume-ng help
  • Agent 실행
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • avro-client
flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
  • vi /nas/appl/flume/conf/flume-conf.properties
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001

agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
  • Flume Channel Selector
agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
  • Flume Interceptor
  • Interceptor : host, timestamp, static, regex_filter
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent_001.sources.$source.interceptors.i1.preserveExisting = false
agent_001.sources.$source.interceptors.i1.hostHeader = hostname
agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  • regex_filter interceptor
agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
  • Event Serializers
  • serializer : text, avro_event
agent_001.sinks.$sink.sink.serializer = text
agent_001.sinks.$sink.sink.serializer.appendNewline = false
  • Flume Sink Processor
agent_001.sinks.$sink.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
  • failover sik processor
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002

agent_001.sinkgroups.group001.type = failover
agent_001.sinkgroups.group001.priority.sink001 = 5
agent_001.sinkgroups.group001.priority.sink002 = 10
agent_001.sinkgroups.group001.maxpenalty = 10000

Flume Source

  • source 종류
  • exec : cat. 파일 내용, tail. 추가되는 파일 내용
  • hdfs
  • avro, thrift, memory
  • jms, spooldir, netcat, seq, http,
  • syslogtcp, multiport_syslogtcp, syslogudp
  • org.apache.flume.source.avroLegacy.AvroLegacySource
  • org.apache.flume.source.thriftLegacy.ThriftLegacySource
  • org.apache.flume.source.scribe.ScribeSource
  • custom source
agent_001.sources.$source.type = com.jopenbusiness.flume.soure001
  • hdfs source
agent_001.sources.$source.type = hdfs
agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
  • exec source
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = /bin/bash -c
agent_001.sources.$source.shell = /bin/bash -c
agent_001.sources.$source.restartThrottle = 10000
agent_001.sources.$source.restart = false
agent_001.sources.$source.logStdErr = false
agent_001.sources.$source.batchSize = 1000
agent_001.sources.$source.selector.type = replicating 또는 multiplexing
agent_001.sources.$source.selector.* = ~
agent_001.sources.$source.interceptors = 1000
agent_001.sources.$source.interceptors.* = ~

agent_001.sources.$source.type = exec
agent_001.sources.$source.command = tail -f ~
agent_001.sources.$source.logStdErr = true
agent_001.sources.$source.restart = true

agent_001.sources.$source.type = exec
agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
agent_001.sources.$source.shell = /bin/bash -c
  • avro source
agent_001.sources.$source.type = avro
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 10000
  • thrift source
agent_001.sources.$source.type = thrift
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 4141
  • memory source
agent_001.sources.$source.type = memory
agent_001.sources.$source.capacity = 1000
agent_001.sources.$source.transactionCapacity = 100
  • multiplexing selector
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header = <someHeader>
agent_001.sources.$source.selector.mapping.<Value1> = <Channel1>
agent_001.sources.$source.selector.mapping.<Value2> = <Channel1> <Channel2>
agent_001.sources.$source.selector.mapping.<Value3> = <Channel2>

Flume Channel

  • Channel 종류
  • memory, file
  • jdbc
  • org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
  • org.apache.flume.channel.PseudoTxnMemoryChannel
  • Custom channel
agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
  • memory channel
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
  • file channel
  • mkdir /nas/appl/flume/storage
  • mkdir /nas/appl/flume/storage/checkpoint
  • mkdir /nas/appl/flume/storage/data
agent_001.channels.$channel.type = file
agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data

Flume Sink

  • sink 종류
  • hdfs
  • file_roll
  • logger, avro, thrift, irc, null
  • hbase, asynchbase
  • org.apache.flume.sink.elasticsearch.ElasticSearchSink
  • Custom sink
agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
  • hdfs sink
  • hadoop.jar 필요
agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path= hdfs://namenode:9000/user/flume/weblog/%y%m%d
agent_001.sinks.$sink.hdfs.filePrefix = http-
agent_001.sinks.$sink.hdfs.fileSuffix = .log
agent_001.sinks.$sink.hdfs.round = true
agent_001.sinks.$sink.hdfs.roundValue = 10
agent_001.sinks.$sink.hdfs.roundUnit = minute
  • file_roll sink
  • mkdir /nas/appl/flume/storage
  • mkdir /nas/appl/flume/storage/file
agent_001.sinks.$sink.type = file_roll
agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
  • avro sink
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = localhost
agent_001.sinks.$sink.port = 4545
  • logger sink
agent_001.sinks.$sink.type = logger

Avro로 agent 연결

  • agent001 -> agent002 연결
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = hostForAgent002
agent_001.sinks.$sink.port = 41414
agent_001.sinks.$sink.batch-size = 100
agent_001.sinks.$sink.runner.type = polling
agent_001.sinks.$sink.runner.polling.interval = 10

//--- Collector에서 port를 bind 합니다.
agent_002.sources.$source.type = avro
agent_002.sources.$source.bind = hostForAgent002
agent_002.sources.$source.port = 41414

Monitor

  • ganglia reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
  • JSON Reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Custom reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
    -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332

Flume Agent 사례

exec -> memory -> avro

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
Event 정보
  • Size (52 bytes) * Count (2,400 / 초) = 124,800 bytes / 초
Source
(exec)
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
agent_001.sources.source-001.restartThrottle = 1000
agent_001.sources.source-001.restart = true
agent_001.sources.source-001.logStdErr = false
agent_001.sources.source-001.batchSize = 100
  • restartThrottle : 비정상 종료시 재시작 시간 (ms)
  • batchSize : Transaction당 event 수
Channel
(memory)
agent_001.channels = channel-001
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapactiy = 100
agent_001.channels.channel-001.keep-alive = 3
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
agent_001.channels.channel-001.byteCapacity = 24960000
  • capacity : 채널에 저장 가능한 event의 최대 개수 (Count * 100)
  • transactionCapacity : 한 transaction당 수신/송신할 수 있는 최대 event 개수
  • byteCapacity : 저장 가능한 bytes 수 (capacity * Size * 2)
Sink
(Sink)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = avro
agent_001.sinks.sink-001.hostname = 192.168.56.201
agent_001.sinks.sink-001.port = 4141
agent_001.sinks.sink-001.batch-size = 100
agent_001.sinks.sink-001.connect-timeout = 20000
agent_001.sinks.sink-001.request-timeout = 20000
  • batch-size : Transaction당 event 수

avro -> memory -> file_roll

  • flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties
Event 정보
  • Size (52 bytes) * Count (2,400 / 초) = 124,800 bytes / 초
Source
(avro)
agent_002.sources = source-002
agent_002.sources.source-002.channels = channel-002
agent_002.sources.source-002.type = avro
agent_002.sources.source-002.bind = 0.0.0.0
agent_002.sources.source-002.port = 4141
Channel
(memory)
생략
Sink
(file_roll)
agent_002.sinks = sink-002
agent_002.sinks.sink-002.channel = channel-002
agent_002.sinks.sink-002.type = file_roll
agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
agent_002.sinks.sink-002.sink.rollInterval = 30
agent_002.sinks.sink-002.batchSize = 100
  • batchSize : Transaction당 event 수

exec -> memory -> null

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
Event 정보
  • Size (52 bytes) * Count (2,400 / 초) = 124,800 bytes / 초
Source
(exec)
생략
Channel
(memory)
생략
Sink
(null)
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = null
agent_001.sinks.sink-001.batchSize = 100
  • batchSize : Transaction당 event 수

Flume 개발

  • Flume Library
  • lib/flume-ng-configuration-1.4.0.jar
  • lib/flume-ng-sdk-1.4.0.jar
  • lib/flume-ng-core-1.4.0.jar
Source
  • org.apache.flume.Source
  • 구현 : org.apache.flumen.source.ExecSource
Interceptor
  • org.apache.flume.interceptor.Interceptor
  • 구현 : org.apache.flume.interceptor.TimestampInterceptor$Builder
agent_001.sources.source-001.interceptors = ConversionInterceptor
agent_001.sources.source-001.interceptors.ConversionInterceptor.type =
    com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
Channel Selector
  • org.apache.flume.ChannelSelector
  • 구현 : org.apache.flume.channel.MultiplexingChannelSelector
Channel
  • org.apache.flume.Channel
  • 구현 : org.apache.flume.channel.MemoryChannel
Sink
  • org.apache.flume.Sink
  • 구현 : org.apache.flume.sink.LoggerSink
Sink Processor
  • org.apache.flume.SinkProcessor
  • 구현 : org.apache.flume.sink.LoadBalancingSinkProcessor
  • Component
  • Event
  • Source
  • Sink
  • Channel
  • Source and Sink Runners
  • Agent
  • Configuration Provider
  • Client
  • Physical node
  • Logical node

FlumeDevelop01.png FlumeDevelop02.png FlumeDevelop03.png

  • 참고 문헌

로그 데이터 구조

  • 로그 데이터
  • ymd string : 년월일
  • hms string : 시분초
  • severity string : 심각도 (ERROR, WARNING, INFO, ...)
  • server string
  • process_id int
  • message string

기술지원

  • 오류 : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
  • Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count

Flume 구성

  • 참고 문헌

참고 문헌