Hello Construct Team,
I hope everyone is doing well.
I’ve completed all three parts of the “ROS2 Basics in 5 Days (Python)” real robot project. However, I’ve encountered an issue in Part 3. We are instructed to create an action server that records odometry. Subsequently, we have to create a client to call this server from the wall_following.py script. My challenge is that I can’t ascertain if the server and client are functioning correctly since there aren’t any examples or videos that clarify this process.
I understand this might seem like a trivial question, but considering I’m only 14 years old, I hope you can offer some guidance.
this is my record_odom.py code :
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from rclpy.qos import ReliabilityPolicy, QoSProfile
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup
from geometry_msgs.msg import Point
from nav_msgs.msg import Odometry
from custom_interfaces.action import OdomRecord
import time
import numpy as np
class OdomRecordServer(Node):
def __init__(self):
super().__init__('record_odom_server')
self.group1 = ReentrantCallbackGroup()
# self.group2 = MutuallyExclusiveCallbackGroup()
self._action_server = ActionServer(self,
OdomRecord,
'record_odom',
self.action_callback,
callback_group=self.group1)
self.odom_subscriber_ = self.create_subscription(
Odometry,
'/odom',
self.odom_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE),
callback_group=self.group1)
self.current_point = Point()
self.start_point = Point()
self.start_point_identified = False
self.total_distance = 0.0
self.odom_record = []
self.lap_finished = False
self.check_finished_lap_flag = False
def action_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = OdomRecord.Feedback()
while not self.start_point_identified:
time.sleep(1)
self.odom_record.append(self.start_point)
N = 1
while not self.lap_finished:
time.sleep(1)
self.get_logger().info('Distance to start point: "%f"' % self.distance_to_start)
self.odom_record.append(self.current_point)
N += 1
delta_x = self.odom_record[N-1].x - self.odom_record[N-2].x
delta_y = self.odom_record[N-1].y - self.odom_record[N-2].y
dist_to_last_point = np.sqrt(np.power(delta_x, 2) + np.power(delta_y, 2))
self.get_logger().info('Distance to last point: "%f"' % dist_to_last_point)
self.total_distance += dist_to_last_point
self.get_logger().info('Current total distance feedback: "%f"' % self.total_distance)
feedback_msg.current_total = self.total_distance
goal_handle.publish_feedback(feedback_msg)
if N > 5:
self.check_finished_lap_flag = True
goal_handle.succeed()
result = OdomRecord.Result()
result.list_of_odoms = self.odom_record
self.get_logger().info('The Action Server has finished, it has recorded: "%s" points' % str(len(self.odom_record)))
return result
def odom_callback(self, msg):
x = msg.pose.pose.position.x
y = msg.pose.pose.position.y
if not self.start_point_identified:
self.start_point.x = x
self.start_point.y = y
self.get_logger().info('Start point: ("%f", "%f")' %(self.start_point.x, self.start_point.y))
self.start_point_identified = True
else:
self.current_point = Point()
self.current_point.x = x
self.current_point.y = y
self.check_finished_lap()
def check_finished_lap(self):
self.distance_to_start = np.sqrt(np.power(self.current_point.x - self.start_point.x, 2) + np.power(self.current_point.y - self.start_point.y, 2))
if self.check_finished_lap_flag and self.distance_to_start <= 0.1:
self.lap_finished = True
def main(args=None):
rclpy.init(args=args)
odom_recorder_node = OdomRecordServer()
# Create a MultiThreadedExecutor with 3 threads
executor = MultiThreadedExecutor(num_threads=3)
# Add the node to the executor
executor.add_node(odom_recorder_node)
try:
# Spin the executor
executor.spin()
finally:
# Shutdown the executor
executor.shutdown()
odom_recorder_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
and this is my wall_following.py :
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from custom_interfaces.srv import FindWall
from custom_interfaces.action import OdomRecord
from rclpy.action import ActionClient
class WallFollower(Node):
def __init__(self):
super().__init__('wall_follower')
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
self.subscription_ = self.create_subscription(
LaserScan,
'/scan',
self.laser_callback,
10
)
self.wall_distance_ = 0.3
self.front_wall_distance_ = 0.5
self.laser_forward = 0
self.wall_aligned = False
self.ranges = []
# Create the find_wall service client
self.find_wall_client = self.create_client(FindWall, 'find_wall')
self.request_sent = False
if not self.find_wall_client.wait_for_service(timeout_sec=1.0):
self.get_logger().error('Service not available')
else:
self.get_logger().info('Service is available')
# Create an action client for the '/record_odom' action server
self.odom_record_client = ActionClient(self, OdomRecord, '/record_odom')
# Flag to check if the odom recording has been started
self.odom_recording_started = False
self.timer = self.create_timer(0.1, self.timer_callback)
def timer_callback(self):
# If not aligned and not requested find_wall service,
# AND if odom recording hasn't started, start it.
if not self.wall_aligned and not self.odom_recording_started:
self.start_odom_recording()
# Existing find wall logic
if not self.wall_aligned and not self.request_sent:
request = FindWall.Request()
future = self.find_wall_client.call_async(request)
future.add_done_callback(self.find_wall_response_callback)
self.request_sent = True
if self.wall_aligned:
self.wall_following_behavior()
def start_odom_recording(self):
# Check if the action server is available
if not self.odom_record_client.wait_for_server(timeout_sec=1.0):
self.get_logger().error('Odom Record Action Server not available!')
return
# Send goal to start recording
goal_msg = OdomRecord.Goal()
self.get_logger().info('Sending goal to start odom recording...')
send_goal_future = self.odom_record_client.send_goal_async(goal_msg)
send_goal_future.add_done_callback(self.odom_recording_goal_sent_callback)
self.odom_recording_started = True
def odom_recording_goal_sent_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Odom Record goal rejected :(')
return
self.get_logger().info('Odom Record goal accepted :)')
get_result_future = goal_handle.get_result_async()
get_result_future.add_done_callback(self.odom_recording_result_callback)
def odom_recording_result_callback(self, future):
status = future.result().status
if status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info('Odom recording completed successfully.')
else:
self.get_logger().info('Odom recording failed with status code: {0}'.format(status))
def find_wall_response_callback(self, future):
try:
response = future.result()
if response and response.wallfound:
self.get_logger().info('Wall found! Robot is aligned with the wall.')
self.wall_aligned = True
else:
self.get_logger().info('Wall not found yet.')
except Exception as e:
self.get_logger().error('Service call failed: %s' % str(e))
def wall_following_behavior(self):
ranges = self.ranges
if not ranges:
return
right_ray_index = int(len(ranges) * 3 / 4)
self.laser_forward = ranges[0]
if self.laser_forward < self.front_wall_distance_:
self.update_velocity(0.0, 0.35)
return
right_distance = ranges[right_ray_index]
if right_distance > self.wall_distance_ + 0.05:
self.update_velocity(0.15, -0.35)
elif right_distance < self.wall_distance_ - 0.05:
self.update_velocity(0.15, 0.35)
else:
self.update_velocity(0.15, 0.0)
def laser_callback(self, msg):
self.ranges = msg.ranges
def update_velocity(self, linear, angular):
msg = Twist()
msg.linear.x = linear
msg.angular.z = angular
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
wall_follower = WallFollower()
executor = rclpy.executors.MultiThreadedExecutor(num_threads=3)
executor.add_node(wall_follower)
try:
executor.spin()
finally:
executor.shutdown()
wall_follower.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
I hope you can help me
Thank you.