Using Events/Actions in your application#
Events/Actions are a powerful tool to make your robot application adaptive to the changing working conditions (both internal and external to the robot), or to extend the operational scope of your application by adapting it to new spaces/usecases.
In this tutorial we will extend the Turtlebot3 quick start recipe by adding a few events/actions to the system.
First Event/Action Pair#
First we define two events, the first is associated with the Controller internal algorithm failing to calculate a new control command by accessing the value of the Health Status broadcasted by the Controller on its own status topic. The second event is triggered when an emergency stop is detected by the DriveManager:
from kompass import event
event_controller_fail = event.OnEqual(
"controller_fail",
Topic(name="controller_status", msg_type="ComponentStatus"),
ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
"status",
)
event_emergency_stop = event.OnEqual(
"emergency_stop",
Topic(name="emergency_stop", msg_type="Bool"),
True,
"data",
)
We will link both events with an Action provided by the DriveManager
called move_to_unblock
. This is to address two a problem that can occur during navigation when the robot is too close to obstacles or an obstacle is already touching the chassis which will lead many control algorithms to fail. In such cases move_to_unblock
will help the robot to move a little and escape the clutter or collision area.
from kompass.actions import Action
unblock_action = Action(method=driver.move_to_unblock)
We can also define an event for the Planner algorithm failing to produce a new map. We can associate it with the same action for the case where the reference map of the space used by the robot is not accurate. If the map is not accurate the robot can be at a starting point that is marked occupied on the map where it’s in fact free. In this case the the Planner will fail to solve the navigation problem as an invalid start state is passed to the algorithm. In this case executing the move_to_unblock
action can help escape the invalid space.
event_planner_algo_fail = event.OnEqual(
"planner_fail",
Topic(name="planner_status", msg_type="ComponentStatus"),
ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
("status"),
)
Tip
If you are using Kompass with the Costmap 2D node: You can associate another Action to the Planner algorithm level fail event which is a ROS2 service call to the node clear_costmap
service. (Keep reading to see how similar actions are executed!)
Note
In the quick start recipe we added a generic on_fail policy to restart the failed component. This policy will be applied whenever any type of failure is occurs, including the previously used algorithm failure. We can choose to keep this policy, meaning that while the driver is executing the move_to_unblock
action, then the planner or controller will get restarted.
Tweak the system using events#
In our quick start example we used the Planner with a Timed
run type and used RVIZ clicked point as the goal point input. In this configuration, once a point is clicked on RVIZ (a message is published on the topic), the Planner will keep producing a new plan each loop step from the current position to the clicked point. To track the mission during execution and end the mission once the point is reached we can run the Planner as an ActionServer
. In this case the Planner produces a new plan on an incoming action request and will no longer take goals directly from RVIZ topic.
We will use events here to run the Planner as an ActionServer
and accept the goal directly from RVIZ to get the best of both worlds.
First, we define an event that is triggered on any clicked point on RVIZ:
from kompass import event
from kompass.ros import Topic
# On any clicked point
event_clicked_point = event.OnGreater(
"rviz_goal",
Topic(name="/clicked_point", msg_type="PointStamped"),
0,
["header", "stamp", "sec"],
)
Then we will add an Action that sends a goal to the Planner’s ActionServer:
from kompass.actions import ComponentActions
from kompass_interfaces.action import PlanPath
# Define an Action to send a goal to the planner ActionServer
send_goal: Action = ComponentActions.send_action_goal(
action_name="/planner/plan_path",
action_type=PlanPath,
action_request_msg=PlanPath.Goal(),
)
The only thing left is to parse the clicked point message published on the event topic into the send_goal
action goal. For this we will write a small method that executes tha desired parsing and pass it to the action using the event_parser
method available in the Action class:
from kompass_interfaces.msg import PathTrackingError
from geomerty_msgs.msg import PointStamped
# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
def goal_point_parser(*, msg: PointStamped, **_):
action_request = PlanPath.Goal()
goal = Pose()
goal.position.x = msg.point.x
goal.position.y = msg.point.y
action_request.goal = goal
end_tolerance = PathTrackingError()
end_tolerance.orientation_error = 0.2
end_tolerance.lateral_distance_error = 0.05
action_request.end_tolerance = end_tolerance
return action_request
# Adds the parser method as an Event parser of the send_goal action
send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
Now we can write a dictionary to link all our events with their respective actions:
from kompass.actions import ComponentActions, LogInfo
# Define Events/Actions dictionary
events_actions = {
event_clicked_point: [LogInfo(msg="Got a new goal point from RVIZ"), send_goal],
event_emergency_stop: [
ComponentActions.restart(component=planner),
unblock_action,
],
event_controller_fail: unblock_action,
}
Events/Actions to interact with external systems#
We can also use events/actions to communicate with systems or ROS2 nodes external to Kompass. Here for example, we will add another event for a system-level failure in the planner and an Action to send a service request to the Map Server node to load our reference map from the file:
import os
from kompass.actions import ComponentActions
from nav2_msgs.srv import LoadMap
from ament_index_python.packages import (
get_package_share_directory,
)
package_dir = get_package_share_directory(package_name="kompass")
load_map_req = LoadMap()
load_map_req.map_url = os.path.join(
package_dir, "maps", "turtlebot3_webots.yaml"
)
action_load_map = ComponentActions.send_srv_request(
srv_name="/map_server/load_map",
srv_type=LoadMap,
srv_request_msg=load_map_req,
)
event_planner_system_fail = event.OnEqual(
"planner_system_fail",
Topic(name="planner_status", msg_type="ComponentStatus"),
ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
"status",
)
Tip
SYSTEM_LEVEL_FAILURE
occurs in a component when an external error occurs such as an unavailable input.
Complete Recipe#
Et voila! We extended the robustness and adaptivity of our system by using events/actions and without modifying any of the used components and without introducing any new components.
Finally the full recipe with the added events/actions will look as follows:
1import numpy as np
2import os
3from nav2_msgs.srv import LoadMap
4
5from kompass_core.models import (
6 AngularCtrlLimits,
7 LinearCtrlLimits,
8 RobotGeometry,
9 RobotType,
10)
11from kompass_core.control import ControllersID
12
13from sugar.msg import ComponentStatus
14from kompass_interfaces.action import PlanPath
15from kompass_interfaces.msg import PathTrackingError
16from geometry_msgs.msg import Pose, PointStamped
17
18from kompass import event
19from kompass.actions import Action
20
21from kompass.components import (
22 Controller,
23 DriveManager,
24 Planner,
25 PlannerConfig,
26 LocalMapper,
27)
28from kompass.actions import ComponentActions, LogInfo
29from kompass.config import RobotConfig
30from kompass.ros import Topic, Launcher
31
32from ament_index_python.packages import get_package_share_directory
33
34
35package_dir = get_package_share_directory(package_name="kompass")
36config_file = os.path.join(package_dir, "params", "turtlebot3.yaml")
37
38# Setup your robot configuration
39my_robot = RobotConfig(
40 model_type=RobotType.DIFFERENTIAL_DRIVE,
41 geometry_type=RobotGeometry.Type.CYLINDER,
42 geometry_params=np.array([0.1, 0.3]),
43 ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
44 ctrl_omega_limits=AngularCtrlLimits(
45 max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3)
46)
47
48config = PlannerConfig(robot=my_robot, loop_rate=1.0)
49planner = Planner(component_name="planner", config=config, config_file=config_file)
50
51controller = Controller(component_name="controller")
52driver = DriveManager(component_name="drive_manager")
53mapper = LocalMapper(component_name="mapper")
54
55# Configure Controller options
56controller.algorithm = ControllersID.STANLEY
57controller.direct_sensor = False
58
59planner.run_type = "ActionServer"
60
61driver.on_fail(action=Action(driver.restart))
62
63# DEFINE EVENTS
64event_emergency_stop = event.OnEqual(
65 "emergency_stop",
66 Topic(name="emergency_stop", msg_type="Bool"),
67 True,
68 "data",
69)
70event_controller_fail = event.OnEqual(
71 "controller_fail",
72 Topic(name="controller_status", msg_type="ComponentStatus"),
73 ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
74 "status",
75)
76
77event_planner_system_fail = event.OnEqual(
78 "planner_system_fail",
79 Topic(name="planner_status", msg_type="ComponentStatus"),
80 ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
81 "status",
82)
83
84# Unblock action
85unblock_action = Action(method=driver.move_to_unblock)
86
87# On any clicked point
88event_clicked_point = event.OnGreater(
89 "rviz_goal",
90 Topic(name="/clicked_point", msg_type="PointStamped"),
91 0,
92 ["header", "stamp", "sec"],
93)
94
95# Define an Action to send a goal to the planner ActionServer
96send_goal: Action = ComponentActions.send_action_goal(
97 action_name="/planner/plan_path",
98 action_type=PlanPath,
99 action_request_msg=PlanPath.Goal(),
100)
101
102# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
103def goal_point_parser(*, msg: PointStamped, **_):
104 action_request = PlanPath.Goal()
105 goal = Pose()
106 goal.position.x = msg.point.x
107 goal.position.y = msg.point.y
108 action_request.goal = goal
109 end_tolerance = PathTrackingError()
110 end_tolerance.orientation_error = 0.2
111 end_tolerance.lateral_distance_error = 0.05
112 action_request.end_tolerance = end_tolerance
113 return action_request
114
115# Adds the parser method as an Event parser of the send_goal action
116send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
117
118# Load map action
119load_map_req = LoadMap()
120load_map_req.map_url = os.path.join(package_dir, "maps", "turtlebot3_webots.yaml")
121
122action_load_map = ComponentActions.send_srv_request(
123 srv_name="/map_server/load_map",
124 srv_type=LoadMap,
125 srv_request_msg=load_map_req)
126
127# Define Events/Actions dictionary
128events_actions = {
129 event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
130 event_emergency_stop: [
131 ComponentActions.restart(component=planner),
132 unblock_action,
133 ],
134 event_controller_fail: unblock_action,
135 event_planner_system_fail: action_load_map
136}
137
138# Setup the launcher
139launcher = Launcher(config_file=config_file)
140
141# Add Kompass components
142launcher.kompass(
143 components=[planner, controller, mapper, driver],
144 events_actions=events_actions,
145 activate_all_components_on_start=True,
146 multi_processing=True)
147
148# Get odom from localizer filtered odom for all components
149odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
150launcher.inputs(location=odom_topic)
151
152# Set the robot config for all components
153launcher.robot = my_robot
154
155launcher.bringup(introspect=True)