Esper

오픈소스 비즈니스 컨설팅
이동: 둘러보기, 검색

CEP (Complex Event Processing) 솔루션인 Esper를 정리 합니다.

Esper 개요

  • EPL(Event Processing Language) 제공
  • Dual CPU * 2 GHz : 초당 50만건 이상의 처리 성능과 평균 3 microseconds 이하로 처리
  • CEP 모니터링 : JMX(Java Management Extensions)를 사용하여 JBoss RHQ에서 모니터링

cep_architecture.png

  • select * from pattern [expression]
  • expression :: A -> (B and C) : B와 C가 발생한 이후에 A가 발생

CentOS에서 Esper 설치

빅데이터에서 Hadoop은 일반적으로 Batch 처리의 성격을 갖습니다. BigData에 대한 실시간 분석을 위해서 CEP (Complex Event Processing) 솔루션인 Esper를 설치하고 개발 환경을 구성해 봅시다.

  • 사전 준비 사항
  • Esper 설치 및 환경 설정
wget http://dist.codehaus.org/esper/esper-4.10.0.tar.gz
tar zxvf esper-4.10.0.tar.gz
chown -R root:root esper-4.10.0
mv esper-4.10.0 /appl/esper

vi  ~/.bashrc
    ### ----------------------------------------------------------------------------
    ###     Esper 환경 설정
    ### ----------------------------------------------------------------------------
    export ESPER_HOME=/appl/esper

Esper Input Adapter

  • File and CSV Input Adapter
  • JMS Input Adapter
  • AMQP Input Adapter
  • HTTP Input Adapter
  • esperio-http-4.10.0.jar
  • esperio-http/lib/httpcore-4.0.1.jar, esperio-http/lib/httpcore-nio-4.0.1.jar
  • Socket Adapter
  • Relational Database Adapter

Esper Output Adapter

  • File and CSV Output Adapter
  • JMS Output Adapter
  • AMQP Output Adapter
  • HTTP Output Adapter
  • esperio-http-4.10.0.jar
  • esperio-http/lib/httpclient-4.0.1.jar

Eclipse 개발 환경

  • Esper 개발 환경 구축
  • Java Build Path에서 라이브러리로 다음을 추가 합니다.
  • lib/esper/esper/lib/antlr-runtime-3.2.jar
  • lib/esper/esper/lib/cglib-nodep-2.2.jar
  • lib/esper/esper/esper-4.10.0.jar
  • lib/esper/lib/commons-logging-1.0.4.jar
  • lib/esper/lib/log4j-1.2.16.jar

Esper Sample 프로그램

Esper 샘플 프로그램을 사용하여 Esper의 동작 원리를 간단하게 나마 이해해 봅시다.

Esper sample.png

  • SampleEvent.java
  • Event를 저장하는 POJO class를 생성 합니다.
package com.jopenbusiness.hadoop.esper.event;

public class SampleEvent {
	private String item = null;
	private Double price = null;
	
	public SampleEvent(String item, Double price) {
		super();
		this.item = item; 
		this.price = price;
	}

	public String getItem() {
		return item;
	}

	public void setItem(String item) {
		this.item = item;
	}

	public Double getPrice() {
		return price;
	}

	public void setPrice(Double price) {
		this.price = price;
	}
}
  • SampleListener.java
  • EPL (Event Processing Engine)에서 정의된 조건을 만족할 때, 처리하는 프로그램
package com.jopenbusiness.hadoop.esper.listener;

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class SampleListener implements UpdateListener {
	public void update(EventBean[] eventNew, EventBean[] eventOld) {
		if (eventNew != null) {
			UtilLogger.info.println(logCaller, "--- Event New : " + eventNew.length);
			for (Integer idx = 0; idx < eventNew.length; idx++) {
				displayEvent(idx, eventNew[idx]);
			}
		}

		if (eventOld != null) {
			UtilLogger.info.println(logCaller, "--- Event Old : " + eventOld.length); 
			for (Integer idx = 0; idx < eventOld.length; idx++)  {
				displayEvent(idx, eventOld[idx]);
			}
		}
		UtilLogger.info.println(logCaller, "");
	}
}
  • SampleEngine.java
