About ROS topics_quiz python

The rover is either not moving at all or moving very slowly to the pick up location. When I prompt it to go back to home location, it reaches within certain distance and keeps rotating around.
I feel this is because of calling obstacle avoidance in laser_callback simultaneously with the move_to_position command. How did you guys approach it?

def laserscan_callback(self,msg):

    #The below code is same as one in 
    #subscriber obstacle.py with IMP changes
    #the logic for detections remains the same
    #if returning to the origin, skip normal 
    #exploration behavior

    # if self.returning_to_origin:
    #     self.return_to_origin()
    #     return

    sectors = {
    "Right_Rear": (0,33),
    "Right": (34,66),
    "Front_Right":(67,100),
    "Front_Left":(101,133),
    "Left":(134,166),
    "Left_Rear":(167,199)
    }

    #Initialize the minumum distance
    for key in sectors.keys():
        min_distances = {key: float('inf')}
    
    #Find the minimum distance in each sector
    for sector, (start_idx, end_idx) in sectors.items():
        #Ensure the index range is within bounds and not empty
        if start_idx < len(msg.ranges) and end_idx < len(msg.ranges):
            sector_ranges = msg.ranges[start_idx:end_idx+1]
            if sector_ranges:
                min_distances[sector]=min(sector_ranges)

    #define obstacle_threshold
    obstacle_threshold = 0.1 
    detections = {}

     #Determine detected obstacles
    for sector, min_distance in min_distances.items():
        if min_distance < obstacle_threshold:
            detections[sector]=True
        else:
            detections[sector]=False

    #Determine suggested action based on detection
    action = Twist()
    
    if self.mission_active==True:
    #if obstacles are detected in both the front sectors, continue turning
        if detections["Front_Left"] or detections["Front_Right"]:
            if not self.turning:
                #Start turning if not already turning
                self.turning=True
                self.turn_direction=-0.5
            action.angular.z=self.turn_direction #Continue turning
            self.get_logger().info('Obstacle ahead. turning to clear path')


    else:
        self.turning=False #stop turning
        #Priority 2: Side detections
        if detections["Left"]:
            action.linear.x = 0.2 #Move forward
            action.angular.z=-0.3 #Turn slightly right
            self.get_logger().info('Obstacle on the left, turning slight right')
        elif detections["Right"]:
            action.linear.x = 0.2 
            action.angular.z=0.3 #Slight right
            self.get_logger().info('Obstacle on the right, turning slight left')
        elif detections["Right_Rear"]:
            action.linear.x=0.3
            self.get_logger().info('Obstacle on the right rear, moving forward')
        elif detections["Left_Rear"]:
            action.linear.x=0.3
            self.get_logger().info('Obstacle on left rear, moving forward')
        else:
            action.linear.x=0.5
            self.get_logger().info('No obstacle detected')

        #Publish the action command
        self.publisher_.publish(action)


def odom_callback(self,msg):
    #Extract the x,y coordinates from the odometry message
    self.current_position['x'] = msg.pose.pose.position.x
    self.current_position['y'] = msg.pose.pose.position.y

    #Calculate the yaw (orientation around the z-axis)
    orientation_q=msg.pose.pose.orientation
    orientation_list=[orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
    (roll,pitch,self.yaw) = tf_transformations.euler_from_quaternion(orientation_list)

    #Check if the goal has been reached
    if self.current_goal:
        distance_to_goal= math.sqrt(
        (self.current_position['x'] - self.current_goal['x'])**2 +
        (self.current_position['y'] - self.current_goal['y'])**2
        )

        if distance_to_goal < self.goal_tolerance:
            self.stop_rover()
            self.mission_active=False
            self.current_goal=None
            self.get_logger().info("Goal reached. Robot stopped")
            self.status_publisher_.publish(String(data='goal-reached'))



def is_at_position(self,target_position):
    """Check if the robot is within the tolerance of a tartget position """
    distance_to_target = math.sqrt(
    (self.current_position['x']-target_position['x'])**2 +
    (self.current_position['y']-target_position['y'])**2
    )

    return distance_to_target < self.goal_tolerance

def move_to_position(self,goal_position):
    action=Twist()

    #Calculate the desired angle the origin
    desired_yaw=math.atan2(goal_position['y']-self.current_position['y'],goal_position['x']-self.current_position['x'])
  # Calculate the difference between current yaw and desired yaw
    yaw_error = desired_yaw - self.yaw
    yaw_error=(yaw_error+math.pi)%(2*math.pi)-math.pi

    #if yaw error is significant, rotate towards origin

    if abs(yaw_error)>0.1:
        if yaw_error>0:
            action.angular.z=0.5
        else:
            action.angular.z=-0.5
        self.get_logger().info(f'Turning towards position. Yaw error: {yaw_error:.2f}')
    else:
        #already oriented towards the origin. so all good, move forward
        action.linear.x=0.5
        self.get_logger().info('Heading towards position')

    #Publish the action command
    self.publisher_.publish(action)

You are in the right track. I suggest that you divide a bit more the functionality into methods so that its cleaner and easier to debug. The main idea is that we have a subscriber that reacived the command as a string and updates an internal var . Here is ane example:

def mission_callback(self, msg):
        # Handle mission commands
        if msg.data == "Go-Home":
            self.get_logger().info("Received 'Go-Home' command.")
            if self.is_at_position(self.home_position):
                self.get_logger().info("Already at home. No need to move.")
                self.stop_robot()
                self.status_publisher_.publish(String(data='already-at-home'))
            else:
                self.get_logger().info("Moving to home position (0, 0).")
                self.current_goal = self.home_position
                self.mission_active = True

        elif msg.data == "Go-Pickup":
            self.get_logger().info("Received 'Go-Pickup' command.")
            if self.is_at_position(self.pickup_position):
                self.get_logger().info("Already at pickup. No need to move.")
                self.stop_robot()
                self.status_publisher_.publish(String(data='already-at-pickup'))
            else:
                self.get_logger().info("Moving to pickup position (5.0, -3.0).")
                self.current_goal = self.pickup_position
                self.mission_active = True

Then inside the laser callback we introduce that logic that will be executed all the time eahc time a laser reading is recieved. Here is the structure:

def laserscan_callback(self, msg):
        # Define sectors and minimum distances for obstacle detection
        sectors = {
            "Front_Left": (100, 140),
            "Front_Right": (20, 60),
            "Left": (141, 179),
            "Right": (0, 19)
        }

        min_distances = {key: float('inf') for key in sectors}
        for sector, (start_idx, end_idx) in sectors.items():
            if start_idx < len(msg.ranges) and end_idx < len(msg.ranges):
                sector_ranges = msg.ranges[start_idx:end_idx + 1]
                if sector_ranges:
                    min_distances[sector] = min(sector_ranges)

        obstacle_threshold = 0.8  # meters
        detections = {sector: min_distance < obstacle_threshold for sector, min_distance in min_distances.items()}

        # If a mission is active, navigate while avoiding obstacles
        if self.mission_active and self.current_goal:
            self.navigate_to_goal(detections)

. Try to improve apon the navigate_to_goal() method. I don’t want to give you the answer unless you get super stuck, trying it is what will help you learn.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.