聊聊storm的stream的分流与合并

news/2024/5/19 12:59:57

本文主要研究一下storm的stream的分流与合并

实例

    @Testpublic void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentence-spout", new SentenceSpout());// SentenceSpout --> SplitStreamBoltbuilder.setBolt("split-bolt", new SplitStreamBolt()).shuffleGrouping("sentence-spout");// SplitStreamBolt split two stream --> WordCountBolt//NOTE 这里要指定上游的bolt以及要处理的streamIdbuilder.setBolt("long-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","longWordStream");builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","shortWordStream");// WordCountBolt join --> ReportBoltbuilder.setBolt("report-bolt", new ReportBolt()).shuffleGrouping("long-word-count-bolt").shuffleGrouping("short-word-count-bolt");submitRemote(builder);}
  • 这里在SplitStreamBolt里头将stream分为两个,之后有两个CountStreamBolt分别处理两个stream的数据,最后归到同一个stream由ReportBolt消费tuple

SplitStreamBolt

public class SplitStreamBolt extends BaseRichBolt {private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);private OutputCollector collector;public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.collector = collector;}//NOTE 这里要自己ackpublic void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){// NOTE 这里指定发送给指定streamIdif(word.length() > 4){this.collector.emit("longWordStream",new Values(word));}else{this.collector.emit("shortWordStream",new Values(word));}}this.collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));//NOTE 这里通过declareStream声明direct stream,并指定streamIddeclarer.declareStream("longWordStream",true,new Fields("word"));declarer.declareStream("shortWordStream",true,new Fields("word"));}
}
  • 这里额外声明了两个stream,一个是longWordStream,一个是shortWordStream
  • 对于word长度大于4的发送到longWordStream,小于等于4的发送到longWordStream

CountStreamBolt

public class CountStreamBolt extends BaseBasicBolt{private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);Map<String, Integer> longWordCounts = new HashMap<String, Integer>();Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String sourceStreamId = input.getSourceStreamId();String word = input.getString(0);if(sourceStreamId.equals("longWordStream")){Integer count = longWordCounts.get(word);if (count == null) count = 0;count++;longWordCounts.put(word, count);LOGGER.info("long word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}if(sourceStreamId.equals("shortWordStream")){Integer count = shortWordCounts.get(word);if (count == null) count = 0;count++;shortWordCounts.put(word, count);LOGGER.info("short word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}
  • 这里为了展示sourceStreamId的区别,所以两个stream共用了同一个bolt,但是topology那里是两个实例
  • 实际也可以是两个不同的bolt类来处理两个stream的数据

小结

  • OutputFieldsDeclarer可以通过declareStream方法声明多个streamId
  • OutputCollector可以通过emit(String streamId, List<Object> tuple)方法来选择性将tuple发送到指定的streamId
  • OutputCollector也有emit方法参数没有streamId,其内部默认是使用Utils.DEFAULT_STREAM_ID(default)作为实际的streamId

doc

  • How to split one stream or join multiple stream

http://lihuaxi.xjx100.cn/news/235988.html

相关文章

值得注意的知识点

ImageView的属性adjustViewBounds www.jianshu.com/p/13de17744… 转载于:https://juejin.im/post/5c8b7742e51d454e02716e44

积极拥抱.NET Core开源社区

潘正磊在上海的Tech Summit 2018 大会上给我们的.NET Core以及开源情况带来了最新信息。 .Net Core 开源后取得了更加快速的发展&#xff0c;目前越活跃用户高达400万人&#xff0c;每月新增开发者45万&#xff0c;在 GitHub 上的月度增长达到15%。目前有来自超过3,700家企业的…

Linux06-服务、守护进程和systemd

目录 一、简介systemd 二、使用systemd 2.1、systemctl命令与systemd单元 2.2、控制系统服务 一、简介systemd RHEL6及以前&#xff0c;系统启动和服务器进程是由第一个进程 init 管理&#xff0c;init按顺序启动、启动慢。 RHEL7以后系统启动和服务器进程由 systemd系统和…

Linux环境HBase安装配置及使用

Linux环境HBase安装配置及使用 1. 认识HBase (1) HBase介绍 HBase Hadoop database&#xff0c;Hadoop数据库开源数据库官网&#xff1a;hbase.apache.org/HBase源于Google的BigTableApache HBase™是Hadoop数据库&#xff0c;是一个分布式&#xff0c;可扩展的大数据存储。当…

第五次作业:四则运算之升级

本次作业要求来源&#xff1a;https://edu.cnblogs.com/campus/gzcc/GZCC-16SE2/homework/2232 我的github地址&#xff1a;https://github.com/yellowjy/study 结对同伴的学号姓名&#xff1a;201606120069 缪国锋 一、基本要求&#xff1a; 生成题目&#xff0c;单个题目最多…

四个超好用的优质资源搜索网站,海量优质资源等你发现!

在网上找资源的时候总找不到满意的优质资源&#xff1f;今天小编把办公室大佬珍藏多年的四个超好用优质资源搜索网站分享给你&#xff0c;只要你想找&#xff0c;没有找不到的资源&#xff01;一、学习资料库学习资料库中有大量的免费学习资料&#xff0c;学习资料涵盖多种学科…

移动磁盘由于IO设备错误,要怎样寻回文件

J盘打不开由于IO设备错误&#xff0c;是因为这个I盘的文件系统内部结构损坏导致的。要恢复里面的数据就必须要注意&#xff0c;这个盘不能格式化&#xff0c;否则数据会进一步损坏。具体的恢复方法看正文 工具/软件&#xff1a;流星数据恢复软件 步骤1&#xff1a;先百度搜索并…

Linux07-OpenSSH

目录 一、使用SSH访问远程主机 1.1、什么是OpenSSH Secure Shell&#xff08;SSH&#xff09; 1.2、SSH主机密钥 二、配置基于SSH密钥的身份验证 2.1、基于SSH密钥的身份验证 2.2、自定义SSH服务配置 2.3、sftp传输文件 一、使用SSH访问远程主机 1.1、什么是OpenSSH Se…