[Knowledge sharing] Multi-robot navigation - helper code for easier setup

Hi,

I felt boring to change every config file twice and when I imagined, that I may have 100 of robots, I gave up editing the files manually.

So I wrote this launch file:

import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription, LaunchContext
from launch_ros.actions import Node
from nav2_common.launch import RewrittenYaml
import yaml
from pathlib import Path
from path_planner_server.helpers import write_config

def create_amcl_config(ns: str, path: str) -> str:
    #print('Load file:' + path)
    config = yaml.safe_load(Path(path).read_text())
    #print(yaml.dump(config))

    rp = config['amcl']['ros__parameters']
    rp['base_frame_id'] = ns + rp['base_frame_id']
    rp['odom_frame_id'] = ns + rp['odom_frame_id']
    rp['scan_topic'] = '/' + ns + rp['scan_topic']

    robot_num = int(ns.split('_')[1][:-1])
    rp['initial_pose']['y'] = robot_num * 0.5
    
    config[ns + 'amcl'] = config.pop('amcl')

    return write_config(config)

def generate_launch_description():
    shared_dir = get_package_share_directory('path_planner_server')
    config_dir = os.path.join(shared_dir, 'config') 

    map_file = os.path.join(config_dir, 'turtlebot_area_two_robots.yaml')

    nodes = []
    managed_node_names = []

    nodes.append(Node(
        package='nav2_map_server',
        executable='map_server',
        name='map_server',
        output='screen',
        parameters=[{'use_sim_time': True},
                    {'topic_name':"map"},
                    {'frame_id':"map"},
                    {'yaml_filename': map_file}]
    ))

    managed_node_names.append('map_server')

    for ns_str in ['tb3_0', 'tb3_1']:
        ns = ns_str + '/' if ns_str != '' else ''

        amcl_config_file_name = create_amcl_config(
            ns,
            os.path.join(config_dir, 'amcl_config.yaml')
        )

        amcl_node = Node(
            package='nav2_amcl',
            executable='amcl',
            name='amcl',
            namespace=ns_str,
            output='screen',
            parameters=[amcl_config_file_name]
        )

        nodes.append(amcl_node)
        managed_node_names.append(ns + 'amcl')

    lifecycle_mgr_node = Node(
            package='nav2_lifecycle_manager',
            executable='lifecycle_manager',
            name='lifecycle_manager_localization',
            output='screen',
            parameters=[{'use_sim_time': True},
                        {'autostart': True},
                        {'bond_timeout':0.0},
                        {'node_names': managed_node_names }
                        ]
        )

    nodes.append(lifecycle_mgr_node)

    return LaunchDescription(nodes)

Here is the helpers.py file, I had to put it into the path_planner_server/path_planner_server directory
to be able to use it:

from pathlib import Path
import tempfile
import yaml

def write_config(config: dict) -> str:
    print('Write config')
    fixed_config = yaml.dump(config).replace('//', '/')
    print(fixed_config)

    fp = tempfile.NamedTemporaryFile(delete=False)
    fp.write(fixed_config.encode())
    fp.close()

    return fp.name

The trick is that I am loading the config files with yaml.safe_load. It gives me a dictionary object.
I modify this dictionary and saving it to a temporary file. Then I give this temporary file to the nodes as a configuration file.

1 Like

I did the same for navigation:

import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription, LaunchContext
from launch_ros.actions import Node
from nav2_common.launch import RewrittenYaml
import yaml
from pathlib import Path
import tempfile
from path_planner_server.helpers import write_config

def create_controller_config(ns: str, path: str) -> str:
    config = yaml.safe_load(Path(path).read_text())

    config[ns + 'controller_server'] = config.pop('controller_server')

    rp = config['local_costmap']['local_costmap']['ros__parameters']
    rp['global_frame'] = ns + rp['global_frame']
    rp['robot_base_frame'] = ns + rp['robot_base_frame']
    rp['voxel_layer']['scan']['topic'] = '/' + ns + rp['voxel_layer']['scan']['topic']

    config[ns + 'local_costmap'] = config.pop('local_costmap')

    return write_config(config)

