日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

一、Maven依賴添加

 <!-- rabbitmq相關(guān)依賴 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
        </dependency>

二、七種工作模式的JAVA實(shí)例

1、簡單模式

最簡單的一個消費(fèi)者和一個生產(chǎn)者模式,生產(chǎn)者生成消息,消費(fèi)者監(jiān)聽消息,若是消費(fèi)者監(jiān)聽到它所需要的消息,就會消費(fèi)該消息,這種消息是次性的,被消費(fèi)了就沒有了。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

1.1.1、EasyRecv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class EasyRecv {
    //隊(duì)列名稱
    private final static String QUEUE_NAME = "hello world";
    public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {
        //打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置RabbitMQ所在主機(jī)ip或者主機(jī)名
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明隊(duì)列,主要為了防止消息接收者先運(yùn)行此程序,隊(duì)列還不存在時創(chuàng)建隊(duì)列。
               /**
         * 隊(duì)列名
         * 是否持久化
         *  是否排外  即只允許該channel訪問該隊(duì)列   一般等于true的話用于一個隊(duì)列只能有一個消費(fèi)者來消費(fèi)的場景
         *  是否自動刪除  消費(fèi)完刪除
         *  其他屬性
         *
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");

        //創(chuàng)建隊(duì)列消費(fèi)者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消費(fèi)隊(duì)列
         /**
         * 隊(duì)列名
         * 其他屬性  路由
         * 消息body
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一個阻塞方法(內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'");
        }

    }
}

1.1.2、EasySend.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class EasySend {

    //隊(duì)列名稱
    private final static String QUEUE_NAME = "hello world";

    public static void main(String[] argv) throws java.io.IOException
    {
        /**
         * 創(chuàng)建連接連接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名
        factory.setHost("127.0.0.1");


        while (true){
            //創(chuàng)建一個連接
            Connection connection = factory.newConnection();
            //創(chuàng)建一個頻道
            Channel channel = connection.createChannel();
            //指定一個隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //發(fā)送的消息
            Scanner scanner = new Scanner(System.in);
            String ms = scanner.nextLine();
            //String message = "hello world!";
            //往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());
            System.out.println("Sent '" + ms + "'");
            //關(guān)閉頻道和連接
            channel.close();
            connection.close();
        }
    }

以上兩個已經(jīng)可以進(jìn)行通信了,下面同樣是簡單的實(shí)例,但是我們可以看到在代碼層面上,連接的代碼都是一樣的,所以我們可以創(chuàng)建一個連接的工具類。

1.2.1、RabbitmqConnectionUtil .java

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public  class RabbitmqConnectionUtil {

    public static Connection getConnection() throws IOException {
        //連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //連接5672端口  注意15672為工具界面端口  25672為集群端口
        factory.setPort(5672);
        //factory.setVirtualHost("/xxxxx");
       // factory.setUsername("xxxxxx");
       // factory.setPassword("123456");
        //獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

1.2.2、UtilSend.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilSend {
    private final static String QUEUE_NAME = "UtilConn";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqConnectionUtil.getConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //消息內(nèi)容
  
        String message = "這里是lbw廣場";
        channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
        System.out.println("[x]Sent '"+message + "'");
        //最后關(guān)閉通關(guān)和連接
        channel.close();
        connection.close();
    }
}

1.2.3、UtilRecv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilRecv {
    private final static String QUEUE_NAME = "UtilConn";
    public static void main(String[] args) throws IOException, InterruptedException {

        Connection connection = null;
        connection = RabbitmqConnectionUtil.getConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列
       
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);

        while(true){
            //該方法會阻塞
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '"+message+"'");
        }

    }
}

2、工作隊(duì)列

工作隊(duì)列也就是簡單模式的強(qiáng)化版,一個隊(duì)列是可以多個生產(chǎn)者,也可以有多個消費(fèi)者來競爭消費(fèi)消息,但是我們?nèi)孕璞WC隊(duì)列的冪等性,隊(duì)列存在就不能再創(chuàng)建同名隊(duì)列。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

下面的每個進(jìn)程都控制其主線程休眠,讓我們可以更好的看到結(jié)果。

2.1.1、Sender1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender1 {
    private final  static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for(int i = 0; i < 100; i++){
            String message = "lbw" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message + "'");
            Thread.sleep(i*10);
        }

        channel.close();
        connection.close();
    }
}

2.1.2、Sender2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender2 {
    private final  static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for(int i = 0; i < 100; i++){
            String message = "nb" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message + "'");
            Thread.sleep(i*10);
        }
        channel.close();
        connection.close();
    }
}

2.1.3、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver1 {
    private final static  String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false, false,null);
        //同一時刻服務(wù)器只會發(fā)送一條消息給消費(fèi)者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //關(guān)于手工確認(rèn) 待之后有時間研究下
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 '"+message+"'");
            Thread.sleep(10);
            //返回確認(rèn)狀態(tài)
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}

2.1.4、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver2 {
    private final static  String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {

        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false, false,null);
        //同一時刻服務(wù)器只會發(fā)送一條消息給消費(fèi)者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        
        
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 '"+message+"'");
            Thread.sleep(1000);
            //返回確認(rèn)狀態(tài)
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}

2.1.5、結(jié)果

上面的四個程序都運(yùn)行起來,結(jié)果可以看到如下,依據(jù)結(jié)果分析,可知,同一個消息隊(duì)列,是可以有多個生產(chǎn)者和消費(fèi)者的。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

3、發(fā)布/訂閱(fanout)

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

3.1.1、Sender.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

public class Sender {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //發(fā)送消息
            for (int i = 0; i < 5; i++)
            {
                String message = "盧本偉廣場" + i;
                System.out.println("[send]:" + message);
                //發(fā)送消息
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
                Thread.sleep(5 * i);
            }
            channel.close();
            connection.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

}

3.1.2、Receiver1.java

import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    //隊(duì)列名稱
    private static final String QUEUE_NAME    = "test_queue_email";

    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //聲明隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //將隊(duì)列綁定到交換機(jī)
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保證一次只分發(fā)一個
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費(fèi)者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當(dāng)消息到達(dá)時執(zhí)行回調(diào)方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[email] Receive message:" + message);
                    try
                    {
                        //消費(fèi)者休息2s處理業(yè)務(wù)
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[1] done");
                        //手動應(yīng)答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設(shè)置手動應(yīng)答
            boolean autoAck = false;
            //監(jiān)聽隊(duì)列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

3.1.3、Receiver2.java

import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    //隊(duì)列名稱
    private static final String QUEUE_NAME    = "test_queue_phone";

    public static void main(String[] args)
    {
        try
        {

            //獲取連接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //從連接中獲取一個通道
            final Channel channel = connection.createChannel();
            //聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //聲明隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //將隊(duì)列綁定到交換機(jī)
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保證一次只分發(fā)一個
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定義消費(fèi)者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當(dāng)消息到達(dá)時執(zhí)行回調(diào)方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[phone] Receive message:" + message);
                    try
                    {
                        //消費(fèi)者休息1s處理業(yè)務(wù)
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[2] done");
                        //手動應(yīng)答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //設(shè)置手動應(yīng)答
            boolean autoAck = false;
            //監(jiān)聽隊(duì)列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

3.1.4、結(jié)果

從程序運(yùn)行結(jié)果和RabbitMq的后臺看出,這樣的消息屬于廣播型,兩個不同名的隊(duì)列的都能收到該消息,只需它們都將自己綁定到同一個交換機(jī),而且,該消息是持久的,只要交換機(jī)還在,消費(fèi)者啥時候上線都能消費(fèi)它所綁定的交換機(jī),而且只會一個消費(fèi)者只會消費(fèi)一

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

4、路由(direct)

  1. 在前面的示例中,我們已經(jīng)在創(chuàng)建綁定。您可能會想起類似的代碼:
channel.queueBind(queueName,EXCHANGE_NAME,“”);

綁定是交換和隊(duì)列之間的關(guān)系。可以簡單地理解為:隊(duì)列對來自此交換的消息感興趣。

  1. 綁定可以采用額外的routingKey參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為 綁定鍵。這是我們可以創(chuàng)建帶有鍵的綁定的方法:
channel.queueBind(queueName,EXCHANGE_NAME,“ black”);
  • 直接綁定(密鑰直接綁定到單個隊(duì)列)
RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

  • 多重綁定(相同的綁定密鑰綁定多個隊(duì)列)
RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

  • 不同密鑰綁定不同的隊(duì)列,可以發(fā)揮出不同日志級別發(fā)送到不同的隊(duì)列的效果。
RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

4.1.1、Sender

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;


public class Sender {
    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);

        String message = "那一定是藍(lán)色";
        channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
        System.out.println("[x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

4.1.2、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver1 {
    private final  static  String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, InterruptedException {
        // 獲取到連接以及mq通道
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 "+message);
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }


    }
}

4.1.3、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;

/**
 * Created by san
 */
public class Receiver2 {
    private final  static  String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, InterruptedException {
        // 獲取到連接以及mq通道
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 "+message);
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

4.1.4、結(jié)果-總結(jié)

有一點(diǎn)要注意是:在direct下,必須是Exchange(交換機(jī))已經(jīng)存在,消費(fèi)端的隊(duì)列才能綁定到Exchange,否則會報錯。也就說上面的程序第一次運(yùn)行時,需先啟Sender,才能成功啟動Reciver。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

5、話題(topic)

話題也是一個持久的消息,只要交換機(jī)還在,每個上線的消費(fèi)者都可以消費(fèi)一次自己感興趣的topic。

  • *(星號)可以代替一個單詞。
  • #(哈希)可以替代零個或多個單詞。
RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

5.1.1、Sender.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        //消息內(nèi)容
        String message = "這里是盧本偉廣場";
        //第二個參數(shù)是topic匹配值
        channel.basicPublish(EXCHANGE_NAME,"lbw.nb",null,message.getBytes());
        System.out.println("[x] Sent '"+message+"'");

        //關(guān)通道 關(guān)連接
        channel.close();
        connection.close();
    }
}

5.1.2、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false,false, null);
        //第二參數(shù)就是去匹配我興趣的topic
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 '"+message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }
    }
}

