Launcher
Launcher is a class that provides a more Pythonic way to launch and configure ROS2 nodes.
Launcher starts a pre-configured component, or a set of components, as ROS2 nodes using multi-threaded or multi-process execution. In both execution modes, it spawns an internal Monitor node in a separate thread.
Launcher can also manage a set of Events/Actions through its internal Monitor node (see the Monitor class).
Available options:
When initializing the Launcher, you can:
Provide a ROS2 namespace for all the components
Provide a YAML config file
Enable or disable event monitoring
You can add components to the launcher with the 'add_pkg' method, which accepts any number of components from the same ROS2 package created using ROS Sugar primitives. When adding components from a new package, you can configure:
Multi-processing, enabled or disabled; if disabled, the components are launched in threads
Which components to activate on start: one, many, or all (lifecycle node activation)
The set of Events/Actions related to the components
Launcher forwards all the provided Events to its internal Monitor. When the Monitor detects an Event trigger, it emits an InternalEvent back to the Launcher. Depending on the selected run method (multi-process or multi-threaded), the Launcher either executes the Action directly or forwards a request to the Monitor.
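The exchange described above can be pictured with a small standalone sketch. It does not use ROS Sugar: ToyEvent, ToyMonitor, ToyLauncher and the forward_to_monitor flag are hypothetical names invented for illustration. The text does not state which run method takes which path, so the sketch models it as a plain boolean.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class ToyEvent:
    """Hypothetical stand-in for an Event: a name plus a trigger condition."""
    name: str
    condition: Callable[[float], bool]


@dataclass
class ToyMonitor:
    """Hypothetical stand-in for the internal Monitor."""
    events: List[ToyEvent]
    notify_launcher: Callable[[str], None]
    executed: List[str] = field(default_factory=list)

    def on_value(self, value: float) -> None:
        # Check every registered event and report each trigger to the launcher.
        for event in self.events:
            if event.condition(value):
                self.notify_launcher(event.name)

    def execute(self, action: Callable[[], str]) -> None:
        # Runs an action on behalf of the launcher (the "forwarded request" path).
        self.executed.append(action())


class ToyLauncher:
    """Hypothetical stand-in for the Launcher side of the exchange."""

    def __init__(
        self,
        events_actions: List[Tuple[ToyEvent, Callable[[], str]]],
        forward_to_monitor: bool,
    ) -> None:
        self._actions = {event.name: action for event, action in events_actions}
        self.forward_to_monitor = forward_to_monitor
        self.executed: List[str] = []
        # All provided events are handed to the monitor up front.
        self.monitor = ToyMonitor(
            events=[event for event, _ in events_actions],
            notify_launcher=self._on_internal_event,
        )

    def _on_internal_event(self, event_name: str) -> None:
        # Plays the role of the InternalEvent emitted back by the monitor.
        action = self._actions[event_name]
        if self.forward_to_monitor:
            self.monitor.execute(action)
        else:
            self.executed.append(action())


low_battery = ToyEvent("low_battery", lambda level: level < 15)
pairs = [(low_battery, lambda: "Battery is Low!")]

direct = ToyLauncher(pairs, forward_to_monitor=False)
direct.monitor.on_value(80)  # above the threshold: nothing happens
direct.monitor.on_value(10)  # trigger: the launcher runs the action itself

forwarded = ToyLauncher(pairs, forward_to_monitor=True)
forwarded.monitor.on_value(10)  # trigger: the launcher asks the monitor to run it
```

In both cases the monitor only detects the trigger; the flag decides which side ends up running the action.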
Note
Launcher supports executing standard ROS2 launch actions, but the current version does not support standard ROS2 launch events.
Usage Example
from ros_sugar import Launcher
from ros_sugar.actions import LogInfo
from ros_sugar.core import BaseComponent
from ros_sugar.events import OnLess
from ros_sugar.io import Topic  # assumed import path; Topic was used but never imported

# Create your components
my_component = BaseComponent(component_name="test_component")

# Create your events: a low-battery event on the "data" field of /battery_level,
# with 15 as the trigger value
low_battery = OnLess(
    "low_battery",
    Topic(name="/battery_level", msg_type="Int"),
    15,
    "data",
)

# Events/Actions: map each event to the action it triggers
my_events_actions = {
    low_battery: LogInfo(msg="Battery is Low!"),
}

# We can add a config YAML file
path_to_yaml = "my_config.yaml"

launcher = Launcher(
    config_file=path_to_yaml,
    activate_all_components_on_start=True,
    multiprocessing=True,
)

# Register the component and its events/actions with the launcher.
# NOTE: these keyword names are assumed from the options described above;
# check the add_pkg signature in your ros_sugar version.
launcher.add_pkg(
    components=[my_component],
    events_actions=my_events_actions,
)

# If any component fails -> restart it with unlimited retries
launcher.on_component_fail(action_name="restart")

# Bring up the system
launcher.bringup(ros_log_level="info", introspect=False)
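As a reading aid for the low_battery event in the example, the sketch below shows one plausible interpretation of an "on less" check: follow a chain of attribute names into the message, then compare the value with the trigger value. It is a standalone illustration with hypothetical helper names (read_attribute, is_less) and a SimpleNamespace standing in for a ROS2 message; the actual OnLess implementation in ROS Sugar may differ.

```python
from functools import reduce
from types import SimpleNamespace
from typing import Any, Sequence


def read_attribute(message: Any, attribute_path: Sequence[str]) -> Any:
    """Follow a chain of attribute names, e.g. ("data",) -> message.data."""
    return reduce(getattr, attribute_path, message)


def is_less(message: Any, attribute_path: Sequence[str], threshold: float) -> bool:
    """Return True when the selected message field is below the threshold."""
    return read_attribute(message, attribute_path) < threshold


# Stand-ins for messages published on /battery_level (hypothetical shape).
battery_ok = SimpleNamespace(data=80)
battery_low = SimpleNamespace(data=10)

triggered_ok = is_less(battery_ok, ("data",), 15)
triggered_low = is_less(battery_low, ("data",), 15)

# Nested fields work the same way: ("status", "level") -> message.status.level
nested = SimpleNamespace(status=SimpleNamespace(level=3))
nested_level = read_attribute(nested, ("status", "level"))
```

Passing the attribute path as a sequence keeps the helper usable for both flat and nested message fields.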