随着大数据和实时流处理的需求增加,Kafka成为了一个非常流行的分布式消息队列。Docker则成为了一个流行的容器化解决方案。在本文中,我们将介绍如何在Docker容器中使用Kafka。
准备工作
在开始之前,我们需要安装Docker和Docker Compose。Docker Compose是一个工具,允许我们在Docker容器中部署多个服务,包括Kafka。
如果您还没有安装Docker和Docker Compose,请按照以下步骤进行安装:
创建Kafka容器
我们将使用Docker Compose创建一个Kafka容器。在项目目录下创建一个名为docker-compose.yml
的文件,并将以下内容添加到文件中:
// javascriptcn.com 代码示例 version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka:2.12-2.3.0 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper
这个Docker Compose文件定义了两个服务:zookeeper和kafka。zookeeper是一个开源的分布式协调服务,Kafka需要它来存储元数据。Kafka是我们的主要服务,它将存储我们的消息。
在这个文件中,我们定义了Kafka的环境变量。KAFKA_ADVERTISED_LISTENERS
指定了Kafka的地址和端口。在这里,我们使用localhost和9092端口。KAFKA_ZOOKEEPER_CONNECT
指定了zookeeper的地址和端口。在这里,我们使用zookeeper和2181端口。depends_on
指定了Kafka依赖zookeeper服务。
在项目目录下运行以下命令启动Kafka容器:
docker-compose up -d
这个命令将启动zookeeper和kafka容器。-d
参数将容器放在后台运行。
使用Kafka
现在我们已经有了一个运行的Kafka容器,我们可以使用它来发送和接收消息。
发送消息
我们将使用Node.js发送消息到Kafka。在项目目录下创建一个名为producer.js
的文件,并将以下代码添加到文件中:
// javascriptcn.com 代码示例 const kafka = require('kafka-node'); const Producer = kafka.Producer; const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}); const producer = new Producer(client); producer.on('ready', function () { console.log('Producer is ready'); }); producer.on('error', function (err) { console.log('Producer is in error state'); console.log(err); }); const payloads = [ { topic: 'my-topic', messages: 'Hello World!' }, ]; producer.send(payloads, function (err, data) { if (err) { console.log(err); } console.log(data); });
这个代码使用kafka-node
模块来连接到Kafka并发送消息。我们首先创建一个Kafka客户端,然后创建一个生产者。我们在生产者的ready
事件中打印一条消息,以便我们知道什么时候可以开始发送消息。我们还在生产者的error
事件中打印错误。
在这里,我们发送了一个消息到名为my-topic
的主题。这是一个非常简单的例子,但您可以将其扩展为发送更复杂的消息。
在项目目录下运行以下命令启动producer:
node producer.js
接收消息
我们将使用Node.js从Kafka接收消息。在项目目录下创建一个名为consumer.js
的文件,并将以下代码添加到文件中:
// javascriptcn.com 代码示例 const kafka = require('kafka-node'); const Consumer = kafka.Consumer; const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}); const consumer = new Consumer( client, [{ topic: 'my-topic', partition: 0 }], { autoCommit: false, fromOffset: true, } ); consumer.on('message', function (message) { console.log(message); }); consumer.on('error', function (err) { console.log('Error:', err); }); consumer.on('offsetOutOfRange', function (err) { console.log('offsetOutOfRange:', err); });
这个代码使用kafka-node
模块来连接到Kafka并从my-topic
主题接收消息。我们首先创建一个Kafka客户端,然后创建一个消费者。我们在消费者的message
事件中打印消息。我们还在消费者的error
事件中打印错误,以及在消费者的offsetOutOfRange
事件中打印偏移量超出范围的错误。
在项目目录下运行以下命令启动consumer:
node consumer.js
总结
在本文中,我们介绍了如何在Docker容器中使用Kafka。我们使用Docker Compose创建了一个Kafka容器,并使用Node.js发送和接收消息。这是一个非常基础的例子,但您可以将其扩展为更复杂的用例。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/6583ee2ad2f5e1655deb9b18