1.第一個kafka程序
1.1.創建我們的主題
kafka-topics.bat --zookeeper localhost:2181/kafka --create --topic hello-kafka --replication-factor 1 --partitions 4
(主題不創建,可能會造成程序報錯,也可在程序中配置如:
spring.kafka.listener.missing-topics-fatal=false
)
1.2.生產者發送消息
引入jar:
<dependency>
<groupId>org.Apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
生產者代碼示例:
1.2.1.必選屬性
創建生產者對象時有三個屬性必須指定。
bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查詢其他 broker 的信息。
不過最少提供 2 個 broker 的信息(用逗號分隔,比如: 127.0.0.1:9092,192.168.0.13:9092),一旦其中一個宕機,生產者仍能連接到集群上。
key.serializer
生產者接口允許使用參數化類型,可以把 JAVA 對象作為鍵和值傳 broker,但是 broker 希望收到的消息的鍵和值都是字節數組,所以,必須提供將對象序列化成字節數組的序列化器。key.serializer 必須設置為實現
org.apache.kafka.common.serialization.Serializer 的接口類,Kafka 的客戶端默認提供了ByteArraySerializer,IntegerSerializer, StringSerializer,也可以實現自定義的序列化器。
value.serializer
同 key.serializer。
1.3.消費者接受消息
代碼示例:
1.3.1.必選參數
bootstrap.servers、key.serializer、value.serializer 含義同生產者
group.id
并非完全必需,它指定了消費者屬于哪一個群組,但是創建不屬于任何一個群組的消費者并沒有問題。
新版本特點:poll(Duration)這個版本修改了這樣的設計,會把元數據獲取也計入整個超時時間(更加的合理)
1.4.演示示例
1.默認創建主題,只有一個分區時,演示生產者和消費者情況。
2.修改主題分區為 2(使用管理命令),再重新演示生產者和消費者情況。
2.Kafka 的生產者
2.1.生產者發送消息的基本流程
從創建一個 ProducerRecord 對象開始, Producer Record 對象需要包含目標主題和要發送的內容。我們還可以指定鍵或分區。在發送 ProducerRecord對象時,生產者要先把鍵和值對象序列化成字節數組,這樣它們才能夠在網絡上傳輸。
接下來,數據被傳給分區器。如果之前在 Producer Record 對象里指定了分區,那么分區器就不會再做任何事情,直接把指定的分區返回。如果沒有指定分區,那么分區器會根據 Producer Record 對象的鍵來選擇一個分區。選好分區以后,生產者就知道該往哪個主題和分區發送這條記錄了。緊接著,這條記錄被添加到一個記錄批次里(雙端隊列,尾部寫入),這個批次里的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。
服務器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka ,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。如果寫入失敗, 則會返回一個錯誤。生產者在收到錯誤之后會嘗試重新發送消息,幾次之后如果還是失敗,就返回錯誤信息。
生產者發送消息一般會發生兩類錯誤:
一類是可重試錯誤,比如連接錯誤(可通過再次建立連接解決)、無主 no leader(可通過分區重新選舉首領解決)。
另一類是無法通過重試解決,比如“消息太大”異常,具體見 message.max.bytes,這類消息不會進行任何重試,直接拋出異常
2.2.Kafka 三種發送方式
我們通過生成者的 send 方法進行發送。send 方法會返回一個包含 RecordMetadata 的 Future 對象。RecordMetadata 里包含了目標主題,分區信息和消息的偏移量。
2.2.1.發送并忘記
忽略 send 方法的返回值,不做任何處理。大多數情況下,消息會正常到達,而且生產者會自動重試,但有時會丟失消息。
2.2.2.同步發送
獲得 send 方法返回的 Future 對象,在合適的時候調用 Future 的 get 方法。
2.2.3.異步發送
實現接口
org.apache.kafka.clients.producer.Callback,然后將實現類的實例作為參數傳遞給 send 方法。
2.3.更多發送配置
生產者有很多屬性可以設置,大部分都有合理的默認值,無需調整。有些參數可能對內存使用,性能和可靠性方面有較大影響。可以參考
org.apache.kafka.clients.producer 包下的 ProducerConfig 類。
acks:
Kafk 內部的復制機制是比較復雜的,這里不談論內部機制(后續章節進行細講),我們只討論生產者發送消息時與副本的關系。
指定了必須要有多少個分區副本收到消息,生產者才會認為寫入消息是成功的,這個參數對消息丟失的可能性有重大影響。
acks=0:生產者在寫入消息之前不會等待任 何來自服務器的響應,容易丟消息,但是吞吐量高。
acks=1:只要集群的首領節點收到消息,生產者會收到來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新首領沒有選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。默認使用這個配置。
acks=all:只有當所有參與復制的節點都收到消息,生產者才會收到一個來自服務器的成功響應。延遲高。
金融業務,主備外加異地災備。所以很多高可用場景一般不是設置 2 個副本,有可能達到 5 個副本,不同機架上部署不同的副本,異地上也部署一套副本。
buffer.memory
設置生產者內存緩沖區的大小(結合生產者發送消息的基本流程),生產者用它緩沖要發送到服務器的消息。如果數據產生速度大于向 broker 發送的速度,導致生產者空間不足,producer 會阻塞或者拋出異常。缺省 33554432 (32M)
max.block.ms
指定了在調用 send()方法或者使用 partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。缺省 60000ms
retries
發送失敗時,指定生產者可以重發消息的次數(缺省 Integer.MAX_VALUE)。默認情況下,生產者在每次重試之間等待 100ms,可以通過參數retry.backoff.ms 參數來改變這個時間間隔。
receive.buffer.bytes 和 send.buffer.bytes
指定 TCP socket 接受和發送數據包的緩存區大小。如果它們被設置為-1,則使用操作系統的默認值。如果生產者或消費者處在不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。缺省 102400
batch.size
當多個消息被發送同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次內存被填滿后,批次里的所有消息會被發送出去。但是生產者不一定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也有可能被發送。缺省16384(16k) ,如果一條消息超過了批次的大小,會寫不進去。
linger.ms
指定了生產者在發送批次前等待更多消息加入批次的時間。它和 batch.size 以先到者為先。也就是說,一旦我們獲得消息的數量夠 batch.size 的數量了,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比 batch.size 設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。這個設置默認為 0,即沒有延遲。設定 linger.ms=5,例如,將會減少請求數目,但是同時會增加 5ms 的延遲,但也會提升消息的吞吐量。
compression.type
producer 用于壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是 none、gzip、snAppy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好。snappy 占用 cpu 少,提供較好的性能和可觀的壓縮比,如果比較關注性能和網絡帶寬,用這個。如果帶寬緊張,用 gzip,會占用較多的 cpu,但提供更高的壓縮比。
client.id
當向 server 發出請求時,這個字符串會發送給 server。目的是能夠追蹤請求源頭,以此來允許 ip/port 許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤。
max.in.flight.requests.per.connection
指定了生產者在接收到服務器響應之前可以發送多個消息,值越高,占用的內存越大,當然也可以提升吞吐量。發生錯誤時,可能會造成數據的發送順序改變,默認是 5 (修改)。
如果需要保證消息在一個分區上的嚴格順序,這個值應該設為 1。不過這樣會嚴重影響生產者的吞吐量。
request.timeout.ms
客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求;超過重試次數將拋異常,默認 30 秒。
metadata.fetch.timeout.ms
是指我們所獲取的一些元數據的第一個時間數據。元數據包含:topic,host,partitions。此項配置是指當等待元數據 fetch 成功完成所需要的時間,否則會跑出異常給客戶端
max.request.size
控制生產者發送請求最大大小。默認這個值為 1M,如果一個請求里只有一個消息,那這個消息不能大于 1M,如果一次請求是一個批次,該批次包含了 1000 條消息,那么每個消息不能大于 1KB。注意:broker 具有自己對消息記錄尺寸的覆蓋,如果這個尺寸小于生產者的這個設置,會導致消息被拒絕。這個參數和 Kafka 主機的 message.max.bytes 參數有關系。如果生產者發送的消息超過 message.max.bytes 設置的大小,就會被 Kafka 服務器拒絕。
以上參數不用全記住,一般來說,就記住 acks、batch.size、linger.ms、max.request.size 就行了,因為這 4 個參數重要些,其他參數一般沒有太大必要調整
2.4.順序保證
Kafka 可以保證同一個分區里的消息是有序的。也就是說,發送消息時,主題只有且只有一個分區,同時生產者按照一定的順序發送消息, broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下, 順序是非常重要的。例如,往一個賬戶存入 100 元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。
如果把 retires 設為非零整數,同時把
max.in.flight.requests.per.connection 設為比 1 大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關鍵的,所以不建議把 retires 設為 0(不重試的話消息可能會因為連接關閉等原因會丟) 。所以還是需要重試,同時把
max.in.flight.request.per.connection 設為 1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發
送給 broker 。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做。
2.5.序列化
創建生產者對象必須指定序列化器,默認的序列化器并不能滿足我們所有的場景。我們完全可以自定義序列化器。只要實現
org.apache.kafka.common.serialization.Serializer 接口即可。
2.5.1.自定義序列化需要考慮的問題
自定義序列化容易導致程序的脆弱性。舉例,在我們上面的實現里,我們有多種類型的消費者,每個消費者對實體字段都有各自的需求,比如,有的將字段變更為 long 型,有的會增加字段,這樣會出現新舊消息的兼容性問題。特別是在系統升級的時候,經常會出現一部分系統升級,其余系統被迫跟著升級的情況。
解決這個問題,可以考慮使用自帶格式描述以及語言無關的序列化框架。比如 Protobuf,或者 Kafka 官方推薦的 Apache Avro。
Avro 會使用一個 JSON 文件作為 schema 來描述數據,Avro 在讀寫時會用到這個 schema,可以把這個 schema 內嵌在數據文件中。這樣,不管數據格式如何變動,消費者都知道如何處理數據。
但是內嵌的消息,自帶格式,會導致消息的大小不必要的增大,消耗了資源。我們可以使用 schema 注冊表機制,將所有寫入的數據用到的 schema保存在注冊表中,然后在消息中引用 schema 的標識符,而讀取的數據的消費者程序使用這個標識符從注冊表中拉取 schema 來反序列化記錄。
注意:Kafka 本身并不提供 schema 注冊表,需要借助第三方,現在已經有很多的開源實現,比如 Confluent Schema Registry,可以從 GitHub 上獲取。
如何使用參考如下網址:
https://cloud.tencent.com/developer/article/1336568
2.6.分區
我們在新增 ProducerRecord 對象中可以看到,ProducerRecord 包含了目標主題,鍵和值,Kafka 的消息都是一個個的鍵值對。鍵可以設置為默認的 null。
鍵的主要用途有兩個:一,用來決定消息被寫往主題的哪個分區,擁有相同鍵的消息將被寫往同一個分區,二,還可以作為消息的附加消息。
如果鍵值為 null,并且使用默認的分區器,分區器使用輪詢算法將消息均衡地分布到各個分區上。
如果鍵不為空,并且使用默認的分區器,Kafka 對鍵進行散列(Kafka 自定義的散列算法,具體算法原理不知),然后根據散列值把消息映射到特定的分區上。很明顯,同一個鍵總是被映射到同一個分區。但是只有不改變主題分區數量的情況下,鍵和分區之間的映射才能保持不變,一旦增加了新的分區,就無法保證了,所以如果要使用鍵來映射分區,那就要在創建主題的時候把分區規劃好,而且永遠不要增加新分區。
2.6.1.自定義分區器
某些情況下,數據特性決定了需要進行特殊分區,比如電商業務,北京的業務量明顯比較大,占據了總業務量的 20%,我們需要對北京的訂單進行單獨分區處理,默認的散列分區算法不合適了, 我們就可以自定義分區算法,對北京的訂單單獨處理,其他地區沿用散列分區算法。或者某些情況下,我們用 value 來進行分區。
3.Kafka 的消費者
3.1.消費者的入門
消費者的含義,同一般消息中間件中消費者的概念。在高并發的情況下,生產者產生消息的速度是遠大于消費者消費的速度,單個消費者很可能會負擔不起,此時有必要對消費者進行橫向伸縮,于是我們可以使用多個消費者從同一個主題讀取消息,對消息進行分流。(買單的故事,群組,消費者的一群人, 消費者:買單的,分區:一筆單,一筆單能被買單一次,當然一個消費者可以買多個單,如果有一個消費者掛掉了<跑單了>,另外的消費者接上)
3.2.消費者群組
Kafka 里消費者從屬于消費者群組,一個群組里的消費者訂閱的都是同一個主題,每個消費者接收主題一部分分區的消息。
上圖,主題 T 有 4 個分區,群組中只有一個消費者,則該消費者將收到主題 T1 全部 4 個分區的消息。
如上圖,在群組中增加一個消費者 2,那么每個消費者將分別從兩個分區接收消息,上圖中就表現為消費者 1 接收分區 1 和分區 3 的消息,消費者 2接收分區 2 和分區 4 的消息。
如上圖,在群組中有 4 個消費者,那么每個消費者將分別從 1 個分區接收消息。
但是,當我們增加更多的消費者,超過了主題的分區數量,就會有一部分的消費者被閑置,不會接收到任何消息。
往消費者群組里增加消費者是進行橫向伸縮能力的主要方式。所以我們有必要為主題設定合適規模的分區,在負載均衡的時候可以加入更多的消費者。但是要記住,一個群組里消費者數量超過了主題的分區數量,多出來的消費者是沒有用處的。
如果是多個應用程序,需要從同一個主題中讀取數據,只要保證每個應用程序有自己的消費者群組就行了。
3.3.消費者配置
消費者有很多屬性可以設置,大部分都有合理的默認值,無需調整。有些參數可能對內存使用,性能和可靠性方面有較大影響。可以參考
org.apache.kafka.clients.consumer 包下 ConsumerConfig 類。
auto.offset.reset
消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下,如何處理。默認值是 latest,從最新的記錄開始讀取,另一個值是 earliest,表示消費者從起始位置讀取分區的記錄。
注意:如果是消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況(因消費者長時間失效,包含的偏移量記錄已經過時并被刪除)下,默認值是 latest 的話,消費者將從最新的記錄開始讀取數據( 在消費者啟動之后生成的記錄),可以先啟動生產者,再啟動消費者,觀察到這種情況。觀察代
碼,在模塊 kafka-no-spring 下包 hellokafka 中。
enable .auto.commit
默認值 true,表明消費者是否自動提交偏移。為了盡量避免重復數據和數據丟失,可以改為 false,自行控制何時提交。
partition.assignment.strategy
分區分配給消費者的策略。系統提供兩種策略。默認為 Range。允許自定義策略。
Range
把主題的連續分區分配給消費者。(如果分區數量無法被消費者整除、第一個消費者會分到更多分區)
RoundRobin
把主題的分區循環分配給消費者。
自定義策略
extends 類 AbstractPartitionAssignor,然后在消費者端增加參數:
properties.put(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 類.class.getName());即可。
max.poll.records
控制每次 poll 方法返回的的記錄數量。
fetch.min.bytes
每次 fetch 請求時,server 應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。缺省為 1 個字節。多消費者下,可以設大這個值,以降低 broker 的工作負載
fetch.wait.max.ms
如果沒有足夠的數據能夠滿足 fetch.min.bytes,則此項配置是指在應答 fetch 請求之前,server 會阻塞的最大時間。缺省為 500 個毫秒。和上面的fetch.min.bytes 結合起來,要么滿足數據的大小,要么滿足時間,就看哪個條件先滿足。
max.partition.fetch.bytes
指定了服務器從每個分區里返回給消費者的最大字節數,默認 1MB。假設一個主題有 20 個分區和 5 個消費者,那么每個消費者至少要有 4MB 的可用內存來接收記錄,而且一旦有消費者崩潰,這個內存還需更大。注意,這個參數要比服務器的 message.max.bytes 更大,否則消費者可能無法讀取消息。
session.timeout.ms
如果 consumer 在這段時間內沒有發送心跳信息,則它會被認為掛掉了。默認 3 秒。
client.id
當向 server 發出請求時,這個字符串會發送給 server。目的是能夠追蹤請求源頭,以此來允許 ip/port 許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤。
receive.buffer.bytes 和 send.buffer.bytes
指定 TCP socket 接受和發送數據包的緩存區大小。如果它們被設置為-1,則使用操作系統的默認值。如果生產者或消費者處在不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
3.4.消費者中的基礎概念
消費者的含義,同一般消息中間件中消費者的概念。在高并發的情況下,生產者產生消息的速度是遠大于消費者消費的速度,單個消費者很可能會負擔不起,此時有必要對消費者進行橫向伸縮,于是我們可以使用多個消費者從同一個主題讀取消息,對消息進行分流。
(買單的故事,群組,消費者的一群人, 消費者:買單的,分區:一筆單,一筆單能被買單一次,當然一個消費者可以買多個單,如果有一個消費者掛掉了<跑單了>,另外的消費者接上)
3.4.1.訂閱
創建消費者后,使用 subscribe()方法訂閱主題,這個方法接受一個主題列表為參數,也可以接受一個正則表達式為參數;正則表達式同樣也匹配多個主題。如果新創建了新主題,并且主題名字和正則表達式匹配,那么會立即觸發一次再均衡,消費者就可以讀取新添加的主題。比如,要訂閱所有和 test相關的主題,可以 subscribe(“tets.*”)
3.4.2.輪詢
為了不斷的獲取消息,我們要在循環中不斷的進行輪詢,也就是不停調用 poll 方法。
poll 方法的參數為超時時間,控制 poll 方法的阻塞時間,它會讓消費者在指定的毫秒數內一直等待 broker 返回數據。poll 方法將會返回一個記錄(消息)列表,每一條記錄都包含了記錄所屬的主題信息,記錄所在分區信息,記錄在分區里的偏移量,以及記錄的鍵值對。
poll 方法不僅僅只是獲取數據,在新消費者第一次調用時,它會負責查找群組,加入群組,接受分配的分區。如果發生了再均衡,整個過程也是在輪詢期間進行的。
3.4.3.提交和偏移量
當我們調用 poll 方法的時候,broker 返回的是生產者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用 Kafka 來追蹤消息在分區里的位置,我們稱之為 偏移量。消費者更新自己讀取到哪個消息的操作,我們稱之為 提交。
消費者是如何提交偏移量的呢?消費者會往一個叫做_consumer_offset 的特殊主題發送一個消息,里面會包括每個分區的偏移量。
3.5.消費者中的核心概念
3.5.1.多線程安全問題
KafkaConsumer 的實現不是線程安全的,所以我們在多線程的環境下,使用 KafkaConsumer 的實例要小心,應該每個消費數據的線程擁有自己的
3.5.2.群組協調
消費者要加入群組時,會向群組協調器發送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員列表,并負責給每一個消費者分配分區。分配完畢后,群主把分配情況發送給群組協調器,協調器再把這些信息發送給所有的消費者,每個消費者只能看到自己的分配信息,只有群主知道群組里所有消費者的分配信息。群組協調的工作會在消費者發生變化(新加入或者掉線),主題中分區發生了變化(增加)時發生。
3.5.3.分區再均衡
當消費者群組里的消費者發生變化,或者主題里的分區發生了變化,都會導致再均衡現象的發生。從前面的知識中,我們知道,Kafka 中,存在著消費者對分區所有權的關系,這樣無論是消費者變化,比如增加了消費者,新消費者會讀取原本由其他消費者讀取的分區,消費者減少,原本由它負責的分區要由其他消費者來讀取,增加了分區,哪個消費者來讀取這個新增的分區,這些行為,都會導致分區所有權的變化,這種變化就被稱為 再均衡。
再均衡對 Kafka 很重要,這是消費者群組帶來高可用性和伸縮性的關鍵所在。不過一般情況下,盡量減少再均衡,因為再均衡期間,消費者是無法讀取消息的,會造成整個群組一小段時間的不可用。
消費者通過向稱為群組協調器的 broker(不同的群組有不同的協調器)發送心跳來維持它和群組的從屬關系以及對分區的所有權關系。如果消費者長時間不發送心跳,群組協調器認為它已經死亡,就會觸發一次再均衡。
在 0.10.1 及以后的版本中,心跳由單獨的線程負責,相關的控制參數為 max.poll.interval.ms。
3.6.Kafka 中的消費安全
一般情況下,我們調用 poll 方法的時候,broker 返回的是生產者寫入 Kafka 同時 kafka 的消費者提交偏移量,這樣可以確保消費者消息消費不丟失也不重復,所以一般情況下 Kafka 提供的原生的消費者是安全的,但是事情會這么完美嗎?
3.7.消費者提交偏移量導致的問題
當我們調用 poll 方法的時候,broker 返回的是生產者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用 Kafka 來追蹤消息在分區里的位置,我們稱之為 偏移量。消費者更新自己讀取到哪個消息的操作,我們稱之為 提交。
消費者是如何提交偏移量的呢?消費者會往一個叫做_consumer_offset 的特殊主題發送一個消息,里面會包括每個分區的偏移量。發生了再均衡之后,消費者可能會被分配新的分區,為了能夠繼續工作,消費者者需要讀取每個分區最后一次提交的偏移量,然后從指定的地方,繼續做處理。
分區再均衡的例子:某軟件公司,有一個項目,有兩塊的工作,有兩個碼農,一個負責一塊,干得好好的。突然一天,小王桌子一拍不干了,老子中了 5 百萬了,不跟你們玩了,立馬收拾完電腦就走了。然后你今天剛好入職,一個蘿卜一個坑,你就入坑了。這個過程我們就好比我們的分區再均衡,分區就是一個項目中的不同塊的工作,消費者就是碼農,一個碼農不玩了,另一個碼農立馬頂上,這個過程就發生了分區再均衡
1)如果提交的偏移量小于消費者實際處理的最后一個消息的偏移量,處于兩個偏移量之間的消息會被重復處理,
2)如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失
所以, 處理偏移量的方式對客戶端會有很大的影響 。KafkaConsumer API 提供了很多種方式來提交偏移量 。
3.7.1.自動提交(重復消費不可避免)
最簡單的提交方式是讓消費者自動提交偏移量。如果 enable.auto.comnit 被設為 true,消費者會自動把從 poll()方法接收到的最大偏移量提交上去。
提交時間間隔由 auto.commit.interval.ms 控制,默認值是 5s。自動提交是在輪詢里進行的,消費者每次在進行輪詢時會檢査是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。
不過,在使用這種簡便的方式之前,需要知道它將會帶來怎樣的結果。
假設我們仍然使用默認的5s提交時間間隔, 在最近一次提交之后的3s發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。
這個時候偏移量已經落后了 3s,所以在這 3s 內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量, 減小可能出現重復消息的時間窗, 不過這種情況是無法完全避免的 。
在使用自動提交時,每次調用輪詢方法都會把上一次調用返回的最大偏移量提交上去,它并不知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(enable.auto.comnit 被設為 true 時,在調用 close()方法之前也會進行自動提交)。一般情況下不會有什么
問題,不過在處理異常或提前退出輪詢時要格外小心。
自動提交雖然方便,但是很明顯是一種基于時間提交的方式,不過并沒有為我們留有余地來避免重復處理消息。
3.7.2.手動提交(同步)
我們通過控制偏移量提交時間來消除丟失消息的可能性,并在發生再均衡時減少重復消息的數量。消費者 API 提供了另一種提交偏移量的方式,開發者可以在必要的時候提交當前偏移量,而不是基于時間間隔。
把 auto.commit. offset 設為 false,自行決定何時提交偏移量。使用 commitsync()提交偏移量最簡單也最可靠。這個方法會提交由 poll()方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
注意:commitsync()將會提交由 poll()返回的最新偏移量,所以在處理完所有記錄后要確保調用了 commitsync(),否則還是會有丟失消息的風險。如果發生了再均衡,從最近批消息到發生再均衡之間的所有消息都將被重復處理。
只要沒有發生不可恢復的錯誤,commitSync()方法會阻塞,會一直嘗試直至提交成功,如果失敗,也只能記錄異常日志。
3.7.3.異步提交
手動提交時,在 broker 對提交請求作出回應之前,應用程序會一直阻塞。這時我們可以使用異步提交 API,我們只管發送提交請求,無需等待 broker的響應。
在成功提交或碰到無法恢復的錯誤之前, commitsync()會一直重試,但是 commitAsync 不會。它之所以不進行重試,是因為在它收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。
假設我們發出一個請求用于提交偏移量 2000,,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync()重新嘗試提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。這個時候如果發生再均衡,
就會出現重復消息。
commitAsync()也支持回調,在 broker 作出響應時會執行回調。回調經常被用于記錄提交錯誤或生成度量指標。
3.7.4.同步和異步組合
因為同步提交一定會成功、異步可能會失敗,所以一般的場景是同步和異步一起來做。
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或 再均衡前的最后一次提交,就要確保能夠提交成功。
因此,在消費者關閉前一般會組合使用 commitAsync()和 commitsync()。具體使用,參見模塊 kafka-no-spring 下包 commit 包中代碼 SyncAndAsync。
3.7.5.特定提交
在我們前面的提交中,提交偏移量的頻率與處理消息批次的頻率是一樣的。但如果想要更頻繁地提交該怎么辦?
如果 poll()方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,想要在批次中間提交偏移量該怎么辦?這種情況無法通過調用commitSync()或 commitAsync()來實現,因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。
消費者 API 允許在調用 commitsync()和 commitAsync()方法時傳進去希望提交的分區和偏移量的 map。假設我們處理了半個批次的消息,最后一個來自主題“customers”,分區 3 的消息的偏移量是 5000,你可以調用 commitsync()方法來提交它。不過,因為消費者可能不只讀取一個分區,因為我們需要跟蹤所有分區的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復雜。
3.8.分區再均衡
3.8.1.再均衡監聽器
在提交偏移量一節中提到過,消費者在退出和進行分區再均衡之前,會做一些清理工作比如,提交偏移量、關閉文件句柄、數據庫連接等。
在為消費者分配新分區或移除舊分區時,可以通過消費者 API 執行一些應用程序代碼,在調用 subscribe()方法時傳進去一個 ConsumerRebalancelistener實例就可以了。
ConsumerRebalancelistener 有兩個需要實現的方法。
1) public void onPartitionsRevoked( Collection< TopicPartition> partitions)方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了
2) public void onPartitionsAssigned( Collection< TopicPartition> partitions)方法會在重新分配分區之后和消費者開始讀取消息之前被調用。
3.8.2.從特定偏移量處開始記錄
到目前為止,我們知道了如何使用 poll()方法從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取消息。
如果想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可以使 seekToBeginning(Collectiontp)和seekToEnd( Collectiontp)這兩個方法。
不過,Kaka 也為我們提供了用于查找特定偏移量的 API。它有很多用途,比如向后回退幾個消息或者向前跳過幾個消息(對時間比較敏感的應用程序在處理滯后的情況下希望能夠向前跳過若干個消息)。在使用 Kafka 以外的系統來存儲偏移量時,它將給我們帶來更大的驚喜--讓消息的業務處理和偏移量的提
交變得一致。試想一下這樣的場景:應用程序從 Kaka 讀取事件(可能是網站的用戶點擊事件流),對它們進行處理(可能是使用自動程序清理點擊操作并添加會話信息),然后把結果保存到數據庫。假設我們真的不想丟失任何數據,也不想在數據庫里多次保存相同的結果。
我們可能會,毎處理一條記錄就提交一次偏移量。盡管如此,在記錄被保存到數據庫之后以及偏移量被提交之前,應用程序仍然有可能發生崩潰,導致重復處理數據,數據庫里就會出現重復記錄。
如果保存記錄和偏移量可以在一個原子操作里完成,就可以避免出現上述情況。記錄和偏移量要么都被成功提交,要么都不提交。如果記錄是保存在數據庫里而偏移量是提交到Kafka上,那么就無法實現原子操作不過,如果在同一個事務里把記錄和偏移量都寫到數據庫里會怎樣呢?那么我們就會知道記錄和偏移量要么都成功提交,要么都沒有,然后重新處理記錄。
現在的問題是:如果偏移量是保存在數據庫里而不是 Kafka 里,那么消費者在得到新分區時怎么知道該從哪里開始讀取?這個時候可以使用 seek()方法。在消費者啟動或分配到新分區時,可以使用 seck()方法查找保存在數據庫里的偏移量。我們可以使用使用 Consumer Rebalancelistener 和 seek()方法確保我們是從數據庫里保存的偏移量所指定的位置開始處理消息的。
3.9.優雅退出
如果確定要退出循環,需要通過另一個線程調用 consumer. wakeup()方法。如果循環運行在主線程里,可以在 ShutdownHook 里調用該方法。要記住,consumer. wakeup()是消費者唯一一個可以從其他線程里安全調用的方法。調用 consumer. wakeup()可以退出 poll(),并拋出 WakeupException 異常。我們不需要處理 Wakeup Exception,因為它只是用于跳出循環的一種方式。不過,在退出線程之前調用 consumer.close()是很有必要的,它會提交任何還沒有提交的東西,并向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡,而不需要等待會話超時。
3.10.反序列化
不過就是序列化過程的一個反向,原理和實現可以參考生產者端的實現,同樣也可以自定義反序列化器。
3.11.獨立消費者
到目前為止,我們討論了消費者群組,分區被自動分配給群組里的消費者,在群組里新增或移除消費者時自動觸發再均衡。不過有時候可能只需要一個消費者從一個主題的所有分區或者某個特定的分區讀取數據。這個時候就不需要消費者群組和再均衡了,只需要把主題或者分區分配給消費者,然后開始讀取消息并提交偏移量。
如果是這樣的話,就不需要訂閱主題,取而代之的是為自己分配分區。一個消費者可以訂閱主題(并加入消費者群組),或者為自己分配分區,但不能同時做這兩件事情。
獨立消費者相當于自己來分配分區,但是這樣做的好處是自己控制,但是就沒有動態的支持了,包括加入消費者(分區再均衡之類的),新增分區,這些都需要代碼中去解決,所以一般情況下不推薦使用。