MapReduce定义
MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:
1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 [1] 。
MapReduce 既是一个编程模型,又是一个计算框架。其编程模型只包含 Map 和 Reduce 两个过程,map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value 集合 >;再将这个 <Key, Value 集合 > 输入 reduce,经过计算输出零个或多个 <Key, Value> 对。
同时,MapReduce 又是非常强大的,不管是关系代数运算(SQL 计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过 MapReduce 编程来实现。
MapReduce实例
以WordCount 为示例,来解读MapReduce 编程模型
简单说来,就是建一个 Hash 表,然后将字符串里的每个词放到这个 Hash 表里。如果这个词第一次放到 Hash 表,就新建一个 Key、Value 对,Key 是这个词,Value 是 1。如果 Hash 表里已经有这个词了,那么就给这个词的 Value + 1。
小数据量用单机统计词频很简单,但是如果想统计全世界互联网所有网页(数万亿计)的词频数(而这正是 Google 这样的搜索引擎的典型需求),不可能写一个程序把全世界的网页都读入内存,这时候就需要用 MapReduce 编程来解决。
WordCount 的 MapReduce 程序如下。
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}
MapReduce的核心是Map函数和一个Reduce函数。
map 函数的输入主要是一个 <Key, Value> 对,在这个例子里,Value 是要统计的所有文本中的一行数据,Key 在一般计算中都不会用到。
public void reduce(Text key, Iterable<IntWritable> values,Context context)
这里 reduce 的输入参数 Values 就是由很多个 1 组成的集合,而 Key 就是具体的单词 word。
reduce 函数的计算过程是,将这个集合里的 1 求和,再将单词(word)和这个和(sum)组成一个 <Key, Value>,也就是 <word, sum> 输出。每一个输出就是一个单词和它的词频统计总和。
一个 map 函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是 HDFS 所做的),MapReduce 计算框架为每个数据块分配一个 map 函数去计算,从而实现大数据的分布式计算。
假设有两个数据块的文本数据需要进行词频统计,MapReduce 计算过程如下图所示。
以上就是 MapReduce 编程模型的主要计算过程和原理,但是这样一个 MapReduce 程序要想在分布式环境中执行,并处理海量的大规模数据,还需要一个计算框架,能够调度执行这个 MapReduce 程序,使它在分布式的集群中并行运行,而这个计算框架也叫 MapReduce。
MapReduce实践
pageId | age |
---|---|
1 | 25 |
2 | 25 |
1 | 32 |
2 | 25 |
如果存储在 HDFS 中,每一行记录在 HDFS 对应一行文本,文本格式是
1,25
2,25
1,32
2,25
根据上面 WordCount 的示例,请你写一个 MapReduce 程序,得到下面这条 SQL 的计算结果。
SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;
Map
public class PageMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String data = value.toString();
String[] words = data.split("\n");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
Reduce
public class PageReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable value : values) {
total=total+value.get();
}
context.write(key, new IntWritable(total));
}
}
Main
public class PageMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJarByClass(PageMain.class);
job.setMapperClass(PageMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(PageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}