This article summarizes Flume, an open-source tool for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.


Flume Overview


An open-source tool for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.


Flume Architecture


Source
  • Collects data.
Interceptor
  • Modifies or drops collected events.
  • Insert types: Timestamp, Host, Static, UUID
  • Transform/drop types: Morphline, Regex Filtering, Regex Extractor
Channel Selector
  • Chooses which Channel(s) a Source writes to.
  • Types: Replicating (default), Multiplexing, Custom
Channel
  • The conduit that carries events from Source to Sink.
Sink
  • Stores or forwards events.
Sink Processor
  • Selects among multiple Sinks.
  • Types: Default, Failover, Load Balancing, Custom
  • Sink Group: manages several Sinks as one group
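The components above are wired together in a single properties file. As a minimal sketch (the agent name `a1` and the NetCat-to-Logger pairing are illustrative choices, not taken from this document):

```properties
# Minimal pipeline: NetCat source -> Memory channel -> Logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```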

Serializer / Deserializer
  • Serializer : Body Text, Avro Event
  • Deserializer : LINE, AVRO, BlobDeserializer
Handler
  • JSONHandler, BlobHandler
Encryption
Log4J Appender
  • Log4J Appender, Load Balancing Log4J Appender
Reporting
  • Ganglia, JSON, Custom
Plugin
  • $FLUME_HOME/plugins.d/<plugin-name>/
  • $FLUME_HOME/plugins.d/<plugin-name>/lib/~.jar
  • $FLUME_HOME/plugins.d/<plugin-name>/libext/~.jar
  • $FLUME_HOME/plugins.d/<plugin-name>/native/~.so
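Custom plugins are dropped into the `plugins.d` layout listed above. A shell sketch for creating it (the plugin name `my-plugin` is hypothetical, and `FLUME_HOME` falls back to a temporary directory here so the sketch runs anywhere):

```shell
# Create the plugins.d layout for a hypothetical plugin named "my-plugin".
FLUME_HOME="${FLUME_HOME:-$(mktemp -d)}"
PLUGIN="my-plugin"

# lib: the plugin's own jars, libext: its dependency jars, native: .so libraries
mkdir -p "$FLUME_HOME/plugins.d/$PLUGIN/lib" \
         "$FLUME_HOME/plugins.d/$PLUGIN/libext" \
         "$FLUME_HOME/plugins.d/$PLUGIN/native"
```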

Source / Channel / Sink Types

Sources
  • Avro : collect via the Avro protocol
  • Thrift : collect via the Thrift protocol
  • Syslog : collect via the Syslog protocol (Syslog TCP, Multiport Syslog TCP, Syslog UDP)
  • HTTP : collect via the HTTP protocol
  • JMS : collect via the JMS protocol (pluggable converters supported)
  • NetCat : collect TCP/IP data
  • Exec : collect the output of a Linux command
  • Spooling Directory : collect files newly added to a directory
  • Sequence Generator : generate events counting up from 0 in steps of 1
  • Legacy : collect from an older version of Flume (Avro Legacy, Thrift Legacy)
  • Custom

Channels
  • Memory : held in memory
  • JDBC : backed by a database
  • File : backed by files
  • Custom

Sinks
  • Avro : send via the Avro protocol
  • Thrift : send via the Thrift protocol
  • IRC : send via the IRC protocol
  • ElasticSearch : store in Elasticsearch
  • MorphlineSolr : store in Solr
  • HDFS : store in HDFS
  • HBase : store in HBase (HBaseSink, AsyncHBaseSink)
  • Logger : log for testing or debugging
  • File Roll : store to files
  • Null : do nothing
  • Custom

Installing Flume on CentOS



Prerequisites


Installation

  • Download Apache Flume.
 wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
 tar zxvf apache-flume-1.4.0-bin.tar.gz
 chown -R root:root apache-flume-1.4.0-bin
 mv apache-flume-1.4.0-bin /nas/appl/flume
 
 cd /nas/appl/flume/conf
 cp flume-conf.properties.template flume-conf.properties
 cp flume-env.sh.template flume-env.sh
 chmod 755 flume-env.sh
  • vi /nas/appl/flume/conf/log4j.properties
 flume.log.dir=/nas/appl/flume/logs
  • vi ~/.bashrc
 export FLUME_HOME=/nas/appl/flume
 export PATH=$PATH:$FLUME_HOME/bin
  • Check the version
 flume-ng version
  • After installing Flume, the following Avro jar files are present:
 lib/avro-1.7.2.jar
 lib/avro-ipc-1.7.2.jar

Verifying the Service

  • Create the file storage folders
 mkdir /nas/appl/flume/storage
 mkdir /nas/appl/flume/storage/checkpoint
 mkdir /nas/appl/flume/storage/data
 mkdir /nas/appl/flume/storage/file
  • vi conf/flume-conf.properties
 agent_001.sources=source-001
 agent_001.sources.source-001.channels=channel-001
 agent_001.sources.source-001.type = exec
 agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties
 
 agent_001.channels=channel-001
 agent_001.channels.channel-001.type = file
 agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
 agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data
 
 #agent_001.channels.channel-001.type = memory
 #agent_001.channels.channel-001.capacity = 1000
 #agent_001.channels.channel-001.transactionCapacity = 100
 
 agent_001.sinks=sink-001
 agent_001.sinks.sink-001.channel=channel-001 
 agent_001.sinks.sink-001.type = file_roll
 agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
  • Run the agent
 flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Verify the agent is working
    • Open http://localhost:41414/ and check the monitoring output.
    • Check the /nas/appl/flume/logs/flume.log log file for errors.
    • Check that files are being created in the /nas/appl/flume/storage/file/ folder.

Flume Manual



Flume conf Overview

  • Flume help
 flume-ng help
  • Run an agent
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • avro-client
 flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
  • vi /nas/appl/flume/conf/flume-conf.properties
 agent_001.sources = source-001
 agent_001.channels = channel-001
 agent_001.sinks = sink-001
 
 agent_001.sources.source-001.channels = channel-001
 agent_001.sinks.sink-001.channel = channel-001
  • Flume Channel Selector
 agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
  • Flume Interceptor
    • Interceptor : host, timestamp, static, regex_filter
 agent_001.sources.$source.interceptors = i1 i2
 agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
 agent_001.sources.$source.interceptors.i1.preserveExisting = false
 agent_001.sources.$source.interceptors.i1.hostHeader = hostname
 agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  • regex_extractor interceptor
 agent_001.sources.$source.interceptors.i1.type = regex_extractor
 agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
 agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
 agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
 agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
 agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
  • Event Serializers
    • serializer : text, avro_event
 agent_001.sinks.$sink.sink.serializer = text
 agent_001.sinks.$sink.sink.serializer.appendNewline = false
  • Flume Sink Processor
 agent_001.sinkgroups.$group.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
  • failover sink processor
 agent_001.sinkgroups = group001
 agent_001.sinkgroups.group001.sinks = sink001 sink002
 
 agent_001.sinkgroups.group001.processor.type = failover
 agent_001.sinkgroups.group001.processor.priority.sink001 = 5
 agent_001.sinkgroups.group001.processor.priority.sink002 = 10
 agent_001.sinkgroups.group001.processor.maxpenalty = 10000
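By analogy, a load-balancing sink processor distributes events across the group instead of failing over. A sketch (the `round_robin` selector is the default; values are illustrative):

```properties
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002
agent_001.sinkgroups.group001.processor.type = load_balance
# Temporarily blacklist a failing sink, with exponentially growing backoff
agent_001.sinkgroups.group001.processor.backoff = true
# round_robin (default) or random
agent_001.sinkgroups.group001.processor.selector = round_robin
```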

Flume Source

  • Source types
    • exec : cat (reads a file's contents), tail (reads lines as they are appended to a file)
    • hdfs
    • avro, thrift, memory
    • jms, spooldir, netcat, seq, http,
    • syslogtcp, multiport_syslogtcp, syslogudp
    • org.apache.flume.source.avroLegacy.AvroLegacySource
    • org.apache.flume.source.thriftLegacy.ThriftLegacySource
    • org.apache.flume.source.scribe.ScribeSource
    • custom source
 agent_001.sources.$source.type = com.jopenbusiness.flume.source001
  • hdfs source
 agent_001.sources.$source.type = hdfs
 agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
  • exec source
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = /bin/bash -c
 agent_001.sources.$source.shell = /bin/bash -c
 agent_001.sources.$source.restartThrottle = 10000
 agent_001.sources.$source.restart = false
 agent_001.sources.$source.logStdErr = false
 agent_001.sources.$source.batchSize = 1000
 agent_001.sources.$source.selector.type = replicating or multiplexing
 agent_001.sources.$source.selector.* = ~
 agent_001.sources.$source.interceptors = i1 i2
 agent_001.sources.$source.interceptors.* = ~
 
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = tail -f ~
 agent_001.sources.$source.logStdErr = true
 agent_001.sources.$source.restart = true
 
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
 agent_001.sources.$source.shell = /bin/bash -c
  • avro source
 agent_001.sources.$source.type = avro
 agent_001.sources.$source.bind = 0.0.0.0
 agent_001.sources.$source.port = 10000
  • thrift source
 agent_001.sources.$source.type = thrift
 agent_001.sources.$source.bind = 0.0.0.0
 agent_001.sources.$source.port = 4141
  • memory source
 agent_001.sources.$source.type = memory
 agent_001.sources.$source.capacity = 1000
 agent_001.sources.$source.transactionCapacity = 100
  • multiplexing selector
 agent_001.sources.$source.selector.type = multiplexing
 agent_001.sources.$source.selector.header = 
 agent_001.sources.$source.selector.mapping. = 
 agent_001.sources.$source.selector.mapping. =  
 agent_001.sources.$source.selector.mapping. = 
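The header and mapping values are left blank above. As an illustration only, a multiplexing selector keyed on a hypothetical `severity` header might look like:

```properties
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header = severity
agent_001.sources.$source.selector.mapping.ERROR = channel-001
agent_001.sources.$source.selector.mapping.INFO = channel-002
# Events whose header matches no mapping go to the default channel
agent_001.sources.$source.selector.default = channel-002
```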

Flume Channel

  • Channel types
    • memory, file
    • jdbc
    • org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
    • org.apache.flume.channel.PseudoTxnMemoryChannel
    • Custom channel
 agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
  • memory channel
 agent_001.channels.$channel.type = memory
 agent_001.channels.$channel.capacity = 1000
  • file channel
    • mkdir /nas/appl/flume/storage
    • mkdir /nas/appl/flume/storage/checkpoint
    • mkdir /nas/appl/flume/storage/data
 agent_001.channels.$channel.type = file
 agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
 agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data

Flume Sink

  • Sink types
    • hdfs
    • file_roll
    • logger, avro, thrift, irc, null
    • hbase, asynchbase
    • org.apache.flume.sink.elasticsearch.ElasticSearchSink
    • Custom sink
 agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
  • hdfs sink
    • Requires the Hadoop jar files
 agent_001.sinks.$sink.type = hdfs
 agent_001.sinks.$sink.hdfs.path= hdfs://namenode:9000/user/flume/weblog/%y%m%d
 agent_001.sinks.$sink.hdfs.filePrefix = http-
 agent_001.sinks.$sink.hdfs.fileSuffix = .log
 agent_001.sinks.$sink.hdfs.round = true
 agent_001.sinks.$sink.hdfs.roundValue = 10
 agent_001.sinks.$sink.hdfs.roundUnit = minute
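Besides the time-rounding options above, the HDFS sink also rolls files by interval, size, and event count. A sketch with illustrative values (setting a property to 0 disables that trigger):

```properties
agent_001.sinks.$sink.hdfs.rollInterval = 30
agent_001.sinks.$sink.hdfs.rollSize = 134217728
agent_001.sinks.$sink.hdfs.rollCount = 10000
# DataStream writes the raw event body instead of a SequenceFile
agent_001.sinks.$sink.hdfs.fileType = DataStream
```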
  • file_roll sink
    • mkdir /nas/appl/flume/storage
    • mkdir /nas/appl/flume/storage/file
 agent_001.sinks.$sink.type = file_roll
 agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
  • avro sink
 agent_001.sinks.$sink.type = avro
 agent_001.sinks.$sink.hostname = localhost
 agent_001.sinks.$sink.port = 4545
  • logger sink
 agent_001.sinks.$sink.type = logger

Connecting Agents via Avro

 agent_001.sinks.$sink.type = avro
 agent_001.sinks.$sink.hostname = hostForAgent002
 agent_001.sinks.$sink.port = 41414
 agent_001.sinks.$sink.batch-size = 100
 agent_001.sinks.$sink.runner.type = polling
 agent_001.sinks.$sink.runner.polling.interval = 10
 
 # The collector side binds the port.
 agent_002.sources.$source.type = avro
 agent_002.sources.$source.bind = hostForAgent002
 agent_002.sources.$source.port = 41414

Monitor

  • ganglia reporting
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
     -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
  • JSON Reporting
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
  • Custom reporting
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
     -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332

Flume Agent Examples



exec -> memory -> avro

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties

Event info
  • Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec

Source (exec)
 agent_001.sources = source-001
 agent_001.sources.source-001.channels = channel-001
 agent_001.sources.source-001.type = exec
 agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
 agent_001.sources.source-001.restartThrottle = 1000
 agent_001.sources.source-001.restart = true
 agent_001.sources.source-001.logStdErr = false
 agent_001.sources.source-001.batchSize = 100
  • restartThrottle : delay (ms) before restarting after an abnormal exit
  • batchSize : number of events per transaction

Channel (memory)
 agent_001.channels = channel-001
 agent_001.channels.channel-001.type = memory
 agent_001.channels.channel-001.capacity = 240000
 agent_001.channels.channel-001.transactionCapacity = 100
 agent_001.channels.channel-001.keep-alive = 3
 agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
 agent_001.channels.channel-001.byteCapacity = 24960000
  • capacity : maximum number of events the channel can hold (Count * 100)
  • transactionCapacity : maximum number of events received/sent per transaction
  • byteCapacity : maximum number of bytes the channel can hold (capacity * Size * 2)

Sink (avro)
 agent_001.sinks = sink-001
 agent_001.sinks.sink-001.channel = channel-001
 agent_001.sinks.sink-001.type = avro
 agent_001.sinks.sink-001.hostname = 192.168.56.201
 agent_001.sinks.sink-001.port = 4141
 agent_001.sinks.sink-001.batch-size = 100
 agent_001.sinks.sink-001.connect-timeout = 20000
 agent_001.sinks.sink-001.request-timeout = 20000
  • batch-size : number of events per transaction

avro -> memory -> file_roll

  • flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties

Event info
  • Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec

Source (avro)
 agent_002.sources = source-002
 agent_002.sources.source-002.channels = channel-002
 agent_002.sources.source-002.type = avro
 agent_002.sources.source-002.bind = 0.0.0.0
 agent_002.sources.source-002.port = 4141

Channel (memory)
  • Omitted

Sink (file_roll)
 agent_002.sinks = sink-002
 agent_002.sinks.sink-002.channel = channel-002
 agent_002.sinks.sink-002.type = file_roll
 agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
 agent_002.sinks.sink-002.sink.rollInterval = 30
 agent_002.sinks.sink-002.batchSize = 100
  • batchSize : number of events per transaction

exec -> memory -> null

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties

Event info
  • Size (52 bytes) * Count (2,400 / sec) = 124,800 bytes / sec

Source (exec)
  • Omitted

Channel (memory)
  • Omitted

Sink (null)
 agent_001.sinks = sink-001
 agent_001.sinks.sink-001.channel = channel-001
 agent_001.sinks.sink-001.type = null
 agent_001.sinks.sink-001.batchSize = 100
  • batchSize : number of events per transaction

Flume Development


  • Flume Library
    • lib/flume-ng-configuration-1.4.0.jar
    • lib/flume-ng-sdk-1.4.0.jar
    • lib/flume-ng-core-1.4.0.jar

Source
  • org.apache.flume.Source
  • Implementation: org.apache.flume.source.ExecSource
Interceptor
  • org.apache.flume.interceptor.Interceptor
  • Implementation: org.apache.flume.interceptor.TimestampInterceptor$Builder
 agent_001.sources.source-001.interceptors = ConversionInterceptor
 agent_001.sources.source-001.interceptors.ConversionInterceptor.type =
     com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
Channel Selector
  • org.apache.flume.ChannelSelector
  • Implementation: org.apache.flume.channel.MultiplexingChannelSelector
Channel
  • org.apache.flume.Channel
  • Implementation: org.apache.flume.channel.MemoryChannel
Sink
  • org.apache.flume.Sink
  • Implementation: org.apache.flume.sink.LoggerSink
Sink Processor
  • org.apache.flume.SinkProcessor
  • Implementation: org.apache.flume.sink.LoadBalancingSinkProcessor

  • Component
    • Event
    • Source
    • Sink
    • Channel
    • Source and Sink Runners
    • Agent
    • Configuration Provider
    • Client
  • Physical node
  • Logical node



Log Data Structure


  • Log data
    • ymd string : date (year-month-day)
    • hms string : time (hour-minute-second)
    • severity string : severity (ERROR, WARNING, INFO, ...)
    • server string
    • process_id int
    • message string
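A regex_extractor interceptor could lift these fields into event headers. A sketch assuming one space-separated log line per event in the order above (the regex and the interceptor name `log1` are illustrative; `message` is left in the event body):

```properties
agent_001.sources.source-001.interceptors = log1
agent_001.sources.source-001.interceptors.log1.type = regex_extractor
agent_001.sources.source-001.interceptors.log1.regex = ^(\\d{8}) (\\d{6}) (\\S+) (\\S+) (\\d+)
agent_001.sources.source-001.interceptors.log1.serializers = s1 s2 s3 s4 s5
agent_001.sources.source-001.interceptors.log1.serializers.s1.name = ymd
agent_001.sources.source-001.interceptors.log1.serializers.s2.name = hms
agent_001.sources.source-001.interceptors.log1.serializers.s3.name = severity
agent_001.sources.source-001.interceptors.log1.serializers.s4.name = server
agent_001.sources.source-001.interceptors.log1.serializers.s5.name = process_id
```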

Troubleshooting


  • Error : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
    • Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
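As the message suggests, this error occurs when a full batch of events no longer fits in the MemoryChannel's put queue. One mitigation is to enlarge the channel (values are illustrative and should be sized against the actual event rate):

```properties
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 10000
# transactionCapacity must be at least the batch size used by the source/sink
agent_001.channels.channel-001.transactionCapacity = 1000
```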

Flume Configuration



References


Category: BigData

Last modified: 2022-10-24 19:17:28
