微前端架构如何改变企业的开发模式与效率提升
686
2022-10-29
Flink实战:消费Wikipedia实时消息
欢迎访问我的GitHub
关于Wikipedia Edit Stream
Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,消息中包含了用户名对wiki的编辑情况,demo的官方资料地址:Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了; IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》
实战简介
本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;
和官网demo的不同之处
和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:
环境信息
Flink:1.7; 运行模式:单机(官网称之为Local Flink Cluster); Flink所在机器的操作系统:CentOS Linux release 7.5.1804; 开发环境JDK:1.8.0_181; 开发环境Maven:3.5.0;
操作步骤简介
今天的实战分为以下步骤: 创建应用; 编码; 构建; 部署运行;
创建应用
应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
Define value for property 'groupId': com.bolingcavalry Define value for property 'artifactId': wikipediaeditstreamdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.bolingcavalry: : Confirm properties configuration: groupId: com.bolingcavalry artifactId: wikipediaeditstreamdemo version: 1.0-SNAPSHOT package: com.bolingcavalry Y: :
用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:
应用创建成功,接下来可以开始编码了;
编码
您可以选择直接从GitHub-这个工程的源码,地址和链接信息如下表所示:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章源码在wikipediaeditstreamdemo这个文件夹下,如下图红框所示:
接下来开始编码: 在pom.mxl文件中增加wikipedia相关的库依赖:
在类中增加代码,如下所示,源码中已加详细注释:
package com.bolingcavalry;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 环境信息
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new WikipediaEditsSource())
//以用户名为key分组
.keyBy((KeySelector
至此编码结束;
构建
在pom.xml文件所在目录下执行命令:
mvn clean package -U
命令执行完毕后,在target目录下的wikipediaeditstreamdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;
在Flink验证
Flink的安装和启动请参考《Flink1.7从安装到体验》; 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:; 选择刚刚生成的jar文件作为一个新的任务,如下图:
目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:
提交后的页面效果如下图所示,可见一个job已经在运行中了:
接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:
至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;
欢迎关注51CTO博客:程序员欣宸
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~