Pig

오픈소스 비즈니스 컨설팅
둘러보기로 가기 검색하러 가기

HadoopMapReduce를 사용하는 절차적 언어인 pig를 정리 합니다.

pig 개요

  • Pig Latin : Pig에서 정의하여 사용하는 언어
  • UDF : 로딩, 저장, 필터링, 그룹핑, 순서 정렬, 조인

CentOS에서 Pig 설치

사전 준비 사항

  • Hadoop 1.1.2
  • Java 1.7.0_19
  • CentOS 6.4, 64 bits

설치

  • 설치 파일을 다운로드하여 /appl/pig 폴더에 압축을 풉니다.
wget http://apache.tt.co.kr/pig/pig-0.11.1/pig-0.11.1.tar.gz
tar zxvf pig-0.11.0.tar.gz
  • vi .bashrc
export PIG_HOME=/appl/pig
export PATH=$PATH:$PIG_HOME/bin
#export PIG_HADOOP_VERSION=18        //--- Pig에서 사용할 Hadoop 버전 지정
#export PIG_CLASSPATH=/appl/hadoop/conf/ 
  • Pig 관련 파일
/appl/pig/conf/pig.properties
/appl/pig/bin/pig_1369150716069.log
  • vi /app/pig/bin/pig
export HADOOP_CONF_DIR=/appl/hadoop/conf
export PIG_HOME=/appl/pig
export PIG_CONF_DIR=$PIG_HOME/conf
  • vi pig.properties
#fs.default.name=hdfs://localhost/
#mapred.job.tracker=localhost:8021
  • 서비스 확인
startall.sh                 //--- Hadoop이 먼저 실행이 되어 있어야 합니다.
pig -help                   //--- 도움말
pig
    ctrl_d

Pig Latin

Data type

  • int, long, float, double
  • chararray
  • bytearray
  • turple : (~, ~)
  • bap : {(~), (~)}
  • map : [~#~]

Pig Latin 문법

  • Sample
data01 = LOAD 'hdfs://~/txt'
           AS (~:chararray, ~:int);                    //--- 데이터는 탭으로 구분
DESCRIBE data01;
DUMP data01;

data02 = FILTER data01 BY ~ != 9999 AND (~ == 0 OR ~ == 1);
store data02 into '~/b.txt';

data03 = GROUP data02 BY year;
item01 = FOREACH data03 GENERATE group, MAX(data02.~);     //--- group은 year를 지칭 합니다.
 
DUMP item01;
illustrate item01;
  • MapReduce 갯수 지정 : parallel
  • Parameter 전달 : $~
pig  -param ~=~  ~.pig
  • 로드 및 저장
load 'hdfs://~' as ($var:$type, $var:$type);
store
dump
  • 필터링
filter ~ by ~ parallel 30;           //--- 30개의 MapReduce로 작업 실행
distinct
foreach ~ generate ~, ~;
    MAX(~), COUNT(~)
foreach ~ {
    ~;
    generate ~;
}
stream ~ through 'linux 명령어';
  • 분류 및 결합
join ~ by $0, ~ by $11;
join ~ by ~ left outer, ~ by ~;
cogroup ~ by $0, ~ by $11;        //--- key의 값에 따른 중첩된 tuple 집합을 생성
group ~ by ~;
cross ~, ~;
  • 정렬
order ~ by $0, $1 desc;
limit
  • 병합 및 분할
union ~, ~;                       //--- 두개의 테이블을 하나의 테이블로 결합
split
  • 설명
describe ~;
explain ~;
illustrate ~;
  • UDF 정의
register
define
  • 하둡 파일 시스템
cat, cd, copyFromLocal, copyToLocal, cp, fs, ls, mkdir, mv, pwd, rm, rmf
  • 하둡 맵리듀스
kill, exec, help, run, quit, set

UDF

  • 필터 UDF
package com.jopenbusiness.hadoop.pig;
public class ~ extends FilterFunc {
    public Boolean exec(Tuple tuple) throws IOException {
        try {
            //--- tuple.size()
            //--- Object obj = tuple.get(0);
        } catch(ExecException e) {
            throw new IOExeption(e);
        }
    }
}
  • register ~.jar;
  • com.jopenbusiness.hadoop.pig.함수명(~) 을 호출하여 사용
  • 새로운 ~ 함수명으로 호출하여 사용하기 위해서는 pig에서 다음을 실행 합니다.
define ~ com.jopenbusiness.hadoop.pig.함수명();
~ = filter ~ by 함수명(~);
  • 평가 UDF
public class ~ extends EvalFunc<String> {
    public String exec(Tuple input) throws IOException {
        try {
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }

    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();

        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schea.FieldSchema(null, DataType.CHARARRAY))));
        return funcList;
    }
}
  • pig에서 사용
~ = foreach ~ generate 함수명(~);
  • 로드 UDF
public class ~ extends LoadFunc {
    public ~(String initValue) {
    }

    public InputFormat getInputFormat() {
        return new TextInputFormat();
    }

    public void prepareToRead(RecordReader reader, PigSplit split) {
         this.reader = reader;
    }

    public Tuple getNext() throws IOException {
        try {
        } catch (InterruptedException e) {
            throw new ExecException(e);
        }
    }
}
  • pig에서 사용
~ = load '~'
    using 함수명(~)
    as (~);

참고 문헌

  • Pig 함수