Rosject Wall Following Robot

Hi,
While working on the the wall following project, I tried to emulate the logic applied by @girishkumar.kannan and modified the code to only run the right side wall following. However, the robot runs into the wall while following the trajectory.
Could someone give any suggestions on what is wrong with it?

from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan
#standard imports
import math
#import tf_transformations

#define wall follower class as a subclass of node class
class WallFollower(Node):
    #class constructor
    def __init__(self):
        super().__init__("Wall_follower")
        self.get_logger().info("Initializing Wall Follower...")

        #declare and initialize cmd_vel publisher
        self.cmd_vel_pub = self.create_publisher(Twist,"/cmd_vel",10)
        self.get_logger().info("Initialized /cmd_vel Publisher")

        #declare and initialize callback group
        self.callback_group = ReentrantCallbackGroup()
        
        #declare and initialize scan subscriber
        self.scan_sub = self.create_subscription(LaserScan, "/scan", self.scan_callback, 
                                                 QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE), callback_group=self.callback_group)
        self.get_logger().info("Initialized /scan Subscriber")

        #declare and initialize odom subscriber
        self.odom_sub = self.create_subscription(Odometry, "/odom",self.odom_callback, 
                                                 QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE), callback_group=self.callback_group)
        
        self.get_logger().info("Initialized /odom Subscriber")

        #declare and initialize control timer callback
        self.control_timer = self.create_timer(timer_period_sec=0.500,
                                               callback=self.control_callback,
                                               callback_group=self.callback_group)
        
        self.get_logger().info("Initialized Control Timer")

        self.get_logger().info("Wall Follower Initialized")

        return None
    
    #class destructor
    def __del__(self):
        return None
    
    #define and initialize class variables
    #robot_radius = 0.10  # m
    side_threshold_min = 0.20   # m gap
    side_threshold_max = 0.30    # m gap
    front_threshold = 0.50        # m gap
    pi = 3.141592654
    pi_inv = 0.318309
    ignore_iterations = 5
    iterations_count = 0

    #process variables
    wall_found = False
    side_chosen = "right"

    lin_vel_zero = 0.000
    lin_vel_slow = 0.100
    lin_vel_fast = 0.250
    ang_vel_zero = 0.000
    ang_vel_slow = 0.050
    ang_vel_fast = 0.500
    ang_vel_mult = 1.250
    
    #velocity publisher variables
    twist_cmd = Twist()

    #scan subscriber variables
    scan_info_done = False
    scan_angle_min = 0.0
    scan_angle_max = 0.0
    scan_angle_inc = 0.0
    scan_range_min = 0.0
    scan_range_max = 0.0
    scan_right_range = 0.0
    scan_front_range = 0.0
    scan_left_range = 0.0
    scan_angle_range = 0
    scan_ranges_size = 0
    scan_right_index = 0
    scan_left_index = 0
    scan_sides_angle_window = 15 #degs
    scan_front_angle_window = 15 #degs
    scan_right_range_from_index = 0
    scan_right_range_to_index = 0
    scan_front_range_from_index = 0
    scan_front_range_to_index = 0
    scan_left_range_from_index = 0
    scan_left_range_to_index = 0

    #odom subscriber variables
    odom_info_done = False
    odom_initial_x = 0.0
    odom_initial_y = 0.0
    odom_initial_yaw = 0.0
    odom_curr_x = 0.0
    odom_curr_y = 0.0
    odom_curr_yaw = 0.0
    odom_prev_x = 0.0
    odom_prev_y = 0.0
    odom_distance = 0.0
    odom_ang_vel = 0.0
    angles = dict()

    #private class methods and callbacks
    def scan_callback(self, scan_msg):
        if (self.scan_info_done):
            #algo choics is "min" i.e "min_distance"
            #~~~~ Minium of Range Values Method ~~~#
            #initialize variables to hold minimum values
            scan_right_range_min = self.scan_range_max
            scan_front_range_min = self.scan_range_max
            #scan_left_range_min = self.scan_range_max

            #loop through the scan ranges and get the minimum value
            for index in range(0, self.scan_ranges_size):
                if (not math.isinf(scan_msg.ranges[index])):
                    if ((index >= self.scan_right_range_from_index) and (index <= self.scan_right_range_to_index)):
                        if (scan_right_range_min > scan_msg.ranges[index]):
                            scan_right_range_min = scan_msg.ranges[index]
                        else:
                            pass
                    if ((index >= self.scan_front_range_from_index) and (index <= self.scan_front_range_to_index)):
                        if (scan_front_range_min > scan_msg.ranges[index]):
                            scan_front_range_min = scan_msg.ranges[index]
                        else:
                            pass
                else: 
                    #discard the scan range with infinity as value
                    pass
                
            #set the range values to their minimum values
            if (self.scan_right_range > 0.0):
                self.scan_right_range = scan_right_range_min
            else: 
                self.scan_right_range = self.scan_range_min

            if (self.scan_front_range > 0.0):
                self.scan_front_range = scan_front_range_min
            else: 
                self.scan_front_range = self.scan_range_min

            #what the else loop above does is if the front and right
            # if the scan range encounters 0.0 or uninitialized values
            # this code guards against that.

        else:
            #do this step only once
            # get the min and max angles
            self.scan_angle_min = scan_msg.angle_min
            self.scan_angle_max = scan_msg.angle_max
            #get the min and max range values
            self.scan_range_min = scan_msg.range_min
            self.scan_range_max = scan_msg.range_max
            #get the size of ranges array
            self.scan_ranges_size = len(scan_msg.ranges)

            #get the total scan angle range
            self.scan_angle_range = int((abs(self.scan_angle_min) +
                                         abs(self.scan_angle_max)) * 
                                         (180.0/self.pi))
            #get the angle increments per scan ray
            self.scan_angle_inc = (self.scan_angle_range / self.scan_ranges_size)
            #Calculate the front, right  scan ray indexes
            self.scan_front_index = int(self.scan_ranges_size/2)
            self.scan_right_index = (self.scan_front_index - int(90.0/self.scan_angle_inc))


            #Calculate front scan range window
            self.scan_front_range_from_index = (self.scan_front_index - int(self.scan_front_angle_window/self.scan_angle_inc))
            self.scan_front_range_to_index = (self.scan_front_index + int(self.scan_front_angle_window/self.scan_angle_inc))
            #Calculate front scan range window
            if (self.scan_angle_range>180): 
                self.scan_right_range_from_index = (self.scan_right_index - int(self.scan_sides_angle_window/self.scan_angle_inc))
                self.scan_right_range_to_index = (self.scan_right_index + int(self.scan_sides_angle_window/self.scan_angle_inc))
            else:
                self.scan_right_range_from_index = self.scan_right_index
                self.scan_right_range_to_index = (self.scan_right_index + int(self.scan_sides_angle_window/self.scan_angle_inc))
            
            #now we can set the scan info flag to true
            self.scan_info_done = True

            #print scan detals
            self.get_logger().info("~~Start Scan Info~~")
            self.get_logger().info(f"scan angle min: {self.scan_angle_min:.2f}")
            self.get_logger().info(f"scan_angle_max: {self.scan_angle_max:.2f}")
            self.get_logger().info(f'scan_range_min: {self.scan_range_min:.2f}')
            self.get_logger().info(f"scan_range_max: {self.scan_range_max:.2f}")
            self.get_logger().info(f"scan_angle_range: {self.scan_angle_range:.2f}")
            self.get_logger().info(f'scan_ranges_size: {self.scan_ranges_size:.2f}')
            self.get_logger().info(f'scan_angle_inc: {self.scan_angle_inc:.2f}')
            self.get_logger().info(f'scan_right_index: {self.scan_right_index:.2f}')
            self.get_logger().info(f'scan_front_index: {self.scan_front_index:.2f}')
            
            #print frmom and to ranges now
            self.get_logger().info(f'scan right range index')
            self.get_logger().info(f'from: {self.scan_right_range_from_index:.2f} to: {self.scan_right_range_to_index:.2f}')

            self.get_logger().info(f'scan fron range index')
            self.get_logger().info(f'from: {self.scan_front_range_from_index:.2f} to {self.scan_front_range_to_index:.2f}')

            self.get_logger().info("~~~~End Scan Info~~~")
        
        return None
    
    def odom_callback(self, odom_msg):
        if (self.odom_info_done):
            #do this step continously
            #get current odometry values
            self.odom_curr_x = odom_msg.pose.pose.position.x
            self.odom_curr_y = odom_msg.pose.pose.position.y
            angles = self.euler_from_quaternion(odom_msg.pose.pose.orientation.x,
                                                odom_msg.pose.pose.orientation.y,
                                                odom_msg.pose.pose.orientation.z,
                                                odom_msg.pose.pose.orientation.w)
            self.odom_curr_yaw = angles["yaw_deg"]

            #Calculate distance based on current and previous odometry values
            self.odom_distance += self.calculate_distance(self.odom_prev_x,self.odom_prev_y,self.odom_curr_x,self.odom_curr_y)
            #set previous odometry values to current odometry values
            self.odom_prev_x = self.odom_curr_x
            self.odom_prev_y = self.odom_curr_y
            self.odom_prev_yaw = self.odom_curr_yaw

        else:
            #do this step only once
            #get the initial odometry values
            self.odom_initial_x = odom_msg.pose.pose.position.x
            self.odom_initial_y = odom_msg.pose.pose.position.y
            angles = self.euler_from_quaternion(odom_msg.pose.pose.orientation.x,
                                                odom_msg.pose.pose.orientation.y,
                                                odom_msg.pose.pose.orientation.z,
                                                odom_msg.pose.pose.orientation.w)
            self.odom_initial_yaw = angles["yaw_deg"]
            
            #set previous odometry values to initial odometry values
            self.odom_prev_x = self.odom_initial_x
            self.odom_prev_y = self.odom_initial_y
            self.odom_prev_yaw = self.odom_initial_yaw
            
            #set flag to true so this step will not be done again
            self.odom_info_done = True

            #print odom details
            self.get_logger().info("~~~~Start Odom Info ~~~~~")
            self.get_logger().info(f"odom_initial_x: {self.odom_initial_x:.2f}")
            self.get_logger().info(f"odom_initial_y: {self.odom_initial_y:.2f}")
            self.get_logger().info(f"odom_initial_yaw: {self.odom_initial_yaw:.2f}")
            self.get_logger().info("~~~~End Odom Info~~~")
        return None
    
    def control_callback(self):
        if (self.iterations_count >= self.ignore_iterations):
            #the above is something of a "warm-up" delay
            #“Only start making wall-following decisions after ignoring the first few callback cycles.”
            if (self.wall_found):
                #now the robot is either facing the wall after finding the wall
                # or running the wall follower process and facing the wall or 
                #obstacle
                if (self.scan_front_range < self.front_threshold):
                    #turn towards the side opposite to the wall while moving forward
                    self.twist_cmd.linear.x = self.lin_vel_slow
                    self.twist_cmd.angular.z = self.ang_vel_fast * self.ang_vel_mult
                else:
                    #otherwise keep going straight
                    #until either obstacle or wall is detected
                    self.twist_cmd.linear.x = self.lin_vel_fast
                    
                    #i am only considering wall on the right
                    if (self.scan_right_range < self.side_threshold_min):
                        #turn left to move away from the wall
                        self.twist_cmd.angular.z = self.ang_vel_slow
                    elif (self.scan_right_range > self.side_threshold_max):
                        #turn right to move a bit towards the wall
                        self.twist_cmd.angular.z = -self.ang_vel_slow
                    else:
                        #do not turn and keep going straight
                        self.twist_cmd.angular.z = self.ang_vel_zero
            else: 
                #find the wall closest to the robot
                #keep moving forward until the robot detects
                #an obstacle or wall in its front
                if (self.scan_front_range < self.front_threshold):
                    #immediately set the wall_found flag to be true
                    #and breakout of this subprocess
                    self.wall_found = True
                    self.get_logger().info("Wall Found!")
                    #stop the robot
                    self.twist_cmd.linear.x = self.lin_vel_zero
                    self.twist_cmd.angular.z = self.ang_vel_zero
                    self.get_logger().info("Robot Stopped!")
                else:
                    #otherwise keep going straight 
                    #until obstacle or wall is detected
                    self.twist_cmd.linear.x = self.lin_vel_slow
                    self.twist_cmd.angular.z = self.ang_vel_zero

        else:
            #just increment the iterations count
            #to get the delay out
            self.iterations_count += 1
            #keep the robot stopped
            self.twist_cmd.linear.x = self.lin_vel_zero
            self.twist_cmd.angular.z = self.ang_vel_zero
        
        #publish the twist command
        self.publish_twist_cmd()

        #print the current iteration information
        self.print_info()
        return None
    
    def publish_twist_cmd(self):
        # Clamp linear velocity
        if self.twist_cmd.linear.x > 0.150:
            self.twist_cmd.linear.x = 0.150
        elif self.twist_cmd.linear.x < 0.0:
            self.twist_cmd.linear.x = 0.0
        
        # Clamp angular velocity
        if self.twist_cmd.angular.z > 0.450:
            self.twist_cmd.angular.z = 0.450
        elif self.twist_cmd.angular.z < -0.450:
            self.twist_cmd.angular.z = -0.450
        
        # publish command
        self.cmd_vel_pub.publish(self.twist_cmd)
        return None
        

    def print_info(self):
        self.get_logger().info(f'Scan: F: {self.scan_front_range}, R: {self.scan_right_range}')
        self.get_logger().info(f'Odom: X: {self.odom_curr_x}, Y: {self.odom_curr_y}')
        self.get_logger().info(f'Odom: Yaw: {self.odom_curr_yaw}')
        self.get_logger().info(f'Odom: Dist: {self.odom_distance}')
        self.get_logger().info(f'Vel: Lin: {self.twist_cmd.linear.x}, Ang: {self.twist_cmd.angular.z}')
        self.get_logger().info('~~~~~~~~~~~')
        return None
    

    def calculate_distance(self, prev_x, prev_y, curr_x, curr_y):
        #function to calculate euclidean distance in 2D plane
        distance = ((((curr_x-prev_x)**2.0) + (curr_y-prev_y)**2.0)**0.50)
        #return the distance value
        return distance
    

    def euler_from_quaternion(self, quat_x, quat_y, quat_z, quat_w):
    # function to convert quaternions to euler angles

        # calculate roll
        sinr_cosp = 2 * (quat_w * quat_x + quat_y * quat_z)
        cosr_cosp = 1 - 2 * (quat_x * quat_x + quat_y * quat_y)
        roll_rad = math.atan2(sinr_cosp, cosr_cosp)
        roll_deg = (roll_rad * 180 * self.pi_inv)

        # calculate pitch
        sinp = 2 * (quat_w * quat_y - quat_z * quat_x)
        pitch_rad = math.asin(sinp)
        pitch_deg = (pitch_rad * 180 * self.pi_inv)

        # calculate yaw
        siny_cosp = 2 * (quat_w * quat_z + quat_x * quat_y)
        cosy_cosp = 1 - 2 * (quat_y * quat_y + quat_z * quat_z)
        yaw_rad = math.atan2(siny_cosp, cosy_cosp)
        yaw_deg = (yaw_rad * 180 * self.pi_inv)

        # store the angle values in a dict
        angles = dict()
        angles["roll_rad"] = roll_rad
        angles["roll_deg"] = roll_deg
        angles["pitch_rad"] = pitch_rad
        angles["pitch_deg"] = pitch_deg
        angles["yaw_rad"] = yaw_rad
        angles["yaw_deg"] = yaw_deg

        # return the angle values
        return angles
    

