1. 程式人生 > >MQTT再學習 -- MQTT 客戶端原始碼分析

MQTT再學習 -- MQTT 客戶端原始碼分析

MQTT 原始碼分析,搜尋了一下發現網路上講的很少,多是逍遙子的那幾篇。

參看:逍遙子_mosquitto原始碼分析系列

參看:MQTT libmosquitto原始碼分析

參看:Mosquitto學習筆記

一、目錄結構
首先我們還是來看一下 mosquitto-1.4.14 的原始碼目錄結構

 

 

我們主要關注 client、lib、src 這三個目錄。其中 src 和 lib 目錄下主要放置 mosquitto 的實現程式碼以及部分底層與網路相關的操作,client 目錄主要為兩個客戶端程式的實現原始碼。

我們主要就是來看看,這兩個客戶端的實現原始碼。

二、SUB 客戶端原始碼

 

首先我們先看 sub_client.c 
我們從 main 函式開始。

 

檢視結構體:
結構體 struct mosq_config 主要為 MQTT 的配置資訊
struct mosq_config {
char *id;
char *id_prefix;
int protocol_version;
int keepalive;
char *host;
int port;
int qos;
bool retain;
int pub_mode; /* pub */
char *file_input; /* pub */
char *message; /* pub */
long msglen; /* pub */
char *topic; /* pub */
char *bind_address;
#ifdef WITH_SRV
bool use_srv;
#endif
bool debug;
bool quiet;
unsigned int max_inflight;
char *username;
char *password;
char *will_topic;
char *will_payload;
long will_payloadlen;
int will_qos;
bool will_retain;
#ifdef WITH_TLS
char *cafile;
char *capath;
char *certfile;
char *keyfile;
char *ciphers;
bool insecure;
char *tls_version;
# ifdef WITH_TLS_PSK
char *psk;
char *psk_identity;
# endif
#endif
bool clean_session; /* sub */
char **topics; /* sub */
int topic_count; /* sub */
bool no_retain; /* sub */
char **filter_outs; /* sub */
int filter_out_count; /* sub */
bool verbose; /* sub */
bool eol; /* sub */
int msg_count; /* sub */
#ifdef WITH_SOCKS
char *socks5_host;
int socks5_port;
char *socks5_username;
char *socks5_password;
#endif
};

結構體 struct mosquito 主要用於儲存一個客戶端連線的所有資訊,例如使用者名稱、密碼、使用者ID、向該客戶端傳送的訊息等
struct mosquitto {
mosq_sock_t sock;
#ifndef WITH_BROKER
mosq_sock_t sockpairR, sockpairW;
#endif
#if defined(__GLIBC__) && defined(WITH_ADNS)
struct gaicb *adns; /* For getaddrinfo_a */
#endif
enum _mosquitto_protocol protocol;
char *address;
char *id;
char *username;
char *password;
uint16_t keepalive;
uint16_t last_mid;
enum mosquitto_client_state state;
time_t last_msg_in;
time_t next_msg_out;
time_t ping_t;
struct _mosquitto_packet in_packet;
struct _mosquitto_packet *current_out_packet;
struct _mosquitto_packet *out_packet;
struct mosquitto_message *will;
#ifdef WITH_TLS
SSL *ssl;
SSL_CTX *ssl_ctx;
char *tls_cafile;
char *tls_capath;
char *tls_certfile;
char *tls_keyfile;
int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
char *tls_version;
char *tls_ciphers;
char *tls_psk;
char *tls_psk_identity;
int tls_cert_reqs;
bool tls_insecure;
#endif
bool want_write;
bool want_connect;
#if defined(WITH_THREADING) && !defined(WITH_BROKER)
pthread_mutex_t callback_mutex;
pthread_mutex_t log_callback_mutex;
pthread_mutex_t msgtime_mutex;
pthread_mutex_t out_packet_mutex;
pthread_mutex_t current_out_packet_mutex;
pthread_mutex_t state_mutex;
pthread_mutex_t in_message_mutex;
pthread_mutex_t out_message_mutex;
pthread_mutex_t mid_mutex;
pthread_t thread_id;
#endif
bool clean_session;
#ifdef WITH_BROKER
bool is_dropping;
bool is_bridge;
struct _mqtt3_bridge *bridge;
struct mosquitto_client_msg *msgs;
struct mosquitto_client_msg *last_msg;
int msg_count;
int msg_count12;
struct _mosquitto_acl_user *acl_list;
struct _mqtt3_listener *listener;
time_t disconnect_t;
struct _mosquitto_packet *out_packet_last;
struct _mosquitto_subhier **subs;
int sub_count;
int pollfd_index;
# ifdef WITH_WEBSOCKETS
# if defined(LWS_LIBRARY_VERSION_NUMBER)
struct lws *wsi;
# else
struct libwebsocket_context *ws_context;
struct libwebsocket *wsi;
# endif
# endif
bool ws_want_write;
#else
# ifdef WITH_SOCKS
char *socks5_host;
int socks5_port;
char *socks5_username;
char *socks5_password;
# endif
void *userdata;
bool in_callback;
unsigned int message_retry;
time_t last_retry_check;
struct mosquitto_message_all *in_messages;
struct mosquitto_message_all *in_messages_last;
struct mosquitto_message_all *out_messages;
struct mosquitto_message_all *out_messages_last;
void (*on_connect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
//void (*on_error)();
char *host;
int port;
int in_queue_len;
int out_queue_len;
char *bind_address;
unsigned int reconnect_delay;
unsigned int reconnect_delay_max;
bool reconnect_exponential_backoff;
char threaded;
struct _mosquitto_packet *out_packet_last;
int inflight_messages;
int max_inflight_messages;
# ifdef WITH_SRV
ares_channel achan;
# endif
#endif

#ifdef WITH_BROKER
UT_hash_handle hh_id;
UT_hash_handle hh_sock;
struct mosquitto *for_free_next;
#endif
};


client_config_load 客戶端配置負載
第二個引數,可選擇選擇是 PUB 還是 SUB

然後看到 init_config 函式

 


可以看到一些初始化配置
void init_config(struct mosq_config *cfg)
{
memset(cfg, 0, sizeof(*cfg));
cfg->port = 1883;
cfg->max_inflight = 20;
cfg->keepalive = 60;
cfg->clean_session = true;
cfg->eol = true;
cfg->protocol_version = MQTT_PROTOCOL_V31;
}

mosquitto_lib_init 初始化  (重點)

 

int mosquitto_lib_init(void)
{
#ifdef WIN32
srand(GetTickCount());
#else
struct timeval tv;

gettimeofday(&tv, NULL);
srand(tv.tv_sec*1000 + tv.tv_usec/1000);
#endif

_mosquitto_net_init();

return MOSQ_ERR_SUCCESS;
}
這裡有個時間戳函式 gettimeofday,參看:C語言再學習 -- 時間函式
所在檔案 mosquitto-1.4.14/lib/mosquitto.c 所以說需要連結動態庫 libmosquitto.so.1

 


client_id_generate 生成客戶端 ID 
其實就是我們講MQTT伺服器的時候,訂閱主題然後在伺服器上多出的那一行資訊。
裡面的 mosqsub|2431-ubuntu 就是客戶端 ID。這個函式就是幹這個。
1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)

