优秀的编程知识分享平台

网站首页 > 技术文章 正文

网易二面:MapReduce的运行完整流程(内含源码分析)

nanyue 2025-03-23 21:55:09 技术文章 15 ℃

MapReduce是面向大数据并行处理的计算模型、框架和平台,提供为数据划分和计算任务调度的功能,系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点执行的同步控制。

一个完整的MapReduce程序在分布式运行时有两步进程:

1)MapTask:负责map阶段整个数据处理流程。

2)ReduceTask:负责reduce阶段整个数据处理流程。

接下来我们重点讲解一下二者的具体流程。

一、MR流程

1. MapTask

(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个Key-Value。

(2)Map阶段:该节点主要是将解析出的Key-Value交给用户编写map()函数处理,并产生一系列新的Key-Value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的Key-Value分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

2. ReduceTask

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按Key进行聚集的一组数据。为了将Key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。

二、源码解析

1. Job提交流程

public class WordCOuntDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

//获取任务

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//设置jar路径

job.setJarByClass(WordCOuntDriver.class);

//关联mapper和reducer

job.setMapperClass(WordCountMappper.class);

job.setReducerClass(WordCountReduce.class);

//设置map输出

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//设置最终输出

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//设置输入输出

FileInputFormat.setInputPaths(job, new Path("D:\\input"));

FileOutputFormat.setOutputPath(job, new Path("D:\\output\\o1"));

//提交任务

boolean bool = job.waitForCompletion(true);

System.exit(bool ? 0 : 1);

}

}

if (state == JobState.DEFINE) {

submit();

}

connect(); //建立连接

private synchronized void connect()

throws IOException, InterruptedException, ClassNotFoundException {

if (cluster == null) {

cluster =

ugi.doAs(new PrivilegedExceptionAction() {

public Cluster run()

throws IOException, InterruptedException,

ClassNotFoundException {

return new Cluster(getConfiguration()); //创建提交Job的代理

}

});

}

}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)

throws IOException {

this.conf = conf;

this.ugi = UserGroupInformation.getCurrentUser();

initialize(jobTrackAddr, conf); //判断是本地运行环境还是yarn集群运行环境

}


submitter.submitJobInternal(Job.this, cluster); //提交job

JobStatus submitJobInternal(Job job, Cluster cluster)

throws ClassNotFoundException, InterruptedException, IOException {

Path jobStagingArea =
JobSubmissionFiles.getStagingDir(cluster, conf); //创建给集群提交数据的Stag路径

JobID jobId = submitClient.getNewJobID(); //获取JobID,并创建Job路径

job.setJobID(jobId);

Path submitJobDir = new Path(jobStagingArea, jobId.toString());

JobStatus status = null;

try {

copyAndConfigureFiles(job, submitJobDir); //拷贝jar包到集群

int maps = writeSplits(job, submitJobDir); //计算切片,生成切片规划文件

writeConf(conf, submitJobFile); //向Stag路径写xml配置文件

status = submitClient.submitJob(

jobId, submitJobDir.toString(), job.getCredentials()); //提交job,返回提交状态

if (status != null) {

return status;

} else {

throw new IOException("Could not launch job");

}

} finally {

if (status == null) {

LOG.info("Cleaning up the staging area " + submitJobDir);

if (jtFs != null && submitJobDir != null)

jtFs.delete(submitJobDir, true);

}

}

}

2. MapTask源码

public class WordCountMappper extends Mapper {

Text k = new Text();

IntWritable v = new IntWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

for (String word : words) {

k.set(word);

context.write(k, v); //自定义Map方法的写出

}

}

}

public void write(K key, V value) throws IOException, InterruptedException {

collector.collect(key, value,

partitioner.getPartition(key, value, partitions)); //收集方法,执行两次,以及默认分区器HashPartitioner

}

public synchronized void collect(K key, V value, final int partition

) throws IOException {

} //Map端所有的kv全部写出后会走下面的close方法

public void close(TaskAttemptContext context

) throws IOException,InterruptedException {

try {

collector.flush(); //溢出刷写方法

} catch (ClassNotFoundException cnf) {

throw new IOException("can't find class ", cnf);

}

collector.close();

}

public void flush() throws IOException, ClassNotFoundException,

InterruptedException {

spillLock.lock();

sortAndSpill(); //溢写排序

mergeParts(); //合并文件

}

private void sortAndSpill() throws IOException, ClassNotFoundException,

InterruptedException {

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写排序方法

}

3. ReduceTask源码

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, InterruptedException, ClassNotFoundException {

if (isMapOrReduce()) {

copyPhase = getProgress().addPhase("copy");

sortPhase = getProgress().addPhase("sort");

reducePhase = getProgress().addPhase("reduce");

} //判断任务

initialize(job, getJobID(), reporter, useNewApi); //任务初始化


shuffleConsumerPlugin.init(shuffleContext); //shuffle初始化

rIter = shuffleConsumerPlugin.run();

sortPhase.complete(); //排序完成,即将进入reduce阶段,自定义的reduce会执行多次

}

totalMaps = job.getNumMapTasks(); //获取MapTask的个数

merger = createMergeManager(context); //合并方法

this.inMemoryMerger = createInMemoryMerger(); //内存合并

this.inMemoryMerger.start(); //磁盘合并

public RawKeyValueIterator run() throws IOException, InterruptedException {

eventFetcher.start(); //开始抓取数据

eventFetcher.shutDown(); //抓取结束

copyPhase.complete(); //copy阶段完成

taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段

return kvIter;

}

protected void cleanup(Context context

) throws IOException, InterruptedException {

} //reduce完成之前,会最后调用一次Reducer里的cleanup方法

三、总结

本片文章讲述了MapReduce的详细流程和源码执行过程,有几处重点如下:

  • 任务提交过程中的数据切片,决定MapTask任务的个数
  • 溢写过程的排序与合并
  • Reduce的归并排序

对于这些内容,我们需要重点关注,在面试中遇到这些问题我们就可以对答如流了。

最近发表
标签列表