1. 程式人生 > >回撥函式ros::spin()與ros::spinOnce()

回撥函式ros::spin()與ros::spinOnce()

ROS的主迴圈中需要不斷呼叫ros::spin()或ros::spinOnce(),兩者區別在於前者呼叫後不會再返回,而後者在呼叫後還可以繼續執行之後的程式。

在使用ros::spin()的情況下,一般來說在初始化時已經設定好所有訊息的回撥,並且不需要其他背景程式執行。這樣一來,每次訊息到達時會執行使用者的回撥函式進行操作,相當於程式是訊息事件驅動的;而在使用ros::spinOnce()的情況下,一般來說僅僅使用回撥不足以完成任務,還需要其他輔助程式的執行:比如定時任務、資料處理、使用者介面等。

關於訊息接收回調機制在ROS官網上略有說明 (callbacks and spinning)。總體來說其原理是這樣的:除了使用者的主程式以外,ROS的socket連線控制程序會在後臺接收訂閱的訊息,所有接收到的訊息並不是立即處理,而是等到spin()或者spinOnce()執行時才集中處理。所以為了保證訊息可以正常接收,需要尤其注意spinOnce()函式的使用 (對於spin()來說則不涉及太多的人為因素)。

I. 對於速度較快的訊息,需要注意合理控制訊息佇列及spinOnce()的時間。例如,如果訊息到達的頻率是100Hz,而spinOnce()的執行頻率是10Hz,那麼就要至少保證訊息佇列中預留的大小大於10。

II. 如果對於使用者自己的週期性任務,最好和spinOnce()並列呼叫。即使該任務是週期性的對於資料進行處理,例如對接收到的IMU資料進行Kalman濾波,也不建議直接放在回撥函式中:因為存在通訊接收的不確定性,不能保證該回調執行在時間上的穩定性。

 

// 示例程式碼
ros::Rate r(100);
 
while (ros::ok())
{
  libusb_handle_events_timeout(...); 
// Handle USB events ros::spinOnce(); // Handle ROS events r.sleep(); }

 

III. 最後說明一下將ROS整合到其他程式架構時的情況。有些圖形處理程式會將main()包裹起來,此時就需要找到一個合理的位置呼叫ros::spinOnce()。比如對於OpenGL來說,其中有一個方法就是採用設定定時器定時呼叫的方法:

 

// 示例程式碼
void timerCb(int value) {
  ros::spinOnce();
}
glutTimerFunc(
10, timerCb, 0); glutMainLoop(); // Never returns

 

訊息到來並不會立即執行訊息處理回撥函式,而是在呼叫ros::spin()之後,才進行訊息處理的輪轉,訊息回撥函式統一處理訂閱話題的訊息。

 

roscpp不會在你的應用中明確一個執行緒模型:也就是說即使roscpp會在幕後使用多執行緒管理網路連結,排程等,但它不會將自己的執行緒暴露在你的應用中。

 

roscpp允許你的回撥函式被任意多執行緒呼叫,如果你願意。

 

最後的結果可能是你的回撥函式將沒有機會被呼叫,最常用的方法是使用ros::spin()呼叫。

 

注意:回撥函式的排隊和輪轉,不會對內部的網路通訊造成影響,它們僅僅會影響到使用者的回撥函式何時發生。它們會影響到訂閱者佇列。因為處理你回撥函式的速度,你訊息到來的速度,將會決定以前的訊息會不會被丟棄。

 

1.單執行緒下的輪轉

 

最簡單的單執行緒spin的例子就是ros::spin()自己。

 

ros::init(argc, argv, "my_node"); //初始化節點
ros::NodeHandle nh;           //建立節點控制代碼
ros::Subscriber sub = nh.subscribe(...);  //建立訊息訂閱者
...
ros::spin();                 //呼叫spin(),統一處理訊息

 

在這裡,所有的使用者回撥函式將在spin()呼叫之後被呼叫.

ros::spin()不會返回,直到節點被關閉,或者呼叫ros::shutdown(),或者按下ctrl+C

另一個常用的模式是週期性地呼叫ros::spinOnce():

 

ros::Rate r(10); // 10 hz
while (should_continue)
{
  //... do some work, publish some messages, etc. ...
  ros::spinOnce();     //輪轉一次,返回
  r.sleep();        //休眠
}

 

ros::spinOnce()將會在被呼叫的那一時間點呼叫所有等待的回撥函式.

注意:ros::spin()和ros::spinOnce()函式對單執行緒應用很有意義,目前不會應用於多執行緒.

2.多執行緒輪轉

上面是單執行緒下的訊息回撥函式輪轉,那多執行緒下是什麼樣子?

roscpp庫提供了一些內嵌的支援來從多執行緒中呼叫回撥函式.

1) ros::MultiThreadedSpiner