package com.jopenbusiness.hadoop.esper.cli;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.jopenbusiness.hadoop.esper.event.SampleEvent;
import com.jopenbusiness.hadoop.esper.listener.SampleListener;

public class SampleEngine {
	public static void main(String[] args) {
		Configuration config = null;
		EPServiceProvider service = null;
		EPStatement stat = null;
		String epl = null;
		SampleListener listener = null;
		EPRuntime runtime = null;
		
		//---	처리할 Event를 등록하여 Esper용 서비스를 생성 합니다.
		config = new Configuration();
		config.addEventType("SampleEvent", SampleEvent.class.getName());
		service = EPServiceProviderManager.getDefaultProvider(config);
		
		//---	EPL(Event Processing Language)을 사용하여 Statement를 생성 합니다.
		//---	지난 3초 동안 발생한 이벤트로 전체 갯수와 가격 평균을 구합니다. 
		epl = "select item, count(*), avg(price) from SampleEvent.win:time(3 sec)";
		stat = service.getEPAdministrator().createEPL(epl);
				
		listener = new SampleListener();
		stat.addListener(listener);
		
		//---	Event를 발생시켜 봅니다.
		runtime = service.getEPRuntime();
		for (int i = 0;i < 20; i++) {
			runtime.sendEvent(new SampleEvent("aaa_" + i, 10.0 * i));
			try {
				Thread.sleep(300);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}
}
  • SampleEngine.java 프로그램 실행 결과
  • 3초간 발생한 이벤트의 갯수와 평균을 출력 합니다.
    [java] aaa_0 : 갯수 = 1, 평균 = 0.0
    [java] aaa_1 : 갯수 = 2, 평균 = 5.0
    [java] aaa_2 : 갯수 = 3, 평균 = 10.0
    [java] aaa_3 : 갯수 = 4, 평균 = 15.0
    [java] aaa_4 : 갯수 = 5, 평균 = 20.0
    [java] aaa_5 : 갯수 = 6, 평균 = 25.0
    [java] aaa_6 : 갯수 = 7, 평균 = 30.0
    [java] aaa_7 : 갯수 = 8, 평균 = 35.0
    [java] aaa_8 : 갯수 = 9, 평균 = 40.0
    [java] aaa_9 : 갯수 = 10, 평균 = 45.0
    [java] aaa_10 : 갯수 = 11, 평균 = 50.0
    [java] aaa_11 : 갯수 = 10, 평균 = 65.0
    [java] aaa_12 : 갯수 = 10, 평균 = 75.0
    [java] aaa_13 : 갯수 = 10, 평균 = 85.0
    [java] aaa_14 : 갯수 = 10, 평균 = 95.0
    [java] aaa_15 : 갯수 = 10, 평균 = 105.0
    [java] aaa_16 : 갯수 = 10, 평균 = 115.0
    [java] aaa_17 : 갯수 = 10, 평균 = 125.0
    [java] aaa_18 : 갯수 = 10, 평균 = 135.0
    [java] aaa_19 : 갯수 = 10, 평균 = 145.0
Total time: 8 seconds

프로그램 개발 Tip

설정 파일

  • 설정 파일
  • vi config/esper.default.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.espertech.com/schema/esper"
    xsi:noNamespaceSchemaLocation="esper-configuration-4-0.xsd"> 
	<event-type-auto-name package-name="com.jopenbusiness.hadoop.esper.event"/>
	<event-type name="ServiceEvent" class="com.jopenbusiness.hadoop.esper.event.ServiceEvent"/>
</esper-configuration>
  • 프로그램에서 설정 적용
Configuration config = null;
EPServiceProvider service = null;

config = new Configuration();
config.configure(new File("~/config/esper.default.cfg.xml"));
service = EPServiceProviderManager.getProvider(ENGINE_URI, config);

참고 문헌