简介
@microfleet/amqp-coffee 是一个基于 AMQP 0.9.1 协议的 Node.js 消息队列 (MQ) 客户端库,适用于 RabbitMQ 等支持 AMQP 0.9.1 协议的 MQ。
它提供了易用的 API 接口,支持消息发送和接收,以及 AMQP 的高级特性,如交换机和队列的声明、绑定和解绑等。
本文主要介绍 @microfleet/amqp-coffee 的使用方法,涵盖了基本的配置、连接、发送和接收消息等方面。
安装
使用 @microfleet/amqp-coffee 安装非常简单,只需要通过 npm 安装即可:
npm install @microfleet/amqp-coffee --save
连接
使用 @microfleet/amqp-coffee 连接 MQ,需要设置几个必要的参数:
url
: MQ 的连接字符串,格式为 amqp://${username}:${password}@${host}:${port}/${vhost};queue
: MQ 中的队列名称。
其他可选参数包括:
exchange
: 交换机名称;routingKey
: 路由键;key
: 队列名称;channel
: 信道;reconnect
: 是否自动重连。
以下是一个完整的 @microfleet/amqp-coffee 连接示例:
-- -------------------- ---- ------- ----- ---- - ----------------------------------- ----- --- - ------------------------------------ ----- ----- - ------- ----- ---------- - --- -------------------- - ------ --- ---------------------- ----- -- - ------------------------- -------- ----- --- ------------------------ -- -- - ----------------------- -------------- --- --------------------------- -- -- - ------------------------ ------- ---
在上面的示例中,我们创建了一个连接对象 connection
,并监听了三个事件:
connect
事件表示连接成功;disconnect
事件表示连接断开;error
事件表示连接出错。
需要注意的是,@microfleet/amqp-coffee 会自动尝试重新连接,所以在 connect
事件触发后,应该保持连接对象,以备后续使用。
发送消息
使用 @microfleet/amqp-coffee 发送消息非常简单,只需要调用 connection.publish
方法,传入消息的内容即可:
const message = { name: 'Alice', age: 18, }; connection.publish(JSON.stringify(message));
在上面的示例中,我们发送了一条 JSON 格式的消息。
接收消息
使用 @microfleet/amqp-coffee 接收消息也很简单,只需要监听 message
事件即可:
connection.on('message', (message, done) => { console.log('Received message:', message.content.toString()); done(); });
在上面的示例中,我们监听了 message
事件,并在事件处理函数中打印出接收到的消息内容。
需要注意的是,@microfleet/amqp-coffee 提供了自动确认和手动确认两种消息确认方式。默认是自动确认,即在处理完消息后自动发送确认信号;如果需要手动确认,可以将 done
函数作为第二个参数传入 message
事件的处理函数,在处理完成后手动调用 done
函数即可。
高级特性
@microfleet/amqp-coffee 支持 AMQP 的许多高级特性,例如声明、绑定和解绑交换机、队列等。
声明队列
使用 connection.declareQueue
方法,可以声明一个队列,指定队列名称和其他属性:
connection.declareQueue('my-queue', { autoDelete: true, durable: true, exclusive: false, arguments: { name: 'Alice', }, });
在上面的示例中,我们声明了一个名称为 my-queue
的队列,设置了自动删除、持久化、不是独占队列以及其他自定义参数。
绑定和解绑队列
使用 connection.bind
和 connection.unbind
方法,可以完成队列和交换机的绑定和解绑:
connection.bind('my-queue', 'my-exchange', '', { name: 'Alice' }); connection.unbind('my-queue', 'my-exchange', '', { name: 'Alice' });
在上面的示例中,我们绑定了队列 my-queue
到交换机 my-exchange
,并指定了路由键为空字符串和自定义参数;然后解绑了队列和交换机的绑定关系。
声明交换机
使用 connection.declareExchange
方法,可以声明一个交换机,指定交换机名称和其他属性:
connection.declareExchange('my-exchange', 'topic', { autoDelete: false, durable: true, internal: false, arguments: { name: 'Alice', }, });
在上面的示例中,我们声明了一个名称为 my-exchange
类型为 topic
的交换机,设置了不自动删除、持久化、不是内部交换机以及其他自定义参数。
绑定和解绑交换机
使用 connection.exchanges
方法,可以获取当前已声明的交换机列表:
console.log(connection.exchanges());
在上面的示例中,我们打印出了当前已声明的交换机列表。
总结
通过本篇文章的介绍,我们了解了 @microfleet/amqp-coffee 的基本使用方法,包括连接、发送消息、接收消息,以及 AMQP 的高级特性等。
在实际使用中,我们可以通过 @microfleet/amqp-coffee 来搭建一个可靠的消息队列系统,应用于日志处理、告警通知、任务调度等场景,提高系统的可伸缩性和稳定性。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/5eedacaab5cbfe1ea0610ac2