def main(args=None):

    #initialize ROS2 node
    rclpy.init(args=args)

    #create an instance of the wall follower class
    wall_follower = WallFollower()

    #create a multithread executor
    executor = MultiThreadedExecutor(num_threads=4)

    #add the wall follower node to the executor
    executor.add_node(wall_follower)

    try:
        #spin the executor to handle callbacks
        executor.spin()
    except:
        pass
    finally:
        # indicate wall follower node termination
        wall_follower.get_logger().info("Terminating Wall Follower...")
        #stop the robot
        wall_follower.twist_cmd.linear.x = wall_follower.lin_vel_zero
        wall_follower.twist_cmd.angular.z = wall_follower.ang_vel_zero
        #publish the twist command
        wall_follower.publish_twist_cmd()
        wall_follower.get_logger().info("Wall Follower Terminated !")

    #shutdown the executor when spin completes
    executor.shutdown()

    #destroy the wall follower node
    wall_follower.destroy_node()

    #shutdown ROS2 node when spin completes
    rclpy.shutdown()

    return None

if __name__ == "__main__":
    main()

Could you please provide more details so we can understand what you are working on?

  • Are you working on a Real Robot Project of The Construct, or are you working on a personal project?
  • Are you working on your local PC or on The Construct?
  • Are you working with a real robot or a simulation?
  • Which specific YouTube video are you referring to?
  • Is the video working with a simulation, and you are trying to apply it to a real robot, or vice versa?
  • Is the real robot or simulation the same as the one used in the video?

