ros_sugar.io.callbacks
#
ROS Subscribers Callback Classes
Module Contents#
Classes#
GenericCallback. |
|
std_msgs callback |
|
Image Callback class. Its get method saves an image as bytes |
|
CompressedImage Callback class. Its get method saves an image as bytes |
|
Text Callback class. Its get method returns the text |
|
Audio Callback class. Its get method returns the audio |
|
OccupancyGrid MetaData Callback class. Its get method returns dict of meta data from the occupancy grid topic |
|
Ros Odometry Callback Handler to get the robot state in 2D |
|
Ros Pose Callback Handler to get the robot state in 2D |
|
Ros Pose Callback Handler to get the robot state in 2D |
|
Ros Pose Callback Handler to get the robot state in 2D |
|
Ros PoseStamped Callback Handler to get the robot state in 2D |
|
API#
- class ros_sugar.io.callbacks.GenericCallback(input_topic, node_name: Optional[str] = None)#
GenericCallback.
- property frame_id: Optional[str]#
Getter of the message frame ID if available
- Returns:
Header frame ID
- Return type:
Optional[str]
- set_node_name(node_name: str) None #
Set node name.
- Parameters:
node_name (str)
- Return type:
None
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
set_subscriber.
- Parameters:
subscriber
- Return type:
None
- on_callback_execute(callback: Callable) None #
Attach a method to be executed on topic callback
- Parameters:
callback (Callable)
- callback(msg) None #
Topic subscriber callback
- Parameters:
msg (Any) – Received ros msg
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
Add a post processor for callback message
- Parameters:
method (List[Union[Callable, socket]]) – Post processor methods or sockets
- get_output(clear_last: bool = False, **kwargs) Any #
Post process outputs based on custom processors (if any) and return it
- Parameters:
output
args
kwargs
- property got_msg#
Property is true if an input is received on the topic
- clear_last_msg()#
Clears the last received message on the topic
- class ros_sugar.io.callbacks.StdMsgCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
std_msgs callback
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.ImageCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Image Callback class. Its get method saves an image as bytes
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.CompressedImageCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.ImageCallback
CompressedImage Callback class. Its get method saves an image as bytes
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.TextCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Text Callback class. Its get method returns the text
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.AudioCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Audio Callback class. Its get method returns the audio
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.MapMetaDataCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
OccupancyGrid MetaData Callback class. Its get method returns dict of meta data from the occupancy grid topic
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.OdomCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Ros Odometry Callback Handler to get the robot state in 2D
- property transformation: Optional[tf2_ros.TransformStamped]#
Sets a new transformation value
- Returns:
Detected transform from source to desired goal frame
- Return type:
TransformStamped
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.PointCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Ros Pose Callback Handler to get the robot state in 2D
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.PointStampedCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Ros Pose Callback Handler to get the robot state in 2D
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.PoseCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.GenericCallback
Ros Pose Callback Handler to get the robot state in 2D
- property transformation: Optional[tf2_ros.TransformStamped]#
Sets a new transformation value
- Returns:
Detected transform from source to desired goal frame
- Return type:
TransformStamped
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.PoseStampedCallback(input_topic, node_name: Optional[str] = None)#
Bases:
ros_sugar.io.callbacks.PoseCallback
Ros PoseStamped Callback Handler to get the robot state in 2D
- property transformation: Optional[tf2_ros.TransformStamped]#
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#
- class ros_sugar.io.callbacks.OccupancyGridCallback(input_topic, node_name: Optional[str] = None, to_numpy: Optional[bool] = True, twoD_to_threeD_conversion_height: Optional[float] = 0.01)#
Bases:
ros_sugar.io.callbacks.GenericCallback
- property frame_id: Optional[str]#
- set_node_name(node_name: str) None #
- set_subscriber(subscriber: rclpy.subscription.Subscription) None #
- on_callback_execute(callback: Callable) None #
- callback(msg) None #
- add_post_processors(processors: List[Union[Callable, socket.socket]])#
- get_output(clear_last: bool = False, **kwargs) Any #
- property got_msg#
- clear_last_msg()#