說明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根據基礎知識去理解RabbitMQ是什么、提供了什么功能。
一、MQ的簡單理解
1. 什么是MQ?
- 消息隊列(Message Queue),是基礎數據結構中 “先進先出” 的一種數據結構。
- 一般用來解決應用解耦、異步消息、流量削峰等問題,實現高性能、高可用、可伸縮和最終一致性架構。
2.MQ是怎么實現消息傳遞的?
- 生產者產生消息并把傳輸的數據(消息)放在隊列中,用隊列機制來實現消息傳遞。
- 消費者可以到指定的隊列拉取消息,或者訂閱相應的隊列,由MQ服務端給其推送消息。
3.MQ的幾個主要特性
- 解耦:一個業務需要多個模塊共同實現,或一條消息有多個系統對應處理,只需要在主業務完成以后,發送一條MQ,其余模塊消費MQ消息,即可實現業務,降低模塊之間的耦合。
- 異步:主業務執行結束后,從屬業務通過MQ異步處理,減少業務的響應時間,提高用戶體驗。
- 削峰:高并發情況下,業務異步處理,提供高峰期業務處理能力,避免系統癱瘓。
4.MQ的缺點
- 系統可用性降低。依賴服務越多,服務越容易掛掉。需要考慮MQ癱瘓的情況。
- 系統的復雜性提高。需要考慮消息丟失、消息重復消費、消息傳遞的順序性。
- 業務一致性。主業務和從屬業務一致性的處理。
二、RabbitMQ的簡單介紹
1. 什么是RabbitMQ?
RabbitMQ是消息代理,它接受并轉發消息。
- RabbitMQ可以理解為一個郵箱,或者一個郵局,或者是一個郵遞員,保證 “張三” 的信件最終傳遞給 “李四”。
- RabbitMQ與上述所描述的郵局(郵箱、郵遞員)的主要區別在于它不處理紙張,而是接受、存儲和轉發二進制數據塊消息。
2.RabbitMQ和消息傳遞的三個術語
- 生產:生產只意味著發送。發送消息的程序是生產者(production)。
- 隊列:隊列是位于RabbitMQ中的“郵箱”的名稱。盡管消息流經RabbitMQ和應用程序,但他們只能存在于隊列中。隊列只受主機的內存和磁盤限制,它的本質上是一個打的消息緩沖區。許多生產者可以向一個隊列發送消息,許多消費者可以嘗試從一個隊列接收數據。
- 消費(接收):消費與接收具有相似的含義。一個消費者(consumer)是一個程序,主要是等待接收信息。
注意:生產者、消費者、代理不必部署在同一主機上,應用程序既可以是生產者,又可以是消費者
三、RabbitMQ安裝
3.1環境說明(本文以RabbitMQ3.8.11為例)
RabbitMQ對Erlang版本要求(Rabbit是基于Erlang編寫的)
RabbitMQ對JDK版本要求
3.2 安裝Erlang步驟(本文以windows版安裝為例)
3.2.1 下載Erlang,或訪問如下鏈接進行下載:
http://erlang.org/download/otp_win64_23.2.exe
3.2.2 雙擊運行 otp_win64_23.2.exe ,點擊下一步完成安裝。
3.2.3 安裝完成后配置環境變量,如下圖所示
3.2.4 運行窗口輸入cmd,在dos窗口輸入 erl ,返回如圖中所示,則代表erlang安裝完成。
3.2 安裝RibbitMQ步驟(本文以windows版安裝為例)
3.2.1 點擊下載RibbitMQ,或訪問如下鏈接進行下載:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11.exe
3.2.2 雙擊運行 rabbitmq-server-3.8.11.exe,點擊下一步完成安裝。
3.2.3 雙擊RabbitMQ Service - start 運行RabbitMQ
出現如下提示,則代表服務啟動成功:
3.2.4 訪問RabbitMQ控制臺
控制臺地址: http://localhost:15672/
控制臺用戶名/密碼 : guest/guest
四、RabbitMQ傳遞消息的方式(JAVA客戶端)
- Work queues(工作隊列)
- Publish/Subscribe(發布/訂閱)
- Routing(路由)
- Topics(主題)
- RPC(遠程過程調用)
- Publisher Confirms(發布者確認)
環境要求:
- JDK版本為15(1.8+即可)
- amqp-client 5.10.0
添加依賴:
<!--ribbitMq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
4.1 Work queues(工作隊列)
官方描述:
工作隊列(又名:任務隊列)背后的主要思想是避免立即執行資源密集型任務,并且必須等待它完成。相反,我們把任務安排在以后完成。我們將任務封裝為消息并將其發送到隊列。后臺運行的工作進程將彈出任務并最終執行作業。當您運行多個worker時,任務將在它們之間共享。
代碼示例:
生產者:
1 public class NewTask {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 設置IP
10 factory.setHost("127.0.0.1");
11
12 // 設置端口號
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
18
19 String message = String.join(" ", "four");
20
21 channel.basicPublish("", TASK_QUEUE_NAME,
22 MessageProperties.PERSISTENT_TEXT_PLAIN,
23 message.getBytes(StandardCharsets.UTF_8));
24
25 System.out.println(" [x] Sent '" + message + "'");
26 }
27 }
28 }
消費者:
1 public class Worker {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args )throws Exception {
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 設置IP
10 factory.setHost("127.0.0.1");
11
12 // 設置端口號
13 factory.setPort(5672);
14
15 final Connection connection = factory.newConnection();
16 final Channel channel = connection.createChannel();
17
18 channel.queueDeclare(TASK_QUEUE_NAME, true, false,false,null);
19 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
20
21 channel.basicQos(1);
22
23 DeliverCallback deliverCallback = (comsumerTag, delivery) ->{
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25
26 System.out.println(" [x] Received '" + message + "'");
27
28 try {
29 doWork(message);
30 } finally {
31 System.out.println("[x] Done");
32 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
33 }
34 };
35 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, comsumerTag -> {});
36 }
37
38 private static void doWork(String task){
39 for (char ch : task.toCharArray()){
40 if(ch == '.'){
41 try {
42 Thread.sleep(1000);
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 }
46 }
47 }
48 }
49 }
4.3 Publish/Subscribe(發布/訂閱)
官方描述:
RabbitMQ消息傳遞模型中的核心思想是生產者從不將任何消息直接發送到隊列。實際上,生產者經常甚至根本不知道是否將消息傳遞到任何隊列。相反,生產者只能將消息發送到交換機。交流是一件非常簡單的事情。一方面,它接收來自生產者的消息,另一方面,將它們推入隊列。交易所必須確切知道如何處理收到的消息。是否應將其附加到特定隊列?是否應該將其附加到許多隊列中?還是應該丟棄它。規則由交換類型定義 。
簡而言之:
相當于我們關注了一個微信公眾號,公眾號每次推文我們都能及時的收到。我們就相當于消費者,公眾號相當于消息中轉站,文章作者相當于生產者。
代碼示例:
生產者:
1 public class EmitLog {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 設置IP
10 factory.setHost("127.0.0.1");
11
12 // 設置端口號
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18
19 String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
20
21 channel.basicPublish(ExCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
22
23 System.out.println(" [x] Sent '" + message + "'");
24
25 }
26
27 }
28
29 }
消費者:
1 public class ReceiveLogs {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6 ConnectionFactory factory = new ConnectionFactory();
7
8 // 設置IP
9 factory.setHost("127.0.0.1");
10
11 // 設置端口號
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18 String queueName = channel.queueDeclare().getQueue();
19 channel.queueBind(queueName, ExCHANGE_NAME, "");
20
21 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22
23 DeliverCallback deliverCallback = (sonsumerTag, delivery) -> {
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25 System.out.println(" [x] Received '" + message + "'");
26 };
27 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
28 }
29 }
4.4 Routing(路由)
官方描述:
接上例,我們可能希望將日志消息寫入磁盤的程序僅接收嚴重錯誤,而不會在警告或信息日志消息上浪費磁盤空間。
簡而言之:
如果我們只想接收某些信息,比如日志級別有INFO、ERROR、DEBUG等,我們只愿接收INFO日志。可以使用Routing進行過濾。
代碼示例:
生產者:
1 public class EmitLogDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 設置IP
10 factory.setHost("127.0.0.1");
11
12 // 設置端口號
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
18
19 String severity = getServerity(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
24
25 }
26
27 }
28
29 private static String getServerity(String[] strings){
30 if (strings.length < 1){
31 return "info";
32 }
33 return strings[0];
34
35 }
36
37 private static String getMessage(String[] strings){
38 if (strings.length < 2) {
39 return "Hello World!";
40 }
41 return joinStrings(strings, " ", 1);
42 }
43
44 private static String joinStrings(String[] strings, String delimiter, int startIndex){
45 int length = strings.length;
46 if(length == 0){
47 return "";
48 }
49 if(length <= startIndex){
50 return "";
51 }
52 StringBuilder words = new StringBuilder(strings[startIndex]);
53 for (int i = startIndex + 1; i < length; i++){
54 words.Append(delimiter).append(strings[i]);
55 }
56 return words.toString();
57
58 }
59 }
消費者:
1 public class ReceiveLogsDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 設置IP
10 factory.setHost("127.0.0.1");
11
12 // 設置端口號
13 factory.setPort(5672);
14
15 Connection connection = factory.newConnection();
16
17 Channel channel = connection.createChannel();
18
19 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
20
21 String queueName = channel.queueDeclare().getQueue();
22
23 if(args.length < 1){
24 System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
25 System.exit(1);
26 }
27
28 for (String severity : args){
29 channel.queueBind(queueName, EXCHANGE_NAME, severity);
30 }
31 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
32
33 DeliverCallback deliverCallback = (consumerTag, delivery)->{
34 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
35 System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
36 };
37
38 channel.basicConsume(queueName, true, deliverCallback, comsumerTag ->{});
39 }
40 }
4.5 Topics(主題)
官方描述:
發送到主題交換機的消息不能具有任意的 routing_key-它必須是單詞列表,以點分隔。這些詞可以是任何東西,但通常它們指定與消息相關的某些功能。一些有效的路由關鍵示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由關鍵字中可以包含任意多個單詞,最多255個字節。
綁定密鑰也必須采用相同的形式。主題交換背后的邏輯 類似于直接交換-用特定路由鍵發送的消息將傳遞到所有用匹配綁定鍵綁定的隊列。但是,綁定鍵有兩個重要的特殊情況:
- *(星)只能代替一個單詞。#(散列)可以代替零個或多個單詞。
簡而言之:
Topic會根據消息自身所攜帶的路由鍵(Routing Key)在所有的綁定關系中尋找,與消息相匹配的隊列推送該消息。
注意:
當在綁定中不使用特殊字符“ * ”(星號)和“ # ”(哈希)時,主題交換的行為就像直接的一樣。
代碼示例:
生產者:
1 public class EmitLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 設置IP
9 factory.setHost("127.0.0.1");
10
11 // 設置端口號
12 factory.setPort(5672);
13
14 try(Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel()){
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String routingKey = getRouting(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
24 }
25 }
26
27 private static String getRouting(String[] strings){
28 if (strings.length < 1){
29 return "anonymous.info";
30 }
31 return strings[0];
32 }
33
34 private static String getMessage(String[] strings){
35 if (strings.length < 2){
36 return "hello world";
37 }
38 return joinStrings(strings, " ", 1);
39 }
40
41 private static String joinStrings(String[] strings, String delimiter, int startIndex){
42 int length = strings.length;
43 if(length == 0){
44 return "";
45 }
46 if(length < startIndex){
47 return "";
48 }
49 StringBuilder words = new StringBuilder(strings[startIndex]);
50 for (int i = startIndex + 1; i < length; i++){
51 words.append(delimiter).append(strings[i]);
52 }
53 return words.toString();
54 }
55 }
消費者:
1 public class ReceiveLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 設置IP
9 factory.setHost("127.0.0.1");
10
11 // 設置端口號
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String queueName = channel.queueDeclare().getQueue();
20
21 if(args.length < 1){
22 System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
23 System.exit(1);
24 }
25
26 for (String bindingKey : args){
27 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
28 }
29
30 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
31
32 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
33 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
34 System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
35 };
36 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
37 }
38 }
4.6 RPC(遠程過程調用)
官方描述:
盡管RPC是計算中非常普遍的模式,但它經常受到批評。當程序員不知道函數調用是本地的還是緩慢的RPC時,就會出現問題。這樣的混亂會導致系統變幻莫測,并給調試增加了不必要的復雜性。濫用RPC可能會導致無法維護的意大利面條代碼,而不是簡化軟件。
代碼示例:
生產者
1 public class RPCServer {
2
3 private static final String RPC_QUEUE_NAME = "rpc_queue";
4
5 private static int fib(int n){
6 if(n == 0){
7 return 0;
8 }
9 if(n == 1){
10 return 1;
11 }
12 return fib(n - 1) + fib(n - 2);
13 }
14
15 public static void main(String[] args) throws Exception{
16
17 // 創建服務器的連接
18 ConnectionFactory factory = new ConnectionFactory();
19
20 // 設置IP
21 factory.setHost("127.0.0.1");
22
23 // 設置端口號
24 factory.setPort(5672);
25
26 try (Connection connection = factory.newConnection();
27 Channel channel = connection.createChannel()) {
28 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
29 channel.queuePurge(RPC_QUEUE_NAME);
30
31 channel.basicQos(1);
32
33 System.out.println(" [x] Awaiting RPC requests");
34
35 Object monitor = new Object();
36 DeliverCallback deliverCallback = (consumerTag, delivery) ->{
37 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
38 .Builder()
39 .correlationId(delivery.getProperties().getCorrelationId())
40 .build();
41
42 String response = "";
43
44 try{
45 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
46 int n = Integer.parseInt(message);
47
48 System.out.println(" [.] fib(" + message + ")");
49 response += fib(n);
50 }catch (RuntimeException e){
51 System.out.println(" [.] " + e.toString());
52 }finally {
53 channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
54 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
55
56 // RabbitMq consumer worker thread notifies the RPC server owner thread
57 // RabbitMq使用者工作線程通知RPC服務器所有者線程
58 synchronized (monitor){
59 monitor.notify();
60 }
61 }
62 };
63 channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
64 // Wait and be prepared to consume the message from RPC client.
65 // 等待并準備使用來自RPC客戶端的消息。
66 while(true){
67 synchronized (monitor){
68 try {
69 monitor.wait();
70 }catch (InterruptedException e){
71 e.printStackTrace();
72 }
73 }
74 }
75 }
76 }
77 }
消費者:
1 public class RPCClient {
2
3 private Connection connection;
4 private Channel channel;
5 private String requestQueueName = "rpc_queue";
6
7 public RPCClient() throws IOException, TimeoutException {
8 // 創建服務器的連接
9 ConnectionFactory factory = new ConnectionFactory();
10
11 // 設置IP
12 factory.setHost("127.0.0.1");
13
14 // 設置端口號
15 factory.setPort(5672);
16
17 connection = factory.newConnection();
18 channel = connection.createChannel();
19 }
20
21 public static void main(String[] args) throws Exception{
22 RPCClient fibonacciRpc = new RPCClient();
23 for (int i = 0; i < 32; i++) {
24 String i_str = Integer.toString(i);
25 System.out.println(" [x] Requesting fib(" + i_str + ")");
26 String response = fibonacciRpc.call(i_str);
27 System.out.println(" [.] Got '" + response + "'");
28 }
29
30 }
31
32 public String call(String message) throws IOException, InterruptedException {
33 final String corrId = UUID.randomUUID().toString();
34
35 String replyQueueName = channel.queueDeclare().getQueue();
36 AMQP.BasicProperties props = new AMQP.BasicProperties
37 .Builder()
38 .correlationId(corrId)
39 .replyTo(replyQueueName)
40 .build();
41
42 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
43
44 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
45
46 String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
47 if (delivery.getProperties().getCorrelationId().equals(corrId)) {
48 response.offer(new String(delivery.getBody(), "UTF-8"));
49 }
50 }, consumerTag -> {
51 });
52
53 String result = response.take();
54 channel.basicCancel(ctag);
55 return result;
56 }
57
58 public void close() throws IOException {
59 connection.close();
60 }
61 }
4.7 Publisher Confirms(發布者確認)
官方描述:
在某些應用程序中,確保將發布的消息發送到代理非常重要。發布者確認是RabbitMQ功能,可以幫助滿足此要求。發布者確認本質上是異步的,但也可以同步處理它們。沒有確定的方法可以實現發布者確認,這通常歸結為應用程序和整個系統中的約束。典型的技術有:
- 單獨發布消息,同步等待確認:簡單,但吞吐量非常有限。
- 批量發布消息,同步等待批量確認:簡單,合理的吞吐量,但是很難推斷出什么時候出了問題。
- 異步處理:最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是可以正確實施。
代碼示例:
1 public class PublisherConfirms {
2
3 static final int MESSAGE_COUNT = 50_000;
4
5 static Connection createConnection() throws Exception{
6
7 ConnectionFactory cf = new ConnectionFactory();
8
9 // 設置IP
10 cf.setHost("127.0.0.1");
11
12 // 設置端口號
13 cf.setPort(5672);
14
15 // 設置用戶名
16 cf.setUsername("guest");
17
18 // 設置密碼
19 cf.setPassword("guest");
20
21 return cf.newConnection();
22 }
23
24 public static void main(String[] args) throws Exception{
25 publishMessagesIndividually();
26 publishMessagesInBatch();
27 handlePublishConfirmsAsynchronously();
28 }
29
30 static void publishMessagesIndividually() throws Exception{
31 try(Connection connection = createConnection()){
32 Channel ch = connection.createChannel();
33
34 String queue = UUID.randomUUID().toString();
35 ch.queueDeclare(queue, false, false, true, null);
36
37 ch.confirmSelect();
38
39 long start = System.nanoTime();
40 for (int i = 0; i < MESSAGE_COUNT; i++){
41 String body = String.valueOf(i);
42 ch.basicPublish("", queue, null, body.getBytes(StandardCharsets.UTF_8));
43 ch.waitForConfirmsOrDie(5_000);
44 }
45 long end = System.nanoTime();
46 System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
47 }
48 }
49
50 static void publishMessagesInBatch() throws Exception {
51 try (Connection connection = createConnection()) {
52 Channel ch = connection.createChannel();
53
54 String queue = UUID.randomUUID().toString();
55 ch.queueDeclare(queue, false, false, true, null);
56
57 ch.confirmSelect();
58
59 int batchSize = 100;
60 int outstandingMessageCount = 0;
61 long start = System.nanoTime();
62 for (int i = 0; i < MESSAGE_COUNT; i++) {
63 String body = String.valueOf(i);
64 ch.basicPublish("", queue, null, body.getBytes());
65 outstandingMessageCount++;
66
67 if (outstandingMessageCount == batchSize) {
68 ch.waitForConfirmsOrDie(5_000);
69 outstandingMessageCount = 0;
70 }
71 }
72
73 if (outstandingMessageCount > 0) {
74 ch.waitForConfirmsOrDie(5_000);
75 }
76 long end = System.nanoTime();
77 System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
78
79 }
80
81 }
82
83 static void handlePublishConfirmsAsynchronously() throws Exception {
84 try (Connection connection = createConnection()) {
85 Channel ch = connection.createChannel();
86
87 String queue = UUID.randomUUID().toString();
88 ch.queueDeclare(queue, false, false, true, null);
89
90 ch.confirmSelect();
91
92 ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
93
94 ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
95 if (multiple) {
96 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
97 sequenceNumber, true
98 );
99 confirmed.clear();
100 } else {
101 outstandingConfirms.remove(sequenceNumber);
102 }
103 };
104
105 ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
106 String body = outstandingConfirms.get(sequenceNumber);
107 System.err.format(
108 "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
109 body, sequenceNumber, multiple
110 );
111 cleanOutstandingConfirms.handle(sequenceNumber, multiple);
112 });
113
114 long start = System.nanoTime();
115 for (int i = 0; i < MESSAGE_COUNT; i++) {
116 String body = String.valueOf(i);
117 outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
118 ch.basicPublish("", queue, null, body.getBytes());
119 }
120
121 if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
122 throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
123 }
124
125 long end = System.nanoTime();
126 System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
127 }
128 }
129
130 static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
131 int waited = 0;
132 while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
133 Thread.sleep(100L);
134 waited = +100;
135 }
136 return condition.getAsBoolean();
137 }
138 }
五、總結
總的來說,RabbitMQ還是比較簡單的。目前文章只是簡單記錄一下,后期會更深入學習。
作者: 學海無涯519
原文鏈接:https://www.cnblogs.com/wgx519/p/14371511.html