All these questions will help us assist you better and/or point you in the right direction. That said, they’ll also help you to set the right expectations.

Working with robots requires a lot of work, as things hardly work out of the box. This is even more so if you are trying to move from simulation to the real robot.

If I were you, I would

  1. Ensure I can make the original code work as designed
  2. Understand every bit of it, without overlooking any little detail. The smallest details sometimes hold the key to the solution
  3. See what adaptations I need to make to account for changing from a simulation to a real robot, to a different simulation/robot
  4. Test the complete code again on my new simulation/robot (if applicable)
  5. Make small modifications at a time for the “right side only” wall following

Could you please work on the above and let us know?

I have tried changing the velocities, double-checked the indexing logic since I am only trying out simulation for now, I haven’t tried it with the real world robot.
I have also tried re-running the same code that was shown in the above referenced video on this robot without any modifications. But even that is not working as is for simulations.

Given that I’ve already tried the suggested steps, what would you recommend as the next course of action?
Would it be possible for you to take a closer look at the logic or point out what I might be missing?

Best,
Ninad Mehta

I just tried the project used in the video, and it worked out of the box for me. I outline below exactly what I did, following the instructions in the notebook.

  1. I clicked on the project link provided in the video description on YouTube and copied it into my account.
  2. I opened/ran the project.
  3. I ran the “recompile” command for the ros2_ws workspace.
  4. I tried to launch the simulation but ran into an error. It seemed that the simulation (in simulation_ws) did not come pre-compiled. So, I did the following to correct it.
