【MapReduce】MR 框架原理 之 Combiner局部汇总

网友投稿 798 2022-10-20

【MapReduce】MR 框架原理 之 Combiner局部汇总

【MapReduce】MR 框架原理 之 Combiner局部汇总

文章目录

​​Combiner​​

​​▶ 定义​​​​☠ Combiner案例​​

​​一、需求分析​​​​二、代码实现​​

​​2.1 wordCountCombiner合并​​​​2.2 Mapper阶段​​​​2.3 Reducer阶段​​​​2.4 Driver阶段​​

Combiner

▶ 定义

Ⅰ. Combiner 是MR程序中的Mapper、Reducer之外的一种组件

Ⅱ. ​​Combiner组件的父类就是Reducer​​

Ⅲ. Combiner 和 Reducer 的区别在于运行的位置:

​​Combiner 是在每一个MapTask所在的节点运行​​​​Reducer 是接收全局所有Mapper的输出结果​​

Ⅳ. ​​Combiner 的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量​​

Ⅴ. Combiner 能够应你用的前提是不能影响最终的业务逻辑(​​一般Combiner适用于汇总类型的业务​​),而且,​​Combiner的输出kv应该跟Reducer的输入kv类型要对应起来​​

​​返回顶部​​

☠ Combiner案例

一、需求分析

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

(1)数据输入

(2)期望输出数据

​​返回顶部​​

二、代码实现

首先我们运行最开始的wordCount案例,查看控制台打印的信息,发现Combine部分为空,并没有执行这一部分。 接下来我们编写一个wordCountCombiner类,相当于在Reducer之前,进行一个提前合并。

2.1 wordCountCombiner合并

package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class wordCountCombiner extends Reducer { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 1.遍历求和 int sum = 0; for (IntWritable value:values){ sum += value.get(); } v.set(sum); // 2.写出 context.write(key,v); }}

​​返回顶部​​

2.2 Mapper阶段

package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * Mapper 阶段 * KEYIN 输入数据的key类型 * VALUEIN 输入数据的value类型 * KEYOUT 输出数据的key类型 * VALUEOUT 输出数据的value类型 */public class wordCountMapper extends Mapper { // 创建对象 Text k = new Text(); IntWritable v = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.获取一行数据 // atguigu atguigu String line = value.toString(); // 2.切分 String[] words = line.split(" "); // 3.循环写出 for (String word:words){ // 设置键 atguigu k.set(word); // 设置词频为 1 , 也可以在上面创建对象时默认为1 v.set(1); // 生成键值对 (atguigu,1) context.write(k,v); } }}

​​返回顶部​​

2.3 Reducer阶段

package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * Reducer 阶段 * KEYIN ,VALUEIN Reducer阶段输入(Mapper阶段输出)数据的类型 * KEYOUT 最终输出数据的key类型 * VALUEOUT 最终输出数据的value类型 */public class wordCountReducer extends Reducer { IntWritable v = new IntWritable(); @Override // Iterable values 对key的value值进行迭代实现词频统计 protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // atguigu,1 // atguigu,1 // 1.累加求和 int sum = 0; for (IntWritable value:values){ // value是IntWritable类型数据,通过get转为int型,才好计算 sum += value.get(); } // 2.写出结果 v.set(sum); context.write(key,v); }}

​​返回顶部​​

2.4 Driver阶段

设置Combiner:​​job.setCombinerClass(wordCountCombiner.class);​​

package 第三章_MR框架原理.Combiner;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class wordCountDriver { public static void main(String[] args) { Configuration conf = new Configuration(); Job job = null; try { // 1.获取job对象 job = Job.getInstance(conf); // 2.设置jar存储位置 job.setJarByClass(wordCountDriver.class); // 3.关联map、reduce类 job.setMapperClass(wordCountMapper.class); job.setReducerClass(wordCountReducer.class); // 4.设置Mapper阶段输出数据的key、value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5.设置Reducer阶段输出数据的key、value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6.设置Combiner job.setCombinerClass(wordCountCombiner.class); // 7.设置输入、出路径 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\hello.txt")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\output1")); // 打jar包// FileInputFormat.setInputPaths(job,new Path(args[0]));// FileOutputFormat.setOutputPath(job,new Path(args[1])); // 8 .提交job job.waitForCompletion(true); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } }}

​​返回顶部​​

通过设置Combine类,在Reducer阶段之前提前进行一次统计,再将结果传输到Reducer,以减少网络传输量。上面在案例分析的时候,还有一个方案二提到,在本案例中由于Combiner和Reducer的作用相同,所以可以在设置Combiner类的时候直接设置成Reducer类,最终达到的效果是一样的。

​​返回顶部​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:美团一面:如何在 100 亿数据中找到中位数?
下一篇:HelloData- 数据框架,基础类集
相关文章

 发表评论

暂时没有评论,来抢沙发吧~