Creating your ROS2 package using ROS Sugar#
Note
Before building your own package based on ROS Sugar, familiarize yourself with the basic design concepts
Tip
To see detailed examples on packages created using ROS Sugar, check out Kompass and ROS Agents
1- Start by creating a new ROS2 python package. (see instruction here)
ros2 pkg create --build-type ament_python --license Apache-2.0 my-awesome-pkg
2- Create your first functional unit (component) in a new file:
cd my-awesome-pkg\my_awesome_pkg
touch awesome_component.py
3- Setup your component configuration by extending BaseComponentConfig
based on attrs package:
from attrs import field, define
from ros_sugar.config import BaseComponentConfig, base_validators
@define(kw_only=True)
class AwesomeConfig(BaseComponentConfig):
"""
Component configuration parameters
"""
extra_float: float = field(
default=10.0, validator=base_validators.in_range(min_value=1e-9, max_value=1e9)
)
extra_flag: bool = field(default=True)
4- Initialize your component by inheriting from BaseComponent
class. Next, you can code the exact desired functionality in your component. (Refer to the BaseComponent API docs for more details on the available methods)
from ros_sugar.core import ComponentFallbacks, BaseComponent
from ros_sugar.io import Topic
class AwesomeComponent(BaseComponent):
def __init__(
self,
*,
component_name: str,
inputs: Optional[Sequence[Topic]] = None,
outputs: Optional[Sequence[Topic]] = None,
config_file: Optional[str] = None,
config: Optional[AwesomeConfig] = None,
**kwargs,
) -> None:
# Set default config if config is not provided
self.config: AwesomeConfig = config or AwesomeConfig()
super().__init__(
component_name=component_name,
inputs=inputs,
outputs=outputs,
config=self.config,
config_file=config_file,
**kwargs,
)
def _execution_step(self):
"""
Main execution step
"""
super()._execution_step()
# Add your main execution step here, to be executed at each loop step for timed components
5- Follow the previous method to create any number of functional units in your package.
6- To use your components with ROS Sugar Launcher in multi-threaded execution, jump to step 11. To Setup multi-process execution check step 7.
7- Next, to use your components with ROS Sugar Launcher in multi-process execution you need to create an entry point for the ROS2 package.
cd my-awesome-pkg\my_awesome_pkg
touch executable.py
8- Import your component and their configuration classes and get the executable_main
from ros_sugar
:
#!/usr/bin/env python3
from ros_sugar import executable_main
from my_awesome_pkg.awesome_component import AwesomeComponent, AwesomeConfig
# Import your other components/config here ...
# Create lists of available components/config classes
_components_list = [AwesomeComponent]
_configs_list = [AwesomeConfig]
# Create entry point main
def main(args=None):
executable_main(list_of_components=_components_list, list_of_configs=_configs_list)
9- Add your entry point to the ROS2 package setup.py:
from setuptools import find_packages, setup
package_name = "my_awesome_pkg"
console_scripts = [
"executable = my_awesome_pkg.executable:main",
]
setup(
name=package_name,
version="1",
packages=find_packages(),
install_requires=["setuptools"],
zip_safe=True,
maintainer="Cyberdyne Systems",
maintainer_email="contact@cyberdynesystems.com",
description="My awesome ROS2 sugar package",
entry_points={
"console_scripts": console_scripts,
},
)
10- Build your ROS2 package with colcon (instructions here)
11- Now you can use and launch your awesome new package with ROS sugar launcher using a simple python script:
1from my_awesome_pkg.awesome_component import AwesomeComponent, AwesomeConfig
2from ros_sugar.actions import LogInfo
3from ros_sugar.events import OnLess
4from ros_sugar import Launcher
5from ros_sugar.io import Topic
6
7# Define a set of topics
8map_topic = Topic(name="map", msg_type="OccupancyGrid")
9audio_topic = Topic(name="voice", msg_type="Audio")
10image_topic = Topic(name="camera/rgb", msg_type="Image")
11
12# Init your components
13my_component = AwesomeComponent(component_name='test_component', inputs=[map_topic, image_topic], outputs=[audio_topic])
14
15# Create your events
16low_battery = OnLess(
17 "low_battery",
18 Topic(name="/battery_level", msg_type="Int"),
19 15,
20 ("data")
21)
22
23# Events/Actions
24my_events_actions: Dict[event.Event, Action] = {
25 low_battery: LogInfo(msg="Battery is Low!)
26}
27
28# Create your launcher
29launcher = Launcher()
30
31# Add your package components
32launcher.add_pkg(
33 components=[my_component],
34 package_name='my_awesome_pkg',
35 executable_entry_point='executable',
36 events_actions=my_events_actions,
37 activate_all_components_on_start=True,
38 multiprocessing=True,
39)
40
41# If any component fails -> restart it with unlimited retries
42launcher.on_component_fail(action_name="restart")
43
44# Bring up the system
45launcher.bringup(ros_log_level="info", introspect=False)