本文介紹了我們有沒(méi)有辦法暫停卡夫卡流一段時(shí)間,然后再恢復(fù)?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我們有一個(gè)要求,我們使用Kafka Streams讀取Kafka主題,然后通過(guò)一個(gè)會(huì)話池通過(guò)網(wǎng)絡(luò)發(fā)送數(shù)據(jù)。然而,有時(shí)網(wǎng)絡(luò)調(diào)用有點(diǎn)慢,我們需要頻繁地暫停流,以確保我們沒(méi)有使網(wǎng)絡(luò)超載。目前,我們將數(shù)據(jù)捕獲到流中,并將其加載到Executor服務(wù),然后通過(guò)會(huì)話池通過(guò)網(wǎng)絡(luò)發(fā)送。
如果Executor服務(wù)中的數(shù)據(jù)太高,我們需要暫停流一段時(shí)間,然后在Executor服務(wù)上的積壓清理完畢后恢復(fù)它。為了實(shí)現(xiàn)此暫停機(jī)制,我們當(dāng)前正在關(guān)閉流,并在清除積壓后重新啟動(dòng)。
有什么方法可以暫停Kafka流嗎?
推薦答案
如果我理解正確的話,您沒(méi)有什么特別需要做的。你說(shuō)的是”背壓”,而Kafka Streams可以開(kāi)箱即用。
可以做的是將該數(shù)據(jù)放入某個(gè)最大大小的隊(duì)列中,并使用該隊(duì)列加載Executor服務(wù)。當(dāng)隊(duì)列達(dá)到某個(gè)閾值時(shí),有兩種方法:
如果您將數(shù)據(jù)放入隊(duì)列的調(diào)用在沒(méi)有超時(shí)的情況下被阻塞,則無(wú)需再做任何操作。只要等系統(tǒng)恢復(fù)在線,你的電話
返回,處理將繼續(xù)。
如果將數(shù)據(jù)放入隊(duì)列的調(diào)用因超時(shí)而阻塞,只需執(zhí)行查找以檢查隊(duì)列的大小。重復(fù)此操作,直到系統(tǒng)重新聯(lián)機(jī),您的呼叫成功。
唯一的警告是,只要您的Streams應(yīng)用程序阻止,內(nèi)部使用的Kafka消費(fèi)者客戶(hù)端就不會(huì)向Kafka發(fā)送任何心跳信號(hào),并且可能會(huì)超時(shí)。因此,您需要將超時(shí)配置參數(shù)設(shè)置為高于外部系統(tǒng)的預(yù)期最長(zhǎng)停機(jī)時(shí)間。
另一種方法是使用Kafka-Streams中提供的處理器API,但這通常不是推薦的模式。
如果有幫助,請(qǐng)讓我知道!!
這篇關(guān)于我們有沒(méi)有辦法暫停卡夫卡流一段時(shí)間,然后再恢復(fù)?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,