ros_sugar.core.component

Contents

ros_sugar.core.component#

Base Component

Module Contents#

Classes#

API#

class ros_sugar.core.component.BaseComponent(component_name: str, inputs: Optional[Sequence[ros_sugar.io.topic.Topic]] = None, outputs: Optional[Sequence[ros_sugar.io.topic.Topic]] = None, config: Optional[ros_sugar.config.base_config.BaseComponentConfig] = None, config_file: Optional[str] = None, callback_group: Optional[rclpy.callback_groups.CallbackGroup] = None, enable_health_broadcast: bool = True, fallbacks: Optional[ros_sugar.core.fallbacks.ComponentFallbacks] = None, main_action_type: Optional[type] = None, main_srv_type: Optional[type] = None, **kwargs)#

Bases: ros_sugar.core.node.BaseNode, rclpy.lifecycle.Node

rclpy_init_node(*args, **kwargs)#

To init the node with rclpy and activate default services

attach_custom_callback(input_topic: ros_sugar.io.topic.Topic, callable: Callable) None#

Method to attach custom method to subscriber callbacks

add_callback_postprocessor(input_topic: ros_sugar.io.topic.Topic, func: Callable) None#

Adds a callable as a post processor for topic callback.

Parameters:
  • input_topic (Topic)

  • callable

add_publisher_preprocessor(output_topic: ros_sugar.io.topic.Topic, func: Callable) None#

Adds a callable as a pre processor for topic publisher.

Parameters:
  • output_topic (Topic)

  • callable

create_all_subscribers()#

Creates all node subscribers from component inputs

create_all_publishers()#

Creates all node publishers from component outputs

create_all_timers()#

Creates all node timers

create_all_action_servers()#

Action servers creation

create_all_services()#

Services creation

destroy_all_timers()#

Destroys all node timers

destroy_all_subscribers()#

Destroys all node subscribers

destroy_all_publishers()#

Destroys all node publishers

destroy_all_services()#

Destroys all node services

destroy_all_action_servers()#

Destroys all action servers

got_all_inputs(inputs_to_check: Optional[List[str]] = None, inputs_to_exclude: Optional[List[str]] = None) bool#

Check if all input topics are being published

Parameters:
  • inputs_to_check (list[str] | None, optional) – List of input keys to check, defaults to None

  • inputs_to_exclude (list[str] | None, optional) – List of input keys to exclude from check, defaults to None

Returns:

If all inputs are published

Return type:

bool

get_missing_inputs() list[str]#

Get a list of input topic names not being published

Returns:

List of unpublished topics

Return type:

list[str]

configure(config_file: str)#

Configure component from yaml file

Parameters:

config_file (str) – Path to file

property run_type: ros_sugar.config.base_config.ComponentRunType#

Component run type: Timed, ActionServer or Server

Returns:

Timed, ActionServer or Server

Return type:

str

property fallback_rate: float#

Component fallback rate: Rate in which the component checks for fallbacks and executes a fallback actions if a failure is detected

Returns:

Fallback rate (Hz)

Return type:

float

property loop_rate: float#

Component loop rate: Rate in which the component executes its main (_execution_step)

Returns:

Loop rate (Hz)

Return type:

float

property events_actions: Dict[str, List[ros_sugar.core.action.Action]]#

Getter of component Events/Actions

Returns:

Dictionary of monitored Events and associated Actions

Return type:

Dict[str, List[Action]]

abstract main_action_callback(goal_handle)#

Component main action server callback - used if component started with run_as_action_server=True

Parameters:

goal_handle (action_type.Goal) – Action goal handle

property main_action_name: Optional[str]#

Name of the main action server created by the component

Returns:

ActionServer name

Return type:

str

property main_srv_name: Optional[str]#

Name of the main server created by the component

Returns:

Server name

Return type:

str

abstract main_service_callback(request, response)#

Component main service callback - used if component started with run_as_server=True

Parameters:
  • request (service_type.Request) – Service request

  • response (service_type.Response) – Service response

classmethod get_change_parameters_msg_from_config(config: ros_sugar.config.base_config.BaseComponentConfig) automatika_ros_sugar.srv.ChangeParameters.Request#

Helper method to update config parameter value and return an error message if it is not updated

Parameters:

config (ComponentConfig or child class) – description

Returns:

Request message for change parameters services corresponding to the given config

Return type:

ChangeParameters.Request

is_topic_of_type(input, msg_type: type) bool#

Checks if a topic contains a msg of given type

Parameters:
  • input (Topic) – Object to check

  • msg_type (type) – Topic message type to check

Returns:

If input is a Topic with given message type

Return type:

bool

attach_callbacks()#

Run on-activate node. Override this method to attach methods to callbacks

property available_actions: List[str]#

Getter of available component actions

Returns:

Methods names

Return type:

List[str]

start() bool#

Start the component - trigger_activate

Returns:

If the component is started

Return type:

bool

stop() bool#

Stop the component - trigger_deactivate

Returns:

If the component is stopped

Return type:

bool

reconfigure(new_config: Any, keep_alive: bool = False) bool#

Reconfigure the component - cleanup->stop->trigger_configure->start