cd ~/simulation_ws
rm -rf build/ install/ log/
colcon build
source install/setup.bash

PS: I did it twice until all packages were compiled. Please ignore the warning output about Rust message generation.

After this, the simulation worked.

  1. I ran the command to open Rviz
  2. I ran the command to launch (ros launch) the Python version of the program (which I suppose you are trying to modify)

Initially, the robot seemed too close to the wall for comfort, but it subsequently corrected itself. Please see the video attached below.

As I mentioned earlier, the starting point should be to get the robot working as demonstrated in the video. Then we can start modifying in small bits.

Clarifications

  1. The real robot is no longer the same as the one used in the video. We replaced it with a different version a few weeks ago.
  2. You mentioned that you are doing a Real Robot Project. This type of project is one that we ask learners to complete as part of a course. If you copied the project provided in the video, it would be classified as a personal project. If you are actually taking a course’s Real Robot Project and copying the code from this project into it, it might not work out of the box because the simulations may be different internally, even if they appear the same outwardly. In that case, please inspect the specific topics of the simulation you are using.
1 Like

Thank you for trying it out. I tried out the exact same steps listed in the response and the simulation doesn’t launch for me still.
I understand you replaced the robot with a different robot but I don’t understand how that would make the logic faulty since the project doesn’t require us to include robot’s properties.
Lastly, I have inspected the topics I am using. They are the correct ones (I echoed the topic and the robot does move when the file is executed on the terminal).

