在 Kubernetes 上部署 Kafka

Apache Kafka 是一个高吞吐量,分布式的消息系统,经常用于处理大量数据流和事件流。在云原生的应用开发中,有时候需要将 Kafka 部署在 Kubernetes 中,以便实现更好的弹性和可扩展性。本文将详细介绍如何在 Kubernetes 上部署 Kafka,并给出相应的示例代码和指导意义。

1. 使用 Helm 安装 Kafka

Helm 是 Kubernetes 的包管理器,我们可以使用 Helm 来安装 Kafka。首先,需要添加官方 Kafka Helm chart 的仓库:

接着,我们可以使用如下命令来安装 Kafka:

这个命令将会在 Kubernetes 集群中创建一个 Kafka 的实例和相应的 Zookeeper 实例,并将它们连接起来,以便 Kafka 能够正常工作。

2. 创建 Kafka 服务和 StatefulSet

为了对外提供 Kafka 的服务,我们需要创建一个 Kubernetes 服务对象。此外,由于 Kafka 是一个状态化的应用,所以我们需要使用 Kubernetes 中的 StatefulSet 来管理 Kafka 和 Zookeeper。下面是一个示例的 yaml 配置文件,用于创建 Kafka 服务和 StatefulSet:

apiVersion: v1
kind: Service
metadata:
  name: kafka
  labels:
    name: kafka
spec:
  ports:
  - name: tcp
    port: 9092
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
    release: my-kafka
  type: ClusterIP
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: my-kafka
spec:
  serviceName: kafka
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
        release: my-kafka
      annotations:
        pod.beta.kubernetes.io/init-containers: '[
          {
            "name": "wait-for-zookeeper",
            "image": "busybox",
            "args": [
              "sh",
              "-c",
              "until nslookup zookeeper-0.zookeeper.default.svc.cluster.local > /dev/null ; do echo waiting for zookeeper ; sleep 2 ; done"
            ]
          }
        ]'
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka:2.12-2.3.0
        env:
        - name: BROKER_ID_COMMAND
          value: "hostname | awk -F\"-\" '{print $NF}'"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(hostname -f):29092"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zk-headless:2181"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_NUM_PARTITIONS
          value: "6"
        - name: KAFKA_LOG_DIRS
          value: "/kafka/data"
        volumeMounts:
        - name: kafka-data
          mountPath: /kafka/data
        ports:
        - name: tcp
          containerPort: 29092
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:29092"
          initialDelaySeconds: 5
          timeoutSeconds: 30
      - name: zookeeper
        image: wurstmeister/zookeeper:3.4.6
        env:
        - name: ZOO_MY_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['zookeeper.alpha.kubernetes.io/myid']
        - name: ZOO_SERVERS
          value: 'server.1=zookeeper-0.zookeeper.default.svc.cluster.local:2888:3888 server.2=zookeeper-1.zookeeper.default.svc.cluster.local:2888:3888 server.3=zookeeper-2.zookeeper.default.svc.cluster.local:2888:3888'
        volumeMounts:
        - name: zookeeper-data
          mountPath: /data
  volumeClaimTemplates:
  - metadata:
      name: kafka-data
    spec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 1Gi
  - metadata:
      name: zookeeper-data
    spec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 1Gi

其中,service 对象用于将 Kafka 对外暴露在一个唯一的 IP 和端口上,以便让客户端连接 Kafka。StatefulSet 对象则用于自动管理 Kafka 集群中的实例,包括创建、删除和扩容等操作。

3. 测试 Kafka 集群

安装好 Kafka 集群之后,可以使用 kafkacat 等工具测试 Kafka 集群是否正常工作。具体的操作如下:

首先,创建一个名为 test-topic 的测试主题:

此时,我们已经成功创建了一个名为 test-topic 的主题。接着,可以使用 kafkacat 工具来往这个主题写入和读取消息,以检测 Kafka 集群是否正常。

这个例子中,我们向主题 test-topic 写入了两条消息,并且成功地从这个主题中读取了这两条消息。这表明 Kafka 集群已经成功工作了。

4. 总结

本文介绍了如何在 Kubernetes 上部署 Kafka,并且给出了相应的示例代码和指导意义。通过这些步骤,我们可以快速地构建起一个 Kafka 集群,并且在云原生环境下实现更好的弹性和可扩展性。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/659609b1eb4cecbf2d9ee831


纠错反馈