Esper
CEP (Complex Event Processing) 솔루션인 Esper를 정리 합니다.
- 홈페이지 : http://esper.codehaus.org/
- 다운로드 : http://esper.codehaus.org/esper/download/download.html
- 라이선스 : GNU GPLv2
- 플랫폼 : Java
목차
Esper 개요
- EPL(Event Processing Language) 제공
- Dual CPU * 2 GHz : 초당 50만건 이상의 처리 성능과 평균 3 microseconds 이하로 처리
- CEP 모니터링 : JMX(Java Management Extensions)를 사용하여 JBoss RHQ에서 모니터링
- select * from pattern [expression]
- expression :: A -> (B and C) : B와 C가 발생한 이후에 A가 발생
CentOS에서 Esper 설치
빅데이터에서 Hadoop은 일반적으로 Batch 처리의 성격을 갖습니다. BigData에 대한 실시간 분석을 위해서 CEP (Complex Event Processing) 솔루션인 Esper를 설치하고 개발 환경을 구성해 봅시다.
- 사전 준비 사항
- Esper 설치 및 환경 설정
wget http://dist.codehaus.org/esper/esper-4.10.0.tar.gz tar zxvf esper-4.10.0.tar.gz chown -R root:root esper-4.10.0 mv esper-4.10.0 /appl/esper vi ~/.bashrc ### ---------------------------------------------------------------------------- ### Esper 환경 설정 ### ---------------------------------------------------------------------------- export ESPER_HOME=/appl/esper
Esper Input Adapter
- File and CSV Input Adapter
- JMS Input Adapter
- AMQP Input Adapter
- HTTP Input Adapter
- esperio-http-4.10.0.jar
- esperio-http/lib/httpcore-4.0.1.jar, esperio-http/lib/httpcore-nio-4.0.1.jar
- Socket Adapter
- Relational Database Adapter
Esper Output Adapter
- File and CSV Output Adapter
- JMS Output Adapter
- AMQP Output Adapter
- HTTP Output Adapter
- esperio-http-4.10.0.jar
- esperio-http/lib/httpclient-4.0.1.jar
Eclipse 개발 환경
- Esper 개발 환경 구축
- Java Build Path에서 라이브러리로 다음을 추가 합니다.
- lib/esper/esper/lib/antlr-runtime-3.2.jar
- lib/esper/esper/lib/cglib-nodep-2.2.jar
- lib/esper/esper/esper-4.10.0.jar
- lib/esper/lib/commons-logging-1.0.4.jar
- lib/esper/lib/log4j-1.2.16.jar
Esper Sample 프로그램
Esper 샘플 프로그램을 사용하여 Esper의 동작 원리를 간단하게 나마 이해해 봅시다.
- SampleEvent.java
- Event를 저장하는 POJO class를 생성 합니다.
package com.jopenbusiness.hadoop.esper.event; public class SampleEvent { private String item = null; private Double price = null; public SampleEvent(String item, Double price) { super(); this.item = item; this.price = price; } public String getItem() { return item; } public void setItem(String item) { this.item = item; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } }
- SampleListener.java
- EPL (Event Processing Engine)에서 정의된 조건을 만족할 때, 처리하는 프로그램
package com.jopenbusiness.hadoop.esper.listener; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; public class SampleListener implements UpdateListener { public void update(EventBean[] eventNew, EventBean[] eventOld) { if (eventNew != null) { UtilLogger.info.println(logCaller, "--- Event New : " + eventNew.length); for (Integer idx = 0; idx < eventNew.length; idx++) { displayEvent(idx, eventNew[idx]); } } if (eventOld != null) { UtilLogger.info.println(logCaller, "--- Event Old : " + eventOld.length); for (Integer idx = 0; idx < eventOld.length; idx++) { displayEvent(idx, eventOld[idx]); } } UtilLogger.info.println(logCaller, ""); } }
- SampleEngine.java
package com.jopenbusiness.hadoop.esper.cli; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPRuntime; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.jopenbusiness.hadoop.esper.event.SampleEvent; import com.jopenbusiness.hadoop.esper.listener.SampleListener; public class SampleEngine { public static void main(String[] args) { Configuration config = null; EPServiceProvider service = null; EPStatement stat = null; String epl = null; SampleListener listener = null; EPRuntime runtime = null; //--- 처리할 Event를 등록하여 Esper용 서비스를 생성 합니다. config = new Configuration(); config.addEventType("SampleEvent", SampleEvent.class.getName()); service = EPServiceProviderManager.getDefaultProvider(config); //--- EPL(Event Processing Language)을 사용하여 Statement를 생성 합니다. //--- 지난 3초 동안 발생한 이벤트로 전체 갯수와 가격 평균을 구합니다. epl = "select item, count(*), avg(price) from SampleEvent.win:time(3 sec)"; stat = service.getEPAdministrator().createEPL(epl); listener = new SampleListener(); stat.addListener(listener); //--- Event를 발생시켜 봅니다. runtime = service.getEPRuntime(); for (int i = 0;i < 20; i++) { runtime.sendEvent(new SampleEvent("aaa_" + i, 10.0 * i)); try { Thread.sleep(300); } catch (InterruptedException ex) { ex.printStackTrace(); } } } }
- SampleEngine.java 프로그램 실행 결과
- 3초간 발생한 이벤트의 갯수와 평균을 출력 합니다.
[java] aaa_0 : 갯수 = 1, 평균 = 0.0 [java] aaa_1 : 갯수 = 2, 평균 = 5.0 [java] aaa_2 : 갯수 = 3, 평균 = 10.0 [java] aaa_3 : 갯수 = 4, 평균 = 15.0 [java] aaa_4 : 갯수 = 5, 평균 = 20.0 [java] aaa_5 : 갯수 = 6, 평균 = 25.0 [java] aaa_6 : 갯수 = 7, 평균 = 30.0 [java] aaa_7 : 갯수 = 8, 평균 = 35.0 [java] aaa_8 : 갯수 = 9, 평균 = 40.0 [java] aaa_9 : 갯수 = 10, 평균 = 45.0 [java] aaa_10 : 갯수 = 11, 평균 = 50.0 [java] aaa_11 : 갯수 = 10, 평균 = 65.0 [java] aaa_12 : 갯수 = 10, 평균 = 75.0 [java] aaa_13 : 갯수 = 10, 평균 = 85.0 [java] aaa_14 : 갯수 = 10, 평균 = 95.0 [java] aaa_15 : 갯수 = 10, 평균 = 105.0 [java] aaa_16 : 갯수 = 10, 평균 = 115.0 [java] aaa_17 : 갯수 = 10, 평균 = 125.0 [java] aaa_18 : 갯수 = 10, 평균 = 135.0 [java] aaa_19 : 갯수 = 10, 평균 = 145.0 Total time: 8 seconds
프로그램 개발 Tip
설정 파일
- 설정 파일
- vi config/esper.default.cfg.xml
<?xml version="1.0" encoding="UTF-8"?> <esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.espertech.com/schema/esper" xsi:noNamespaceSchemaLocation="esper-configuration-4-0.xsd"> <event-type-auto-name package-name="com.jopenbusiness.hadoop.esper.event"/> <event-type name="ServiceEvent" class="com.jopenbusiness.hadoop.esper.event.ServiceEvent"/> </esper-configuration>
- 프로그램에서 설정 적용
Configuration config = null; EPServiceProvider service = null; config = new Configuration(); config.configure(new File("~/config/esper.default.cfg.xml")); service = EPServiceProviderManager.getProvider(ENGINE_URI, config);
참고 문헌
- 오픈 소스로 구현하는 실시간 데이터 처리를 위한 CEP, 2012.H3
- About Esper – 실시간 이벤트 처리를 위한 CEP 대표 오픈소스, 2012.12
- https://github.com/tedwon/cep-esper-quick-start
- Esper CEP Engine, 2009.04