With respect to the video project, could you tell me why the environment could not be working for me? specially since I ran the exact same commands as you did.

Thank you.

Best,
Ninad Mehta

My bad (partially though :slight_smile:). I omitted one of the steps.
3. I ran the recompile command for ros2_ws. I have now included that in the steps.

The screenshot you shared indicated that the problem was in the ros2_ws workspace, so that was my clue.

I mentioned this just in case you are planning to apply this to the real robot.

Great. Let’s get the original code working first, and cross this bridge later.

Thank you, @bayodesegun. The unmodified simulation code is working for me in the original environment.
I have moved to working on the “Real Robot Project”. That is still causing a few issues with maneuvering. I improved the scan_callback function, so my robot wasn’t getting the right scan call_back values before (maybe the difference in laser scan). Now it is. However, I am still trying to figure out why it won’t turn right around the corners.

Except for the laser scan getting the right values, the rest of the code is still the same.

#! /usr/bin/python3

# imports
# rclpy imports
import rclpy                                              # rclpy
from rclpy.node import Node                               # base node
from rclpy.qos import QoSProfile                          # qos profile
from rclpy.qos import (HistoryPolicy, ReliabilityPolicy,  # qos policies
                      DurabilityPolicy, LivelinessPolicy) # qos policies
from rclpy.executors import MultiThreadedExecutor         # multithreaded executor
from rclpy.callback_groups import ReentrantCallbackGroup  # reentrant callback group
# ros2 interfaces
from geometry_msgs.msg import Twist   # twist message
from nav_msgs.msg import Odometry     # odometry message
from sensor_msgs.msg import LaserScan # laser scan message
# standard imports
import math

# common global variables
# side bias options for wall follow behavior: "none" or "left" or "right"
# this is the side on which the robot will have the wall
# left - sets the robot to follow the wall on its left side
# right - sets the robot to follow the wall on its right side
# none - sets the robot to automatically choose a side
side_choice = "left"
# algorithm choice options for wall follow behavior: "min" or "avg"
# this is the algorithm to decide closeness to the wall
# min - uses minimum scan ranges to detect the wall on its side
# avg - uses average scan ranges to detect the wall on its side
algo_choice = "min"

