Storm Stream Groupings Explained

Reader contribution  704  2022-11-26


Shuffle grouping

The most commonly used groupings are shuffle grouping, fields grouping, direct grouping, and so on.

Let's look at an example: the classic word-count example.

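// Imports use Storm 1.x+ package names (org.apache.storm); releases before 1.0 use backtype.storm
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    // Running count of every word this task has seen (one map per task)
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // Emit this task's current count for the word
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}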

When this bolt is added to the topology, it uses fields grouping:

builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

If we change the grouping to

builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
        .shuffleGrouping(SPLIT_BOLT_ID);

the word counts will come out too low.

Why?

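Think about it: there are 4 count bolt instances (call them countbolt a, b, c and d). With shuffle grouping, suppose the word "the" appears 3 times, the first two occurrences go to countbolt a and the third goes to countbolt b. Each task keeps its own counts map, so a emits (the, 1) and then (the, 2), while b emits (the, 1). Assuming the downstream report bolt simply stores the latest count it receives for each word, it first gets the tuple from countbolt a and then the one from countbolt b, so the final output is the: 1 instead of the: 3.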
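The scenario above can be replayed in plain Java without Storm. This is a minimal simulation under stated assumptions: each count task keeps its own HashMap, the report step keeps only the latest count per word, and tuples arrive in the order they are emitted (in a real topology the interleaving between tasks is not fixed). The class name ShuffleUndercount and the task-assignment array are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (plain Java, no Storm): count tasks with their own local
// HashMap, feeding a report step that keeps only the latest count per word.
class ShuffleUndercount {

    // taskForEachWord[i] is the task that the grouping sent words[i] to.
    static Map<String, Long> run(String[] words, int[] taskForEachWord, int numTasks) {
        List<Map<String, Long>> taskCounts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            taskCounts.add(new HashMap<>());
        }
        Map<String, Long> report = new HashMap<>();
        for (int i = 0; i < words.length; i++) {
            Map<String, Long> local = taskCounts.get(taskForEachWord[i]);
            long count = local.merge(words[i], 1L, Long::sum);  // this task's running count
            report.put(words[i], count);  // report overwrites: the last tuple to arrive wins
        }
        return report;
    }

    public static void main(String[] args) {
        String[] words = {"the", "the", "the"};
        // Shuffle grouping happened to send the first two to task 0 and the third to task 1
        int[] tasks = {0, 0, 1};
        System.out.println(run(words, tasks, 4));  // prints {the=1}
    }
}
```

If all three occurrences had gone to the same task, the report would show the=3; the undercount comes purely from the count being split across tasks.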

If we instead use

builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

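the problem naturally disappears: fields grouping always sends tuples with the same value in the "word" field to the same task, so a single countbolt instance holds the complete count for each word.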
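The effect of fields grouping can be sketched the same way. The routing rule below (hash of the word modulo the number of tasks) is an illustrative assumption that captures the idea; Storm's own hash function may differ, but it is likewise deterministic, so equal words always land on the same task. FieldsGroupingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the idea behind fields grouping (not Storm's source): the target task is a
// deterministic function of the grouping field, here hashCode() modulo the task count.
class FieldsGroupingSketch {

    static int chooseTask(String word, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(word.hashCode(), numTasks);
    }

    static Map<String, Long> run(String[] words, int numTasks) {
        List<Map<String, Long>> taskCounts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            taskCounts.add(new HashMap<>());
        }
        Map<String, Long> report = new HashMap<>();
        for (String w : words) {
            Map<String, Long> local = taskCounts.get(chooseTask(w, numTasks));
            long count = local.merge(w, 1L, Long::sum);
            report.put(w, count);  // last count wins, exactly as in the shuffle case
        }
        return report;
    }

    public static void main(String[] args) {
        String[] words = {"the", "cow", "the", "moon", "the"};
        System.out.println(run(words, 4).get("the"));  // prints 3
    }
}
```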

Direct grouping

Here I use an example in which Storm appends an exclamation mark to sentences; the full code is at the end.

The following part was added in the 16-7-4 revision.

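Actually, the example below is not a good one.

Direct grouping is mainly for guaranteeing that a message goes to one specific task of a bolt.

What the example below really wants is to send message A to bolt A and message B to bolt B, and for that there is a more convenient approach: emit on separate named streams and let each bolt subscribe to the stream it needs.

When emitting: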

public void execute(Tuple tuple, BasicOutputCollector collector) {
    tpsCounter.count();
    Long tupleId = tuple.getLong(0);
    Object obj = tuple.getValue(1);
    if (obj instanceof TradeCustomer) {
        TradeCustomer tradeCustomer = (TradeCustomer) obj;
        Pair trade = tradeCustomer.getTrade();
        Pair customer = tradeCustomer.getCustomer();
        // Same ID on both streams, so a downstream bolt can pair the halves up again
        collector.emit(SequenceTopologyDef.TRADE_STREAM_ID, new Values(tupleId, trade));
        collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Values(tupleId, customer));
    } else if (obj != null) {
        LOG.info("Unknown type " + obj.getClass().getName());
    } else {
        LOG.info("Nullpointer ");
    }
}

When building the topology:

builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2)
        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,  // name of the emitting component
                SequenceTopologyDef.TRADE_STREAM_ID);          // receive only this stream from it
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,  // name of the emitting component
                SequenceTopologyDef.CUSTOMER_STREAM_ID);       // receive only this stream from it

Declare the output streams:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
    declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
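Conceptually, the split bolt emits each half on its own stream, and a bolt only receives the streams it subscribed to. The toy model below illustrates that routing in plain Java; the class name StreamRouting, the component names and the string keys are hypothetical, and this is not how Storm implements routing internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of named streams (plain Java, hypothetical names): a bolt subscribes to
// (sourceComponent, streamId), and an emit on a stream reaches only its subscribers.
class StreamRouting {
    // key "sourceComponent/streamId" -> bolts subscribed to that stream
    private final Map<String, List<String>> subscribers = new HashMap<>();
    // what each bolt has received so far, kept for inspection
    final Map<String, List<String>> received = new HashMap<>();

    void subscribe(String bolt, String source, String streamId) {
        subscribers.computeIfAbsent(source + "/" + streamId, k -> new ArrayList<>()).add(bolt);
        received.putIfAbsent(bolt, new ArrayList<>());
    }

    void emit(String source, String streamId, String value) {
        List<String> targets = subscribers.getOrDefault(source + "/" + streamId, Collections.emptyList());
        for (String bolt : targets) {
            received.get(bolt).add(value);
        }
    }

    public static void main(String[] args) {
        StreamRouting r = new StreamRouting();
        r.subscribe("tradeBolt", "split", "trade");
        r.subscribe("customerBolt", "split", "customer");
        r.emit("split", "trade", "trade#1");
        r.emit("split", "customer", "customer#1");
        System.out.println(r.received.get("tradeBolt"));     // prints [trade#1]
        System.out.println(r.received.get("customerBolt"));  // prints [customer#1]
    }
}
```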

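Finally, a bolt that receives both streams has to check which stream each tuple came from. For a tuple from the customer stream, it looks up the trade with the same ID and, if that has not arrived yet, buffers the customer tuple and returns:

if (input.getSourceStreamId().equals(SequenceTopologyDef.CUSTOMER_STREAM_ID)) {
    customer = pair;
    customerTuple = input;
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        // The matching trade has not arrived yet: buffer this tuple and wait
        customerMap.put(tupleId, input);
        return;
    }
    trade = (Pair) tradeTuple.getValue(1);
}
// A tuple from TRADE_STREAM_ID is handled symmetrically, with trade and customer swapped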
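The "wait for the other half" logic can be tried out in isolation. This plain-Java sketch assumes hypothetical stream names ("trade", "customer") and String payloads in place of Pair; unlike the snippet above it removes the buffered entry once a match is found, a design choice that keeps the buffers from growing without bound.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of joining two streams by a shared ID (hypothetical names/payloads).
class StreamJoin {
    private final Map<Long, String> pendingTrades = new HashMap<>();
    private final Map<Long, String> pendingCustomers = new HashMap<>();

    /** Returns "trade+customer" once both halves for id have arrived, otherwise null. */
    String onTuple(String streamId, long id, String payload) {
        if ("trade".equals(streamId)) {
            String customer = pendingCustomers.remove(id);
            if (customer == null) {
                pendingTrades.put(id, payload);  // other half not here yet: buffer and wait
                return null;
            }
            return payload + "+" + customer;
        } else if ("customer".equals(streamId)) {
            String trade = pendingTrades.remove(id);
            if (trade == null) {
                pendingCustomers.put(id, payload);
                return null;
            }
            return trade + "+" + payload;
        }
        throw new IllegalArgumentException("unknown stream: " + streamId);
    }

    public static void main(String[] args) {
        StreamJoin join = new StreamJoin();
        System.out.println(join.onTuple("customer", 7L, "alice"));  // prints null
        System.out.println(join.onTuple("trade", 7L, "buy"));       // prints buy+alice
    }
}
```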

References

数据的分流 ("Splitting data streams")

End of the 16-7-4 revision.

In the original version, with shuffle grouping from exclaim to print, the output was:

mystorm.PrintBolt@67178f5d String recieved: edi:I'm happy!
mystorm.PrintBolt@67178f5d String recieved: marry:I'm angry!
mystorm.PrintBolt@393ddf54 String recieved: ted:I'm excited!
mystorm.PrintBolt@393ddf54 String recieved: john:I'm sad!
mystorm.PrintBolt@5f97cfcb String recieved: marry:I'm angry!

The tuples were spread fairly evenly across the different print tasks.
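One simple way to get an even yet random spread is to shuffle the list of target task IDs once and then hand them out round-robin. This is an assumed strategy for illustration, not a claim about Storm's shuffle-grouping source; the class name and the task IDs 4, 5 and 6 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Assumed strategy: random permutation of the target tasks, then cycle through it.
class ShuffleRoundRobin {
    private final List<Integer> order;
    private int next = 0;

    ShuffleRoundRobin(List<Integer> targetTasks, Random rand) {
        // Random permutation of the task IDs, fixed once up front
        order = new ArrayList<>(targetTasks);
        Collections.shuffle(order, rand);
    }

    int chooseTask() {
        // Walk the shuffled list in a cycle so every task gets the same share
        int task = order.get(next);
        next = (next + 1) % order.size();
        return task;
    }

    public static void main(String[] args) {
        ShuffleRoundRobin g = new ShuffleRoundRobin(Arrays.asList(4, 5, 6), new Random(42));
        Map<Integer, Integer> hits = new TreeMap<>();
        for (int i = 0; i < 9; i++) {
            hits.merge(g.chooseTask(), 1, Integer::sum);
        }
        System.out.println(hits);  // prints {4=3, 5=3, 6=3}
    }
}
```

Cycling through a fixed permutation gives each task exactly the same share over every full round; picking a random task per tuple would only be even on average.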

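Now suppose I want certain sentences to be received only by one particular task. How can that be done?

First, look at the modified ExclaimBasicBolt: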

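// Imports use Storm 1.x+ package names; releases before 1.0 use backtype.storm
import java.util.List;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclaimBasicBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -6239845315934660303L;
    // Task IDs of the "print" and "print2" components, filled in prepare()
    private List<Integer> list;
    private List<Integer> list2;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        list = context.getComponentTasks("print");
        list2 = context.getComponentTasks("print2");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = (String) tuple.getValue(0);
        String out = sentence + "!";
        if (out.startsWith("e")) {
            // Sentences starting with "e" go to the first task of "print"
            collector.emitDirect(list.get(0), new Values(out));
        } else {
            // Everything else goes to the first task of "print2"
            collector.emitDirect(list2.get(0), new Values(out));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // true declares a direct stream, required for emitDirect/directGrouping
        declarer.declare(true, new Fields("excl_sentence"));
    }
}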
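The routing decision in ExclaimBasicBolt is a plain function of the sentence and the two task-ID lists, so it can be modeled without Storm. In this sketch the task IDs 4 to 9 are made up; in the bolt they come from context.getComponentTasks("print") and context.getComponentTasks("print2"). DirectRouting is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the direct-grouping decision: the emitter itself picks a task ID.
class DirectRouting {
    static int chooseTask(String sentence, List<Integer> printTasks, List<Integer> print2Tasks) {
        String out = sentence + "!";
        // Same rule as the bolt: "e..." -> first "print" task, otherwise first "print2" task
        return out.startsWith("e") ? printTasks.get(0) : print2Tasks.get(0);
    }

    public static void main(String[] args) {
        List<Integer> print = Arrays.asList(4, 5, 6);
        List<Integer> print2 = Arrays.asList(7, 8, 9);
        System.out.println(chooseTask("edi:I'm happy", print, print2));  // prints 4
        System.out.println(chooseTask("john:I'm sad", print, print2));   // prints 7
    }
}
```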

When building the topology, use directGrouping:

builder.setSpout("spout", new RandomSpout());
builder.setBolt("exclaim", new ExclaimBasicBolt(), 3).shuffleGrouping("spout");
builder.setBolt("print", new PrintBolt(), 3).directGrouping("exclaim");
builder.setBolt("print2", new PrintBolt2(), 3).directGrouping("exclaim");

PrintBolt2 is similar to PrintBolt,

except that its print statement is System.err.println(this + "  i am two   String recieved: " + rec);

Now, when we run it, we see:

mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf String recieved: john:I'm sad!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!

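All sentences starting with "e" ended up in a single task (PrintBolt@611b7a20), and all other sentences in a single task of the other bolt (PrintBolt2@238ac8bf). This matches the routing in ExclaimBasicBolt: "e" sentences go to the first task of print, everything else to the first task of print2. (Judging by the class names, the "i am two" text was printed by PrintBolt in this run.)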
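Because ExclaimBasicBolt always picks get(0), only one of the three tasks of each print bolt ever receives tuples; the other two stay idle, which is also what the output shows. If the goal were only to separate the two kinds of sentences while still using every task, one option (an assumption, not part of the original code) is to keep the first-letter rule for choosing the component and hash the sentence to choose among that component's tasks. The class name and task IDs below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Variation (assumed, not from the original code): pick the component by first letter,
// then spread sentences over all of that component's tasks by hashing the sentence.
class DirectRoutingSpread {
    static int chooseTask(String sentence, List<Integer> printTasks, List<Integer> print2Tasks) {
        List<Integer> targets = sentence.startsWith("e") ? printTasks : print2Tasks;
        // floorMod keeps the index in [0, size) even for negative hash codes
        return targets.get(Math.floorMod(sentence.hashCode(), targets.size()));
    }

    public static void main(String[] args) {
        List<Integer> print = Arrays.asList(4, 5, 6);
        List<Integer> print2 = Arrays.asList(7, 8, 9);
        for (String s : Arrays.asList("edi:I'm happy!", "john:I'm sad!", "ted:I'm excited!")) {
            System.out.println(s + " -> task " + chooseTask(s, print, print2));
        }
    }
}
```

With only five distinct sentences the spread is coarse, since identical sentences always hash to the same task; hashing a per-tuple ID instead would spread the load more evenly.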

Full code for this section (the original shuffle-grouping version, before the direct-grouping changes):

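// ExclaimBasicTopo.java
// Imports use Storm 1.x+ package names (org.apache.storm); releases before 1.0 use backtype.storm
package mystorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ExclaimBasicTopo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSpout());
        builder.setBolt("exclaim", new ExclaimBasicBolt(), 3).shuffleGrouping("spout");
        builder.setBolt("print", new PrintBolt(), 3).shuffleGrouping("exclaim");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode for testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
        }
    }
}

// RandomSpout.java
package mystorm;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class RandomSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random rand;
    private int index;
    private static String[] sentences = new String[] {
        "edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.rand = new Random();
    }

    @Override
    public void nextTuple() {
        // Emit 10 * sentences.length random sentences in total, then just idle
        if (index < 10 * sentences.length) {
            String toSay = sentences[rand.nextInt(sentences.length)];
            this.collector.emit(new Values(toSay));
            index++;
        } else {
            try {
                Thread.sleep(1000);
                System.out.println("Paused for one second");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

// ExclaimBasicBolt.java
package mystorm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclaimBasicBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = (String) tuple.getValue(0);
        String out = sentence + "!";
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("excl_sentence"));
    }
}

// PrintBolt.java
package mystorm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String rec = tuple.getString(0);
        System.err.println(this + " String recieved: " + rec);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing
    }
}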

