简介
node-nats-streaming 是一个 Node.js 的 NATS Streaming 客户端库,支持 NATS Streaming 消息队列协议。它能够让你使用 NATS Streaming 实现可靠的消息传递,并且支持消息的持久化和回放。
安装
在 Node.js 项目中使用 npm 包管理器来安装 node-nats-streaming。
npm install node-nats-streaming --save
使用方法
创建客户端
使用以下代码创建一个 node-nats-streaming 客户端实例。
const stan = require('node-nats-streaming').connect(clusterId, clientId, options);
其中,clusterId
是 NATS Streaming 的集群 ID,clientId
是客户端 ID,options
是一个可选的配置对象,用于设置客户端的连接选项。例如:
const options = { url: 'nats://localhost:4222', maxReconnectAttempts: -1 }; const stan = require('node-nats-streaming').connect('test-cluster', 'client-1', options);
这段代码创建了一个连接到本地 NATS Streaming 服务器的名为 client-1
的客户端实例。maxReconnectAttempts
选项设置为 -1
表示重连不限制次数。
发布消息
使用以下代码向 NATS Streaming 服务器发布消息。
stan.publish(subject, data, (err, guid) => { if (err) { console.error(err); return; } console.log(`消息已发送,GUID:${guid}`); });
其中,subject
是主题名称,data
是消息数据,guid
返回一个全局唯一的消息 ID,用于在需要时检索消息。例如:
stan.publish('orders.created', JSON.stringify({ orderId: '123' }), (err, guid) => { if (err) { console.error(err); return; } console.log(`订单已创建,GUID:${guid}`); });
这段代码发布了一个名为 orders.created
的订单创建消息,并将订单 ID 作为数据进行了序列化和传递。
订阅消息
使用以下代码订阅 NATS Streaming 服务器的消息。
const subscription = stan.subscribe(subject, opts); subscription.on('message', (msg) => { console.log(`已接收到来自 ${msg.getSubject()} 的消息,内容:${msg.getData()}`); });
其中,subject
是主题名称,opts
是一个可选的订阅选项对象,用于设置订阅的几种特性。例如:
const opts = stan.subscriptionOptions() .setStartWithLastReceived() .setDeliverAllAvailable() .setDurableName('orders-service'); const subscription = stan.subscribe('orders.*', opts); subscription.on('message', (msg) => { console.log(`已接收到来自 ${msg.getSubject()} 的消息,内容:${msg.getData()}`); });
这段代码订阅了所有以 orders.
开头的消息,并使用 opts
指定了几种订阅选项,包括从最后接收的消息开始,接收所有可用的消息,以及设置订阅的持久化名称为 orders-service
。
消息队列
node-nats-streaming 还支持消息队列模式,可以多个客户端共同消费同一个主题下的消息。
-- -------------------- ---- ------- ----- ---------- - ------------- ----- ---- - ------------------------------------------------------------------------ ----- ------------ - -------------------------------- ----------- ------ -------------------------- ----- -- - ---------------------------------------------- -- ------- ---------- ---
这段代码创建了一个名为 order-saga
的消息队列,多个订阅此队列的客户端可以竞争消费 orders.created
主题下的消息。使用 setManualAckMode
方法可以开启消息的手动确认模式,即在处理完毕后手动发送确认消息。
示例代码
下面的完整代码演示了如何在 node-nats-streaming 中创建一个基本的消息发布和订阅程序。
-- -------------------- ---- ------- ----- ---- - ------------------------------------------------------ ------------ -- ---- ------------------------------ ---------------- -------- ----- --- ----- ----- -- - -- ----- - ------------------- ------- - ---------------------------------- --- -- ---- ----- ------------ - --------------------------- -------------------------- ----- -- - ------------------- ------------------- -------------------------- ---
这段代码将创建一个名为 client-1
的客户端实例,发布一个 orders.created
消息(携带订单 ID),然后订阅所有以 orders.
开头的消息,打印出接收到的消息内容。
指导意义
node-nats-streaming 让 Node.js 项目可以方便地使用 NATS Streaming 消息队列协议,实现可靠的消息传递。它可以帮助你构建高效、健壮、可伸缩的分布式系统,使用起来非常方便。
在使用 node-nats-streaming 的过程中,需要注意以下几点:
- 连接 NATS Streaming 服务器的 URL 和端口号必须正确。
- 在订阅消息时,需要注意订阅的主题名称和订阅选项。
- 在发布消息时,需要确定消息的主题名称和消息的数据格式。
- 在使用消息队列时,需要设置正确的队列组名称和确认消息的方式。
学习和掌握 node-nats-streaming 的使用方法,对于提高 Node.js 开发人员的分布式系统开发能力非常有帮助。同时,node-nats-streaming 还有很多高级特性可以使用,建议进一步学习和探索。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/5eedaaa6b5cbfe1ea061054c