Launcher#
Launcher is a class created to provide a more pythonic way to launch and configure ROS nodes.
Launcher starts a pre-configured component or a set of components as ROS2 nodes using multi-threaded or multi-process execution. Launcher spawns an internal Monitor node in a separate thread in both execution types.
Launcher can also manage a set of Events-Actions through its internal Monitor node (See Monitor class).
Available options:#
Enable/Disable events monitoring
Enable/Disable multi-processing, if disabled the components are launched in threads
Select to activate one, many or all components on start (lifecycle nodes activation)
Launcher forwards all the provided Events to its internal Monitor, when the Monitor detects an Event trigger it emits an InternalEvent back to the Launcher. Execution of the Action is done directly by the Launcher or a request is forwarded to the Monitor depending on the selected run method (multi-processes or multi-threaded).
Note
While Launcher supports executing standard ROS2 launch actions. Launcher does not support standard ROS2 launch events for the current version.
Usage Example#
1from kompass.components import Planner, Controller
2from kompass.actions import Actions
3from kompass.event import OnLess
4from kompass_interfaces.action import PlanPath
5from geometry_msgs.msg import Pose
6from kompass_interfaces.msg import PathTrackingError
7
8# Create your components
9planner = Planner(component_name='test_planner')
10controller = Controller(component_name='test_controller')
11
12
13# Configure your components here
14# ....
15planner.run_type = "ActionServer"
16
17# Create your events
18low_battery = OnLess(
19 "low_battery",
20 Topic(name="/battery_level", msg_type="Int"),
21 15,
22 ("data")
23)
24
25# Charging station location
26charging_location = Pose()
27charging_location.position.x = 10.0
28
29# Tolerance for reaching charging station
30end_tolerance = PathTrackingError()
31end_tolerance.orientation_error = 0.1
32end_tolernace.lateral_distance_error = 0.05
33
34action_goal = PlanPath.Goal()
35action_goal.end_tolerance = end_tolerance
36action_goal.goal = charging_location
37
38# Define action to send a goal to the planner to start a mission to the charging station
39return_to_charging_station = Actions.send_action_goal(
40 action_name="test_planner/plan_path",
41 action_type=PlanPath,
42 action_request_msg=action_goal
43 )
44
45# Events/Actions
46my_events_actions: Dict[event.Event, Action] = {
47 low_battery: return_to_charging_station
48}
49
50# We can add a config YAML file
51path_to_yaml = 'my_config.yaml'
52
53launcher = Launcher(
54 components=[planner, controller],
55 events_actions=my_events_actions,
56 config_file=path_to_yaml,
57 activate_all_components_on_start=True,
58 multi_processing=True,
59)
60
61# Set a topic for all components
62odom_topic = Topic(name="robot_odom", msg_type="Odometry")
63launcher.inputs(location=odom_topic)
64
65# If any component fails -> restart it with unlimited retries
66launcher.on_component_fail(action_name="restart")
67
68# Bring up the system
69launcher.bringup(ros_log_level="info", introspect=False)