npm 包 wikimedia-node-rdkafka 使用教程

阅读时长 6 分钟读完

介绍

wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。

安装

可以通过 npm 安装:

生产者示例

生产者负责将消息写入 Kafka 服务器。使用 wikimedia-node-rdkafka,可以轻松地创建生产者,并发送消息。

-- -------------------- ---- -------
----- ----- - ----------------------------------

----- -------- - --- ----------------
  ----------------------- -----------------
---

-------------------

-------------------- -- -- -
  -----------------
    -------------
    -----
    ---------------------------- ---- ------- ----
    -----
    -----------
  --
---

-------------------------- ------- -- -
  -------------------- ---- ----------- -------
---

--------------------------- ----- -- -
  --------------------- ------------- -- -----
---

消费者示例

消费者负责从 Kafka 服务器读取消息。使用 wikimedia-node-rdkafka,可以轻松地创建消费者,并接收消息。

-- -------------------- ---- -------
----- ----- - ----------------------------------

----- -------- - --- ---------------------
  ----------- -----------
  ----------------------- -----------------
---

-------------------

-------------------- -- -- -
  -----------------------------------
  -------------------
---

------------------- ------ -- -
  -----------------------------------
---

-------------------------- ------- -- -
  -------------------- ---- ----------- -------
---

--------------------------- ----- -- -
  --------------------- ------------- -- -----
---

操作指南

创建生产者

其中,config 是一个可选的配置对象,可以包含以下属性:

  • metadata.broker.list:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。
  • compression.codec:压缩算法,默认为 none
  • batch.num.messages:每批消息的数量,默认为 10000。
  • queue.buffering.max.messages:缓存未发送到服务器的消息的最大数量,默认为 1000000。
  • queue.buffering.max.ms:缓存未发送到服务器的消息的最大时间,默认为 1000。
  • linger.ms:缓存消息的时间,默认为 0。
  • message.send.max.retries:消息发送失败后的最大重试次数,默认为 2。
  • retry.backoff.ms:消息发送失败后的重试间隔时间,默认为 100。

创建消费者

其中,config 是一个可选的配置对象,可以包含以下属性:

  • group.id:消费者组 ID。
  • metadata.broker.list:Kafka 服务器地址,可以是一个逗号分隔的字符串或一个数组。
  • offset.store.method:偏移量存储方式,默认为 mem。
  • enable.auto.commit:是否自动提交偏移量,默认为 true。
  • auto.commit.interval.ms:自动提交偏移量的时间间隔,默认为 5000。
  • fetch.min.bytes:每次从服务器获取消息的最小字节数,默认为 1。
  • fetch.message.max.bytes:每次从服务器获取消息的最大字节数,默认为 1048576。
  • fetch.error.backoff.ms:发生错误时等待的时间,默认为 500。
  • queued.min.messages:从服务器获取的消息缓存的最小数量,默认为 100。
  • queued.max.messages.kbytes:从服务器获取的消息缓存的最大大小,默认为 1048576。
  • debug:是否开启调试模式,默认为 false。

发送消息

其中,topic 是消息所属的主题,partition 是消息所属的分区(可选),message 是消息内容,key 是消息的键(可选),timestamp 是消息的时间戳(可选)。

接收消息

其中,topics 是一个数组,包含消费者要订阅的所有主题。

这会拉取所有未处理的消息。

这里的 data 是一个对象,包含了消息的所有信息,如下所示:

关闭连接

结论

wikimedia-node-rdkafka 是一个基于 Node.js 的 Kafka 客户端。它提供了对 Kafka 生产者和消费者的操作,支持流式处理和批量处理,可以在高吞吐量和低延迟的场景下运行。本文提供了关于如何使用该客户端的介绍和示例。希望这篇文章能够帮助你更好地使用 wikimedia-node-rdkafka。

来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/6006710c8dd3466f61ffe140

纠错
反馈