如何利用小游戏开发框架提升企业小程序的用户体验与运营效率
628
2022-11-02
MapReduce原理分析之MapTask读取数据
通过前面的内容介绍相信大家对于MapReduce的操作有了一定的了解,通过客户端源码的分析也清楚了split是逻辑分区,记录了每个分区对应的是哪个文件,从什么位置开始到什么位置介绍,而且一个split对应一个Map Task任务,而MapTask具体是怎么读取文件的呢?本文来具体分析下。
MapTask读取数据的过程
我们要分析的就是如下的过程:
1.自定义Mapper
在自定义的Mapper中我们只需要重写map方法,那么每读取一行记录就会调用一次该方法,如下
2.查看Mapper源码
public class Mapper
通过源码我们能够看到里面的方法如下
3.context.nextKeyValue()
context的类型是Context实现了MapContext
public abstract class Context implements MapContext
MapContext的实现类是MapContextImpl
而nextKeyValue()方法的调用
说明调用的是RecordReader中的方法,而具体是RecordReader中的哪个实现类呢?继续往下。
4.FileInputFormat
我们在启动类中设置了输入输出路径。进入FileInputFormat的子类TextFileInputFormat中查看
说明nextKeyValue()其实执行的是RecordReader中的nextKeyValue方法。
读取split文件中每行数据的方法。将每行的偏移量保存在key中,每行的具体数据保存在value中,分别通过getCurrentKey方法和getCurrentValue方法来获取。
public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
自此我们简单的分析了下map具体是怎么一行一行的读取数据的了。MapReduce的其他阶段我们后续再慢慢分析
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~