日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

之前寫過關于 Apache Pulsar 的簡單示例,用來了解如何使用 Pulsar 這個新生代的消息隊列中間件,但是如果想要在項目中使用,還會欠缺很多,最明顯的就是 集成復雜,如果你用過其他消息中間件,比如 Kafka、RabbitMq,只需要簡單的引入 jar,就可以通過注解+配置快速集成到項目中。

開始一個 Pulsar Starter

既然已經了解了 Apache Pulsar,又認識了 spring-boot-starter,今天不妨來看下如何寫一個 pulsar-spring-boot-starter 模塊。

目標

寫一個完整的類似 kafka-spring-boot-starter(springboot 項目已經集成到 spring-boot-starter 中),需要考慮到很多 kafka 的特性, 今天我們主要實現下面幾個模板

  • 在項目中夠通過引入 jar 依賴快速集成
  • 提供統一的配置入口
  • 能夠快速發送消息
  • 能夠基于注解實現消息的消費

定義結構

└── pulsar-starter
    ├── pulsar-spring-boot-starter
    ├── pulsar-spring-boot-autoconfigure
    ├── spring-pulsar
    ├── spring-pulsar-xx
    ├── spring-pulsar-sample
└── README.md

整個模塊的結構如上其中pulsar-starter作為一個根模塊,主要控制子模塊依賴的其他 jar 的版本以及使用到的插件版本。類似于 Spring-Bom,這樣我們在后續升級 時,就可以解決各個第三方 jar 的可能存在版本沖突導致的問題。

  • pulsar-spring-boot-starter

該模塊作為外部項目集成的直接引用 jar,可以認為是 pulsar-spring-boot-starter 組件的入口,里面不需要寫任何代碼,只需要引入需要的依賴(也就是下面的子模塊)即可

  • pulsar-spring-boot-autoconfigure

該模塊主要定義了 spring.factories 以及 AutoConfigure、Properties。也就是自動配置的核心(配置項+Bean 配置)

  • spring-pulsar

該模塊是核心模塊,主要的實現都在這里

  • spring-pulsar-xx

擴展模塊,可以對 spring-pulsar 做更細化的劃分

  • spring-pulsar-sample

starter 的使用示例項目

實現

上面我們說到實現目標,現在看下各個模塊應該包含什么內容,以及怎么實現我們的目標

  • 入口 pulsar-spring-boot-starter

上面說到 starter 主要是引入整個模塊基礎的依賴即可,里面不用寫代碼。

<dependencies>
    <dependency>
        <groupId>com.sucl</groupId>
        <artifactId>spring-pulsar</artifactId>
        <version>${project.version}</version>
    </dependency>

    <dependency>
        <groupId>com.sucl</groupId>
        <artifactId>pulsar-spring-boot-autoconfigure</artifactId>
        <version>${project.version}</version>
    </dependency>
</dependencies>
  • pulsar-spring-boot-autoconfigure
  1. 添加 spring-boot 基礎的配置
<dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot</artifactId>
     </dependency>

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-logging</artifactId>
     </dependency>

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-configuration-processor</artifactId>
         <optional>true</optional>
     </dependency>
</dependencies>
  1. 定義自動配置類PulsarAutoConfiguration
  2. 引入Properties,基于EnableConfigurationPropertiesspring-boot-configuration-processor解析 Properties 生成對應spring-configuration-metadata.json文件,這樣編寫 Application.yml 配置時就可以自動提示配置項的屬性和值了。
  3. 構建一些必須的 Bean,如 PulsarClient、ConsumerFactory、ConsumerFactory 等
  4. Import 配置 PulsarAnnotationDrivenConfiguration,這個主要是一些額外的配置,用來支持后面的功能

@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
@Import({PulsarAnnotationDrivenConfiguration.class})
public class PulsarAutoConfiguration {

    private final PulsarProperties properties;

