在 Node.js 中使用 MQ(Message Queue)是很常见的场景,而 nsq-rocket 是一个 Node.js 库,提供了一些便捷的 API 来访问 NSQ(一种实时分布式消息发布订阅系统)。
在本文中,我们将介绍如何使用 nsq-rocket 库来连接 NSQ,并发送和接收消息。
安装
使用 npm 进行安装:
npm install nsq-rocket
连接 NSQ
在使用 nsq-rocket 前,要先安装 NSQ,可以通过以下命令进行安装。
curl -s https://packagecloud.io/install/repositories/nsqio/nsq/script.deb.sh | sudo bash sudo apt-get install nsq
接下来的示例中,我们使用黑洞(blackhole)作为 NSQ 的主题(topic)。
const NsqRocket = require('nsq-rocket'); const producer = new NsqRocket.Producer('127.0.0.1:4150'); const subscriber = new NsqRocket.Subscriber('127.0.0.1:4161', { topic: 'blackhole' });
NSQ 有两个端口,一个是 Producer 的端口,用来发送消息,也就是 4150 端口;另一个是 Subscriber 的端口,用来订阅消息,也就是 4161 端口。
发送消息
nsq-rocket 提供两个类:
- Producer:发送消息。
- Subscriber:订阅消息。
我们先来看一下如何发送消息。
producer .publish('Hello World') .then(() => { console.log('Message has been sent.'); }) .catch((err) => { console.error(err); });
在上面的代码中,我们使用 publish 方法向 blackhole 主题发送消息(字符串 'Hello World')。
订阅消息
订阅消息的过程相对来说要稍微复杂一点,下面是一个订阅示例:
subscriber .on('message', (msg) => { console.log('Received message: ' + msg.getBody().toString()); }) .on('error', (err) => { console.error(err); });
在上面的代码中,我们订阅 blackhole 主题,并监听 'message' 事件,当有消息到达时,系统会自动触发该事件,并将消息体(body)传递进回调函数中。
除了订阅 'message' 事件,还有其他一些事件可供订阅,比如 'error',当出现错误时,将会触发该事件。
总结
通过本文的介绍,我们学习了如何使用 nsq-rocket 连接 NSQ,并发送和接收消息。在实际应用中,使用 MQ 能够极大地提高分布式系统的可扩展性和可靠性。
完整的示例代码可以在 Github 上找到。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/60066f933d1de16d83a66bbe