coordinate-connector 架构设计
1. 系统概述coordinate-connector 是 Coordinate 消息系统中的 MQTT 客户端 SDK基于rumqttc进行二次开发采用 Rust 异步运行时tokio实现。它为应用程序提供简洁的 API 用于连接 MQTT Broker、发布消息和订阅主题。作为 coordinate-broadcast 的配套客户端库coordinate-connector 主要用于 coordinate-server 与广播组件之间的通信同时也支持外部应用接入。从功能定位角度来看coordinate-connector 主要承担三类职责第一是作为 MQTT 客户端封装底层协议细节提供简洁的异步 API第二是作为状态管理器维护连接状态、QoS 流程控制和 packet id 分配第三是作为传输层抽象支持 TCP、TLS、WebSocket 等多种传输方式。系统设计遵循高性能、低延迟的原则使用预分配数据结构和事件驱动模式整体能够支撑高并发的消息处理。2. 系统架构2.1 整体架构图coordinate-broadcastcoordinate-connectorEventLoop - 事件循环Client - 高层抽象MQTTTCP/TLS/WSpublish/subscribeTransport - 传输层TCPTLSWebSocketWSSProtocol - 协议层v5 Extendedv5 Lite应用程序Client APIClientMqttOptionsEventLoopMqttStateBroadcast ServerProtocolTransport2.2 模块职责划分coordinate-connector 采用模块化的架构设计每个模块承担独立的职责通过清晰定义的接口进行模块间交互。这种设计使得各模块可以独立演进同时便于测试和维护。系统主要包含以下五个核心模块client 模块负责提供高层的异步客户端抽象是应用程序使用的主要入口点。它封装了请求通道Channel提供了 publish、subscribe、ack、disconnect 等简洁的异步方法。client 模块的设计遵循简单易用原则隐藏了底层的事件循环细节应用程序只需关注业务逻辑。eventloop 模块是系统的核心事件循环引擎负责驱动整个客户端的运行。它管理网络连接、处理入站和出站数据包、维护 Keep-alive 机制。EventLoop 使用 tokio 的 select 模式实现异步事件处理能够高效地处理并发请求和网络 I/O。state 模块负责维护 MQTT 连接状态包括飞行中消息inflight管理、packet id 分配、QoS 流程控制、会话恢复等。state 模块是 MQTT 协议实现的核心支持确保消息传递的可靠性。protocol 模块实现了 MQTT 协议的解析与序列化功能。该模块根据 Cargo feature 决定使用不同协议版本v5 Extended包含 v0 feature用于 coordinate-server 连接 broadcast在内网使用是 MQTT v5 完整协议并扩展 AddSubscribe/RemoveSubscribe 消息类型支持动态订阅管理v5 Lite默认用于外部客户端连接 broadcast 接收消息是 MQTT v5 的裁剪集合仅包含连接管理消息Connect/ConnAck/Ping/Disconnectv5 Extended 扩展功能AddSubscribe服务端代客户端订阅主题RemoveSubscribe服务端代客户端取消订阅transport 模块负责底层网络传输支持 TCP、TCPTLS、WebSocket、WebSocketTLS 四种传输方式。传输层抽象使得客户端可以灵活适应不同的网络环境。3. 核心组件设计3.1 MqttOptions 配置MqttOptions 是客户端连接的核心配置类提供了丰富的配置选项pubstructMqttOptions{broker_addr:String,// Broker 地址port:u16,// 端口transport:Transport,// 传输层类型keep_alive:Duration,// 保活间隔clean_start:bool,// 清理会话标志client_id:String,// 客户端标识credentials:OptionLogin,// 认证凭据last_will:OptionLastWill,// 遗嘱消息// ... 更多选项}配置示例letmutoptionsMqttOptions::new(client-id,broker-host,21884);options.set_keep_alive(Duration::from_secs(30)).set_clean_start(true).set_credentials(user,password).set_last_will(LastWill::new(will-topic,msg,QoS::AtLeastOnce,false,None));3.2 Client 客户端Client 是面向应用程序的高层 API提供了简洁的异步接口pubstructClient{request_tx:SenderRequest,}// 创建客户端和事件循环let(client,muteventloop)Client::new(options,cap);// 消息循环loop{matcheventloop.poll().await{Ok(Event::Incoming(Incoming::Publish(publish))){// 处理接收到的消息}Ok(Event::Outgoing(_)){/* 发送确认 */}Err(e)break,}}3.3 EventLoop 事件循环EventLoop 是系统的核心引擎管理连接和事件处理pubstructEventLoop{puboptions:MqttOptions,pubstate:MqttState,requests_rx:ReceiverRequest,pending:VecDequeRequest,network:OptionNetwork,keepalive_timeout:OptionPinBoxSleep,}implEventLoop{pubasyncfnpoll(mutself)-ResultEvent,ConnectionError{// 连接建立、数据收发、Keep-alive 处理}}3.4 MqttState 状态管理MqttState 维护 MQTT 连接状态pubstructMqttState{pubawait_pingresp:bool,publast_pkid:u16,pubinflight:u16,puboutgoing_pub:VecOptionPublish,pubincoming_pub:FixedBitSet,pubevents:VecDequeEvent,pubmanual_acks:bool,pubbroker_topic_alias_max:u16,pubmax_outgoing_inflight:u16,}4. 关键技术实现4.1 客户端创建与连接// 创建客户端和事件循环let(client,muteventloop)Client::new(options,10000);// 使用方式loop{matcheventloop.poll().await{Ok(Event::Incoming(Incoming::Publish(publish))){println!(Received: {:?},publish.topic);}Ok(Event::Outgoing(_)){}Err(e)break,}}4.2 消息发布// 异步发布client.publish(topic/test,QoS::AtLeastOnce,false,payload).await?;// 非阻塞发布client.try_publish(topic,QoS::AtMostOnce,true,data)?;4.3 消息订阅// 订阅单个主题client.subscribe(home//temperature,QoS::AtLeastOnce).await?;// 批量订阅client.subscribe_many(vec![Filter::new(topic1,QoS::AtMostOnce),Filter::new(topic2,QoS::AtLeastOnce),]).await?;// 非阻塞订阅client.try_subscribe(topic,QoS::AtMostOnce)?;4.4 传输层系统支持四种传输层类型pubenumTransport{Tcp,// 普通 TCPTls(TlsConfiguration),// TLS 加密Ws,// WebSocketWss(TlsConfiguration),// WebSocket TLS}4.5 TLS 配置// 使用默认配置Transport::tls_with_default_config()// 自定义配置Transport::tls(ca:Vecu8,client_auth:Option(Vecu8,Vecu8),alpn:OptionVecVecu8,)5. 与 coordinate-broadcast 的交互5.1 连接配置coordinate-server 通过 coordinate-connector 连接 broadcast 服务[broadcast] host 192.168.31.195 port 21884 username password endpoint ws://192.168.31.195:80005.2 连接流程asyncfnconnect(options:mutMqttOptions)-Result(Network,ConnAck),ConnectionError{// 1. 建立网络连接let(network,connack)timeout(Duration::from_secs(self.options.connection_timeout()),connect(mutself.options),).await??;// 2. MQTT 握手network.write(Packet::Connect(...)).await?;network.flush().await?;matchnetwork.read().await?{Incoming::ConnAck(connack)ifconnack.codeSuccessOk(connack),_Err(ConnectionError::ConnectionRefused),}}5.3 消息流coordinate-server → Client.publish() → EventLoop → MqttState → Network.write() → coordinate-broadcast coordinate-broadcast → Network.read() → EventLoop.poll() → Event::Incoming → 应用程序6. 配置设计6.1 Cargo.toml Features[features] default [] use-rustls [use-rustls-no-provider, tokio-rustls/default] use-rustls-no-provider [dep:tokio-rustls, dep:rustls-webpki] use-native-tls [dep:tokio-native-tls, dep:native-tls] websocket [dep:async-tungstenite, dep:ws_stream_tungstenite] v0 []6.2 MqttOptions 配置项配置项说明broker_addrBroker 地址port端口transport传输层类型keep_alive保活间隔clean_start清理会话client_id客户端标识credentials认证凭据last_will遗嘱消息manual_acks手动确认模式6.3 NetworkOptions 配置项pubstructNetworkOptions{tcp_send_buffer_size:Optionu32,tcp_recv_buffer_size:Optionu32,tcp_nodelay:bool,conn_timeout:u64,}7. 设计模式总结7.1 架构模式系统采用以下架构模式实现高性能和高可用性生产者-消费者模式Client 作为生产者将请求放入通道EventLoop 作为消费者从通道取出请求进行处理。通道提供了高效的异步通信机制。事件驱动模式EventLoop 基于 tokio 的 select 模式实现异步事件处理能够高效地处理并发请求和网络 I/O。状态机模式MqttState 作为状态机管理 MQTT 连接的各种状态包括连接中、已连接、断开等。7.2 扩展性设计系统提供了良好的扩展性支持通过 Cargo features 可以选择不同的协议版本和传输层通过 TlsConfiguration 可以自定义 TLS 行为通过 MqttOptions 可以灵活配置连接参数。8. 技术规格指标规格支持协议v5 Extended含 v0 feature, v5 Lite默认传输层TCP, TLS, WebSocket, WSSQoS 级别0, 1依赖 Runtimetokio异步模型futures-channel9. 使用示例9.1 基本使用usecoordinate_connector::{Client,Event,MqttOptions,QoS};usestd::time::Duration;#[tokio::main]asyncfnmain()-Result(),Boxdynstd::error::Error{letmutoptionsMqttOptions::new(test-client,localhost,21884);options.set_keep_alive(Duration::from_secs(30));options.set_clean_start(true);let(client,muteventloop)Client::new(options,10000);// 订阅主题client.subscribe(home/temperature,QoS::AtLeastOnce).await?;// 消息循环loop{tokio::select!{eventeventloop.poll(){matchevent?{Event::Incoming(coordinate_connector::Incoming::Publish(publish))){println!(Received: {},publish.topic);}_{}}}}}}9.2 发布消息// 发布消息client.publish(home/temperature,QoS::AtLeastOnce,false,25.5).await?;// 批量订阅client.subscribe_many(vec![coordinate_connector::Filter::new(home/temperature,QoS::AtLeastOnce),coordinate_connector::Filter::new(home/humidity,QoS::AtLeastOnce),]).await?;