    public PulsarAutoConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() {
        ClientBuilder clientBuilder = new ClientBuilderImpl(properties);
        return clientBuilder.build();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory pulsarConsumerFactory() {
        return new DefaultPulsarConsumerFactory(pulsarClient(), properties.getConsumer().buildProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory pulsarProducerFactory() {
        return new DefaultPulsarProducerFactory(pulsarClient(), properties.getProducer().buildProperties());
    }

}
  1. 配置 spring.factory

在目錄src/main/resources/META-INF下創建spring.factories,內容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.sucl.pulsar.autoconfigure.PulsarAutoConfiguration
  • spring-pulsar
  1. 添加 pulsar-client 相關的依賴
 <dependencies>
     <dependency>
         <groupId>org.apache.pulsar</groupId>
         <artifactId>pulsar-client</artifactId>
     </dependency>

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-autoconfigure</artifactId>
     </dependency>

     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-messaging</artifactId>
     </dependency>
</dependencies>
  1. 定義 EnablePulsar,之前說到過,@Enable 注解主要是配合 AutoConfigure 來做功能加強,沒有了自動配置,我們依然可以使用這些模塊的功能。這里做了一件事,向 Spring 容器注冊了兩個 Bean
  • PulsarListenerAnnotationBeanProcessor 在 Spring Bean 生命周期中解析注解自定義注解 PulsarListener、PulsarHandler,
  • PulsarListenerEndpointRegistry 用來構建 Consumer 執行環境以及對 TOPIC 的監聽、觸發消費回調等等,可以說是最核心的 Bean
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({PulsarListenerConfigurationSelector.class})
public @interface EnablePulsar {

}
  1. 定義注解,參考 RabbitMq,主要針對需要關注的類與方法,分別對應注解@PulsarListener、@PulsarHandler,通過這兩個注解配合可以讓我們監聽到關注的 TOPIC, 當有消息產生時,觸發對應的方法進行消費。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarListener {

    /**
     *
     * @return TOPIC 支持SPEL
     */
    String[] topics() default {};

    /**
     *
     * @return TAGS 支持SPEL
     */
    String[] tags() default {};
}

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarHandler {

}
  1. 注解@PulsarListener 的處理流程比較復雜,這里用一張圖描述,或者可以通過下面 github 的源代碼查看具體實現

 

flow

  • spring-pulsar-sample

按照下面的流程,你會發現通過簡單的幾行代碼就能夠實現消息的生產與消費,并集成到項目中去。

  1. 簡單寫一個 SpringBoot 項目,并添加 pulsar-spring-boot-starter
    <dependencies>
    <dependency>
        <groupId>com.sucl</groupId>
        <artifactId>pulsar-spring-boot-starter</artifactId>
        <version>${project.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
  1. 添加配置
cycads:
  pulsar:
    service-url: pulsar://localhost:6650
  listener-topics: TOPIC_TEST
  1. 編寫對應消費代碼
@Slf4j
@Component
@PulsarListener(topics = "#{'${cycads.listener-topics}'.split(',')}")
public class PulsarDemoListener {

    @PulsarHandler
    public void onConsumer(Message message){
        log.info(">>> 接收到消息:{}", message.getPayload());
    }

}
  1. 向 Pulsar Broker 發送消息進行測試
@Slf4j
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ContextConfig.class})
@Import({PulsarAutoConfiguration.class})
public class ProducerTests {

    @Autowired
    private ProducerFactory producerFactory;

    @Test
    public void sendMessage() {
        Producer producer = producerFactory.createProducer("TOPIC_TEST");
        MessageId messageId = producer.send("this is a test message");
        log.info(">>>>>>> 消息發送完成:{}", messageId);
    }

    @Configuration
    @PropertySource(value = "classpath:application-test.properties")
    static class ContextConfig {
        //
    }
}
  1. 控制臺可以看到這樣的結果
2023-02-26 19:57:15.572  INFO 26520 --- [pulsar-01] c.s.p.s.listener.PulsarDemoListener : >>> 接收到消息:GenericMessage [payload=this is a test message, headers={id=f861488c-2afb-b2e7-21a1-f15e9759eec5, timestamp=1677412635571}]

知識點

  • Pulsar Client

基于 pulsar-client 提供的 ConfigurationData 擴展 Properties;了解 Pulsar Client 如何連接 Broker 并進行消息消費,包括同步消費、異步消費等等

  • spring.factories

實現 starter 自動配置的關鍵,基于 SPI 完成配置的自動加載

  • Spring Bean 生命周期

通過 Bean 生命周期相關擴展實現注解的解析與容器的啟動,比如 BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton, InitializingBean, DisposableBean 等

  • Spring Messaging

基于回調與 MethodHandler 實現消息體的封裝、參數解析以及方法調用;

源碼示例

https://github.com/sucls/pulsar-starter.git

結束語

如果你看過 spring-kafka 的源代碼,那么你會發現所有代碼基本都是仿造其實現。一方面能夠閱讀 kafka client 在 spring 具體如何實現;同時通過編寫自己的 spring starter 模塊,學習 整個 starter 的實現過程。

原文鏈接;
https://mp.weixin.qq.com/s/i7osvAb3AeFtZuz-HZaSxw

分享到:
標簽:spring boot
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定