波多野结衣 蜜桃视频,国产在线精品露脸ponn,a v麻豆成人,AV在线免费小电影

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

本文介紹了將Kafka Streams與Serde結合使用,這些Serde依賴于標頭中的架構引用的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

我正在嘗試使用Kafka Streams對CDC數據執行KTable-KTable外鍵聯接。我將讀取的數據是Avro格式的,但是它被序列化的方式與其他行業序列化程序/反序列化程序(例如。合流架構注冊表),因為架構標識符存儲在標頭中。

當我設置KTables的Serdes時,我的Kafka Streams應用程序最初運行,但最終失敗,因為它在內部調用了帶有byte[] serialize(String topic, T data);的序列化程序方法,而不是帶有標頭的方法(即。byte[] serialize(String topic, Headers headers, T data)在包裝序列化程序ValueAndTimestampSerializer中。我正在使用的Serdes無法處理此問題并引發異常。

第一個問題是,有沒有人知道如何懇求Kafka Streams在內部調用帶有正確方法簽名的方法?

我正在探索解決這個問題的方法,包括編寫新的Serde,用消息本身中的模式標識符重新序列化。這可能涉及將數據重新復制到新主題或使用攔截器。

但是,我知道ValueTransformer可以訪問ProcessorContext中的標頭,我想知道是否有更快的方法使用transformValues()。其想法是首先將該值作為byte[]讀取,然后將該值反序列化為轉換器中的Avro類(請參見下面的示例)。但是,當我這樣做時,我會得到一個例外。

StreamsBuilder builder = new StreamsBuilder();
 final KTable<Long, MySpecificClass> myTable = builder.table(
      "my-topic",
       Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

 ...

 KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

 joinResultTable.toStream()...
public class MyDeserializerTransformer implements
    ValueTransformerWithKey<Long, byte[], MySpecificClass> {
  MyAvroDeserializer deserializer;
  ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    deserializer = new MyAvroDeserializer();
    this.context = context;
  }

  @Override
  public MySpecificClass transform(Long key, byte[] value) {
    return deserializer.deserialize(context.topic(), context.headers(), value);
  }

  @Override
  public void close() {

  }
}

當我運行它時,我收到一個ClassCastException。我如何解決此問題或找到解決方法?我需要使用輔助狀態存儲嗎?

"class": "org.apache.kafka.streams.errors.StreamsException",
    "msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
    "stack": [
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
      "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
      "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
 "class": "java.lang.ClassCastException",
      "msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
      "stack": [
        "org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
        "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
        "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",

推薦答案

我能夠通過以下方法解決此問題:首先將輸入主題作為KStream讀取,然后將其轉換為具有不同Serde的KTable。第二步,狀態存儲似乎遇到了未調用帶有標頭的序列化程序/反序列化程序方法簽名的問題。

這篇關于將Kafka Streams與Serde結合使用,這些Serde依賴于標頭中的架構引用的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,

分享到:
標簽:Kafka Serde STREAMS 依賴于 引用 架構 標頭中
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定