# define wall follower class as a subclass of node class
class WallFollower(Node):
    # class constructor
    def __init__(self):
        super().__init__("wall_follower")
        self.get_logger().info("Initializing Wall Follower ...")

        # wall detection algorithm choice
        if (algo_choice == "avg"):
            self.ang_vel_mult = 3.000
        else:
            self.ang_vel_mult = 1.250

        # declare and initialize cmd_vel publisher
        self.cmd_vel_pub = self.create_publisher(msg_type=Twist,
                                                 topic="/cmd_vel",
                                                 qos_profile=10)
        self.get_logger().info("Initialized /cmd_vel Publisher")

        # declare and initialize callback group
        self.callback_group = ReentrantCallbackGroup()

        # declare and initialize scan subscriber
        self.scan_sub_qos = QoSProfile(depth=10,
                                       history=HistoryPolicy.KEEP_LAST,
                                       reliability=ReliabilityPolicy.BEST_EFFORT,
                                       durability=DurabilityPolicy.VOLATILE,
                                       liveliness=LivelinessPolicy.AUTOMATIC)
        self.scan_sub = self.create_subscription(msg_type=LaserScan,
                                                 topic="/scan",
                                                 callback=self.scan_callback,
                                                 qos_profile=self.scan_sub_qos,
                                                 callback_group=self.callback_group)
        self.get_logger().info("Initialized /scan Subscriber")

        # declare and initialize odom subscriber
        self.odom_sub_qos = QoSProfile(depth=10,
                                       history=HistoryPolicy.KEEP_LAST,
                                       reliability=ReliabilityPolicy.BEST_EFFORT,
                                       durability=DurabilityPolicy.VOLATILE,
                                       liveliness=LivelinessPolicy.AUTOMATIC)
        self.odom_sub = self.create_subscription(msg_type=Odometry,
                                                 topic="/odom",
                                                 callback=self.odom_callback,
                                                 qos_profile=self.odom_sub_qos,
                                                 callback_group=self.callback_group)
        self.get_logger().info("Initialized /odom Subscriber")

        # declare and initialize control timer callback
        self.control_timer = self.create_timer(timer_period_sec=0.500,
                                               callback=self.control_callback,
                                               callback_group=self.callback_group)
        self.get_logger().info("Initialized Control Timer")

        self.get_logger().info("Wall Follower Initialized !")

        return None

    # class destructor
    def __del__(self):
        return None

    # define and initialize class variables
    robot_radius = 0.10                      # 10 cm
    side_threshold_min = robot_radius + 0.05 #  5 cm gap
    side_threshold_max = robot_radius + 0.10 # 10 cm gap
    front_threshold = robot_radius + 0.40    # 40 cm gap
    pi = 3.141592654
    pi_inv = 0.318309886
    ignore_iterations = 5
    iterations_count = 0
    # process variables
    wall_found = False
    side_chosen = "none"
    lin_vel_zero = 0.000
    lin_vel_slow = 0.100
    lin_vel_fast = 0.250
    ang_vel_zero = 0.000
    ang_vel_slow = 0.050
    ang_vel_fast = 0.500
    ang_vel_mult = 0.0
    # velocity publisher variables
    twist_cmd = Twist()
    # scan subscriber variables
    scan_info_done = False
    scan_angle_min = 0.0
    scan_angle_max = 0.0
    scan_angle_inc = 0.0
    scan_range_min = 0.0
    scan_range_max = 0.0
    scan_right_range = 0.0
    scan_front_range = 0.0
    scan_left_range = 0.0
    scan_angle_range = 0
    scan_ranges_size = 0
    scan_right_index = 0
    scan_front_index = 0
    scan_left_index = 0
    scan_sides_angle_range = 15 # degs
    scan_front_angle_range = 15 # degs
    scan_right_range_from_index = 0
    scan_right_range_to_index = 0
    scan_front_range_from_index = 0
    scan_front_range_to_index = 0
    scan_left_range_from_index = 0
    scan_left_range_to_index = 0
    # odom subscriber variables
    odom_info_done = False
    odom_initial_x = 0.0
    odom_initial_y = 0.0
    odom_initial_yaw = 0.0
    odom_curr_x = 0.0
    odom_curr_y = 0.0
    odom_curr_yaw = 0.0
    odom_prev_x = 0.0
    odom_prev_y = 0.0
    odom_prev_yaw = 0.0
    odom_distance = 0.0
    odom_lin_vel = 0.0
    odom_ang_vel = 0.0
    angles = dict()

    # private class methods and callbacks
 
    def scan_callback(self, scan_msg):

        # One-time setup
        if not self.scan_info_done:
            self.scan_angle_min = scan_msg.angle_min
            self.scan_angle_inc = scan_msg.angle_increment
            self.scan_ranges_size = len(scan_msg.ranges)
            self.scan_range_min = scan_msg.range_min
            self.scan_range_max = scan_msg.range_max
            self.scan_info_done = True

            self.get_logger().info("Scan initialized")
            return

        # Helper: get min range around a target angle
        def get_min_range(target_angle_rad, window_deg=15):
            window_rad = math.radians(window_deg)
            min_dist = self.scan_range_max

            for i, r in enumerate(scan_msg.ranges):
                if math.isinf(r) or math.isnan(r):
                    continue

                angle = self.scan_angle_min + i * self.scan_angle_inc

                # normalize angle to [-pi, pi]
                angle = (angle + math.pi) % (2 * math.pi) - math.pi

                if abs(angle - target_angle_rad) <= window_rad:
                    if r < min_dist:
                        min_dist = r

            return min_dist

        # FRONT = 0 rad
        self.scan_front_range = get_min_range(0.0)

        # RIGHT = -90 deg = -pi/2
        self.scan_right_range = get_min_range(-math.pi / 2)

        # Debug print (keep this!)
        self.get_logger().info(
            f"Scan: Front={self.scan_front_range:.2f}, Right={self.scan_right_range:.2f}"
        )

    def odom_callback(self, odom_msg):
        if (self.odom_info_done):
            # do this step continuously
            # get current odometry values
            self.odom_curr_x = odom_msg.pose.pose.position.x
            self.odom_curr_y = odom_msg.pose.pose.position.y
            angles = self.euler_from_quaternion(odom_msg.pose.pose.orientation.x,
                                                odom_msg.pose.pose.orientation.y,
                                                odom_msg.pose.pose.orientation.z,
                                                odom_msg.pose.pose.orientation.w)
            self.odom_curr_yaw = angles["yaw_deg"]
            # calculate distance based on current and previous odometry values
            self.odom_distance += self.calculate_distance(self.odom_prev_x,
                                                          self.odom_prev_y,
                                                          self.odom_curr_x,
                                                          self.odom_curr_y)
            # set previous odometry values to current odometry values
            self.odom_prev_x = self.odom_curr_x
            self.odom_prev_y = self.odom_curr_y
            self.odom_prev_yaw = self.odom_curr_yaw
        else:
            # do this step only once
            # get initial odometry values
            self.odom_initial_x = odom_msg.pose.pose.position.x
            self.odom_initial_y = odom_msg.pose.pose.position.y
            angles = self.euler_from_quaternion(odom_msg.pose.pose.orientation.x,
                                                odom_msg.pose.pose.orientation.y,
                                                odom_msg.pose.pose.orientation.z,
                                                odom_msg.pose.pose.orientation.w)
            self.odom_initial_yaw = angles["yaw_deg"]
            # set previous odometry values to initial odometry values
            self.odom_prev_x = self.odom_initial_x
            self.odom_prev_y = self.odom_initial_y
            self.odom_prev_yaw = self.odom_initial_yaw
            # set flag to true so this step will not be done again
            self.odom_info_done = True
            # print odom details
            self.get_logger().info("~~~~~ Start Odom Info ~~~~")
            self.get_logger().info("odom_initial_x: %+0.3f" % (self.odom_initial_x))
            self.get_logger().info("odom_initial_y: %+0.3f" % (self.odom_initial_y))
            self.get_logger().info("odom_initial_yaw: %+0.3f" % (self.odom_initial_yaw))
            self.get_logger().info("~~~~~ End Odom Info ~~~~")
        return None

    def control_callback(self):
        if (self.iterations_count >= self.ignore_iterations):
            # fix for delayed laser scanner startup
            if (self.wall_found):
                # now the robot is either facing the wall after finding the wall
                # or running the wall follower process and facing a wall or obstacle
                if (self.scan_front_range < self.front_threshold):
                    # turn towards the side opposite to the wall while moving forward
                    self.twist_cmd.linear.x = self.lin_vel_slow
                    if (self.side_chosen == "right"):
                        # turn the robot to the left
                        self.twist_cmd.angular.z = (self.ang_vel_fast * self.ang_vel_mult)
                    elif (self.side_chosen == "left"):
                        # turn the robot to the right
                        self.twist_cmd.angular.z = (-self.ang_vel_fast * self.ang_vel_mult)
                    else:
                        # otherwise do nothing
                        # this choice will never happen
                        pass
                else:
                    # otherwise keep going straight
                    # until either onstacle or wall is detected
                    self.twist_cmd.linear.x = self.lin_vel_fast
                    # check the closeness to the wall
                    if (self.side_chosen == "right"):
                        # wall is on the right
                        if (self.scan_right_range < self.side_threshold_min):
                            # turn left to move away from the wall
                            self.twist_cmd.angular.z = self.ang_vel_slow
                        elif (self.scan_right_range > self.side_threshold_max):
                            # turn right to move close to the wall
                            self.twist_cmd.angular.z = -self.ang_vel_slow
                        else:
                            # do not turn and keep going straight
                            self.twist_cmd.angular.z = self.ang_vel_zero
                    elif (self.side_chosen == "left"):
                        # wall is on the left
                        if (self.scan_left_range < self.side_threshold_min):
                            # turn right to move away from the wall
                            self.twist_cmd.angular.z = -self.ang_vel_slow
                        elif (self.scan_left_range > self.side_threshold_max):
                            # turn left to move close to the wall
                            self.twist_cmd.angular.z = self.ang_vel_slow
                        else:
                            # do not turn and keep going straight
                            self.twist_cmd.angular.z = self.ang_vel_zero
                    else:
                        # otherwise do nothing
                        # this choice will never happen
                        pass
            else:
                # find the wall closest to the robot
                # keep moving forward until the robot detects
                # an obstacle or wall in its front
                if (self.scan_front_range < self.front_threshold):
                    # immediately set the wall_found flag to true
                    # to break out of this subprocess
                    self.wall_found = True
                    self.get_logger().info("Wall Found!")
                    # stop the robot
                    self.twist_cmd.linear.x = self.lin_vel_zero
                    self.twist_cmd.angular.z = self.ang_vel_zero
                    self.get_logger().info("Robot Stopped!")
                    # choose a side to turn if side_choice is set to none
                    if ((side_choice != "right") and
                        (side_choice != "left")):
                        # choose the side that has closer range value
                        # closer range value indicates that the wall is on that side
                        if (self.scan_right_range < self.scan_left_range):
                            # wall is on the right
                            self.side_chosen = "right"
                        elif (self.scan_right_range > self.scan_left_range):
                            # wall is on the left
                            self.side_chosen = "left"
                        else:
                            # otherwise do nothing
                            # this choice will never happen
                            pass
                        self.get_logger().info("Side Chosen: %s" % (self.side_chosen))
                    else:
                        # otherwise do nothing
                        # side is already set by the user
                        pass
                else:
                    # otherwise keep going straight slowly
                    # until either onstacle or wall is detected
                    self.twist_cmd.linear.x = self.lin_vel_slow
                    self.twist_cmd.angular.z = self.ang_vel_zero
        else:
            # just increment the iterations count
            self.iterations_count += 1
            # keep the robot stopped
            self.twist_cmd.linear.x = self.lin_vel_zero
            self.twist_cmd.angular.z = self.ang_vel_zero
        # publish the twist command
        self.publish_twist_cmd()
        # print the current iteration information
        self.print_info()
        return None

    def publish_twist_cmd(self):
        # Clamp linear velocity
        if self.twist_cmd.linear.x > 0.150:
            self.twist_cmd.linear.x = 0.150
        elif self.twist_cmd.linear.x < 0.0:
            self.twist_cmd.linear.x = 0.0
        
        # Clamp angular velocity
        if self.twist_cmd.angular.z > 0.450:
            self.twist_cmd.angular.z = 0.450
        elif self.twist_cmd.angular.z < -0.450:
            self.twist_cmd.angular.z = -0.450
        
        # publish command
        self.cmd_vel_pub.publish(self.twist_cmd)
        return None

    def print_info(self):
        self.get_logger().info("Scan: L: %0.3f F: %0.3f R: %0.3f" %
                               (self.scan_left_range, self.scan_front_range,
                                self.scan_right_range))
        self.get_logger().info("Odom: X: %+0.3f Y: %+0.3f" %
                               (self.odom_curr_x, self.odom_curr_y))
        self.get_logger().info("Odom: Yaw: %+0.3f Dist: %0.3f" %
                               (self.odom_curr_yaw, self.odom_distance))
        self.get_logger().info("Vel: Lin: %+0.3f Ang: %+0.3f" %
                               (self.twist_cmd.linear.x, self.twist_cmd.angular.z))
        self.get_logger().info("~~~~~~~~~~")
        return None

    def calculate_distance(self, prev_x, prev_y, curr_x, curr_y):
        # function to calculate euclidean distance in 2d plane

        #calculate distance
        distance = ((((curr_x - prev_x) ** 2.0) +
                     ((curr_y - prev_y) ** 2.0)) ** 0.50)

        #return the distance value
        return distance

    def euler_from_quaternion(self, quat_x, quat_y, quat_z, quat_w):
        # function to convert quaternions to euler angles

        # calculate roll
        sinr_cosp = 2 * (quat_w * quat_x + quat_y * quat_z)
        cosr_cosp = 1 - 2 * (quat_x * quat_x + quat_y * quat_y)
        roll_rad = math.atan2(sinr_cosp, cosr_cosp)
        roll_deg = (roll_rad * 180 * self.pi_inv)

        # calculate pitch
        sinp = 2 * (quat_w * quat_y - quat_z * quat_x)
        pitch_rad = math.asin(sinp)
        pitch_deg = (pitch_rad * 180 * self.pi_inv)

        # calculate yaw
        siny_cosp = 2 * (quat_w * quat_z + quat_x * quat_y)
        cosy_cosp = 1 - 2 * (quat_y * quat_y + quat_z * quat_z)
        yaw_rad = math.atan2(siny_cosp, cosy_cosp)
        yaw_deg = (yaw_rad * 180 * self.pi_inv)

        # store the angle values in a dict
        angles = dict()
        angles["roll_rad"] = roll_rad
        angles["roll_deg"] = roll_deg
        angles["pitch_rad"] = pitch_rad
        angles["pitch_deg"] = pitch_deg
        angles["yaw_rad"] = yaw_rad
        angles["yaw_deg"] = yaw_deg

        # return the angle values
        return angles


