def get_input_from_gamepad(queue):
    """
    Forward every event from the first attached gamepad into ``queue``.

    Once a gamepad has been found this function loops forever and does not
    return on its own.

    Args:
        queue (Queue): Destination for the events produced by
            ``gamepad.read()``.

    Returns:
        None: Only when no gamepad is attached. A warning is logged and the
        function returns after a five second pause.
    """
    # Hot-plugging is not supported yet (TODO), so a missing gamepad ends the
    # reader instead of polling for one to appear.
    try:
        pad = inputs.devices.gamepads[0]
    except IndexError:
        logger.warning("There is no controller plugged in. Trying again later...")
        time.sleep(5)
        return None

    # A gamepad is present: pump its events into the queue indefinitely.
    while True:
        for pad_event in pad.read():
            queue.put(pad_event)
def start_processing_input(system_queue, demo_input_queue):
    """
    Listens to gamepad input and puts it into the appropriate queue.

    A daemon thread running ``get_input_from_gamepad`` collects raw events.
    The returned generator translates whatever has arrived so far into
    ``"<NAME>_P"`` (pressed) and ``"<NAME>_R"`` (released) messages each time
    it is advanced.

    Args:
        system_queue (Queue): The queue to put system input events in.
            Currently unused by this backend.
        demo_input_queue (Queue): The queue to put demo input events in.

    Returns:
        generator: Each ``next()`` drains the pending gamepad events into
        ``demo_input_queue`` and then yields ``None``.
    """
    # Raw axis readings this backend reacts to; any other value is ignored.
    axis_low, axis_centre, axis_high = 0, 127, 255

    # Axis code -> (direction reported at axis_low, direction at axis_high).
    axis_directions = {
        "ABS_X": ("LEFT", "RIGHT"),
        "ABS_Y": ("UP", "DOWN"),
    }

    # Button code -> name used in the emitted message.
    button_names = {
        "BTN_BASE4": "START",
        "BTN_BASE3": "SEL",
        "BTN_THUMB": "PRI",
        "BTN_THUMB2": "SEC",
    }

    queue = Queue()
    thread = threading.Thread(
        target=get_input_from_gamepad, args=(queue,), daemon=True
    )
    thread.start()

    def process():
        # Directions for which a press has been sent without a release yet.
        held = set()

        def press(direction):
            demo_input_queue.put(direction + "_P")
            held.add(direction)

        def release(direction):
            # Only emit a release for a direction that was actually pressed.
            if direction in held:
                held.discard(direction)
                demo_input_queue.put(direction + "_R")

        # TODO: Handle start and select in system queue?
        while True:
            # Drain everything the reader thread has queued so far.
            while True:
                try:
                    event = queue.get(block=False)
                except Empty:
                    break

                code = event.code
                state = event.state

                if code in axis_directions:
                    low_dir, high_dir = axis_directions[code]
                    if state == axis_low:
                        # The axis may jump between extremes without a
                        # centre event; release the opposite side first so
                        # it cannot get stuck as "held".
                        release(high_dir)
                        press(low_dir)
                    elif state == axis_centre:
                        # Release both sides independently so neither one
                        # is left pressed after the axis returns to centre.
                        release(low_dir)
                        release(high_dir)
                    elif state == axis_high:
                        release(low_dir)
                        press(high_dir)
                elif code in button_names:
                    suffix = "_P" if state else "_R"
                    demo_input_queue.put(button_names[code] + suffix)
            yield

    return process()
