Create a spatio-temporal semantic map#
Autonomous Mobile Robots (AMRs) keep a representation of their environment in the form of occupancy maps. One can layer semantic information on top of these occupancy maps and with the use of MLLMs one can even add answers to arbitrary questions about the environment to this map. In EmbodiedAgents such maps can be created using vector databases which are specifically designed to store natural language data and retreive it based on natural language queries. Thus an embodied agent can keep a text based spatio-temporal memory, from which it can do retreival to answer questions or do spatial planning.
Here we will show an example of generating such a map using object detection information and questions answered by an MLLM. This map, of course can be made arbitrarily complex and robust by adding checks on the data being stored, however in our example we will keep things simple. Lets start by importing relevant components.
from agents.components import MapEncoding, Vision, MLLM
Next, we will use a vision component to provide us with object detections, as we did in the previous example.
Setting up a Vision Component#
from agents.ros import Topic
# Define the image input topic
image0 = Topic(name="image_raw", msg_type="Image")
# Create a detection topic
detections_topic = Topic(name="detections", msg_type="Detections")
Additionally the component requiers a model client with an object detection model. We will use the RESP client for RoboML and use the VisionModel a convenient model class made available in EmbodiedAgents, for initializing all vision models available in the opensource mmdetection library. We will specify the model we want to use by specifying the checkpoint attribute.
Note
Learn about setting up RoboML with vision here.
from agents.models import VisionModel
from agents.clients.roboml import RoboMLRESPClient
from agents.config import VisionConfig
# Add an object detection model
object_detection = VisionModel(name="object_detection",
checkpoint="dino-4scale_r50_8xb2-12e_coco")
roboml_detection = RoboMLRESPClient(object_detection)
# Initialize the Vision component
detection_config = VisionConfig(threshold=0.5)
vision = Vision(
inputs=[image0],
outputs=[detections_topic],
trigger=image0,
config=detection_config,
model_client=roboml_detection,
component_name="detection_component",
)
The vision component will provide us with semantic information to add to our map. However, object names are only the most basic semantic element of the scene. One can view such basic elements in aggregate to create more abstract semantic associations. This is where multimodal LLMs come in.
Setting up an MLLM Component#
With large scale multimodal LLMs we can ask higher level introspective questions about the sensor information the robot is receiving and record this information on our spatio-temporal map. As an example we will setup an MLLM component that periodically asks itself the same question, about the nature of the space the robot is present iin. In order to acheive this we will use two concepts. First is that of a FixedInput, a simulated Topic that has a fixed value whenever it is read by a listener. And the second is that of a timed component. In EmbodiedAgents, components can get triggered by either an input received on a Topic or automatically after a certain period of time. This latter trigger specifies a timed component. Lets see what all of this looks like in code.
from agents.clients import OllamaClient
from agents.models import OllamaModel
from agents.ros import FixedInput
# Define a model client (working with Ollama in this case)
llava = OllamaModel(name="llava", checkpoint="llava:latest")
llava_client = OllamaClient(llava)
# Define a fixed input for the component
introspection_query = FixedInput(
name="introspection_query", msg_type="String",
fixed="What kind of a room is this? Is it an office, a bedroom or a kitchen? Give a one word answer, out of the given choices")
# Define output of the component
introspection_answer = Topic(name="introspection_answer", msg_type="String")
# Start a timed (periodic) component using the mllm model defined earlier
# This component answers the same question after every 15 seconds
introspector = MLLM(
inputs=[introspection_query, image0], # we use the image0 topic defined earlier
outputs=[introspection_answer],
model_client=llava_client,
trigger=15.0, # we provide the time interval as a float value to the trigger parameter
component_name="introspector",
)
LLM/MLLM model outputs can be unpredictable. Before publishing the answer of our question to the output topic, we want to ensure that the model has indeed provided a one word answer, and this answer is one of the expected choices. EmbodiedAgents allows us to add arbitrary pre-processor functions to data that is going to be published (Conversely, we can also add post-processing functions to data that has been received in a listeners callback, but we will see that in another example). We will add a simple pre-processing function to our output topic as follows:
# Define an arbitrary function to validate the output of the introspective component
# before publication.
from typing import Optional
def introspection_validation(output: str) -> Optional[str]:
for option in ["office", "bedroom", "kitchen"]:
if option in output.lower():
return option
introspector.add_publisher_preprocessor(introspection_answer, introspection_validation)
This should ensure that our component only publishes the model output to this topic if the validation function returns an output. All that is left to do now is to setup our MapEncoding component.
Creating a Semantic Map as a Vector DB#
The final step is to store the output of our models in a spatio-temporal map. EmbodiedAgents provides a MapEncoding component that takes input data being published by other components and appropriately stores them in a vector DB. The input to a MapEncoding component is in the form of map layers. A MapLayer is a thin abstraction over Topic, with certain additional parameters. We will create our map layers as follows:
from agents.ros import MapLayer
# Object detection output from vision component
layer1 = MapLayer(subscribes_to=detections_topic, temporal_change=True)
# Introspection output from mllm component
layer2 = MapLayer(subscribes_to=introspection_answer, resolution_multiple=3)
temporal_change parameter specifies that for the same spatial position the output coming in from the component needs to be stored along with timestamps, as the output can change over time. By default this option is set to False. resolution_multiple specifies that we can coarse grain spatial coordinates by combining map grid cells.
Next we need to provide our component with localization information via an odometry topic and a map data topic (of type OccupancyGrid). The latter is necessary to know the actual resolution of the robots map.
# Initialize mandatory topics defining the robots localization in space
position = Topic(name="odom", msg_type="Odometry")
map_topic = Topic(name="map", msg_type="OccupancyGrid")
Caution
Be sure to replace the name paramter in topics with the actual topic names being published on your robot.
Finally we initialize the MapEncoding component by providing it a database client. For the database client we will use HTTP DB client from RoboML. Much like model clients, the database client is initialized with a vector DB specification. For our example we will use Chroma DB, an open source multimodal vector DB.
See also
Checkout Chroma DB here.
from agents.vectordbs import ChromaDB
from agents.clients import ChromaClient
from agents.config import MapConfig
# Initialize a vector DB that will store our semantic map
chroma = ChromaDB()
chroma_client = ChromaClient(db=chroma)
# Create the map component
map_conf = MapConfig(map_name="map") # We give our map a name
map = MapEncoding(
layers=[layer1, layer2],
position=position,
map_topic=map_topic,
config=map_conf,
db_client=chroma_client,
trigger=15.0, # map layer data is stored every 15 seconds
component_name="map_encoding",
)
Launching the Components#
And as always we will launch our components as we did in the previous examples.
from agents.ros import Launcher
# Launch the components
launcher = Launcher()
launcher.add_pkg(
components=[vision, introspector, map]
)
launcher.bringup()
And that is it. We have created our spatio-temporal semantic map using the outputs of two model components. The complete code for this example is below:
1from typing import Optional
2from agents.components import MapEncoding, Vision, MLLM
3from agents.models import VisionModel, OllamaModel
4from agents.clients import RoboMLRESPClient, ChromaClient, OllamaClient
5from agents.ros import Topic, MapLayer, Launcher, FixedInput
6from agents.vectordbs import ChromaDB
7from agents.config import MapConfig, VisionConfig
8
9# Define the image input topic
10image0 = Topic(name="image_raw", msg_type="Image")
11# Create a detection topic
12detections_topic = Topic(name="detections", msg_type="Detections")
13
14# Add an object detection model
15object_detection = VisionModel(
16 name="object_detection", checkpoint="dino-4scale_r50_8xb2-12e_coco"
17)
18roboml_detection = RoboMLRESPClient(object_detection)
19
20# Initialize the Vision component
21detection_config = VisionConfig(threshold=0.5)
22vision = Vision(
23 inputs=[image0],
24 outputs=[detections_topic],
25 trigger=image0,
26 config=detection_config,
27 model_client=roboml_detection,
28 component_name="detection_component",
29)
30
31
32# Define a model client (working with Ollama in this case)
33llava = OllamaModel(name="llava", checkpoint="llava:latest")
34llava_client = OllamaClient(llava)
35
36# Define a fixed input for the component
37introspection_query = FixedInput(
38 name="introspection_query",
39 msg_type="String",
40 fixed="What kind of a room is this? Is it an office, a bedroom or a kitchen? Give a one word answer, out of the given choices",
41)
42# Define output of the component
43introspection_answer = Topic(name="introspection_answer", msg_type="String")
44
45# Start a timed (periodic) component using the mllm model defined earlier
46# This component answers the same question after every 15 seconds
47introspector = MLLM(
48 inputs=[introspection_query, image0], # we use the image0 topic defined earlier
49 outputs=[introspection_answer],
50 model_client=llava_client,
51 trigger=15.0, # we provide the time interval as a float value to the trigger parameter
52 component_name="introspector",
53)
54
55
56# Define an arbitrary function to validate the output of the introspective component
57# before publication.
58def introspection_validation(output: str) -> Optional[str]:
59 for option in ["office", "bedroom", "kitchen"]:
60 if option in output.lower():
61 return option
62
63
64introspector.add_publisher_preprocessor(introspection_answer, introspection_validation)
65
66# Object detection output from vision component
67layer1 = MapLayer(subscribes_to=detections_topic, temporal_change=True)
68# Introspection output from mllm component
69layer2 = MapLayer(subscribes_to=introspection_answer, resolution_multiple=3)
70
71# Initialize mandatory topics defining the robots localization in space
72position = Topic(name="odom", msg_type="Odometry")
73map_topic = Topic(name="map", msg_type="OccupancyGrid")
74
75# Initialize a vector DB that will store our semantic map
76chroma = ChromaDB()
77chroma_client = ChromaClient(db=chroma)
78
79# Create the map component
80map_conf = MapConfig(map_name="map") # We give our map a name
81map = MapEncoding(
82 layers=[layer1, layer2],
83 position=position,
84 map_topic=map_topic,
85 config=map_conf,
86 db_client=chroma_client,
87 trigger=15.0,
88 component_name="map_encoding",
89)
90
91# Launch the components
92launcher = Launcher()
93launcher.add_pkg(
94 components=[vision, introspector, map]
95 )
96launcher.bringup()