日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

一、關于 RabbitMQ

說到 RabbitMQ,相信大家都不會陌生,微服務開發中必不可少的中間件。

深入剖析 rabbitMQ

 

在上篇關于消息隊列的文章中,我們了解到 RabbitMQ 本質其實是用 Erlang 開發的 AMQP(Advanced Message Queuing Protocol )的具體實現,最初起源于金融系統,主要用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面有著不俗的表現。

2010年4月,RabbitMQ 科技公司被 VMware 旗下的 SpringSource 收購,在 2013 年 5 月被并入 Pivotal 。

其實 VMware,Pivotal 本質上是一家的。不同的是,VMware 是獨立上市子公司,而 Pivotal 是整合了EMC的某些資源,現在并沒有上市。其中我們現在使用的 Spring 系列框架,就是 Pivotal 公司熱門的產品之一。

直到后來 Pivotal 將其開源,RabbitMQ 才逐漸走向大眾!

RabbitMQ 發展到今天,已經被越來越多的人認可,尤其是互聯網公司,已經有著大規模的場景應用,今天我們就一起來深入了解一下 RabbitMQ。

二、RabbitMQ 模型介紹

2.1、內部結構分析

上面我們有說到 RabbitMQ 本質是 AMQP 協議的一個開源實現,在詳細介紹 RabbitMQ 之前,我們先來看一下 AMQP 的內部結構圖!

深入剖析 rabbitMQ

 

基本概念如下

  • Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序
  • Exchange:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列
  • Binding:綁定,用于將消息隊列和交換器之間建立關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將它理解成一個由綁定構成的路由表。
  • Queue:消息隊列,用來保存消息直到發送給消費者
  • Connection:網絡連接,比如一個 TCP 連接
  • Channel:信道,多路復用連接中的一條獨立的雙向數據流通道
  • Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序
  • Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /
  • Broker:表示消息隊列服務器實體
  • Message:消息實體,它由消息頭和消息體組成。消息頭主要由路由鍵、交換器、隊列、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等屬性組成,而消息體就是指具體的業務對象

相比傳統的 JMS 模型,AMQP 主要多了 ExchangeBinding 這個新概念。

在 AMQP 模型中,消息的生產者不是直接將消息發送到Queue隊列,而是將消息發送到Exchange交換器,其中還新加了一個中間層Binding綁定,作用就是通過路由鍵Key將交換器和隊列建立綁定關系。

深入剖析 rabbitMQ

 

就好比類似用戶表角色表,中間通過用戶角色表來將用戶和角色建立關系,從而實現關系綁定,在 RabbitMQ 中,消息生產者不直接跟隊列建立關系,而是將消息發送到交換器之后,由交換器通過已經建立好的綁定關系,將消息發送到對應的隊列!

RabbitMQ 最終的架構模型,核心部分就變成如下圖所示:

深入剖析 rabbitMQ

 

從圖中很容易看出,與 JMS 模型最明顯的差別就是消息的生產者不直接將消息發送給隊列,而是由Binding綁定決定交換器的消息應該發送到哪個隊列,進一步實現了在消息的推送方面,更加靈活!

2.2、交換器分發策略

當消息的生產者將消息發送到交換器之后,是不會存儲消息的,而是通過中間層綁定關系將消息分發到不同的隊列上,其中交換器的分發策略分為四種:Direct、Topic、Headers、Fanout!

  • Direct:直連類型,即在綁定時設定一個 routing_key, 消息的 routing_key 匹配時, 才會被交換器投送到綁定的隊列中去,原則是先匹配、后投送
  • Topic:按照規則轉發類型,支持通配符匹配,和 Direct 功能一樣,但是在匹配 routing_key的時候,更加靈活,支持通配符匹配,原則也是先匹配、后投送
  • Headers:頭部信息匹配轉發類型,根據消息頭部中的 header attribute 參數類型,將消息轉發到對應的隊列,原則也是先匹配、后投送
  • Fanout:廣播類型,將消息轉發到所有與該交互機綁定的隊列上,不關心 routing_key;

2.2.1、Direct

Direct 是 RabbitMQ 默認的交換機模式,也是最簡單的模式,消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。

如果傳入的 routing key 為 black,不會轉發到black.green。Direct 類型交換器是完全匹配、單播的模式

深入剖析 rabbitMQ

 

2.2.2、Topic

Topic 類型交換器轉發消息和 Direct 一樣,不同的是:它支持通配符轉發,相比 Direct 類型更加靈活!

兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個。

如果傳入的 routing key 為 black#,不僅會轉發到black,也會轉發到black.green。

深入剖析 rabbitMQ

 

