Create a spatio-temporal semantic map#
Autonomous Mobile Robots (AMRs) keep a representation of their environment in the form of occupancy maps. One can layer semantic information on top of these occupancy maps, and with the use of MLLMs one can even add answers to arbitrary questions about the environment to this map. In ROS Agents, such maps can be created using vector databases, which are specifically designed to store natural language data and retrieve it based on natural language queries. Thus an embodied agent can keep a text-based spatio-temporal memory, from which it can retrieve information to answer questions or do spatial planning.
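To make this idea concrete, below is a minimal sketch, independent of the ROS Agents API, of how a vector DB such as Chroma stores free-form text together with spatial metadata and retrieves it from a natural language query. The collection name, stored entries and coordinates are purely illustrative.
import chromadb
# Illustrative only: a stand-alone Chroma collection, not the ROS Agents map
client = chromadb.Client()
collection = client.create_collection(name="semantic_map_sketch")
# Store text observations along with the map coordinates where they were made
collection.add(
    documents=["chair, desk, monitor", "this space looks like an office"],
    metadatas=[{"x": 1.2, "y": 0.4}, {"x": 1.0, "y": 0.5}],
    ids=["obs_1", "obs_2"],
)
# Retrieve the most relevant entry for a natural language question
results = collection.query(query_texts=["where can I sit down and work?"], n_results=1)
print(results["documents"], results["metadatas"])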
Here we will show an example of generating such a map using object detection information and questions answered by an MLLM. This map can, of course, be made arbitrarily complex and robust by adding checks on the data being stored; however, in our example we will keep things simple. Let's start by importing the relevant components.
from agents.components import MapEncoding, Vision, MLLM
Next, we will use a vision component to provide us with object detections, as we did in the previous example.
Setting up a Vision Component#
from agents.ros import Topic
# Define the image input topic
image0 = Topic(name="image_raw", msg_type="Image")
# Create a detection topic
detections_topic = Topic(name="detections", msg_type="Detections")
Additionally, the component requires a model client with an object detection model. We will use the RESP client for RoboML along with VisionModel, a convenient model class made available in ROS Agents for initializing any vision model available in the open-source mmdetection library. We will specify the model we want to use via the checkpoint attribute.
Note
Learn about setting up RoboML with vision here.
from agents.models import VisionModel
from agents.clients.roboml import RESPModelClient
from agents.config import VisionConfig
# Add an object detection model
object_detection = VisionModel(name="object_detection",
                               checkpoint="dino-4scale_r50_8xb2-12e_coco")
roboml_detection = RESPModelClient(object_detection)
# Initialize the Vision component
detection_config = VisionConfig(threshold=0.5)
vision = Vision(
    inputs=[image0],
    outputs=[detections_topic],
    trigger=image0,
    config=detection_config,
    model_client=roboml_detection,
    component_name="detection_component",
)
The vision component will provide us with semantic information to add to our map. However, object names are only the most basic semantic element of the scene. One can view such basic elements in aggregate to create more abstract semantic associations. This is where multimodal LLMs come in.
Setting up an MLLM Component#
With large-scale multimodal LLMs we can ask higher-level introspective questions about the sensor information the robot is receiving and record this information on our spatio-temporal map. As an example, we will set up an MLLM component that periodically asks itself the same question about the nature of the space the robot is present in. To achieve this we will use two concepts. The first is a FixedInput, a simulated Topic that has a fixed value whenever it is read by a listener. The second is a timed component. In ROS Agents, components can be triggered either by an input received on a Topic or automatically after a certain period of time; the latter kind of trigger specifies a timed component. Let's see what all of this looks like in code.
from agents.clients.ollama import OllamaClient
from agents.models import Llava
from agents.ros import FixedInput
# Define a model client (working with Ollama in this case)
llava = Llava(name="llava")
llava_client = OllamaClient(llava)
# Define a fixed input for the component
introspection_query = FixedInput(
    name="introspection_query", msg_type="String",
    fixed="What kind of a room is this? Is it an office, a bedroom or a kitchen? Give a one word answer, out of the given choices")
# Define output of the component
introspection_answer = Topic(name="introspection_answer", msg_type="String")
# Start a timed (periodic) component using the mllm model defined earlier
# This component answers the same question every 15 seconds
introspector = MLLM(
    inputs=[introspection_query, image0],  # we use the image0 topic defined earlier
    outputs=[introspection_answer],
    model_client=llava_client,
    trigger=15.0,  # we provide the time interval as a float value to the trigger parameter
    component_name="introspector",
)
LLM/MLLM model outputs can be unpredictable. Before publishing the answer to our question on the output topic, we want to ensure that the model has indeed provided a one-word answer and that this answer is one of the expected choices. ROS Agents allows us to add arbitrary pre-processor functions to data that is going to be published. (Conversely, we can also add post-processing functions to data received in a listener's callback, but we will see that in another example.) We will add a simple pre-processing function to our output topic as follows:
# Define an arbitrary function to validate the output of the introspective component
# before publication.
from typing import Optional
def introspection_validation(output: str) -> Optional[str]:
    for option in ["office", "bedroom", "kitchen"]:
        if option in output.lower():
            return option
introspector.add_publisher_preprocessor(introspection_answer, introspection_validation)
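As a quick sanity check (with made-up model outputs, not produced by the running example), the validation function behaves as follows:
# Hypothetical strings used only to illustrate the validation logic
print(introspection_validation("This room appears to be an Office."))  # prints "office"
print(introspection_validation("I am not sure."))  # prints "None", so nothing would be published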
With this pre-processor attached, the component publishes the model output to the topic only if the validation function returns a value. All that is left to do now is to set up our MapEncoding component.
Creating a Semantic Map as a Vector DB#
The final step is to store the output of our models in a spatio-temporal map. ROS Agents provides a MapEncoding component that takes the data published by other components and stores it appropriately in a vector DB. The input to a MapEncoding component is given in the form of map layers. A MapLayer is a thin abstraction over Topic, with certain additional parameters. We will create our map layers as follows:
from agents.ros import MapLayer
# Object detection output from vision component
layer1 = MapLayer(subscribes_to=detections_topic, temporal_change=True)
# Introspection output from mllm component
layer2 = MapLayer(subscribes_to=introspection_answer, resolution_multiple=3)
The temporal_change parameter specifies that, for the same spatial position, the output coming in from the component needs to be stored along with timestamps, as the output can change over time. By default this option is set to False. The resolution_multiple parameter specifies that we can coarse-grain spatial coordinates by combining map grid cells.
Next, we need to provide our component with localization information via an odometry topic and a map data topic (of type OccupancyGrid). The latter is necessary to know the actual resolution of the robot's map.
# Initialize mandatory topics defining the robot's localization in space
position = Topic(name="odom", msg_type="Odometry")
map_topic = Topic(name="map", msg_type="OccupancyGrid")
Caution
Be sure to replace the name parameter in these topics with the actual topic names being published on your robot.
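For instance, if your robot publishes odometry on odometry/filtered and the occupancy grid on slam/map (hypothetical names), the topic definitions above would instead look like:
# Hypothetical topic names; use whatever your robot actually publishes
position = Topic(name="odometry/filtered", msg_type="Odometry")
map_topic = Topic(name="slam/map", msg_type="OccupancyGrid")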
Finally, we initialize the MapEncoding component by providing it with a database client. For the database client we will use the HTTP DB client from RoboML. Much like model clients, the database client is initialized with a vector DB specification. For our example we will use Chroma DB, an open-source multimodal vector DB.
See also
Check out Chroma DB here.
from agents.vectordbs import ChromaDB
from agents.clients.roboml import HTTPDBClient
from agents.config import MapConfig
# Initialize a vector DB that will store our semantic map
chroma = ChromaDB(name="MainDB")
chroma_client = HTTPDBClient(db=chroma)
# Create the map component
map_conf = MapConfig(map_name="map") # We give our map a name
map = MapEncoding(
    layers=[layer1, layer2],
    position=position,
    map_topic=map_topic,
    config=map_conf,
    db_client=chroma_client,
    trigger=15.0,  # map layer data is stored every 15 seconds
)
Launching the Components#
Finally, we launch our components as we did in the previous examples.
from agents.ros import Launcher
# Launch the components
launcher = Launcher()
launcher.add_pkg(
    components=[vision, introspector, map]
)
launcher.bringup()
And that is it. We have created our spatio-temporal semantic map using the outputs of two model components. The complete code for this example is below:
from typing import Optional
from agents.components import MapEncoding, Vision, MLLM
from agents.models import VisionModel, Llava
from agents.clients.roboml import RESPModelClient, HTTPDBClient
from agents.clients.ollama import OllamaClient
from agents.ros import Topic, MapLayer, Launcher, FixedInput
from agents.vectordbs import ChromaDB
from agents.config import MapConfig, VisionConfig

# Define the image input topic
image0 = Topic(name="image_raw", msg_type="Image")
# Create a detection topic
detections_topic = Topic(name="detections", msg_type="Detections")

# Add an object detection model
object_detection = VisionModel(name="object_detection",
                               checkpoint="dino-4scale_r50_8xb2-12e_coco")
roboml_detection = RESPModelClient(object_detection)

# Initialize the Vision component
detection_config = VisionConfig(threshold=0.5)
vision = Vision(
    inputs=[image0],
    outputs=[detections_topic],
    trigger=image0,
    config=detection_config,
    model_client=roboml_detection,
    component_name="detection_component",
)


# Define a model client (working with Ollama in this case)
llava = Llava(name="llava")
llava_client = OllamaClient(llava)


# Define a fixed input for the component
introspection_query = FixedInput(
    name="introspection_query", msg_type="String",
    fixed="What kind of a room is this? Is it an office, a bedroom or a kitchen? Give a one word answer, out of the given choices")
# Define output of the component
introspection_answer = Topic(name="introspection_answer", msg_type="String")

# Start a timed (periodic) component using the mllm model defined earlier
# This component answers the same question every 15 seconds
introspector = MLLM(
    inputs=[introspection_query, image0],  # we use the image0 topic defined earlier
    outputs=[introspection_answer],
    model_client=llava_client,
    trigger=15.0,  # we provide the time interval as a float value to the trigger parameter
    component_name="introspector",
)

# Define an arbitrary function to validate the output of the introspective component
# before publication.
def introspection_validation(output: str) -> Optional[str]:
    for option in ["office", "bedroom", "kitchen"]:
        if option in output.lower():
            return option


introspector.add_publisher_preprocessor(introspection_answer, introspection_validation)

# Object detection output from vision component
layer1 = MapLayer(subscribes_to=detections_topic, temporal_change=True)
# Introspection output from mllm component
layer2 = MapLayer(subscribes_to=introspection_answer, resolution_multiple=3)

# Initialize mandatory topics defining the robot's localization in space
position = Topic(name="odom", msg_type="Odometry")
map_topic = Topic(name="map", msg_type="OccupancyGrid")

# Initialize a vector DB that will store our semantic map
chroma = ChromaDB(name="MainDB")
chroma_client = HTTPDBClient(db=chroma)

# Create the map component
map_conf = MapConfig(map_name="map")  # We give our map a name
map = MapEncoding(
    layers=[layer1, layer2],
    position=position,
    map_topic=map_topic,
    config=map_conf,
    db_client=chroma_client,
    trigger=15.0,
)

# Launch the components
launcher = Launcher()
launcher.add_pkg(
    components=[vision, introspector, map]
)
launcher.bringup()