前言
随着互联网技术的发展,应用系统的复杂度也越来越高,通过消息队列实现异步任务调度、解耦业务系统、流量控制等方案的应用场景越来越广泛。RabbitMQ 作为当前应用最广泛的消息队列框架,为开发人员提供了全面的功能和灵活的配置方式。而 PM2 作为 Node.js 应用管理器,针对 Node.js 应用的特点,提供了完善的进程管理和容错机制,与 RabbitMQ 的集成可以帮助我们轻松构建具备高可用性和高性能的消息队列应用。
本文将介绍如何使用 PM2 集成 RabbitMQ 实现消息队列应用,并附带完整的示例代码和详细解释,帮助读者快速实现应用场景。
主要内容
RabbitMQ 简介
RabbitMQ 是一个开源的 AMQP(Advanced Message Queuing Protocol)实现,它是一个分布式的消息队列系统,可以作为分布式业务系统之间的消息传递中转站,实现异步通信和解耦业务系统。
RabbitMQ 支持多种消息协议,例如 STOMP、MQTT、HTTP 和 AMQP。通俗地讲 AMQP 就是一种跨语言、跨平台的企业消息队列传输协议,它是一个定义了一系列规范,包括消息推送、路由、传输和订阅等机制,保证了消息的可靠性和可扩展性。
此外,RabbitMQ 还支持多种插件,例如,Shovel 插件可以将消息从一个 RabbitMQ 实例传输到另一个例子,而 RabbitMQ Streams 插件则可以使 RabbitMQ 与 Apache Kafka 等流数据管道进行集成。
PM2 简介
PM2 是一个基于 Node.js 的进程管理器,它提供了完整的进程管理解决方案,允许应用程序在后台运行,同时提供完善的容错机制、负载均衡方案和监控报警等特性。
PM2 可以管理多个进程,允许应用程序启动、停止、重启和监控,同时支持多进程运行,即通过 Node.js 的 cluster 模块实现多个进程对服务的并行处理,充分利用系统资源,提高 Node.js 应用的性能。
RabbitMQ + PM2 实现消息队列应用
PM2 集成 RabbitMQ 实现的消息队列应用流程如下:
- 应用通过 AMQP 的方式与消息队列服务进行通信;
- 应用启动时使用 PM2 进程管理器启动多个子进程,每个子进程都是一条独立的业务逻辑处理流程;
- 消息队列发送消息到指定的交换机;
- 队列接收消息,回调处理逻辑;
- 处理完毕后再次放回队列,等待下一次调度。
以下为示例代码详解。
安装依赖
首先需要安装 amqplib 和 pm2 两个 npm 包:
npm install amqplib pm2 --save
初始化 RabbitMQ
在应用中连接到 RabbitMQ 需要先对其进行初始化。以下为连接示例:
-- -------------------- ---- ------- ----- ---- - ------------------- ----- ------------- - ------------------------- -- ------------ ----- ------------- - ------------------------- -- ----- ----- ----------------- - ----------------------------- -- -------- ----- ----------------- - ----------------------------- -- -------- ----- ------------ - ----- -- -- - ----- ---------- - ----- --------------------------------------------------------------------------------------------------- ----- ------- - ----- --------------------------- ----- ------------------------------------- --------- - -------- ----- --- ------ -------- -
其中 EXCHANGE_NAME
为交换机名称,durable: false
表示 RabbitMQ 服务停止时队列会自动删除。
应用代码
下面是 PM2 和 RabbitMQ 集成实现的示例代码:
-- -------------------- ---- ------- ----- ---- - ------------------- ----- --- - --------------- ----- ------------- - ------------------------- -- ------------ ----- ------------- - ------------------------- -- ----- ----- ----------------- - ----------------------------- -- -------- ----- ----------------- - ----------------------------- -- -------- ----- ------------- - ---------------- ----- ---------- - ------------- ----- ------------ - ----- -- -- - ----- ---------- - ----- --------------------------------------------------------------------------------------------------- ----- ------- - ----- --------------------------- ----- ------------------------------------- --------- - -------- ----- --- ----- ------------------------------- - -------- ----- --- ----- ----------------------------- -------------- ------------ -------------------- ---------- -------------- -- --------- --------- -- ----- - ---------------- ------ -------- - --------- ----- ------------ - ----- -- -- - ----- ---------- - ----- --------------------------------------------------------------------------------------------------- ----- ------- - ----- --------------------------- --- ---- - - -- - - --- ---- - ----- ------- - -------- ------ - ------ ------------------------------ ----------- ---------------------- ----------------- -------- ------------- - ----- ------------- -- - ---------------- -------- ------- ---------------- -- ------ - ----- ----- ----------- - ----- -- -- - --- ------- - ----- ----------------- -- - -- ----- - ------------------ ------- ---------- -------------- ------- ---- --------- ---------------- - ---------------- ------- ------- ------------ ------------------------- ---- -- - ------- - --- ----------- ------- -------------- ----- --------- ---------- -- ------------------- ------- ----------- ------------------------ --------- ------------------------ ---------------- ----------- ---------- -- ----- -- - -- ----- - --------------------- ------- -------------- ------- ---- --------- ---------------- - ------------------- ------- ---------- --- --------------------------- ----- -- - --------------------- -------- ---------------------------- ----------------- -- - ------ ----- --- -------------- -- - ----------------------- -------------- ------- ---- --------- ---------------- --- ------------- -- - ------------------ -- ---- -------------- --------------- -- ------ ------------------- ---- -- - ----------------- ------ -- - ---------------- --- - --- - --------------------- -------------------------- --- ------------------- ------ -- - ---------------- --- - ----- - --------------------- -------------------------- --- --- -------------------- -- -- - ---------------- -------------------- -------- ------------- -------------------- --- ---------------- ----------------- ---------------- ---------- --------- ---------------- --- --- - --------------
解析
示例代码实现的主要流程如下:
- 使用
amqp
包连接 RabbitMQ; - 初始化 RabbitMQ 的交换机和队列;
- 通过 PM2 启动多个子进程;
- 监听 RabbitMQ 队列中的消息,异步回调处理函数处理消息;
- 子进程通过 pm2 输出日志;
- 发送 50 条消息到 RabbitMQ 队列中。
总体说明: - 该应用启动时通过 PM2 进程管理器启动三个子进程,在三个子进程中进行业务处理,充分利用系统资源,提高应用程序性能; - 通过 RabbitMQ 在多进程间传递消息,多进程间业务逻辑处理模块可以无缝协作,实现了异步任务调度和消息解耦。
启动应用
在终端中执行以下命令启动应用程序:
node main.js
终端将输出以下信息:
PM2 process manager connected Message processing initialization is finished, listening on queue - test.queue Worker process started Begin to send messages... Sent message: Message number - 0 Sent message: Message number - 1 Sent message: Message number - 2 ......
以上输出信息表明我们已经成功启动了应用,发送了 50 条消息到 RabbitMQ 队列中,在三个进程测完全并行地处理了消息。
结论
通过以上示例,可以看出 PM2 集成 RabbitMQ 可以满足异步任务调度、解耦业务系统等应用场景。通过启动多个进程,充分利用系统资源,提高应用性能,同时 RabbitMQ 提供的可靠性、流控等机制,使 RabbitMQ 搭配 PM2 非常适用于高并发、高可用的应用场景。
最后,建议读者在日常工作中,结合自身场景及业务逻辑,进一步探索 RabbitMQ 跟分布式应用逻辑的结合,以及 PM2 跟应用程序的协作方式,获得更好的代码编写体验和系统运维效果。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/67144ecfad1e889fe213265d