iOS MQTT 簡單使用流程
EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 平臺開發的開源物聯網 MQTT 訊息伺服器。Erlang/OTP 是出色的軟實時(Soft-Realtime)、低延時(Low-Latency)、分散式(Distributed) 的語言平臺。MQTT 是輕量的(Lightweight)、釋出訂閱模式(PubSub) 的物聯網訊息協議。
EMQ 專案設計目標是承載移動終端或物聯網終端海量 MQTT 連線,並實現在海量物聯網裝置間快速低延時訊息路由:
- 穩定承載大規模的 MQTT 客戶端連線,單伺服器節點支援50萬到100萬連線。
- 分散式節點叢集,快速低延時的訊息路由,單叢集支援1000萬規模的路由。
- 訊息伺服器內擴充套件,支援定製多種認證方式、高效儲存訊息到後端資料庫。
- 完整物聯網協議支援,MQTT、MQTT-SN、CoAP、WebSocket 或私有協議支援。
選擇 MQTTSDK 分為多種
以下介紹其中的兩種ofollow,noindex">MQTTKit 和MQTT-Client-Framework 這兩種都是OC 使用
Swift 版本可參考CocoaMQTT
1、 ** MQTTKit **
已經不更新 但是基本使用沒問題
pod 'MQTTKit'
標頭檔案
#import <MQTTKit.h> #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTClient *client;
初始化 連結
WEAKSELF NSString *clientID = @"測試client - 必須是全域性唯一的id "; MQTTClient *client = [[MQTTClient alloc] initWithClientId:StrFormat(@"%@", clientID)]; client.username = @"username"; client.password = @"password"; client.cleanSession = false; client.keepAlive = 20; client.port = 11883;// 埠號 根據服務端 選擇 self.client = client; // 連結MQTT [client connectToHost:@"連結的MQTT的URL" completionHandler:^(MQTTConnectionReturnCode code) { if (code == ConnectionAccepted) { NSLog(@"連結MQTT 成功 :grin::grin::grin:"); // 連結成功訂閱相對應的主題 [weakSelf.client subscribe:@"你需要訂閱的主題" withQos:AtLeastOnce completionHandler:^(NSArray *grantedQos) { DLog(@"訂閱 返回 %@",grantedQos); }]; }else if (code == ConnectionRefusedBadUserNameOrPassword){ NSLog(@"MQTT 賬號或驗證碼錯誤"); } else if (code == ConnectionRefusedUnacceptableProtocolVersion){ NSLog(@"MQTT 不可接受的協議"); }else if (code == ConnectionRefusedIdentiferRejected){ NSLog(@"MQTT不認可"); }else if (code == ConnectionRefusedServerUnavailable){ NSLog(@"MQTT拒絕連結"); }else { NSLog(@"MQTT 未授權"); } }]; // 接收訊息體 client.messageHandler = ^(MQTTMessage *message) { NSString *jsonStr = [[NSString alloc] initWithData:message.payload encoding:NSUTF8StringEncoding]; NSLog(@"EasyMqttService mqtt connect success%@",jsonStr); }; 複製程式碼
訂閱主題
// 方法 封裝 可外部呼叫 -(void)subscribeType:(NSString *)example{ // 訂閱主題 [self.client subscribe:@"你需要訂閱的主題" withQos:AtMostOnce completionHandler:^(NSArray *grantedQos) { NSLog(@"訂閱 返回 %@",grantedQos); }]; } 複製程式碼
關閉MQTTKit
-(void)closeMQTTClient{ WEAKSELF [self.client disconnectWithCompletionHandler:^(NSUInteger code) { // The client is disconnected when this completion handler is called NSLog(@"MQTT client is disconnected"); [weakSelf.client unsubscribe:@"已經訂閱的主題" withCompletionHandler:^{ NSLog(@"取消訂閱"); }]; }]; } 複製程式碼
傳送訊息
[self.client publishString:postMsg toTopic:@"傳送訊息的主題 根據服務端定"withQos:AtLeastOnce retain:NO completionHandler:^(int mid) { if (cmd != METHOD_SOCKET_CHAT_TO) { NSLog(@"傳送訊息 返回 %d",mid); } }]; 複製程式碼
2、 ** MQTTClient **MQTTClient 配置更多 是可持續更新,可配置 SSL
基本使用
pod 'MQTTClient'websocket
方式連線
pod 'MQTTClient/MinL'
pod 'MQTTClient/ManagerL'
pod 'MQTTClient/WebsocketL'
標頭檔案
基本使用
#import <MQTTClient.h>websocket
需要新增的標頭檔案
#import <MQTTWebsocketTransport.h>
#define WEAKSELF__typeof(&*self) __weak weakSelf = self;
@property (nonatomic, strong) MQTTSession *mySession;需要新增協議頭
<MQTTSessionDelegate,MQTTSessionManagerDelegate>
初始化 連結
基本使用
#import "MQTTClient.h" \@interface MyDelegate : ... <MQTTSessionDelegate> ... MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init]; transport.host = @"localhost"; transport.port = 1883; MQTTSession *session = [[MQTTSession alloc] init]; session.transport = transport; session.delegate = self; [session connectAndWaitTimeout:30];//this is part of the synchronous API 複製程式碼
websocket 連線
WEAKSELF NSString *clientID = @"測試client - 必須是全域性唯一的id "; MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init]; transport.host = @"連線MQTT 地址"; transport.port =8083;// 埠號 transport.tls = YES; //根據需要配置YES 開起 SSL 驗證 此處為單向驗證 雙向驗證 根據SDK 提供方法直接新增 MQTTSession *session = [[MQTTSession alloc] init]; NSString *linkUserName = @"username"; NSString *linkPassWord = @"password"; [session setUserName:linkUserName]; [session setClientId:clientID]; [session setPassword:linkPassWord]; [session setKeepAliveInterval:5]; session.transport = transport; session.delegate = self; self.mySession = session; [self reconnect]; [self.mySession addObserver:self forKeyPath:@"state" options:NSKeyValueObservingOptionNew|NSKeyValueObservingOptionOld context:nil]; //新增事件監聽 複製程式碼
websocket
監聽 響應事件
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context { switch (self.mySession.status) { case MQTTSessionManagerStateClosed: NSLog(@"連線已經關閉"); break; case MQTTSessionManagerStateClosing: NSLog(@"連線正在關閉"); break; case MQTTSessionManagerStateConnected: NSLog(@"已經連線"); break; case MQTTSessionManagerStateConnecting: NSLog(@"正在連線中"); break; case MQTTSessionManagerStateError: { //NSString *errorCode = self.mySession.lastErrorCode.localizedDescription; NSString *errorCode = self.mySession.description; NSLog(@"連線異常 ----- %@",errorCode); } break; case MQTTSessionManagerStateStarting: NSLog(@"開始連線"); break; default: break; } } 複製程式碼
session Delegate 協議
連線 返回狀態
-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error{ if (eventCode == MQTTSessionEventConnected) { NSLog(@"2222222 連結MQTT 成功"); }else if (eventCode == MQTTSessionEventConnectionRefused) { NSLog(@"MQTT拒絕連結"); }else if (eventCode == MQTTSessionEventConnectionClosed){ NSLog(@"MQTT連結關閉"); }else if (eventCode == MQTTSessionEventConnectionError){ NSLog(@"MQTT 連結錯誤"); }else if (eventCode == MQTTSessionEventProtocolError){ NSLog(@"MQTT 不可接受的協議"); }else{//MQTTSessionEventConnectionClosedByBroker NSLog(@"MQTT連結 其他錯誤"); } if (error) { NSLog(@"連結報錯-- %@",error); } } 複製程式碼
收到傳送的訊息
-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid { NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data options:NSJSONReadingMutableContainers error:nil]; NSLog(@"EasyMqttService mqtt connect success%@",dic); // 做相對應的操作 } 複製程式碼
訂閱主題
基本使用
// 方法 封裝 可外部呼叫 [session subscribeToTopic:@"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss){ if (error) { NSLog(@"Subscription failed %@", error.localizedDescription); } else { NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss); } }]; // this is part of the block API 複製程式碼
websocket
- (void)subscibeToTopicAction { dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ dispatch_async(dispatch_get_main_queue(), ^{ [self subscibeToTopic:@"你要訂閱的主題"]; }); }); } -(void)subscibeToTopic:(NSString *)topicUrl { //self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelAtMostOnce] forKey:topicUrl]; [self.mySession subscribeToTopic:topicUrl atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) { if (error) { NSLog(@"訂閱 %@ 失敗 原因 %@",topicUrl,error); }else { NSLog(@"訂閱 %@ 成功 g1oss %@",topicUrl,gQoss); dispatch_async(dispatch_get_main_queue(), ^{ // 操作 }); }; }]; } 複製程式碼
關閉MQTT-Client
-(void)closeMQTTClient{ [self.mySession disconnect]; [self.mySession unsubscribeTopics:@[@"已經訂閱的主題"] unsubscribeHandler:^(NSError *error) { if (error) { DLog(@"取消訂閱失敗"); }else{ DLog(@"取消訂閱成功"); } }]; } 複製程式碼
傳送訊息
[self.mySession publishData:jsonData onTopic:@"傳送訊息的主題 服務端定義" retain:NO qos:MQTTQosLevelAtMostOnce publishHandler:^(NSError *error) { if (error) { NSLog(@"傳送失敗 - %@",error); }else{ NSLog(@"傳送成功"); } }]; 複製程式碼