介绍
wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。
安装
可以通过 npm 安装:
npm install wikimedia-node-rdkafka
生产者示例
生产者负责将消息写入 Kafka 服务器。使用 wikimedia-node-rdkafka,可以轻松地创建生产者,并发送消息。
-- -------------------- ---- ------- ----- ----- - ---------------------------------- ----- -------- - --- ---------------- ----------------------- ----------------- --- ------------------- -------------------- -- -- - ----------------- ------------- ----- ---------------------------- ---- ------- ---- ----- ----------- -- --- -------------------------- ------- -- - -------------------- ---- ----------- ------- --- --------------------------- ----- -- - --------------------- ------------- -- ----- ---
消费者示例
消费者负责从 Kafka 服务器读取消息。使用 wikimedia-node-rdkafka,可以轻松地创建消费者,并接收消息。
-- -------------------- ---- ------- ----- ----- - ---------------------------------- ----- -------- - --- --------------------- ----------- ----------- ----------------------- ----------------- --- ------------------- -------------------- -- -- - ----------------------------------- ------------------- --- ------------------- ------ -- - ----------------------------------- --- -------------------------- ------- -- - -------------------- ---- ----------- ------- --- --------------------------- ----- -- - --------------------- ------------- -- ----- ---
操作指南
创建生产者
const Kafka = require('wikimedia-node-rdkafka'); const producer = new Kafka.Producer(config);
其中,config
是一个可选的配置对象,可以包含以下属性:
metadata.broker.list
:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。compression.codec
:压缩算法,默认为none
。batch.num.messages
:每批消息的数量,默认为 10000。queue.buffering.max.messages
:缓存未发送到服务器的消息的最大数量,默认为 1000000。queue.buffering.max.ms
:缓存未发送到服务器的消息的最大时间,默认为 1000。linger.ms
:缓存消息的时间,默认为 0。message.send.max.retries
:消息发送失败后的最大重试次数,默认为 2。retry.backoff.ms
:消息发送失败后的重试间隔时间,默认为 100。
创建消费者
const Kafka = require('wikimedia-node-rdkafka'); const consumer = new Kafka.KafkaConsumer(config);
其中,config
是一个可选的配置对象,可以包含以下属性:
group.id
:消费者组 ID。metadata.broker.list
:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。offset.store.method
:偏移量存储方式,默认为 mem。enable.auto.commit
:是否自动提交偏移量,默认为 true。auto.commit.interval.ms
:自动提交偏移量的时间间隔,默认为 5000。fetch.min.bytes
:每次从服务器获取消息的最小字节数,默认为 1。fetch.message.max.bytes
:每次从服务器获取消息的最大字节数,默认为 1048576。fetch.error.backoff.ms
:发生错误时等待的时间,默认为 500。queued.min.messages
:从服务器获取的消息缓存的最小数量,默认为 100。queued.max.messages.kbytes
:从服务器获取的消息缓存的最大大小,默认为 1048576。debug
:是否开启调试模式,默认为 false。
发送消息
producer.produce(topic, partition, message, key, timestamp);
其中,topic
是消息所属的主题,partition
是消息所属的分区(可选),message
是消息内容,key
是消息的键(可选),timestamp
是消息的时间戳(可选)。
接收消息
consumer.subscribe(topics);
其中,topics
是一个数组,包含消费者要订阅的所有主题。
consumer.consume();
这会拉取所有未处理的消息。
consumer.on('data', (data) => { console.log(data.value.toString()); });
这里的 data
是一个对象,包含了消息的所有信息,如下所示:
{ "topic": "topic-name", "value": "{\"key\":\"value\"}", "size": 13, "partition": 0, "offset": 0, "timestamp": 123456789 }
关闭连接
producer.disconnect();
consumer.disconnect();
结论
wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。本文提供了关于如何使用该客户端的介绍和示例。希望这篇文章能够帮助你更好地使用 wikimedia-node-rdkafka。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/6006710c8dd3466f61ffe140