#!/usr/bin/env python # Raspoid - version 0.1 ############################################ # This Raspoid is able to do the following: # - connect to a "master" raspberry # - blink a led when the connection is up # - recieve a start signal from the master # - start move forward, with a speed proportional to the distance # from the raspoid to the obstacle # - stop moving when touching the obstacle # ## # The Raspoid is composed of: (NXT) # - 2 motors (left: Port C; right: Port B) # - 1 touch sensor (Port 1) # - 1 ultrasound sensor (Port 2) # ## # Run the Raspoid with: # - On the Raspoid: # sudo python raspoid_v0-1.py # - On the master raspberry: # sudo python master_pilot_v0-1.py # ## # Original Authors: Gael Wittorski, Julien Louette # Initial Date: Oct 05, 2015 # import socket from threading import Thread from BrickPi import * #import BrickPi.py file to use BrickPi operations import RPi.GPIO as GPIO import signal import sys ## logs levels FORCED = 0 # Very few logs - critically needed INFO = 1 # Few logs - all relevant DEBUG = 2 # Many logs ## groups of logs GROUP_1 = 1 GPIO.setmode(GPIO.BOARD) GPIO.setup(12, GPIO.OUT) #GPIO 18 GPIO.setup(13, GPIO.OUT) #GPIO 27 GPIO.output(12, False) GPIO.output(13, False) last_received_message = None def end_program(signal, frame): GPIO.output(12, False) GPIO.output(13, False) GPIO.cleanup() sys.exit(0) signal.signal(signal.SIGINT, end_program) class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' class CLI(Thread): def __init__(self, name = "cli"): Thread.__init__(self) self.name = name self.terminated = False def run(self): global last_received_message available_network_commands = ["run", "stop", "blink_led", "runf2", "runf1", "runb2", "runb1", "turnleft2"] available_cli_commands = ["sstop", "fstop"] GPIO.output(12, True) log("Thread started properly", INFO, self.name) while not self.terminated: new_command = raw_input("Enter a new command:\n") if new_command in available_network_commands: last_received_message = new_command elif new_command in available_cli_commands: if new_command == "sstop": for thread in threads: if thread.name != "cli": thread.stop() self.stop() elif new_command == "fstop": for thread in threads: if thread.name != "cli": thread._Thread__stop() GPIO.output(12, False) GPIO.output(13, False) self._Thread__stop() else: log("Unrecognized command", INFO, self.name) time.sleep(1.0) log("Thread ended properly", INFO, self.name) def stop(self): self.terminated = True class Network(Thread): def __init__(self, name = "network"): Thread.__init__(self) self.name = name self.terminated = False def run(self): global last_received_message log("Thread started properly", INFO, self.name) connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection.bind(('', 12345)) connection.listen(5) # max number of connections connection_with_master, infos_connection_with_master = connection.accept() log("Connection accepted " + str(infos_connection_with_master), INFO, self.name) GPIO.output(13, True) while not self.terminated: new_packet = connection_with_master.recv(1024) last_received_message = new_packet.strip() log("New packet received: " + new_packet, INFO, self.name) connection_with_master.close() GPIO.output(13, False) log("Thread ended properly", INFO, self.name) def stop(self): self.terminated = True class Raspoid(Thread): def __init__(self, name = "Raspoid"): Thread.__init__(self) self.name = name self.terminated = False self.last_log = None self.last_group_log = None def run(self): global last_received_message log("Thread started properly", INFO, self.name) BrickPiSetup() # setup the serial port for communication # Set the sensor types + motors BrickPi.SensorType[PORT_1] = TYPE_SENSOR_TOUCH BrickPi.SensorType[PORT_2] = TYPE_SENSOR_ULTRASONIC_CONT BrickPi.MotorEnable[PORT_B] = 1 BrickPi.MotorEnable[PORT_C] = 1 BrickPiSetupSensors() #Send the properties of sensors to BrickPi. Set up the BrickPi. # if selected_log_level == INFO : we print only once the different logs # if selected_log_level == DEBUG : we print each log def log_just_once(arg, group = -1): if selected_log_level == INFO: if self.last_log != arg: if group != -1: if self.last_group_log != group: log(arg, INFO, self.name) self.last_group_log = group self.last_log = arg else: log(arg, INFO, self.name) self.last_group_log = None self.last_log = arg else: log(arg, DEBUG, self.name) def run_forward(power = 250, duration = .1): log_just_once("Running Forward") run(abs(power), duration) def run_backward(power = 250, duration = .1): log_just_once("Running Backward") run(-abs(power), duration) def run(power = 0, duration = .1): BrickPi.MotorSpeed[PORT_B] = power #Set the speed of MotorB (-255 to 255) BrickPi.MotorSpeed[PORT_C] = power #Set the speed of MotorC (-255 to 255) ot = time.time() while(time.time() - ot < duration): #running while loop for 0.1 second BrickPiUpdateValues() # Ask BrickPi to update values for sensors/motors time.sleep(.1) def turn_left(power = 0, duration = .1): BrickPi.MotorSpeed[PORT_B] = 150 BrickPi.MotorSpeed[PORT_C] = 50 ot = time.time() while(time.time() - ot < duration): #running while loop for 0.1 second BrickPiUpdateValues() # Ask BrickPi to update values for sensors/motors time.sleep(.1) def stop(): BrickPi.MotorSpeed[PORT_B] = 0 BrickPi.MotorSpeed[PORT_C] = 0 BrickPiUpdateValues() while not self.terminated: if last_received_message == "run": result = BrickPiUpdateValues() if not result: button_value = BrickPi.Sensor[PORT_1] if button_value == 0: power = 250 dist = BrickPi.Sensor[PORT_2] if dist <= 25: power = 10 * dist log("Power: " + str(power), DEBUG, self.name) run_forward(power) else: log_just_once("NXT Touch button pressed", GROUP_1) stop() log_just_once("Raspoid stopped", GROUP_1) else: time.sleep(.01) elif last_received_message == "runf2": run_forward(250, 2) last_received_message = None time.sleep(2) elif last_received_message == "runf1": run_forward(250, 1) last_received_message = None time.sleep(1) elif last_received_message == "runb2": run_backward(250, 2) last_received_message = None time.sleep(2) elif last_received_message == "runb1": run_backward(250, 1) last_received_message = None elif last_received_message == "turnleft2": turn_left(150, 1.5) last_received_message = None else: stop() log_just_once("Raspoid stopped") time.sleep(.01) log("Thread ended properly", INFO, self.name) def stop(self): self.terminated = True ## logs levels # 0. forced # 1. info (+forced) # 2. debug (+info +forced) selected_log_level = INFO def log(arg, log_level = INFO, thread_name = None): if log_level <= selected_log_level: if thread_name == None: print arg else: color = None if thread_name == "cli": color = bcolors.OKBLUE elif thread_name == "Raspoid": color = bcolors.OKGREEN elif thread_name == "network": color = bcolors.WARNING if color == None: print "(" + thread_name + ") " + arg else: print color + "(" + thread_name + ") " + arg + bcolors.ENDC threads = [Network(), Raspoid(), CLI()] for thread in threads: thread.start()