简介
Kafka 是一种高吞吐量的分布式发布/订阅消息系统,它可以处理大规模数据并允许多个生产者和多个消费者同时使用。 Docker 是一种容器化平台,它提供了一种轻量级的容器环境,用于运行和管理应用程序。 在本文中,我们将讨论如何在 Docker 容器中使用 Kafka 消息队列。
安装 Kafka
首先,我们需要安装 Kafka。在 Docker 中,可以使用 wurstmeister/kafka 镜像进行安装。该镜像已在 Docker Hub 中预先构建,并包括 Zookeeper 和 Kafka 服务器。
运行以下命令以安装 Kafka:
docker run --name kafka -p 2181:2181 -p 9092:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \ -e KAFKA_CREATE_TOPICS=test:1:1 \ -d wurstmeister/kafka
命令参数解释:
--name kafka
:指定容器名称为 kafka。-p 2181:2181 -p 9092:9092
:映射容器端口到主机端口,这里的 2181 是 Zookeeper 端口,9092 是 Kafka 端口。-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
:指定 Kafka 的地址为 localhost:9092,这是用于生产和消费消息的地址。-e KAFKA_ZOOKEEPER_CONNECT=localhost:2181
:指定 Zookeeper 的地址为 localhost:2181,这是 Kafka 使用的协调器地址。-e KAFKA_CREATE_TOPICS=test:1:1
:创建一个名为 test 的主题,具有一个分区和一个副本。
生产消息
一旦 Kafka 安装完毕,我们可以使用它来发布和订阅消息。 在 Docker 容器中,可以使用 Kafka 的命令行工具来生产和消费消息。 首先,连接到 Kafka 容器:
docker exec -it kafka /bin/bash
在容器中,可以使用 kafka-topics.sh
命令创建新的主题:
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
解释:
--create
:创建主题。--bootstrap-server localhost:9092
:指定 Kafka 服务器的地址。--replication-factor 1
:主题复制因子。--partitions 1
:主题分区数。--topic test
:主题名称。
生产消息:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
此时,您可以输入一些消息并按 Enter 键。 输入的消息将被写入 Kafka 中。
消费消息
要获取 Kafka 中的消息,请使用 kafka-console-consumer.sh
命令。 使用以下命令从主题 test 中获取消息:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
解释:
--bootstrap-server localhost:9092
:指定 Kafka 服务器的地址。--topic test
:主题名称。--from-beginning
:从主题开头开始消费。
现在,您可以在控制台上查看我们生产的消息,以及有关其发送时间和分区的其他详细信息。
容器间通信
除了在容器内生成和消费消息之外,Kafka 还可以在多个容器之间进行通信。 为此,我们需要确保多个容器可以相互通信。
示例代码
以下是 Docker Compose 文件的示例代码,其中包含 Kafka 和消费者容器:
// javascriptcn.com code example version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper consumer: build: . environment: KAFKA_SERVER: kafka:9092 depends_on: - kafka
以下是 Node.js 消费者的示例代码,它使用 KafkaJS 进行消息消费。
// javascriptcn.com code example const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-app', brokers: [`${process.env.KAFKA_SERVER}`], }); const consumer = kafka.consumer({ groupId: 'test-group' }); const run = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'test', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ value: message.value.toString(), }); }, }); }; run().catch(console.error);
该示例代码运行消费者容器,该容器连接到 Kafka 容器,使用 KafkaJS 消费来自主题 test 的消息。
结论
在本文中,我们讨论了在 Docker 容器中使用 Kafka 消息队列的方法。我们看到了如何安装 Kafka 并使用 KafkaJS 消费消息。 使用 Docker,我们可以轻松创建和管理 Kafka 容器以及使用它发送和接收消息。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/672d96deeedcc8a97c8561b3