【MapReduce】MR 框架原理 之 数据清洗、计数器的应用

网友投稿 1037 2022-11-08

【MapReduce】MR 框架原理 之 数据清洗、计数器的应用

【MapReduce】MR 框架原理 之 数据清洗、计数器的应用

文章目录

​​数据清洗(ETL)​​​​计数器​​

​​-.计数器API​​

​​(1)采用枚举的方式统计计数​​​​(2)采用计数器组、计数器名称的方式统计​​​​(3)计数结果在程序运行后的控制台上查看。​​

​​数据清洗案例实操 ---- 简单​​

​​需求​​​​需求分析​​​​代码实现​​

​​Mapper阶段​​​​Driver阶段​​

​​数据清洗案例实操 ---- 复杂​​

​​需求​​​​需求分析​​​​代码实现​​

​​Bean类​​​​Mapper阶段​​​​Driver阶段​​

数据清洗(ETL)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

计数器

Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录​​已处理的字节数​​和​​记录数​​,使用户可监控已处理的输入数据量和已产生的输出数据量。

-.计数器API

(1)采用枚举的方式统计计数

enum MyCounter {MALFORORMED, NORMAL} //对枚举定义的自定义计数器加1 context. getCounter(MyCounter. MALFOROR).increment(1)

(2)采用计数器组、计数器名称的方式统计

. get Counter("counterGroup", " counter). increment(1);

组名和计数器名称随便起,但最好有意义。

(3)计数结果在程序运行后的控制台上查看。

​​返回顶部​​

数据清洗案例实操 ---- 简单

需求

每行字段长度都大于11

​​返回顶部​​

需求分析

需要在Map阶段对输入的数据根据规则进行过滤清洗。

​​返回顶部​​

代码实现

Mapper阶段

进行数据解析解析的同时记性计数

package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMap extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.读取一行数据 String line = value.toString(); // 2.解析数据 boolean result = parseLog(line, context); // 3.判断返回,不是就直接返回 if (!result) { return; } // 4.解析通过,写出 context.write(value, NullWritable.get()); } /** * 信息处理 * @param line * @param context * @return */ private boolean parseLog(String line, Context context) { // 1.切分 String[] feilds = line.split(" "); // 2.判断 if (feilds.length > 11) { // 获取计数器,记录解析为true的数据条数 context.getCounter("map", "true").increment(1); return true; } else { // 获取计数器,记录解析为false的数据条数 context.getCounter("map", "false").increment(1); return false; } }}

Driver阶段

正常的编写配置项注意​​设置ReduceTask数为0​​

package 第三章_MR框架原理.数据清洗_计数器;import jdk.internal.org.objectweb.asm.tree.MultiANewArrayInsnNode;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import 第三章_MR框架原理.多种join应用.DistributedCacheDriver;import 第三章_MR框架原理.多种join应用.DistributedCacheMapper;import java-.URI;public class LogDriver { public static void main(String[] args) { Configuration configuration = new Configuration(); Job job = null; try { // 1 获取job信息 job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(LogDriver.class); // 3 关联map job.setMapperClass(LogMap.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置reducetask个数为0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\web.log")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\filteroutput")); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } }}

​​返回顶部​​

数据清洗案例实操 ---- 复杂

需求

对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。

保留都是合法的数据

​​返回顶部​​

需求分析

定义一个bean类,用来记录日志数据中心的各数据字段编写LogMap类进行数据的过滤编写驱动类,运行

​​返回顶部​​

代码实现

Bean类

定义Bean类,对数据集中的数据进行封装;并且添加一列是否合法数据字段。在重写toString方法的时候​​利用StringBuilder,高效拼接字符串​​返回时​​添加唯一标识‘\001‘​​

在这里插入代码片package 第三章_MR框架原理.数据清洗_计数器;public class LogBean { // 封装 private String remote_addr; // 记录客户端的ip地址 private String remote_user; // 记录客户端用户名称,忽略属性“-” private String time_local; // 记录访问时间与时区 private String request; // 记录请求的url和 private String status; // 记录请求状态:200是成功 private String body_bytes_sent; // 记录发送给客户端文件主体内容的大小 private String // 用来记录从哪个页面链接访问过来的 private String // 记录客户浏览器的相关信息 private boolean valid = true; // 判断数据是否合法 /** * 重写toString方法 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.valid); sb.append("\001").append(this.remote_addr); sb.append("\001").append(this.remote_user); sb.append("\001").append(this.time_local); sb.append("\001").append(this.request); sb.append("\001").append(this.status); sb.append("\001").append(this.body_bytes_sent); sb.append("\001").append(this. sb.append("\001").append(this. return sb.toString(); } /** * set、get * @return */ public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getRemote_user() { return remote_user; } public void setRemote_user(String remote_user) { this.remote_user = remote_user; } public String getTime_local() { return time_local; } public void setTime_local(String time_local) { this.time_local = time_local; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getBody_bytes_sent() { return body_bytes_sent; } public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; } public String getHttp_referer() { return } public void setHttp_referer(String { this.= } public String getHttp_user_agent() { return } public void setHttp_user_agent(String { this.= } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; }}

​​返回顶部​​

Mapper阶段

读取数据进行解析拆分处理字符串的时候,​​以“ ”拆分​​,处理时​​忽略了field[2] 的 ‘-’​​,对数据的处理进行细化添加计数器,这里是本人自行添加的,再添加的过程中也出现了错误。最终添加true的计数器应当与判断请求状态同级,应为在大前提分割数大于11的时候,所有的记录中都会包含有状态字段,如果不是200,那其余的肯定就不符合条件。对于拆分那部分,提取了一行数据进行验证:

package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogBestMap extends Mapper { Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 58.248.178.212 - - [18/Sep/2013:06:51:40 +0000] "GET /wp-includes/js/jquery/jquery-migrate.min.js?ver=1.2.1 HTTP/1.1" 200 7200 ""Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)" // 1.获取一行数据 String line = value.toString(); // 2.解析日志是够合法 LogBean bean = parseLog(line,context); // 3.判断 if (!bean.isValid()) { return; } // 4.合法输出 k.set(bean.toString()); context.write(k, NullWritable.get()); } /** * 数据处理 * * @param line * @return */ private LogBean parseLog(String line, Context context) { LogBean logBean = new LogBean(); // 1.截取 String[] fields = line.split(" "); if (fields.length > 11) { // 封装数据 logBean.setRemote_addr(fields[0]); logBean.setRemote_user(fields[1]); logBean.setTime_local(fields[3].substring(1)); logBean.setRequest(fields[6]); logBean.setStatus(fields[8]); logBean.setBody_bytes_sent(fields[9]); logBean.setHttp_referer(fields[10]); if (fields.length > 12) { logBean.setHttp_user_agent(fields[11] + " " + fields[12]); } else { logBean.setHttp_user_agent(fields[11]); } // 大于400,HTTP错误 if (Integer.parseInt(logBean.getStatus()) >= 400) { logBean.setValid(false); context.getCounter("map", "false").increment(1); }else{ context.getCounter("map","true").increment(1); } } else { logBean.setValid(false); context.getCounter("map", "false").increment(1); } return logBean; }}

​​返回顶部​​

Driver阶段

正常的编写配置项注意​​设置ReduceTask数为0​​

package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogBestDriver { public static void main(String[] args) throws Exception { // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(LogBestDriver.class); // 3 关联map job.setMapperClass(LogBestMap.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置reducetask个数为0 job.setNumReduceTasks(0); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\web.log")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\filterBestoutput")); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); }}

​​返回顶部​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring复杂对象创建的方式小结
下一篇:【XShell 、Xftp】版本升级问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~