2.2.3、Headers

headers 也是根據規則匹配, 相比 direct 和 topic 固定地使用 routing_key , headers 則是通過一個自定義匹配規則的消息頭部類進行匹配。

在隊列與交換器綁定時,會設定一組鍵值對規則,消息中也包括一組鍵值對( headers 屬性),當這些鍵值對有一對, 或全部匹配時,消息被投送到對應隊列。

此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。

深入剖析 rabbitMQ

 

2.2.4、Fanout

Fanout 類型交換器與上面幾個不同,不管路由鍵或者是路由模式,會把消息發給綁定給它的全部隊列,如果配置了 routing_key 會被忽略,也被成為消息廣播模式。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息

fanout 類型轉發消息在四種類型中是最快的。

深入剖析 rabbitMQ

 

三、RabbitMQ 安裝

RabbitMQ 基于 erlang 進行通信,相比其它的軟件,安裝有些麻煩,為了跟生產環境保持一直,操作系統選擇centos7,不過本例采用rpm方式安裝,任何新手都可以完成安裝,過程如下!

3.1、安裝前命令準備

輸入如下命令,完成安裝前的環境準備。

yum install lsof  build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim

3.2、下載 RabbitMQ、erlang、socat 的安裝包

本次下載的是RabbitMQ-3.6.5版本,采用rpm一鍵安裝,適合新手直接上手。

先創建一個rabbitmq目錄,本例的目錄路徑為/usr/App/rabbitmq,然后在目錄下執行如下命令,下載安裝包!

  • 下載erlang
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
  • 下載socat
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • 下載rabbitMQ
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

最終目錄文件如下:

深入剖析 rabbitMQ

 

3.3、安裝軟件包

下載完之后,按順序依次安裝軟件包,這個很重要哦~

  • 安裝erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
  • 安裝socat
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • 安裝rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安裝完成之后,修改rabbitmq的配置,默認配置文件在
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

修改loopback_users節點的值!

深入剖析 rabbitMQ

 

最后只需通過如下命令,啟動服務即可!

rabbitmq-server start &

運行腳本之后,如果報錯,例如下圖!

深入剖析 rabbitMQ

 

解決辦法如下:

vim /etc/rabbitmq/rabbitmq-env.conf

在文件里添加一行,如下配置!

NODENAME=rabbit@localhost

然后,再保存!再次以下命令啟動服務!

rabbitmq-server start &

通過如下命令,查詢服務是否啟動成功!

lsof -i:5672

如果出現5672已經被監聽,說明已經啟動成功!

深入剖析 rabbitMQ

 

3.4、啟動可視化的管控臺

輸入如下命令,啟動控制臺!

rabbitmq-plugins enable rabbitmq_management

用瀏覽器打開http://ip:15672,這里的ip就是 CentOS 系統的 ip,結果如下:

深入剖析 rabbitMQ

 

賬號、密碼,默認為guest,如果出現無法訪問,檢測防火墻是否開啟,如果開啟將其關閉即可!

登錄之后的監控平臺,界面如下:

深入剖析 rabbitMQ

 

四、web界面使用

相比其他的消息隊列,rabbitMQ 其中一個很明顯的好處就是有 web 操作界面,而且簡單易用。

進入 web 管理界面之后,可以很清晰的看到分了 6 個菜單目錄,分別是:Overview、Connections、Channels、Exchanges、Queues、Admin

  • Overview:總覽概述,主要介紹 rabbitmq 一些基礎匯總等信息
深入剖析 rabbitMQ

 

  • Connections:連接池管理,主要介紹客戶端連接等信息
深入剖析 rabbitMQ

 

  • Channels:信道管理,主要介紹信道連接等信息
深入剖析 rabbitMQ

 

點擊具體某個具體的信道,可以看到對應的消費隊列等信息。

深入剖析 rabbitMQ

 

  • Exchanges:交換器管理,主要介紹交換器等信息
深入剖析 rabbitMQ

 

  • Queues:隊列管理,主要介紹隊列等信息
深入剖析 rabbitMQ

 

  • Admin:系統管理,主要介紹用戶、虛擬主機、權限等信息
深入剖析 rabbitMQ

 

下面,我們重點介紹一些如何通過 web 頁面來操作 rabbitMQ!

4.1、交換器管理

點擊進入 Exchanges 菜單,最下面有一個Add a new exchange標簽。

深入剖析 rabbitMQ

 

點擊Add a new exchange,會展示如下信息!

