日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

目錄
  • 搜索rabbitmq鏡像
  • 下載鏡像
  • 啟動容器
  • 打印容器
  • 訪問RabbitMQ Management
  • 編寫生產者類
  • 消費者
  • 工作隊列
    • RabbitMqUtils工具類
    • 啟動2個工作線程
    • 啟動發送線程
  • 消息應答機制
    • 生產者
    • 消費者
    • 模擬
    • 不公平分發
  • 總結 

    搜索rabbitmq鏡像

    docker search rabbitmq:management

    docker啟動rabbitmq以及使用方式詳解

    下載鏡像

    docker pull rabbitmq:management

    docker啟動rabbitmq以及使用方式詳解

    啟動容器

    docker run -d –hostname localhost –name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    打印容器

    docker logs rabbitmq

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    訪問RabbitMQ Management

    http://localhost:15672

    賬戶密碼默認:guest

    docker啟動rabbitmq以及使用方式詳解

    編寫生產者類

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
            /**
             * 生成一個queue隊列
             * 1、隊列名稱 QUEUE_NAME
             * 2、隊列里面的消息是否持久化(默認消息存儲在內存中)
             * 3、該隊列是否只供一個Consumer消費 是否共享 設置為true可以多個消費者消費
             * 4、是否自動刪除 最后一個消費者斷開連接后 該隊列是否自動刪除
             * 5、其他參數
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message = "Hello world!";
            /**
             * 發送一個消息
             * 1、發送到哪個exchange交換機
             * 2、路由的key
             * 3、其他的參數信息
             * 4、消息體
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println(" [x] Sent '"+message+"'");
    
            channel.close();
            connection.close();
        }
    }
    

    運行該方法,可以看到控制臺的打印

    docker啟動rabbitmq以及使用方式詳解

    name=hello的隊列收到Message

    docker啟動rabbitmq以及使用方式詳解

    消費者

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Receiver {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setConnectionTimeout(600000);//milliseconds
            factory.setRequestedHeartbeat(60);//seconds
            factory.setHandshakeTimeout(6000);//milliseconds
            factory.setRequestedChannelMax(5);
            factory.setNetworkRecoveryInterval(500);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            System.out.println("Waiting for messages. ");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME,true,consumer);
        }
    }
    
    

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    工作隊列

    RabbitMqUtils工具類

    package com.xun.rabbitmqdemo.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMqUtils {
        public static Channel getChannel() throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    

    啟動2個工作線程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
            };
            System.out.println("C1 消費者啟動等待消費....");
            /**
             * 消費者消費消息
             * 1、消費哪個隊列
             * 2、消費成功后是否自動應答
             * 3、消費的接口回調
             * 4、消費未成功的接口回調
             */
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    
    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work02 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
            };
            System.out.println("C2 消費者啟動等待消費....");
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    

    啟動工作線程

    docker啟動rabbitmq以及使用方式詳解

    啟動發送線程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.Channel;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            try(Channel channel= RabbitMqUtils.getChannel();){
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //從控制臺接收消息
                Scanner scanner = new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.next();
                    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                    System.out.println("發送消息完成:"+message);
                }
            }
        }
    }
    

    啟動發送線程,此時發送線程等待鍵盤輸入

    docker啟動rabbitmq以及使用方式詳解

    發送4個消息

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    可以看到2個工作線程按照順序分別接收message。

    消息應答機制

    rabbitmq將message發送給消費者后,就會將該消息標記為刪除。

    但消費者在處理message過程中宕機,會導致消息的丟失。

    因此需要設置手動應答。

    生產者

    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task02 {
        private static final String TASK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            try(Channel channel = RabbitMqUtils.getChannel()){
                channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
                Scanner scanner = new Scanner(System.in);
                System.out.println("請輸入信息");
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                    System.out.println("生產者task02發出消息"+ message);
                }
            }
        }
    }
    

    消費者

    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work03 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work03 等待接收消息處理時間較短");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(1);
                System.out.println("接收到消息:"+message);
                /**
                 * 1、消息的標記tag
                 * 2、是否批量應答
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
            };
            //采用手動應答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    
    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work04 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work04 等待接收消息處理時間較長");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(30);
                System.out.println("接收到消息:"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
            };
            //采用手動應答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    

    工具類SleepUtils

    package com.xun.rabbitmqdemo.utils;
    public class SleepUtils {
        public static void sleep(int second){
            try{
                Thread.sleep(1000*second);
            }catch (InterruptedException _ignored){
                Thread.currentThread().interrupt();
            }
        }
    }
    

    模擬

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    work04等待30s后發出ack

    docker啟動rabbitmq以及使用方式詳解

    在work04處理message時手動停止線程,可以看到message:dd被rabbitmq交給了work03

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    docker啟動rabbitmq以及使用方式詳解

    不公平分發

    上面的輪詢分發,生產者依次向消費者按順序發送消息,但當消費者A處理速度很快,而消費者B處理速度很慢時,這種分發策略顯然是不合理的。
    不公平分發:

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

    通過此配置,當消費者未處理完當前消息,rabbitmq會優先將該message分發給空閑消費者。

    docker啟動rabbitmq以及使用方式詳解

    總結 

    分享到:
    標簽:Docker 啟動 方式 服務器 詳解
    用戶無頭像

    網友整理

    注冊時間:

    網站:5 個   小程序:0 個  文章:12 篇

    • 51998

      網站

    • 12

      小程序

    • 1030137

      文章

    • 747

      會員

    趕快注冊賬號,推廣您的網站吧!
    最新入駐小程序

    數獨大挑戰2018-06-03

    數獨一種數學游戲,玩家需要根據9

    答題星2018-06-03

    您可以通過答題星輕松地創建試卷

    全階人生考試2018-06-03

    各種考試題,題庫,初中,高中,大學四六

    運動步數有氧達人2018-06-03

    記錄運動步數,積累氧氣值。還可偷

    每日養生app2018-06-03

    每日養生,天天健康

    體育訓練成績評定2018-06-03

    通用課目體育訓練成績評定