#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from std_msgs.msg import String
import math
import tf_transformations
class TopicsQuizNode(Node):
def init(self):
super().init(‘topics_quiz_node’)
# Subscribers
self.create_subscription(LaserScan, '/laser_scan', self.laserscan_callback, QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))
self.create_subscription(Odometry, '/odom', self.odom_callback, QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))
self.create_subscription(String, '/nasa_mission', self.mission_callback, 10)
# Publisher
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
# Internal state
self.turning = False
self.turn_direction = -0.5
self.distance_from_origin = 0.0
self.current_position = {'x': 0.0, 'y': 0.0}
self.yaw = 0.0
# Mission control
self.mission_active = False
self.mission_type = None # 'go_home' or 'go_pickup'
self.pickup_point = {'x': -2.342, 'y': -2.432}
self.home_point = {'x': 0, 'y': 0}
self.get_logger().info("Waiting for mission command on /nasa_mission...")
def mission_callback(self, msg: String):
if msg.data == 'Go-Home':
self.mission_active = True
self.mission_type = 'go_home'
self.get_logger().info('Received Go-Home command.')
elif msg.data == 'Go-Pickup':
self.mission_active = True
self.mission_type = 'go_pickup'
self.get_logger().info('Received Go-Pickup command.')
def laserscan_callback(self, msg: LaserScan):
if not self.mission_active:
return # Ignore laser data unless a mission is active
if self.mission_type not in ['go_home', 'go_pickup']:
return
if self.mission_type == 'go_home' and self.distance_from_origin > 5.0:
self.navigate_to_target({'x': 0.0, 'y': 0.0})
return
if self.mission_type == 'go_pickup' and self.distance_to_point(self.pickup_point) > 0.5:
self.navigate_to_target(self.pickup_point)
return
# If at target, stop
if self.at_target():
self.get_logger().info("Target reached. Stopping.")
self.stop_motion()
self.mission_active = False
self.mission_type = None
return
# If navigating, avoid obstacles
self.avoid_obstacles(msg)
def odom_callback(self, msg: Odometry):
self.current_position['x'] = msg.pose.pose.position.x
self.current_position['y'] = msg.pose.pose.position.y
self.distance_from_origin = self.distance_to_point({'x': 0.0, 'y': 0.0})
orientation_q = msg.pose.pose.orientation
orientation_list = [orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
(roll, pitch, self.yaw) = tf_transformations.euler_from_quaternion(orientation_list)
def distance_to_point(self, point):
dx = point['x'] - self.current_position['x']
dy = point['y'] - self.current_position['y']
return math.sqrt(dx**2 + dy**2)
def at_target(self):
if self.mission_type == 'go_home':
return self.distance_from_origin <= 0.5
elif self.mission_type == 'go_pickup':
return self.distance_to_point(self.pickup_point) <= 0.5
return False
def is_at_position(self, target_position):
"""Check if the robot is within the tolerance of a target position."""
distance_to_target = math.sqrt(
(self.current_position['x'] - target_position['x'])**2 +
(self.current_position['y'] - target_position['y'])**2
)
return distance_to_target < 0.5
def navigate_to_target(self, target):
action = Twist()
dx = target['x'] - self.current_position['x']
dy = target['y'] - self.current_position['y']
target_yaw = math.atan2(dy, dx)
yaw_error = target_yaw - self.yaw
yaw_error = (yaw_error + math.pi) % (2 * math.pi) - math.pi # normalize
if abs(yaw_error) > 0.1:
action.angular.z = 0.5 if yaw_error > 0 else -0.5
self.get_logger().info(f"Rotating to target: yaw error {yaw_error:.2f}")
else:
action.linear.x = 0.5
self.get_logger().info("Moving toward target...")
self.publisher_.publish(action)
def avoid_obstacles(self, msg: LaserScan):
# Sector scan
sectors = {
"Right": msg.ranges[0:60],
"Front": msg.ranges[70:110],
"Left": msg.ranges[120:180]
}
front_min = min(sectors["Front"]) if sectors["Front"] else float('inf')
left_min = min(sectors["Left"]) if sectors["Left"] else float('inf')
right_min = min(sectors["Right"]) if sectors["Right"] else float('inf')
action = Twist()
if front_min < 0.8:
action.angular.z = 0.5
self.get_logger().info("Obstacle ahead, turning.")
elif left_min < 0.5:
action.angular.z = -0.3
elif right_min < 0.5:
action.angular.z = 0.3
else:
action.linear.x = 0.3
self.publisher_.publish(action)
def stop_motion(self):
stop_msg = Twist()
self.publisher_.publish(stop_msg)
def main(args=None):
rclpy.init(args=args)
node = TopicsQuizNode()
rclpy.spin_once(node)
if not node.is_at_position(node.home_point) and not node.is_at_position(node.pickup_point):
node.get_logger().info("Initial position not at home or pickup. Moving to home position.")
node.mission_type = 'go_home'
node.mission_active = True
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if name == ‘main’:
main()