MapReduce是面向大数据并行处理的计算模型、框架和平台,提供为数据划分和计算任务调度的功能,系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点执行的同步控制。
一个完整的MapReduce程序在分布式运行时有两步进程:
1)MapTask:负责map阶段整个数据处理流程。
2)ReduceTask:负责reduce阶段整个数据处理流程。
接下来我们重点讲解一下二者的具体流程。
一、MR流程
1. MapTask
(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个Key-Value。
(2)Map阶段:该节点主要是将解析出的Key-Value交给用户编写map()函数处理,并产生一系列新的Key-Value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的Key-Value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
2. ReduceTask
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按Key进行聚集的一组数据。为了将Key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。
二、源码解析
1. Job提交流程
public class WordCOuntDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置jar路径
job.setJarByClass(WordCOuntDriver.class);
//关联mapper和reducer
job.setMapperClass(WordCountMappper.class);
job.setReducerClass(WordCountReduce.class);
//设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output\\o1"));
//提交任务
boolean bool = job.waitForCompletion(true);
System.exit(bool ? 0 : 1);
}
}
if (state == JobState.DEFINE) {
submit();
}
connect(); //建立连接
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration()); //创建提交Job的代理
}
});
}
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf); //判断是本地运行环境还是yarn集群运行环境
}
submitter.submitJobInternal(Job.this, cluster); //提交job
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
Path jobStagingArea =
JobSubmissionFiles.getStagingDir(cluster, conf); //创建给集群提交数据的Stag路径
JobID jobId = submitClient.getNewJobID(); //获取JobID,并创建Job路径
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
copyAndConfigureFiles(job, submitJobDir); //拷贝jar包到集群
int maps = writeSplits(job, submitJobDir); //计算切片,生成切片规划文件
writeConf(conf, submitJobFile); //向Stag路径写xml配置文件
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); //提交job,返回提交状态
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
2. MapTask源码
public class WordCountMappper extends Mapper
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
k.set(word);
context.write(k, v); //自定义Map方法的写出
}
}
}
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions)); //收集方法,执行两次,以及默认分区器HashPartitioner
}
public synchronized void collect(K key, V value, final int partition
) throws IOException {
} //Map端所有的kv全部写出后会走下面的close方法
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush(); //溢出刷写方法
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
spillLock.lock();
sortAndSpill(); //溢写排序
mergeParts(); //合并文件
}
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写排序方法
}
3. ReduceTask源码
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
} //判断任务
initialize(job, getJobID(), reporter, useNewApi); //任务初始化
shuffleConsumerPlugin.init(shuffleContext); //shuffle初始化
rIter = shuffleConsumerPlugin.run();
sortPhase.complete(); //排序完成,即将进入reduce阶段,自定义的reduce会执行多次
}
totalMaps = job.getNumMapTasks(); //获取MapTask的个数
merger = createMergeManager(context); //合并方法
this.inMemoryMerger = createInMemoryMerger(); //内存合并
this.inMemoryMerger.start(); //磁盘合并
public RawKeyValueIterator run() throws IOException, InterruptedException {
eventFetcher.start(); //开始抓取数据
eventFetcher.shutDown(); //抓取结束
copyPhase.complete(); //copy阶段完成
taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段
return kvIter;
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
} //reduce完成之前,会最后调用一次Reducer里的cleanup方法
三、总结
本片文章讲述了MapReduce的详细流程和源码执行过程,有几处重点如下:
- 任务提交过程中的数据切片,决定MapTask任务的个数
- 溢写过程的排序与合并
- Reduce的归并排序
对于这些内容,我们需要重点关注,在面试中遇到这些问题我们就可以对答如流了。