ros_sugar.io.callbacks

Contents

ros_sugar.io.callbacks#

ROS Subscribers Callback Classes

Module Contents#

Classes#

GenericCallback

GenericCallback.

StdMsgCallback

std_msgs callback

ImageCallback

Image Callback class. Its get method saves an image as bytes

CompressedImageCallback

CompressedImage Callback class. Its get method saves an image as bytes

TextCallback

Text Callback class. Its get method returns the text

AudioCallback

Audio Callback class. Its get method returns the audio

MapMetaDataCallback

OccupancyGrid MetaData Callback class. Its get method returns dict of meta data from the occupancy grid topic

OdomCallback

Ros Odometry Callback Handler to get the robot state in 2D

PointCallback

Ros Pose Callback Handler to get the robot state in 2D

PointStampedCallback

Ros Pose Callback Handler to get the robot state in 2D

PoseCallback

Ros Pose Callback Handler to get the robot state in 2D

PoseStampedCallback

Ros PoseStamped Callback Handler to get the robot state in 2D

OccupancyGridCallback

API#

class ros_sugar.io.callbacks.GenericCallback(input_topic, node_name: Optional[str] = None)#

GenericCallback.

property frame_id: Optional[str]#

Getter of the message frame ID if available

Returns:

Header frame ID

Return type:

Optional[str]

set_node_name(node_name: str) None#

Set node name.

Parameters:

node_name (str)

Return type:

None

set_subscriber(subscriber: rclpy.subscription.Subscription) None#

set_subscriber.

Parameters:

subscriber

Return type:

None

on_callback_execute(callback: Callable) None#

Attach a method to be executed on topic callback

Parameters:

callback (Callable)

callback(msg) None#

Topic subscriber callback

Parameters:

msg (Any) – Received ros msg

add_post_processors(processors: List[Union[Callable, socket.socket]])#

Add a post processor for callback message

Parameters:

method (List[Union[Callable, socket]]) – Post processor methods or sockets

get_output(clear_last: bool = False, **kwargs) Any#

Post process outputs based on custom processors (if any) and return it

Parameters:
  • output

  • args

  • kwargs

property got_msg#

Property is true if an input is received on the topic

clear_last_msg()#

Clears the last received message on the topic

class ros_sugar.io.callbacks.StdMsgCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

std_msgs callback

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.ImageCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Image Callback class. Its get method saves an image as bytes

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.CompressedImageCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.ImageCallback

CompressedImage Callback class. Its get method saves an image as bytes

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.TextCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Text Callback class. Its get method returns the text

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.AudioCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Audio Callback class. Its get method returns the audio

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.MapMetaDataCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

OccupancyGrid MetaData Callback class. Its get method returns dict of meta data from the occupancy grid topic

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.OdomCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Ros Odometry Callback Handler to get the robot state in 2D

property transformation: Optional[tf2_ros.TransformStamped]#

Sets a new transformation value

Returns:

Detected transform from source to desired goal frame

Return type:

TransformStamped

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.PointCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Ros Pose Callback Handler to get the robot state in 2D

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.PointStampedCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Ros Pose Callback Handler to get the robot state in 2D

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.PoseCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.GenericCallback

Ros Pose Callback Handler to get the robot state in 2D

property transformation: Optional[tf2_ros.TransformStamped]#

Sets a new transformation value

Returns:

Detected transform from source to desired goal frame

Return type:

TransformStamped

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.PoseStampedCallback(input_topic, node_name: Optional[str] = None)#

Bases: ros_sugar.io.callbacks.PoseCallback

Ros PoseStamped Callback Handler to get the robot state in 2D

property transformation: Optional[tf2_ros.TransformStamped]#
property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#
class ros_sugar.io.callbacks.OccupancyGridCallback(input_topic, node_name: Optional[str] = None, to_numpy: Optional[bool] = True, twoD_to_threeD_conversion_height: Optional[float] = 0.01)#

Bases: ros_sugar.io.callbacks.GenericCallback

property frame_id: Optional[str]#
set_node_name(node_name: str) None#
set_subscriber(subscriber: rclpy.subscription.Subscription) None#
on_callback_execute(callback: Callable) None#
callback(msg) None#
add_post_processors(processors: List[Union[Callable, socket.socket]])#
get_output(clear_last: bool = False, **kwargs) Any#
property got_msg#
clear_last_msg()#