Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解

网友投稿 1108 2022-11-20

Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解

Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解

目录

​​1、MapReduce中数据流动​​​​2、Mapreduce中Partition的概念以及使用。​​

​​1)Partition的原理和作用​​​​2)Partition的使用​​

​​3、MapReduce中Combiner的使用​​​​4、Shuffle阶段排序流程详解​​​​5、MapReduce中辅助排序的原理与实现​​

​​(1)任务​​​​2)工作原理​​​​3)实现代码​​​​1 首先说一下工作原理:​​​​2 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。例如​​​​输出:(注意需要分割线)​​​​3 具体步骤:​​

​​(1)自定义key​​​​(2)由于key是自定义的,所以还需要自定义一下类:​​​​(2.3)分组函数类。​​

1、MapReduce中数据流动

(1)最简单的过程: map - reduce (2)定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce (3)增加了在本地先进性一次reduce(优化)过程: map - combin(本地reduce) - partition -reduce

2、Mapreduce中Partition的概念以及使用。

1)Partition的原理和作用

得到map给的记录后,他们该分配给哪些reducer来处理呢?hadoop采用的默认的派发方式是根据散列值来派发的,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过partition处理后,一个节点的reducer分配到了20条记录,另一个却分配道了10W万条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个reducer,我们想要最终结果中part-00000中存储的是"h"开头的记录的结果,part-00001中存储其他开头的结果,这些默认的partitioner是做不到的。所以需要我们自己定制partition来根据自己的要求,选择记录的reducer。自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可 Map的结果,会通过partition分发到Reducer上。Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法, getPartition(Text key, Text value, int numPartitions)

输入是Map的结果对和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)。

2)Partition的使用

分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了

基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。

这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写Partitioner类中的compare函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。

2、MapReduce中分组的概念和使用 分区的目的是根据Key值决定Mapper的输出记录被送到哪一个Reducer上去处理。而分组的就比较好理解了。笔者认为,分组就是与记录的Key相关。在同一个分区里面,具有相同Key值的记录是属于同一个分组的。

3、MapReduce中Combiner的使用

下面我们以《权威指南》的例子来加以说明,假设1950年的天气数据读取是由两个map完成的,其中第一个map的输出如下:    (1950, 0)   (1950, 20)   (1950, 10)

第二个map的输出为: (1950, 25) (1950, 15)

而reduce得到的输入为:(1950, [0, 20, 10, 25, 15]), 输出为:(1950, 25)

由于25是集合中的最大值,我们可以使用一个类似于reduce function的combiner function来找出每个map输出中的最大值,这样的话,reduce的输入就变成了:   (1950, [20, 25])

各个funciton 对温度值的处理过程可以表示如下:max(0, 20, 10, 25, 15) =max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

注意:并不是所有的函数都拥有这个属性的(有这个属性的函数我们称之为commutative和associative),例如,如果我们要计算平均温度,就不能这样使用combiner function,因为mean(0, 20, 10, 25, 15) =14,而mean(mean(0, 20, 10),mean(25, 15)) = mean(10, 20) = 15

combiner function并不能取代reduce function(因为仍然需要reduce function处理来自不同map的带有相同key的记录)。但是他可以帮助减少需要在map和reduce之间传输的数据,就为这一点combiner function就值得考虑使用。

注意:如果MapOutputKey和MapOuputValue和outputkey和outputvalue不一致的时候,不能使用combiner。

4、Shuffle阶段排序流程详解

我们首先看一下MapReduce中的排序的总体流程。

MapReduce框架会确保每一个Reducer的输入都是按Key进行排序的。一般,将排序以及Map的输出传输到Reduce的过程称为混洗(shuffle)。每一个Map都包含一个环形的缓存,默认100M,Map首先将输出写到缓存当中。当缓存的内容达到“阈值”时(阈值默认的大小是缓存的80%),一个后台线程负责将结果写到硬盘,这个过程称为“spill”。Spill过程中,Map仍可以向缓存写入结果,如果缓存已经写满,那么Map进行等待。

