日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

在本文中,我們將深入研究使用Rust構(gòu)建實(shí)時(shí)消息代理服務(wù)器,展示其強(qiáng)大的并發(fā)特性。我們將使用Warp作為web服務(wù)器,并使用Tokio來管理異步任務(wù)。此外,我們將創(chuàng)建一個(gè)WebSocket客戶端來測(cè)試代理服務(wù)器的功能。

設(shè)計(jì)圖如下:

圖片圖片

構(gòu)建消息代理服務(wù)器

消息代理服務(wù)器允許客戶端為主題生成事件并訂閱它們。它使用Warp作為HTTP和WebSocket服務(wù)器,使用Tokio作為異步運(yùn)行時(shí)。

使用以下命令創(chuàng)建一個(gè)Rust項(xiàng)目:

cargo new real-ime-message

在Cargo.toml文件中加入以下依賴項(xiàng):

[dependencies]
futures-util = "0.3.30"
tokio = {version = "1.35.1", features = ["full"]}
tokio-tungstenite = "0.21.0"
url = "2.5.0"
warp = "0.3.6"

在src/mAIn.rs文件中定義一個(gè)Broker結(jié)構(gòu)體:

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
};

use futures_util::{SinkExt, StreamExt};
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    RwLock,
};
use warp::{filters::ws::Message, Filter};

type Topic = String;
type Event = String;
type WsSender = UnboundedSender<warp::ws::Message>;

struct Broker {
    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,
    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,
}
  • events:存儲(chǔ)每個(gè)主題的事件。
  • subscribers:跟蹤每個(gè)主題的訂閱者。

創(chuàng)建一個(gè)新的Broker實(shí)例:

impl Broker {
    fn new() -> Self {
        Broker {
            events: Arc::new(RwLock::new(HashMap::new())),
            subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

定義發(fā)布事件的方法produce:

impl Broker {
    ......

    async fn produce(&self, topic: Topic, event: Event) {
        let mut events = self.events.write().await;
        events
            .entry(topic.clone())
            .or_default()
            .push_back(event.clone());

        // 異步通知所有訂閱者
        let subscribers_list;
        {
            let subscribers = self.subscribers.read().await;
            subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
        }

        for ws_sender in subscribers_list {
            // 將事件發(fā)送到WebSocket客戶端
            let _ = ws_sender.send(warp::ws::Message::text(event.clone()));
        }
    }
}

這個(gè)方法主要是將事件添加到相應(yīng)的主題,然后將新事件通知所有訂閱者。

定義subscribe方法,來管理新的訂閱:

impl Broker {
    ......

    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (ws_sender, mut ws_receiver) = socket.split();

        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

        {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic).or_default().push(tx);
        }

        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯(cuò)誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
        });

        tokio::task::spawn(async move {
            let mut sender = ws_sender;

            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });
    }
}

這個(gè)方法主要是將WebSocket拆分為發(fā)送方和接收方,將訂閱者添加到訂閱者列表中,處理傳入的WebSocket消息。

main函數(shù)代碼如下:

#[tokio::main]
async fn main() {
    let broker = Arc::new(Broker::new());
    let broker_clone1 = Arc::clone(&broker);
    let broker_clone2 = Arc::clone(&broker);

    let produce = warp::path!("produce" / String)
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || Arc::clone(&broker_clone1)))
        .and_then(
            move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {
                broker_clone2.produce(topic, event).await;
                Ok::<_, warp::Rejection>(warp::reply())
            },
        );

    let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(
        move |topic: String, ws: warp::ws::Ws| {
            let broker_clone3 = Arc::clone(&broker_clone2);
            ws.on_upgrade(move |socket| async move {
                broker_clone3.subscribe(topic.clone(), socket).await;
            })
        },
    );

    let routes = produce.or(subscribe);

    println!("Broker server running at http://127.0.0.1:3030");
    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

實(shí)現(xiàn)WebSocket客戶端

WebSocket客戶端將模擬一個(gè)訂閱主題和接收消息的真實(shí)用戶。

在src/bin目錄下,創(chuàng)建一個(gè)ws_cli.rs文件。在文件中定義websocket_client函數(shù),建立WebSocket連接并管理消息:

use futures_util::{sink::SinkExt, stream::StreamExt};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use url::Url;

async fn websocket_client(topic_url: &str) {
    // 解析要連接WebSocket服務(wù)器的URL
    let url = Url::parse(topic_url).expect("Invalid URL");

    // 連接到WebSocket服務(wù)器
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    println!("WebSocket client connected");

    let (mut write, mut read) = ws_stream.split();
    let message = Arc::new(RwLock::new(String::new()));
    let message_1 = message.clone();
    // 生成一個(gè)任務(wù)來處理傳入的消息
    tokio::spawn(async move {
        let msg_lock = message_1.clone();
        while let Some(message) = read.next().await {
            match message {
                Ok(msg) => {
                    let mut ms = msg_lock.write().await;
                    *ms = msg.to_text().unwrap().to_string();
                    println!("Received message: {}", msg.to_text().unwrap());
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                    break;
                }
            }
        }
    });

    // 發(fā)送消息
    loop {
        let msg_lock = message.clone();
        let ms = msg_lock.read().await;
        if let Err(e) = write.send(Message::Text(ms.to_string())).await {
            eprintln!("Error sending message: {:?}", e);
            break;
        }
        sleep(Duration::from_secs(5)).await;
    }
}

main函數(shù)代碼如下:

#[tokio::main]
async fn main() {
    websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;
}

測(cè)試

執(zhí)行如下命令運(yùn)行消息代理服務(wù)器:

cargo run --bin real-ime-message

執(zhí)行結(jié)果:

Broker server running at http://127.0.0.1:3030

然后打開一個(gè)新的命令行,執(zhí)行如下命令運(yùn)行WebSocket客戶端:

cargo run --bin ws_cli

執(zhí)行結(jié)果:

WebSocket client connected

向http://127.0.0.1:3030/produce/newtopic接口發(fā)送post請(qǐng)求,如圖:

圖片圖片

客戶端接收到消息:

WebSocket client connected
Received message: This is a new event

總結(jié)

我們已經(jīng)探索了在Rust中創(chuàng)建一個(gè)簡(jiǎn)單的消息代理,并使用WebSocket客戶端對(duì)其進(jìn)行測(cè)試。這個(gè)例子突出了Rust在構(gòu)建高效、并發(fā)的網(wǎng)絡(luò)應(yīng)用程序方面的能力。

分享到:
標(biāo)簽:Rust
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定