5.1.3、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false,false, null);

        //第二參數(shù)就是去匹配我興趣的topic
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 '"+message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

5.1.4、結(jié)果-分析

話題的特色就是隊(duì)列可以獲取自己感興趣的話題消息,可以通過通配符*或#來表示匹配所有的感興趣的字符串。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 


RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

6、RPC(遠(yuǎn)程過程調(diào)用)

給張圖自己體會吧(官網(wǎng)沒給示例代碼,我也就不寫了),就是通過兩個交換機(jī)實(shí)現(xiàn)一個可回調(diào)的過程吧。

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

三、RabbitMq的交換機(jī)

RabbitMq是有一個交換機(jī)的概念的, 消息(Message)由Client發(fā)送,RabbitMQ接收到消息之后通過交換機(jī)轉(zhuǎn)發(fā)到對應(yīng)的隊(duì)列上面。Worker會從隊(duì)列中獲取未被讀取的數(shù)據(jù)處理。這樣就可以實(shí)現(xiàn)消息的發(fā)送者無需知道消息使用者的存在,反之亦然。Direct exchange:直連(路由)交換機(jī),轉(zhuǎn)發(fā)消息到routigKey指定的隊(duì)列

Fanout exchange:扇形交換機(jī),轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列(相當(dāng)于廣播)

Topic exchange:主題交換機(jī),按規(guī)則轉(zhuǎn)發(fā)消息(很靈活)

Headers exchange:首部交換機(jī)

RabbitMq七種工作模式,結(jié)合簡單的java實(shí)例使用,別再說你不會

 

前面的簡單類型我們都是忽略了交換機(jī)的參數(shù)的,如該方法:channel.basicPublish("", QUEUE_NAME, null, message.getBytes());就是這個方法的第一個參數(shù),置空說明使用了默認(rèn)的交換機(jī)。
有幾種交換類型可用:direct,topic,headers 和fanout。

作者:小小卡爾

原文鏈接:https://blog.csdn.net/weixin_44185736/article/details/106574637

分享到:
標(biāo)簽:RabbitMq
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定