Using Events/Actions in your application#

Events/Actions are a powerful tool to make your robot application adaptive to the changing working conditions (both internal and external to the robot), or to extend the operational scope of your application by adapting it to new spaces/usecases.

In this tutorial we will extend the Turtlebot3 quick start recipe by adding a few events/actions to the system.

First Event/Action Pair#

First we define two events, the first is associated with the Controller internal algorithm failing to calculate a new control command by accessing the value of the Health Status broadcasted by the Controller on its own status topic. The second event is triggered when an emergency stop is detected by the DriveManager:

from kompass import event

event_controller_fail = event.OnEqual(
    "controller_fail",
    Topic(name="controller_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
    "status",
)

event_emergency_stop = event.OnEqual(
        "emergency_stop",
        Topic(name="emergency_stop", msg_type="Bool"),
        True,
        "data",
    )

We will link both events with an Action provided by the DriveManager called move_to_unblock. This is to address two a problem that can occur during navigation when the robot is too close to obstacles or an obstacle is already touching the chassis which will lead many control algorithms to fail. In such cases move_to_unblock will help the robot to move a little and escape the clutter or collision area.

from kompass.actions import Action

unblock_action = Action(method=driver.move_to_unblock)

We can also define an event for the Planner algorithm failing to produce a new map. We can associate it with the same action for the case where the reference map of the space used by the robot is not accurate. If the map is not accurate the robot can be at a starting point that is marked occupied on the map where it’s in fact free. In this case the the Planner will fail to solve the navigation problem as an invalid start state is passed to the algorithm. In this case executing the move_to_unblock action can help escape the invalid space.

event_planner_algo_fail = event.OnEqual(
    "planner_fail",
    Topic(name="planner_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
    ("status"),
)

Tip

If you are using Kompass with the Costmap 2D node: You can associate another Action to the Planner algorithm level fail event which is a ROS2 service call to the node clear_costmap service. (Keep reading to see how similar actions are executed!)

Note

In the quick start recipe we added a generic on_fail policy to restart the failed component. This policy will be applied whenever any type of failure is occurs, including the previously used algorithm failure. We can choose to keep this policy, meaning that while the driver is executing the move_to_unblock action, then the planner or controller will get restarted.

Tweak the system using events#

In our quick start example we used the Planner with a Timed run type and used RVIZ clicked point as the goal point input. In this configuration, once a point is clicked on RVIZ (a message is published on the topic), the Planner will keep producing a new plan each loop step from the current position to the clicked point. To track the mission during execution and end the mission once the point is reached we can run the Planner as an ActionServer. In this case the Planner produces a new plan on an incoming action request and will no longer take goals directly from RVIZ topic.

We will use events here to run the Planner as an ActionServer and accept the goal directly from RVIZ to get the best of both worlds.

First, we define an event that is triggered on any clicked point on RVIZ:

from kompass import event
from kompass.topic import Topic

# On any clicked point
event_clicked_point = event.OnGreater(
    "rviz_goal",
    Topic(name="/clicked_point", msg_type="PointStamped"),
    0,
    ["header", "stamp", "sec"],
)

Then we will add an Action that sends a goal to the Planner’s ActionServer:

from kompass.actions import ComponentActions
from kompass_interfaces.action import PlanPath

# Define an Action to send a goal to the planner ActionServer
send_goal: Action = ComponentActions.send_action_goal(
    action_name="/planner/plan_path",
    action_type=PlanPath,
    action_request_msg=PlanPath.Goal(),
)

The only thing left is to parse the clicked point message published on the event topic into the send_goal action goal. For this we will write a small method that executes tha desired parsing and pass it to the action using the event_parser method available in the Action class:

from kompass_interfaces.msg import PathTrackingError
from geomerty_msgs.msg import PointStamped

# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
def goal_point_parser(*, msg: PointStamped, **_):
    action_request = PlanPath.Goal()
    goal = Pose()
    goal.position.x = msg.point.x
    goal.position.y = msg.point.y
    action_request.goal = goal
    end_tolerance = PathTrackingError()
    end_tolerance.orientation_error = 0.2
    end_tolerance.lateral_distance_error = 0.05
    action_request.end_tolerance = end_tolerance
    return action_request

# Adds the parser method as an Event parser of the send_goal action
send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")

Now we can write a dictionary to link all our events with their respective actions:

from kompass.actions import ComponentActions, LogInfo

# Define Events/Actions dictionary
events_actions = {
    event_clicked_point: [LogInfo(msg="Got a new goal point from RVIZ"), send_goal],
    event_emergency_stop: [
        ComponentActions.restart(component=planner),
        unblock_action,
    ],
    event_controller_fail: unblock_action,
}

Events/Actions to interact with external systems#

We can also use events/actions to communicate with systems or ROS2 nodes external to Kompass. Here for example, we will add another event for a system-level failure in the planner and an Action to send a service request to the Map Server node to load our reference map from the file:

import os
from kompass.actions import ComponentActions
from nav2_msgs.srv import LoadMap
from ament_index_python.packages import (
    get_package_share_directory,
)

package_dir = get_package_share_directory(package_name="kompass")

load_map_req = LoadMap()
load_map_req.map_url = os.path.join(
        package_dir, "maps", "turtlebot3_webots.yaml"
    )

action_load_map = ComponentActions.send_srv_request(
    srv_name="/map_server/load_map",
    srv_type=LoadMap,
    srv_request_msg=load_map_req,
)

event_planner_system_fail = event.OnEqual(
    "planner_system_fail",
    Topic(name="planner_status", msg_type="ComponentStatus"),
    ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
    "status",
)

Tip

SYSTEM_LEVEL_FAILURE occurs in a component when an external error occurs such as an unavailable input.

Complete Recipe#

Et voila! We extended the robustness and adaptivity of our system by using events/actions and without modifying any of the used components and without introducing any new components.

Finally the full recipe with the added events/actions will look as follows:

turtlebot3 test#
  1import numpy as np
  2import os
  3from nav2_msgs.srv import LoadMap
  4
  5from kompass_core.models import (
  6    AngularCtrlLimits,
  7    LinearCtrlLimits,
  8    RobotGeometry,
  9    RobotType,
 10)
 11from kompass_core.control import LocalPlannersID
 12
 13from ros_sugar_interfaces.msg import ComponentStatus
 14from kompass_interfaces.action import PlanPath
 15from kompass_interfaces.msg import PathTrackingError
 16from geometry_msgs.msg import Pose, PointStamped
 17
 18from kompass import event
 19from kompass.actions import Action
 20
 21from kompass.components import (
 22    Controller,
 23    DriveManager,
 24    Planner,
 25    PlannerConfig,
 26    LocalMapper,
 27)
 28from kompass.actions import ComponentActions, LogInfo
 29from kompass.config import RobotConfig
 30from kompass.launcher import Launcher
 31from kompass.topic import Topic
 32
 33from ament_index_python.packages import get_package_share_directory
 34
 35
 36package_dir = get_package_share_directory(package_name="kompass")
 37config_file = os.path.join(package_dir, "params", "turtlebot3.yaml")
 38
 39# Setup your robot configuration
 40my_robot = RobotConfig(
 41    model_type=RobotType.DIFFERENTIAL_DRIVE,
 42    geometry_type=RobotGeometry.Type.CYLINDER,
 43    geometry_params=np.array([0.1, 0.3]),
 44    ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
 45    ctrl_omega_limits=AngularCtrlLimits(
 46        max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3)
 47)
 48
 49config = PlannerConfig(robot=my_robot, loop_rate=1.0)
 50planner = Planner(component_name="planner", config=config, config_file=config_file)
 51
 52controller = Controller(component_name="controller")
 53driver = DriveManager(component_name="drive_manager")
 54mapper = LocalMapper(component_name="mapper")
 55
 56# Configure Controller options
 57controller.algorithm = LocalPlannersID.STANLEY
 58controller.direct_sensor = False
 59
 60planner.run_type = "ActionServer"
 61
 62driver.on_fail(action=Action(driver.restart))
 63
 64# DEFINE EVENTS
 65event_emergency_stop = event.OnEqual(
 66    "emergency_stop",
 67    Topic(name="emergency_stop", msg_type="Bool"),
 68    True,
 69    "data",
 70)
 71event_controller_fail = event.OnEqual(
 72    "controller_fail",
 73    Topic(name="controller_status", msg_type="ComponentStatus"),
 74    ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
 75    "status",
 76)
 77
 78event_planner_system_fail = event.OnEqual(
 79    "planner_system_fail",
 80    Topic(name="planner_status", msg_type="ComponentStatus"),
 81    ComponentStatus.STATUS_FAILURE_SYSTEM_LEVEL,
 82    "status",
 83)
 84
 85# Unblock action
 86unblock_action = Action(method=driver.move_to_unblock)
 87
 88# On any clicked point
 89event_clicked_point = event.OnGreater(
 90    "rviz_goal",
 91    Topic(name="/clicked_point", msg_type="PointStamped"),
 92    0,
 93    ["header", "stamp", "sec"],
 94)
 95
 96# Define an Action to send a goal to the planner ActionServer
 97send_goal: Action = ComponentActions.send_action_goal(
 98    action_name="/planner/plan_path",
 99    action_type=PlanPath,
100    action_request_msg=PlanPath.Goal(),
101)
102
103# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
104def goal_point_parser(*, msg: PointStamped, **_):
105    action_request = PlanPath.Goal()
106    goal = Pose()
107    goal.position.x = msg.point.x
108    goal.position.y = msg.point.y
109    action_request.goal = goal
110    end_tolerance = PathTrackingError()
111    end_tolerance.orientation_error = 0.2
112    end_tolerance.lateral_distance_error = 0.05
113    action_request.end_tolerance = end_tolerance
114    return action_request
115
116# Adds the parser method as an Event parser of the send_goal action
117send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
118
119# Load map action
120load_map_req = LoadMap()
121load_map_req.map_url = os.path.join(package_dir, "maps", "turtlebot3_webots.yaml")
122
123action_load_map = ComponentActions.send_srv_request(
124    srv_name="/map_server/load_map",
125    srv_type=LoadMap,
126    srv_request_msg=load_map_req)
127
128# Define Events/Actions dictionary
129events_actions = {
130    event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
131    event_emergency_stop: [
132        ComponentActions.restart(component=planner),
133        unblock_action,
134    ],
135    event_controller_fail: unblock_action,
136    event_planner_system_fail: action_load_map
137}
138
139# Setup the launcher
140launcher = Launcher(config_file=config_file)
141
142# Add Kompass components
143launcher.kompass(
144    components=[planner, controller, mapper, driver],
145    events_actions=events_actions,
146    activate_all_components_on_start=True,
147    multi_processing=True)
148
149# Get odom from localizer filtered odom for all components
150odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
151launcher.inputs(location=odom_topic)
152
153# Set the robot config for all components
154launcher.robot = my_robot
155
156launcher.bringup(introspect=True)