它是一個阻塞型輪轉器,類似於ros::spin().

可以使用它的構造器來設定執行緒的個數,如果不設定或設成0,它將為每個cpu核心使用一個執行緒。

 

ros::MultiThreadedSpinner spinner(4); // Use 4 threads
spinner.spin(); // spin() will not return until the node has been shutdown

2)ros::AsyncSpinner

API : http://docs.ros.org/api/roscpp/html/classros_1_1AsyncSpinner.html

更實用的多執行緒輪轉是非同步輪轉器(AsyncSpiner),相對於阻塞的spin()呼叫,它有自己的start()和stop()呼叫

並且在銷燬後將自動停止。

對上述MultiThreadedSpiner等效的AsyncSpiner使用如下:

ros::AsyncSpinner spinner(4); // Use 4 threads
spinner.start();
ros::waitForShutdown();

 

3.CallbackQueue::callAvailable() and callOne()

CallbackQueue API 回撥函式佇列類: 

http://docs.ros.org/api/roscpp/html/classros_1_1CallbackQueue.html

可以建立一個回撥函式佇列類:

 

#include 
...
ros::CallbackQueue my_queue;

回撥函式佇列類有兩種觸發其內部回撥函式的方法:callAvailable()方法和callOne()方法.

前者將獲取當前可以符合條件的回撥函式,並且全部觸發它們;後者將簡單地觸發佇列中最早的那個回撥函式.

這兩個方法都接受一個可選的timeout超時時間,它們將在此時間之內等待一個回撥函式變得符合條件。

如果這個值是0,那麼,如果佇列中沒有回撥函式,該方法立即返回.

4.高階主題:使用不同的回撥函式佇列

預設的是所有的訊息回撥函式都會被壓入全域性訊息回撥佇列.

roscpp允許使用自定義的訊息回撥函式佇列並分別服務。

這可以以兩種粒度實現:

1)每個subsceribe(),advertise(),advertiseService(),等

這部分可以使用高階版的方法呼叫原型,使用一個選項結構體指標引數.

2)每個節點控制代碼

這是常見的方法,使用節點控制代碼的setCallbackQueue()方法:

 

ros::NodeHandle nh;
nh.setCallbackQueue(&my_callback_queue);

這使所有的訊息訂閱者,服務,定時器等的回撥函式都進入my_callback_queue,而非roscpp的預設佇列.

這意味著,ros::spin()和ros::spinOnce()將不會觸發這些回撥函式。

使用者自己必須額外呼叫這些回撥函式,可以使用的是回撥函式佇列類物件的callAvailable()方法和callOne()方法

應用:

將不同的回撥函式分別壓進不同的回撥函式佇列有下面幾個優勢:

1)長時服務:對一個服務的回撥函式安排一個單獨的佇列,然後單獨地使用一個執行緒來呼叫它,可以保證不會阻塞其它回撥函式

2)計算消耗回撥函式:與長時服務相似,為一個費計算時間的回撥函式安排一個單獨的回撥佇列處理,能夠減輕應用的負擔.