整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          MQTT 協議是什么

          著 5G 時代的來臨,萬物互聯的偉大構想正在成為現實。聯網的 物聯網設備 在 2018 年已經達到了 70 億1,在未來兩年,僅智能水電氣表就將超過10億2。

          海量的設備接入和設備管理對網絡帶寬、通信協議以及平臺服務架構都帶來了很大挑戰。對于 物聯網協議 來說,必須針對性地解決物聯網設備通信的幾個關鍵問題:其網絡環境復雜而不可靠、其內存和閃存容量小、其處理器能力有限。

          MQTT 協議 是基于發布/訂閱模式的物聯網通信協議,憑借簡單易實現、支持 QoS、報文小等特點,占據了物聯網協議的半壁江山:

          MQTT 協議的誕生

          MQTT was created by Andy Stanford-Clark of IBM, and Arlen Nipper (then of Arcom Systems, later CTO of Eurotech).3

          據 Arlen Nipper 在一 IBM Podcast 上的自述,MQTT 原名是 MQ TT, 注意 MQ 與 TT之間的空格,其全稱為: MQ Telemetry Transport,是九十年代早期,他在參與 Conoco Phillips 公司的一個原油管道數據采集監控系統(pipeline SCADA system)時,開發的一個實時數據傳輸協議。它的目的在于讓傳感器通過帶寬有限的 VSAT ,與 IBM 的 MQ Integrator 通信。由于 Nipper 是遙感和數據采集監控專業出身,所以按業內慣例給了個 MQ TT 的名字。

          MQTT 協議設計原則

          按照 Nipper 的介紹,MQTT 必須簡單容易實現,必須支持 QoS(設備網絡環境復雜),必須輕量且省帶寬(因為那時候帶寬很貴),必須數據無關(不關心 Payload 數據格式),必須有持續地會話感知能力(時刻知道設備是否在線)。下面將介紹 MQTT (3.1.1 版本) 的幾個核心特色,分別對應了這幾個設計原則的實現。

          靈活的發布訂閱和主題設計

          發布訂閱模式是傳統 Client/Server 模式的一種解耦方案。發布者通過 Broker 與消費者之間通信,Broker 的作用是將收到的消息通過某種過濾規則,正確地發送給消費者。發布/訂閱模式 相對于 客戶端/服務器模式 的好處在于:

          • 發布者和消費者之間不必預先知道對方的存在,比如不需要預先溝通對方的 IP Address 和 Port
          • 發布者和消費者之間不必同時運行。因為 Broker 是一直運行的。

          在 MQTT 協議里,上面提到的 過濾規則 是 Topic。比如:所有發布到 news 這個 Topic 的消息,都會被 Broker 轉發給已經訂閱了 news 的訂閱者:

          上圖中訂閱者預先訂閱了 news,然后發布者向 Broker 發布了一條消息 "some msg" 并指定發布到 news 主題,Broker 通過 Topic 匹配,決定將這條消息轉發給訂閱者。

          MQTT 的 Topic 有層級結構,并且支持通配符 + 和 #:

          • + 是匹配單層的通配符。比如 news/+ 可以匹配 news/sports,news/+/basketball 可匹配到 news/sports/basketball。
          • # 是一到多層的通配符。比如 news/# 可以匹配 news、 news/sports、news/sports/basketball 以及 news/sports/basketball/x 等等。

          MQTT 的主題是不要預先創建的,發布者發送消息到某個主題、或者訂閱者訂閱某個主題的時候,Broker 就會自動創建這個主題。

          帶寬消耗最小化

          MQTT 協議將協議本身占用的額外消耗最小化,消息頭部最小只需要占用 2 個字節。

          MQTT 的消息格式分三部分:

          固定長度頭部,2 個字節,所有消息類型里都有可變長度頭部,只有某些消息類型里有Payload,只有某些消息類型里有

          MQTT 的主要消息類型有:

          • CONNECT / CONNACK
          • PUBLISH / PUBACK
          • SUBSCRIBE / SUBACK
          • UNSUBSCRIBE / UNSUBACK
          • PINGREQ / PINGRESP
          • DISCONNECT

          其中 PINGREQ / PINGRESP 和 DISCONNECT 報文是不需要可變頭部的,也沒有 Payload,也就是說它們的報文大小僅僅消耗 2 個字節。

          在 CONNECT 報文的可變長度頭部里,有個 Protocol Version 的字段。為了節省空間,只有一個字節。所以版本號不是按照字符串 "3.1.1" 存放的,而是使用數字 4 來表示 3.1.1 版本。

          三個可選的 QoS 等級

          為適應設備不同的網絡環境,MQTT 設計了 3 個 QoS 等級,0, 1, 2:

          • At most once (0)
          • At least once (1)
          • Exactly once (2)

          QoS 0 是一種 "fire and forget" 的消息發送模式:Sender (可能是 Publisher 或者 Broker) 發送一條消息之后,就不再關心它有沒有發送到對方,也不設置任何重發機制。

          QoS 1 包含了簡單的重發機制,Sender 發送消息之后等待接收者的 ACK,如果沒收到 ACK 則重新發送消息。這種模式能保證消息至少能到達一次,但無法保證消息重復。

          QoS 2 設計了略微復雜的重發和重復消息發現機制,保證消息到達對方并且嚴格值到達一次。

          會話保持

          MQTT 沒有假設設備或 Broker 使用了 TCP 的保活機制4,而是設計了協議層的保活機制:在 CONNECT 報文里可設置 Keepalive 字段,來設置保活心跳包 PINGREQ/PINGRESP 的發送時間間隔。當長時間無法收到設備的 PINGREQ 的時候,Broker 就會認為設備已經下線。

          總的來說,Keepalive 有兩個作用:

          • 發現對端死亡或者網絡中斷
          • 在長時間無消息交互的情況下,保持連接不被網絡設備斷開

          對于那些想要在重新上線后,重新收到離線期間錯過的消息的設備,MQTT 設計了持久化連接:在 CONNECT 報文里可設置 CleanSession 字段為 False,則 Broker 會為終端存儲:

          • 設備所有的訂閱
          • 還未被設備確認的 QoS1 和 QoS 消息
          • 設備離線時錯過的消息

          在線狀態感知

          MQTT 設計了遺愿(Last Will) 消息,讓 Broker 在發現設備異常下線的情況下,幫助設備發布一條遺愿消息到指定的主題。

          實際上在某些 MQTT 服務器的實現里 (比如 EMQ X),設備上線或下線的時候 Broker 會通過某些系統主題發布設備狀態更新,更符合實際應用場景。

          開源 MQTT 服務器如何選擇

          到目前為止,比較流行的開源 MQTT 服務器有幾個:

          1. Eclipse Mosquitto使用 C 語言實現的 MQTT 服務器。Eclipse 組織還還包含了大量的 MQTT 客戶端項目:https://www.eclipse.org/paho/#
          2. EMQ X使用 Erlang 語言開發的 MQTT 服務器,內置強大的規則引擎,支持許多其他 IoT 協議比如 MQTT-SN、 CoAP、LwM2M 等。
          3. Mosca使用 Node.JS 開發的 MQTT 服務器,簡單易用。
          4. VerneMQ同樣使用 Erlang 開發的 MQTT 服務器.

          從支持 MQTT 5.0、穩定性、擴展性、集群能力等方面考慮,EMQ X 的表現應該是最好的:

          • 使用 Erlang OTP 開發,容錯能力好 (電信領域久經考驗的語言,曾經做出過 99.9999999% 可用性的交換機設備5)
          • 官方有大量的擴展插件可供擴展。有很多認證插件,數據存儲(backend)插件可供選擇。可支持各種關系型數據庫,NoSQL 數據庫,以及常見消息隊列如 Kafka,RabbitMQ,Pulsar 等
          • 支持集群,支持節點水平擴展
          • 單節點支持 2000K 并發連接
          • 支持規則引擎和編解碼

          MQTT 協議快速體驗

          MQTT 在線服務器

          EMQ X MQTT 物聯網云服務 提供了一個在線的公共 MQTT 5.0 服務器,不需要任何安裝您就可以快速開始 MQTT 協議的學習、測試或原型制作。

          該 MQTT 服務器的詳細接入信息請見 EMQ 官網頁面:免費的在線 MQTT 服務器。

          MQTT 在線客戶端

          EMQ 也提供了支持瀏覽器訪問的 MQTT 在線客戶端工具,該工具支持通過普通或者加密的 WebSocket 端口連接至 MQTT 服務器,同時也支持緩存連接方便下次訪問使用。


          1. The number of connected devices that are in use worldwide now exceeds 17 billion, with the number of IoT devices at 7 billion... https://iot-analytics.com/state-of-the-iot-update-q1-q2-2018-number-of-iot-devices-now-7b/ ?
          2. The estimated installed base of smart meters (electricity, gas and water) is expected to surpass the 1 billion mark within the next 2 years. https://iot-analytics.com/smart-meter-market-2019-global-penetration-reached-14-percent/ ?
          3. https://github.com/mqtt/mqtt.github.io/wiki/history ?
          4. https://www.cnblogs.com/softidea/p/5764051.html ?
          5. https://pragprog.com/articles/erlang ?

          想一想都快兩年沒有編寫代碼,這公眾號也快有兩年沒更新了,人閑久了總想找些活干,為了保持代碼編寫技能于是把之前寫的MQTT協議擴展出一個網關服務,并實現對3.X和5.0協議版本的支持。作為一個服務網關在性能上也是有著一定的要求,其實現目標能支持數十萬的消息訂閱轉發。

          簡介
          項目是基于BeetleX通訊組件擴展的MQTT協議通訊服務,服務包括兩大模塊分別是:BeetleX.MQTT.Protocols和BeetleX.MQTT.Server。前者是對MQTT通訊協議的實現,分別實現了V3.X和V5.0兩個版本;而后者則是網關服務的實現并集成了基礎的管理界面,兩個項目都是都提供了完整的實現代碼并存放在Gitub上。

          項目開源地址:github.com/beetlex-io/mqtt

          運行項目
          項目的運行非常簡單,只需要創建一個控制臺項目并在Main方法中添加以下代碼即可以運行

          • class Program{ private static MQTTServer mServer;  static void Main(string[] args) { mServer = new MQTTServer(ProtocolType.V3); mServer.RegisterComponent<BeetleX.MQTT.Server.Controller>(); mServer.MQTTListen(o => { o.DefaultListen.Port = 8089; //o.DefaultListen.SSL = true; //o.DefaultListen.CertificateFile = ""; //o.DefaultListen.CertificatePassword = ""; }) .Setting(o => { o.LogToConsole = true; o.Port = 80; o.LogLevel = EventArgs.LogType.Info; }) .UseJWT() .UseEFCore<Storages.MQTTDB>() .UseElement(PageStyle.ElementDashboard) .Initialize((http, vue, resoure) => { resoure.AddAssemblies(typeof(BeetleX.MQTT.Server.MQTTUser).Assembly); resoure.AddCss("website.css"); resoure.AddScript("echarts.js"); vue.Debug(); }) .Run();
            }}

            以上代碼在80端口上打開WEB管理服務,在8089端口上打開MQTT服務;服務啟動后就可以通過瀏覽器進入到簡單的管理界面。在協議版本選擇上可以在MQTTServer創建時指定V3或V5(暫時不能同一端口服務同時支持V3和V5)。

            • 首頁

            • 用戶管理

            • 設備管理


            管理界面只推薦簡單的帳號管理和轉發統計,基礎框架已經搭建完成,可以根據實際需求進行擴展開發。

            單獨使用協議分析器
            如果使用其他網絡服務組件又不想自己編寫MQTT協議,那可以單獨使用BeetleX.MQTT.Protocols對網絡數據進行協議分析。組件是基于Stream數據流規范開發,只需要傳對一個標準的Stream數據流即可以完成MQTT協議讀取和寫入。

            • //v5var mqttparse = new BeetleX.MQTT.Protocols.V5.MQTTParseV5();mqttparse.Read(stream, );mqttparse.Write(msg, stream, );//v3.xvar mqttparse = new BeetleX.MQTT.MQTTParseV3();mqttparse.Read(stream, );mqttparse.Write(msg, stream, );


              性能:由于MQTT是支持通配符訂閱的,高并發時大量消息在訂閱匹配上往往會比較損耗性能;組件在這方面做了特別的優化,對于通配符訂閱上也能非常輕松地應對每秒上10萬訂閱轉發(具體上限取決于硬件搭配)。

              提醒:由于項目僅個人興趣編寫,并沒有在自有商業項目中使用,因此會存在一定的問題;如果碰到問題可以去Github對應的項目上提出相應的問題。


              BeetleX

              開源跨平臺通訊框架(支持TLS)

              提供HTTP,Websocket,MQTT,Redis,RPC和服務網關開源組件

              http://beetlex-io.com

          應用示例使用Coolpy7作為Mqtt服務器并啟用Websocket代理完美支持高并發大流量即時通過能力,本示以即時通信聊天為為例。還可以應用到其他軟件應用如:網頁客服系統、網站信息通知、網頁即時通信系統、網頁游戲等等

          技術應用架構簡介

          系統架構包括:

          1. MQTT服務端程序(Coolpy7)
          2. WebSocket代理服務端 (Coolpy7_ws)
          3. Html5聊天室前端

          安裝并運行

          運行Coolpy7核心服務

          Coolpy7核心服務是一個最原始最單純功能完備的MQTT消息服務器端,包括功能有:QoS:0,QoS1,QoS2消息質量支持。Will消息支持等等。深入了解 https://mcxiaoke.gitbooks.io/mqtt-cn/content/

          1. 防止暴力連接攻擊,對已連接客戶端進行優先保護
          2. 防止空連接攻擊,當用戶連接建立后兩秒鐘內沒有進行身份驗證即主動關閉客戶端連接

          通過ssh進入服務器192.168.200.201,并確保你已經按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服務器操作系統的網絡優化配置。

          Coolpy7核心服務運行后會自行構當前目錄下的data文件夾,此文件夾存放MQTT運行期所需求持久化的數據信息,使用的是開源項目 https://github.com/jacoblai/yiyidb,支持10億級秒op的高性能數據庫,數據庫內核使用的是Leveldb技術。

          # 下載服務器端
          git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7
          # 解壓文件
          unzip go_build_Coolpy7_go_linux.zip
          # 提權
          chmod -R 777 go_build_Coolpy7_go_linux
          # 啟動Coolpy7 啟動參數
          # l 當前服務Host地址 (默認為:1883即本地1883端口,此參數一般默認即可,無需配置)
          # a 連接接入調度器最大線程,此值可防止暴力連接攻擊,對已連接客戶端進行優先保護 (默認值128)
          ./go_build_Coolpy7_go_linux
          # 啟動成功后會打印如下信息,即說明服務端已正常啟動,host于1883端口,請確保相關防火墻配置可用
          2018/10/29 12:59:55 Coolpy7 tcp is listening on [::]:1883
          

          一般需為程序提權才可以運行Linux服務,指令:chmod -R 777 go_build_Coolpy7_go_linux

          運行Coolpy7 WS代理服務

          此功能即為Coolpy7核心服務提供WebSocket接入功能。通過ssh進入服務器192.168.200.203,并確保你已經按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服務器操作系統的網絡優化配置。

          1. 千萬級WebSocket代理服務器
          2. 支持防爆力攻擊
          # 下載服務器端
          git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7
          # 解壓文件
          unzip go_build_Coolpy7_ws_go_linux.zip
          # 提權
          chmod -R 777 go_build_Coolpy7_ws_go_linux
          # 啟動Coolpy7 WS Poxy
          # r啟動參數 CP7核心服務器所在ip或域名 (例:core.coolpy.net:1883 or 192.168.200.201:1883)
          # l啟動參數 當前服務Host地址 (默認為:8083即本地8083端口,此參數一般默認即可,無需配置)
          ./go_build_Coolpy7_ws_go_linux
          # 啟動成功后會打印如下信息,即說明服務端已正常啟動,host于8083端口,請確保相關防火墻配置可用
          2018/10/29 12:59:55 upstream 192.168.200.201:1883 ok
          2018/10/29 12:59:55 Coolpy7 ws is listening on [::]:8083
          

          運行Html5前端聊天室應用示例

          1. 下載開源項目:https://github.com/Coolpy7/Cp7Chat
          2. 修改連接地址為上一步服務器端ip和端口(具體ip和端口按閣下真實環境,全套程序運行于本機可統一使用127.0.0.1為連接地址)
          3. 通過Webstorm等web調試工具運行代碼

          以WebStorm為例

          1.下載源代碼

          填寫git地址下載源代碼

          2.修改連接信息本示例以本機運行整套系統為例輸入127.0.0.1,端口號8083,假設把服務器端Coolpy7和Coolpy7-ws已經運行于阿里云之類的云服務器上改寫為服務器的公網IP地址和端口即可,如果已綁定域名可直接填寫域名如: test.coolpy.net

          代碼位于chat.html第55行

          3.修改完畢后選中工程中的index.html點擊

          至此已完成運行部署。以下是測試運行演示

          項目開源信息

          服務器端開源地址: https://github.com/Coolpy7

          聊天室前端開源地址:https://github.com/Coolpy7/Cp7Chat


          主站蜘蛛池模板: 国模精品一区二区三区| 久久久久人妻精品一区| 无码人妻精品一区二| 色窝窝免费一区二区三区| 色妞色视频一区二区三区四区| 精品性影院一区二区三区内射 | 日韩中文字幕精品免费一区| 在线播放国产一区二区三区 | 久久精品成人一区二区三区| 精品国产一区二区三区久久久狼| 国产免费一区二区三区在线观看| 亚洲国产系列一区二区三区| 国产精品福利一区二区久久| 精品一区二区三区在线观看l | 国产精品无码亚洲一区二区三区 | 中文字幕在线观看一区| 久久影院亚洲一区| 消息称老熟妇乱视频一区二区| 无码一区二区三区| 精品少妇人妻AV一区二区三区| 国产精品区一区二区三| 日本一区二区三区免费高清在线 | 国产在线精品一区二区不卡麻豆| 97精品国产福利一区二区三区| 久久久91精品国产一区二区三区| 一区二区三区四区免费视频| 亚洲一区无码中文字幕乱码| 国产小仙女视频一区二区三区| 精品国产一区二区三区在线观看| 中文字幕精品一区二区精品| 日本一区二区在线播放| 亚洲国模精品一区| 亚洲一区二区三区电影| 亚洲一区二区三区四区视频 | 久久精品一区二区国产| 国产在线精品一区在线观看| 国产精品丝袜一区二区三区| 波多野结衣一区在线| 国产成人久久精品区一区二区| 国产成人一区二区三区在线| 国产一区二区免费在线|