MapReduce原理分析之MapTask读取数据

网友投稿 628 2022-11-02

MapReduce原理分析之MapTask读取数据

MapReduce原理分析之MapTask读取数据

通过前面的内容介绍相信大家对于MapReduce的操作有了一定的了解,通过客户端源码的分析也清楚了split是逻辑分区,记录了每个分区对应的是哪个文件,从什么位置开始到什么位置介绍,而且一个split对应一个Map Task任务,而MapTask具体是怎么读取文件的呢?本文来具体分析下。

MapTask读取数据的过程

我们要分析的就是如下的过程:

1.自定义Mapper

在自定义的Mapper中我们只需要重写map方法,那么每读取一行记录就会调用一次该方法,如下

2.查看Mapper源码

public class Mapper { public abstract class Context implements MapContext { } protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }}

通过源码我们能够看到里面的方法如下

3.context.nextKeyValue()

context的类型是Context实现了MapContext

public abstract class Context implements MapContext { }

MapContext的实现类是MapContextImpl

而nextKeyValue()方法的调用

说明调用的是RecordReader中的方法,而具体是RecordReader中的哪个实现类呢?继续往下。

4.FileInputFormat

我们在启动类中设置了输入输出路径。进入FileInputFormat的子类TextFileInputFormat中查看

说明nextKeyValue()其实执行的是RecordReader中的nextKeyValue方法。

读取split文件中每行数据的方法。将每行的偏移量保存在key中,每行的具体数据保存在value中,分别通过getCurrentKey方法和getCurrentValue方法来获取。

public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }

自此我们简单的分析了下map具体是怎么一行一行的读取数据的了。MapReduce的其他阶段我们后续再慢慢分析

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:解决RabbitMq消息队列Qos Prefetch消息堵塞问题
下一篇:Sockeye项目包含基于MXNet的神经机器翻译Sequence-to-sequence框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~