MapReduce之输出结果排序

网友投稿 640 2022-11-02

MapReduce之输出结果排序

MapReduce之输出结果排序

前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。

实现思路

MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前会排序),排序的依据是map输出的key。所以我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法来指定比较规则

实现步骤

1.自定义Bean

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;/** * 存储流量相关数据 * @author 波波烤鸭 * */public class Flow implements WritableComparable { // 上下流量 private long upFlow; // 下行流量 private long downFlow; // 总流量 private long sumFlow; /** * 比较Flow对象的总流量 */ @Override public int compareTo(Flow o) { // TODO Auto-generated method stub return -(int)(this.sumFlow - o.getSumFlow()); } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public Flow(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /** * 无参构造方法必须要有 反射的时候需要用到 */ public Flow() { super(); } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } /** * 反序列化 反序列化的顺序和序列化的顺序一致 */ @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }}

2.Map阶段

public class FlowCountMap extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将一行数据转换为String String line = value.toString(); // 切分字段 String[] fields = line.split("\t"); // 取出手机号 String phoneNum = fields[0]; // 取出上行流量下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); Flow flow = new Flow(upFlow, downFlow); context.write(flow, new Text(phoneNum)); }}

3.Reduce阶段

public class FlowCountReducer extends Reducer{ @Override protected void reduce(Flow flow, Iterable values, Context context) throws IOException, InterruptedException { String phone = values.iterator().next().toString(); // 输出结果 context.write(new Text(phone), flow); }}

4.启动类

public static void main(String[] args) throws Exception { Configuration conf = new Configuration(true); conf.set("mapreduce.framework.name", "local"); // 输出到HDFS文件系统中 // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000"); // 输出到本地文件系统 conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); job.setJarByClass(FlowTest.class); // 指定本job要使用的map/reduce的工具类 job.setMapperClass(FlowCountMap.class); job.setReducerClass(FlowCountReducer.class); // 指定mapper输出kv的类型 job.setMapOutputKeyClass(Flow.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class); // 指定job的原始文件输入目录 // 6.设置输出输出类 FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/sort/input/")); FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/sort/output/")); //将job中配置的相关参数,以及job所用的jar包提交给yarn运行 //job.submit(); waitForCompletion等待执行完成 boolean flag = job.waitForCompletion(true); System.exit(flag?0:1); }}

5.输出结果

成功倒序输出 本案例的目的有两个:

实现对输出结果排序我们可以在自定义对象的compareTo方法中指定如果一次MapReduce任务获取不到我们需要的结果我们可以对输出的结果做多次MapReduce任务。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:OAuth Connector 基于YMP框架实现的第三方OAuth授权登录模块
下一篇:解决RabbitMq消息队列Qos Prefetch消息堵塞问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~