def start_processing_input(system_queue, demo_input_queue):
    """
    Translate pygame keyboard events into queue messages.

    This is a generator. Every ``next()`` fetches the pending pygame events,
    forwards the recognised ones, passes the events to
    ``pygame_widgets.update`` and then yields.

    Args:
        system_queue (Queue): Receives ``"QUIT"`` when Esc is pressed or a
            pygame QUIT event arrives.
        demo_input_queue (Queue): Receives a ``"..._P"`` message on key down
            and a ``"..._R"`` message on key up for the mapped keys.

    Yields:
        None: After each batch of events has been handled.
    """
    # Message sent to the demo when a mapped key goes down.
    pressed_messages = {
        K_LEFT: "LEFT_P",
        K_UP: "UP_P",
        K_RIGHT: "RIGHT_P",
        K_DOWN: "DOWN_P",
        K_RETURN: "START_P",
        K_SPACE: "PRI_P",
        K_n: "SEC_P",
        K_RCTRL: "SEL_P",
    }

    # Message sent to the demo when a mapped key comes back up.
    released_messages = {
        K_LEFT: "LEFT_R",
        K_UP: "UP_R",
        K_RIGHT: "RIGHT_R",
        K_DOWN: "DOWN_R",
        K_RETURN: "START_R",
        K_SPACE: "PRI_R",
        K_n: "SEC_R",
        K_RCTRL: "SEL_R",
    }

    while True:
        frame_events = pygame.event.get()
        for ev in frame_events:
            kind = ev.type
            if kind == QUIT:
                system_queue.put("QUIT")
            elif kind == KEYDOWN:
                pressed_key = ev.key
                if pressed_key == K_ESCAPE:
                    # Esc is a system-level request, not a demo input.
                    system_queue.put("QUIT")
                elif pressed_key in pressed_messages:
                    demo_input_queue.put(pressed_messages[pressed_key])
            elif kind == KEYUP:
                released_key = ev.key
                if released_key in released_messages:
                    demo_input_queue.put(released_messages[released_key])
        pygame_widgets.update(frame_events)
        yield
def start_processing_input(system_queue, demo_input_queue):
    """
    Listens to MQTT input and puts it into the appropriate queue.

    Broker settings are read from ``mqtt_config.yaml`` in the current working
    directory (keys used: ``host``, ``port``, ``username``, ``password``,
    ``tls``). The client subscribes to ``byu_sss/input`` and expects JSON
    objects of the form ``{"type": "system" | "demo", "input": ...}``.

    Args:
        system_queue (Queue): The queue to put system input events in.
        demo_input_queue (Queue): The queue to put demo input events in.

    Returns:
        generator: Each ``next()`` services the MQTT client briefly and then
        yields ``None``. After a failed connection or a lost connection it
        yields 100 times before trying to connect again.

    Raises:
        ValueError: If ``mqtt_config.yaml`` is empty or lacks a host or port.
    """

    def on_message(client, userdata, message):
        logger.debug("on_message triggered")

        try:
            data = json.loads(message.payload)
        except ValueError:
            # ValueError covers JSONDecodeError as well as payloads that
            # cannot be decoded as text.
            logger.error("MQTT message was not json.")
            return

        try:
            msg_type = data["type"]
            msg_input = data["input"]
        except (KeyError, TypeError):
            # TypeError: valid JSON that is not an object (list, number, ...).
            logger.error("MQTT message did not have correct keys.")
            return

        if msg_type == "system":
            logger.debug("Putting message into system queue: {}", msg_input)
            system_queue.put(msg_input)
        elif msg_type == "demo":
            logger.debug("Putting message into input queue: {}", msg_input)
            demo_input_queue.put(msg_input)
        else:
            logger.error("Unknown MQTT message type: {}", msg_type)

    def on_connect(client, userdata, flags, rc):
        logger.info("MQTT Client connected ({})", rc)
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe("byu_sss/input")

    def on_disconnect(client, userdata, rc):
        logger.info("MQTT Client disconnected ({})", rc)

    with open("mqtt_config.yaml") as f:
        # An empty YAML file loads as None; treat it like a missing setup.
        config = safe_load(f) or {}

    if not config.get("host") or not config.get("port"):
        raise ValueError("mqtt_config.yaml not set up")

    client = mqtt.Client()
    client.username_pw_set(config["username"], config["password"])
    if config["tls"]:
        client.tls_set()
    client.on_message = on_message
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    def process():
        logger.debug("In process func")
        while True:
            try:
                logger.debug("Connecting to broker")
                client.connect(config["host"], config["port"])
                logger.debug("...done")
                while True:
                    rc = client.loop(timeout=0.01)
                    if rc != mqtt.MQTT_ERR_SUCCESS:
                        # loop() reports a dropped connection through its
                        # return code, not an exception; leave the inner loop
                        # so the outer loop connects again.
                        logger.warning(
                            "MQTT loop returned error code {}; reconnecting later.",
                            rc,
                        )
                        break
                    yield
            except Exception as e:
                # Best effort: any failure while connecting or servicing the
                # client is logged and followed by a retry.
                logger.warning(
                    "Unable to connect to broker... trying again later. ({})", e
                )

            # Back off for 100 steps of the generator before reconnecting.
            for _ in range(100):
                yield

    return process()
Last update:
August 28, 2023
Created:
July 13, 2022