Deno 中如何使用 RabbitMQ

前言

RabbitMQ 是一个流行的消息代理程序,用于支持应用程序之间的异步消息传递。Deno 是一个新兴的 JavaScript 和 TypeScript 运行时环境,旨在提供更好的安全性和性能等优势。本文将介绍如何在 Deno 中使用 RabbitMQ。

安装 RabbitMQ

在使用 RabbitMQ 之前,需要先安装它。RabbitMQ 可以在 官网 上下载和安装。安装完成后,可以使用命令行启动 RabbitMQ。

# 启动 RabbitMQ
rabbitmq-server

安装依赖

使用 RabbitMQ 需要先安装相关依赖,包括 amqplibdeno-amqplibamqplib 是 Node.js 中的 RabbitMQ 客户端库,而 deno-amqplib 是用于 Deno 的版本。可以使用以下命令来安装它们:

# 安装 amqplib
npm install amqplib

# 安装 deno-amqplib
deno install --allow-net --allow-read https://deno.land/x/amqp/mod.ts

连接 RabbitMQ

在 Deno 中连接 RabbitMQ 首先需要引入 amqpdeno-amqp

// 引入 amqp
import amqp from 'amqplib';

// 引入 deno-amqp
import { connect } from 'https://deno.land/x/amqp/mod.ts';

然后我们可以使用 amqp.connect 方法来连接 RabbitMQ:

const connection = await amqp.connect('amqp://localhost');

这里连接的是本机的 RabbitMQ 服务。如果连接成功,会返回一个 connection 实例,后续使用它来创建或者销毁频道以及发布或消费消息。

使用频道

在 RabbitMQ 中,频道用于定义用户如何传输消息。一个连接可以包含多个频道。在 Deno 中,可以使用 connection.createChannel() 方法来创建一个频道:

const channel = await connection.createChannel();

定义消费者

在 RabbitMQ 中,消费者用于接收消息并处理它们。可以使用 channel.consume(queueName, callback) 方法来定义一个消费者。在 Deno 中,我们可以使用 Promise.racechannel.cancel 方法来实现停止消费者:

const consume = async (queueName: string) => {
  await channel.assertQueue(queueName, { durable: false });
  await channel.prefetch(1);

  console.log(`Waiting for messages in ${queueName}.`);

  const consumerTag = await channel.consume(queueName, async (msg) => {
    console.log(`Received message: ${msg.content.toString()}`);

    await channel.ack(msg);
  });

  await Promise.race([
    Deno.signal(Deno.Signal.SIGTERM).catch((err) => {
      console.log(err);
    }),
    Deno.signal(Deno.Signal.SIGINT).catch((err) => {
      console.log(err);
    }),
  ]);

  console.log(`Stopping consumer ${consumerTag}`);
  await channel.cancel(consumerTag);
};

上面的代码定义了一个消费者,首先使用 channel.assertQueue 方法来检查是否存在指定名称的队列。并使用 channel.prefetch 方法来进行限流,指定最多处理多少个未确认的消息。接下来使用 channel.consume 方法来注册一个回调函数,传入的消息类型为 msg。接收到消息后,打印消息内容,并使用 channel.ack 方法来确认接收到了消息。最后使用 Promise.race 方法来等待程序退出信号,如 SIGINT 或 SIGTERM。

定义生产者

在 RabbitMQ 中,生产者用于发布消息到队列中。可以使用 channel.sendToQueue(queueName, Buffer.from(message)) 方法来向指定的队列发送消息。在 Deno 中,我们可以使用以下方法来定义一个生产者:

const publish = async (queueName: string, msg: string) => {
  await channel.assertQueue(queueName, { durable: false });

  await channel.sendToQueue(queueName, Buffer.from(msg));
  console.log(`Sent message: ${msg}`);
};

这里使用 channel.assertQueue 方法来检查队列是否存在,然后使用 channel.sendToQueue 方法来发送消息。

示例代码

下面是一个完整的示例代码,它包含了消费者和生产者这两个函数,并在接收到消息后打印消息内容:

import amqp from 'amqplib';
import { connect } from 'https://deno.land/x/amqp/mod.ts';

// 连接 RabbitMQ
const connection = await amqp.connect('amqp://localhost');

// 创建频道
const channel = await connection.createChannel();

// 定义消费者
const consume = async (queueName: string) => {
  await channel.assertQueue(queueName, { durable: false });
  await channel.prefetch(1);

  console.log(`Waiting for messages in ${queueName}.`);

  const consumerTag = await channel.consume(queueName, async (msg) => {
    console.log(`Received message: ${msg.content.toString()}`);

    await channel.ack(msg);
  });

  await Promise.race([
    Deno.signal(Deno.Signal.SIGTERM).catch((err) => {
      console.log(err);
    }),
    Deno.signal(Deno.Signal.SIGINT).catch((err) => {
      console.log(err);
    }),
  ]);

  console.log(`Stopping consumer ${consumerTag}`);
  await channel.cancel(consumerTag);
};

// 定义生产者
const publish = async (queueName: string, msg: string) => {
  await channel.assertQueue(queueName, { durable: false });

  await channel.sendToQueue(queueName, Buffer.from(msg));
  console.log(`Sent message: ${msg}`);
};

// 消费队列 test
await consume('test');

// 向队列 test 发送消息
await publish('test', 'Hello, RabbitMQ!');

总结

本文介绍了如何在 Deno 中使用 RabbitMQ,包括安装 RabbitMQ、安装依赖、连接 RabbitMQ、使用频道、定义消费者和生产者。通过本文的学习,读者可以了解如何在 Deno 中进行异步的消息传递处理。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65ae41cdadd4f0e0ff7cff98