Spill的具体过程如下:首先,后台线程根据Reducer的个数将输出结果进行分组,每一个分组对应一个Reducer。其次,对于每一个分组后台线程对输出结果的Key进行排序。在排序过程中,如果有Combiner函数,则对排序结果进行Combiner函数进行调用。每一次spill都会在硬盘产生一个spill文件。因此,一个Map task有可能会产生多个spill文件,当Map写出最后一个输出时,会将所有的spill文件进行合并与排序,输出最终的结果文件。在这个过程中Combiner函数仍然会被调用。从整个过程来看,Combiner函数的调用次数是不确定的。下面我们重点分析下Shuffle阶段的排序过程:

Shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的按照key进行排序,即key值相同的一串存放在一起,这样一个partition内按照key值整体有序了。第二部分并不是排序,而是进行merge,merge有两次,一次是map端将多个spill 按照分区和分区内的key进行merge,形成一个大的文件。第二次merge是在reduce端,进入同一个reduce的多个map的输出 merge在一起,该merge理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有。所以shuffle阶段的merge并不是严格的排序意义,只是将多个整体有序的文件merge成一个大的文件,由于不同的task执行map的输出会有所不同,所以merge后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同key的对挨在一起。Shuffle排序综述:如果只定义了map函数,没有定义reduce函数,那么输入数据经过shuffle的排序后,结果为key值相同的输出挨在一起,且key值小的一定在前面,这样整体来看key值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的HashPartitioner,则key 的hash值相等的在一个分区,如果key为IntWritable的话,每个分区内的key会排序好的),而每个key对应的value不是有序的。

5、MapReduce中辅助排序的原理与实现

(1)任务

我们需要把内容如下的sample.txt文件处理为下面文件:

源文件:Sample.txt

bbb 654ccc 534ddd 423aaa 754bbb 842ccc 120ddd 219aaa 344bbb 214ccc 547ddd 654aaa 122bbb 102ccc 479ddd 742aaa 146

目标:part-r-00000

aaa 122

bbb 102

ccc 120

ddd 219

2)工作原理

过程导引: 1、定义包含记录值和自然值的组合键,本例中为MyPariWritable.

2、自定义键的比较器(comparator)来根据组合键对记录进行排序,即同时利用自然键和自然值进行排序。(aaa 122组合为一个键)。

3、针对组合键的Partitioner(本示例使用默认的hashPartitioner)和分组comparator在进行分区和分组时均只考虑自然键。

3)实现代码