def create_planner_config(ns: str, path: str) -> str:
    config = yaml.safe_load(Path(path).read_text())

    config[ns + 'planner_server'] = config.pop('planner_server')

    rp = config['global_costmap']['global_costmap']['ros__parameters']
    rp['robot_base_frame'] = ns + rp['robot_base_frame']
    rp['obstacle_layer']['scan']['topic'] = '/' + ns + rp['obstacle_layer']['scan']['topic']

    config[ns + 'global_costmap'] = config.pop('global_costmap')

    return write_config(config)

def create_bt_nav_config(ns: str, path: str, behavior_xml_path: str) -> str:
    config = yaml.safe_load(Path(path).read_text())

    rp = config['bt_navigator']['ros__parameters']
    rp['odom_topic'] = ns + rp['odom_topic']
    rp['robot_base_frame'] = ns + rp['robot_base_frame']
    rp['default_nav_to_pose_bt_xml'] = behavior_xml_path 

    config[ns + 'bt_navigator'] = config.pop('bt_navigator')

    return write_config(config)

def create_recovery_config(ns: str, path: str) -> str:
    config = yaml.safe_load(Path(path).read_text())

    rp = config['behavior_server']['ros__parameters']
    rp['global_frame'] = ns + rp['global_frame']
    rp['robot_base_frame'] = ns + rp['robot_base_frame']

    config[ns + 'behavior_server'] = config.pop('behavior_server')

    return write_config(config)

def generate_launch_description():
    shared_dir = get_package_share_directory('path_planner_server')
    config_dir = os.path.join(shared_dir, 'config') 

    nodes = []
    managed_node_names = []

    for ns_str in ['tb3_0', 'tb3_1']:
        ns = ns_str + '/' if ns_str != '' else ''

        controller_config_file_name = create_controller_config(
            ns,
            os.path.join(config_dir, 'controller.yaml')
        )
        controller_node = Node(
            package='nav2_controller',
            executable='controller_server',
            name='controller_server',
            namespace=ns_str,
            # prefix=['xterm -e gdb -ex run --args'],
            output='screen',
            parameters=[controller_config_file_name]
        )

        nodes.append(controller_node)
        managed_node_names.append(ns + 'controller_server')

        planner_config_file_name = create_planner_config(
            ns,
            os.path.join(config_dir, 'planner_server.yaml')
        )
        planner_node = Node(
                package='nav2_planner',
                executable='planner_server',
                name='planner_server',
                namespace=ns_str,
                output='screen',
                parameters=[planner_config_file_name]
        )

        nodes.append(planner_node)
        managed_node_names.append(ns + 'planner_server')

        recovery_config_file_name = create_recovery_config(
            ns,
            os.path.join(config_dir, 'recovery.yaml')
        )
        recovery_node = Node(
            package='nav2_behaviors',
            executable='behavior_server',
            name='recoveries_server',
            namespace=ns_str,
            output='screen',
            parameters=[recovery_config_file_name],
        )
        nodes.append(recovery_node)
        managed_node_names.append(ns + 'recoveries_server')

        bt_nav_config_file_name = create_bt_nav_config(
            ns,
            os.path.join(config_dir, 'bt_navigator.yaml'),
            os.path.join(config_dir, 'behavior.xml'))

        bt_nav_node = Node(
                package='nav2_bt_navigator',
                executable='bt_navigator',
                name='bt_navigator',
                namespace=ns_str,
                output='screen',
                parameters=[bt_nav_config_file_name]
        )

        nodes.append(bt_nav_node)
        managed_node_names.append(ns + 'bt_navigator')

    lifecycle_mgr_node = Node(
            package='nav2_lifecycle_manager',
            executable='lifecycle_manager',
            name='lifecycle_manager_pathplanner',
            output='screen',
            parameters=[{'use_sim_time': True},
                        {'autostart': True},
                        {'bond_timeout':0.0},
                        {'node_names': managed_node_names }
                        ]
        )

    nodes.append(lifecycle_mgr_node)

    return LaunchDescription(nodes)

Probably the settings in ros__parameters can be overridden with node parameters, but unfortunately, not all the configs are ros_parameters.

I have read in the nav2 bringup documentation, that it can handle multi robots automatically with one command. We only have to add the robots initial positions.
But now I can not find this document on the internet.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.