Flink消费kafka消息实战

网友投稿 898 2022-10-26

Flink消费kafka消息实战

Flink消费kafka消息实战

欢迎访问我的GitHub

本次实战的内容是开发Flink应用,消费来自kafka的消息,进行实时计算;

环境情况

本次实战用到了三台机器,它们的IP地址和身份如下表所示:

IP地址 身份 备注
192.168.1.104 http请求发起者 此机器上安装了Apache Bench,可以发起大量http请求到192.168.1.101
192.168.1.101 Docker server 此机器上安装了Docker,并且运行了三个容器:zookeeper、kafka、消息生产者(接收http请求时生产一条消息)
192.168.1.102 Flink应用 此机器部署了Flink,运行着我们开发的Flink应用,接收kafka消息做实时处理

操作步骤

在机器192.168.1.101上部署三个容器(消息生产者、zookeeper、kafka); 在机器192.168.1.104上安装Apache Bench; 在机器192.168.1.102上配置kafak相关的host; 开发Flink应用,部署到机器192.168.1.102; 在机器192.168.1.104上发起压力测试,请求地址是消息生产者的http接口地址,产生大量消息; 观察Flink应用的处理情况;

版本信息

操作系统:Centos7 docker:17.03.2-ce docker-compose:1.23.2 kafka:0.11.0.3 zookeeper:3.4.9 JDK:1.8.0_191 spring boot:1.5.9.RELEASE spring-kafka:1.3.8.RELEASE Flink:1.7

在机器192.168.1.101上部署三个容器(消息生产者、zookeeper、kafka)

构建kafka相关的环境不是本文重点,因此这里利用docker快速实现,步骤如下: 在机器192.168.1.101上安装docker和docker-compose; 创建docker-compose.yml文件,内容如下: version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: image: wurstmeister/kafka:2.11-0.11.0.3 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "topic001:2:1" volumes: - /var/run/docker.sock:/var/run/docker.sock producer: image: bolingcavalry/kafka01103producer:0.0.1-SNAPSHOT ports: - "8080:8080" 在docker-compose.yml所在目录执行命令**docker-compose up -d,即可启动容器; 如果您想了解更多docker环境下kafka消息生产者的细节,请参考《如何使用Docker内的kafka服务》;

在机器192.168.1.104上安装Apache Bench

不同的操作系统安装Apache Bench的命令也不一样: ubuntu上的安装命令**apt-get install apache2-utils; centos上的安装命令**yum install httpd-tools;

源码-

接下来的实战是编写Flink应用的源码,您可以选择直接从GitHub-这个工程的源码,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

开发Flink应用,部署到机器192.168.1.102

Flink环境搭建请参考《Flink1.7从安装到体验》; 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

根据提示,输入groupId为com.bolingcavalry,artifactId为flinkkafkademo,其他的直接按下回车键即可使用默认值,这样就得到了一个maven工程:flinkkafkademo; 打开工程的pom.xml文件,增加以下两个依赖:

org.apache.flink flink-connector-kafka-0.11_2.12 ${flink.version} com.alibaba fastjson 1.2.28

新增一个辅助类,用于将kafka消息中的内容转换成java对象:

/** * @Description: 解析原始消息的辅助类 * @author: willzhao E-mail: zq2599@gmail.com * @date: 2019/1/1 20:13 */ public class JSONHelper { /** * 解析消息,得到时间字段 * @param raw * @return */ public static long getTimeLongFromRawMessage(String raw){ SingleMessage singleMessage = parse(raw); return null==singleMessage ? 0L : singleMessage.getTimeLong(); } /** * 将消息解析成对象 * @param raw * @return */ public static SingleMessage parse(String raw){ SingleMessage singleMessage = null; if (raw != null) { singleMessage = JSONObject.parseObject(raw, SingleMessage.class); } return singleMessage; } }

SingleMessage对象的定义:

public class SingleMessage { private long timeLong; private String name; private String bizID; private String time; private String message; public long getTimeLong() { return timeLong; } public void setTimeLong(long timeLong) { this.timeLong = timeLong; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getBizID() { return bizID; } public void setBizID(String bizID) { this.bizID = bizID; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }

实时处理的操作都集中在StreamingJob类,源码的关键位置已经加了注释,就不再赘述了:

package com.bolingcavalry; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import javax.annotation.Nullable; import java.util.Properties; /** * Skeleton for a Flink Streaming Job. * *

For a tutorial how to write a Flink streaming application, check the * tutorials and examples on the 数据源配置,是一个kafka消息的消费者 FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>("topic001", new SimpleStringSchema(), props); //增加时间水位设置类 consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks (){ @Override public long extractTimestamp(String element, long previousElementTimestamp) { return JSONHelper.getTimeLongFromRawMessage(element); } @Nullable @Override public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) { if (lastElement != null) { return new Watermark(JSONHelper.getTimeLongFromRawMessage(lastElement)); } return null; } }); env.addSource(consumer) //将原始消息转成Tuple2对象,保留用户名称和访问次数(每个消息访问次数为1) .flatMap((FlatMapFunction>) (s, collector) -> { SingleMessage singleMessage = JSONHelper.parse(s); if (null != singleMessage) { collector.collect(new Tuple2<>(singleMessage.getName(), 1L)); } }) //以用户名为key .keyBy(0) //时间窗口为2秒 .timeWindow(Time.seconds(2)) //将每个用户访问次数累加起来 .apply((WindowFunction, Tuple2, Tuple, TimeWindow>) (tuple, window, input, out) -> { long sum = 0L; for (Tuple2 record: input) { sum += record.f1; } Tuple2 result = input.iterator().next(); result.f1 = sum; out.collect(result); }) //输出方式是STDOUT .print(); env.execute("Flink-Kafka demo"); } }

在pom.xml所在文件夹执行以下命令打包:

mvn clean package -Dmaven.test.skip=true -U

在机器192.168.1.104上发起压力测试,产生大量消息

欢迎关注51CTO博客:程序员欣宸

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:#yyds干货盘点# leetcode算法题:同构字符串
下一篇:关于Controller层和Service层的类报错问题及解决方案
相关文章

 发表评论

暂时没有评论,来抢沙发吧~