深入剖析 rabbitMQ

 

  • Name:交換器名稱
  • Type:交換器類型
  • Durability:是否持久化,Durable:持久化,Transient:不持久化
  • Auto delete:是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
  • Internal:是否是內部專用exchange,是的話,就意味著我們不能往該exchange里面發消息
  • Arguments:參數,是AMQP協議留給AMQP實現做擴展使用的

我們先新建一個名稱為hello-exchange,類型為direct的交換器,結果如下。

深入剖析 rabbitMQ

 

等會用于跟隊列關聯!

4.2、隊列管理

點擊進入 Queues 菜單,最下面也有一個Add a new queue標簽。

深入剖析 rabbitMQ

 

點擊標簽,即可進入添加隊列操作界面!

深入剖析 rabbitMQ

 

  • Name:隊列名稱
  • Durability:是否持久化,Durable:持久化,Transient:不持久化
  • Auto delete:是否自動刪除,是的話,當隊列內容為空時,會自動刪除隊列
  • Arguments:參數,是AMQP協議留給AMQP實現做擴展使用的

同樣的,新建一個名稱為hello-mq的消息隊列,結果如下。

深入剖析 rabbitMQ

 

隊列新建好了之后,繼續來建立綁定關系!

4.3、綁定管理

建立綁定關系,既可以從隊列進入也可以從交換器進入。

如果是從交換器進入,那么被關聯的對象就是隊列。

深入剖析 rabbitMQ

 

如果是從隊列進入,那么被關聯的對象就是交換器。

深入剖析 rabbitMQ

 

我們選擇從隊列入手,被綁定的交換器是hello-exchange,因為類型是direct,所以還需要填寫routing key。

深入剖析 rabbitMQ

 

建立完成之后,在交換器那邊也可以看到對應的綁定關系。

深入剖析 rabbitMQ

 

4.4、發送消息

最后,我們從交換器入手,選擇對應的交換器,點擊Publish message標簽,填寫對應的路由鍵 key,發送一下數據,查看數據是否發送到對應的隊列中。

深入剖析 rabbitMQ

 

然后點擊進入 Queues 菜單,查詢消息隊列基本情況。

深入剖析 rabbitMQ

 

然后選擇hello-mq消息隊列,點擊Get messages標簽,獲取隊列中的消息。

深入剖析 rabbitMQ

 

結果如下,可以很清晰的看到,消息寫入到隊列!

深入剖析 rabbitMQ

 

五、JAVA客戶端使用

RabbitMQ 支持多種語言訪問,本次介紹 RabbitMQ Java Client 的一些簡單的api使用,如聲明 Exchange、Queue,發送消息,消費消息,一些高級 api 會在后面的文章中詳細的說明。

5.1、引入 rabbitMQ 依賴包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

5.2、連接服務器

使用給定的參數(host name,端口等等)連接AMQP的服務器。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

也可以使用通過 URI 方式進行連接。

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

Connection(連接)接口可以被用作創建一個channel(管道),利用 channel(管道)可以進行發送和接收消息,在后面我們會頻繁使用到它。

Channel channel = conn.createChannel();

注意,管道使用之后,需要進行關閉。

channel.close();
conn.close();

5.3、創建交換器

不僅可以通過 web頁面進行創建交換器,還可以通過代碼進行聲明(創建的意思)交換器。

//創建exchange,類型是direct類型
channel.exchangeDeclare("ex-hello","direct");

//第三個參數表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true);
System.out.println(ok);

//創建帶屬性的交換器
Map<String,Object> argument = new HashMap<>();
argument.put("alternate-exchange","log");
channel.exchangeDeclare("ex-hello","direct",true,false,argument);

//異步創建exchange,沒有返回值
channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument);

///判斷exchange是否存在,存在的返回ok,不存在的exchange則報錯
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello");
System.out.println(declareOk);


//刪除exchange(可重復執行),刪除一個不存在的也不會報錯
channel.exchangeDelete("ex-hello");

創建交換器參數解讀

  • 第一個參數:表示交換器名稱
  • 第二個參數:表示交換器類型
  • 第三個參數:表示是否持久化,為true表示會將隊列持久化存儲到硬盤
  • 第四個參數:表示是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
  • 第五個參數:表示設置參數,參數類型為Map<String, Object>

5.4、創建隊列

同樣的,也可以通過代碼進行聲明隊列。

//同步創建隊列
channel.queueDeclare(queueName, true, false, false, null);

//異步創建隊列沒有返回值
channel.queueDeclareNoWait(queueName,true,false,false,null);

//判斷queue是否存在,不存在會拋出異常
channel.exchangeDeclarePassive(queueName);

//刪除隊列
channel.queueDelete(queueName);

