开发第一个Flink应用

网友投稿 578 2022-10-30

开发第一个Flink应用

开发第一个Flink应用

欢迎访问我的GitHub

在《Flink1.7从安装到体验》一文中,我们安装和体验了Flink,今天就用java来一起开发一个简单的Flink应用; 步骤列表 本次实战经历以下步骤: 创建应用; 编码; 构建; 提交任务到Flink,验证功能;

环境信息

Flink:1.7; Flink所在机器的操作系统:CentOS Linux release 7.5.1804; 开发环境JDK:1.8.0_181; 开发环境Maven:3.5.0;

应用功能简介

在《Flink1.7从安装到体验》一文中,我们在Flink运行SocketWindowWordCount.jar,实现的功能是从socket读取字符串,将其中的每个单词的数量统计出来,今天我们就来编码开发这个应用,实现此功能;

创建应用

应用基本代码是通过mvn命令创建的,在命令行输入以下命令:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹,里面是个maven工程:

Define value for property 'groupId': com.bolingcavalry Define value for property 'artifactId': socketwordcountdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.bolingcavalry: : Confirm properties configuration: groupId: com.bolingcavalry artifactId: socketwordcountdemo version: 1.0-SNAPSHOT package: com.bolingcavalry

编码

您可以选择直接从GitHub-这个工程的源码,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

/** * 记录单词及其出现频率的Pojo */ public static class WordWithCount { /** * 单词内容 */ public String word; /** * 出现频率 */ public long count; public WordWithCount() { super(); } public WordWithCount(String word, long count) { this.word = word; this.count = count; } /** * 将单词内容和频率展示出来 * @return */ @Override public String toString() { return word + " : " + count; } }

把所有业务逻辑写在StreamJob类的main方法中,如下所示,关键位置都加了中文注释:

public static void main(String[] args) throws Exception { //环境信息 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //数据来源是本机9999端口,换行符分隔,您也可以考虑将hostname和port参数通过main方法的入参传入 DataStream text = env.socketTextStream("localhost", 9999, "\n"); //通过text对象转换得到新的DataStream对象, //转换逻辑是分隔每个字符串,取得的所有单词都创建一个WordWithCount对象 DataStream windowCounts = text.flatMap(new FlatMapFunction() { @Override public void flatMap(String s, Collector collector) throws Exception { for(String word : s.split("\\s")){ collector.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word")//key为word字段 .timeWindow(Time.seconds(5)) //五秒一次的翻滚时间窗口 .reduce(new ReduceFunction() { //reduce策略 @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count+b.count); } }); //单线程输出结果 windowCounts.print().setParallelism(1); // 执行 env.execute("Flink Streaming Java API Skeleton"); }

构建

在pom.xml文件所在目录下执行命令:

mvn clean package -U

命令执行完毕后,在target目录下的socketwordcountdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

Flink的安装和启动请参考《Flink1.7从安装到体验》; 登录到Flink所在机器,执行以下命令:

nc -l 9999

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于司徒大神的轻量级MVVM框架avalon2.0搭建的一个简易的脚手架。
下一篇:基于fis3和vue全家桶,搭建前端工程化脚手架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~