异步Rust:构建实时消息代理服务器


在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能。

设计图如下:

异步Rust:构建实时消息代理服务器图片

构建消息代理服务器

消息代理服务器允许客户端为主题生成事件并订阅它们。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时。

使用以下命令创建一个Rust项目:

 cargo new real-ime-message

在Cargo.toml文件中加入以下依赖项:

 [dependencies] futures-util = "0.3.30" tokio = {version = "1.35.1", features = ["full"]} tokio-tungstenite = "0.21.0" url = "2.5.0" warp = "0.3.6"

在src/mAIn.rs文件中定义一个Broker结构体:

 use std::{     collections::{HashMap, VecDeque},     sync::Arc, };  use futures_util::{SinkExt, StreamExt}; use tokio::sync::{     mpsc::{self, UnboundedSender},     RwLock, }; use warp::{filters::ws::Message, Filter};  type Topic = String; type Event = String; type WsSender = UnboundedSender<warp::ws::Message>;  struct Broker {     events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,     subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>, }
  • events:存储每个主题的事件。
  • subscribers:跟踪每个主题的订阅者。

创建一个新的Broker实例:

 impl Broker {     fn new() -> Self {         Broker {             events: Arc::new(RwLock::new(HashMap::new())),             subscribers: Arc::new(RwLock::new(HashMap::new())),         }     } }

定义发布事件的方法produce:

 impl Broker {     ......      async fn produce(&self, topic: Topic, event: Event) {         let mut events = self.events.write().await;         events             .entry(topic.clone())             .or_default()             .push_back(event.clone());          // 异步通知所有订阅者         let subscribers_list;         {             let subscribers = self.subscribers.read().await;             subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();         }          for ws_sender in subscribers_list {             // 将事件发送到WebSocket客户端             let _ = ws_sender.send(warp::ws::Message::text(event.clone()));         }     } }

这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者。

定义subscribe方法,来管理新的订阅:

 impl Broker {     ......      pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {         let (ws_sender, mut ws_receiver) = socket.split();          let (tx, mut rx) = mpsc::unbounded_channel::<Message>();          {             let mut subs = self.subscribers.write().await;             subs.entry(topic).or_default().push(tx);         }          tokio::task::spawn(async move {             while let Some(result) = ws_receiver.next().await {                 match result {                     Ok(message) => {                         // 处理有效的消息                         if message.is_text() {                             println!(                                 "Received message from client: {}",                                 message.to_str().unwrap()                             );                         }                     }                     Err(e) => {                         // 处理错误                         eprintln!("WebSocket error: {:?}", e);                         break;                     }                 }             }             println!("WebSocket connection closed");         });          tokio::task::spawn(async move {             let mut sender = ws_sender;              while let Some(msg) = rx.recv().await {                 let _ = sender.send(msg).await;             }         });     } }

这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中,处理传入的WebSocket消息。

main函数代码如下:

 #[tokio::main] async fn main() {     let broker = Arc::new(Broker::new());     let broker_clone1 = Arc::clone(&broker);     let broker_clone2 = Arc::clone(&broker);      let produce = warp::path!("produce" / String)         .and(warp::post())         .and(warp::body::json())         .and(warp::any().map(move || Arc::clone(&broker_clone1)))         .and_then(             move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {                 broker_clone2.produce(topic, event).await;                 Ok::<_, warp::Rejection>(warp::reply())             },         );      let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(         move |topic: String, ws: warp::ws::Ws| {             let broker_clone3 = Arc::clone(&broker_clone2);             ws.on_upgrade(move |socket| async move {                 broker_clone3.subscribe(topic.clone(), socket).await;             })         },     );      let routes = produce.or(subscribe);      println!("Broker server running at http://127.0.0.1:3030");     warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; }

实现WebSocket客户端

WebSocket客户端将模拟一个订阅主题和接收消息的真实用户。

在src/bin目录下,创建一个ws_cli.rs文件。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:

 use futures_util::{sink::SinkExt, stream::StreamExt}; use std::sync::Arc; use tokio::sync::RwLock; use tokio::time::{sleep, Duration}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use url::Url;  async fn websocket_client(topic_url: &str) {     // 解析要连接WebSocket服务器的URL     let url = Url::parse(topic_url).expect("Invalid URL");      // 连接到WebSocket服务器     let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");      println!("WebSocket client connected");      let (mut write, mut read) = ws_stream.split();     let message = Arc::new(RwLock::new(String::new()));     let message_1 = message.clone();     // 生成一个任务来处理传入的消息     tokio::spawn(async move {         let msg_lock = message_1.clone();         while let Some(message) = read.next().await {             match message {                 Ok(msg) => {                     let mut ms = msg_lock.write().await;                     *ms = msg.to_text().unwrap().to_string();                     println!("Received message: {}", msg.to_text().unwrap());                 }                 Err(e) => {                     eprintln!("Error receiving message: {:?}", e);                     break;                 }             }         }     });      // 发送消息     loop {         let msg_lock = message.clone();         let ms = msg_lock.read().await;         if let Err(e) = write.send(Message::Text(ms.to_string())).await {             eprintln!("Error sending message: {:?}", e);             break;         }         sleep(Duration::from_secs(5)).await;     } }

main函数代码如下:

 #[tokio::main] async fn main() {     websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await; }

测试

执行如下命令运行消息代理服务器:

 cargo run --bin real-ime-message

执行结果:

 Broker server running at http://127.0.0.1:3030

然后打开一个新的命令行,执行如下命令运行WebSocket客户端:

 cargo run --bin ws_cli

执行结果:

 WebSocket client connected

向http://127.0.0.1:3030/produce/newtopic接口发送post请求,如图:

异步Rust:构建实时消息代理服务器图片

客户端接收到消息:

 WebSocket client connected Received message: This is a new event

总结

我们已经探索了在Rust中创建一个简单的消息代理,并使用WebSocket客户端对其进行测试。这个例子突出了Rust在构建高效、并发的网络应用程序方面的能力。

本文收集自互联网,如果发现有涉嫌侵权或违法违规的内容,请联系6532516@qq.com以便进行及时清除
分享到