日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

哈嘍,大家好,我是指北君。

最近項目中準備使用消息中間件Apache Pulsar,借著機會先做個簡單了解吧。

Apache Pulsar

Apache Pulsar是Apache軟件基金會頂級項目,是下一代云原生分布式消息流平臺。

Pulsar 作為下一代云原生分布式消息流平臺,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐以及低延時的高可擴展流數據存儲特性, 內置諸多其他系統商業版本才有的特性,是云原生時代解決實時消息流數據傳輸、存儲和計算的最佳解決方案。

圖片

Pulsar簡介

  • 系統架構

圖片

  • 功能特色
    租戶和命名空間(namespace)是 Pulsar 支持多租戶的兩個核心概念。在租戶級別,Pulsar 為特定的租戶預留合適的存儲空間、應用授權與認證機制。在命名空間級別,Pulsar 有一系列的配置策略(policy),包括存儲配額、流控、消息過期策略和命名空間之間的隔離策略。
    Pulsar 做了隊列模型和流模型的統一,在 Topic 級別只需保存一份數據,同一份數據可多次消費。以流式、隊列等方式計算不同的訂閱模型大大提升了靈活度。
    Pulsar 使用計算與存儲分離的云原生架構,數據從 Broker 搬離,存在共享存儲內部。上層是無狀態 Broker,復制消息分發和服務;下層是持久化的存儲層 Bookie 集群。Pulsar 存儲是分片的,這種構架可以避免擴容時受限制,實現數據的獨立擴展和快速恢復。
    Pulsar 原生支持跨地域復制,因此 Pulsar 可以跨不同地理位置的數據中心復制數據。當數據中心中斷或網絡分區時,在多個數據中心存有消息副本尤為重要,提高可用性。
    Pulsar Functions 是基于 Pulsar 的輕量級流處理方式。Pulsar Functions 直接部署在 broker 節點上(或作為 Kube.NETes 集群中的容器)。通過 Pulsar Functions,Pulsar 可以直接解決許多流處理任務,簡化操作。?
  • 支持客戶端

JAVA 客戶端

C++ 客戶端

.Net/C# 客戶端

Go 客戶端

NodeJS 客戶端

Ruby 客戶端

Pulsar安裝與部署

目前Pulsar不支持Window,下面通過Docker進行安裝,可以參考官網??https://pulsar.apache.org/docs/next/getting-started-docker/?? 

同時可以安裝Pulsar Manager,具體操作可以參考官方文檔 ??https://pulsar.apache.org/docs/next/administration-pulsar-manager/??

其中Pulsar Manager 是一個網頁式可視化管理與監測工具,支持多環境下的動態配置。可用于管理和監測租戶、命名空間、topic、訂閱、broker、集群等。

  1. window環境使用docker推薦使用Docker Desktop,和linux一樣可以通過docker命令管理鏡像、部署容器等操作。

打開并啟動Docker Desktop后,在終端執行命令執行 

_> docker search pulsar 

可以查詢到pulsar相關的鏡像

圖片

  1. 鏡像下載

這里我們選擇分別下載紅框的兩個鏡像,執行命令 

_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager

  1. 啟動
  • 啟動Pulsar
docker run -it -p 6650:6650 -p 8080:8080 
      --mount source=pulsardata,target=/pulsar/data 
      --mount source=pulsarconf,target=/pulsar/conf 
      apachepulsar/pulsar bin/pulsar standalone

啟動Pulsar Manager
docker run --name pulsar-manager -dit 
      -p 9527:9527 -p 7750:7750 
      -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/Application.properties 
      apachepulsar/pulsar-manager

添加用戶:

for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%"   -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;" 
  -H "Content-Type: application/json" -d "{"name": "admin", "password": "123456", "description": "super user admin", "email": "admin@test.com"}" 
  "http://localhost:7750/pulsar-manager/users/superuser"

訪問:

http://localhost:9527/ 
用戶名密碼:admin/123456

配置environments:

這里需要保證Pulsar Manager應用服務能夠訪問到Pulsar應用,由于都是通過Docker部署,配置Service URL需要使用網絡IP,不要用localhost。

圖片

管理界面:

圖片

Pulsar與SpringBoot集成

  • springboot version : 2.3.7.RELEASE
  • pulsar client: 2.10.2
  1. 通過Properties簡單定義一些Broker相關的屬性
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
    
    private String cluster;
    
    private String namespace;

    private String serverUrl;

    private String token;
}
  1. 通過配置定義了一些常用的組件,比如生產、消費工廠
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {

    private final PulsarProperties properties;

    public PulsarBootstrapConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
        return clientBuilder.build();
    }

    @Bean
    public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
        return new PulsarProducerFactory(pulsarClient(), properties);
    }

    @Bean
    public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
        return new PulsarConsumerFactory(pulsarClient(), properties);
    }

}
  1. 啟動服務,在服務啟動后,通過實現SmartInitializingSingleton接口,完成容器基本啟動(不包含Lazy的Bean)后,開始對消費者Consumer監聽
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {

    @Autowired
    private PulsarConsumerFactory consumerFactory;

    public static void main(String[] args) {
        SpringApplication.run(PulsarApplication.class,args);
    }

    @Override
    public void afterSingletonsInstantiated() {
        startConsumerListener();
    }

    private void startConsumerListener(){
        Consumer<String> consumer = createConsumer();
        if( consumer != null ){
            while (!Thread.currentThread().isInterrupted()){
                CompletableFuture<? extends Message<?>> completableFuture = consumer.receiveAsync();
                Message<?> message = null;
                try {
                    message = completableFuture.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("錯誤",e);
                } catch (ExecutionException e) {
                    log.error("錯誤",e);
                }

                if( message!=null ){
                    try {
                        log.info(" 接收消息:{} ", message.getValue() );
                        consumer.acknowledge(message);
                    } catch (PulsarClientException e) {
                        consumer.negativeAcknowledge(message);
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    private Consumer<String> createConsumer() {
        try {
            return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
        } catch (PulsarClientException e) {
            log.error("創建consumer出錯:{}", e.getMessage(),e);
        }
        return null;
    }
}
  1. 消息發送測試
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {

    @Autowired
    private PulsarProducerFactory producerFactory;

    @Test
    public void sendMessage() throws PulsarClientException {
        Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);

        producer.send(" 測試消息: " + new Date());

        producer.close();
    }

}
  1. 檢查消息接收情況
 
2023-02-05 12:05:14.043  INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425  INFO 23472 --- [           main] com.sucl.pulsar.PulsarApplication        :  接收消息: 測試消息: Sun Feb 05 12:06:16 CST 2023

結束語

該篇主要通過官網對Apache Pulsar做了簡單的了解與嘗試,同時基于SpringBoot,以簡單的示例代碼實現了消息的發送與接收,其中各個組件僅僅使用了默認的配置,在生產環境需要根據Pulsar的特性以及官方API使其具有擴展性與易用性。

分享到:
標簽:MQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定