Hello,
I am currently doing the rosject of the ROS2 Python in 5 Days course. I am currently in section 2 of the rosject which is about creating a service server that make the robot parallel to the wall on its right before starting the wall following procedure. I created a service server node as shown below :
import rclpy
# import time
from rclpy.node import Node
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.qos import ReliabilityPolicy, QoSProfile
# from std_srvs.srv import Empty
from custom_interface.srv import FindWall
# from std_msgs.msg import Float32
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
class FindWallService(Node):
def __init__(self):
super().__init__('find_wall_service')
self.srv_cbg = MutuallyExclusiveCallbackGroup()
self.sub_cbg = MutuallyExclusiveCallbackGroup()
self.srv = self.create_service(
FindWall, 'find_wall', self.service_callback, callback_group=self.srv_cbg)
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
# The QoS Profile below is the most used to read LaserScan data and some sensor data
self.subscriber_ = self.create_subscription(
LaserScan,
'/scan',
self.read_measurement,
QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT),
callback_group=self.sub_cbg)
self.timer = 0.5
self.cmd = Twist()
self.curr_meas = []
self.closest_meas_ind = 0
self.closest_meas_val = 0.0
self.front_laser_meas = 0.0
# front laser acceptable range with ± 5 degree of error (the angle increment of the laser is equivalent to 0.5 deg)
self.acceptable_front_range = range(349, 369)
# right laser acceptable range with ± 5 degrees of error (the angle increment of the laser is equivalent to 0.5 deg)
self.acceptable_right_range = range(169, 189)
# subscriber callback function
def read_measurement(self, msg):
self.curr_meas = msg.ranges
self.front_laser_meas = msg.ranges[359]
def stop_robot(self):
self.get_logger().info('Stopping robot...')
self.cmd.linear.x = 0.0
self.cmd.angular.z = 0.0
self.publisher_.publish(self.cmd)
def rotate_left(self):
self.get_logger().info(
'closest wall is on the left, rotating counterclockwise...')
self.cmd.linear.x = 0.0 # making sure that the robot is not moving forward
self.cmd.angular.z = 0.4 # rotate counterclockwise to point to the closest wall
self.publisher_.publish(self.cmd)
def rotate_right(self):
self.get_logger().info(
'closest wall is on the right, rotating clockwise...')
self.cmd.linear.x = 0.0 # making sure that the robot is not moving forward
self.cmd.angular.z = -0.4 # rotate counterclockwise to point to the closest wall
self.publisher_.publish(self.cmd)
def move_straight(self):
self.get_logger().info('Moving straight...')
self.cmd.linear.x = 0.05
self.cmd.angular.z = 0.0
self.publisher_.publish(self.cmd)
def update_closest_measurement_val_and_ind(self):
# updating closest measurement value and index
self.closest_meas_ind = self.curr_meas.index(min(self.curr_meas))
self.closest_meas_val = self.curr_meas[self.closest_meas_ind]
def service_callback(self, request, response):
# Finding closest wall by using the closest laser measurement
self.update_closest_measurement_val_and_ind()
self.get_logger().info(
f'The closest measurement is: {self.closest_meas_val}m and the index is: {self.closest_meas_ind}')
# rotating the robot until the front laser ray is the smallest
# if the wall (closest index) is located on the right hand side, then turn right
if self.closest_meas_ind < 359:
self.rotate_right()
else: # the wall (closest index) is located on the left hand side, so turn left
self.rotate_left()
self.get_logger().info(
f'Value of closest index before while loop: {self.closest_meas_ind}')
while self.closest_meas_ind not in self.acceptable_front_range:
self.get_logger().info(
'Front measurement not in acceptable range')
# updating the closest measurement index and value
self.update_closest_measurement_val_and_ind()
self.get_logger().info(
f'Update complete - minimum ind ={self.closest_meas_ind}') # and closest_meas_val = {self.closest_meas_val}m')
# added publisher in while loop
# self.publisher_.publish(self.cmd)
self.get_logger().info('Robot currently facing closest wall - stopping robot...')
self.stop_robot()
# Moving the robot forward until the front laser ray has a distance smaller than 30 cm
self.move_straight()
while True:
self.get_logger().info(
f'Current front laser measurment: {self.front_laser_meas}m')
if self.front_laser_meas < 0.3:
self.get_logger().info('Distance to closest wall < 30cm')
self.stop_robot()
break
# Rotating robot until the closest wall is on the right
self.get_logger().info('Robot can now rotate to have the wall on the right')
self.rotate_left()
while True:
# updating the closest measurement index and value
self.update_closest_measurement_val_and_ind()
self.get_logger().info(
f'Closest measurement index = {self.closest_meas_ind}')
if self.closest_meas_ind in self.acceptable_right_range:
self.get_logger().info('The closest wall is currently on the right - Stopping robot...')
self.stop_robot()
break
# At this point, the robot is ready to start following the wall
response.wallfound = True
return response
def main(args=None): # check the main function of a service node
rclpy.init(args=args)
srv = FindWallService()
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(srv)
try:
executor.spin()
finally:
executor.shutdown()
srv.destroy_node()
rclpy.shutdown()
if __name__ == '__main':
main()
Moreover, I modified the wall_following
node that I previously created in section 1 so that it now sends a request to the service server node asynchronously. And, the wall following procedure will start as soon as the service server returns the response Boolean message set to true. The code of the wall_following
client node is shown below :
# import python ros client
import rclpy
# import the ROS2 python libraries
from rclpy.node import Node
# import Quality of Service library, to set the correct profile and reliability to read sensor data.
from rclpy.qos import ReliabilityPolicy, QoSProfile
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
# import Twist module from geometry_msgs.msg
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from custom_interface.srv import FindWall
class Wall_Following(Node):
def __init__(self):
# naming the class constructor (node name)
super().__init__('wall_following')
# create service client for the /find_wall service server
self.client_ = self.create_client(FindWall, 'find_wall')
# check if the service client is running
while not self.client_.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service server not available, waiting again...')
# create Empty request
self.req = FindWall.Request()
# creating the publisher object
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
# creating the subscriber object
self.subscriber_ = self.create_subscription(
LaserScan,
'/scan',
self.read_measurement,
# is the most used to read LaserScan data and some sensor data.
QoSProfile(
depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)
)
# define the timer period for 0.2 seconds
self.timer_period = 0.2
# declare variable to store all the laser messages
self.laser_msg = LaserScan()
# declare the variable to save the front laser scan reading
self.forward_laser = 0.0
# declare the variable to save the right laser scan reading
self.right_laser = 0.0
# declare the velocity twist message
self.cmd = Twist()
# create timer
self.timer = self.create_timer(self.timer_period, self.motion)
# declare Boolean to store the response of the service server
self.start_wall_following = False
def send_request(self):
self.future = self.client_.call_async(self.req)
def motion(self):
if self.start_wall_following is True:
# check if the robot is far from the front wall
if self.forward_laser > 0.5:
# if the robot is far from the front wall and far from the right wall, move closer to the right wall
if self.right_laser > 0.3:
self.turn_right()
# if the robot is far from the front wall and close to the right wall, move away from the right wall
elif self.right_laser < 0.2:
self.turn_left()
# if the robot is far from the front wall and is at a distance between 0.2 m and 0.3 m from the right wall, keep moving forward
elif self.right_laser > 0.2 and self.right_laser < 0.3:
self.move_straight()
# if the robot is close to a front wall, increase angular velocity while still going forward
else:
self.turn_hard_left()
self.get_logger().info(
f'Front laser measurement : {self.forward_laser} m')
self.get_logger().info(
f'Right laser measurement : {self.right_laser} m')
else:
self.get_logger().info('Robot not ready for wall following...')
def read_measurement(self, msg):
self.laser_msg = msg
self.forward_laser = self.laser_msg.ranges[359]
self.right_laser = self.laser_msg.ranges[179]
def turn_right(self):
self.cmd.linear.x = 0.1
self.cmd.angular.z = -0.2
self.publisher_.publish(self.cmd)
self.get_logger().info('Moving closer to right wall...')
def turn_left(self):
self.cmd.linear.x = 0.1
self.cmd.angular.z = 0.2
self.publisher_.publish(self.cmd)
self.get_logger().info('Moving away from right wall...')
def move_straight(self):
self.cmd.linear.x = 0.1
self.cmd.angular.z = 0.0
self.publisher_.publish(self.cmd)
self.get_logger().info('Moving straight...')
def turn_hard_left(self):
self.cmd.linear.x = 0.05
self.cmd.angular.z = 0.45
self.publisher_.publish(self.cmd)
self.get_logger().warn('Getting close to front wall, performing hard left turn...')
def main(args=None):
# initialize the ROS communication
rclpy.init(args=args)
# declare the node constructor which contains the service client
wall_following = Wall_Following()
# send the Empty request message to the /find_wall service server
wall_following.send_request()
while rclpy.ok():
rclpy.spin_once(wall_following)
if wall_following.future.done():
try:
# checks the future for a response from the Service
# while the system is running.
# If the Service has sent a response, the result will be written to a log message.
response = wall_following.future.result()
except Exception as e:
# Display the error message on the console
wall_following.get_logger().error('Service call failed %r' % (e,))
else:
wall_following.get_logger().info(
f'Service server response: {response.wallfound}')
wall_following.start_wall_following = response.wallfound
break
# pause the program execution, waits for a request to kill the node (ctrl+c)
rclpy.spin(wall_following)
# Explicity destroy the node
wall_following.destroy_node()
# shutdown the ROS communication
rclpy.shutdown()
if __name__ == '__main__':
main()
I ran the codes in the gazebo simulation and the robot is able to perform the required tasks, as can be seen in the video below:
However, when I test the code on the real robot, the robot does not move, as shown in the screenshots below:
I tried to do a
ros2 topic echo /cmd_vel
during the service request and I saw the following:So, I thought maybe there is a problem with the way I am publishing the velocity commands to the
/cmd_vel
topic in the service server node. I am publishing them only once at each step of the code. (This is why I did not catch the velocity twist message in time in the screenshot shown above and the ros2 topic echo /cmd_vel
doesn’t shown anything in the terminal). However, this was sufficient in the gazebo simulation. Should I be publishing velocity commands more often for the real robot ? (for example, using self.publisher_.publish(self.cmd)
inside the while loops of the service server, or will that saturate the cpu ?) I thought about creating a timer callback function but I thought that maybe by doing this, the laser scan measurements will overshoot and I will not be able to stop the robot at the correct times in order to do the required adjustments and have the robot parallel to the wall on the its right before starting the wall following procedure.
Thank you for your help