''' ir_sensor_subscriber.py Tufts Create®3 Educational Robot Example by Kate Wujciak This file shows how to subscribe to a topic in ROS2 using the Create®3. It subscribes to the IR sensor and displays the relevant information in your terminal. ''' import sys import rclpy import time ''' Statements that import messages, actions, interfaces, and variable types. ''' from rclpy.node import Node from rclpy.action import ActionClient from rclpy.qos import qos_profile_sensor_data from irobot_create_msgs.msg import IrIntensityVector from geometry_msgs.msg import Twist from irobot_create_msgs.msg import AudioNoteVector, AudioNote from rclpy.time import Duration from irobot_create_msgs.action import RotateAngle ''' camera imports ''' from keras.models import load_model # TensorFlow is required for Keras to work import cv2 # Install opencv-python import numpy as np from picamera2 import Picamera2 import cv2 as cv from libcamera import controls import time TF = True class RotateActionCLient(Node): def __init__(self): super().__init__('Rotate_Angle') #print('hello') self._action_client = ActionClient(self, RotateAngle, 'rotate_angle') def send_goal(self, angle): goal_msg = RotateAngle.Goal() goal_msg.angle = angle goal_msg.max_rotation_speed = .5 self._action_client.wait_for_server() self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback) self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self.get_logger().info('Goal accepted :)') self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_results_callback) def get_results_callback(self, future): results = future.result().result self.get_logger().info(f'Rotation Completed with result: {results}') def feedback_callback(self, feedback_msg): # Store the feedback message as a variable feedback = feedback_msg.feedback self.get_logger().info(f'Received Feedback: {feedback}') class IRSubscriber(Node): ''' The IRSubscriber class is created which is a subclass of Node. The Node is subscribing to the /ir_intensity topic. ''' def __init__(self): ''' The following line calls the Node class' constructor and declares a node name, which is 'IR_subscriber' in this case. ''' super().__init__('IR_subscriber') ''' This line indicates that the node is subscribing to the IrIntensityVector type over the '/ir_intensity' topic. ''' print('Creating subscription to to the IrIntensityVector type over the /ir_intensity topic') self.subscription = self.create_subscription( IrIntensityVector, '/ir_intensity', self.listener_callback, qos_profile_sensor_data) self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10) def listener_callback(self, msg:IrIntensityVector): ''' The subscriber's callback listens and as soon as it receives the message, this function runs. This callback function is basically printing what it hears. It runs the data it receives in your terminal (msg). ''' #print('Now listening to IR sensor readings it hears...') self.printIR(msg) def printIR(self, msg): global previous_error, sum_error, TF ''' This function is used in the above function. Its purpose is to determine which parts of the info are worth showing. :type msg: IrIntensity :rtype: None The msg is returned from our topic '/ir_intensity.' To get components of a message, use the '.' dot operator. ''' print('Printing IR sensor readings:', end="") #print('hi') MSG = Twist() self.val = msg.readings[3].value val = msg.readings[3].value time.sleep(0.05) print(val) ''' error = target - val sum_error += error diff_error = error - previous_error speed = Kp * error + Kd * diff_error + Ki * sum_error previous_error = error speed = max(min(speed, 1.0), -1.0) if val < 70 and val > 20: print('Time to take a photo') MSG.linear.x = float(0) MSG.angular.z = 0.0 else: print('Gotta Correct') MSG.linear.x = float(speed) MSG.angular.z = 0.0 self.publisher_.publish(MSG) print("Published message: ", MSG.linear.x, MSG.angular.z) #for reading in msg.readings: # val = reading.value # print("IR Sensor:" + str(val)) #time.sleep(1) #MSG.linear.x = 0.0 #MSG.angular.z = 0.0 #self.publisher_.publish(MSG) if MSG.linear.x == 0.0 and MSG.angular.z == 0: TF = False''' class AudioPublisher(Node): def __init__(self): super().__init__('audio_publisher') self.publisher_ = self.create_publisher(AudioNoteVector, '/cmd_audio', 10) self.timer = self.create_timer(1.0, self.publish_audio_notes) # Adjust timer as needed def publish_audio_notes(self): audio_msg = AudioNoteVector() audio_msg.append = False # Ensure it does not append to previous notes # Define musical notes (frequencies in Hz, durations in seconds) notes_dict = { 'D': 587, 'A': 440, 'G': 784, 'F#': 740, 'E': 660, 'C#': 554, 'A2': 880, 'B': 988 #G, D, F#, E, B ,'D2': 294, 'G2': 392, 'F#2': 370, 'E': 330, 'E': 330, 'B2': 494 } def create_note(note, duration): return AudioNote(frequency=notes_dict[note], max_runtime=Duration(seconds=duration).to_msg()) short_pause = AudioNote(frequency=0, max_runtime=Duration(seconds=0.1875).to_msg()) # A short silence long_pause = AudioNote(frequency=0, max_runtime=Duration(seconds=0.3).to_msg()) # A longer silence # Pokémon Center Theme Notes with adjusted durations notes = [ # create_note('D', 0.4), create_note('A', 0.4), create_note('D', 0.4), create_note('A2', 0.4), short_pause, # create_note('G', 0.4), create_note('F#', 0.4), create_note('E', 0.4), create_note('C#', 0.4), long_pause, # create_note('C#', 0.4), create_note('A', 0.4), create_note('C#', 0.4), create_note('F#', 0.4), short_pause, # create_note('E', 0.4), create_note('C#', 0.4), create_note('D', 0.4), create_note('F#', 0.4), long_pause, # create_note('D', 0.4), create_note('A', 0.4), create_note('D', 0.4), create_note('A2', 0.4), short_pause, # create_note('G', 0.4), create_note('F#', 0.4), create_note('E', 0.4), create_note('C#', 0.4), long_pause, # create_note('C#', 0.4), create_note('A', 0.4), create_note('C#', 0.4), create_note('F#', 0.4), short_pause, # create_note('E', 0.4), create_note('C#', 0.4), create_note('D', 0.4), long_pause, create_note('D', 0.1875), create_note('C#', 0.1875), create_note('B2', 0.1875), short_pause, create_note('C#', 0.1875), create_note('B2', 0.1875), create_note('A', 0.1875), short_pause, create_note('B2', 0.1875), create_note('A', 0.1875), create_note('G2', 0.1875), create_note('F#2', 0.1875), create_note('A', 0.1875), short_pause, create_note('A', 0.1875), short_pause, create_note('D', 0.1875), create_note('C#', 0.1875), create_note('B2', 0.1875), short_pause, create_note('C#', 0.1875), create_note('B2', 0.1875), create_note('A', 0.1875), short_pause, create_note('B2', 0.1875), create_note('A', 0.1875), create_note('G2', 0.1875), create_note('F#2', 0.1875), create_note('D', 0.4), long_pause, create_note('D', 0.1875), create_note('C#', 0.1875), create_note('B2', 0.1875), short_pause, create_note('C#', 0.1875), create_note('B2', 0.1875), create_note('A', 0.1875), short_pause, create_note('B2', 0.1875), create_note('A', 0.1875), create_note('G2', 0.1875), create_note('F#2', 0.1875), create_note('A', 0.1875), short_pause, create_note('A', 0.1875), short_pause, create_note('D', 0.1875), create_note('C#', 0.1875), create_note('B2', 0.1875), short_pause, create_note('C#', 0.1875), create_note('B2', 0.1875), create_note('A', 0.1875), short_pause, create_note('B2', 0.1875), create_note('A', 0.1875), create_note('G2', 0.1875), create_note('F#2', 0.1875), create_note('D', 0.4), long_pause, #create_note('F#', 0.5), create_note('A2', 0.5), create_note('G', 0.3), create_note('A2', 0.3), #create_note('G', 0.3), create_note('F#', 0.3), create_note('E', 0.4), long_pause, #create_note('C#', 0.5), create_note('E', 0.5), create_note('F#', 0.3), create_note('G', 0.3), #create_note('F#', 0.3), create_note('E', 0.3), create_note('D', 0.4), long_pause, #create_note('F#', 0.5), create_note('A2', 0.5), create_note('G', 0.3), create_note('A2', 0.3), #create_note('G', 0.3), create_note('A2', 0.3), create_note('B', 0.4), long_pause, #create_note('A2', 0.4), create_note('G', 0.3), create_note('F#', 0.3), create_note('G', 0.5), #create_note('F#', 0.3), create_note('G', 0.3), create_note('F#', 0.3), create_note('E', 0.3), create_note('D', 0.5),long_pause, ] audio_msg.notes = notes self.get_logger().info('Publishing Pokémon Center Theme notes') self.publisher_.publish(audio_msg) def identify_pokemon(camera, model, class_names): image = camera.capture_array("main") image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) # Resize the raw image into (224-height,224-width) pixels image = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA) # Show the image in a window #cv2.imshow("Webcam Image", image) # Make the image a numpy array and reshape it to the models input shape. image = np.asarray(image, dtype=np.float32).reshape(1, 224, 224, 3) # Normalize the image array image = (image / 127.5) - 1 # Predicts the model prediction = model.predict(image) index = np.argmax(prediction) class_name = class_names[index] confidence_score = prediction[0][index] # Print prediction and confidence score #print("Class:", class_name[2:], end="") #print("Confidence Score:", str(np.round(confidence_score * 100))[:-2], "%") return class_name[2:(len(class_name)-1)], str(np.round(confidence_score * 100))[:-2] def main(args=None): ########################################CAMERA SETUP##################################### print("starting camera") # Disable scientific notation for clarity np.set_printoptions(suppress=True) print("loading model") # Load the model model = load_model("keras_model.h5", compile=False) print(" done loading model") # Load the labels class_names = open("labels.txt", "r").readlines() camera = Picamera2() camera.set_controls({"AfMode": controls.AfModeEnum.Continuous}) camera.start() time.sleep(1) pokemon, confidence = identify_pokemon(camera, model, class_names) print("camera started") ########################################CAMERA SETUP##################################### target = 30 Kp = .001 Kd = .2 Ki = .005 previous_error = 0 sum_error = 0 pokemon_dict = { "Pikachu": "left", "Charmander": "right", "Squirtle": "left", "Snorlax": "left", "Bulbasaur": "right", "Nidoran": "yay", "Pokeball": "right", "Floor": "straight" } ''' This line initializes the rclpy library. ''' rclpy.init(args=args) ''' This creates the node. ''' IR_subscriber = IRSubscriber() action_client = RotateActionCLient() audio_publisher = AudioPublisher() ''' The node is then "spun" so its callbacks are called. ''' print('Callbacks are called.') try: done = False while not done: rclpy.spin_once(IR_subscriber) val = IR_subscriber.val MSG = Twist() # error = target - val # sum_error += error # diff_error = error - previous_error # speed = Kp * error + Kd * diff_error + Ki * sum_error # previous_error = error # speed = max(min(speed, 0.5), -0.5) if val > 8: print('Time to take a photo') MSG.linear.x = float(0) MSG.angular.z = 0.0 #print("turning now") time.sleep(1) #action_client.send_goal(-1.57) #spins the robot 90 degrees #rclpy.spin_once(action_client) #time.sleep(2) pokemon, confidence = identify_pokemon(camera, model, class_names) print(pokemon, confidence) if float(confidence) > 90: print(str(confidence) + "% confident", pokemon) if pokemon_dict[pokemon] == "left": print("turning left now") action_client.send_goal(1.57) #spins the robot 90 degrees rclpy.spin_once(action_client) time.sleep(2) elif pokemon_dict[pokemon] == "right": print("turning right now") action_client.send_goal(-1.57) #spins the robot 90 degrees rclpy.spin_once(action_client) time.sleep(2) elif pokemon_dict[pokemon] == "yay": print("done!") rclpy.spin_once(audio_publisher) action_client.send_goal(6.3) #spins the robot 90 degrees rclpy.spin_once(action_client) done = True else: print("thats the floor") else: #print('Gotta Correct') MSG.linear.x = float(.1) MSG.angular.z = 0.0 IR_subscriber.publisher_.publish(MSG) #print("Published message: ", MSG.linear.x, MSG.angular.z) time.sleep(0.001) #action_client.send_goal(-1.57) #spins the robot 90 degrees #rclpy.spin_once(action_client) except KeyboardInterrupt: print('\nCaught keyboard interrupt') finally: '''Destroying the node acts as a "reset" so we don't run into any problems when we try and run again''' print("Done") IR_subscriber.destroy_node() audio_publisher.destroy_node() action_client.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()