洞察纵观鸿蒙next版本,如何凭借FinClip加强小程序的跨平台管理,确保企业在数字化转型中的高效运营和数据安全?
1037
2022-11-08
【MapReduce】MR 框架原理 之 数据清洗、计数器的应用
文章目录
数据清洗(ETL)计数器
-.计数器API
(1)采用枚举的方式统计计数(2)采用计数器组、计数器名称的方式统计(3)计数结果在程序运行后的控制台上查看。
数据清洗案例实操 ---- 简单
需求需求分析代码实现
Mapper阶段Driver阶段
数据清洗案例实操 ---- 复杂
需求需求分析代码实现
Bean类Mapper阶段Driver阶段
数据清洗(ETL)
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
计数器
Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。
-.计数器API
(1)采用枚举的方式统计计数
enum MyCounter {MALFORORMED, NORMAL} //对枚举定义的自定义计数器加1 context. getCounter(MyCounter. MALFOROR).increment(1)
(2)采用计数器组、计数器名称的方式统计
. get Counter("counterGroup", " counter). increment(1);
组名和计数器名称随便起,但最好有意义。
(3)计数结果在程序运行后的控制台上查看。
返回顶部
数据清洗案例实操 ---- 简单
需求
每行字段长度都大于11
返回顶部
需求分析
需要在Map阶段对输入的数据根据规则进行过滤清洗。
返回顶部
代码实现
Mapper阶段
进行数据解析解析的同时记性计数
package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMap extends Mapper
Driver阶段
正常的编写配置项注意设置ReduceTask数为0
package 第三章_MR框架原理.数据清洗_计数器;import jdk.internal.org.objectweb.asm.tree.MultiANewArrayInsnNode;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import 第三章_MR框架原理.多种join应用.DistributedCacheDriver;import 第三章_MR框架原理.多种join应用.DistributedCacheMapper;import java-.URI;public class LogDriver { public static void main(String[] args) { Configuration configuration = new Configuration(); Job job = null; try { // 1 获取job信息 job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(LogDriver.class); // 3 关联map job.setMapperClass(LogMap.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置reducetask个数为0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\web.log")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\filteroutput")); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } }}
返回顶部
数据清洗案例实操 ---- 复杂
需求
对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。
保留都是合法的数据
返回顶部
需求分析
定义一个bean类,用来记录日志数据中心的各数据字段编写LogMap类进行数据的过滤编写驱动类,运行
返回顶部
代码实现
Bean类
定义Bean类,对数据集中的数据进行封装;并且添加一列是否合法数据字段。在重写toString方法的时候利用StringBuilder,高效拼接字符串返回时添加唯一标识‘\001‘
在这里插入代码片package 第三章_MR框架原理.数据清洗_计数器;public class LogBean { // 封装 private String remote_addr; // 记录客户端的ip地址 private String remote_user; // 记录客户端用户名称,忽略属性“-” private String time_local; // 记录访问时间与时区 private String request; // 记录请求的url和 private String status; // 记录请求状态:200是成功 private String body_bytes_sent; // 记录发送给客户端文件主体内容的大小 private String // 用来记录从哪个页面链接访问过来的 private String // 记录客户浏览器的相关信息 private boolean valid = true; // 判断数据是否合法 /** * 重写toString方法 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.valid); sb.append("\001").append(this.remote_addr); sb.append("\001").append(this.remote_user); sb.append("\001").append(this.time_local); sb.append("\001").append(this.request); sb.append("\001").append(this.status); sb.append("\001").append(this.body_bytes_sent); sb.append("\001").append(this. sb.append("\001").append(this. return sb.toString(); } /** * set、get * @return */ public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getRemote_user() { return remote_user; } public void setRemote_user(String remote_user) { this.remote_user = remote_user; } public String getTime_local() { return time_local; } public void setTime_local(String time_local) { this.time_local = time_local; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getBody_bytes_sent() { return body_bytes_sent; } public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; } public String getHttp_referer() { return } public void setHttp_referer(String { this.= } public String getHttp_user_agent() { return } public void setHttp_user_agent(String { this.= } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; }}
返回顶部
Mapper阶段
读取数据进行解析拆分处理字符串的时候,以“ ”拆分,处理时忽略了field[2] 的 ‘-’,对数据的处理进行细化添加计数器,这里是本人自行添加的,再添加的过程中也出现了错误。最终添加true的计数器应当与判断请求状态同级,应为在大前提分割数大于11的时候,所有的记录中都会包含有状态字段,如果不是200,那其余的肯定就不符合条件。对于拆分那部分,提取了一行数据进行验证:
package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogBestMap extends Mapper
返回顶部
Driver阶段
正常的编写配置项注意设置ReduceTask数为0
package 第三章_MR框架原理.数据清洗_计数器;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogBestDriver { public static void main(String[] args) throws Exception { // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(LogBestDriver.class); // 3 关联map job.setMapperClass(LogBestMap.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置reducetask个数为0 job.setNumReduceTasks(0); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\web.log")); FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\数据清洗_计数器\\filterBestoutput")); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); }}
返回顶部
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~