This page summarizes MapReduce.


MapReduce Architecture


(Figure: Hadoop architecture03.png, MapReduce architecture)


MapReduce Installation


  • [Installing Hadoop on CentOS](Hadoop.md#CentOS에서 Hadoop 설치.md)


MapReduce Guide


  • MapReduce
    • Job-Tracker : assigns jobs (has a redundancy problem, i.e. it is a single point of failure)
    • Task-Trackers : carry out the actual processing
  • Map + Reduce
    • Mapper (map) : receives input, processes it, and sorts the results
    • Reducer (reduce) : receives the Mapper's results and aggregates them
  • Mapper Sample
    //--- Input  : Object (a Long-typed key), Text (one line read from the input file)
    //---          The format of the input value (Text) can be changed with an InputFormat/RecordReader
    //--- Output : Text, IntWritable -> passed to the Reducer; the Text is the key
    public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
       private final static IntWritable one = new IntWritable(1);
       private Text word = new Text();

       public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
           //--- Emit (word, 1) for every token in the line
           StringTokenizer itr = new StringTokenizer(value.toString());
           while (itr.hasMoreTokens()) {
               word.set(itr.nextToken());
               context.write(word, one);
           }
       }
    }

  • Reducer Sample
    //--- Input  : Text, IntWritable <- received from the Map phase, with values grouped by the Text key
    //--- Output : Text, IntWritable
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
       private final static IntWritable count = new IntWritable();

       public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
           int sum = 0;
           for (IntWritable val : values) {
               sum += val.get();    //--- Accumulate the counts emitted for this word
           }
           count.set(sum);
           context.write(word, count);
       }
    }
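    For example, assuming two hypothetical input lines "hello world" and "hello hadoop", the records flowing through this Mapper/Reducer pair would be:

      Mapper output  : (hello, 1), (world, 1), (hello, 1), (hadoop, 1)
      Shuffle/sort   : (hadoop, [1]), (hello, [1, 1]), (world, [1])
      Reducer output : (hadoop, 1), (hello, 2), (world, 1)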

    Flow diagrams:
    http://songsungkyun.cafe24.com/images/MapReduce.jpg
    http://4.bp.blogspot.com/_j6mB7TMmJJY/SS0CEJLklnI/AAAAAAAAAGQ/ogPGJ3WYpt4/s400/P4.png

MapReduce Development Environment Setup


  • Running MapReduce

    hadoop jar ~.jar
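
    A minimal sketch of a full invocation, naming the driver class and the HDFS input/output paths (the jar name and paths are hypothetical; StandardMapReduce is the driver defined later on this page):

      hadoop jar wordcount.jar com.jopenbusiness.hadoop.sample2.StandardMapReduce input output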

  • Mapper

    • Input key (key), input value (value) : one input line
    • Output key (outKey), output value (123)

      package com.jopenbusiness.mapreduce.mapper;

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class zzmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              context.write(new Text(""), new IntWritable(123));
          }
      }


  • Reducer

    • Input key (key), input values (values)
    • Output key (outKey), output value (567)

      package com.jopenbusiness.mapreduce.reducer;

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class zzreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              context.write(key, new IntWritable(567));
          }
      }


  • Running the MapReduce job from a driver

       public Boolean process() {
           Job job = null;

           try {
               job = Job.getInstance();
               job.setJarByClass(zztemp.class);

               FileInputFormat.addInputPath(job, new Path("~"));
               job.setMapperClass(zzmapper.class);

               //--- Optional: reuse the reducer as a combiner to pre-aggregate map output
               job.setCombinerClass(zzreducer.class);

               FileOutputFormat.setOutputPath(job, new Path("~"));
               job.setReducerClass(zzreducer.class);
               job.setOutputKeyClass(Text.class);
               job.setOutputValueClass(IntWritable.class);
               return job.waitForCompletion(true);
           } catch (ClassNotFoundException e) {
               e.printStackTrace();
           } catch (IOException e) {
               e.printStackTrace();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           return false;    //--- Reached only when an exception prevented the job from running
       }


  • Generic types (Hadoop Writable implementations used as Mapper/Reducer type parameters; see the sketch after this list)

    • Text, ByteWritable, ArrayWritable
    • IntWritable, LongWritable, FloatWritable, DoubleWritable
    • BooleanWritable
    • MapWritable, SortedMapWritable, TwoDArrayWritable
    • VIntWritable, VLongWritable
    • GenericWritable, CompressedWritable, ObjectWritable, VersionedWritable, NullWritable
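
    A minimal sketch of a custom value type, assuming only the Writable contract (the PairWritable class below is hypothetical, not part of Hadoop):

      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;

      import org.apache.hadoop.io.Writable;

      //--- Custom types used as values implement Writable;
      //--- types used as keys must implement WritableComparable instead
      public class PairWritable implements Writable {
          private int left;
          private int right;

          public void set(int left, int right) {
              this.left = left;
              this.right = right;
          }

          public void write(DataOutput out) throws IOException {
              out.writeInt(left);     //--- Serialize the fields in a fixed order
              out.writeInt(right);
          }

          public void readFields(DataInput in) throws IOException {
              left = in.readInt();    //--- Deserialize in the same order as write()
              right = in.readInt();
          }
      }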
  • Hadoop Streaming

    hadoop jar /appl/hadoop/contrib/streaming/hadoop-*-streaming.jar
          -input input/~.txt -output output
          -mapper ~1.bash
          -combiner ~2.bash
          -reducer ~3.bash
          -file ~1.bash -file ~2.bash -file ~3.bash
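
    Streaming mappers and reducers read records from stdin and write tab-separated key/value pairs to stdout. The script names above are elided, so the following word-count style mapper is only a hypothetical sketch of the expected shape:

      #!/usr/bin/env bash
      #--- Emit "<word><TAB>1" for every token on every input line
      while read -r line; do
          for word in $line; do
              printf '%s\t1\n' "$word"
          done
      done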



Standard MapReduce Job


  • StandardMapReduce.java

    package com.jopenbusiness.hadoop.sample2; 
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class StandardMapReduce extends Configured implements Tool {
       public static void main(String[] args) throws Exception {
           System.exit(ToolRunner.run(new StandardMapReduce(), args));
       }
    
       public int run(String[] args) throws Exception {
           Configuration conf = null;
           Job job = null;
           FileSystem fs = null;
    
           //---    Check the input parameters
           if (args.length != 2) {
               System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
               ToolRunner.printGenericCommandUsage(System.err);
               return -1;
           }
    
           //---    Configure the MapReduce job
           conf = getConf();
           job = Job.getInstance(conf, "Max temperature");
           job.setJarByClass(StandardMapReduce.class);
    
           job.setInputFormatClass(TextInputFormat.class);
           job.setMapperClass(StandardMapReduceMapper.class);
    //        job.setCombinerClass(StandardMapReduceReducer.class);
    //        job.setPartitionerClass(StandardMapReducePartitioner.class);
    //        job.setGroupingComparatorClass(StandardMapReduceGroupComparator.class);
    //        job.setSortComparatorClass(StandardMapReduceSortComparator.class);
           job.setReducerClass(StandardMapReduceReducer.class);
           job.setOutputFormatClass(TextOutputFormat.class);
    
           //---    Set the input and output paths
           fs = FileSystem.get(conf);
           fs.delete(new Path(args[1]), true);    //--- Remove any pre-existing output directory

           FileInputFormat.addInputPath(job, new Path(args[0]));
           FileOutputFormat.setOutputPath(job, new Path(args[1]));
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
    
           //---    Run the job and monitor it
           return (job.waitForCompletion(true)) ? 0 : 1;
       }    
    
       public static class StandardMapReduceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
           public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           }
       }

       public static class StandardMapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
           public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
           }
       }
    }
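
    Because the driver runs through ToolRunner, Hadoop generic options can be passed on the command line. A sketch of an invocation (the jar name is hypothetical):

      hadoop jar standard-mapreduce.jar com.jopenbusiness.hadoop.sample2.StandardMapReduce \
            -D mapreduce.job.reduces=2 input output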