創建隊列參數解讀

  • 第一個參數:表示隊列名稱
  • 第二個參數:表示是否持久化,為true表示會將隊列持久化存儲到硬盤
  • 第三個參數:表示是否排它性,為true表示只對首次聲明它的連接可見,會在其連接斷開的時候自動刪除
  • 第四個參數:表示是否自動刪除,為true表示有過消費者并且所有消費者都解除訂閱了,自動刪除隊列
  • 第五個參數:表示設置參數,參數類型為Map<String, Object>

5.5、創建綁定

當交換器和隊列都創建成功之后,就可以建立綁定關系。

//交換器和隊列進行綁定(可重復執行,不會重復創建)
channel.queueBind(queueName, exchangeName, routingKey);

//異步進行綁定,最后一個參數表示可以帶自定義參數
channel.queueBindNoWait(queueName,exchangeName,routingKey,null);

//exchange和queue進行解綁(可重復執行)
channel.queueUnbind(queueName, exchangeName, routingKey);

//exchange與exchange進行綁定(可重復執行,不會重復創建)
//第一個參數表示目標交換器
//第二個參數表示原地址交換器
//第三個參數表綁定路由key
channel.exchangeBind(destination,source,routingKey);

//exchange和exchange進行解綁(可重復執行)
channel.exchangeUnbind(destination,source,routingKey);

綁定關系參數解讀

  • queueName:隊列名稱,取自創建的隊列名稱
  • exchangeName:交換器,取自創建的交換器名稱
  • routingKey:路由鍵key,自定義

5.6、發送消息

發送消息到交換器就會使用我們上文所提到的channel管道。

//發送的消息內容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

也可以在發送消息前設定一些消息屬性。

//自己構建BasicProperties的對象
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("zhangsan")
               .build()),
               messageBodyBytes);

發送指定頭信息的消息。

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("userName",  '"zhangsan');
headers.put("userCode", "123");

//發送消息到交換器
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

發送一個有過期時間的消息,單位:ms。

//設置消息過期時間,單位ms
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("6000")
               .build()),
               messageBodyBytes);

更多用法,可以參見官方 API

5.7、接受消息

從消息隊列中接受消息也會使用我們上文所提到的channel管道。

//監聽隊列中的消息
channel.basicConsume(queueName,true,new SimpleConsumer(channel));

監聽隊列消息參數解讀

  • 第一個參數:表示需要監聽的隊列名稱
  • 第二個參數:表示是否自動確認,如果配置false表示手動確認消息是否收到
  • 第三個參數:表示消息處理類

具體的消息處理類需要繼承DefaultConsumer,并重寫handleDelivery方法,代碼如下:

public class SimpleConsumer extends DefaultConsumer{

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  //接受從隊列中發送的消息
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內容為:"+new String(body));
    }
}

如果是手工確認消息,需要在handleDelivery方法中進行相關的確認,代碼如下:

//手動確認
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

5.8、完整demo

5.8.1、發送消息

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
        //連接RabbitMQ服務器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("197.168.24.206");
        factory.setPort(5672);
        //創建一個連接
        Connection conn = factory.newConnection();
        //獲得信道
        Channel channel = conn.createChannel();
        //聲明交換器
        channel.exchangeDeclare("ex-hello","direct");
        //發送的消息內容
        byte[] messageBodyBytes = "Hello, world!".getBytes();
        channel.basicPublish("ex-hello", "route-hello", null, messageBodyBytes);
        //關閉通道
        channel.close();
        conn.close();
    }
}

5.8.2、接受消息

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //連接RabbitMQ服務器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("197.168.24.206");
        factory.setPort(5672);
        //創建一個連接
        Connection conn = factory.newConnection();
        //獲得信道
        Channel channel = conn.createChannel();
        //聲明隊列
        channel.queueDeclare("queue-hello", true, false, false, null);
        //聲明綁定
        channel.queueBind("queue-hello", "ex-hello", "route-hello");

        //監聽隊列中的消息
        channel.basicConsume("queue-hello",true,new SimpleConsumer(channel));

        TimeUnit.SECONDS.sleep(10);

        channel.close();
        conn.close();
    }
}

消息處理類SimpleConsumer

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //接受從隊列中發送的消息
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內容為:"+new String(body));
    }
}

消息發送成功之后,啟動消費者,輸出結果如下:

深入剖析 rabbitMQ

 

六、總結

整篇文章主要介紹了 RabbitMQ 內部結構、安裝步驟、使用教程,以及 java 客戶端使用等內容,內容比較長,限于筆者的才疏學淺,對本文內容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。

分享到:
標簽:rabbitMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定