Apache Kafka 是一个高吞吐量,分布式的消息系统,经常用于处理大量数据流和事件流。在云原生的应用开发中,有时候需要将 Kafka 部署在 Kubernetes 中,以便实现更好的弹性和可扩展性。本文将详细介绍如何在 Kubernetes 上部署 Kafka,并给出相应的示例代码和指导意义。
1. 使用 Helm 安装 Kafka
Helm 是 Kubernetes 的包管理器,我们可以使用 Helm 来安装 Kafka。首先,需要添加官方 Kafka Helm chart 的仓库:
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator $ helm repo update
接着,我们可以使用如下命令来安装 Kafka:
$ helm install incubator/kafka --name my-kafka
这个命令将会在 Kubernetes 集群中创建一个 Kafka 的实例和相应的 Zookeeper 实例,并将它们连接起来,以便 Kafka 能够正常工作。
2. 创建 Kafka 服务和 StatefulSet
为了对外提供 Kafka 的服务,我们需要创建一个 Kubernetes 服务对象。此外,由于 Kafka 是一个状态化的应用,所以我们需要使用 Kubernetes 中的 StatefulSet 来管理 Kafka 和 Zookeeper。下面是一个示例的 yaml 配置文件,用于创建 Kafka 服务和 StatefulSet:
apiVersion: v1 kind: Service metadata: name: kafka labels: name: kafka spec: ports: - name: tcp port: 9092 targetPort: 9092 protocol: TCP selector: app: kafka release: my-kafka type: ClusterIP --- apiVersion: apps/v1beta1 kind: StatefulSet metadata: name: my-kafka spec: serviceName: kafka replicas: 3 template: metadata: labels: app: kafka release: my-kafka annotations: pod.beta.kubernetes.io/init-containers: '[ { "name": "wait-for-zookeeper", "image": "busybox", "args": [ "sh", "-c", "until nslookup zookeeper-0.zookeeper.default.svc.cluster.local > /dev/null ; do echo waiting for zookeeper ; sleep 2 ; done" ] } ]' spec: containers: - name: kafka image: wurstmeister/kafka:2.12-2.3.0 env: - name: BROKER_ID_COMMAND value: "hostname | awk -F\"-\" '{print $NF}'" - name: KAFKA_ADVERTISED_LISTENERS value: "PLAINTEXT://$(hostname -f):29092" - name: KAFKA_ZOOKEEPER_CONNECT value: "zk-headless:2181" - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE value: "true" - name: KAFKA_DEFAULT_REPLICATION_FACTOR value: "3" - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR value: "3" - name: KAFKA_NUM_PARTITIONS value: "6" - name: KAFKA_LOG_DIRS value: "/kafka/data" volumeMounts: - name: kafka-data mountPath: /kafka/data ports: - name: tcp containerPort: 29092 readinessProbe: exec: command: - sh - -c - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:29092" initialDelaySeconds: 5 timeoutSeconds: 30 - name: zookeeper image: wurstmeister/zookeeper:3.4.6 env: - name: ZOO_MY_ID valueFrom: fieldRef: fieldPath: metadata.annotations['zookeeper.alpha.kubernetes.io/myid'] - name: ZOO_SERVERS value: 'server.1=zookeeper-0.zookeeper.default.svc.cluster.local:2888:3888 server.2=zookeeper-1.zookeeper.default.svc.cluster.local:2888:3888 server.3=zookeeper-2.zookeeper.default.svc.cluster.local:2888:3888' volumeMounts: - name: zookeeper-data mountPath: /data volumeClaimTemplates: - metadata: name: kafka-data spec: accessModes: - ReadWriteOnce resources: requests: storage: 1Gi - metadata: name: zookeeper-data spec: accessModes: - ReadWriteOnce resources: requests: storage: 1Gi
其中,service
对象用于将 Kafka 对外暴露在一个唯一的 IP 和端口上,以便让客户端连接 Kafka。StatefulSet
对象则用于自动管理 Kafka 集群中的实例,包括创建、删除和扩容等操作。
3. 测试 Kafka 集群
安装好 Kafka 集群之后,可以使用 kafkacat
等工具测试 Kafka 集群是否正常工作。具体的操作如下:
首先,创建一个名为 test-topic
的测试主题:
$ kubectl exec my-kafka-0 -c kafka -- kafka-topics.sh --create --topic test-topic --partitions 3 --replication-factor 3 --zookeeper zk-headless:2181 $ kubectl exec my-kafka-0 -c kafka -- kafka-topics.sh --list --zookeeper zk-headless:2181 test-topic
此时,我们已经成功创建了一个名为 test-topic
的主题。接着,可以使用 kafkacat
工具来往这个主题写入和读取消息,以检测 Kafka 集群是否正常。
$ kafkacat -P -b my-kafka:9092 -t test-topic > hello, world > kafka is awesome > ^D $ kafkacat -C -b my-kafka:9092 -t test-topic -o beginning hello, world kafka is awesome
这个例子中,我们向主题 test-topic
写入了两条消息,并且成功地从这个主题中读取了这两条消息。这表明 Kafka 集群已经成功工作了。
4. 总结
本文介绍了如何在 Kubernetes 上部署 Kafka,并且给出了相应的示例代码和指导意义。通过这些步骤,我们可以快速地构建起一个 Kafka 集群,并且在云原生环境下实现更好的弹性和可扩展性。
来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/659609b1eb4cecbf2d9ee831