Creating your ROS2 package using ROS Sugar

Creating your ROS2 package using ROS Sugar#

Note

Before building your own package based on ROS Sugar, familiarize yourself with the basic design concepts

Tip

To see detailed examples on packages created using ROS Sugar, check out Kompass and ROS Agents

1- Start by creating a new ROS2 python package. (see instruction here)

ros2 pkg create --build-type ament_python --license Apache-2.0 my-awesome-pkg

2- Create your first functional unit (component) in a new file:

cd my-awesome-pkg\my_awesome_pkg
touch awesome_component.py

3- Setup your component configuration by extending BaseComponentConfig based on attrs package:

from attrs import field, define
from ros_sugar.config import BaseComponentConfig, base_validators

@define(kw_only=True)
class AwesomeConfig(BaseComponentConfig):
    """
    Component configuration parameters
    """

    extra_float: float = field(
        default=10.0, validator=base_validators.in_range(min_value=1e-9, max_value=1e9)
    )

    extra_flag: bool = field(default=True)

4- Initialize your component by inheriting from BaseComponent class. Next, you can code the exact desired functionality in your component. (Refer to the BaseComponent API docs for more details on the available methods)

from ros_sugar.core import ComponentFallbacks, BaseComponent
from ros_sugar.io import Topic

class AwesomeComponent(BaseComponent):
    def __init__(
        self,
        *,
        component_name: str,
        inputs: Optional[Sequence[Topic]] = None,
        outputs: Optional[Sequence[Topic]] = None,
        config_file: Optional[str] = None,
        config: Optional[AwesomeConfig] = None,
        **kwargs,
    ) -> None:
        # Set default config if config is not provided
        self.config: AwesomeConfig = config or AwesomeConfig()

        super().__init__(
            component_name=component_name,
            inputs=inputs,
            outputs=outputs,
            config=self.config,
            config_file=config_file,
            **kwargs,
        )

    def _execution_step(self):
        """
        Main execution step
        """
        super()._execution_step()
        # Add your main execution step here, to be executed at each loop step for timed components

5- Follow the previous method to create any number of functional units in your package.

6- To use your components with ROS Sugar Launcher in multi-threaded execution, jump to step 11. To Setup multi-process execution check step 7.

7- Next, to use your components with ROS Sugar Launcher in multi-process execution you need to create an entry point for the ROS2 package.

cd my-awesome-pkg\my_awesome_pkg
touch executable.py

8- Import your component and their configuration classes and get the executable_main from ros_sugar:

#!/usr/bin/env python3
from ros_sugar import executable_main
from my_awesome_pkg.awesome_component import AwesomeComponent, AwesomeConfig
# Import your other components/config here ...

# Create lists of available components/config classes
_components_list = [AwesomeComponent]
_configs_list = [AwesomeConfig]

# Create entry point main
def main(args=None):
    executable_main(list_of_components=_components_list, list_of_configs=_configs_list)

9- Add your entry point to the ROS2 package setup.py:

from setuptools import find_packages, setup

package_name = "my_awesome_pkg"

console_scripts = [
    "executable = my_awesome_pkg.executable:main",
]

setup(
    name=package_name,
    version="1",
    packages=find_packages(),
    install_requires=["setuptools"],
    zip_safe=True,
    maintainer="Cyberdyne Systems",
    maintainer_email="contact@cyberdynesystems.com",
    description="My awesome ROS2 sugar package",
    entry_points={
        "console_scripts": console_scripts,
    },
)

10- Build your ROS2 package with colcon (instructions here)

11- Now you can use and launch your awesome new package with ROS sugar launcher using a simple python script:

Using ros_sugar Launcher with your awesome package#
 1from my_awesome_pkg.awesome_component import AwesomeComponent, AwesomeConfig
 2from ros_sugar.actions import LogInfo
 3from ros_sugar.events import OnLess
 4from ros_sugar import Launcher
 5from ros_sugar.io import Topic
 6
 7# Define a set of topics
 8map_topic = Topic(name="map", msg_type="OccupancyGrid")
 9audio_topic = Topic(name="voice", msg_type="Audio")
10image_topic = Topic(name="camera/rgb", msg_type="Image")
11
12# Init your components
13my_component = AwesomeComponent(component_name='test_component', inputs=[map_topic, image_topic], outputs=[audio_topic])
14
15# Create your events
16low_battery = OnLess(
17    "low_battery",
18    Topic(name="/battery_level", msg_type="Int"),
19    15,
20    ("data")
21)
22
23# Events/Actions
24my_events_actions: Dict[event.Event, Action] = {
25    low_battery: LogInfo(msg="Battery is Low!)
26}
27
28# Create your launcher
29launcher = Launcher()
30
31# Add your package components
32launcher.add_pkg(
33    components=[my_component],
34    package_name='my_awesome_pkg',
35    executable_entry_point='executable',
36    events_actions=my_events_actions,
37    activate_all_components_on_start=True,
38    multiprocessing=True,
39)
40
41# If any component fails -> restart it with unlimited retries
42launcher.on_component_fail(action_name="restart")
43
44# Bring up the system
45launcher.bringup(ros_log_level="info", introspect=False)