ros_sugar.core.monitor#

Monitor

Module Contents#

Classes#

Monitor

Monitor is a ROS2 Node (not Lifecycle) responsible of monitoring the status of the stack (rest of the running nodes) and managing requests/responses from the Orchestrator.

API#

class ros_sugar.core.monitor.Monitor(components_names: List[str], enable_health_status_monitoring: bool = True, events_actions: Optional[Dict[ros_sugar.core.event.Event, List[ros_sugar.core.action.Action]]] = None, events_to_emit: Optional[List[ros_sugar.core.event.Event]] = None, config: Optional[ros_sugar.config.BaseConfig] = None, services_components: Optional[List[ros_sugar.core.component.BaseComponent]] = None, action_servers_components: Optional[List[ros_sugar.core.component.BaseComponent]] = None, activate_on_start: Optional[List[ros_sugar.core.component.BaseComponent]] = None, activation_timeout: Optional[float] = None, activation_attempt_time: float = 1.0, start_on_init: bool = False, component_name: str = 'monitor', callback_group: Optional[Union[rclpy.callback_groups.MutuallyExclusiveCallbackGroup, rclpy.callback_groups.ReentrantCallbackGroup]] = None, *args, **kwargs)#

Bases: ros_sugar.core.node.BaseNode

Monitor is a ROS2 Node (not Lifecycle) responsible of monitoring the status of the stack (rest of the running nodes) and managing requests/responses from the Orchestrator.

Note

When launching the stack using the Launcher, the user is not required to configure the Monitor. The Launcher will configure and launch its own Monitor internally.

Main Functionalities:

  • Creates Subscribers to registered Events. The Monitor is configured to declare an InternalEvent back to the Launcher so the corresponding Action can be executed (see source implementation in launch_actions.py)

  • Creates Subscribers to all registered Components health status topics

  • Creates clients for all components main services and main action servers

  • Creates service clients to components reconfiguration services to handle actions sent from the Launcher

add_components_activation_event(method) None#

Adds a method to be executed when components are activated

Parameters:

method (Callable) – Method to be executed on components activation

create_all_timers() None#

Create all timers

property events#

Monitored events getter

Returns:

Events list

Return type:

List[Event]

configure_component(component: ros_sugar.core.component.BaseComponent, new_config: Union[object, str], keep_alive: bool) None#

Configure a given component from config instance or config file Creates and send the request to the component service

Parameters:
  • component (BaseComponent) – Component to configure

  • config (object | str) – Config instance or path to config file

  • keep_alive (bool) – To keep the component running while configuring

  • executor (ROS Executor, optional) – Used to spin the monitor node until the service response is received, defaults to None

update_parameter(component: ros_sugar.core.component.BaseComponent, param_name: str, new_value: Any, keep_alive: bool = True) None#

Sends a ChangeParameter service request to given component

Parameters:
  • component (BaseComponent) – description

  • param_name (str) – description

  • new_value (Any) – description

  • keep_alive (bool, optional) – description, defaults to True

update_parameters(component: ros_sugar.core.component.BaseComponent, params_names: List[str], new_values: List, keep_alive: bool = True, **_) None#

Sends a ChangeParameters service request to given component

Parameters:
  • component (BaseComponent) – description

  • params_names (List[str]) – description

  • new_values (List) – description

  • keep_alive (bool, optional) – description, defaults to True

send_srv_request(srv_name: str, srv_type: type, srv_request_msg: Any, **_) None#

Action to send a ROS2 service request during runtime

Parameters:
  • srv_name (str) – Service name

  • srv_type (type) – Service type (ROS2 service)

  • srv_request_msg (Any) – Service request message

send_action_goal(action_name: str, action_type: type, action_request_msg: Any, **_) None#

Action to send a ROS2 action goal during runtime

Parameters:
  • action_name (str) – ROS2 action name

  • action_type (type) – ROS2 action type

  • action_request_msg (Any) – ROS2 action goal message

publish_message(topic: ros_sugar.io.topic.Topic, msg: Any, publish_rate: Optional[float] = None, publish_period: Optional[float] = None, **_) None#

Action to publish a message to a given topic

Parameters:
  • topic (Topic) – Published topic

  • msg (Any) – Published message

  • publish_rate (Optional[float], optional) – Publishing rate, if None the message is published once, defaults to None

  • publish_period (Optional[float], optional) – Publishing period, if none and rate is given the message is published forever, defaults to None

create_all_subscribers() None#

Create health status subscribers and events subscribers

create_all_service_clients() None#

Create service clients for all components running as servers

create_all_action_clients() None#

Create action clients for all components running as action servers

rclpy_init_node(*args, **kwargs)#
configure(config_file: str)#
add_execute_once(method: Callable)#
add_execute_in_loop(method: Callable)#
get_ros_time() builtin_interfaces.msg.Time#
get_secs_time() float#
property launch_cmd_args: List[str]#
property config_json: Union[str, bytes]#
activate()#
deactivate()#
setup_qos(qos_policy: ros_sugar.config.QoSConfig) rclpy.qos.QoSProfile#
init_flags()#
init_variables()#
create_tf_listener(tf_config: ros_sugar.tf.TFListenerConfig) ros_sugar.tf.TFListener#
create_client(*args, **kwargs) rclpy.client.Client#
create_all_publishers()#
create_all_services()#
create_all_action_servers()#
destroy_all_subscribers()#
destroy_all_publishers()#
destroy_all_services()#
destroy_all_action_servers()#
destroy_all_action_clients()#
destroy_all_service_clients()#
destroy_all_timers()#