"MapReduce"의 두 판 사이의 차이
Latest revision as of 22:07, 1 December 2013 (Sun)
This page summarizes MapReduce.
Contents
MapReduce Architecture
MapReduce Installation
MapReduce Guide
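- MapReduce
- JobTracker : assigns tasks (it has no redundancy, so it is a single point of failure)
- TaskTrackers : run the actual tasks
- Map + Reduce
- Mapper (map) : receives the input and processes it; the results are sorted by key
- Reducer (reduce) : receives the results produced by the Mappers and merges them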
- Mapper Sample
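//--- Input  : Object, a Long-type key; Text, one line read from the file
//---          The format of the input value (Text) can be changed with a custom InputFormat (RecordReader)
//--- Output : Text, IntWritable -> passed to Reduce; Text is the key
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(Object key, Text word, Context context) throws IOException, InterruptedException {
        // Emits the input value unchanged as the key; with the default input format this is a whole line
        context.write(word, one);
    }
}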
- Reducer Sample
//--- Input  : Text, IntWritable <- received from Map; the values arrive grouped by the Text key
//--- Output : Text, IntWritable
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable count = new IntWritable();

    @Override
    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum every count received for this key
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        count.set(sum);
        context.write(word, count);
    }
}
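Between the Mapper and Reducer samples, the framework sorts the map output and groups the values by key. The sketch below imitates that flow in plain Java, without Hadoop, to show what reaches `reduce`. The class `LocalWordCountSketch` and its method names are hypothetical, and unlike the Mapper sample it splits each line into words, which is an assumption made so that the counts are meaningful.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the map -> sort/group -> reduce flow.
final class LocalWordCountSketch {

    // "Map" step: emit a (word, 1) pair for every whitespace-separated token in a line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    // "Shuffle" step: group the values by key; the TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // "Reduce" step: sum all values that were grouped under one key.
    static int reduce(String word, Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs the three steps in order over the given input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : shuffle(pairs).entrySet()) {
            result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

In a real job the grouping is done by the framework across machines; only the `map` and `reduce` bodies are written by the developer.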
MapReduce Development Environment Setup
- Running MapReduce
hadoop jar ~.jar
- Mapper
- Input key (key), input value (value) : the input line
- Output key (outKey), output value (123)
package com.jopenbusiness.mapreduce.mapper;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class zzmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emits a fixed output key and the fixed value 123 for every input line
        context.write(new Text(""), new IntWritable(123));
    }
}
- Reducer
- Input key (key), input values (values)
- Output key (outKey), output value (567)
package com.jopenbusiness.mapreduce.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class zzreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // The values parameter must be Iterable (not Iterator); otherwise this method
    // does not override Reducer.reduce and the default identity reduce runs instead.
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Emits the fixed value 567 for every key
        context.write(key, new IntWritable(567));
    }
}
- Running the MapReduce job
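import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//--- Method of the driver class (zztemp); returns true only when the job succeeds
public boolean process() {
    try {
        Job job = Job.getInstance();
        job.setJarByClass(zztemp.class);

        FileInputFormat.addInputPath(job, new Path("~"));
        job.setMapperClass(zzmapper.class);
        job.setCombinerClass(zzreducer.class);

        FileOutputFormat.setOutputPath(job, new Path("~"));
        job.setReducerClass(zzreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true);
    } catch (ClassNotFoundException | IOException | InterruptedException e) {
        e.printStackTrace();
    }
    return false; // the job could not be set up or did not complete
}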
- Generic type
- Text, ByteWritable, ArrayWritable
- IntWritable, LongWritable, FloatWritable, DoubleWritable
- BooleanWritable
- MapWritable, SortedMapWritable, TwoDArrayWritable
- VIntWritable, VLongWritable
- GenericWritable, CompressedWritable, ObjectWritable, VersionedWritable, NullWritable
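The types listed above all implement Hadoop's `Writable` interface, whose two methods are `write(DataOutput)` and `readFields(DataInput)`. As a rough sketch of that contract, the hypothetical class `WordStatSketch` below mirrors the two methods using only `java.io`; it does not implement the real interface, and its fields and the `toBytes` / `fromBytes` helpers are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type mirroring the shape of Hadoop's Writable (write / readFields)
// without depending on the Hadoop jars.
final class WordStatSketch {
    String word = "";
    int count;

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Deserialize the fields in the same order, overwriting this instance.
    void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    // Helper (assumption): serialize a value into a byte array.
    static byte[] toBytes(WordStatSketch value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            value.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Helper (assumption): rebuild a value from a byte array.
    static WordStatSketch fromBytes(byte[] bytes) {
        WordStatSketch value = new WordStatSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            value.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return value;
    }

    public static void main(String[] args) {
        WordStatSketch original = new WordStatSketch();
        original.word = "hadoop";
        original.count = 3;
        WordStatSketch copy = fromBytes(toBytes(original));
        System.out.println(copy.word + " " + copy.count); // prints hadoop 3
    }
}
```

Reading the fields back in exactly the order they were written is the part that matters; a mismatch corrupts every field that follows.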
- Hadoop Streaming
hadoop jar /appl/hadoop/contrib/streaming/hadoop-*-streaming.jar \
    -input input/~.txt -output output \
    -mapper ~1.bash -combiner ~2.bash -reducer ~3.bash \
    -file ~1.bash -file ~2.bash -file ~3.bash
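With Hadoop Streaming, the mapper and reducer are ordinary programs: the mapper reads input lines on standard input and writes tab-separated key/value lines, and the reducer reads those lines sorted by key and has to detect the key boundaries itself. The sketch below shows that line protocol in Java as a word count. The class `StreamingProtocolSketch`, its method names, and the `map` / `reduce` command-line argument are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;

// Hypothetical illustration of the text protocol used between streaming stages.
final class StreamingProtocolSketch {

    // Mapper side: one input line in, zero or more "key<TAB>value" lines out.
    static void mapLines(BufferedReader in, PrintWriter out) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    out.print(token + "\t1\n");
                }
            }
        }
    }

    // Reducer side: input lines arrive sorted by key, so a change of key
    // means the previous key is complete and its total can be written.
    static void reduceLines(BufferedReader in, PrintWriter out) throws IOException {
        String currentKey = null;
        long sum = 0;
        String line;
        while ((line = in.readLine()) != null) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // skip lines that do not follow the key<TAB>value format
            }
            String key = line.substring(0, tab);
            long value = Long.parseLong(line.substring(tab + 1).trim());
            if (currentKey != null && !key.equals(currentKey)) {
                out.print(currentKey + "\t" + sum + "\n");
                sum = 0;
            }
            currentKey = key;
            sum += value;
        }
        if (currentKey != null) {
            out.print(currentKey + "\t" + sum + "\n");
        }
    }

    // Convenience wrapper (assumption): run one stage over an in-memory string.
    static String runStage(String stage, String input) {
        StringWriter buffer = new StringWriter();
        try (PrintWriter out = new PrintWriter(buffer)) {
            BufferedReader in = new BufferedReader(new StringReader(input));
            if (stage.equals("reduce")) {
                reduceLines(in, out);
            } else {
                mapLines(in, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            // "map" or "reduce": act as a streaming stage on stdin/stdout.
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            PrintWriter out = new PrintWriter(System.out);
            if (args[0].equals("reduce")) {
                reduceLines(in, out);
            } else {
                mapLines(in, out);
            }
            out.flush();
        } else {
            // No arguments: small in-memory demonstration of the reducer stage.
            System.out.print(runStage("reduce", "a\t1\na\t1\nb\t1\n"));
        }
    }
}
```

A combiner script follows the same input and output format as the reducer, which is why the same summing logic can usually serve for both.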
Standard MapReduce Job
- StandardMapReduce.java
package com.jopenbusiness.hadoop.sample2;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StandardMapReduce extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new StandardMapReduce(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        //--- Check the input parameters
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1; // stop here; args[0] and args[1] are required below
        }

        //--- Configure MapReduce
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Max temperature");
        job.setJarByClass(StandardMapReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(StandardMapReduceMapper.class);
        // job.setCombinerClass(StandardMapReduceReducer.class);
        // job.setPartitionerClass(StandardMapReducePartitioner.class);
        // job.setGroupingComparatorClass(StandardMapReduceGroupComparator.class);
        // job.setSortComparatorClass(StandardMapReduceSortComparator.class);
        job.setReducerClass(StandardMapReduceReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //--- Specify the input and output data paths
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(args[1]), true); // remove any existing output directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //--- Run the job w/ monitoring
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Nested Mapper and Reducer classes must be static so the framework can instantiate them
    public static class StandardMapReduceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        }
    }

    public static class StandardMapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        }
    }
}
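The job above leaves `setPartitionerClass` commented out, so the default partitioner decides which reducer receives each key. Hadoop's default `HashPartitioner` computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`; the sketch below reproduces only that arithmetic in plain Java. The class `HashPartitionSketch` and the method `partitionFor` are hypothetical, and because it hashes plain Java objects rather than `Text` keys, the partition numbers it prints will not necessarily match those of a real job.

```java
// Hypothetical, Hadoop-free illustration of hash partitioning.
final class HashPartitionSketch {

    // Clears the sign bit so the result is never negative, then maps the hash
    // onto the range [0, numReduceTasks).
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4; // assumed number of reducers
        for (Object key : new Object[] {"a", 10, -7}) {
            System.out.println(key + " -> partition " + partitionFor(key, numReduceTasks));
        }
    }
}
```

Equal keys always land in the same partition, which is what guarantees that one reducer sees every value for a given key. A custom partitioner is only needed when that default grouping is not the one the job requires.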