⚙️ Using Events/Actions in your application#

Events/Actions are a powerful tool to make your robot application adaptive to the changing working conditions (both internal and external to the robot), or to extend the operational scope of your application by adapting it to new spaces/usecases.

In this tutorial we will extend the Turtlebot3 point navigation recipe by adding a few events/actions to the system.

First Event/Action Pair#

First we define two events, the first is associated with the Controller internal algorithm failing to calculate a new control command by accessing the value of the Health Status broadcasted by the Controller on its own status topic. The second event is triggered when an emergency stop is detected by the DriveManager:

from kompass import event

event_controller_fail = event.OnEqual(
    "controller_fail",
    Topic(name="controller_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
    "status",
)

event_emergency_stop = event.OnEqual(
        "emergency_stop",
        Topic(name="emergency_stop", msg_type="Bool"),
        True,
        "data",
    )

We will link both events with an Action provided by the DriveManager called move_to_unblock. This is to address two a problem that can occur during navigation when the robot is too close to obstacles or an obstacle is already touching the chassis which will lead many control algorithms to fail. In such cases move_to_unblock will help the robot to move a little and escape the clutter or collision area.

from kompass.actions import Action

unblock_action = Action(method=driver.move_to_unblock)

We can also define an event for the Planner algorithm failing to produce a new map. We can associate it with the same action for the case where the reference map of the space used by the robot is not accurate. If the map is not accurate the robot can be at a starting point that is marked occupied on the map where it’s in fact free. In this case the the Planner will fail to solve the navigation problem as an invalid start state is passed to the algorithm. In this case executing the move_to_unblock action can help escape the invalid space.

event_planner_algo_fail = event.OnEqual(
    "planner_fail",
    Topic(name="planner_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
    ("status"),
)

Tip

If you are using Kompass with the Costmap 2D node: You can associate another Action to the Planner algorithm level fail event which is a ROS2 service call to the node clear_costmap service. (Keep reading to see how similar actions are executed!)

Note

In the point navigation recipe we added a generic on_fail policy to restart the failed component. This policy will be applied whenever any type of failure is occurs, including the previously used algorithm failure. We can choose to keep this policy, meaning that while the driver is executing the move_to_unblock action, then the planner or controller will get restarted.

Tweak the system using events#

In our point navigation example we used the Planner with a Timed run type and used RVIZ clicked point as the goal point input. In this configuration, once a point is clicked on RVIZ (a message is published on the topic), the Planner will keep producing a new plan each loop step from the current position to the clicked point. To track the mission during execution and end the mission once the point is reached we can run the Planner as an ActionServer. In this case the Planner produces a new plan on an incoming action request and will no longer take goals directly from RVIZ topic.

We will use events here to run the Planner as an ActionServer and accept the goal directly from RVIZ to get the best of both worlds.

First, we define an event that is triggered on any clicked point on RVIZ:

from kompass import event
from kompass.ros import Topic

# On any clicked point
event_clicked_point = event.OnGreater(
    "rviz_goal",
    Topic(name="/clicked_point", msg_type="PointStamped"),
    0,
    ["header", "stamp", "sec"],
)

# Another way to define the same event
event_clicked_point_1 = event.OnAny(
    "rviz_goal_any",
    Topic(name="/clicked_point", msg_type="PointStamped"),
)

Then we will add an Action that sends a goal to the Planner’s ActionServer:

from kompass.actions import ComponentActions
from kompass_interfaces.action import PlanPath

# Define an Action to send a goal to the planner ActionServer
send_goal: Action = ComponentActions.send_action_goal(
    action_name="/planner/plan_path",
    action_type=PlanPath,
    action_request_msg=PlanPath.Goal(),
)

The only thing left is to parse the clicked point message published on the event topic into the send_goal action goal. For this we will write a small method that executes tha desired parsing and pass it to the action using the event_parser method available in the Action class:

from kompass_interfaces.msg import PathTrackingError
from geomerty_msgs.msg import PointStamped

# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
def goal_point_parser(*, msg: PointStamped, **_):
    action_request = PlanPath.Goal()
    goal = Pose()
    goal.position.x = msg.point.x
    goal.position.y = msg.point.y
    action_request.goal = goal
    end_tolerance = PathTrackingError()
    end_tolerance.orientation_error = 0.2
    end_tolerance.lateral_distance_error = 0.05
    action_request.end_tolerance = end_tolerance
    return action_request

# Adds the parser method as an Event parser of the send_goal action
send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")

Now we can write a dictionary to link all our events with their respective actions:

from kompass.actions import ComponentActions, LogInfo

# Define Events/Actions dictionary
events_actions = {
    event_clicked_point: [LogInfo(msg="Got a new goal point from RVIZ"), send_goal],
    event_emergency_stop: [
        ComponentActions.restart(component=planner),
        unblock_action,
    ],
    event_controller_fail: unblock_action,
}

Events/Actions to interact with external systems#

We can also use events/actions to communicate with systems or ROS2 nodes external to Kompass. Here for example, we will add another event for a system-level failure in the planner and an Action to send a service request to the Map Server node to load our reference map from the file:

import os
from kompass.actions import ComponentActions
from nav2_msgs.srv import LoadMap
from ament_index_python.packages import (
    get_package_share_directory,
)

package_dir = get_package_share_directory(package_name="kompass")

load_map_req = LoadMap()
load_map_req.map_url = os.path.join(
        package_dir, "maps", "turtlebot3_webots.yaml"
    )

action_load_map = ComponentActions.send_srv_request(
    srv_name="/map_server/load_map",
    srv_type=LoadMap,
    srv_request_msg=load_map_req,
)

event_planner_system_fail = event.OnEqual(
    "planner_system_fail",
    Topic(name="planner_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
    "status",
)

Tip

SYSTEM_LEVEL_FAILURE occurs in a component when an external error occurs such as an unavailable input.

Complete Recipe#

Et voila! We extended the robustness and adaptivity of our system by using events/actions and without modifying any of the used components and without introducing any new components.

Finally the full recipe with the added events/actions will look as follows:

turtlebot3 test#
  1import numpy as np
  2import os
  3from nav2_msgs.srv import LoadMap
  4
  5from kompass_core.models import (
  6    AngularCtrlLimits,
  7    LinearCtrlLimits,
  8    RobotGeometry,
  9    RobotType,
 10)
 11from kompass_core.control import ControllersID
 12
 13from sugar.msg import ComponentStatus
 14from kompass_interfaces.action import PlanPath
 15from kompass_interfaces.msg import PathTrackingError
 16from geometry_msgs.msg import Pose, PointStamped
 17
 18from kompass import event
 19from kompass.actions import Action
 20
 21from kompass.components import (
 22    Controller,
 23    DriveManager,
 24    Planner,
 25    PlannerConfig,
 26    LocalMapper,
 27)
 28from kompass.actions import ComponentActions, LogInfo
 29from kompass.config import RobotConfig
 30from kompass.ros import Topic, Launcher
 31
 32from ament_index_python.packages import get_package_share_directory
 33
 34
 35package_dir = get_package_share_directory(package_name="kompass")
 36config_file = os.path.join(package_dir, "params", "turtlebot3.yaml")
 37
 38# Setup your robot configuration
 39my_robot = RobotConfig(
 40    model_type=RobotType.DIFFERENTIAL_DRIVE,
 41    geometry_type=RobotGeometry.Type.CYLINDER,
 42    geometry_params=np.array([0.1, 0.3]),
 43    ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
 44    ctrl_omega_limits=AngularCtrlLimits(
 45        max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3)
 46)
 47
 48config = PlannerConfig(robot=my_robot, loop_rate=1.0)
 49planner = Planner(component_name="planner", config=config, config_file=config_file)
 50
 51controller = Controller(component_name="controller")
 52mapper = LocalMapper(component_name="mapper")
 53
 54# Get the default DriveManager
 55driver = DriveManager(component_name="drive_manager")
 56
 57# Publish Twist or TwistStamped from the DriveManager based on the distribution
 58if "ROS_DISTRO" in os.environ and (
 59    os.environ["ROS_DISTRO"] in ["rolling", "jazzy", "kilted"]
 60):
 61    cmd_msg_type : str = "TwistStamped"
 62else:
 63    cmd_msg_type = "Twist"
 64
 65driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
 66
 67# Configure Controller options
 68controller.algorithm = ControllersID.STANLEY
 69controller.direct_sensor = False
 70
 71planner.run_type = "ActionServer"
 72
 73driver.on_fail(action=Action(driver.restart))
 74
 75# DEFINE EVENTS
 76event_emergency_stop = event.OnEqual(
 77    "emergency_stop",
 78    Topic(name="emergency_stop", msg_type="Bool"),
 79    True,
 80    "data",
 81)
 82event_controller_fail = event.OnEqual(
 83    "controller_fail",
 84    Topic(name="controller_status", msg_type="ComponentStatus"),
 85    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
 86    "status",
 87)
 88
 89event_planner_system_fail = event.OnEqual(
 90    "planner_system_fail",
 91    Topic(name="planner_status", msg_type="ComponentStatus"),
 92    ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
 93    "status",
 94)
 95
 96# Unblock action
 97unblock_action = Action(method=driver.move_to_unblock)
 98
 99# On any clicked point
100event_clicked_point = event.OnGreater(
101    "rviz_goal",
102    Topic(name="/clicked_point", msg_type="PointStamped"),
103    0,
104    ["header", "stamp", "sec"],
105)
106
107# Define an Action to send a goal to the planner ActionServer
108send_goal: Action = ComponentActions.send_action_goal(
109    action_name="/planner/plan_path",
110    action_type=PlanPath,
111    action_request_msg=PlanPath.Goal(),
112)
113
114# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
115def goal_point_parser(*, msg: PointStamped, **_):
116    action_request = PlanPath.Goal()
117    goal = Pose()
118    goal.position.x = msg.point.x
119    goal.position.y = msg.point.y
120    action_request.goal = goal
121    end_tolerance = PathTrackingError()
122    end_tolerance.orientation_error = 0.2
123    end_tolerance.lateral_distance_error = 0.05
124    action_request.end_tolerance = end_tolerance
125    return action_request
126
127# Adds the parser method as an Event parser of the send_goal action
128send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
129
130# Load map action
131load_map_req = LoadMap()
132load_map_req.map_url = os.path.join(package_dir, "maps", "turtlebot3_webots.yaml")
133
134action_load_map = ComponentActions.send_srv_request(
135    srv_name="/map_server/load_map",
136    srv_type=LoadMap,
137    srv_request_msg=load_map_req)
138
139# Define Events/Actions dictionary
140events_actions = {
141    event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
142    event_emergency_stop: [
143        ComponentActions.restart(component=planner),
144        unblock_action,
145    ],
146    event_controller_fail: unblock_action,
147    event_planner_system_fail: action_load_map
148}
149
150# Setup the launcher
151launcher = Launcher(config_file=config_file)
152
153# Add Kompass components
154launcher.kompass(
155    components=[planner, controller, mapper, driver],
156    events_actions=events_actions,
157    activate_all_components_on_start=True,
158    multi_processing=True)
159
160# Get odom from localizer filtered odom for all components
161odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
162launcher.inputs(location=odom_topic)
163
164# Set the robot config for all components
165launcher.robot = my_robot
166
167launcher.bringup()