Launcher

Launcher#

Launcher is a class created to provide a more pythonic way to launch and configure ROS nodes.

Launcher starts a pre-configured component or a set of components as ROS2 nodes using multi-threaded or multi-process execution. Launcher spawns an internal Monitor node in a separate thread in both execution types.

Multi-threaded execution

Multi-threaded execution#

Multi-process execution

Multi-process execution#

Launcher can also manage a set of Events-Actions through its internal Monitor node (See Monitor class).

Available options:#

  • Enable/Disable events monitoring

  • Enable/Disable multi-processing, if disabled the components are launched in threads

  • Select to activate one, many or all components on start (lifecycle nodes activation)

Launcher forwards all the provided Events to its internal Monitor, when the Monitor detects an Event trigger it emits an InternalEvent back to the Launcher. Execution of the Action is done directly by the Launcher or a request is forwarded to the Monitor depending on the selected run method (multi-processes or multi-threaded).

Note

While Launcher supports executing standard ROS2 launch actions. Launcher does not support standard ROS2 launch events for the current version.

Usage Example#

launcher test#
 1from kompass.components import Planner, Controller
 2from kompass.actions import Actions
 3from kompass.event import OnLess
 4from kompass_interfaces.action import PlanPath
 5from geometry_msgs.msg import Pose
 6from kompass_interfaces.msg import PathTrackingError
 7
 8# Create your components
 9planner = Planner(component_name='test_planner')
10controller = Controller(component_name='test_controller')
11
12
13# Configure your components here
14# ....
15planner.run_type = "ActionServer"
16
17# Create your events
18low_battery = OnLess(
19    "low_battery",
20    Topic(name="/battery_level", msg_type="Int"),
21    15,
22    ("data")
23)
24
25# Charging station location
26charging_location = Pose()
27charging_location.position.x = 10.0
28
29# Tolerance for reaching charging station
30end_tolerance = PathTrackingError()
31end_tolerance.orientation_error = 0.1
32end_tolernace.lateral_distance_error = 0.05
33
34action_goal = PlanPath.Goal()
35action_goal.end_tolerance = end_tolerance
36action_goal.goal = charging_location
37
38# Define action to send a goal to the planner to start a mission to the charging station
39return_to_charging_station = Actions.send_action_goal(
40        action_name="test_planner/plan_path",
41        action_type=PlanPath,
42        action_request_msg=action_goal
43    )
44
45# Events/Actions
46my_events_actions: Dict[event.Event, Action] = {
47    low_battery: return_to_charging_station
48}
49
50# We can add a config YAML file
51path_to_yaml = 'my_config.yaml'
52
53launcher = Launcher(
54    components=[planner, controller],
55    events_actions=my_events_actions,
56    config_file=path_to_yaml,
57    activate_all_components_on_start=True,
58    multi_processing=True,
59)
60
61# Set a topic for all components
62odom_topic = Topic(name="robot_odom", msg_type="Odometry")
63launcher.inputs(location=odom_topic)
64
65# If any component fails -> restart it with unlimited retries
66launcher.on_component_fail(action_name="restart")
67
68# Bring up the system
69launcher.bringup(ros_log_level="info", introspect=False)