ROS Client APIs advanced#

Introduction#

ROS clients APIs are used to interact with the ROS core system, such as topics, services, parameters, … Each programming language supported by ROS has its own API :

  • ROSCPP for C++

  • ROSPY for Python

  • ROSLUA for Lua

Warning

While all these APIs have a common goal, which is enabling a quick interface between ROS core and high level applications, the way to do it can vary a lot ! ROSCPP and ROSPY have a total different way to handle callbacks for example.

Do not assume that these APIs will behave the same way from one language to another…

This document focuses only on ROSPY and ROSCPP advanced implementations for the moment, since we did not get to play with other APIs. For basic operations, such as retrieving parameters from the launcher file, see ROSCPP tutorials.

Callback methods#

ROS works with callback methods to have interactions between the nodes.

Initial situation#

ROS topics are used for interactions between nodes. A ros::Subscriber or rospy.Subscriber instance will create the link to the master node and get messages published on the topic specified by the subscriber. The code is given here for both client APIs :

#include "ros/ros.h"
#include "std_msgs/String.h"

void callback(const std_msgs::String::ConstPtr& msg)
{
   ROS_INFO("Received message : [%s]", msg->data.c_str());
}

int main(int argc, char **argv)
{
   ros::init(argc, argv, "listener");

   // NodeHandle are the main communication points with ROS core system
   ros::NodeHandle n;

   // The subscriber will listen on the topic 'chatter', have a 5 big message queue, and use the 'callback' method to process incoming messages
   ros::Subscriber sub = n.subscribe("chatter", 5, callback);

   // loop to pump the callbacks
   ros::spin();
   return 0;
}

The same code but in python is :

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

def callback(data):
   rospy.loginfo(rospy.get_caller_id() + "Message received :  %s", data.data)

if __name__ == '__main__':
   rospy.init_node('listener', anonymous=True)

   # The subscriber will listen on the topic 'chatter', use the 'callback' method to process incoming messages
   rospy.Subscriber("chatter", String, callback)

   # spin() simply keeps python from exiting until this node is stopped
   rospy.spin()

With such a simple configuration, both client APIs behave as expected from any callbacks system. However, there is already a major difference to consider in the way that callbacks are handled :

  • Callbacks in ROSCPP are executed from the main thread, which blocks the main execution until the callback is finished

  • Callbacks in ROSPY are executed in a new thread, which means that the main thread and the new thread will be concurrently executed on the same physical core.

While this does not produce any difference in the behaviour when the conditions are optimal (which means that callback execution time must be smaller than the period of the message publication), it become tricky when this execution time gets bigger, as we will see here below.

2nd situation : a longer callback execution time than the period of message publication#

In this situation, the time required to finish the execution of the callback method is about 150 milliseconds. Let us say that the publisher has a rate of 10 Herz, which leads to a publication each 100 milliseconds. Thus, the callback execution is too slow to keep up with the publication rate.

#include "ros/ros.h"
#include "std_msgs/String.h"

void callback(const std_msgs::String::ConstPtr& msg)
{
   ROS_INFO("Received message : [%s]", msg->data.c_str());

   // sleep for 150 ms
   ros::Duration(0.15).sleep();
}

int main(int argc, char **argv)
{
   ros::init(argc, argv, "listener");

   // NodeHandle are the main communication points with ROS core system
   ros::NodeHandle n;

   // The subscriber will listen on the topic 'chatter', have a 5 big message queue, and use the 'callback' method to process incoming messages
   ros::Subscriber sub = n.subscribe("chatter", 5, callback);

   // loop to pump the callbacks
   ros::spin();
   return 0;
}

The same code but in python is :

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

def callback(data):
   rospy.loginfo(rospy.get_caller_id() + "Message received :  %s", data.data)

   # sleep for 150 ms
   rospy.sleep(0.150)

if __name__ == '__main__':
   rospy.init_node('listener', anonymous=True)

   # The subscriber will listen on the topic 'chatter', use the 'callback' method to process incoming messages
   rospy.Subscriber("chatter", String, callback)

   # spin() simply keeps python from exiting until this node is stopped
   rospy.spin()

The behaviour for the C++ node will be like this :

  1. start the node

  2. receive a callback

  3. stop the main loop execution

  4. start the execution of the callback

    1. while still being in the first callback, receive a 2nd callback

    2. put the 2nd callback in the CallbackQueue

    3. finish the 1st callback execution

  5. get back to the main loop

  6. fetch the 2nd callback from the CallbackQueue

  7. start the execution of the 2nd callback

    1. while still being in the 2nd callback, receive a 3rd callback

    2. put the 3rd callback in the CallbackQueue

    3. finish the 2nd callback execution

  8. get back to the main loop and almost instantly receive a 4th callback

  9. put the 4th callback in the CallbackQueue

  10. fetch the 3rd callback from the CallbackQueue

  11. start the execution of the 3rd callback

    1. while still being in the 3rd callback, receive a 5th callback

    2. put the 5th callback in the CallbackQueue

    3. finish the 3rd callback execution

