日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

目錄
  • 一、Docker拉取鏡像并啟動RabbitMQ
  • 二、Hello World
    • (一)依賴導入
    • (二)消息生產者
    • (三)消息消費者
  • 三、實現輪訓分發消息
    • (一)抽取工具類
    • (二)啟動兩個工作線程
    • (三)啟動發送線程
  • 四、實現手動應答
    • (一)消息應答概念
    • (二)消息應答的方法
    • (三)消息自動重新入隊 
    • (四)消息手動應答代碼 
    • 1、生產者
    • 2、睡眠工具類模擬業務執行
    • 3、消費者

一、Docker拉取鏡像并啟動RabbitMQ

拉取鏡像

docker pull rabbitmq:3.8.8-management

查看鏡像

docker images rabbitmq

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

 啟動鏡像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虛擬機記得開放5672端口或者關閉防火墻,在window通過 主機ip:15672 訪問rabbitmq控制臺

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

 用戶名密碼默認為guest

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

二、Hello World

(一)依賴導入

<!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生產者

工作原理

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

  • Broker:接收和分發消息的應用,RabbitMQ Server 就是 Message Broker
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個 thread 創建單獨的 channel 進行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷
  • Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最終被送到這里等待 consumer 取走

我們需要先獲取連接(Connection),然后通過連接獲取信道(Channel),這里我們演示簡單例子,可以直接跳過交換機(Exchange)發送隊列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設置主機ip
        factory.setHost("182.92.234.71");
        // 設置用戶名
        factory.setUsername("guest");
        // 設置密碼
        factory.setPassword("guest");
        //channel 實現了自動 close 接口 自動關閉 不需要顯示關閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一個隊列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.隊列名稱
         * 2.隊列里面的消息是否持久化 默認消息存儲在內存中
         * 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
         * 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除
         * 5.其他參數
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 發送一個消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.發送到哪個交換機
         * 2.路由的key是哪個
         * 3.其他的參數信息
         * 4.發送消息的消息體
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("發送成功");
    }
}

(三)消息消費者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設置主機ip
        factory.setHost("182.92.234.71");
        // 設置用戶名
        factory.setUsername("guest");
        // 設置密碼
        factory.setPassword("guest");
        //channel 實現了自動 close 接口 自動關閉 不需要顯示關閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何進行消費的回調接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消費的一個回調接口,如在消費的時候隊列被刪除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消費被中斷");
        };
        /*
         * 消費者消費消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消費哪個隊列
         * 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答
         * 3.消費者未成功消費的回調
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、實現輪訓分發消息

(一)抽取工具類

可以發現,上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個工具類來調用,并使用單例模式-餓漢式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置ip地址
        factory.setHost("192.168.23.100");
        // 設置用戶名
        factory.setUsername("guest");
        // 設置密碼
        factory.setPassword("guest");
        try {
            // 創建連接
            Connection connection = factory.newConnection();
            // 獲取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("創建信道失敗,錯誤信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)啟動兩個工作線程

相當于前面的消費者,我們只需要寫一個類,通過ideal實現多線程啟動即可模擬兩個線程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調邏輯");
        };
        // 啟動兩次,第一次為C1, 第二次為C2
        System.out.println("C2消費者等待消費消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)啟動發送線程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通過控制臺輸入充當消息,使輪訓演示更明顯
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發送完成:" + message);
        }
    }
}

結果 

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

四、實現手動應答

(一)消息應答概念

消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成 了部分突然它掛掉了,會發生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消 息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續 發送給該消費這的消息,因為它無法接收到。 為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是: 消費者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。

自動應答:消費者發送后立即被認為已經傳送成功。這種模式需要在高吞吐量和數據傳輸安全性方面做權,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟失了。

當然另一方面這種模式消費者那邊可以傳遞過載的消息, 沒有對傳遞的消息數量進行限制 , 當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終 使得內存耗盡,最終這些消費者線程被操作系統殺死,所以這種模式僅適用在消費者可以高效并 以某種速率能夠處理這些消息的情況下使用

手動應答:消費者接受到消息并順利完成業務后再調用方法進行確認,rabbitmq 才可以把該消息刪除

(二)消息應答的方法

  • Channel.basicAck(用于肯定確認)
  • RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
  • Channel.basicNack(用于否定確認)
  • Channel.basicReject(用于否定確認)
  • 與 Channel.basicNack 相比少一個參數Multiple
  • multiple 的 true 和 false 代表不同意思

        true 代表批量應答 channel 上未應答的消息
        比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時
        5-8 的這些還未應答的消息都會被確認收到消息應答
        false 同上面相比
        只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答

  • 不處理該消息了直接拒絕,可以將其丟棄了

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

(三)消息自動重新入隊 

如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息未發送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確 保不會丟失任何消息。

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

(四)消息手動應答代碼 

1、生產者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發送完成:" + message);
        }
    }
}

2、睡眠工具類模擬業務執行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消費者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,業務時間短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模擬業務執行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標識
             * 2、是否啟動批量確認,false:否。
             *    啟用批量有可能造成消息丟失,比如未消費的消息提前被確然刪除,后面業務消費該消息
             *    時出現異常會導致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,業務時間長");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模擬業務執行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標識
             * 2、是否啟動批量確認,false:否。
             *    啟用批量有可能造成消息丟失,比如未消費的消息提前被確然刪除,后面業務消費該消息
             *    時出現異常會導致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01業務時間短,worker02業務時間長,我們提前終止worker02模擬出異常,可以看到消息dd會被放回隊列由worker01接收處理。

注意:這里需要先啟動生產者聲明隊列ack,不然啟動消費者會報錯

Docker啟動RabbitMQ實現生產者與消費者的詳細過程

最后一個案例我們可以看到消息輪訓+消息自動重新入隊+手動應答。

分享到:
標簽:啟動 服務器 消費者 生產者 過程
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定