目錄
- 概述
- 環境準備
- Docker & Docker-Compose
- Linux服務器
- 步驟一:部署到開發環境上
- docker-compose.yml文件編寫
- 運行啟動腳本
- (拓展)容器可視化頁面
- (拓展)Kafka可視化頁面
- 用腳本命令進行測試
- 整合Spring Boot應用
- 步驟二:部署到生產環境上
- docker-compose.yml
- docker-compose配置文件變化部分以及說明
- 啟動Nginx容器
- 最后一步:IP別名映射的重要性
- extra_hosts配置
- 開發機修改hosts
概述
Kafka的環境配置花了我好幾天時間才搞明白,現整理一下。
原來公司對Kafka環境的安裝是這樣的:
- 獲?。ㄏ螺d)Zookeeper和Kafka的Linux安裝包
- 安裝Zookeeper前保證有Java環境
- 修改配置文件
- 根據官網教程逐步安裝
原來的環境部署方式已經有些落后了,現在我們搭建環境基本都會采用Docker容器,不僅簡化了部署流程,還方便管理。
在Kafka2.8版本之前,Kafka是強依賴于Zookeeper中間件的,這本身就很不合理,中間件依賴另一個中間件,搭建起來實在麻煩,并且Zookeeper需要搭建三個以上服務作為集群(不考慮掛掉的話一個也可以,但這就不叫集群了,只能算單節點),Kafka也要三個以上做集群(為了數據的高可用),所幸Kafka2.8之后推出了KRaft模式,即拋棄Zookeeper,由Kafka節點自己做Controller來選舉Leader,本篇文章內容就是介紹如何在Docker-Compose中搭建Kafka KRaft環境。
環境準備
Docker & Docker-Compose
需要提前準備好Docker和Docker-Compose環境,如果沒有安裝是不行的
Linux服務器
可以是Windows下的VMware虛擬機,也可以是云服務器
順帶說明一下我當前的環境吧:
- 開發機是Windows,因為需要Linux環境,所以整了個虛擬機安裝CentOS 7(有Docker),在這上面搭建我開發時需要的所有環境,比如MySQL、Redis等,這樣可以保證開發和生產的環境一致,也不用另一套在Windows上搭建環境的學習成本。
- 虛擬機上的CentOS,上面有說,在這里只安裝了Docker & Docker-Compose,然后搭建容器環境給開發時用。
- 云服務器,同樣也是CentOS7環境,畢竟光開發是不夠的,應用到生產環境上才算部署完整,因為大概率我們會需要Nginx轉發等配置,和開發環境會有些許不同,所以最終目的是要在云服務器上搭建完成才可以。
步驟一:部署到開發環境上
首先讓我們在開發環境上面部署好Kafka環境,然后寫一個Spring Boot應用去連接。
docker-compose.yml文件編寫
version: "3" services: kafka1: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka11 user: root ports: - 9192:9092 - 9193:9093 environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9192 # broker.id,必須唯一 - KAFKA_BROKER_ID=1 volumes: - /home/mycontainers/kafka1/kafka/kraft:/bitnami/kafka #extra_hosts: #- "kafka1:云服務器IP" #- "kafka2:云服務器IP" #- "kafka3:云服務器IP" kafka2: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka22 user: root ports: - 9292:9092 - 9293:9093 environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9292 # broker.id,必須唯一 - KAFKA_BROKER_ID=2 volumes: - /home/mycontainers/kafka2/kafka/kraft:/bitnami/kafka kafka3: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka33 user: root ports: - 9392:9092 - 9393:9093 environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9392 # broker.id,必須唯一 - KAFKA_BROKER_ID=3 volumes: - /home/mycontainers/kafka3/kafka/kraft:/bitnami/kafka
配置文件說明: 如果你懂一些docker-compose的話,這個配置文件應該很好明白,這里挑一些來說:
鏡像選擇:bitnami/kafka:3.3.1
這是寫這篇文章時最新的版本
容器名是:kafka11、kafka22、kafka33
這個當然隨便寫,我用11、22、33是為了后面演示生產環境時不沖突,先不用管,后面會說
端口配置是9092和9093:
9092端口用于BROKER傳輸,即Kafka集群服務端口,我們用Kafka腳本或SpringBoot應用時,連接的就是這個端口;9093是CONTROLLER端口,前面說過,我們拋棄了Zookeeper,用Kafka來代替,這個9093就是充當著原來Zookeeper集群的通訊端口
總結一下,9092用于外網,因為是Kafka要給外部訪問;9093用于內網,只用于集群通訊,用于內網是因為環境都搭建在一個服務器的Docker容器內,相當于公司服務器內網,所以不對外開放,生產時一般會搭建在不同的服務器上面,可以是外網也可以是內網,集群的環境配置自由度很高,這點我就不多說明了。
network_mode:
給容器加入網絡,這樣才可以進行容器間的通訊,可以讓容器1中能識別容器2的容器名稱,進而解析出IP,很重要,不然只能手動配置容器IP,不過這樣很蠢,容器IP不固定,會動態變化。
當然用–link也可以。
KAFKA_KRAFT_CLUSTER_ID:
KRaft模式下,要配置這個集群ID,這個ID可以用Kafka命令去生成:
kafka-storage.sh random-uuid
一開始就用官網示例的LelM2dIFQkiUFvXCEcqRWA就可以了。
KAFKA_BROKER_ID:
broker的id,這個要唯一,搞過Kafka的懂的都懂
KAFKA_CFG_ADVERTISED_LISTENERS:
對外的訪問地址,我配置的是192.168.1.54,因為我這臺虛擬機用的是橋接網絡,分配的IP就是192.168.1.54,這個和容器IP不一樣,容器IP只用于容器間的通訊,這個54的配置目的是可以在開發機上(假如開發機的IP是192.168.1.10)通過:192.168.1.54:9092來連接Kafka,很重要
因為有三個Kafka容器,容器內部都用9092和9093端口保持一致,所以端口映射就有所不同:
kafka1配置9192、9193;kafka2配置9292、9293;kafka3配置9392、9393
這個配置也很重要
extra_hosts:
這個配置的目的是修改容器內的hosts文件,增加一個IP的別名映射,在開發階段用不上,是后面部署生產環境時用的,暫時不用理會,這時候可以先注釋掉。
其他的看注釋就可以了,還有的是在YML配置文件對Kafka的配置,比如KAFKA_ENABLE_KRAFT,是從Docker Hub的Kafka鏡像文檔上看的,這是一種快速配置手段,但我們同樣可以使用老辦法,將kafka的配置文件(server.properties)進行路徑映射,如果熟悉docker應該能理解。
運行啟動腳本
docker-compose -f docker-compose.yml up
如果沒有報錯,通過docker ps可以看到Kafka容器:
[root@localhost ~]# docker ps | grep kafka 80cf69513390 provectuslabs/kafka-ui:latest "/bin/sh -c 'java $J…" 14 hours ago Up 14 hours 0.0.0.0:17008->8080/tcp, :::17008->8080/tcp kafka-ui c66b8c979abb bitnami/kafka:3.3.1 "/opt/bitnami/script…" 14 hours ago Up 4 hours 0.0.0.0:9292->9092/tcp, :::9292->9092/tcp, 0.0.0.0:9293->9093/tcp, :::9293->9093/tcp kafka2 70f26172ba3e bitnami/kafka:3.3.1 "/opt/bitnami/script…" 14 hours ago Up 4 hours 0.0.0.0:9392->9092/tcp, :::9392->9092/tcp, 0.0.0.0:9393->9093/tcp, :::9393->9093/tcp kafka3 0193e15cd92a bitnami/kafka:3.3.1 "/opt/bitnami/script…" 15 hours ago Up 4 hours 0.0.0.0:9192->9092/tcp, :::9192->9092/tcp, 0.0.0.0:9193->9093/tcp, :::9193->9093/tcp kafka1
注意:在啟動容器的時候,不出意外的話會出很多意外,檢查一下腳本有沒有問題,重點排查KAFKA_BROKER_ID和KAFKA_CFG_ADVERTISED_LISTENERS。還有容器名稱不要沖突,區分好1、2、3,路徑映射也不要弄錯,kafka1、kafka2、kafka3。
(拓展)容器可視化頁面
如果嫌棄docker ps,可以安裝portainer來查看可視化容器頁面:
version: "3" services: portainer: container_name: portainer ports: - 9000:9000 restart: always network_mode: mynetwork volumes: - /etc/localtime:/etc/localtime - /var/run/docker.sock:/var/run/docker.sock - /home/mycontainers/portainer/data:/data:rw image: portainer/portainer
訪問:http://localhost:9000,即可,請自行修改端口和IP
(拓展)Kafka可視化頁面
kafka-ui是Apache出品的Kafka可視化容器,入門是夠用了,當然還有很多可視化工具,由于篇幅關系,并且這不是本文重點,所以就不細說了。
version: "3" services: kafka-ui: image: provectuslabs/kafka-ui:latest network_mode: mynetwork container_name: kafka-ui restart: always ports: - 8080:8080 volumes: - /etc/localtime:/etc/localtime environment: # 集群名稱 - KAFKA_CLUSTERS_0_NAME=local # 集群地址 - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka11:9092,kafka22:9092,kafka33:9092
訪問:http://localhost:8080,即可,請自行修改端口和IP
用腳本命令進行測試
Kafka容器內有自帶腳本可以使用,所以部署完Kafka環境后,最簡單快速的方式就是進入容器內進行測試:
進入任意一個Kafka容器
docker exec -it kafka11 bash
創建一個主題,名稱為demo
kafka-topics.sh --create --topic demo --partitions 3 --replication-factor 3 --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092
查看所有主題
kafka-topics.sh --bootstrap-server kafka11:9092 --list
生產一些消息
kafka-console-producer.sh --bootstrap-server kafka11:9092 --topic demo
消費一些消息
kafka-console-consumer.sh --bootstrap-server kafka11:9092 --topic demo
制造一些假數據
kafka-producer-perf-test.sh --topic demo --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=kafka11:9092 batch.size=16384 linger.ms=0
測試沒有問題,命令可以使用,到kafka-ui的可視化頁面查看下,有我們剛剛創建的主題,也有我們制造的假數據,三個Kafka節點沒有異常,Controller也進行了選舉,說明我們的Kafka已經成功創建并運行,還替代了原本Zookeeper的工作,至此,開發環境搭建完成。
整合Spring Boot應用
搭建完了Kafka環境,現在讓我們整合到Spring Boot中來使用
-
創建一個Maven項目
配置pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ? <groupId>com.cc</groupId> <artifactId>kafkaDemo</artifactId> <version>1.0-SNAPSHOT</version> ? <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> </parent> ? <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.4.RELEASE</version> </dependency> </dependencies> </project>
新建應用程序啟動類:
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
配置application.yml,去連接Kafka環境
server: port: 8888 ? spring: application: name: kafkaDemo ? kafka: bootstrap-servers: mylocalhost:9192,mylocalhost:9292,mylocalhost:9392 producer: # 生產者序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # ack策略 # 0:生產者發送消息就不管了,效率高,但是容易丟數據,且沒有重試機制 # 1:消息發送到Leader并落盤后就返回,如果Leader掛了并且Follower還沒有同步數據就會丟失數據 # -1:消息要所有副本都羅盤才返回,保證數據不丟失(但是有可能重復消費) acks: -1 # 失敗重試次數 retries: 3 # 批量提交的數據大小 batch-size: 16384 # 生產者暫存數據的緩沖區大小 buffer-memory: 33554432 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 是否自動提交偏移量,如果要手動確認消息,就要設置為false enable-auto-commit: false # 消費消息后間隔多長時間提交偏移量(ms) auto-commit-interval: 100 # 默認的消費者組,如果不指定就會用這個 group-id: mykafka # kafka意外宕機時的消息消費策略 # earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 # none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常 auto-offset-reset: latest listener: # 手動確認消息 ack-mode: manual_immediate # 消費者運行的線程數 concurrency: 2
我們只測試連接,不演示消息的生產和消費,現在關注application.yml配置文件,我們只看這個配置:
bootstrap-servers: mylocalhost:9192,mylocalhost:9292,mylocalhost:9392
mylocalhost是我在本機hosts中設置的IP別名映射,即:mylocalhost=192.168.1.54
9192、9292、9392是kafka集群容器映射 出來對外的端口,查看一下docker-compose.yml文件即可明白。
啟動程序,應用連接Kafka,出現了…JVM…字樣,表示成功連接Kafka。
步驟二:部署到生產環境上
生產環境和開發環境有什么不同呢?不同在我們往往沒有那么多的IP資源可以對外開放,則需要做Nginx轉發。
比如說Kafka集群是在內網機器中部署,最終由一臺代理機器轉發Kafka集群,當然這個代理機器實際上也是要做負載均衡的,不然代理機器掛了整個集群就無了,不過Nginx的負載均衡不在本文篇幅中。
先來看看生產環境的docker-compose.yml:
docker-compose.yml
version: "3" services: kafka1: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka1 user: root environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:17005 # broker.id,必須唯一 - KAFKA_BROKER_ID=1 volumes: - /home/mycontainers/kafka1/kafka/kraft:/bitnami/kafka kafka2: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka2 user: root environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:17005 # broker.id,必須唯一 - KAFKA_BROKER_ID=2 ? volumes: - /home/mycontainers/kafka2/kafka/kraft:/bitnami/kafka kafka3: image: 'bitnami/kafka:3.3.1' network_mode: mynetwork container_name: kafka3 user: root environment: ### 通用配置 # 允許使用kraft,即Kafka替代Zookeeper - KAFKA_ENABLE_KRAFT=yes # kafka角色,做broker,也要做controller - KAFKA_CFG_PROCESS_ROLES=broker,controller # 指定供外部使用的控制類請求信息 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 定義kafka服務端socket監聽端口 - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093 # 定義安全協議 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 集群地址 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 # 允許使用PLAINTEXT監聽器,默認false,不建議在生產環境使用 - ALLOW_PLAINTEXT_LISTENER=yes # 設置broker最大內存,和初始內存 - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M # 不允許自動創建主題 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false # 使用Kafka時的集群id,集群內的Kafka都要用這個id做初始化,生成一個UUID即可 - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA ? ### broker配置 # 定義外網訪問地址(宿主機ip地址和端口) - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:17005 # broker.id,必須唯一 - KAFKA_BROKER_ID=3 volumes: - /home/mycontainers/kafka3/kafka/kraft:/bitnami/kafka
注意,這里我們的容器名是kafka1、kafka2、kafka3,和開發環境有所區別。
接下來是重點部分,要理解重點配置才可以正常使用,生產環境這部分我花了兩天才搞通。
docker-compose配置文件變化部分以及說明
先重新聲明一些題要:
- 開發機的IP是:192.168.1.10
- 開發環境的虛擬機IP是:192.168.1.54(與開發機同網絡)
- Kafka集群處于內網環境,比如公司的內部服務器,無法對外訪問
- 云服務器的IP是:a.a.a.a(自行腦補),云服務器就是代理服務器,可以訪問Kafka集群服務
- 開發環境的Kafka容器名是:kafka11、kafka22、kafka33
- 生產環境的Kafka容器名是:kafka1、kafka2、kafka3
接下來對生產環境的配置變動進行說明:
- 去掉端口映射,因為我們等會會創建一個Ngnix容器來轉發,容器間已經可以進行通訊,所以就不需要對外端口了,除非這個Nginx容器是另外一臺機器上,那么就需要對外端口。
- KAFKA_CFG_ADVERTISED_LISTENERS的外網訪問地址從實際IP改成了kafka1:17005, 在開發環境中,我們的開發機子可以通過虛擬機IP來訪問容器,所以配置192.168.1.54,但是生產環境這里我們不能直接訪問了(假設生產環境的Kafka集群是在內網),我們只能訪問代理服務器,讓代理服務器幫忙轉發請求,所以這里改的:kafka1:17005,必須是代理服務器可以訪問到的,因為我們代理服務器和生產環境的Kafka集群是同一個容器組內,所以可以訪問,這是為了便于演示,實際上代理服務器和Kafka集群肯定不會在同一臺機器內,所以就不能用:kafka1:17005,而是要用:[代理服務器可以訪問到的Kafka集群地址]:17005
- 17005端口替換原來的9092端口,因為我的云服務器安全組沒有開放9092,所以改成17005,這個端口要和等會轉發用的Nginx端口保持一致,即Nginx容器也要開放17005端口
啟動Nginx容器
nginx.yml:
version: "3" services: nginx: image: nginx:latest network_mode: mynetwork container_name: nginx restart: always ports: - 17005:9092 volumes: - /etc/localtime:/etc/localtime - /home/mycontainers/nginx/nginx.conf:/etc/nginx/nginx.conf - /home/mycontainers/nginx/logs:/var/log/nginx - /home/mycontainers/nginx/conf.d/:/etc/nginx/conf.d
手動修改nginx.conf:
stream { upstream kafka { server kafka1:17005; server kafka2:17005; server kafka3:17005; } ? server { listen 9092; proxy_pass kafka; } } ? http { ... location / { ... } ... }
啟動Nginx容器后,修改一下配置,將Kaka集群配置進來,然后監聽9092端口進行轉發,當然端口可以定制,這個不重要,重要的是Nginx容器對外的17005端口,流程是這樣的:
- 外部訪問17005端口,映射到Nginx的9092端口
- Nginx的9092端口對應了Kafka的集群,Kafka集群的端口是17005,所以Nginx的對外也要是17005,這樣要保證強一致
至此,生產環境搭建完成。
最后一步:IP別名映射的重要性
還記得上面搭建開發環境時候先注釋掉的extra_hosts配置嗎:
# 剛剛我們有進行聲明,云服務器IP是a.a.a.a,所以去掉注釋,自行替換 extra_hosts: - "kafka1:云服務器IP" - "kafka2:云服務器IP" - "kafka3:云服務器IP"
現在用上了,這個只配置到了開發環境的kafka11容器中,kafka22是沒有的,我們先進入kafka22容器中去連接生產環境的Kafka集群看看:
進入kafka22容器
docker exec -it kafka22 bash
連接生產環境的Kafka集群(通過Nginx轉發)
kafka-topics.sh --bootstrap-server a.a.a.a:17005 --list
會報錯:
[2023-01-11 07:29:07,495] WARN [AdminClient clientId=adminclient-1] Error connecting to node kafka3:17005 (id: 3 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka3
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1143)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1403)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1346)
at java.base/java.lang.Thread.run(Thread.java:829)
可以看到關鍵字:kafka3:17005,在開發環境中連接生產環境報這個kafka3,而kafka3是在生產環境中配置的,我們開發環境配置的是kafka33,所以很明顯,在連接Kafka的時候,會自動去讀取集群地址,就是我們生產環境的docker-compose.yml中的:
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
雖然Kafka集群運行起來了,Nginx轉發成功了,但是實際連接不上,因為不知道這個kafka3是什么意思,所以我們還需要做最后一步:IP別名映射
extra_hosts配置
退出kafka22容器,恢復kafka11容器的extra_hosts配置:
extra_hosts: - "kafka1:a.a.a.a" - "kafka2:a.a.a.a" - "kafka3:a.a.a.a"
重新啟動開發環境的Kafka容器,然后進入kafka11容器去連接生產環境,這時候就成功了,因為會把kafka1、kafka2、kafka3都映射到a.a.a.a即代理服務器的公網IP,所以至此,生產環境搭建完成。
開發機修改hosts
我們現在知道,連接生產環境的Kafka獲取的集群地址是kafka1、kafka2、kafka3,所以在開發機中我們同樣需要修改hosts配置,映射實際的公網IP,不然無法識別。
以上就是詳細講解Docker-Compose部署Kafka KRaft集群環境的詳細內容,更多關于Docker Compose部署Kafka KRaft的資料請關注其它相關文章!