一、 概述
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱模式的"輕量級"通訊協(xié)議,它和 Modbus TCP 一樣都是基于 TCP/IP 之上的應(yīng)用層協(xié)議。
1.1 MQTT 具備以下幾個(gè)特點(diǎn)
簡單容易實(shí)現(xiàn)、支持設(shè)置消息服務(wù)質(zhì)量等級(QoS)、輕量且省帶寬。
1.2 為什么是 MQTT
物聯(lián)網(wǎng)設(shè)備通信有一些問題需要針對性的解決:網(wǎng)絡(luò)環(huán)境復(fù)雜而不可靠、硬件內(nèi)存和閃存容量小、處理器能力有限。MQTT 協(xié)議就是為了解決遇到的這些問題而創(chuàng)建的。
1.3 MQTT 與 HTTP 協(xié)議對比
MQTT 最小報(bào)文僅為 2 個(gè)字節(jié),比 HTTP 占用更少網(wǎng)絡(luò)開銷。
http需要指定請求頭、請求方式、請求協(xié)議版本等比MQTT請求更多的內(nèi)容,也代表著它要比MQTT的網(wǎng)絡(luò)開銷要更多。
MQTT 基于發(fā)布訂閱模式,HTTP 基于請求響應(yīng)。
MQTT 可實(shí)時(shí)推送消息,HTTP 需要通過輪詢獲取數(shù)據(jù)。
MQTT 是有狀態(tài)的,HTTP 是無狀態(tài)的。
MQTT客戶端啟動之后與Broker進(jìn)行連接之后后續(xù)請求不用再帶自身認(rèn)證信息,而對于http來說客戶端需要每次請求都帶著自身認(rèn)證信息表明自身身份。
1.4 MQTT 與消息隊(duì)列對比
MQTT 和消息隊(duì)列的很多行為和特性非常接近, 比如都采用發(fā)布/訂閱模式,但是他們面向的場景卻 有著顯著的不同。消息隊(duì)列主要用于服務(wù)端應(yīng)用之間的消息存儲與轉(zhuǎn)發(fā),這類場景往往數(shù)據(jù)量大但客戶端 數(shù)量少。MQTT 是一種消息傳輸協(xié)議,主要用于物聯(lián)網(wǎng)設(shè)備之間的消息傳遞,這類場景的特點(diǎn)是海量的設(shè)備接入、管理與消息傳輸。
1.5 MQTT消息服務(wù)質(zhì)量等級
MQTT協(xié)議可以對消息設(shè)置三種不同的服務(wù)質(zhì)量等級,分別是:
QoS=0:最多發(fā)送一次,訂閱方不會發(fā)送接收到消息的響應(yīng),發(fā)布方也不會重新發(fā)送消息,消息可能會丟失。
QoS=1:最少發(fā)送一次,訂閱方接收到消息之后發(fā)送響應(yīng)給發(fā)布方,如果發(fā)布方?jīng)]收到響應(yīng)會重新發(fā)送消息,直到接收到響應(yīng)為止,消息可能會重復(fù)。
QoS=2:保證只有一次,訂閱方不會重復(fù)接收到消息,也不會丟失消息。發(fā)布方和訂閱方之間需要進(jìn)行兩次請求響應(yīng)確認(rèn)消息傳輸,保證相同消息只接收一次,QoS為2時(shí)消耗網(wǎng)絡(luò)資源也最多。
1.6 MQTT 主題通配符
MQTT 通配符包含單層通配符 `+` 和多層通配符`#`,主要用于客戶端一次訂閱多個(gè)主題。在發(fā)布的時(shí)候不能用通配符下發(fā)到多個(gè)主題中。
單層通配符適用于單個(gè)主題層級匹配的通配符,單層通配符必須占據(jù)整個(gè)層級才能生效。
多層通配符適用于主題匹配任意層級的通配符,多層通配符表示它的父級和任意數(shù)量的子層級,多層通配符必須單獨(dú)占據(jù)一個(gè)層級才能生效。
二、 創(chuàng)建 MQTT 鏈接
下面我們通過 Java 代碼創(chuàng)建一個(gè) MQTT 鏈接,模擬設(shè)備發(fā)送數(shù)據(jù),通過代碼模擬訂閱者接收數(shù)據(jù)。
2.1添加依賴
1.<dependency>
2. <groupId>org.eclipse.paho</groupId>
3. <artifactId>org.eclipse.paho.client.MQTTv3</artifactId>
4. <version>1.2.5</version>
5.</dependency>
2.2編寫發(fā)布者代碼
1./**
2. *
3. * @param topic
4. * @param content
5. * @param qos 有3個(gè)值分別為:
6. * 0:消息最多傳遞一次,如果客戶端沒有接收到消息,那么消息會丟失
7. * 1:消息傳遞至少一次,發(fā)布者發(fā)送消息會等待訂閱者的ACK,如果在規(guī)定時(shí)間內(nèi)沒有收到ACK則會重新發(fā)送,可能會出現(xiàn)消息重復(fù)
8. * 2:消息僅且只發(fā)送一次,保證消息到達(dá)對方并且只發(fā)送一次
9. */
10.public void sendMessage(MQTTClient client,String topic,String content,int qos){
11. MQTTMessage message = new MQTTMessage(content.getBytes());
12. message.setQos(qos);
13. try {
14. client.publish(topic,message);
15. } catch (MQTTException e) {
16. //todo publish fail
17. }
18.}
19./**
20. * 連接MQTT客戶端
21. * @return
22. */
23.public MQTTClient connectMQTT() {
24. MQTTClient client = null;
25. try {
26. //broker
27. String broker = "tcp://localhost:1883";
28. //用戶名
29. String username = "admin";
30. //密碼
31. String password = "123456";
32. //clientId
33. String clientId = System.currentTimeMillis() + "";
34. //創(chuàng)建MQTT客戶端(指定broker、客戶端id、消息持久策略)
35. client = new MQTTClient(broker, clientId, new MemoryPersistence());
36. //創(chuàng)建連接參數(shù)配置
37. MQTTConnectOptions options = new MQTTConnectOptions();
38. options.setUserName(username);
39. options.setPassword(password.toCharArray());
40. //是否清除會話
41. options.setCleanSession(true);
42. //心跳間隔
43. options.setKeepAliveInterval(20);
44. //連接超時(shí)時(shí)間
45. options.setConnectionTimeout(20);
46. //是否自動重連
47. options.setAutomaticReconnect(true);
48. //連接
49. client.connect(options);
50.
51. }catch (Exception e){
52. //todo connect fail
53. }
54. return client;
55.}
56.
57./**
58. * 關(guān)閉MQTT客戶端
59. * @param client
60. */
61.public void closeClient(MQTTClient client){
62. try {
63. //關(guān)閉連接
64. client.disconnect();
65. //關(guān)閉客戶端
66. client.close();
67. } catch (MQTTException e) {
68. //todo close fail
69. }
70.}
現(xiàn)在我們來模擬給訂閱者發(fā)送一條消息,我們通過 MQTTX 客戶端來模擬測試:
1.Demo demo = new Demo();
2.MQTTClient client = demo.connectMQTT();
3.String content = "{\n" +
4. " \"deviceNo\": \"x5a2aw\",\n" +
5. " \"val\": 19\n" +
6. "}";
7.//執(zhí)行主題
8.String topic = "deviceUp";
9.int qos = 1;
10.if (null != client){
11. demo.sendMessage(client,topic,content,qos);
12.}
13.demo.closeClient(client);
2.3編寫訂閱者代碼
1.String broker = "tcp://localhost:1883";
2.String topic = "/deviceUp";
3.String username = "admin";
4.String password = "123456";
5.String clientId = System.currentTimeMillis() + "";
6.int qos = 1;
7.
8.try {
9. MQTTClient client = new MQTTClient(broker, clientId, new MemoryPersistence());
10. // 連接參數(shù)
11. MQTTConnectOptions options = new MQTTConnectOptions();
12. options.setUserName(username);
13. options.setPassword(password.toCharArray());
14. options.setConnectionTimeout(60);
15. options.setKeepAliveInterval(60);
16. // 設(shè)置回調(diào)
17. client.setCallback(new MQTTCallback() {
18. public void connectionLost(Throwable cause) {
19. }
20. public void messageArrived(String topic, MQTTMessage message) {
21. System.out.println("topic為: " + topic);
22. System.out.println("topic為: " + message.getQos());
23. System.out.println("消息內(nèi)容為: " + new String(message.getPayload()));
24. }
25. public void deliveryComplete(IMQTTDeliveryToken token) {
26. }
27. });
28. client.connect(options);
29. client.subscribe(topic, qos);
30.} catch (Exception e) {
31. //todo subscribe failed
32.}
啟動訂閱者之后再啟動發(fā)布者代碼,與此同時(shí)也可以啟動自己本地的MQTTX接收消息。
訂閱者收到消息為:
MQTTX收到消息為:
三、 總結(jié)
以上就是對MQTT的簡單介紹和通過Java代碼創(chuàng)建一個(gè) MQTT 鏈接,模擬設(shè)備發(fā)送數(shù)據(jù),通過代碼模擬訂閱者接收數(shù)據(jù),希望可以對你有所幫助。更多關(guān)于MQTT的細(xì)節(jié)可以留言獲取或者通過閱讀MQTT官方文檔進(jìn)一步學(xué)習(xí)。