[译] RabbitMQ tutorials (3) ---- 'Pub/Sub' (Javascript)

news/2024/7/1 5:32:20

发布与订阅 (Publish/Subscribe)

在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一样的事--把一条消息发送给多个消费者,这个模式叫做“发布/订阅”(publish/subscribe)。

举个例子,我们要构建一个简易的日志系统。由两个程序组成---一个来发出日志消息,另一个接收并把消息显示出来。

在我们的日志系统当中,每一个正在运行的接收程序都会收到消息。这样,我们可以运行一个receiver并把log定向到磁盘,然后再跑一个receiver,看看它是否会在屏幕上显示日志。

事实上,被发布的消息会被广播到所有的receiver那里。

交换器(Exchanges)

在之前的引导中,我们从一个队列中做了收发的操作。是时候介绍在Rabbit中的全部的消息模型了。

让我们先快速地回顾一下之前学习的,

  • producer 是一个发送消息的应用

  • queue 是一个存储消息的buffer

  • consumer 是一个接收消息的应用

RabbitMQ中,消息模型的核心思想是生产者绝不会把消息直接发到队列。实际上,生产者通常不知道一条消息是否已经被发送到任意一个队列中。

生产者只能把消息发到交换器。交换器是个简单的东西。一方面接收从生产者那边来的消息,另一方面把他们push到队列中。交换器一定要知道当它们接收到消息之后要如何处理。是否要追加到一个特殊的队列?是否要追加到许多的队列?或者丢掉这条消息?这些规则被定义为交换类型。

exchanges

以下是可以使用的交换类型:direct, topic, header, fanout。我们介绍一下最后一个--fanout。让我们先创建一个fanout类型的交换器“logs”:

ch.assertExchange('logs', 'fanout', {durable: false})

fanout类型的交换器非常简单,我们可以从单单从名字上猜测,它就是把它接收到的消息广播给所有已知的队列。这也就是我们的logger所需要的。


列出所有的交换器(Listing exchanges)
你可以使用rabbitmqctl

$sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在列表中,一些amq.*的交换器和一些默认的(未命名的),都是被默认创建的,但是可能是你用不到的

未命名的交换器(Nameless exchange)
在之前的章节中我们未提过交换器,但是我们仍然能够把消息传到队列中,这就是我们使用了默认的交换器,因为我们使用了空的字符串("")。
之前我们是这样发布一条消息的
ch.sendToQueue('hello', new Buffer('Hello World!'));
这里我们使用默认的或者未命名的交换器,如果第一个参数存在的话,消息会被路由到这个参数名的队列。


现在,我们可以使用我们定义好的交换器

ch.publish('logs', '', new Buffer('Hello World!'));

第二个参数为空的话代表我们不想把消息推到指定的队列,只是想发布到logs的交换器中。

临时队列 (Temporary queues)

你还记得我们之前用的声明过的队列(hello 和 task_queue)吗?。能够指明一个队列的名字对我们来说是重要的--我们需要把workers指到相同的队列。
当你想要分享给消费者和生产者队列的时候,给队列起一个名字很重要。

但着不是我们logger这个程序需要的,我们想监听所有的log消息,不是一部分log消息。同样的,我们对正在流动的消息也感兴趣(not in the old ones).我们需要完成两件事情:
第一,不管我们什么时候连接Rabbit,都需要一个新的,空的队列。我们可以创建一个随机的队列名字,或者让服务器为我们随机选择一个队列名字。
第二,不管我们什么时候断开与消费者的连接,队列需要自动销毁。

amqp.node的客户端中,当我们传入字符串的时候,可以创建一个带有名字的未持久化的队列

ch.assertQueue('', {exclusive: true});

这个方法返回一个带有随机名字的队列实例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg
当连接被断开的时候,这个队列会被销毁,因为我们在声明的时候{exclusive:true}

绑定 (Bindings)

binding

我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器把消息发送给队列,队列与交换器之间的关系我们称之为绑定。
ch.bindQueue(queue_name, 'logs', '');
现在开始,logs的交换器为追加消息到我们的队列

Listing bindings:

你可以列出已经存在的绑定关系,你应该猜到。rabbitmqctl list_bindings

整合(Putting it all together)

all

生产者的程序,用来发出log消息,和之前章节没有太多的不同,最重要的改变就是现在我们是把消息发布到我们的logs的交换器中,而不是之前的在未声明的情况下使用。发送的时候我们需要提供一个路由键,但是在fanout类型当中,这个可以忽略。下面是emit_log.js的代码

