Create a Go-to-X component using map data
In the previous example we created a semantic map using the MapEncoding component. Intuitively, one can imagine that using this map data would require some form of RAG. Let us suppose that we want to create a Go-to-X component which, when given a command like "Go to the yellow door", would retrieve the coordinates of the yellow door from the map and publish them to a goal point topic of type PoseStamped, to be handled by our robot's navigation system. We will create our Go-to-X component using the LLM component provided by ROS Agents. We will start by initializing the component and configuring it to use RAG.
Initialize the component
from agents.components import LLM
from agents.models import Llama3_1
from agents.config import LLMConfig
from agents.clients.ollama import OllamaClient
from agents.ros import Topic
# Start a Llama3.1 based llm component using ollama client
llama = Llama3_1(name="llama")
llama_client = OllamaClient(llama)
# Define LLM input and output topics including goal_point topic of type PoseStamped
goto_in = Topic(name="goto_in", msg_type="String")
goal_point = Topic(name="goal_point", msg_type="PoseStamped")
In order to configure the component to use RAG, we will set the following options in its config.
config = LLMConfig(enable_rag=True,
                   collection_name="map",
                   distance_func="l2",
                   n_results=1,
                   add_metadata=True)
Note that the collection_name parameter is the same as the map name we set in the previous example. We have also set the add_metadata parameter to true to make sure that the metadata is included in the RAG result, as the spatial coordinates we want to get are part of the metadata. Let us have a quick look at the metadata stored in the map by the MapEncoding component.
{
    "coordinates": [1.1, 2.2, 0.0],
    "layer_name": "Topic_Name",  # same as topic name that the layer is subscribed to
    "timestamp": 1234567,
    "temporal_change": True
}
With this information, we will first initialize our component.
Caution
In the following code block we are using the same DB client that was set up in the previous example.
# initialize the component
goto = LLM(
    inputs=[goto_in],
    outputs=[goal_point],
    model_client=llama_client,
    db_client=chroma_client,  # check the previous example where we set up this database client
    trigger=goto_in,
    config=config,
    component_name='go_to_x'
)
Pre-process the model output before publishing
Knowing that the output of retrieval will be appended to the beginning of our query as context, we will set up a component-level prompt for our LLM.
# set a component prompt
goto.set_component_prompt(
    template="""From the given metadata, extract coordinates and provide
    the coordinates in the following json format:\n {"position": coordinates}"""
)
Note
One might notice that we have not used an input topic name in our prompt. This is because we only need the input topic to fetch data from the vector DB during the RAG step. The query to the LLM in this case would only be composed of data fetched from the DB and our prompt.
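Conceptually, the final query sent to the model is therefore just the retrieved metadata followed by the component prompt. The sketch below only illustrates this composition; the exact formatting is handled internally by the component, and the layer_name value here is hypothetical:

```python
import json

# hypothetical RAG result for the command "Go to the yellow door"
# (layer_name and all values are illustrative)
retrieved_metadata = {
    "coordinates": [1.1, 2.2, 0.0],
    "layer_name": "yellow_door_layer",
    "timestamp": 1234567,
    "temporal_change": True,
}

component_prompt = (
    "From the given metadata, extract coordinates and provide\n"
    'the coordinates in the following json format:\n {"position": coordinates}'
)

# the retrieved context is prepended to the component prompt to form the query
llm_query = json.dumps(retrieved_metadata) + "\n" + component_prompt
```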
As the LLM output will contain text other than the JSON string we asked for, we need to add a pre-processing function to the output topic. This function extracts the required part of the text and returns it in a format that can be published to a PoseStamped topic, i.e. a numpy array of floats.
from typing import Optional
import json
import numpy as np
# pre-process the output before publishing to a topic of msg_type PoseStamped
def llm_answer_to_goal_point(output: str) -> Optional[np.ndarray]:
    # extract the json part of the output string (including brackets)
    # one can use sophisticated regex parsing here but we'll keep it simple
    json_string = output[output.find("{") : output.rfind("}") + 1]
    # load the string as a json and extract position coordinates
    # if there is an error, return None, i.e. no output would be published to goal_point
    try:
        json_dict = json.loads(json_string)
        coordinates = np.fromstring(json_dict["position"], sep=',', dtype=np.float64)
        print('Coordinates Extracted:', coordinates)
        if coordinates.shape[0] < 2 or coordinates.shape[0] > 3:
            return
        elif coordinates.shape[0] == 2:  # sometimes LLMs avoid adding the zeros of z-dimension
            coordinates = np.append(coordinates, 0)
        return coordinates
    except Exception:
        return
# add the pre-processing function to the goal_point output topic
goto.add_publisher_preprocessor(goal_point, llm_answer_to_goal_point)
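Before launching, we can sanity-check the parsing logic on a made-up model response. Here sample_answer is entirely hypothetical, and the extraction mirrors the function above, assuming the model quotes the coordinates as a comma-separated string:

```python
import json
import numpy as np

# a made-up LLM answer containing extra text around the JSON we asked for
sample_answer = 'Sure, here are the coordinates:\n{"position": "1.1, 2.2, 0.0"}'

# same extraction steps as in llm_answer_to_goal_point
json_string = sample_answer[sample_answer.find("{") : sample_answer.rfind("}") + 1]
coordinates = np.fromstring(json.loads(json_string)["position"], sep=",", dtype=np.float64)
print(coordinates.shape)  # a (3,) array ready for the goal_point topic
```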
Launching the Components
Finally, we will launch our Go-to-X component.
from agents.ros import Launcher
# Launch the component
launcher = Launcher()
launcher.add_pkg(
    components=[goto]
)
launcher.bringup()
And that is all. Our Go-to-X component is ready. The complete code for this example is given below:
from typing import Optional
import json
import numpy as np
from agents.components import LLM
from agents.models import Llama3_1
from agents.vectordbs import ChromaDB
from agents.config import LLMConfig
from agents.clients.roboml import HTTPDBClient
from agents.clients.ollama import OllamaClient
from agents.ros import Launcher, Topic

# Start a Llama3.1 based llm component using ollama client
llama = Llama3_1(name="llama")
llama_client = OllamaClient(llama)

# Initialize a vector DB that will store our routes
chroma = ChromaDB(name="MainDB")
chroma_client = HTTPDBClient(db=chroma)

# Define LLM input and output topics including goal_point topic of type PoseStamped
goto_in = Topic(name="goto_in", msg_type="String")
goal_point = Topic(name="goal_point", msg_type="PoseStamped")

config = LLMConfig(enable_rag=True,
                   collection_name="map",
                   distance_func="l2",
                   n_results=1,
                   add_metadata=True)

# initialize the component
goto = LLM(
    inputs=[goto_in],
    outputs=[goal_point],
    model_client=llama_client,
    db_client=chroma_client,  # check the previous example where we set up this database client
    trigger=goto_in,
    config=config,
    component_name='go_to_x'
)

# set a component prompt
goto.set_component_prompt(
    template="""From the given metadata, extract coordinates and provide
    the coordinates in the following json format:\n {"position": coordinates}"""
)


# pre-process the output before publishing to a topic of msg_type PoseStamped
def llm_answer_to_goal_point(output: str) -> Optional[np.ndarray]:
    # extract the json part of the output string (including brackets)
    # one can use sophisticated regex parsing here but we'll keep it simple
    json_string = output[output.find("{") : output.rfind("}") + 1]
    # load the string as a json and extract position coordinates
    # if there is an error, return None, i.e. no output would be published to goal_point
    try:
        json_dict = json.loads(json_string)
        coordinates = np.fromstring(json_dict["position"], sep=',', dtype=np.float64)
        print('Coordinates Extracted:', coordinates)
        if coordinates.shape[0] < 2 or coordinates.shape[0] > 3:
            return
        elif coordinates.shape[0] == 2:  # sometimes LLMs avoid adding the zeros of z-dimension
            coordinates = np.append(coordinates, 0)
        return coordinates
    except Exception:
        return


# add the pre-processing function to the goal_point output topic
goto.add_publisher_preprocessor(goal_point, llm_answer_to_goal_point)

# Launch the component
launcher = Launcher()
launcher.add_pkg(
    components=[goto]
)
launcher.bringup()