React 前端框架助力企业快速适应数字化转型的挑战与机遇
684
2022-10-27
jstorm kafka插件使用案例
本文用的是jstorm 2.2.1 一、pom引用
三、自定义拓扑图入口类 package jiankunking.kafkajstorm-ologies;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm-ology.TopologyBuilder;import com.alibaba.jstorm.client.ConfigExtension;import jiankunking.kafkajstorm.bolts.CustomBolt;import jiankunking.kafkajstorm.kafka.KafkaSpout;import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;import jiankunking.kafkajstorm.util.PropertiesUtil;import java.util.Map;/** * Created by jiankunking on 2017/4/19 16:27. * 拓扑图 入口类 */public class CustomCounterTopology /** * 入口类,即提交任务的类 * * @throws InterruptedException * @throws AlreadyAliveException * @throws public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { System.out.println("11111"); PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false); Map propsMap = propertiesUtil.getAllProperty(); KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps()); spoutConfig.configure(propsMap); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout"); //Configuration Config conf = new Config(); conf.setDebug(false); //指定使用logback.xml //需要把logback.xml文件放到jstorm conf目录下 ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml"); if (args != null && args.length > 0) { //提交到集群运行 StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); //本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology()); } }} 四、配置文件application.properties # kafka# kafka 消费组kafka.client.id=kafkaspoutidkafka.broker.partitions=4kafka.fetch.from.beginning=falsekafka-ic=test_onekafka.broker.hosts=10.10.10.10:9092kafka.zookeeper.hosts=10.10.10.10:2181storm.zookeeper.root=/kafka 小注: 1、jstorm kafka插件源码集成 需要到jstorm的github官网: jstorm 2.1.1之后,jstorm默认使用了logback作为日志框架,logback在一般使用时是兼容log4j的,也就是说log4j可以直接桥接到logback,具体为: a. 添加slf4j-api, log4j-over-slf4j和logback依赖(其实加了logback依赖之后就不需要加slf4j-api依赖了),具体: b. 排除pom中所有的slf4j-log4j12的依赖,因为slf4j-log4j12跟log4j-over-slf4j是冲突的: 这里版本一般是1.7.5,但是还要具体看你的应用pom仲裁出的版本。 理论上,这样就能够把log4j桥接到slf4j。 https://github.com/JianKunKing/jstorm-kafka-plugin-demo 作者:jiankunking
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~