This article describes how to use Kafka Streams with Serdes that rely on schema references stored in the record headers. It should serve as a useful reference for anyone facing the same problem; read on to learn more.
Problem Description
I am attempting to use Kafka Streams to perform a KTable-KTable foreign-key join on CDC data. The data I will be reading is in Avro format, but it is serialized in a way that is not compatible with other industry serializers/deserializers (e.g. Confluent Schema Registry), because the schema identifier is stored in the headers.
When I set up the Serdes for my KTables, my Kafka Streams application runs initially but eventually fails, because internally the wrapping serializer ValueAndTimestampSerializer calls the serializer method with the signature byte[] serialize(String topic, T data) rather than the headers-aware variant byte[] serialize(String topic, Headers headers, T data). The Serdes I am using cannot handle this and throw an exception.
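For context, Kafka's Serializer interface defaults the headers-aware overload to the legacy two-argument one, so any internal caller that uses the two-argument form bypasses the headers entirely. Below is a minimal, self-contained sketch of why that call path breaks a headers-dependent serializer. Note that SketchSerializer and HeaderDependentSerializer are illustrative stand-ins, not real Kafka classes, and the Map-based headers and "schema.id" key are simplifications:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Illustrative stand-in for org.apache.kafka.common.serialization.Serializer:
// the headers-aware overload defaults to delegating to the legacy one, so a
// caller using the two-argument form never passes headers through.
interface SketchSerializer<T> {
    byte[] serialize(String topic, T data);

    default byte[] serialize(String topic, Map<String, byte[]> headers, T data) {
        return serialize(topic, data); // headers silently dropped
    }
}

public class HeaderSerdeDemo {
    // A serializer that only works when headers are present, because the
    // schema identifier travels in a header rather than in the payload.
    static class HeaderDependentSerializer implements SketchSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            throw new IllegalStateException("schema id is in the headers");
        }

        @Override
        public byte[] serialize(String topic, Map<String, byte[]> headers, String data) {
            // Simplified framing: schema id bytes followed by the payload.
            byte[] id = headers.get("schema.id");
            byte[] payload = data.getBytes(StandardCharsets.UTF_8);
            byte[] out = new byte[id.length + payload.length];
            System.arraycopy(id, 0, out, 0, id.length);
            System.arraycopy(payload, 0, out, id.length, payload.length);
            return out;
        }
    }

    public static void main(String[] args) {
        SketchSerializer<String> s = new HeaderDependentSerializer();
        // Headers-aware call path: succeeds.
        byte[] ok = s.serialize("my-topic", Map.of("schema.id", new byte[]{1}), "hi");
        System.out.println("headers-aware path produced " + ok.length + " bytes");
        // Legacy call path, as taken internally by the wrapper: fails.
        try {
            s.serialize("my-topic", "hi");
        } catch (IllegalStateException e) {
            System.out.println("legacy path failed: " + e.getMessage());
        }
    }
}
```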
My first question is: does anyone know a way to coax Kafka Streams into internally calling the method with the correct signature?
I am exploring ways around this, including writing new Serdes that re-serialize the data with the schema identifier embedded in the message itself. This could involve copying the data to a new topic or using interceptors.
However, I understand that a ValueTransformer has access to the headers through the ProcessorContext, and I am wondering whether there might be a faster way using transformValues(). The idea is to first read the value as a byte[] and then deserialize it into my Avro class inside the transformer (see the example below). However, when I do this, I get an exception.
StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, MySpecificClass> myTable = builder.table(
        "my-topic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);
...
KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);
joinResultTable.toStream()...
public class MyDeserializerTransformer implements
        ValueTransformerWithKey<Long, byte[], MySpecificClass> {

    MyAvroDeserializer deserializer;
    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        deserializer = new MyAvroDeserializer();
        this.context = context;
    }

    @Override
    public MySpecificClass transform(Long key, byte[] value) {
        return deserializer.deserialize(context.topic(), context.headers(), value);
    }

    @Override
    public void close() {
    }
}
When I run this, I get a ClassCastException. How can I fix this or find a workaround? Do I need to use a secondary state store?
"class": "org.apache.kafka.streams.errors.StreamsException",
"msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
"stack": [
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
"class": "java.lang.ClassCastException",
"msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
"stack": [
"org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
Recommended Answer
I was able to solve this by first reading the input topic as a KStream and then, in a second step, converting it to a KTable with a different Serde. The state stores appear to be the component that was not invoking the headers-aware serializer/deserializer method signatures.
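The approach above could be sketched roughly as follows. This is a hypothetical outline based on the answer, not the author's exact code; MyAvroSerde is an assumed Serde for MySpecificClass, used once the value has already been deserialized and the headers are no longer needed:

```java
StreamsBuilder builder = new StreamsBuilder();

// Step 1: read the topic as a KStream of raw bytes, so the record headers
// reach the transformer that performs the header-aware deserialization.
KTable<Long, MySpecificClass> myTable = builder
        .stream("my-topic", Consumed.with(Serdes.Long(), Serdes.ByteArray()))
        .transformValues(MyDeserializerTransformer::new)
        // Step 2: convert to a KTable with a Serde for the deserialized type,
        // so downstream operators (including the foreign-key join machinery)
        // no longer expect byte[] values.
        .toTable(Materialized.with(Serdes.Long(), new MyAvroSerde()));
```

This avoids the ClassCastException because the table's value Serde now matches the type produced by the transformer, rather than the raw ByteArray Serde the topic was read with.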
This concludes the article on using Kafka Streams with Serdes that rely on schema references in the headers. We hope the recommended answer above is helpful.