Preface
This article looks at how to split and join streams in Storm.
Example
@Test
public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("sentence-spout", new SentenceSpout());

    // SentenceSpout --> SplitStreamBolt
    builder.setBolt("split-bolt", new SplitStreamBolt())
            .shuffleGrouping("sentence-spout");

    // SplitStreamBolt splits into two streams --> CountStreamBolt
    // NOTE: each subscriber names both the upstream bolt and the streamId it consumes
    builder.setBolt("long-word-count-bolt", new CountStreamBolt(), 5)
            .shuffleGrouping("split-bolt", "longWordStream");
    builder.setBolt("short-word-count-bolt", new CountStreamBolt(), 5)
            .shuffleGrouping("split-bolt", "shortWordStream");

    // Both CountStreamBolt components join --> ReportBolt
    builder.setBolt("report-bolt", new ReportBolt())
            .shuffleGrouping("long-word-count-bolt")
            .shuffleGrouping("short-word-count-bolt");

    submitRemote(builder);
}
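- SplitStreamBolt splits the stream in two; two CountStreamBolt components each process one of the resulting streams, and their output is finally merged into a single stream whose tuples ReportBolt consumes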
SplitStreamBolt
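import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitStreamBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);

    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // NOTE: BaseRichBolt does not ack automatically, so execute() acks the tuple itself
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // NOTE: the first argument of emit selects the target streamId
            if (word.length() > 4) {
                this.collector.emit("longWordStream", new Values(word));
            } else {
                this.collector.emit("shortWordStream", new Values(word));
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        // NOTE: declareStream declares an extra stream under the given streamId.
        // These are regular (non-direct) streams, because they are written with
        // emit() and consumed through shuffleGrouping rather than emitDirect().
        declarer.declareStream("longWordStream", new Fields("word"));
        declarer.declareStream("shortWordStream", new Fields("word"));
    }
}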
- Two additional streams are declared here: longWordStream and shortWordStream
- Words longer than 4 characters are emitted to longWordStream; words of 4 characters or fewer are emitted to shortWordStream
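The routing rule can be checked in isolation, without a running topology. The following is a minimal plain-Java sketch of that rule; WordRouting, streamIdFor and route are hypothetical names introduced only for illustration and are not part of Storm or of the original project.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the routing rule; no Storm classes are involved.
final class WordRouting {
    static final String LONG_STREAM = "longWordStream";
    static final String SHORT_STREAM = "shortWordStream";

    // Same threshold as SplitStreamBolt: more than 4 characters counts as long.
    static String streamIdFor(String word) {
        return word.length() > 4 ? LONG_STREAM : SHORT_STREAM;
    }

    // Splits a sentence on single spaces and groups the words by target stream.
    static Map<String, List<String>> route(String sentence) {
        Map<String, List<String>> byStream = new LinkedHashMap<>();
        byStream.put(LONG_STREAM, new ArrayList<>());
        byStream.put(SHORT_STREAM, new ArrayList<>());
        for (String word : sentence.split(" ")) {
            byStream.get(streamIdFor(word)).add(word);
        }
        return byStream;
    }

    public static void main(String[] args) {
        // "easy" has exactly 4 characters, so it lands on the short stream.
        System.out.println(route("storm makes stream splitting easy"));
    }
}
```

Keeping the threshold in one small function makes the boundary case (a word of exactly 4 characters) easy to test before it is wired into a bolt.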
CountStreamBolt
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountStreamBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);

    Map<String, Integer> longWordCounts = new HashMap<String, Integer>();

    Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // The source stream ID tells which upstream stream delivered this tuple
        String sourceStreamId = input.getSourceStreamId();
        String word = input.getString(0);

        if (sourceStreamId.equals("longWordStream")) {
            Integer count = longWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            longWordCounts.put(word, count);
            LOGGER.info("long word:{} -> {}", word, count);
            collector.emit(new Values(word, count));
            return;
        }

        if (sourceStreamId.equals("shortWordStream")) {
            Integer count = shortWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            shortWordCounts.put(word, count);
            LOGGER.info("short word:{} -> {}", word, count);
            collector.emit(new Values(word, count));
            return;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
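- To show how getSourceStreamId distinguishes the inputs, both streams are handled by the same bolt class, although the topology registers it as two separate bolt components
- In practice, two different bolt classes could just as well process the two streams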
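When one class serves several streams, the per-stream branches can also be folded into a single map keyed by the source stream ID. The following is a plain-Java sketch of that bookkeeping only, under the assumption that the stream ID arrives as a string; PerStreamCounter, record and countOf are hypothetical names and are not part of Storm or of the original code.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-stream word counting; no Storm classes are involved.
final class PerStreamCounter {
    // Outer key: source stream ID; inner map: word -> running count.
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    // Increments the count of the word for the given stream and returns the new count.
    int record(String sourceStreamId, String word) {
        return counts
                .computeIfAbsent(sourceStreamId, id -> new HashMap<>())
                .merge(word, 1, Integer::sum);
    }

    // Returns the current count, or 0 if the stream or word has not been seen.
    int countOf(String sourceStreamId, String word) {
        Map<String, Integer> perWord = counts.get(sourceStreamId);
        if (perWord == null) {
            return 0;
        }
        return perWord.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        PerStreamCounter counter = new PerStreamCounter();
        counter.record("longWordStream", "storm");
        counter.record("longWordStream", "storm");
        counter.record("shortWordStream", "bolt");
        System.out.println(counter.countOf("longWordStream", "storm"));
        System.out.println(counter.countOf("shortWordStream", "bolt"));
    }
}
```

Inside a bolt, the first argument would come from input.getSourceStreamId(), as in CountStreamBolt. The counts stay separate per stream, so the same word arriving on two streams is tracked twice.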
Summary
- OutputFieldsDeclarer can declare multiple streamIds through its declareStream method
- OutputCollector can send a tuple selectively to a given stream through emit(String streamId, List<Object> tuple)
- OutputCollector also has emit overloads without a streamId parameter; internally they use Utils.DEFAULT_STREAM_ID (default) as the actual streamId
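The relationship between the two emit forms can be pictured with a toy model. The sketch below is not Storm's OutputCollector: ToyCollector, values and emittedOn are hypothetical names, and the only detail taken from Storm is that the default stream ID is the string "default".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a collector; it is NOT Storm's OutputCollector.
final class ToyCollector {
    // Mirrors the value of Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM_ID = "default";

    // Tuples recorded per stream ID, in emit order.
    private final Map<String, List<List<Object>>> emitted = new HashMap<>();

    // Hypothetical helper that plays the role of Storm's Values.
    static List<Object> values(Object... fields) {
        return new ArrayList<>(Arrays.asList(fields));
    }

    // Emit to an explicitly named stream.
    void emit(String streamId, List<Object> tuple) {
        emitted.computeIfAbsent(streamId, id -> new ArrayList<>()).add(tuple);
    }

    // Emit without a stream ID: delegates to the default stream.
    void emit(List<Object> tuple) {
        emit(DEFAULT_STREAM_ID, tuple);
    }

    // Returns the tuples recorded for a stream, or an empty list if none.
    List<List<Object>> emittedOn(String streamId) {
        List<List<Object>> tuples = emitted.get(streamId);
        if (tuples == null) {
            return new ArrayList<>();
        }
        return tuples;
    }

    public static void main(String[] args) {
        ToyCollector collector = new ToyCollector();
        collector.emit(values("storm", 1));
        collector.emit("longWordStream", values("topology"));
        System.out.println(collector.emittedOn(DEFAULT_STREAM_ID));
        System.out.println(collector.emittedOn("longWordStream"));
    }
}
```

This also explains why report-bolt in the example can subscribe with shuffleGrouping("long-word-count-bolt") alone: CountStreamBolt emits without a streamId, so its tuples travel on the default stream.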
doc
- How to split one stream or join multiple stream