Create a Go-to-X component using map data#

In the previous example we created a semantic map using the MapEncoding component. Intuitively, making use of this map data calls for some form of retrieval augmented generation (RAG). Suppose we want to create a Go-to-X component which, when given a command like 'Go to the yellow door', retrieves the coordinates of the yellow door from the map and publishes them to a goal point topic of type PoseStamped, to be handled by the robot's navigation system. We will create our Go-to-X component using the LLM component provided by ROS Agents, starting by initializing the component and configuring it to use RAG.

Initialize the component#

from agents.components import LLM
from agents.models import Llama3_1
from agents.config import LLMConfig
from agents.clients.ollama import OllamaClient
from agents.ros import Topic

# Start a Llama3.1 based llm component using ollama client
llama = Llama3_1(name="llama")
llama_client = OllamaClient(llama)

# Define LLM input and output topics including goal_point topic of type PoseStamped
goto_in = Topic(name="goto_in", msg_type="String")
goal_point = Topic(name="goal_point", msg_type="PoseStamped")

In order to configure the component to use RAG, we will set the following options in its config.

config = LLMConfig(enable_rag=True,
                   collection_name="map",
                   distance_func="l2",
                   n_results=1,
                   add_metadata=True)

Note that the collection_name parameter is the same as the map name we set in the previous example, and n_results=1 means only the single closest match will be retrieved. We have also set the add_metadata parameter to True to make sure that the metadata is included in the RAG result, as the spatial coordinates we want to get are part of the metadata. Let us have a quick look at the metadata stored in the map by the MapEncoding component.

{
    "coordinates": [1.1, 2.2, 0.0],
    "layer_name": "Topic_Name",  # same as topic name that the layer is subscribed to
    "timestamp": 1234567,
    "temporal_change": True
}

With this information, we will first initialize our component.

Caution

In the following code block we are using the same DB client that was set up in the previous example.

# initialize the component
goto = LLM(
    inputs=[goto_in],
    outputs=[goal_point],
    model_client=llama_client,
    db_client=chroma_client,  # check the previous example where we set up this database client
    trigger=goto_in,
    component_name='go_to_x'
)

Pre-process the model output before publishing#

Knowing that the output of retrieval will be appended to the beginning of our query as context, we will set up a component-level prompt for our LLM.

# set a component prompt
goto.set_component_prompt(
    template="""From the given metadata, extract coordinates and provide
    the coordinates in the following json format:\n {"position": coordinates}"""
)

Note

One might notice that we have not used an input topic name in our prompt. This is because the input topic is only needed to fetch data from the vector DB during the RAG step. The query to the LLM in this case is composed solely of the data fetched from the DB and our prompt.
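For illustration, the effective query sent to the model would then look roughly like the sketch below. This is not code we write ourselves; the component assembles the query internally, and the retrieved entry and layer name shown here are placeholders.

# Illustrative only: the LLM component builds this query internally.
# The retrieved map entry and its metadata are prepended as context,
# followed by our component prompt.
effective_query = """yellow door
{"coordinates": [1.1, 2.2, 0.0], "layer_name": "Topic_Name", "timestamp": 1234567, "temporal_change": True}

From the given metadata, extract coordinates and provide
the coordinates in the following json format:
{"position": coordinates}"""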

As the LLM output will contain text other than the json string that we have asked for, we need to add a pre-processing function to the output topic that extracts the required part of the text and returns the output in a format that can be published to a PoseStamped topic, i.e. a numpy array of floats.

from typing import Optional
import json
import numpy as np

# pre-process the output before publishing to a topic of msg_type PoseStamped
def llm_answer_to_goal_point(output: str) -> Optional[np.ndarray]:
    # extract the json part of the output string (including brackets)
    # one can use sophisticated regex parsing here but we'll keep it simple
    json_string = output[output.find("{") : output.rfind("}") + 1]
    # load the string as a json and extract position coordinates
    # if there is an error, return None, i.e. no output would be published to goal_point
    try:
        json_dict = json.loads(json_string)
        # the model is asked for a json list, e.g. {"position": [1.1, 2.2, 0.0]}
        coordinates = np.array(json_dict["position"], dtype=np.float64)
        print('Coordinates Extracted:', coordinates)
        if coordinates.shape[0] < 2 or coordinates.shape[0] > 3:
            return None
        elif coordinates.shape[0] == 2:  # sometimes LLMs omit the zero z-coordinate
            coordinates = np.append(coordinates, 0)
        return coordinates
    except Exception:
        return None

# add the pre-processing function to the goal_point output topic
goto.add_publisher_preprocessor(goal_point, llm_answer_to_goal_point)
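Before wiring the function up, we can sanity-check it offline. The sample strings below are hypothetical LLM outputs, used only to exercise the three branches of the pre-processor.

# Hypothetical LLM outputs for a quick local test of the pre-processor
print(llm_answer_to_goal_point('Here you go: {"position": [1.1, 2.2, 0.0]}'))  # [1.1 2.2 0. ]
print(llm_answer_to_goal_point('{"position": [3.5, 4.0]}'))  # z padded -> [3.5 4.  0. ]
print(llm_answer_to_goal_point('I could not find the yellow door.'))  # None, nothing published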

Launch the component#

Finally, we will launch our Go-to-X component.

from agents.ros import Launcher

# Launch the component
launcher = Launcher()
launcher.add_pkg(
    components=[goto],
    activate_all_components_on_start=True)
launcher.bringup()
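Once everything is up, we can try the component by sending a command on the input topic. The following is a minimal test sketch, assuming the component maps msg_type "String" to the standard std_msgs/msg/String and subscribes on the topic name goto_in as defined above:

import time
import rclpy
from std_msgs.msg import String

# publish a single test command to the Go-to-X input topic
rclpy.init()
node = rclpy.create_node('goto_test_publisher')
pub = node.create_publisher(String, 'goto_in', 1)
time.sleep(1.0)  # give discovery a moment before publishing
pub.publish(String(data='Go to the yellow door'))
node.destroy_node()
rclpy.shutdown()

If coordinate extraction succeeds, the resulting goal point should appear on the goal_point topic.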

And that is all. Our Go-to-X component is ready. The complete code for this example is given below:

Go-to-X Component#
from typing import Optional
import json
import numpy as np
from agents.components import LLM
from agents.models import Llama3_1
from agents.vectordbs import ChromaDB
from agents.config import LLMConfig
from agents.clients.roboml import HTTPDBClient
from agents.clients.ollama import OllamaClient
from agents.ros import Launcher, Topic

# Start a Llama3.1 based llm component using ollama client
llama = Llama3_1(name="llama")
llama_client = OllamaClient(llama)

# Initialize a vector DB that stores our semantic map
chroma = ChromaDB(name="MainDB")
chroma_client = HTTPDBClient(db=chroma)

# Define LLM input and output topics including goal_point topic of type PoseStamped
goto_in = Topic(name="goto_in", msg_type="String")
goal_point = Topic(name="goal_point", msg_type="PoseStamped")

config = LLMConfig(enable_rag=True,
                   collection_name="map",
                   distance_func="l2",
                   n_results=1,
                   add_metadata=True)

# initialize the component
goto = LLM(
    inputs=[goto_in],
    outputs=[goal_point],
    model_client=llama_client,
    db_client=chroma_client,  # check the previous example where we set up this database client
    trigger=goto_in,
    component_name='go_to_x'
)

# set a component prompt
goto.set_component_prompt(
    template="""From the given metadata, extract coordinates and provide
    the coordinates in the following json format:\n {"position": coordinates}"""
)


# pre-process the output before publishing to a topic of msg_type PoseStamped
def llm_answer_to_goal_point(output: str) -> Optional[np.ndarray]:
    # extract the json part of the output string (including brackets)
    # one can use sophisticated regex parsing here but we'll keep it simple
    json_string = output[output.find("{") : output.rfind("}") + 1]
    # load the string as a json and extract position coordinates
    # if there is an error, return None, i.e. no output would be published to goal_point
    try:
        json_dict = json.loads(json_string)
        # the model is asked for a json list, e.g. {"position": [1.1, 2.2, 0.0]}
        coordinates = np.array(json_dict["position"], dtype=np.float64)
        print('Coordinates Extracted:', coordinates)
        if coordinates.shape[0] < 2 or coordinates.shape[0] > 3:
            return None
        elif coordinates.shape[0] == 2:  # sometimes LLMs omit the zero z-coordinate
            coordinates = np.append(coordinates, 0)
        return coordinates
    except Exception:
        return None


# add the pre-processing function to the goal_point output topic
goto.add_publisher_preprocessor(goal_point, llm_answer_to_goal_point)

# Launch the component
launcher = Launcher()
launcher.add_pkg(
    components=[goto],
    activate_all_components_on_start=True)
launcher.bringup()