1. 程式人生 > >STM串列埠轉tcp實現Mqtt客戶端

STM串列埠轉tcp實現Mqtt客戶端

#include "emqtt.h" #include "usart4_wifi.h" #define MQTT_DUP_FLAG 1<<3 #define MQTT_QOS0_FLAG 0<<1 #define MQTT_QOS1_FLAG 1<<1 #define MQTT_QOS2_FLAG 2<<1 #define MQTT_RETAIN_FLAG 1 #define MQTT_CLEAN_SESSION 1<<1 #define MQTT_WILL_FLAG 1<<2
#define MQTT_WILL_RETAIN 1<<5 #define MQTT_USERNAME_FLAG 1<<7 #define MQTT_PASSWORD_FLAG 1<<6 #define TOPIC_MAX_LENGTH 64 #define MESSAGE_MAX_LENGTH 1024 //fixed_head×î´óΪ3 qossize×î´óΪ2 uint8_t mqtt_num_rem_len_bytes(const uint8_t* buf) { uint8_t num_bytes = 1
; if ((buf[1] & 0x80) == 0x80) { num_bytes++; if ((buf[2] & 0x80) == 0x80) { num_bytes ++; if ((buf[3] & 0x80) == 0x80) { num_bytes ++; } } } return
num_bytes; } uint16_t mqtt_parse_rem_len(const uint8_t* buf) { uint16_t multiplier = 1; uint16_t value = 0; uint8_t digit; buf++; do { digit = *buf; value += (digit & 127) * multiplier; multiplier *= 128; buf++; } while ((digit & 128) != 0); return value; } uint16_t mqtt_parse_msg_id(const uint8_t* buf) { uint8_t type = MQTTParseMessageType(buf); uint8_t qos = MQTTParseMessageQos(buf); uint16_t id = 0; uint8_t rlb; uint8_t offset; //printf("mqtt_parse_msg_id\n"); if(type >= MQTT_MSG_PUBLISH && type <= MQTT_MSG_UNSUBACK) { if(type == MQTT_MSG_PUBLISH) { if(qos != 0) { rlb = mqtt_num_rem_len_bytes(buf); offset = *(buf+1+rlb)<<8; // topic UTF MSB offset |= *(buf+1+rlb+1); // topic UTF LSB offset += (1+rlb+2); // fixed header + topic size id = *(buf+offset)<<8; // id MSB id |= *(buf+offset+1); // id LSB } } else { // fixed header length // 1 for "flags" byte + rlb for length bytes rlb = mqtt_num_rem_len_bytes(buf); id = *(buf+1+rlb)<<8; // id MSB id |= *(buf+1+rlb+1); // id LSB } } return id; } uint16_t mqtt_parse_pub_topic(const uint8_t* buf, uint8_t* topic) { const uint8_t* ptr; uint16_t topic_len = mqtt_parse_pub_topic_ptr(buf, &ptr); //printf("mqtt_parse_pub_topic\n"); if(topic_len != 0 && ptr != NULL) { memcpy(topic, ptr, topic_len); } return topic_len; } uint16_t mqtt_parse_pub_topic_ptr(const uint8_t* buf, const uint8_t **topic_ptr) { uint16_t len = 0; uint8_t rlb; //printf("mqtt_parse_pub_topic_ptr\n"); if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH) { // fixed header length = 1 for "flags" byte + rlb for length bytes rlb = mqtt_num_rem_len_bytes(buf); len = *(buf+1+rlb)<<8; // MSB of topic UTF len |= *(buf+1+rlb+1); // LSB of topic UTF // start of topic = add 1 for "flags", rlb for remaining length, 2 for UTF *topic_ptr = (buf + (1+rlb+2)); } else { *topic_ptr = NULL; } return len; } uint16_t mqtt_parse_publish_msg(const uint8_t* buf, uint8_t* msg) { const uint8_t* ptr; //printf("mqtt_parse_publish_msg\n"); uint16_t msg_len = mqtt_parse_pub_msg_ptr(buf, &ptr); if(msg_len != 0 && ptr != NULL) { memcpy(msg, ptr, msg_len); } return msg_len; } uint16_t mqtt_parse_pub_msg_ptr(const uint8_t* buf, const uint8_t **msg_ptr) { uint16_t len = 0; uint8_t rlb; uint8_t offset; //printf("mqtt_parse_pub_msg_ptr\n"); if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH) { // message starts at // fixed header length + Topic (UTF encoded) + msg id (if QoS>0) rlb = mqtt_num_rem_len_bytes(buf); offset = (*(buf+1+rlb))<<8; // topic UTF MSB offset |= *(buf+1+rlb+1); // topic UTF LSB offset += (1+rlb+2); // fixed header + topic size if(MQTTParseMessageQos(buf)) { offset += 2; // add two bytes of msg id } *msg_ptr = (buf + offset); // offset is now pointing to start of message // length of the message is remaining length - variable header // variable header is offset - fixed header // fixed header is 1 + rlb // so, lom = remlen - (offset - (1+rlb)) len = mqtt_parse_rem_len(buf) - (offset-(rlb+1)); } else { *msg_ptr = NULL; } return len; } void mqtt_init(mqtt_broker_handle_t* broker, const char* clientid) { // Connection options broker->alive = 300; // 300 seconds = 5 minutes broker->seq = 1; // Sequency for message indetifiers // Client options memset(broker->clientid, 0, sizeof(broker->clientid)); memset(broker->username, 0, sizeof(broker->username)); memset(broker->password, 0, sizeof(broker->password)); if(clientid) { strncpy(broker->clientid, clientid, sizeof(broker->clientid)); } else { strcpy(broker->clientid, "emqtt"); } // Will topic broker->clean_session = 1; } void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password) { if(username && username[0] != '\0') strncpy(broker->username, username, sizeof(broker->username)-1); if(password && password[0] != '\0') strncpy(broker->password, password, sizeof(broker->password)-1); } void mqtt_set_alive(mqtt_broker_handle_t* broker, uint16_t alive) { broker->alive = alive; } int mqtt_connect(mqtt_broker_handle_t* broker) { uint8_t flags = 0x00; uint16_t clientidlen = strlen(broker->clientid); uint16_t usernamelen = strlen(broker->username); uint16_t passwordlen = strlen(broker->password); uint16_t payload_len = clientidlen + 2; // Variable header uint8_t var_header[] = { 0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQIsdp 0x03, // Protocol version 0x00, // Connect flags 0x00, 0x00 // Keep alive }; // Fixed header uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length uint8_t remainLen; //¶àÒ»¸ö×Ö½Ú·ÀÖ¹³öÏÖ²»¹»µÄÇé¿ö uint8_t fixed_header[3]; uint16_t offset = 0; //¶à200¸ö×Ö½Ú·ÀÖ¹³öÏÖ²»¹»µÄÇé¿ö uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+200]; // Preparing the flags //²»ÉèÖÃÓû§ÃûºÍÃÜÂëÈ¥µôÁ½¸öif if(usernamelen) { payload_len += usernamelen + 2; flags |= MQTT_USERNAME_FLAG; } if(passwordlen) { payload_len += passwordlen + 2; flags |= MQTT_PASSWORD_FLAG; } //ÿ´Î¶¼³õʼ»¯ËùÒÔ´Ë´¦Ó¦¸ÃÊÇ1 if(broker->clean_session) { flags |= MQTT_CLEAN_SESSION; } //¸üжÔÓ¦µÄflags±ê־λ var_header[9] = flags; var_header[10] = broker->alive>>8; var_header[11] = broker->alive&0xFF; remainLen = sizeof(var_header)+payload_len; //¼Ù¶¨´Ë´¦²»»á³¬¹ý127¸ö×Ö·ûÔò±£ÁôfixedHeaderSize if (remainLen > 127) { fixedHeaderSize++; // add an additional byte for Remaining Length } // Message Type fixed_header[0] = MQTT_MSG_CONNECT; // Remaining Length if (remainLen <= 127) { fixed_header[1] = remainLen; } else { // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes fixed_header[1] = remainLen % 128; fixed_header[1] = fixed_header[1] | 0x80; // second byte is number of 128s fixed_header[2] = remainLen / 128; } memset(packet, 0, sizeof(packet)); memcpy(packet, fixed_header, fixedHeaderSize); offset += fixedHeaderSize;//sizeof(fixed_header); memcpy(packet+offset, var_header, sizeof(var_header)); offset += sizeof(var_header); // Client ID - UTF encoded packet[offset++] = clientidlen>>8; packet[offset++] = clientidlen&0xFF; memcpy(packet+offset, broker->clientid, clientidlen); offset += clientidlen; if(usernamelen) { // Username - UTF encoded packet[offset++] = usernamelen>>8; packet[offset++] = usernamelen&0xFF; memcpy(packet+offset, broker->username, usernamelen); offset += usernamelen; } if(passwordlen) { // Password - UTF encoded packet[offset++] = passwordlen>>8; packet[offset++] = passwordlen&0xFF; memcpy(packet+offset, broker->password, passwordlen); offset += passwordlen; } //·¢ËÍÊý¾Ý°ü uart4SendChars(packet,offset); return 1; } int mqtt_ping() { uint8_t packet[] = { MQTT_MSG_PINGREQ, // Message Type, DUP flag, QoS level, Retain 0x00 // Remaining length }; //·¢ËÍpingÊý¾Ý°ü uart4SendChars(packet,sizeof(packet)); return 1; } int mqtt_publish_with_qos(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint8_t retain, uint8_t qos, uint16_t* message_id) { const uint16_t topiclen = strlen(topic); const uint16_t msglen = strlen(msg); uint8_t qos_flag = MQTT_QOS0_FLAG; uint8_t qos_size = 0; // No QoS included //½«Êý×é·ÖÅä³É×î´óÖµ topiclen×î´ó64 //msg ×î´ó1024 uint8_t var_header[TOPIC_MAX_LENGTH+2+2];//topiclen+2+qos_size uint8_t varHeaderSize; uint8_t fixedHeaderSize = 2; uint16_t remainLen; uint8_t fixed_header[3]; uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+MESSAGE_MAX_LENGTH]; if(qos == 1) { qos_size = 2; // 2 bytes for QoS qos_flag = MQTT_QOS1_FLAG; } else if(qos == 2) { qos_size = 2; // 2 bytes for QoS qos_flag = MQTT_QOS2_FLAG; } varHeaderSize=topiclen+2+qos_size; // Variable header // Topic size (2 bytes), utf-encoded topic memset(var_header, 0, sizeof(var_header)); var_header[0] = topiclen>>8; var_header[1] = topiclen&0xFF; memcpy(var_header+2, topic, topiclen); if(qos_size) { var_header[topiclen+2] = broker->seq>>8; var_header[topiclen+3] = broker->seq&0xFF; if(message_id) { // Returning message id *message_id = broker->seq; } broker->seq++; } // Fixed header // the remaining length is one byte for messages up to 127 bytes, then two bytes after that // actually, it can be up to 4 bytes but I'm making the assumption the embedded device will only // need up to two bytes of length (handles up to 16,383 (almost 16k) sized message) // Default size = one byte Message Type + one byte Remaining Length remainLen = varHeaderSize+msglen;//sizeof(var_header) if (remainLen > 127) { fixedHeaderSize++; // add an additional byte for Remaining Length } // Message Type, DUP flag, QoS level, Retain fixed_header[0] = MQTT_MSG_PUBLISH | qos_flag; if(retain) { fixed_header[0] |= MQTT_RETAIN_FLAG; } // Remaining Length if (remainLen <= 127) { fixed_header[1] = remainLen; } else { // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes fixed_header[1] = remainLen % 128; fixed_header[1] = fixed_header[1] | 0x80; // second byte is number of 128s fixed_header[2] = remainLen / 128; } memset(packet, 0, sizeof(packet)); memcpy(packet, fixed_header, fixedHeaderSize); memcpy(packet+fixedHeaderSize, var_header, varHeaderSize); memcpy(packet+fixedHeaderSize+varHeaderSize, msg, msglen); //·¢ËÍÊý¾Ý°ü uart4SendChars(packet,fixedHeaderSize+varHeaderSize+msglen); return 1; } int mqtt_publish(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint8_t retain) { return mqtt_publish_with_qos(broker, topic, msg, retain, 0, NULL); } int mqtt_pubrel(mqtt_broker_handle_t* broker, uint16_t message_id) { uint8_t packet[] = { MQTT_MSG_PUBREL | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain 0x02, // Remaining length 0x00, 0x00 }; packet[2]=message_id>>8; packet[3]=message_id&0xFF; //·¢ËÍÊý¾Ý°ü uart4SendChars(packet,sizeof(packet)); return 1; } int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id) { const uint16_t topiclen = strlen(topic); // Variable header uint8_t var_header[2]; // Message ID //topic ×î´ó64 Topic size (2 bytes), utf-encoded topic, QoS byte uint8_t utf_topic[TOPIC_MAX_LENGTH+3]; // Fixed header uint8_t fixed_header[] = { MQTT_MSG_SUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain 0x00 }; // uint8_t packet[sizeof(var_header)+sizeof(fixed_header)+sizeof(utf_topic)]; var_header[0] = broker->seq>>8; var_header[1] = broker->seq&0xFF; if(message_id) { // Returning message id *message_id = broker->seq; } broker->seq++; // utf topic // Topic size (2 bytes), utf-encoded topic, QoS byte memset(utf_topic, 0, sizeof(utf_topic)); utf_topic[0] = topiclen>>8; utf_topic[1] = topiclen&0xFF; memcpy(utf_topic+2, topic, topiclen); //sizeof(var_header)+sizeof(utf_topic) fixed_header[1]=sizeof(var_header)+topiclen+3; memset(packet, 0, sizeof(packet)); memcpy(packet, fixed_header, sizeof(fixed_header)); memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header)); memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, topiclen+3); //·¢ËÍÊý¾Ý°ü uart4SendChars(packet,sizeof(fixed_header)+sizeof(var_header)+topiclen+3); return 1; } int mqtt_unsubscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id) { const uint16_t topiclen = strlen(topic); // Variable header uint8_t var_header[2]; // Message ID uint8_t utf_topic[TOPIC_MAX_LENGTH+2]; // Topic size (2 bytes), utf-encoded topic // Fixed header uint8_t fixed_header[] = { MQTT_MSG_UNSUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain 0x00 }; uint8_t packet[sizeof(var_header)+sizeof(fixed_header)+sizeof(utf_topic)]; var_header[0] = broker->seq>>8; var_header[1] = broker->seq&0xFF; if(message_id) { // Returning message id *message_id = broker->seq; } broker->seq++; // utf topic memset(utf_topic, 0, sizeof(utf_topic)); utf_topic[0] = topiclen>>8; utf_topic[1] = topiclen&0xFF; memcpy(utf_topic+2, topic, topiclen); fixed_header[1]=sizeof(var_header)+topiclen+2; memset(packet, 0, sizeof(packet)); memcpy(packet, fixed_header, sizeof(fixed_header)); memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header)); memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, topiclen+2); //·¢ËÍÊý¾Ý°ü uart4SendChars(packet,sizeof(fixed_header)+sizeof(var_header)+topiclen+2); return 1; }