微前端架构如何改变企业的开发模式与效率提升
898
2022-10-26
Flink消费kafka消息实战
欢迎访问我的GitHub
本次实战的内容是开发Flink应用,消费来自kafka的消息,进行实时计算;
环境情况
本次实战用到了三台机器,它们的IP地址和身份如下表所示:
IP地址 | 身份 | 备注 |
---|---|---|
192.168.1.104 | http请求发起者 | 此机器上安装了Apache Bench,可以发起大量http请求到192.168.1.101 |
192.168.1.101 | Docker server | 此机器上安装了Docker,并且运行了三个容器:zookeeper、kafka、消息生产者(接收http请求时生产一条消息) |
192.168.1.102 | Flink应用 | 此机器部署了Flink,运行着我们开发的Flink应用,接收kafka消息做实时处理 |
操作步骤
在机器192.168.1.101上部署三个容器(消息生产者、zookeeper、kafka); 在机器192.168.1.104上安装Apache Bench; 在机器192.168.1.102上配置kafak相关的host; 开发Flink应用,部署到机器192.168.1.102; 在机器192.168.1.104上发起压力测试,请求地址是消息生产者的http接口地址,产生大量消息; 观察Flink应用的处理情况;
版本信息
操作系统:Centos7 docker:17.03.2-ce docker-compose:1.23.2 kafka:0.11.0.3 zookeeper:3.4.9 JDK:1.8.0_191 spring boot:1.5.9.RELEASE spring-kafka:1.3.8.RELEASE Flink:1.7
在机器192.168.1.101上部署三个容器(消息生产者、zookeeper、kafka)
构建kafka相关的环境不是本文重点,因此这里利用docker快速实现,步骤如下: 在机器192.168.1.101上安装docker和docker-compose; 创建docker-compose.yml文件,内容如下: version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: image: wurstmeister/kafka:2.11-0.11.0.3 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "topic001:2:1" volumes: - /var/run/docker.sock:/var/run/docker.sock producer: image: bolingcavalry/kafka01103producer:0.0.1-SNAPSHOT ports: - "8080:8080" 在docker-compose.yml所在目录执行命令**docker-compose up -d,即可启动容器; 如果您想了解更多docker环境下kafka消息生产者的细节,请参考《如何使用Docker内的kafka服务》;
在机器192.168.1.104上安装Apache Bench
不同的操作系统安装Apache Bench的命令也不一样: ubuntu上的安装命令**apt-get install apache2-utils; centos上的安装命令**yum install httpd-tools;
源码-
接下来的实战是编写Flink应用的源码,您可以选择直接从GitHub-这个工程的源码,地址和链接信息如下表所示:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
开发Flink应用,部署到机器192.168.1.102
Flink环境搭建请参考《Flink1.7从安装到体验》; 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
根据提示,输入groupId为com.bolingcavalry,artifactId为flinkkafkademo,其他的直接按下回车键即可使用默认值,这样就得到了一个maven工程:flinkkafkademo; 打开工程的pom.xml文件,增加以下两个依赖:
新增一个辅助类,用于将kafka消息中的内容转换成java对象:
/** * @Description: 解析原始消息的辅助类 * @author: willzhao E-mail: zq2599@gmail.com * @date: 2019/1/1 20:13 */ public class JSONHelper { /** * 解析消息,得到时间字段 * @param raw * @return */ public static long getTimeLongFromRawMessage(String raw){ SingleMessage singleMessage = parse(raw); return null==singleMessage ? 0L : singleMessage.getTimeLong(); } /** * 将消息解析成对象 * @param raw * @return */ public static SingleMessage parse(String raw){ SingleMessage singleMessage = null; if (raw != null) { singleMessage = JSONObject.parseObject(raw, SingleMessage.class); } return singleMessage; } }
SingleMessage对象的定义:
public class SingleMessage { private long timeLong; private String name; private String bizID; private String time; private String message; public long getTimeLong() { return timeLong; } public void setTimeLong(long timeLong) { this.timeLong = timeLong; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getBizID() { return bizID; } public void setBizID(String bizID) { this.bizID = bizID; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
实时处理的操作都集中在StreamingJob类,源码的关键位置已经加了注释,就不再赘述了:
package com.bolingcavalry; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import javax.annotation.Nullable; import java.util.Properties; /** * Skeleton for a Flink Streaming Job. * *
For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the 数据源配置,是一个kafka消息的消费者
FlinkKafkaConsumer011
在pom.xml所在文件夹执行以下命令打包:
mvn clean package -Dmaven.test.skip=true -U
在机器192.168.1.104上发起压力测试,产生大量消息
欢迎关注51CTO博客:程序员欣宸
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~