2121SC@SDUSC
storm-stream(2) SingleStream默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是default。
可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。
代码说明:发射时指定一个stream-id,声明流时指定一个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id,代码如下:
class RandomSentenceSpout { public void nextTuple() { Utils.sleep(1000); String sentence = sentences[rand.nextInt(sentences.length)]; System.out.println("n" + sentence); this.collector.emit("split-stream", new Values(sentence)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("split-stream", new Fields("sentence")); } } class SplitSentenceBolt { public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { this.collector.emit("count-stream", new Values(word)); } this.collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("count-stream", new Fields("word")); } } class WordCountBolt { public void execute(Tuple tuple) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit("print-stream", new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("print-stream", new Fields("word", "count")); } } class Topology { main(){ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 1); builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream"); builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word")); builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream"); } }MultiStream
Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。
public void execute(Tuple input) { String word = input.getString(0); //小于j的word发送给stream1; 大于j的word发送给stream2; if(word.compareTo("j") < 0){ collector.emit("stream1", new Values(word)); }else if(word.compareTo("j") > 0){ collector.emit("stream2", new Values(word)); } //不管什么都发送给stream3 collector.emit("stream3", new Values(word)); } public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declareStream("stream1", new Fields("word")); outputFieldsDeclarer.declareStream("stream2", new Fields("word")); outputFieldsDeclarer.declareStream("stream3", new Fields("word")); }
stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。
storm里面有6种类型的stream grouping:
-
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配,下游每个bolt均衡接收到上游的tuple。
-
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
例如,如果流是按“user-id”字段分组的,具有相同“user-id”的元组将总是进入相同的任务,但是具有不同“user-id”的元组可能会进入不同的任务。 -
All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。 流在bolt的所有任务中被复制,小心使用这个分组。
-
Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。 整个流指向bolt的一个任务。具体来说,它将转到具有最低id的任务。所有tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
-
Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。此分组指定不关心流如何分组。目前还没有一个分组等同于shuffle分组。最终,Storm会把没有分组的螺栓按在同一个线程中执行,就像他们订阅的螺栓或喷口一样(如果可能的话)。
-
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。
元组的生产者决定消费者的哪个任务将接收这个元组。直接分组只能在已声明为直接流的流上声明。发出到直接流的元组必须使用emitDirect方法之一发出。
用这种分组意味着消息的发送者指定优消息接收者的某个task处理这个消息,只有被声明为DirectStream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)