MapReduce를 정리합니다.
[[파일:Hadoop architecture03.png|700px]]
//--- 입력 : Object. Long 형태의 key, Text. 파일에서 읽은 하나의 라인
//--- InputReader 등을 사용하여 입력되는 값(Text)의 양식을 변경할 수 있음
//--- 출력 : Text, IntWritable -> Reduce에 전달, Text는 키
public class WordCountMapper extends Mapper
//--- 입력 : Text, IntWritable <- Map에서 전달 받음, Text를 키로 하여 값을 취합하여 전달받음
//--- 출력 : Text, IntWritable
public class WordCountReducer extends Reducer {
private final static IntWritable count = new IntWritable();
public void reduce(Text word, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
count.set(sum);
context.write(word, count);
}
}
MapReduce 실행
hadoop jar ~.jar
Mapper
package com.jopenbusiness.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class zzmapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(new Text(""), new IntWritable(123));
} } ```
Reducer
package com.jopenbusiness.mapreduce.reducer;
import java.io.IOException; import java.util.Iterator;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class zzreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator
context.write(key, new IntWritable(123));
} } ```
MapReduce 실행
//--- Configures and runs the MapReduce job synchronously.
//--- Returns true only when the job completes successfully; false on any
//--- configuration or execution failure.
public Boolean process() {
    Job job = null;
    try {
        //--- Job.getInstance() replaces the deprecated new Job() constructor.
        job = Job.getInstance();
        job.setJarByClass(zztemp.class);
        FileInputFormat.addInputPath(job, new Path("~"));
        job.setMapperClass(zzmapper.class);
        //--- Combiner runs map-side with the reducer's logic to cut shuffle volume.
        job.setCombinerClass(zzreducer.class);
        FileOutputFormat.setOutputPath(job, new Path("~"));
        job.setReducerClass(zzreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //--- true: stream job progress to stdout while waiting.
        return job.waitForCompletion(true);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        //--- Restore the interrupt flag so callers can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    //--- BUG FIX: the original returned true even after an exception,
    //--- making failures indistinguishable from success.
    return false;
}
Generic type
Hadoop 스트리밍
hadoop jar /appl/hadoop/contrib/streaming/hadoop-*-streaming.jar
-input input/~.txt -output output
-mapper ~1.bash
-combiner ~2.bash
-reducer ~3.bash
-file ~1.bash -file ~2.bash -file ~3.bash
참고 문헌
StandardMapReduce.java
package com.jopenbusiness.hadoop.sample2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class StandardMapReduce extends Configured implements Tool {
public static void main(String[](.md) args) throws Exception {
System.exit(ToolRunner.run(new StandardMapReduce(), args));
}
public int run(String[](.md) args) throws Exception {
Configuration conf = null;
Job job = null;
FileSystem fs = null;
//--- 입력 파라메터 확인
if (args.length != 2) {
System.err.printf("Usage: %s [options](generic)