本文介紹了如何更改記錄的時間戳?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我正在使用FluentD(v.12最后一個穩定版本)向Kafka發送消息。但FluentD使用的是舊的KafkaProducer,因此記錄時間戳始終設置為-1。
因此,我必須使用WallclockTimestampExtractor將記錄的時間戳設置為消息到達Kafka時的時間點。
是否有特定于Kafka Streams的解決方案?
我真正感興趣的時間戳是由fluentd在消息中發送的:
";timestamp";:";1507885936";,&Quot;主機:&Quot;V.X.Y.Z.
以卡夫卡表示的記錄:
偏移量=0,時間戳=-1,鍵=空,值={";timestamp";:";1507885936";,;主機;:V.X.Y.Z.&Quot;}
我希望有這樣一張卡夫卡唱片:
OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}
我的解決方法如下所示:
編寫消費者提取時間戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
編寫一個生產者,生成一個時間戳設置為(ProducerRecord(字符串主題,整數分區,長時間戳,K鍵,V值)的新記錄)
我更喜歡KafkaStreams解決方案(如果有)。
推薦答案
您可以編寫非常簡單的Kafka Streams應用程序,如下所示:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用從記錄中提取時間戳并返回時間戳的自定義TimestampExtractor
配置應用程序。
Kafka Streams在將記錄寫回Kafka時將使用返回的時間戳。
注意:如果您有亂序數據–即時間戳沒有嚴格排序–結果也將包含亂序時間戳。Kafka Streams使用返回的時間戳回寫Kafka(即,無論提取程序返回什么,都用作記錄元數據時間戳)。請注意,在寫入時,當前處理的輸入記錄中的時間戳用于所有生成的輸出記錄–這適用于版本1.0,但在將來的版本中可能會更改。)。
更新:
一般來說,您可以通過處理器API修改時間戳。調用context.forward()
可以通過To.all().withTimestamp(...)
將輸出記錄時間戳設置為forward()
的參數。
這篇關于如何更改記錄的時間戳?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,