Parameters:
  • new_config (Any) – New component config

  • keep_alive (bool, optional) – Reconfigure while the component is online, defaults to False

Returns:

If the component is Reconfigured

Return type:

bool

restart(wait_time: Optional[float] = None) bool#

Restart the component - stop->start

Returns:

If the component is Reconfigured

Return type:

bool

set_param(param_name: str, new_value: Any, keep_alive: bool = True) bool#

Change the value of one component parameter

Parameters:
  • param_name (str) – description

  • new_value (Any) – description

  • keep_alive (bool, optional) – To keep the component running when updating value, defaults to True

Raises:

Exception – Parameter could not be updated to given value

Returns:

Parameter updated

Return type:

bool

set_params(params_names: List[str], new_values: List, keep_alive: bool = True) bool#

Change the value of multiple component parameters

Parameters:
  • param_name (str) – description

  • new_value (Any) – description

  • keep_alive (bool, optional) – To keep the component running when updating value, defaults to True

Raises:

Exception – Parameter could not be updated to given value

Returns:

Parameter updated

Return type:

bool

property fallbacks: List[str]#

Gets all available component fallback methods

Returns:

List of available fallback names

Return type:

List[str]

on_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None#

Set the fallback strategy (action) on any fail

Parameters:
  • action (Union[List[Action], Action]) – Action to be executed on failure

  • max_retries (Optional[int], optional) – Maximum number of action execution retries. None is equivalent to unlimited retries, defaults to None

on_system_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None#

Set the fallback strategy (action) on system fail

Parameters:
  • action (Union[List[Action], Action]) – Action to be executed on failure

  • max_retries (Optional[int], optional) – Maximum number of action execution retries. None is equivalent to unlimited retries, defaults to None

on_component_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None#

Set the fallback strategy (action) on component fail

Parameters:
  • action (Union[List[Action], Action]) – Action to be executed on failure

  • max_retries (Optional[int], optional) – Maximum number of action execution retries. None is equivalent to unlimited retries, defaults to None

on_algorithm_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None#

Set the fallback strategy (action) on algorithm fail

Parameters:
  • action (Union[List[Action], Action]) – Action to be executed on failure

  • max_retries (Optional[int], optional) – Maximum number of action execution retries. None is equivalent to unlimited retries, defaults to None

broadcast_status() None#

Component fallback defined to only broadcast the current state so it is handled by an external manager. Used as the default fallback strategy for any system (external) failure

property lifecycle_state: Optional[int]#

lifecycle state machine current state getter

Returns:

description

Return type:

int

on_configure(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn#

Method on node state transition to Configured Declares node parameters and inits the base node (with initial flags and variables)

Parameters:

state (lifecycle.State) – Current node state

Returns:

Node state transition result

Return type:

lifecycle.TransitionCallbackReturn

on_activate(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn#

Method on node state transition to Active Starts node subscriptions, publications, services and clients

Parameters:

state (lifecycle.State) – Current node state

Returns:

Node state transition result

Return type:

lifecycle.TransitionCallbackReturn

on_deactivate(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn#

Method on node state transition to Deactivate

Parameters:

state (lifecycle.State) – Current node state

Returns:

Node state transition result

Return type:

lifecycle.TransitionCallbackReturn

on_shutdown(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn#

Method on node state transition to Finalized

Parameters:

state (lifecycle.State) – Current node state

Returns:

Node state transition result

Return type:

lifecycle.TransitionCallbackReturn

on_cleanup(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn#

Method on node state transition to unConfigured Starts node subscriptions, publications, services and clients

Parameters:

state (lifecycle.State) – Current node state

Returns:

Node state transition result

Return type:

lifecycle.TransitionCallbackReturn

on_error(state: rclpy.lifecycle.LifecycleState) rclpy.lifecycle.TransitionCallbackReturn#

Handles a transition error

When a transition returns TransitionCallbackReturn.FAILURE or TransitionCallbackReturn.ERROR.

custom_on_configure() None#

Method called on configure to overwrite with custom configuration

custom_on_activate() None#

Method called on activation to overwrite with custom activation

custom_on_deactivate() None#

Method called on deactivation to overwrite with custom deactivation

custom_on_shutdown() None#

Method called on shutdown to overwrite with custom shutdown

custom_on_error() None#

Method called on transition error to overwrite with custom transition error handling

custom_on_cleanup() None#

Method called on cleanup to overwrite with custom cleanup

add_execute_once(method: Callable)#
add_execute_in_loop(method: Callable)#
get_ros_time() builtin_interfaces.msg.Time#
get_secs_time() float#
property launch_cmd_args: List[str]#
property config_json: Union[str, bytes]#
activate()#
deactivate()#
setup_qos(qos_policy: ros_sugar.config.QoSConfig) rclpy.qos.QoSProfile#
init_flags()#
init_variables()#
create_tf_listener(tf_config: ros_sugar.tf.TFListenerConfig) ros_sugar.tf.TFListener#
create_client(*args, **kwargs) rclpy.client.Client#
create_all_service_clients()#
create_all_action_clients()#
destroy_all_action_clients()#
destroy_all_service_clients()#