#!/usr/bin/env nodevar amqp = require('amqplib/callback_api');amqp.connect('amqp://localhost', function(err, conn) {conn.createChannel(function(err, ch) {var ex = 'logs';var msg = process.argv.slice(2).join(' ') || 'Hello World!';ch.assertExchange(ex, 'fanout', {durable: false});ch.publish(ex, '', new Buffer(msg));console.log(" [x] Sent %s", msg);});setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

(emit_log.js 源码)

正如你所见,在与交换器建立连接之后。有一点很关键,向不存在的交换器发布消息是被禁止的。
如果仍然没有队列绑定交换器,消息会丢失。但是对我们来说还好,如果仍然没有消费者监听,我们可以安全地丢弃这些消息。

receive_logs.js的代码

#!/usr/bin/env nodevar amqp = require('amqplib/callback_api');amqp.connect('amqp://localhost', function(err, conn) {conn.createChannel(function(err, ch) {var ex = 'logs';ch.assertExchange(ex, 'fanout', {durable: false});ch.assertQueue('', {exclusive: true}, function(err, q) {console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);ch.bindQueue(q.queue, ex, '');ch.consume(q.queue, function(msg) {console.log(" [x] %s", msg.content.toString());}, {noAck: true});});});
});

(receive_logs,js源码)

如果你想要保存log,你可以打开控制台输入

$ ./receive_logs.js > logs_from_rabbit.log

如果你想在屏幕上看到log,再打开一个控制台

$ ./receive_logs.js

当然,需要发出logs

$ ./emit_log.js

使用rabbitmqctl list_bindings,你可以确定刚才的代码确实创建了交换器和队列,有两个receive_logs.js的程序在运行。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果的简要解释:数据从logs交换器到两个服务器分配的队列。这也是我们想要的结果。

要如何监听一部分的消息?让我们移到下一章。


http://lihuaxi.xjx100.cn/news/241581.html

相关文章

web前端培训分享:面向对象中类和对象的定义是什么?

在学习web前端技术的时候,我们接触的最多的便是面向对象这一块,其实很多编程技术都有用到这个现象,下面我们就为大家详细的介绍一下面向对象中类和对象的定义是什么? web前端培训分享:面向对象中类和对象的定义是什么?面向对象让…

对分组交换(packet switching)高效迅速灵活可靠四个优点的理解

1.什么是分组? 通信过程中要发送的整块数据被称为一个报文(message),报文被划分为一个个更小的等长数据段,每个数据段前加入一些由必要的控制信息组成的首部后,就构成了一个分组。分组是在互联网中传送的数据单元(长报文&#xff…

N皇后问题详解

文章目录一、题目描述二、题目解析(1)思考一(集合回溯)(2)思考二(数组深度递归)(3)思考三(位运算)一、题目描述 N 皇后问题是指在 n * n 的棋盘上要摆 n 个皇后, 要求:任何两个皇后…

display会影响canvas吗_多动症会影响智商吗?

小智(化名)小的时候非常皮,上学了也不老实,学习成绩还很差,一直是倒数,还有人说他智商低。父母带他到医院检查,一切都正常,智商也没问题。直到最近他被检查出多动症,小智…

性能测试学习过程中遇到的问题与解答1

1.一个脚本里的多个action是怎么关系到一起的?Run logic中Block是如何使用的?time:20140226解答:1)在Run-time Setting里Run Logic中,先insert block,然后选中这个block,再insert一个…

批删,全选

<th>全选<input type"checkbox" οnclick"all_()" class"che_"></th> <td><input type"checkbox" name"check_" value"<?php echo $v[w_id] ?>"></td>function dels…

大数据环境下该如何优雅地设计数据分层

最近出现了好几次同样的对话场景&#xff1a; 问&#xff1a;你是做什么的? 答&#xff1a;最近在搞数据仓库。 问&#xff1a;哦&#xff0c;你是传统行业的吧&#xff0c;我是搞大数据的。 答&#xff1a;…… 发个牢骚&#xff0c;搞大数据的也得建设数据仓库吧。而且不管是…

训练不出结果_智能训练仪:专业化智能防控近视训练设备

视觉训练精准化&#xff0c;近视防控效果佳智能训练仪小百科 智能训练仪是一款近视防控全功能智能康复设备&#xff0c;一机集成十大视功能康复模块&#xff0c;针对各种视功能异常引发的儿童及青少年假…