This will continue until the CallbackQueue is full, and then old callbacks are lost.

For the Python implementation, it will be different :

  1. start the node

  2. receive a callback

  3. start a new thread for the execution of the 1st callback

  4. continue both main thread and callback thread execution in concurrency

  5. receive a new callback while the 1st callback is still being executed in its own thread

  6. start a new thread for the execution of the 2nd callback

  7. continue all 3 threads execution in concurrency

The callback will all be executed and none will be lost. But two callbacks will be executed at the same time. Which takes us to the next point…

3rd situation : concurrent access to resources#

While the implementation of ROSCPP prevents this situation for a C++ node, we have seen in the previous section that in ROSPY two callbacks might be executed in a concurrent way. If the callback uses or even worse modifies shared resources, such as a variable, then it is a critical section.

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

class MsgHandler():

def __init__(self):
   self.last_msg = ''

def callback(self, data):
   rospy.loginfo("This message says :  %s", data.data)
   rospy.loginfo("Last message said :  %s", self.last_msg)

   # sleep for 150 ms
   rospy.sleep(0.150)

   # modify the last message
   self.last_msg = data.data

if __name__ == '__main__':

   MsgHandler msg_handler
   rospy.init_node('listener', anonymous=True)

   # The subscriber will listen on the topic 'chatter', use the 'callback' method from the class MsgHandler to process incoming messages
   rospy.Subscriber("chatter", String, msg_handler.callback)

   # spin() simply keeps python from exiting until this node is stopped
   rospy.spin()

Also, let us say that the publisher publishes messages as follow :

  • "Message n.0"

  • "Message n.1"

  • "Message n.2"

In the case of receiving the 4th callback for example, we would expect to have this output in the console :

This message says : Message n.3
Last message said : Message n.2

However, due to the fact that callbacks are executed in concurrency, and due to the fact that the execution time of the callback is longer than the period of publications, we get :

This message says : Message n.3
Last message said : Message n.1

Of course this is a dummy example that no one would ever write, but it happened that we had to process a big DataFrame in a callback, and the state of this DataFrame was not stable, because the first callback wasn’t finish to process it when the 2nd callback also started to process it. It lead to memory corruption.

How to prevent this to happens ?

  1. try to keep callbacks as short as possible (same as interruptions when programming a uC)

  2. if it is needed to access shared resources, use mutual exclusions tools, such as mutex (Lock() in python, see Thread and mutex in Python)

Another situation : two callbacks node implementation#

All the situation described here above only focused on the 1 node = 1 callback problematic. In complex systems, it is not rare to see nodes with multiple callback inputs. As always, when the callback execution times are short compared to their publication’s period, it works fine. When one of the two callback is more critical than another but more rarely published, we can be confronted to a situation where the CallbackQueue gets filled with low priority message while the high priority messages have to wait until all the low priority messages have been processed.

Two callbacks implementation

Source : https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f#

This problem only happens with ROSCPP due to its execution of the callbacks in the spinner thread. ROSPY is not concerned since each time a callback is received, a new thread is created and allocated to the execution of this callback.

To prevent this, it is possible to define multiple CallbackQueues, one per subscribed callback :

Two CallbackQueues implementation

Source : https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f#

The code for such a configuration is not that trivial :

void crane_position_cb(const std_msgs::String &position_msg)
{
   // ...
}

void cloud_cb(const sensor_msgs::PointCloud2ConstPtr &cloud_msg)
{
   // ...
}

int main(int argc, char **argv)
{
   ros::init(argc, argv, "packets_detector");

   // main NodeHandle, uses the default single-threaded spinner of ROS and default CallbackQueue
   ros::NodeHandle nh;
   ros::Subscriber subscriber_input_pc = nh.subscribe(topic="cloud_0", queue_size=5, callback=cloud_cb);

   // 2nd single-threaded spinner with a dedicated CallbackQueue
   ros::NodeHandle nh2;
   ros::CallbackQueue call_back_queue2;
   nh2.setCallbackQueue(&call_back_queue2);

   // subscribe must be called after the CallbackQueue was set up
   ros::Subscriber subscriber_crane_pos = nh2.subscribe(
      topic="crane_103_position", queue_size=10, callback=crane_position_cb
   );

   // 2nd single-threaded spinner start
   std::thread spinner2_thread(
      [&call_back_queue2]() {
         ros::SingleThreadedSpinner spinner2;
         spinner2.spin(&call_back_queue2);
      }
   );

   // main single-threaded spinner start
   ros::spin();

   // wait until 2nd Spinner thread is dead before leaving main()
   spinner2_thread.join();
}

For more information about this problematic, have a look at this very detailed article : ROS spinning, threading, queuing.

ROS