一、Maven依賴添加
<!-- rabbitmq相關(guān)依賴 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
二、七種工作模式的JAVA實(shí)例
1、簡單模式
最簡單的一個消費(fèi)者和一個生產(chǎn)者模式,生產(chǎn)者生成消息,消費(fèi)者監(jiān)聽消息,若是消費(fèi)者監(jiān)聽到它所需要的消息,就會消費(fèi)該消息,這種消息是次性的,被消費(fèi)了就沒有了。
1.1.1、EasyRecv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class EasyRecv {
//隊(duì)列名稱
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {
//打開連接和創(chuàng)建頻道,與發(fā)送端一樣
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置RabbitMQ所在主機(jī)ip或者主機(jī)名
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊(duì)列,主要為了防止消息接收者先運(yùn)行此程序,隊(duì)列還不存在時創(chuàng)建隊(duì)列。
/**
* 隊(duì)列名
* 是否持久化
* 是否排外 即只允許該channel訪問該隊(duì)列 一般等于true的話用于一個隊(duì)列只能有一個消費(fèi)者來消費(fèi)的場景
* 是否自動刪除 消費(fèi)完刪除
* 其他屬性
*
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
//創(chuàng)建隊(duì)列消費(fèi)者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費(fèi)隊(duì)列
/**
* 隊(duì)列名
* 其他屬性 路由
* 消息body
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一個阻塞方法(內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
}
}
}
1.1.2、EasySend.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class EasySend {
//隊(duì)列名稱
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException
{
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名
factory.setHost("127.0.0.1");
while (true){
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建一個頻道
Channel channel = connection.createChannel();
//指定一個隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//發(fā)送的消息
Scanner scanner = new Scanner(System.in);
String ms = scanner.nextLine();
//String message = "hello world!";
//往隊(duì)列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());
System.out.println("Sent '" + ms + "'");
//關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
以上兩個已經(jīng)可以進(jìn)行通信了,下面同樣是簡單的實(shí)例,但是我們可以看到在代碼層面上,連接的代碼都是一樣的,所以我們可以創(chuàng)建一個連接的工具類。
1.2.1、RabbitmqConnectionUtil .java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class RabbitmqConnectionUtil {
public static Connection getConnection() throws IOException {
//連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//連接5672端口 注意15672為工具界面端口 25672為集群端口
factory.setPort(5672);
//factory.setVirtualHost("/xxxxx");
// factory.setUsername("xxxxxx");
// factory.setPassword("123456");
//獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
1.2.2、UtilSend.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilSend {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息內(nèi)容
String message = "這里是lbw廣場";
channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
System.out.println("[x]Sent '"+message + "'");
//最后關(guān)閉通關(guān)和連接
channel.close();
connection.close();
}
}
1.2.3、UtilRecv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilRecv {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = null;
connection = RabbitmqConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
while(true){
//該方法會阻塞
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received '"+message+"'");
}
}
}
2、工作隊(duì)列
工作隊(duì)列也就是簡單模式的強(qiáng)化版,一個隊(duì)列是可以多個生產(chǎn)者,也可以有多個消費(fèi)者來競爭消費(fèi)消息,但是我們?nèi)孕璞WC隊(duì)列的冪等性,隊(duì)列存在就不能再創(chuàng)建同名隊(duì)列。
下面的每個進(jìn)程都控制其主線程休眠,讓我們可以更好的看到結(jié)果。
2.1.1、Sender1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 100; i++){
String message = "lbw" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '"+message + "'");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
2.1.2、Sender2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 100; i++){
String message = "nb" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '"+message + "'");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
2.1.3、Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
* Created by san
*/
public class Receiver1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false,false, false,null);
//同一時刻服務(wù)器只會發(fā)送一條消息給消費(fèi)者
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//關(guān)于手工確認(rèn) 待之后有時間研究下
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '"+message+"'");
Thread.sleep(10);
//返回確認(rèn)狀態(tài)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
2.1.4、Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
* Created by san
*/
public class Receiver2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false,false, false,null);
//同一時刻服務(wù)器只會發(fā)送一條消息給消費(fèi)者
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '"+message+"'");
Thread.sleep(1000);
//返回確認(rèn)狀態(tài)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
2.1.5、結(jié)果
上面的四個程序都運(yùn)行起來,結(jié)果可以看到如下,依據(jù)結(jié)果分析,可知,同一個消息隊(duì)列,是可以有多個生產(chǎn)者和消費(fèi)者的。
3、發(fā)布/訂閱(fanout)
3.1.1、Sender.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
public class Sender {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args)
{
try
{
//獲取連接
Connection connection = RabbitmqConnectionUtil.getConnection();
//從連接中獲取一個通道
Channel channel = connection.createChannel();
//聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//發(fā)送消息
for (int i = 0; i < 5; i++)
{
String message = "盧本偉廣場" + i;
System.out.println("[send]:" + message);
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
Thread.sleep(5 * i);
}
channel.close();
connection.close();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
3.1.2、Receiver1.java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "test_exchange_fanout";
//隊(duì)列名稱
private static final String QUEUE_NAME = "test_queue_email";
public static void main(String[] args)
{
try
{
//獲取連接
Connection connection = RabbitmqConnectionUtil.getConnection();
//從連接中獲取一個通道
final Channel channel = connection.createChannel();
//聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//將隊(duì)列綁定到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保證一次只分發(fā)一個
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel)
{
//當(dāng)消息到達(dá)時執(zhí)行回調(diào)方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[email] Receive message:" + message);
try
{
//消費(fèi)者休息2s處理業(yè)務(wù)
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[1] done");
//手動應(yīng)答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//設(shè)置手動應(yīng)答
boolean autoAck = false;
//監(jiān)聽隊(duì)列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
3.1.3、Receiver2.java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "test_exchange_fanout";
//隊(duì)列名稱
private static final String QUEUE_NAME = "test_queue_phone";
public static void main(String[] args)
{
try
{
//獲取連接
Connection connection = RabbitmqConnectionUtil.getConnection();
//從連接中獲取一個通道
final Channel channel = connection.createChannel();
//聲明交換機(jī)(分發(fā):發(fā)布/訂閱模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//將隊(duì)列綁定到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保證一次只分發(fā)一個
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//定義消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel)
{
//當(dāng)消息到達(dá)時執(zhí)行回調(diào)方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[phone] Receive message:" + message);
try
{
//消費(fèi)者休息1s處理業(yè)務(wù)
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[2] done");
//手動應(yīng)答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//設(shè)置手動應(yīng)答
boolean autoAck = false;
//監(jiān)聽隊(duì)列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
3.1.4、結(jié)果
從程序運(yùn)行結(jié)果和RabbitMq的后臺看出,這樣的消息屬于廣播型,兩個不同名的隊(duì)列的都能收到該消息,只需它們都將自己綁定到同一個交換機(jī),而且,該消息是持久的,只要交換機(jī)還在,消費(fèi)者啥時候上線都能消費(fèi)它所綁定的交換機(jī),而且只會一個消費(fèi)者只會消費(fèi)一
4、路由(direct)
- 在前面的示例中,我們已經(jīng)在創(chuàng)建綁定。您可能會想起類似的代碼:
channel.queueBind(queueName,EXCHANGE_NAME,“”);
綁定是交換和隊(duì)列之間的關(guān)系。可以簡單地理解為:隊(duì)列對來自此交換的消息感興趣。
- 綁定可以采用額外的routingKey參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為 綁定鍵。這是我們可以創(chuàng)建帶有鍵的綁定的方法:
channel.queueBind(queueName,EXCHANGE_NAME,“ black”);
- 直接綁定(密鑰直接綁定到單個隊(duì)列)
- 多重綁定(相同的綁定密鑰綁定多個隊(duì)列)
- 不同密鑰綁定不同的隊(duì)列,可以發(fā)揮出不同日志級別發(fā)送到不同的隊(duì)列的效果。
4.1.1、Sender
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_direct";
private final static String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
String message = "那一定是藍(lán)色";
channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
System.out.println("[x] Sent '"+message+"'");
channel.close();
connection.close();
}
}
4.1.2、Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
* Created by san
*/
public class Receiver1 {
private final static String QUEUE_NAME = "queue_routing";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// 獲取到連接以及mq通道
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 "+message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
4.1.3、Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
* Created by san
*/
public class Receiver2 {
private final static String QUEUE_NAME = "queue_routing2";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// 獲取到連接以及mq通道
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 "+message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
4.1.4、結(jié)果-總結(jié)
有一點(diǎn)要注意是:在direct下,必須是Exchange(交換機(jī))已經(jīng)存在,消費(fèi)端的隊(duì)列才能綁定到Exchange,否則會報錯。也就說上面的程序第一次運(yùn)行時,需先啟Sender,才能成功啟動Reciver。
5、話題(topic)
話題也是一個持久的消息,只要交換機(jī)還在,每個上線的消費(fèi)者都可以消費(fèi)一次自己感興趣的topic。
- *(星號)可以代替一個單詞。
- #(哈希)可以替代零個或多個單詞。
5.1.1、Sender.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
//消息內(nèi)容
String message = "這里是盧本偉廣場";
//第二個參數(shù)是topic匹配值
channel.basicPublish(EXCHANGE_NAME,"lbw.nb",null,message.getBytes());
System.out.println("[x] Sent '"+message+"'");
//關(guān)通道 關(guān)連接
channel.close();
connection.close();
}
}
5.1.2、Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
private final static String QUEUE_NAME = "queue_topic";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false,false, null);
//第二參數(shù)就是去匹配我興趣的topic
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '"+message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.1.3、Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
private final static String QUEUE_NAME = "queue_topic2";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false,false, null);
//第二參數(shù)就是去匹配我興趣的topic
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '"+message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.1.4、結(jié)果-分析
話題的特色就是隊(duì)列可以獲取自己感興趣的話題消息,可以通過通配符*或#來表示匹配所有的感興趣的字符串。
6、RPC(遠(yuǎn)程過程調(diào)用)
給張圖自己體會吧(官網(wǎng)沒給示例代碼,我也就不寫了),就是通過兩個交換機(jī)實(shí)現(xiàn)一個可回調(diào)的過程吧。
三、RabbitMq的交換機(jī)
RabbitMq是有一個交換機(jī)的概念的, 消息(Message)由Client發(fā)送,RabbitMQ接收到消息之后通過交換機(jī)轉(zhuǎn)發(fā)到對應(yīng)的隊(duì)列上面。Worker會從隊(duì)列中獲取未被讀取的數(shù)據(jù)處理。這樣就可以實(shí)現(xiàn)消息的發(fā)送者無需知道消息使用者的存在,反之亦然。Direct exchange:直連(路由)交換機(jī),轉(zhuǎn)發(fā)消息到routigKey指定的隊(duì)列
Fanout exchange:扇形交換機(jī),轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列(相當(dāng)于廣播)
Topic exchange:主題交換機(jī),按規(guī)則轉(zhuǎn)發(fā)消息(很靈活)
Headers exchange:首部交換機(jī)
前面的簡單類型我們都是忽略了交換機(jī)的參數(shù)的,如該方法:channel.basicPublish("", QUEUE_NAME, null, message.getBytes());就是這個方法的第一個參數(shù),置空說明使用了默認(rèn)的交換機(jī)。
有幾種交換類型可用:direct,topic,headers 和fanout。
作者:小小卡爾
原文鏈接:https://blog.csdn.net/weixin_44185736/article/details/106574637