Reading data from HBase, computing with MapReduce, and storing the result on HDFS

Reader submission · 652 · 2022-11-21


1. Main class

package yqq.study.app4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @Author yqq
 * @Date 2021/11/18 09:04
 * @Version 1.0
 */
public class HBase2HdfsMain {
    public static void main(String[] args) throws Exception {
        String outputPath = "/usr/local/mrout";
        Configuration conf = new Configuration(true);
        // ZooKeeper quorum used by the HBase client
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        // Run the job locally rather than on YARN
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "hbase2hdfs");
        job.setJarByClass(HBase2HdfsMain.class);

        // A full-table scan of the "sentence" table feeds the mapper
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("sentence", scan,
                HBase2HdfsMapper.class, Text.class, LongWritable.class, job, false);

        job.setReducerClass(HBase2HdfsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Remove a stale output directory so the job can be rerun
        Path path = new Path(outputPath);
        if (path.getFileSystem(conf).exists(path))
            path.getFileSystem(conf).delete(path, true);
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
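The job assumes a "sentence" table already exists in HBase with a column family cf, holding one line of text per row in cf:line. The loader below is a minimal sketch (not from the original post) of putting a sample row with the HBase client API; it assumes the table was created beforehand (e.g. create 'sentence','cf' in the HBase shell), and the class name, row key, and sample text are illustrative.

package yqq.study.app4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SentenceTableLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper quorum as the main class
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("sentence"))) {
            // Each row holds one line of text in cf:line, matching what the mapper reads
            Put put = new Put(Bytes.toBytes("row-0001"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("line"),
                    Bytes.toBytes("hello neusoft hello hadoop"));
            table.put(put);
        }
    }
}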

2. Mapper class

package yqq.study.app4;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/18 09:17
 * @Version 1.0
 */
public class HBase2HdfsMapper extends TableMapper<Text, LongWritable> {
    private Text keyOut = new Text();
    private LongWritable valOut = new LongWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Read the latest cell of column cf:line for this row
        Cell cell = value.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("line"));
        if (cell == null)
            return; // skip rows without a cf:line value
        String line = Bytes.toString(CellUtil.cloneValue(cell)).trim();
        // Emit (word, 1) for every space-separated token
        String[] words = line.split(" ");
        for (String word : words) {
            keyOut.set(word);
            context.write(keyOut, valOut);
        }
    }
}
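For example, a row whose cf:line value is "hello neusoft hello" makes the mapper emit ("hello", 1) twice and ("neusoft", 1) once; the reducer below then sums the 1s for each word.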

3. Reducer class

package yqq.study.app4;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/18 09:17
 * @Version 1.0
 */
public class HBase2HdfsReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable valOut = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s emitted for this word
        long sum = 0;
        for (LongWritable value : values)
            sum += value.get();
        valOut.set(sum);
        context.write(key, valOut);
    }
}
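Once the job finishes, the counts land as tab-separated text under /usr/local/mrout (part-r-00000 with the default single reducer). Below is a minimal sketch, not from the original post, of reading the result back through the HDFS API; it assumes the default TextOutputFormat and part file name.

package yqq.study.app4;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadMROutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Path part = new Path("/usr/local/mrout/part-r-00000");
        FileSystem fs = part.getFileSystem(conf);
        // Stream the reducer output; each line is "word<TAB>count"
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(part)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}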

Partial output data on HDFS (each line is a word and its count, tab-separated):

99979	1
9998	1
99980	1
99981	1
99982	1
99983	1
99984	1
99985	1
99986	1
99987	1
99988	1
99989	1
9999	1
99990	1
99991	1
99992	1
99993	1
99994	1
99995	1
99996	1
99997	1
99998	1
99999	1
hello	100000
neusoft	100000

