著 5G 時代的來臨,萬物互聯的偉大構想正在成為現實。聯網的 物聯網設備 在 2018 年已經達到了 70 億1,在未來兩年,僅智能水電氣表就將超過10億2。
海量的設備接入和設備管理對網絡帶寬、通信協議以及平臺服務架構都帶來了很大挑戰。對于 物聯網協議 來說,必須針對性地解決物聯網設備通信的幾個關鍵問題:其網絡環境復雜而不可靠、其內存和閃存容量小、其處理器能力有限。
MQTT 協議 是基于發布/訂閱模式的物聯網通信協議,憑借簡單易實現、支持 QoS、報文小等特點,占據了物聯網協議的半壁江山:
MQTT was created by Andy Stanford-Clark of IBM, and Arlen Nipper (then of Arcom Systems, later CTO of Eurotech).3
據 Arlen Nipper 在一 IBM Podcast 上的自述,MQTT 原名是 MQ TT, 注意 MQ 與 TT之間的空格,其全稱為: MQ Telemetry Transport,是九十年代早期,他在參與 Conoco Phillips 公司的一個原油管道數據采集監控系統(pipeline SCADA system)時,開發的一個實時數據傳輸協議。它的目的在于讓傳感器通過帶寬有限的 VSAT ,與 IBM 的 MQ Integrator 通信。由于 Nipper 是遙感和數據采集監控專業出身,所以按業內慣例給了個 MQ TT 的名字。
按照 Nipper 的介紹,MQTT 必須簡單容易實現,必須支持 QoS(設備網絡環境復雜),必須輕量且省帶寬(因為那時候帶寬很貴),必須數據無關(不關心 Payload 數據格式),必須有持續地會話感知能力(時刻知道設備是否在線)。下面將介紹 MQTT (3.1.1 版本) 的幾個核心特色,分別對應了這幾個設計原則的實現。
發布訂閱模式是傳統 Client/Server 模式的一種解耦方案。發布者通過 Broker 與消費者之間通信,Broker 的作用是將收到的消息通過某種過濾規則,正確地發送給消費者。發布/訂閱模式 相對于 客戶端/服務器模式 的好處在于:
在 MQTT 協議里,上面提到的 過濾規則 是 Topic。比如:所有發布到 news 這個 Topic 的消息,都會被 Broker 轉發給已經訂閱了 news 的訂閱者:
上圖中訂閱者預先訂閱了 news,然后發布者向 Broker 發布了一條消息 "some msg" 并指定發布到 news 主題,Broker 通過 Topic 匹配,決定將這條消息轉發給訂閱者。
MQTT 的 Topic 有層級結構,并且支持通配符 + 和 #:
MQTT 的主題是不要預先創建的,發布者發送消息到某個主題、或者訂閱者訂閱某個主題的時候,Broker 就會自動創建這個主題。
MQTT 協議將協議本身占用的額外消耗最小化,消息頭部最小只需要占用 2 個字節。
MQTT 的消息格式分三部分:
固定長度頭部,2 個字節,所有消息類型里都有可變長度頭部,只有某些消息類型里有Payload,只有某些消息類型里有
MQTT 的主要消息類型有:
其中 PINGREQ / PINGRESP 和 DISCONNECT 報文是不需要可變頭部的,也沒有 Payload,也就是說它們的報文大小僅僅消耗 2 個字節。
在 CONNECT 報文的可變長度頭部里,有個 Protocol Version 的字段。為了節省空間,只有一個字節。所以版本號不是按照字符串 "3.1.1" 存放的,而是使用數字 4 來表示 3.1.1 版本。
為適應設備不同的網絡環境,MQTT 設計了 3 個 QoS 等級,0, 1, 2:
QoS 0 是一種 "fire and forget" 的消息發送模式:Sender (可能是 Publisher 或者 Broker) 發送一條消息之后,就不再關心它有沒有發送到對方,也不設置任何重發機制。
QoS 1 包含了簡單的重發機制,Sender 發送消息之后等待接收者的 ACK,如果沒收到 ACK 則重新發送消息。這種模式能保證消息至少能到達一次,但無法保證消息重復。
QoS 2 設計了略微復雜的重發和重復消息發現機制,保證消息到達對方并且嚴格值到達一次。
MQTT 沒有假設設備或 Broker 使用了 TCP 的保活機制4,而是設計了協議層的保活機制:在 CONNECT 報文里可設置 Keepalive 字段,來設置保活心跳包 PINGREQ/PINGRESP 的發送時間間隔。當長時間無法收到設備的 PINGREQ 的時候,Broker 就會認為設備已經下線。
總的來說,Keepalive 有兩個作用:
對于那些想要在重新上線后,重新收到離線期間錯過的消息的設備,MQTT 設計了持久化連接:在 CONNECT 報文里可設置 CleanSession 字段為 False,則 Broker 會為終端存儲:
MQTT 設計了遺愿(Last Will) 消息,讓 Broker 在發現設備異常下線的情況下,幫助設備發布一條遺愿消息到指定的主題。
實際上在某些 MQTT 服務器的實現里 (比如 EMQ X),設備上線或下線的時候 Broker 會通過某些系統主題發布設備狀態更新,更符合實際應用場景。
到目前為止,比較流行的開源 MQTT 服務器有幾個:
從支持 MQTT 5.0、穩定性、擴展性、集群能力等方面考慮,EMQ X 的表現應該是最好的:
EMQ X MQTT 物聯網云服務 提供了一個在線的公共 MQTT 5.0 服務器,不需要任何安裝您就可以快速開始 MQTT 協議的學習、測試或原型制作。
該 MQTT 服務器的詳細接入信息請見 EMQ 官網頁面:免費的在線 MQTT 服務器。
EMQ 也提供了支持瀏覽器訪問的 MQTT 在線客戶端工具,該工具支持通過普通或者加密的 WebSocket 端口連接至 MQTT 服務器,同時也支持緩存連接方便下次訪問使用。
想一想都快兩年沒有編寫代碼,這公眾號也快有兩年沒更新了,人閑久了總想找些活干,為了保持代碼編寫技能于是把之前寫的MQTT協議擴展出一個網關服務,并實現對3.X和5.0協議版本的支持。作為一個服務網關在性能上也是有著一定的要求,其實現目標能支持數十萬的消息訂閱轉發。
簡介
項目是基于BeetleX通訊組件擴展的MQTT協議通訊服務,服務包括兩大模塊分別是:BeetleX.MQTT.Protocols和BeetleX.MQTT.Server。前者是對MQTT通訊協議的實現,分別實現了V3.X和V5.0兩個版本;而后者則是網關服務的實現并集成了基礎的管理界面,兩個項目都是都提供了完整的實現代碼并存放在Gitub上。
項目開源地址:github.com/beetlex-io/mqtt
運行項目
項目的運行非常簡單,只需要創建一個控制臺項目并在Main方法中添加以下代碼即可以運行
class Program
{
private static MQTTServer mServer;
static void Main(string[] args)
{
mServer = new MQTTServer(ProtocolType.V3);
mServer.RegisterComponent<BeetleX.MQTT.Server.Controller>();
mServer.MQTTListen(o =>
{
o.DefaultListen.Port = 8089;
//o.DefaultListen.SSL = true;
//o.DefaultListen.CertificateFile = "";
//o.DefaultListen.CertificatePassword = "";
})
.Setting(o =>
{
o.LogToConsole = true;
o.Port = 80;
o.LogLevel = EventArgs.LogType.Info;
})
.UseJWT()
.UseEFCore<Storages.MQTTDB>()
.UseElement(PageStyle.ElementDashboard)
.Initialize((http, vue, resoure) =>
{
resoure.AddAssemblies(typeof(BeetleX.MQTT.Server.MQTTUser).Assembly);
resoure.AddCss("website.css");
resoure.AddScript("echarts.js");
vue.Debug();
})
.Run();
}
}
以上代碼在80端口上打開WEB管理服務,在8089端口上打開MQTT服務;服務啟動后就可以通過瀏覽器進入到簡單的管理界面。在協議版本選擇上可以在MQTTServer創建時指定V3或V5(暫時不能同一端口服務同時支持V3和V5)。
首頁
用戶管理
設備管理
管理界面只推薦簡單的帳號管理和轉發統計,基礎框架已經搭建完成,可以根據實際需求進行擴展開發。
單獨使用協議分析器
如果使用其他網絡服務組件又不想自己編寫MQTT協議,那可以單獨使用BeetleX.MQTT.Protocols對網絡數據進行協議分析。組件是基于Stream數據流規范開發,只需要傳對一個標準的Stream數據流即可以完成MQTT協議讀取和寫入。
//v5
var mqttparse = new BeetleX.MQTT.Protocols.V5.MQTTParseV5();
mqttparse.Read(stream, );
mqttparse.Write(msg, stream, );
//v3.x
var mqttparse = new BeetleX.MQTT.MQTTParseV3();
mqttparse.Read(stream, );
mqttparse.Write(msg, stream, );
性能:由于MQTT是支持通配符訂閱的,高并發時大量消息在訂閱匹配上往往會比較損耗性能;組件在這方面做了特別的優化,對于通配符訂閱上也能非常輕松地應對每秒上10萬訂閱轉發(具體上限取決于硬件搭配)。
提醒:由于項目僅個人興趣編寫,并沒有在自有商業項目中使用,因此會存在一定的問題;如果碰到問題可以去Github對應的項目上提出相應的問題。
BeetleX
開源跨平臺通訊框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服務網關開源組件
http://beetlex-io.com
應用示例使用Coolpy7作為Mqtt服務器并啟用Websocket代理完美支持高并發大流量即時通過能力,本示以即時通信聊天為為例。還可以應用到其他軟件應用如:網頁客服系統、網站信息通知、網頁即時通信系統、網頁游戲等等
技術應用架構簡介
系統架構包括:
安裝并運行
運行Coolpy7核心服務
Coolpy7核心服務是一個最原始最單純功能完備的MQTT消息服務器端,包括功能有:QoS:0,QoS1,QoS2消息質量支持。Will消息支持等等。深入了解 https://mcxiaoke.gitbooks.io/mqtt-cn/content/
通過ssh進入服務器192.168.200.201,并確保你已經按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服務器操作系統的網絡優化配置。
Coolpy7核心服務運行后會自行構當前目錄下的data文件夾,此文件夾存放MQTT運行期所需求持久化的數據信息,使用的是開源項目 https://github.com/jacoblai/yiyidb,支持10億級秒op的高性能數據庫,數據庫內核使用的是Leveldb技術。
# 下載服務器端 git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7 # 解壓文件 unzip go_build_Coolpy7_go_linux.zip # 提權 chmod -R 777 go_build_Coolpy7_go_linux # 啟動Coolpy7 啟動參數 # l 當前服務Host地址 (默認為:1883即本地1883端口,此參數一般默認即可,無需配置) # a 連接接入調度器最大線程,此值可防止暴力連接攻擊,對已連接客戶端進行優先保護 (默認值128) ./go_build_Coolpy7_go_linux # 啟動成功后會打印如下信息,即說明服務端已正常啟動,host于1883端口,請確保相關防火墻配置可用 2018/10/29 12:59:55 Coolpy7 tcp is listening on [::]:1883
一般需為程序提權才可以運行Linux服務,指令:chmod -R 777 go_build_Coolpy7_go_linux
運行Coolpy7 WS代理服務
此功能即為Coolpy7核心服務提供WebSocket接入功能。通過ssh進入服務器192.168.200.203,并確保你已經按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服務器操作系統的網絡優化配置。
# 下載服務器端 git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7 # 解壓文件 unzip go_build_Coolpy7_ws_go_linux.zip # 提權 chmod -R 777 go_build_Coolpy7_ws_go_linux # 啟動Coolpy7 WS Poxy # r啟動參數 CP7核心服務器所在ip或域名 (例:core.coolpy.net:1883 or 192.168.200.201:1883) # l啟動參數 當前服務Host地址 (默認為:8083即本地8083端口,此參數一般默認即可,無需配置) ./go_build_Coolpy7_ws_go_linux # 啟動成功后會打印如下信息,即說明服務端已正常啟動,host于8083端口,請確保相關防火墻配置可用 2018/10/29 12:59:55 upstream 192.168.200.201:1883 ok 2018/10/29 12:59:55 Coolpy7 ws is listening on [::]:8083
運行Html5前端聊天室應用示例
以WebStorm為例
1.下載源代碼
填寫git地址下載源代碼
2.修改連接信息本示例以本機運行整套系統為例輸入127.0.0.1,端口號8083,假設把服務器端Coolpy7和Coolpy7-ws已經運行于阿里云之類的云服務器上改寫為服務器的公網IP地址和端口即可,如果已綁定域名可直接填寫域名如: test.coolpy.net
代碼位于chat.html第55行
3.修改完畢后選中工程中的index.html點擊
至此已完成運行部署。以下是測試運行演示
項目開源信息
服務器端開源地址: https://github.com/Coolpy7
聊天室前端開源地址:https://github.com/Coolpy7/Cp7Chat
*請認真填寫需求信息,我們會在24小時內與您取得聯系。