CEP (Complex Event Processing) 솔루션인 Esper를 정리 합니다.


Esper 개요


  • EPL(Event Processing Language) 제공
  • Dual CPU * 2 GHz : 초당 50만건 이상의 처리 성능과 평균 3 microseconds 이하로 처리
  • CEP 모니터링 : JMX(Java Management Extensions)를 사용하여 JBoss RHQ에서 모니터링
    http://hochul.net/blog/wp-content/uploads/2012/12/cep_architecture.png
  • select * from pattern expression
    • expression :: A -> (B and C) : B와 C가 발생한 이후에 A가 발생


CentOS에서 Esper 설치


빅데이터에서 Hadoop은 일반적으로 Batch 처리의 성격을 갖습니다. BigData에 대한 실시간 분석을 위해서 CEP (Complex Event Processing) 솔루션인 Esper를 설치하고 개발 환경을 구성해 봅시다.

  • 사전 준비 사항

    • [CentOS용 설치 가이드](JDK.md#CentOS용 설치 가이드.md)
  • Esper 설치 및 환경 설정

    wget http://dist.codehaus.org/esper/esper-4.10.0.tar.gz
    tar zxvf esper-4.10.0.tar.gz
    chown -R root:root esper-4.10.0
    mv esper-4.10.0 /appl/esper
    
    vi  ~/.bashrc
       ### ----------------------------------------------------------------------------
       ###     Esper 환경 설정
       ### ----------------------------------------------------------------------------
       export ESPER_HOME=/appl/esper


Esper Input Adapter


  • File and CSV Input Adapter
  • JMS Input Adapter
  • AMQP Input Adapter
  • HTTP Input Adapter
    • esperio-http-4.10.0.jar
    • esperio-http/lib/httpcore-4.0.1.jar, esperio-http/lib/httpcore-nio-4.0.1.jar
  • Socket Adapter
  • Relational Database Adapter


Esper Output Adapter


  • File and CSV Output Adapter
  • JMS Output Adapter
  • AMQP Output Adapter
  • HTTP Output Adapter
    • esperio-http-4.10.0.jar
    • esperio-http/lib/httpclient-4.0.1.jar


Eclipse 개발 환경


  • Esper 개발 환경 구축
    • [CentOS에서 Apache Ant 설치](Apache Ant.md#CentOS에서 Apache Ant 설치.md)
  • Java Build Path에서 라이브러리로 다음을 추가 합니다.
    • lib/esper/esper/lib/antlr-runtime-3.2.jar
    • lib/esper/esper/lib/cglib-nodep-2.2.jar
    • lib/esper/esper/esper-4.10.0.jar
    • lib/esper/lib/commons-logging-1.0.4.jar
    • lib/esper/lib/log4j-1.2.16.jar


Esper Sample 프로그램


Esper 샘플 프로그램을 사용하여 Esper의 동작 원리를 간단하게 나마 이해해 봅시다.
[500px](파일:Esper sample.png.md)

  • SampleEvent.java

    • Event를 저장하는 POJO class를 생성 합니다.
      package com.jopenbusiness.hadoop.esper.event;
      

    public class SampleEvent { private String item = null; private Double price = null;

    public SampleEvent(String item, Double price) {

       super();
       this.item = item; 
       this.price = price;

    }

    public String getItem() {

       return item;

    }

    public void setItem(String item) {

       this.item = item;

    }

    public Double getPrice() {

       return price;

    }

    public void setPrice(Double price) {

       this.price = price;

    } } ```


  • SampleListener.java

    • EPL (Event Processing Engine)에서 정의된 조건을 만족할 때, 처리하는 프로그램
      package com.jopenbusiness.hadoop.esper.listener;
      

    import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener;

    public class SampleListener implements UpdateListener { public void update(EventBean[] eventNew, EventBean eventOld) {

       if (eventNew != null) {
           UtilLogger.info.println(logCaller, "--- Event New : " + eventNew.length);
           for (Integer idx = 0; idx < eventNew.length; idx++) {
               displayEvent(idx, eventNew[idx](idx.md));
           }
       }
    
       if (eventOld != null) {
           UtilLogger.info.println(logCaller, "--- Event Old : " + eventOld.length); 
           for (Integer idx = 0; idx < eventOld.length; idx++)  {
               displayEvent(idx, eventOld[idx](idx.md));
           }
       }
       UtilLogger.info.println(logCaller, "");

    } } ```


  • SampleEngine.java

    package com.jopenbusiness.hadoop.esper.cli;
    
    import com.espertech.esper.client.Configuration;
    import com.espertech.esper.client.EPRuntime;
    import com.espertech.esper.client.EPServiceProvider;
    import com.espertech.esper.client.EPServiceProviderManager;
    import com.espertech.esper.client.EPStatement;
    import com.jopenbusiness.hadoop.esper.event.SampleEvent;
    import com.jopenbusiness.hadoop.esper.listener.SampleListener;
    
    public class SampleEngine {
       public static void main(String[](.md) args) {
           Configuration config = null;
           EPServiceProvider service = null;
           EPStatement stat = null;
           String epl = null;
           SampleListener listener = null;
           EPRuntime runtime = null;
    
           //---    처리할 Event를 등록하여 Esper용 서비스를 생성 합니다.
           config = new Configuration();
           config.addEventType("SampleEvent", SampleEvent.class.getName());
           service = EPServiceProviderManager.getDefaultProvider(config);
    
           //---    EPL(Event Processing Language)을 사용하여 Statement를 생성 합니다.
           //---    지난 3초 동안 발생한 이벤트로 전체 갯수와 가격 평균을 구합니다. 
           epl = "select item, count(*), avg(price) from SampleEvent.win:time(3 sec)";
           stat = service.getEPAdministrator().createEPL(epl);
    
           listener = new SampleListener();
           stat.addListener(listener);
    
           //---    Event를 발생시켜 봅니다.
           runtime = service.getEPRuntime();
           for (int i = 0;i < 20; i++) {
               runtime.sendEvent(new SampleEvent("aaa_" + i, 10.0 * i));
               try {
                   Thread.sleep(300);
               } catch (InterruptedException ex) {
                   ex.printStackTrace();
               }
           }
       }
    }

  • SampleEngine.java 프로그램 실행 결과

    • 3초간 발생한 이벤트의 갯수와 평균을 출력 합니다.
       [java](java.md) aaa_0 : 갯수 = 1, 평균 = 0.0
       [java](java.md) aaa_1 : 갯수 = 2, 평균 = 5.0
       [java](java.md) aaa_2 : 갯수 = 3, 평균 = 10.0
       [java](java.md) aaa_3 : 갯수 = 4, 평균 = 15.0
       [java](java.md) aaa_4 : 갯수 = 5, 평균 = 20.0
       [java](java.md) aaa_5 : 갯수 = 6, 평균 = 25.0
       [java](java.md) aaa_6 : 갯수 = 7, 평균 = 30.0
       [java](java.md) aaa_7 : 갯수 = 8, 평균 = 35.0
       [java](java.md) aaa_8 : 갯수 = 9, 평균 = 40.0
       [java](java.md) aaa_9 : 갯수 = 10, 평균 = 45.0
       [java](java.md) aaa_10 : 갯수 = 11, 평균 = 50.0
       [java](java.md) aaa_11 : 갯수 = 10, 평균 = 65.0
       [java](java.md) aaa_12 : 갯수 = 10, 평균 = 75.0
       [java](java.md) aaa_13 : 갯수 = 10, 평균 = 85.0
       [java](java.md) aaa_14 : 갯수 = 10, 평균 = 95.0
       [java](java.md) aaa_15 : 갯수 = 10, 평균 = 105.0
       [java](java.md) aaa_16 : 갯수 = 10, 평균 = 115.0
       [java](java.md) aaa_17 : 갯수 = 10, 평균 = 125.0
       [java](java.md) aaa_18 : 갯수 = 10, 평균 = 135.0
       [java](java.md) aaa_19 : 갯수 = 10, 평균 = 145.0
      Total time: 8 seconds


프로그램 개발 Tip



설정 파일

  • 설정 파일

    • vi config/esper.default.cfg.xml
      
       
       
       
      

  • 프로그램에서 설정 적용

    Configuration config = null;
    EPServiceProvider service = null;
    
    config = new Configuration();
    config.configure(new File("~/config/esper.default.cfg.xml"));
    service = EPServiceProviderManager.getProvider(ENGINE_URI, config);


참고 문헌


공유하기