MapReduce

오픈소스 비즈니스 컨설팅
둘러보기로 가기 검색하러 가기

MapReduce를 정리합니다.

MapReduce Architecture

Hadoop architecture03.png

MapReduce 설치

MapReduce 가이드

  • MapReduce
  • Job-Tracker : 작업을 할당 (이중화 문제 있음)
  • Task-Trackers : 실제 작업을 처리
  • Map + Reduce
  • Mapper (map) : 입력을 받아 처리, 처리 결과 정렬
  • Reducer (reduce) : Mapper에서 처리된 결과를 받아 통합
  • Mapper Sample
/**
 * Sample mapper for a word count job.
 *
 * <p>Input: the key is declared as Object (the original notes describe it as a Long-style key)
 * and the Text value is one line read from the input file. The original notes add that the
 * shape of the incoming Text can be changed on the input side (they mention "InputReader").
 *
 * <p>Output: (Text, IntWritable) pairs handed on to the reduce phase; the Text is the key.
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    /** Constant count of 1 emitted with every incoming value; shared so no object is allocated per record. */
    private static final IntWritable one = new IntWritable(1);

    /**
     * Emits the incoming Text unchanged as the output key, paired with a count of 1.
     *
     * @param key     input key; not used by this sample
     * @param word    input value, written out as the output key
     * @param context channel used to write the (word, 1) pair
     */
    public void map(Object key, Text word, Context context)
            throws IOException, InterruptedException {
        context.write(word, one);
    }
}
  • Reducer Sample
//--- 입력 : Text, IntWritable <- Map에서 전달 받음, Text를 키로 하여 값을 취합하여 전달받음
//--- 출력 : Text, IntWritable
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final static IntWritable count = new IntWritable();
    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        count.set(sum);
        context.write(word, count);
    }
}

MapReduce.jpg
P4.png

MapReduce 개발 환경 설정

  • MapReduce 실행
hadoop jar ~.jar
  • Mapper
  • 입력키 (key), 입력값 (value) : 입력 라인
  • 출력키 (outKey), 출력값 (123)
package com.jopenbusiness.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Skeleton mapper: ignores its input and emits one fixed record per call.
 */
public class zzmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	/**
	 * Writes a single pair made of an empty Text key and the value 123.
	 *
	 * @param key     input key; not used
	 * @param value   input value; not used
	 * @param context channel used to write the output pair
	 */
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		Text outKey = new Text("");
		IntWritable outValue = new IntWritable(123);
		context.write(outKey, outValue);
	}
}
  • Reducer
  • 입력키 (key), 입력값 (values)
  • 출력키 (outKey), 출력값 (567)
package com.jopenbusiness.mapreduce.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Skeleton reducer: ignores the incoming values and emits one fixed value per key.
 */
public class zzreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	/**
	 * Writes a single (key, 567) pair for each key.
	 *
	 * <p>The values parameter must be an {@code Iterable}. A method that takes an
	 * {@code Iterator} is only an overload, so the framework would keep calling the
	 * inherited {@code Reducer.reduce} and this code would never run.
	 *
	 * @param key     key shared by the supplied values; written out unchanged
	 * @param values  values grouped under the key; not used
	 * @param context channel used to write the output pair
	 */
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		// 567 is the reducer output value stated in the notes that accompany this sample.
		context.write(key, new IntWritable(567));
	}
}
  • MapReduce 실행
	/**
	 * Configures the sample job (zzmapper as mapper, zzreducer as both combiner and reducer)
	 * and blocks until it finishes.
	 *
	 * <p>The "~" input and output paths are placeholders to be replaced with real locations.
	 *
	 * @return the result of {@code waitForCompletion(true)}; {@code false} when the job could
	 *         not be set up or run because of an exception
	 */
	public Boolean process() {
		try {
			Job job = Job.getInstance();
			job.setJarByClass(zztemp.class);

			//--- Input side
			FileInputFormat.addInputPath(job, new Path("~"));
			job.setMapperClass(zzmapper.class);

			job.setCombinerClass(zzreducer.class);

			//--- Output side
			FileOutputFormat.setOutputPath(job, new Path("~"));
			job.setReducerClass(zzreducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);

			return job.waitForCompletion(true);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Restore the interrupt flag so code further up the stack can still see the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		// Reached only after an exception: report failure instead of success.
		return false;
	}


  • Generic type
  • Text, ByteWritable, ArrayWritable
  • IntWritable, LongWritable, FloatWritable, DoubleWritable
  • BooleanWritable
  • MapWritable, SortedMapWritable, TwoDArrayWritable
  • VIntWritable, VLongWritable
  • GenericWritable, CompressedWritable, ObjectWritable, VersionedWritable, NullWritable
  • Hadoop 스트리밍
hadoop jar /appl/hadoop/contrib/streaming/hadoop-*-streaming.jar \
       -input input/~.txt -output output \
       -mapper ~1.bash \
       -combiner ~2.bash \
       -reducer ~3.bash \
       -file ~1.bash -file ~2.bash -file ~3.bash
  • 참고 문헌

표준 MapReduce Job

  • StandardMapReduce.java
package com.jopenbusiness.hadoop.sample2; 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StandardMapReduce extends Configured implements Tool {
	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new StandardMapReduce(), args));
	}

	public int run(String[] args) throws Exception {
		Configuration conf = null;
		Job job = null;
		FileSystem fs = null;
		
		//---	입력 파라메터 확인
		if (args.length != 2) {
			System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
			ToolRunner.printGenericCommandUsage(System.err);
		}
		
		//---	MapReduce 설정
		conf = getConf();
		job = Job.getInstance(conf, "Max temperature");
		job.setJarByClass(StandardMapReduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(StandardMapReduceMapper.class);
//		job.setCombinerClass(StandardMapReduceReducer.class);
//		job.setPartitionerClass(StandardMapReducePartitioner.class);
//		job.setGroupingComparatorClass(StandardMapReduceGroupComparator.class);
//		job.setSortComparatorClass(StandardMapReduceSortComparator.class);
		job.setReducerClass(StandardMapReduceReducer.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//---	입출력 데이터의 전달 경로 지정
		fs = FileSystem.get(conf);
		fs.delete(new Path(args[1]), true);
		
  		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//---	Job 실행 /w 모니터링
		return (job.waitForCompletion(true)) ? 0 : 1;
	}	
	
	public class StandardMapReduceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		}
	}

	public class StandardMapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		}
	}
}

참고 문헌