Flink实战:消费Wikipedia实时消息

网友投稿 686 2022-10-29

Flink实战:消费Wikipedia实时消息

Flink实战:消费Wikipedia实时消息

欢迎访问我的GitHub

关于Wikipedia Edit Stream

Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,消息中包含了用户名对wiki的编辑情况,demo的官方资料地址:Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了; IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

实战简介

本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网demo的不同之处

和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

Flink:1.7; 运行模式:单机(官网称之为Local Flink Cluster); Flink所在机器的操作系统:CentOS Linux release 7.5.1804; 开发环境JDK:1.8.0_181; 开发环境Maven:3.5.0;

操作步骤简介

今天的实战分为以下步骤: 创建应用; 编码; 构建; 部署运行;

创建应用

应用基本代码是通过mvn命令创建的,在命令行输入以下命令:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:

Define value for property 'groupId': com.bolingcavalry Define value for property 'artifactId': wikipediaeditstreamdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.bolingcavalry: : Confirm properties configuration: groupId: com.bolingcavalry artifactId: wikipediaeditstreamdemo version: 1.0-SNAPSHOT package: com.bolingcavalry Y: :

用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

应用创建成功,接下来可以开始编码了;

编码

您可以选择直接从GitHub-这个工程的源码,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章源码在wikipediaeditstreamdemo这个文件夹下,如下图红框所示:

接下来开始编码: 在pom.mxl文件中增加wikipedia相关的库依赖:

org.apache.flink flink-connector-wikiedits_2.11 ${flink.version}

在类中增加代码,如下所示,源码中已加详细注释:

package com.bolingcavalry; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; public class StreamingJob { public static void main(String[] args) throws Exception { // 环境信息 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new WikipediaEditsSource()) //以用户名为key分组 .keyBy((KeySelector) wikipediaEditEvent -> wikipediaEditEvent.getUser()) //时间窗口为5秒 .timeWindow(Time.seconds(15)) //在时间窗口内按照key将所有数据做聚合 .aggregate(new AggregateFunction, Tuple3>() { @Override public Tuple3 createAccumulator() { //创建ACC return new Tuple3<>("", 0, new StringBuilder()); } @Override public Tuple3 add(WikipediaEditEvent wikipediaEditEvent, Tuple3 tuple3) { StringBuilder sbud = tuple3.f2; //如果是第一条记录,就加个"Details :"作为前缀, //如果不是第一条记录,就用空格作为分隔符 if(StringUtils.isBlank(sbud.toString())){ sbud.append("Details : "); }else { sbud.append(" "); } //聚合逻辑是将改动的字节数累加 return new Tuple3<>(wikipediaEditEvent.getUser(), wikipediaEditEvent.getByteDiff() + tuple3.f1, sbud.append(wikipediaEditEvent.getByteDiff())); } @Override public Tuple3 getResult(Tuple3 tuple3) { return tuple3; } @Override public Tuple3 merge(Tuple3 tuple3, Tuple3 acc1) { //合并窗口的场景才会用到 return new Tuple3<>(tuple3.f0, tuple3.f1 + acc1.f1, tuple3.f2.append(acc1.f2)); } }) //聚合操作后,将每个key的聚合结果单独转为字符串 .map((MapFunction, String>) tuple3 -> tuple3.toString()) //输出方式是STDOUT .print(); // 执行 env.execute("Flink Streaming Java API Skeleton"); } }

至此编码结束;

构建

在pom.xml文件所在目录下执行命令:

mvn clean package -U

命令执行完毕后,在target目录下的wikipediaeditstreamdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

Flink的安装和启动请参考《Flink1.7从安装到体验》; 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:; 选择刚刚生成的jar文件作为一个新的任务,如下图:

目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

提交后的页面效果如下图所示,可见一个job已经在运行中了:

接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

欢迎关注51CTO博客:程序员欣宸

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:WeAdmin基于LayUI的轻量级前端后台管理框架
下一篇:vue 全流程开发脚手架,开发从未如此简单
相关文章

 发表评论

暂时没有评论,来抢沙发吧~