介绍
wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。
安装
可以通过 npm 安装:
--- ------- ----------------------
生产者示例
生产者负责将消息写入 Kafka 服务器。使用 wikimedia-node-rdkafka,可以轻松地创建生产者,并发送消息。
----- ----- - ---------------------------------- ----- -------- - --- ---------------- ----------------------- ----------------- --- ------------------- -------------------- -- -- - ----------------- ------------- ----- ---------------------------- ---- ------- ---- ----- ----------- -- --- -------------------------- ------- -- - -------------------- ---- ----------- ------- --- --------------------------- ----- -- - --------------------- ------------- -- ----- ---
消费者示例
消费者负责从 Kafka 服务器读取消息。使用 wikimedia-node-rdkafka,可以轻松地创建消费者,并接收消息。
----- ----- - ---------------------------------- ----- -------- - --- --------------------- ----------- ----------- ----------------------- ----------------- --- ------------------- -------------------- -- -- - ----------------------------------- ------------------- --- ------------------- ------ -- - ----------------------------------- --- -------------------------- ------- -- - -------------------- ---- ----------- ------- --- --------------------------- ----- -- - --------------------- ------------- -- ----- ---
操作指南
创建生产者
----- ----- - ---------------------------------- ----- -------- - --- -----------------------
其中,config
是一个可选的配置对象,可以包含以下属性:
metadata.broker.list
:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。compression.codec
:压缩算法,默认为none
。batch.num.messages
:每批消息的数量,默认为 10000。queue.buffering.max.messages
:缓存未发送到服务器的消息的最大数量,默认为 1000000。queue.buffering.max.ms
:缓存未发送到服务器的消息的最大时间,默认为 1000。linger.ms
:缓存消息的时间,默认为 0。message.send.max.retries
:消息发送失败后的最大重试次数,默认为 2。retry.backoff.ms
:消息发送失败后的重试间隔时间,默认为 100。
创建消费者
----- ----- - ---------------------------------- ----- -------- - --- ----------------------------
其中,config
是一个可选的配置对象,可以包含以下属性:
group.id
:消费者组 ID。metadata.broker.list
:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。offset.store.method
:偏移量存储方式,默认为 mem。enable.auto.commit
:是否自动提交偏移量,默认为 true。auto.commit.interval.ms
:自动提交偏移量的时间间隔,默认为 5000。fetch.min.bytes
:每次从服务器获取消息的最小字节数,默认为 1。fetch.message.max.bytes
:每次从服务器获取消息的最大字节数,默认为 1048576。fetch.error.backoff.ms
:发生错误时等待的时间,默认为 500。queued.min.messages
:从服务器获取的消息缓存的最小数量,默认为 100。queued.max.messages.kbytes
:从服务器获取的消息缓存的最大大小,默认为 1048576。debug
:是否开启调试模式,默认为 false。
发送消息
----------------------- ---------- -------- ---- -----------
其中,topic
是消息所属的主题,partition
是消息所属的分区(可选),message
是消息内容,key
是消息的键(可选),timestamp
是消息的时间戳(可选)。
接收消息
---------------------------
其中,topics
是一个数组,包含消费者要订阅的所有主题。
-------------------
这会拉取所有未处理的消息。
------------------- ------ -- - ----------------------------------- ---
这里的 data
是一个对象,包含了消息的所有信息,如下所示:
- -------- ------------- -------- ---------------------- ------- --- ------------ -- --------- -- ------------ --------- -
关闭连接
----------------------
----------------------
结论
wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。本文提供了关于如何使用该客户端的介绍和示例。希望这篇文章能够帮助你更好地使用 wikimedia-node-rdkafka。
来源:JavaScript中文网 ,转载请联系管理员! 本文地址:https://www.javascriptcn.com/post/6006710c8dd3466f61ffe140