def main(args=None):

    # initialize ROS2 node
    rclpy.init(args=args)

    # create an instance of the wall follower class
    wall_follower = WallFollower()
    
    # create a multithreaded executor
    executor = MultiThreadedExecutor(num_threads=4)
    
    # add the wall follower node to the executor
    executor.add_node(wall_follower)

    try:
        # spin the executor to handle callbacks
        executor.spin()
    except:
        pass
    finally:
        # indicate wall follower node termination
        wall_follower.get_logger().info("Terminating Wall Follower ...")
        # stop the robot
        wall_follower.twist_cmd.linear.x = wall_follower.lin_vel_zero
        wall_follower.twist_cmd.angular.z = wall_follower.ang_vel_zero
        # publish the twist command
        wall_follower.publish_twist_cmd()
        wall_follower.get_logger().info("Wall Follower Terminated !")
    
    # shutdown the executor when spin completes
    executor.shutdown()
    
    # destroy the wall follower node
    wall_follower.destroy_node()

    # shutdown ROS2 node when spin completes
    rclpy.shutdown()

    return None


if __name__ == "__main__":
    main()

I think it’s safe to say that the problem is coming from the environment. The code was adapted to the original environment where it was developed. Now you need to adapt it to the Real Robot Project environment.

You need some low-level debugging. For instance, log the laser scan readings and see the values printed at the corners. Obviously, the robot does not think it needs to turn right because the scan values at that point aren’t quite right.

Thank you, @bayodesegun. I recompiled the simulation and made changes to the control callback to follow only the right side or only the left side, and the simulation is now working correctly.
I also tried the same logic with the real robot today and it was working okay (although a bit erratic/oscillatory).

One final question: For certification and moving further in the course, am I supposed to submit the project files somewhere? or should I share/submit recorded video of the running simulation and real robot?
Please let me know about it.
Thank you.

Best,
Ninad Mehta

Glad you got it working! That’s the spirit :+1:

You need to send us an email when your project is ready and you are ready to present it. It’s a live presentation. Further details are available within the course notebooks.

1 Like

Understood. Thank you!

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.