1. 程式人生 > >【原創】xenomai核心解析--實時IPC概述

【原創】xenomai核心解析--實時IPC概述

版權宣告:本文為本文為博主原創文章,轉載請註明出處。如有問題,歡迎指正。部落格地址:https://www.cnblogs.com/wsg1100/ [TOC] ## 1.概述 Linux系統中常見的程序間通訊方式有管道、FIFO、共享記憶體、訊號、套接字等方式。但在xenomai核心加入後,一個實時任務與非實時(普通Linux任務,如人機互動應用)之間該如何通訊? 雖然xenomai任務本身也是一個linux任務,能夠無障礙地使用linux提供的程序間通訊方式,但是當實時任務呼叫這些服務介面的時候會觸發任務遷移,遷移到linux核,由linux接管排程並提供服務,Linux核心本身就只是軟實時核心,這樣必然會嚴重影響了xenomai實時任務實時性。 實時任務除了可以使用Linux的程序間通訊外(當然不建議使用),xenomai也提供了針對實時任務的程序間通訊方式(**Real-time IPC**),其中包含一種跨域通訊方式---XDDP(cross-domain datagram protocol跨域資料報協議)。 ## 2.Real-time IPC `RTIPC`以RTDM(實時裝置驅動模型)下的**Protocol Devices**來實現,根據程序間通訊情況不同,rtipc提供三種程序間通訊: - **XDDP**,跨域資料報協議,實時與普通Linux任務之間的通訊**(RT<->non-RT)**,實時Xenomai執行緒和常規Linux執行緒通訊時使用,實時任務端不會離開head域,這樣就不會影響到實時任務的實時性。 - **IDDP**,實時域內資料報協議,實時任務之間的通訊**(RT<->RT)**,IDDP協議使實時執行緒可以通過套接字端點在Xenomai域內交換資料報。 - **BUFP**,緩衝區協議,實時任務間批量資料通訊**(RT<->RT)**,所有寫入的訊息均按照嚴格的FIFO順序緩衝到單個儲存區中,直到被使用者讀取為止。 ![rtipc-arch](https://wsg-blogs-pic.oss-cn-beijing.aliyuncs.com/xenomai/rtipc-arch.png) 當然,並不是說有了RTIPC,xenomai核心就沒有其它通訊方式了,其實大部分posix標準通訊方式xenoma核心均有實現,僅用於實時任務間,如:訊號量(sem)、訊息佇列(mq)、xddp/bufp/iddp、事件(event)、條件變數(cond)....,至於它們的核心實現,與RTIPC不同,可以關注本部落格後續文章。 ## 2.核心配置 由於RTIPC以實時核心驅動模組的形式來實現,所以要使用RTIPC,就得在核心構建編譯的時候配置,如下: ``` Xenomai/cobalt ---> Drivers ---> Real-time IPC drivers ---> <*> RTIPC protocol family [*] XDDP cross-domain datagram protocol [*] IDDP intra-domain datagram protocol (32) Number of IDDP communication ports [*] Buffer protocol (32) Number of BUFP communication ports ``` ## 3.應用程式設計介面 實時應用通過套接字來使用RTIPC,雖然介面與普通套接字介面一樣,但是引數需要根據xenomai提供的引數來使用,下面為[官方文件](https://xenomai.org/documentation/xenomai-3/html/xeno3prm/group__rtdm__ipc.html)簡單直譯。 #### socket() ---- ​ 建立套接字。 ```C #include
int socket(int domain, int type, int protocol); ``` 引數: **domain**:**`AF_RTIPC`**地址族; **type**:套接字型別,**`SOCK_DGRAM`**(其餘無效) **protocol**: - **`IPCPROTO_XDDP`、`IPCPROTO_IDDP `、`IPCPROTO_BUFP` 、`IPCPROTO_IPC `預設協議(IPCPROTO_IDDP)**。 返回值: ​ 返回一個套接字,出錯:除了用於socket(2)的標準錯誤程式碼外,還可能返回以下特定錯誤程式碼: - ENOPROTOOPT(協議是已知的,但未在RTIPC驅動程式中進行編譯)。 #### close() ---- ​ 關閉一個套接字。 ```C int close (int sockfd) ``` 當套接字關閉並返回錯誤時,將解除阻塞在sendmsg或recvmsg的阻塞。 #### setsockopt() --- 設定套接字選項。 ```C #include
int setsockopt(int sockfd, int level, int optname, const void * optval, socklen_t optlen ) ``` --- 針對**XDDP**套接字選項說明及引數配置如下: - **XDDP_LABEL**:設定XDDP埠標籤。設定XDDP埠的ASCII字串名稱,設定後在非實時端,可通過裝置名稱(`/proc/xenomai/registry/rtipc/xddp/%s`)來開啟通訊端點,而不是用裝置路徑名(`/dev/rtpN`) - `level ` :` SOL_XDDP` - `optname`:`XDDP_LABEL` - `optval`:`rtipc_port_label`指標 - `optlen`:`sizeof(struct rtipc_port_label)` ```C struct rtipc_port_label { /** 埠標籤字串,以null結尾。 */ char label[XNOBJECT_NAME_LEN]; }; ``` - **XDDP_POLLSZ**:XDDP本地記憶體池大小配置。預設情況下,傳輸資料所需的記憶體是從xenomai的系統記憶體池中提取的,設定本地池大小會覆蓋預設大小。如果配置了非零大小,則在bind時才進行分配實際記憶體。 該池將為未決資料提供儲存。==繫結套接字後,不允許配置本地池大小。 但是,繫結之前允許進行多個配置呼叫。 將使用最後設定的值。== - - `level ` :` SOL_XDDP` - `optname`:`XDDP_POLLSZ` - `optval`:指向型別為size_t的變數的指標,該變量表示繫結時保留的本地池大小,單位:位元組。 - `optlen`:`sizeof(size_tl)` - **XDDP_BUFSZ** :XDDP流緩衝區大小配置。除了傳送資料報外,實時執行緒還可以通過埠以面向位元組的模式傳輸資料。為套接字設定非零緩衝區大小時,啟用此功能。這樣,當任何傳送函式使用MSG_MORE標誌時,實時資料會累積到流緩衝區中,發生以下情況時緩衝區資料會被髮送出去: - **Linux域中接收器被喚醒接收資料,** - **不同的源埠嘗試將資料傳送到相同的目標埠,** - **傳送標誌中沒有MSG_MORE,** - **緩衝區已滿。(以先到者為準)。** 將`* optval`設定為0將禁用流緩衝區,在這種情況下,所有傳送都將在單獨的資料報中傳輸,而與`MSG_MORE`無關。 注意:每個套接字只有一個流緩衝區。當該緩衝區滿時,實時資料將停止積累,並且僅在資料報模式恢復傳送操作。從Linux域端點消耗了流緩衝區中的部分或全部資料之後,可能會再次發生累積。==在套接字生存期中,可以多次調整流緩衝區的大小;在重新整理前一個緩衝區後恢復累積時,最新的配置更改將生效。== - - `level ` :` SOL_XDDP` - `optname`:`XDDP_BUFSZ` - `optval`:指向型別為size_t的變數的指標,該變量表示繫結時保留的本地池大小,單位:位元組。 - `optlen`:`sizeof(size_t)` - **XDDP_MONITOR:**XDDP監視回撥。對套接字安插使用者定義的回撥函式,以便收集通道上發生的特定事件。此機制對於在執行其他任務時非同步監視通道特別有用。**僅適用於核心空間任務**。 - `level ` :` SOL_XDDP` - `optname`:`XDDP_MONITOR` - `optval`:指向型別為int (*)(int fd, int event, long arg)的函式的指標,其中包含使用者定義的回撥函式的地址。在optval中傳遞NULL回撥指標將禁用該功能。 - `optlen`:`sizeof(size_t)` ---- 針對**IDDP**套接字選項說明及引數配置如下: - **IDDP_LABEL**:設定IDDP埠標籤。設定IDDP埠的ASCII字串名稱,以便使用比數字埠更具描述性的方式來與套接字連線。設定label後,標籤將在bind()時註冊,在bind()前可多次設定,bind()前的最後一次設定生效。 - `level ` :` SOL_IDDP` - `optname`:`IDDP_LABEL` - `optval`:`rtipc_port_label`指標 - `optlen`:`sizeof(struct rtipc_port_label)` ```C struct rtipc_port_label { /** 埠標籤字串,以null結尾。 */ char label[XNOBJECT_NAME_LEN]; }; ``` - **IDDP_POOLSZ **:配置IDDP本地記憶體池大小。預設情況下,傳輸資料所需的記憶體是從xenomai的系統記憶體池中提取的,設定本地池大小會覆蓋預設大小。如果配置了非零大小,則在bind時才進行分配實際記憶體。傳輸資料佔用的記憶體將從該池內分配。==繫結套接字後,不允許配置本地池大小。 但是,繫結之前允許進行多個配置呼叫。 將使用最後設定的值==。 - `level ` :` SOL_IDDP` - `optname`:`IDDP_POLLSZ` - `optval`:指向型別為size_t的變數的指標,該變量表示繫結時保留的本地池大小,單位:位元組。` - `optlen`:`sizeof(size_tl)` --- 針對**BUFP**套接字選項說明及引數配置如下: - **BUFP_BUFSZ**:配置BUFP緩衝區大小,寫入BUFP的資料都被緩衝在每個套接字的儲存區域中,必須配置該大小。==繫結套接字後,不允許配置本地池大小。 但是,繫結之前允許進行多個配置呼叫。 將使用最後設定的值==。 - `level ` :` SOL_BUFP` - `optname`:`BUFP_BUFSZ` - `optval`:指向型別為size_t的變數的指標,該變量表示繫結時保留的本地池大小,單位:位元組。` - `optlen`:`sizeof(size_tl)` - **BUFP_LABEL**:設定BUFP埠標籤。以便以比使用普通數字埠值更具描述性的方式來連線套接字。 繫結套接字後,不允許分配標籤。 但是,在繫結之前允許多次分配呼叫。 最後一個標籤集將被使用。 #### bind() ---- 繫結一個RTIPC socket到一個埠。 ```C int bind(int sockfd, const struct sockaddr_ipc *addr, socklen_t addrlen) ``` 將套接字繫結到目標埠。 - `sockfd`:套接字檔案描述符。 - `addr`:繫結套接字的地址(請參見struct sockaddr_ipc)。 該地址的含義取決於套接字所使用的RTIPC協議: - **IPCPROTO_XDDP** `sipc_family`:必須是AF_RTIPC,`sipc_port`為-1或者0到CONFIG_XENO_OPT_PIPE_NRDEV-1之間的有效空閒埠號。如果sipc_port為-1,bind將自動為其分配一個空閒埠。 成功後,將為該通訊通道保留偽裝置`/dev /rtpN`,其中N是分配的埠號。 非實時端應開啟此裝置以通過繫結的套接字交換資料。 如果使用了label,非實時通過偽裝置`/proc/xenomai/registry/rtipc/xddp/label`來與實時通訊。 - **IPCPROTO_IDDP** `sipc_family`:必須是AF_RTIPC,`sipc_port`為-1或者0到CONFIG_XENO_OPT_IDDP_NRPORT-1之間的有效空閒埠號。如果sipc_port為-1,bind將自動為其分配一個空閒埠。 - **IPCPROTO_BUFP** `sipc_family`:必須是AF_RTIPC,`sipc_port`為-1或者0到CONFIG_XENO_OPT_BUFP_NRPORT-1之間的有效空閒埠號。如果sipc_port為-1,bind將自動為其分配一個空閒埠。 - `addrlen`:addr指向的結構體大小。 #### sendto()與recvfrom() ---- 資料傳送與接收。 ```C ssize_t send(int sockfd, const void *buf, size_t len, int flags); ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); ssize_t recv(int sockfd, void *buf, size_t len, int flags); ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); ``` 引數: `sockfd`:socket()建立的套接字. `buf`:傳送/接收的資料; `len`:傳送/接收的資料長度; `flags`:**MSG_MORE**傳送標誌位,將帶有該標誌的資料包累積到緩衝區,而不是立即發出資料報,僅用於XDDP協議。 #### recvmsg()與sendmsg() --- 資料傳送與接收。recvmsg()能做所有read()、sendto()能做到的事,同樣sendmsg()能做所有read()、sendto()能做到的事,具體使用方法查閱Linux相關資料。 recvmsg()從RTIPC套接字接收訊息。 ```C #include
struct msghdr { void *msg_name; /* optional address */ socklen_t msg_namelen; /* size of address */ struct iovec *msg_iov; /* scatter/gather array */ size_t msg_iovlen; /* # elements in msg_iov */ void *msg_control; /* ancillary data, see below */ size_t msg_controllen; /* ancillary data buffer len */ int msg_flags; /* flags (unused) */ }; ssize_t recvmsg (int sockfd, struct msghdr *msg, int flags) ``` 引數: `sockfd`: socket()建立的套接字。 `msg`:訊息頭將被複制到該地址,具體查閱資料。 `flasgs`:MSG_DONTWAIT 非阻塞操作,如果沒有訊息可接收時,不會阻塞,立即返回EWOULDBLOCK,只有**實時應用**能使用該標誌。 sendmsg()在RTIPC套接字上傳送訊息 ```C #include ssize_t sendmsg (int sockfd, const struct msghdr *msg, int flags) ``` 引數: `sockfd`: socket()建立的套接字。 `msg`:傳達資料報的訊息頭的地址,,具體查閱資料。 `flasgs`:**MSG_OOB**給傳送帶外訊息;(帶外資料:允許傳送端將傳送的資料標記為高優先順序)。 **MSG_DONTWAIT** 非阻塞操作,當無法立即傳送訊息時(如記憶體不足),不會阻塞,而是立即返回EWOULDBLOCK。 **MSG_MORE**傳送前先累積資料到緩衝區,而不是立即發出資料報,僅用於IPCPROTO_XDDP協議。只有**實時應用**能使用該標誌。 ## 4.實時與非實時間通訊XDDP示例 **`IPCPROTO_XDDP`**:跨域資料報協議(RT<->NRT),實時Xenomai執行緒和常規Linux執行緒通訊時使用,linux端通過`read()、write()`讀寫`/dev/rtp `來通訊,Xenomai端通過套接字`recvfrom()或read()`來接收資料,`sendto()或write()`來發送資料。 ![xddp-ipc](https://wsg-blogs-pic.oss-cn-beijing.aliyuncs.com/xenomai/xddp-ipc.png) ### XDDP應用示例: 一個LLinux任務與一個實時任務使用XDDP進行通訊,實時任務向Linux任務傳送訊息,Linux任務收到後原樣傳送出去,實時任務將收到的訊息顯示出來(xenomai示例:`xenomai3.0.8\demo\posix\cobalt\xddp-echo.c`)。 對於linux可通過開啟固定rtipc埠的裝置節點來與實時任務固定埠通訊,這個埠是全域性的,被使用了另一個實時任務就無法再使用。另一種方式是設定XDDP埠標籤。實時程式設定XDDP埠的ASCII字串名稱,設定後在非實時端,可通過裝置名稱(/proc/xenomai/registry/rtipc/xddp/%s)來開啟通訊端點,而不是用裝置路徑名(/dev/rtpN),其中的埠xenomai會自動分配。(xenomai示例:`xenomai3.0.8\demo\posix\cobalt\xddp-label.c`) 同一系統的兩種方式儘量不要混合使用,不然會發生如下情況,程式1使用XDDP埠標籤配置了XDDP socket,此bind時系統為該socket分配的是埠1,接著另一個程式2開始建立另一個XDDP socket,由於指定了用端0來通訊,但該埠已經被程式1佔用,就會繫結埠失敗,導致程式無法正常執行。下面例子使用固定埠通訊: 使用帶緩衝區方式與非實時應用通訊,使用埠0,實時端: ```C #define XDDP_PORT 0 /*通訊埠0*/ ..... /*1.建立一個XDDP(rt<->nrt)通訊socket,AF_RTIPC、SOCK_DGRAM為固定引數*/ s = socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_XDDP); if (s < 0) { perror("socket"); exit(EXIT_FAILURE); } /*2.配置socket s為流緩衝通訊,緩衝區大小為1KB,設定為零將禁用流緩衝,每次資料傳送將單獨傳輸*/ streamsz = 1024; /* bytes */ ret = setsockopt(s, SOL_XDDP, XDDP_BUFSZ, &streamsz, sizeof(streamsz)); if (ret) fail("setsockopt"); /*3.將套接字s繫結到埠0*/ memset(&saddr, 0, sizeof(saddr)); saddr.sipc_family = AF_RTIPC; //固定引數 saddr.sipc_port = XDDP_PORT; //埠0 對應非實時讀寫的裝置節點/dev/rtp0 ret = bind(s, (struct sockaddr *)&saddr, sizeof(saddr)); for (;;) { /*4.傳送*/ for (b = 0; b < len; b++) { /*MSG_MORE表示:一位元組一位元組的將資料存到緩衝區*/ ret = sendto(s, msg[n] + b, 1, MSG_MORE, NULL, 0); if (ret != 1) fail("sendto"); /*如果不使用MSG_MORE,每個字母將作為一個數據包。Linux端段每次讀取只能讀取到一個字母,且符合FIFO*/ ret = sendto(s, msg[n] + b, 1, 0, NULL, 0); if (ret != 1) fail("sendto"); } /*4.接收資料*/ ret = recvfrom(s, buf, sizeof(buf), 0, NULL, 0); if (ret <= 0) fail("recvfrom"); } /* 5.關閉套接字*/ close(s); ``` 非實時端: ```C #define _GNU_SOURCE /*使用asprintf()函式需要該巨集*/ #include #include #define XDDP_PORT 0 /*通訊埠0*/ char buf[128],*devname; if (asprintf(&devname, "/dev/rtp%d", XDDP_PORT) < 0)/* /dev/rtp0 */ fail("asprintf"); /*1.開啟裝置 /dev/rtp0*/ fd = open(devname, O_RDWR); free(devname); for (;;) { /*2.讀/dev/rtp0*/ ret = read(fd, buf, sizeof(buf)); if (ret <= 0) fail("read"); /*3.寫/dev/rtp0來發送資料*/ ret = write(fd, buf, ret); if (ret <= 0) fail("write"); } close(f