ros_sugar.core.component
#
Base Component
Module Contents#
Classes#
API#
- class ros_sugar.core.component.BaseComponent(component_name: str, inputs: Optional[Sequence[ros_sugar.io.topic.Topic]] = None, outputs: Optional[Sequence[ros_sugar.io.topic.Topic]] = None, config: Optional[ros_sugar.config.base_config.BaseComponentConfig] = None, config_file: Optional[str] = None, callback_group: Optional[rclpy.callback_groups.CallbackGroup] = None, enable_health_broadcast: bool = True, fallbacks: Optional[ros_sugar.core.fallbacks.ComponentFallbacks] = None, main_action_type: Optional[type] = None, main_srv_type: Optional[type] = None, **kwargs)#
Bases:
ros_sugar.core.node.BaseNode, rclpy.lifecycle.Node
- rclpy_init_node(*args, **kwargs)#
To init the node with rclpy and activate default services
- attach_custom_callback(input_topic: ros_sugar.io.topic.Topic, callable: Callable) None #
Method to attach custom method to subscriber callbacks
- add_callback_postprocessor(input_topic: ros_sugar.io.topic.Topic, func: Callable) None #
Adds a callable as a post processor for topic callback.
- Parameters:
input_topic (Topic)
func (Callable) – Callable to add as a post processor for the topic callback
- add_publisher_preprocessor(output_topic: ros_sugar.io.topic.Topic, func: Callable) None #
Adds a callable as a pre processor for topic publisher.
- Parameters:
output_topic (Topic)
func (Callable) – Callable to add as a pre processor for the topic publisher
- create_all_subscribers()#
Creates all node subscribers from component inputs
- create_all_publishers()#
Creates all node publishers from component outputs
- create_all_timers()#
Creates all node timers
- create_all_action_servers()#
Action servers creation
- create_all_services()#
Services creation
- destroy_all_timers()#
Destroys all node timers
- destroy_all_subscribers()#
Destroys all node subscribers
- destroy_all_publishers()#
Destroys all node publishers
- destroy_all_services()#
Destroys all node services
- destroy_all_action_servers()#
Destroys all action servers
- got_all_inputs(inputs_to_check: Optional[List[str]] = None, inputs_to_exclude: Optional[List[str]] = None) bool #
Check if all input topics are being published
- Parameters:
inputs_to_check (list[str] | None, optional) – List of input keys to check, defaults to None
inputs_to_exclude (list[str] | None, optional) – List of input keys to exclude from check, defaults to None
- Returns:
If all inputs are published
- Return type:
bool
- get_missing_inputs() list[str] #
Get a list of input topic names not being published
- Returns:
List of unpublished topics
- Return type:
list[str]
- configure(config_file: str)#
Configure component from yaml file
- Parameters:
config_file (str) – Path to file
- property run_type: ros_sugar.config.base_config.ComponentRunType#
Component run type: Timed, ActionServer or Server
- Returns:
Timed, ActionServer or Server
- Return type:
ComponentRunType
- property fallback_rate: float#
Component fallback rate: Rate at which the component checks for fallbacks and executes fallback actions if a failure is detected
- Returns:
Fallback rate (Hz)
- Return type:
float
- property loop_rate: float#
Component loop rate: Rate at which the component executes its main step (_execution_step)
- Returns:
Loop rate (Hz)
- Return type:
float
- property events_actions: Dict[str, List[ros_sugar.core.action.Action]]#
Getter of component Events/Actions
- Returns:
Dictionary of monitored Events and associated Actions
- Return type:
Dict[str, List[Action]]
- abstract main_action_callback(goal_handle)#
Component main action server callback - used if component started with run_as_action_server=True
- Parameters:
goal_handle (action_type.Goal) – Action goal handle
- property main_action_name: Optional[str]#
Name of the main action server created by the component
- Returns:
ActionServer name
- Return type:
Optional[str]
- property main_srv_name: Optional[str]#
Name of the main server created by the component
- Returns:
Server name
- Return type:
Optional[str]
- abstract main_service_callback(request, response)#
Component main service callback - used if component started with run_as_server=True
- Parameters:
request (service_type.Request) – Service request
response (service_type.Response) – Service response
- classmethod get_change_parameters_msg_from_config(config: ros_sugar.config.base_config.BaseComponentConfig) automatika_ros_sugar.srv.ChangeParameters.Request #
Helper method to build the request message for the change parameters service from a given config
- Parameters:
config (BaseComponentConfig or child class) – Component config used to build the request
- Returns:
Request message for change parameters services corresponding to the given config
- Return type:
ChangeParameters.Request
- is_topic_of_type(input, msg_type: type) bool #
Checks if a topic contains a msg of given type
- Parameters:
input (Topic) – Object to check
msg_type (type) – Topic message type to check
- Returns:
If input is a Topic with given message type
- Return type:
bool
- attach_callbacks()#
Run on-activate node. Override this method to attach methods to callbacks
- property available_actions: List[str]#
Getter of available component actions
- Returns:
Methods names
- Return type:
List[str]
- start() bool #
Start the component - trigger_activate
- Returns:
If the component is started
- Return type:
bool
- stop() bool #
Stop the component - trigger_deactivate
- Returns:
If the component is stopped
- Return type:
bool
- reconfigure(new_config: Any, keep_alive: bool = False) bool #
Reconfigure the component - cleanup->stop->trigger_configure->start
- Parameters:
new_config (Any) – New component config
keep_alive (bool, optional) – Reconfigure while the component is online, defaults to False
- Returns:
If the component is Reconfigured
- Return type:
bool
- restart(wait_time: Optional[float] = None) bool #
Restart the component - stop->start
- Returns:
If the component is restarted
- Return type:
bool
- set_param(param_name: str, new_value: Any, keep_alive: bool = True) bool #
Change the value of one component parameter
- Parameters:
param_name (str) – Name of the parameter to change
new_value (Any) – New value of the parameter
keep_alive (bool, optional) – To keep the component running when updating value, defaults to True
- Raises:
Exception – Parameter could not be updated to given value
- Returns:
Parameter updated
- Return type:
bool
- set_params(params_names: List[str], new_values: List, keep_alive: bool = True) bool #
Change the value of multiple component parameters
- Parameters:
params_names (List[str]) – Names of the parameters to change
new_values (List) – New values of the parameters
keep_alive (bool, optional) – To keep the component running when updating value, defaults to True
- Raises:
Exception – Parameter could not be updated to given value
- Returns:
Parameter updated
- Return type:
bool
- property fallbacks: List[str]#
Gets all available component fallback methods
- Returns:
List of available fallback names
- Return type:
List[str]
- on_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None #
Set the fallback strategy (action) on any fail
- on_system_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None #
Set the fallback strategy (action) on system fail
- on_component_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None #
Set the fallback strategy (action) on component fail
- on_algorithm_fail(action: Union[List[ros_sugar.core.action.Action], ros_sugar.core.action.Action], max_retries: Optional[int] = None) None #
Set the fallback strategy (action) on algorithm fail
- broadcast_status() None #
Component fallback defined to only broadcast the current state so it is handled by an external manager. Used as the default fallback strategy for any system (external) failure
- property lifecycle_state: Optional[int]#
lifecycle state machine current state getter
- Returns:
Current lifecycle state
- Return type:
Optional[int]
- on_configure(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn #
Method on node state transition to Configured. Declares node parameters and initializes the base node (with initial flags and variables)
- Parameters:
state (lifecycle.State) – Current node state
- Returns:
Node state transition result
- Return type:
lifecycle.TransitionCallbackReturn
- on_activate(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn #
Method on node state transition to Active. Starts node subscriptions, publications, services and clients
- Parameters:
state (lifecycle.State) – Current node state
- Returns:
Node state transition result
- Return type:
lifecycle.TransitionCallbackReturn
- on_deactivate(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn #
Method on node state transition to Deactivate
- Parameters:
state (lifecycle.State) – Current node state
- Returns:
Node state transition result
- Return type:
lifecycle.TransitionCallbackReturn
- on_shutdown(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn #
Method on node state transition to Finalized
- Parameters:
state (lifecycle.State) – Current node state
- Returns:
Node state transition result
- Return type:
lifecycle.TransitionCallbackReturn
- on_cleanup(state: rclpy.lifecycle.State) rclpy.lifecycle.TransitionCallbackReturn #
Method on node state transition to Unconfigured. Starts node subscriptions, publications, services and clients
- Parameters:
state (lifecycle.State) – Current node state
- Returns:
Node state transition result
- Return type:
lifecycle.TransitionCallbackReturn
- on_error(state: rclpy.lifecycle.LifecycleState) rclpy.lifecycle.TransitionCallbackReturn #
Handles a transition error
Called when a transition returns TransitionCallbackReturn.FAILURE or TransitionCallbackReturn.ERROR.
- custom_on_configure() None #
Method called on configure to overwrite with custom configuration
- custom_on_activate() None #
Method called on activation to overwrite with custom activation
- custom_on_deactivate() None #
Method called on deactivation to overwrite with custom deactivation
- custom_on_shutdown() None #
Method called on shutdown to overwrite with custom shutdown
- custom_on_error() None #
Method called on transition error to overwrite with custom transition error handling
- custom_on_cleanup() None #
Method called on cleanup to overwrite with custom cleanup
- add_execute_once(method: Callable)#
- add_execute_in_loop(method: Callable)#
- get_ros_time() builtin_interfaces.msg.Time #
- get_secs_time() float #
- property launch_cmd_args: List[str]#
- property config_json: Union[str, bytes]#
- activate()#
- deactivate()#
- setup_qos(qos_policy: ros_sugar.config.QoSConfig) rclpy.qos.QoSProfile #
- init_flags()#
- init_variables()#
- create_tf_listener(tf_config: ros_sugar.tf.TFListenerConfig) ros_sugar.tf.TFListener #
- create_client(*args, **kwargs) rclpy.client.Client #
- create_all_service_clients()#
- create_all_action_clients()#
- destroy_all_action_clients()#
- destroy_all_service_clients()#