package com.hadoop;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class WritableSample extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); Job job=new Job(conf); job.setJarByClass(WritableSample.class); FileSystem fs=FileSystem.get(conf); fs.delete(new Path("out"),true); FileInputFormat.addInputPath(job, new Path("sample.txt")); FileOutputFormat.setOutputPath(job, new Path("out")); job.setMapperClass(MyWritableMap.class); job.setOutputKeyClass(MyPariWritable.class); job.setOutputValueClass(NullWritable.class); job.setReducerClass(MyWriableReduce.class); job.setSortComparatorClass(PairKeyComparator.class); job.setGroupingComparatorClass(GroupComparatored.class); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { Tool tool=new WritableSample(); ToolRunner.run(tool, args); }}class MyPariWritable implements WritableComparable{ Text first; IntWritable second; public void set(Text first,IntWritable second){ this.first=first; this.second=second; } public Text getFirst(){ return this.first; } public IntWritable getSecond(){ return this.second; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub first=new Text(in.readUTF()); second=new IntWritable(in.readInt()); } public void write(DataOutput out){ try { out.writeUTF(first.toString()); out.writeInt(second.get()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public int compareTo(MyPariWritable o) { // TODO Auto-generated method stub if(this.first!=o.getFirst()){ return this.first.toString().compareTo(o.getFirst().toString()); }else if(this.second!=o.getSecond()){ return this.second.get()-o.getSecond().get(); }else return 0; } @Override public String toString() { // TODO Auto-generated method stub return first.toString()+" "+second.get(); } @Override public boolean equals(Object obj) { MyPariWritable temp=(MyPariWritable) obj; return first.equals(temp.first)&&second.equals(temp.second); } @Override public int hashCode() { return first.hashCode()*163+second.hashCode(); }}class MyWritableMap extends Mapper{ MyPariWritable pair=new MyPariWritable(); protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException { String strs[]=value.toString().split(" "); Text keyy=new Text(strs[0]); IntWritable valuee=new IntWritable(Integer.parseInt(strs[1])); pair.set(keyy, valuee); context.write(pair, NullWritable.get()); };}class PairKeyComparator extends WritableComparator{ public PairKeyComparator() { super(MyPariWritable.class,true); } @SuppressWarnings("rawtypes") @Override public int compare(WritableComparable a, WritableComparable b) { MyPariWritable p1=(MyPariWritable) a; MyPariWritable p2=(MyPariWritable) b; if(!p1.getFirst().toString().equals(p2.getFirst().toString())){ return p1.first.toString().compareTo(p2.first.toString()); }else{ return p1.getSecond().get()-p2.getSecond().get(); } }}class MyWriableReduce extends Reducer{ protected void reduce(MyPariWritable key, java.lang.Iterable values, Context context) throws IOException ,InterruptedException { context.write(key, NullWritable.get()); };}class GroupComparatored extends WritableComparator{ public GroupComparatored(){ super(MyPariWritable.class,true); } @SuppressWarnings("rawtypes") @Override public int compare(WritableComparable a, WritableComparable b) { MyPariWritable p1=(MyPariWritable) a; MyPariWritable p2=(MyPariWritable) b; return p1.first.toString().compareTo(p2.first.toString());//这里只比较第一个元素,只要自然键类型为MyPariWritable的Key就认为是相同,目的是找出自然键对应的最小自然值。 }}

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。

这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)

public static class Map extends Mapper

public static class Reduce extends Reducer

1 首先说一下工作原理:

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是的原因。然后调用自定义Map的map方法,将一个个对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出。最终是生成一个List。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。

2 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。例如

输入文件

20 2150 5150 5250 5350 5460 5160 5360 5260 5660 5770 5860 6170 5470 5570 5670 5770 581 23 45 67 82203 2150 51250 52250 53530 5440 51120 5320 52260 5660 57740 5863 61730 5471 5571 5673 5774 5812 21131 4250 627 8

输出:(注意需要分割线)

1 23 45 67 87 8212 21120 2120 5320 52231 4240 51150 5150 5250 5350 5350 5450 6250 51250 52260 5160 5260 5360 5660 5660 5760 5760 6163 6170 5470 5570 5670 5770 5870 5871 5571 5673 5774 58203 21530 54730 54740 58

3 具体步骤:

(1)自定义key

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。

所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法: [cpp] view plaincopy //反序列化,从流中的二进制转换成IntPair public void readFields(DataInput in) throws IOException //序列化,将IntPair转化成使用流传送的二进制 public void write(DataOutput out) //key的比较 public int compareTo(IntPair o) //另外新定义的类应该重写的两个方法 //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) public int hashCode() public boolean equals(Object right)

(2)由于key是自定义的,所以还需要自定义一下类:

(2.1)分区函数类。这是key的第一次比较。 [cpp] view plaincopy public static class FirstPartitioner extends Partitioner 在job中使用setPartitionerClasss设置Partitioner。

(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。 [cpp] view plaincopy public static class KeyComparator extends WritableComparator 必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)

另一种方法是 实现接口RawComparator。

在job中使用setSortComparatorClass设置key比较函数类。

(2.3)分组函数类。

在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。

[cpp] view plaincopypublic static class GroupingComparator extends WritableComparator

分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)

分组函数类的另一种方法是实现接口RawComparator。 在job中使用setGroupingComparatorClass设置分组函数类。

作者:XifengHZ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序学习笔记(1) -- 基础知识入门
下一篇:springmvc整合ssm配置的详细代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~