agents.components.map_encoding

Module Contents

Classes

MapEncoding

Map encoding component that encodes text information as a semantic map based on the robot's localization. It takes in map layers, a position topic, a map occupancy grid topic, and a vector database client. Map layers can be arbitrary text-based outputs from other components, such as MLLMs or Vision.

API

class agents.components.map_encoding.MapEncoding(*, layers: List[agents.ros.MapLayer], position: agents.ros.Topic, map_topic: agents.ros.Topic, config: agents.config.MapConfig, db_client: agents.clients.db_base.DBClient, trigger: Union[agents.ros.Topic, List[agents.ros.Topic], float] = 10.0, component_name: str, **kwargs)

Bases: agents.components.component_base.Component

Map encoding component that encodes text information as a semantic map based on the robot's localization. It takes in map layers, a position topic, a map occupancy grid topic, and a vector database client. Map layers can be arbitrary text-based outputs from other components, such as MLLMs or Vision.

Parameters:
  • layers (list[MapLayer]) – A list of map layer objects to be encoded.

  • position (Topic) – The topic for the current robot position.

  • map_topic (Topic) – The OccupancyGrid topic for storing and retrieving map data.

  • config (MapConfig) – The configuration for the map encoding component.

  • db_client (DBClient) – A database client to store and retrieve map data.

  • trigger (Union[Topic, list[Topic], float]) – A topic, a list of topics, or a float value that triggers the map encoding process. Optional; defaults to 10.0.

  • component_name (str) – The name of the MapEncoding component.

  • kwargs – Additional keyword arguments.
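The trigger parameter accepts three forms of value: a single topic, a list of topics, or a float. The following is a minimal sketch of how such a union could be told apart. The Topic stand-in class and the describe_trigger helper are hypothetical and not part of the agents package, and the validation rules (non-empty list, positive number) are assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Topic:
    """Hypothetical stand-in for agents.ros.Topic (name and message type only)."""

    name: str
    msg_type: str


def describe_trigger(trigger: Union[Topic, List[Topic], float]) -> str:
    """Classify a trigger value by its form (illustrative helper, not library code)."""
    if isinstance(trigger, Topic):
        return f"single topic: {trigger.name}"
    if isinstance(trigger, list):
        # Assumption: an empty list or a list with non-Topic items is rejected.
        if not trigger or not all(isinstance(t, Topic) for t in trigger):
            raise TypeError("a trigger list must contain only Topic objects")
        return "topics: " + ", ".join(t.name for t in trigger)
    # bool is a subclass of int, so it is excluded explicitly.
    if isinstance(trigger, bool) or not isinstance(trigger, (int, float)):
        raise TypeError(f"unsupported trigger type: {type(trigger).__name__}")
    # Assumption: a numeric trigger must be strictly positive.
    if trigger <= 0:
        raise ValueError("a numeric trigger must be positive")
    return f"numeric: {float(trigger)}"
```

Passing the default value, describe_trigger(10.0), takes the numeric branch, while a Topic or a list of Topic objects takes one of the topic branches.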

Example usage:

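from agents.clients.db_base import DBClient
from agents.components.map_encoding import MapEncoding
from agents.config import MapConfig
from agents.ros import MapLayer, Topic
from agents.vectordbs import ChromaDB  # import path assumed; adjust to your installed version

# text1 and detections1 are String topics published by other components (names are illustrative)
text1 = Topic(name="text1", msg_type="String")
detections1 = Topic(name="detections1", msg_type="String")

position_topic = Topic(name="position", msg_type="Odometry")
map_topic = Topic(name="map", msg_type="OccupancyGrid")
config = MapConfig(map_name="map")
db_client = DBClient(db=ChromaDB("database_name"))
layers = [MapLayer(subscribes_to=text1, resolution_multiple=3),
          MapLayer(subscribes_to=detections1, temporal_change=True)]
map_encoding_component = MapEncoding(
    layers=layers,
    position=position_topic,
    map_topic=map_topic,
    config=config,
    db_client=db_client,
    component_name="semantic_map"
)

custom_on_configure()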

Custom steps run when the component is configured.

custom_on_deactivate()

Custom steps run when the component is deactivated.

add_point(layer: agents.ros.MapLayer, point: Tuple[numpy.ndarray, str]) → None

Component action that adds a user-defined point to the map collection. This action can be executed on an event.

Parameters:
  • layer (MapLayer) – The layer to which the point should be added.

  • point (tuple[np.ndarray, str]) – A tuple of position (numpy array) and text data (str).

Return type:

None
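To make the shape of the point argument concrete, here is a minimal sketch of a (position, text) pair being validated and stored per layer. PointStore is a hypothetical in-memory stand-in, not the MapEncoding implementation or its database client. A plain sequence of floats stands in for the numpy array so the sketch has no third-party dependency, and the 2-or-3-coordinate rule is an assumption.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

# (position coordinates, text) as stored by the sketch
Point = Tuple[Tuple[float, ...], str]


class PointStore:
    """Hypothetical in-memory stand-in for a per-layer map collection."""

    def __init__(self) -> None:
        self._points: Dict[str, List[Point]] = defaultdict(list)

    def add_point(self, layer_name: str, point: Tuple[Sequence[float], str]) -> None:
        """Validate a (position, text) pair and store it under the layer name."""
        position, text = point
        coords = tuple(float(c) for c in position)
        # Assumption: positions are 2D or 3D coordinates.
        if len(coords) not in (2, 3):
            raise ValueError("position must have 2 or 3 coordinates")
        if not isinstance(text, str) or not text:
            raise ValueError("text must be a non-empty string")
        self._points[layer_name].append((coords, text))

    def points(self, layer_name: str) -> List[Point]:
        """Return a copy of the points stored for a layer."""
        return list(self._points.get(layer_name, []))


# Usage: add one user-defined point to a layer named "text1".
store = PointStore()
store.add_point("text1", ((1.5, 2.0, 0.0), "charging station"))
```

With the real component, the first argument would be a MapLayer object and the position a numpy array, as given in the signature above.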

custom_on_activate()

Custom configuration for creating triggers.

create_all_subscribers()

Overrides the parent method to handle trigger topics and fixed inputs. Called by the parent BaseComponent.

activate_all_triggers() → None

Activates component triggers by attaching the execution step to callbacks.

destroy_all_subscribers() → None

Destroys all node subscribers.

trigger(trigger: Union[agents.ros.Topic, List[agents.ros.Topic], float]) → None

Sets the component trigger.

validate_topics(topics: Sequence[Union[agents.ros.Topic, agents.ros.FixedInput]], allowed_topic_types: Optional[Dict[str, List[Union[Type[agents.ros.SupportedType], List[Type[agents.ros.SupportedType]]]]]] = None, topics_direction: str = 'Topics')

Verifies component-specific inputs or outputs against the allowed topic types, if provided.
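The nested type of allowed_topic_types (a dictionary whose values are lists of either a single type or a list of alternative types) can be illustrated with a minimal sketch. The Topic stand-in, the check_topics helper, and the "Required" and "Optional" key names are all hypothetical. Message types are represented as strings rather than SupportedType classes, and the rule applied here (every topic's type must appear somewhere in the allowed entries) is an assumption, not the library's documented behaviour.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Set, Union

# One allowed entry is either a single type name or a list of alternatives.
AllowedEntry = Union[str, List[str]]


@dataclass(frozen=True)
class Topic:
    """Hypothetical stand-in for agents.ros.Topic."""

    name: str
    msg_type: str


def flatten_allowed(allowed: Dict[str, List[AllowedEntry]]) -> Set[str]:
    """Collect every type name mentioned anywhere in the allowed mapping."""
    flat: Set[str] = set()
    for entries in allowed.values():
        for entry in entries:
            if isinstance(entry, list):
                flat.update(entry)
            else:
                flat.add(entry)
    return flat


def check_topics(
    topics: Sequence[Topic],
    allowed_topic_types: Optional[Dict[str, List[AllowedEntry]]] = None,
    topics_direction: str = "Topics",
) -> None:
    """Raise TypeError if a topic's type is not among the allowed types."""
    if allowed_topic_types is None:
        return  # nothing to check against
    permitted = flatten_allowed(allowed_topic_types)
    for topic in topics:
        if topic.msg_type not in permitted:
            raise TypeError(
                f"{topics_direction}: topic '{topic.name}' has type "
                f"'{topic.msg_type}', allowed types are {sorted(permitted)}"
            )
```

For example, with allowed types {"Required": ["Odometry", ["OccupancyGrid", "String"]]}, a topic of type "String" passes the check while a topic of type "Image" raises a TypeError.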