mosquitto_new 新建一個 mosq。(重點)

 

看了一下這個函式裡面就是一些初始化的東西
然後可以看到它也是在 lib 目錄下定義的。所以說需要連結動態庫 libmosquitto.so.1。其他不用改。

 


client_ipts_set 各種設定。懶得看...

 


一些除錯資訊
以及訂閱回撥設定 mosquitto_subscribe_callback_set (重要)

連接回調設定,和資訊回撥設定(重點)

這兩個函式都是在lib目錄下定義的。裡面都是有互斥鎖的。

client_connect 客戶端連線

 

int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
{
char err[1024];
int rc;

#ifdef WITH_SRV
if(cfg->use_srv){
rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
}else{
rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
}
#else
rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
#endif
if(rc>0){
if(!cfg->quiet){
if(rc == MOSQ_ERR_ERRNO){
#ifndef WIN32
strerror_r(errno, err, 1024);
#else
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
#endif
fprintf(stderr, "Error: %s\n", err);
}else{
fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
}
}
mosquitto_lib_cleanup();
return rc;
}
return MOSQ_ERR_SUCCESS;
}
可以看到裡面又有幾個重要函式
mosquitto_connect_srv
mosquitto_connect_bind --> _mosquitto_connect_init

然後不斷迴環,裡面的引數自己看。

最後是 mosq 的銷燬和庫的關閉。(重點)

 


到此結束!!
三、PUB 客戶端原始碼
接下來來看 pub_client.c 有一些相同部分我就不再重複了。

client_config_load 客戶端配置負載
第二個引數,可選擇選擇是 PUB 還是 SUB

 


一些配置資訊的比較

 


mosquitto_lib_init 初始化  (重點)

 

 

client_id_generate 生成客戶端 ID 

mosquitto_new 新建一個 mosq。(重點)

 


除錯資訊這裡面就沒有了訂閱回撥設定 mosquitto_subscribe_callback_set

 


然後這裡看這裡,是有區別的。(重點)

 

connect、disconnect、publish.  這些回撥設定

 


client_ipts_set 各種設定。懶得看...

client_connect 客戶端連線

 


迴環開始、結束

 


最重要的來了,do while迴圈裡的釋出內容 (重點)

 

這裡的 qos 就是訊息釋出服務質量級別。

 


然後還有 retain 用於區分新老訂閱者
RETAIN標誌位只用於 PUBLISH 訊息,當伺服器收到某個主題的 PUBLISH 訊息時,如果RETAIN標誌位為1,則表示服務在將該訊息傳送給所有的已訂閱該主題的訂閱者後(傳送前伺服器將RETAIN標誌置為0),還需保持這條訊息,當有新增的訂閱者時,再將這條訊息發給新增的訂閱者;如果RETAIN標誌位為0,則不保持訊息,也不用發給新增的訂閱者。
目的:
1.將RETAIN標誌位置為1,可使新的訂閱者收到之前保持的或上一個確定有效的訊息。
2.區分新訂閱者(RETAIN標誌為1)和老訂閱者(RETAIN標誌為0)

原始碼中這兩個引數的設定都是 0

 


最後是 mosq 的銷燬和庫的關閉。(重點)

 


---------------------
作者:聚優致成
來源:CSDN
原文:https://blog.csdn.net/qq_29350001/article/details/77161537
版權宣告:本文為博主原創文章,轉載請附上博文連結!