jstorm kafka插件使用案例

网友投稿 684 2022-10-27

jstorm kafka插件使用案例

jstorm kafka插件使用案例

本文用的是jstorm 2.2.1 一、pom引用

三、自定义拓扑图入口类

package jiankunking.kafkajstorm-ologies;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm-ology.TopologyBuilder;import com.alibaba.jstorm.client.ConfigExtension;import jiankunking.kafkajstorm.bolts.CustomBolt;import jiankunking.kafkajstorm.kafka.KafkaSpout;import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;import jiankunking.kafkajstorm.util.PropertiesUtil;import java.util.Map;/** * Created by jiankunking on 2017/4/19 16:27. * 拓扑图 入口类 */public class CustomCounterTopology /** * 入口类,即提交任务的类 * * @throws InterruptedException * @throws AlreadyAliveException * @throws public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { System.out.println("11111"); PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false); Map propsMap = propertiesUtil.getAllProperty(); KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps()); spoutConfig.configure(propsMap); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout"); //Configuration Config conf = new Config(); conf.setDebug(false); //指定使用logback.xml //需要把logback.xml文件放到jstorm conf目录下 ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml"); if (args != null && args.length > 0) { //提交到集群运行 StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); //本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology()); } }}

四、配置文件application.properties

# kafka# kafka 消费组kafka.client.id=kafkaspoutidkafka.broker.partitions=4kafka.fetch.from.beginning=falsekafka-ic=test_onekafka.broker.hosts=10.10.10.10:9092kafka.zookeeper.hosts=10.10.10.10:2181storm.zookeeper.root=/kafka

小注:

1、jstorm kafka插件源码集成

需要到jstorm的github官网:​​ jstorm 2.1.1之后,jstorm默认使用了logback作为日志框架,logback在一般使用时是兼容log4j的,也就是说log4j可以直接桥接到logback,具体为:

a. 添加slf4j-api, log4j-over-slf4j和logback依赖(其实加了logback依赖之后就不需要加slf4j-api依赖了),具体:

org.slf4j slf4j-api 1.7.5 org.slf4j log4j-over-slf4j 1.7.10 ch.qos.logback logback-classic 1.0.13

b. 排除pom中所有的slf4j-log4j12的依赖,因为slf4j-log4j12跟log4j-over-slf4j是冲突的:

org.slf4j slf4j-log4j12 1.7.5 provided

这里版本一般是1.7.5,但是还要具体看你的应用pom仲裁出的版本。

理论上,这样就能够把log4j桥接到slf4j。

​​https://github.com/JianKunKing/jstorm-kafka-plugin-demo​​

作者:jiankunking

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:从PAXOS到ZOOKEEPER分布式一致性原理与实践--Paxos算法
下一篇:logstash @timestamp 内容替换
相关文章

 发表评论

暂时没有评论,来抢沙发吧~