Actions#
Actions are methods or routines executed by a Component or by the System Monitor. They are the āOutputsā of the Event-Driven system.
Actions can be triggered in two ways:
Event-Driven: Executed when a specific Event is detected.
Fallback-Driven: Executed by a Component when an internal Health Status failure is detected.
The Action Class#
The Action class is a generic wrapper for any callable. It allows you to package a function and its arguments to be executed later.
class Action:
def __init__(self, method: Callable, args: tuple = (), kwargs: Optional[Dict] = None):
method: The function to be executed.
args: Tuple of positional arguments.
kwargs: Dictionary of keyword arguments.
Usage Example:#
from ros_sugar.core import BaseComponent
from ros_sugar.config import BaseComponentConfig
from ros_sugar.actions import Action
import logging
def function():
logging.info("I am executing an action!")
my_component = BaseComponent(node_name='test_component')
new_config = BaseComponentConfig(loop_rate=50.0)
action1 = Action(method=my_component.start)
action2 = Action(method=my_component.reconfigure, args=(new_config, True),)
action3 = Action(method=function)
Pre-defined Actions#
While you can wrap any function in an Action, Sugarcoat provides the ComponentActions module with a suite of pre-defined, thread-safe actions for managing components and system resources.
These actions are divided into Component-Level (affecting a specific componentās lifecycle or config) and System-Level (general ROS2 utilities).
Sugarcoat comes with a set of pre-defined component level actions and system level actions
Component-Level Actions#
These actions directly manipulate the state or configuration of a specific BaseComponent derived object.
Action Method |
Arguments |
Description |
|---|---|---|
|
|
Triggers the componentās Lifecycle transition to Active. |
|
|
Triggers the componentās Lifecycle transition to Inactive (stops execution loops). |
|
|
Stops the component, waits |
|
|
Reloads the component with a new configuration object or any configuration file path. |
|
|
Updates a single configuration parameter. |
|
|
Updates multiple configuration parameters simultaneously. |
System-Level Actions#
These actions interact with the broader ROS2 system and are executed by the central Monitor.
Action Method |
Arguments |
Description |
|---|---|---|
|
|
Logs a message to the ROS console. |
|
|
Publishes a message to a specific topic. Can be single-shot or periodic. |
|
|
Sends a request to a ROS 2 Service. |
|
|
Sends a goal to a ROS 2 Action Server. |
Tip
The pre-defined Actions are all keyword only
Usage Example:#
from ros_sugar.actions import ComponentActions
my_component = BaseComponent(node_name='test_component')
action1 = ComponentActions.start(component=my_component)
action2 = ComponentActions.log(msg="I am executing a cool action!")
Dynamic Arguments (Event Parsers)#
In standard usage, arguments are fixed at definition time. However, when paired with an Event, you often need to use data from the triggering message (e.g., āGo to this locationā where this is the location defined in the triggering event message).
See also
See more on how the event parser affects the event management here
You can use the .event_parser() method to map data from the event to the actionās arguments.
Example#
Letās see how this can work in a small example: We will take the example used in Kompass tutorial where a send_action_goal action is used to send a ROS2 ActionServer goal by parsing a value from a published topic.
First we define the action that sends the action server goal:
from ros_sugar.actions import ComponentActions
from kompass_interfaces.action import PlanPath
# Define an Action to send a goal to the planner ActionServer
send_goal: Action = ComponentActions.send_action_goal(
action_name="/planner/plan_path",
action_type=PlanPath,
action_request_msg=PlanPath.Goal(),
)
Then a parser is defined to parse a PointStamped message into the required ROS2 goal message:
from kompass_interfaces.msg import PathTrackingError
from geomerty_msgs.msg import PointStamped
# Define a method to parse a message of type PointStamped to the planner PlanPath Goal
def goal_point_parser(*, msg: PointStamped, **_):
action_request = PlanPath.Goal()
goal = Pose()
goal.position.x = msg.point.x
goal.position.y = msg.point.y
action_request.goal = goal
end_tolerance = PathTrackingError()
end_tolerance.orientation_error = 0.2
end_tolerance.lateral_distance_error = 0.05
action_request.end_tolerance = end_tolerance
return action_request
# Adds the parser method as an Event parser of the send_goal action
send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
As we see the defined goal_point_parser method takes the PointStamped message and turns it into a PlanPath goal request. Then at each event trigger the value of the action_request will be passed to the send_action_goal executable as the action_request_msg