前言
Kafka 是一种高性能、高可靠、分布式的消息系统,在大型分布式系统中广泛使用。而 WebSocket 是一种实现了全双工通信的协议,能够在客户端和服务器之间建立持久性的连接,从而实现实时通信。结合两者,可以实现高并发、低延迟的实时消息推送。
本文将介绍如何在 Express.js 中使用 WebSocket 对接 Kafka,实现实时消息推送功能。
环境准备
本文使用的技术栈为 Express.js、WebSocket、Kafka 和 Node.js,所以需要事先安装它们:
另外,推荐安装 Kafka 可视化管理工具 Kafka Manager,它可以方便地对 Kafka 进行管理和监控。
创建 Express.js 应用
首先,我们需要创建一个 Express.js 应用:
$ express myapp $ cd myapp
然后安装依赖:
$ npm install
在 app.js
中添加 WebSocket 和 Kafka 的依赖:
const WebSocket = require("ws"); const kafka = require("kafka-node");
同时,创建一个 WebSocket 服务器:
const wss = new WebSocket.Server({ port: 8080 });
在 wss.on("connection", (ws) => {})
回调函数中,我们可以获取到连接上的 WebSocket 实例 ws
,从而向客户端发送实时消息。
连接 Kafka
在连接 Kafka 之前,我们需要先创建一个 Kafka 生产者:
const Producer = kafka.Producer; const client = new kafka.KafkaClient({ kafkaHost: "localhost:9092" }); const producer = new Producer(client);
其中,kafkaHost
是 Kafka 服务器的地址,这里使用本地地址 localhost:9092
。
然后,我们可以通过 producer.send(payloads, cb)
方法将消息发送给 Kafka。payloads
是消息数组,cb
是回调函数。
producer.send( [{ topic: "test-topic", messages: ["hello"] }], (err, data) => { console.log(data); } );
其中,topic
是消息主题,messages
是消息内容。
实现 WebSocket 对接 Kafka
在连接 Kafka 和 WebSocket 之间,我们需要在 wss.on("connection", (ws) => {})
回调函数中,将 ws
和 Kafka 进行对接。
首先,我们需要创建一个 Kafka 消费者:
const Consumer = kafka.Consumer; const consumer = new Consumer(client, [{ topic: "test-topic" }], { autoCommit: true, });
其中,autoCommit: true
表示自动提交消息,这里为了方便演示,直接提交消息。
然后,在 consumer.on("message", (message) => {})
回调函数中,我们可以获取到 Kafka 中的实时消息 message
,从而将消息推送给客户端。
consumer.on("message", (message) => { wss.clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(message.value); } }); });
其中,message.value
是 Kafka 中的消息内容。
完整代码

总结
本文介绍了如何在 Express.js 中使用 WebSocket 对接 Kafka,实现实时消息推送功能。通过连接 Kafka 和 WebSocket 的方式,可以实现高并发、低延迟的实时消息推送。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/64af4fab48841e9894b56d19