Skip to content

Controllers

Gamepad Controller

get_input_from_gamepad(queue)

Reads input from the gamepad and puts it in the queue.

Parameters:

Name Type Description Default
queue Queue

The queue to put the input events in.

required
Source code in controllers/gamepad.py
def get_input_from_gamepad(queue):
    """
    Reads input from the gamepad and puts it in the queue.

    Args:
        queue (Queue): The queue to put the input events in.
    """
    while True:
        # See if the gamepad is plugged in
        try:
            gamepad = inputs.devices.gamepads[0]
        except IndexError:
            logger.warning("There is no controller plugged in. Trying again later...")
            time.sleep(5)
            # continue
            # TODO: Hotplug gamepad
            return None

        # If it is plugged in, get events from it
        while True:
            for event in gamepad.read():
                queue.put(event)

start_processing_input(system_queue, demo_input_queue)

Listens to input on the device and puts it into the appropriate queue.

Parameters:

Name Type Description Default
system_queue Queue

The queue to put system input events in.

required
demo_input_queue Queue

The queue to put demo input events in.

required
Source code in controllers/gamepad.py
def start_processing_input(system_queue, demo_input_queue):
    """
    Listens to input on the device and puts it into the appropriate queue.

    Args:
        system_queue (Queue): The queue to put system input events in.
        demo_input_queue (Queue): The queue to put demo input events in.
    """
    queue = Queue()

    thread = threading.Thread(target=get_input_from_gamepad, args=(queue,), daemon=True)
    thread.start()

    def process():
        leftPressed = False
        rightPressed = False
        upPressed = False
        downPressed = False

        # TODO: Handle start and select in system queue?
        while True:
            while not queue.empty():
                try:
                    event = queue.get(block=False)
                except Empty:
                    break

                if event.code == "ABS_X":
                    if event.state == 0:
                        demo_input_queue.put("LEFT_P")
                        leftPressed = True
                    elif event.state == 127:
                        if leftPressed:
                            demo_input_queue.put("LEFT_R")
                            leftPressed = False
                        elif rightPressed:
                            demo_input_queue.put("RIGHT_R")
                            rightPressed = False
                    elif event.state == 255:
                        demo_input_queue.put("RIGHT_P")
                        rightPressed = True
                elif event.code == "ABS_Y":
                    if event.state == 0:
                        demo_input_queue.put("UP_P")
                        upPressed = True
                    elif event.state == 127:
                        if upPressed:
                            demo_input_queue.put("UP_R")
                            upPressed = False
                        elif downPressed:
                            demo_input_queue.put("DOWN_R")
                            downPressed = False
                    elif event.state == 255:
                        demo_input_queue.put("DOWN_P")
                        downPressed = True
                elif event.code == "BTN_BASE4":
                    if event.state:
                        demo_input_queue.put("START_P")
                    else:
                        demo_input_queue.put("START_R")
                elif event.code == "BTN_BASE3":
                    if event.state:
                        demo_input_queue.put("SEL_P")
                    else:
                        demo_input_queue.put("SEL_R")
                elif event.code == "BTN_THUMB":
                    if event.state:
                        demo_input_queue.put("PRI_P")
                    else:
                        demo_input_queue.put("PRI_R")
                elif event.code == "BTN_THUMB2":
                    if event.state:
                        demo_input_queue.put("SEC_P")
                    else:
                        demo_input_queue.put("SEC_R")
            yield

    return process()

Keyboard Controller

start_processing_input(system_queue, demo_input_queue)

Listens to input on the device and puts it into the appropriate queue.

Parameters:

Name Type Description Default
system_queue Queue

The queue to put system input events in.

required
demo_input_queue Queue

The queue to put demo input events in.

required
Source code in controllers/keyboard.py
def start_processing_input(system_queue, demo_input_queue):
    """
    Listens to input on the device and puts it into the appropriate queue.

    Args:
        system_queue (Queue): The queue to put system input events in.
        demo_input_queue (Queue): The queue to put demo input events in.
    """
    while True:
        events = pygame.event.get()

        for event in events:
            # Check for KEYDOWN event and pass into input queue
            if event.type == KEYDOWN:
                # If the Esc key is pressed, then exit the main loop
                if event.key == K_ESCAPE:
                    system_queue.put("QUIT")
                elif event.key == K_LEFT:
                    demo_input_queue.put("LEFT_P")
                elif event.key == K_UP:
                    demo_input_queue.put("UP_P")
                elif event.key == K_RIGHT:
                    demo_input_queue.put("RIGHT_P")
                elif event.key == K_DOWN:
                    demo_input_queue.put("DOWN_P")
                elif event.key == K_RETURN:
                    demo_input_queue.put("START_P")
                elif event.key == K_SPACE:
                    demo_input_queue.put("PRI_P")
                elif event.key == K_n:
                    demo_input_queue.put("SEC_P")
                elif event.key == K_RCTRL:
                    demo_input_queue.put("SEL_P")

            # check for KEYUP event and pass into input queue
            elif event.type == KEYUP:
                if event.key == K_LEFT:
                    demo_input_queue.put("LEFT_R")
                elif event.key == K_UP:
                    demo_input_queue.put("UP_R")
                elif event.key == K_RIGHT:
                    demo_input_queue.put("RIGHT_R")
                elif event.key == K_DOWN:
                    demo_input_queue.put("DOWN_R")
                elif event.key == K_RETURN:
                    demo_input_queue.put("START_R")
                elif event.key == K_SPACE:
                    demo_input_queue.put("PRI_R")
                elif event.key == K_n:
                    demo_input_queue.put("SEC_R")
                elif event.key == K_RCTRL:
                    demo_input_queue.put("SEL_R")

            # Check for QUIT event.
            elif event.type == QUIT:
                system_queue.put("QUIT")

        pygame_widgets.update(events)
        yield

MQTT Controller

start_processing_input(system_queue, demo_input_queue)

Listens to input on the device and puts it into the appropriate queue.

Parameters:

Name Type Description Default
system_queue Queue

The queue to put system input events in.

required
demo_input_queue Queue

The queue to put demo input events in.

required
Source code in controllers/mqtt.py
def start_processing_input(system_queue, demo_input_queue):
    """
    Listens to input on the device and puts it into the appropriate queue.

    Args:
        system_queue (Queue): The queue to put system input events in.
        demo_input_queue (Queue): The queue to put demo input events in.
    """

    def on_message(client, userdata, message):
        logger.debug("on_message triggered")
        try:
            data = json.loads(message.payload)
            msg_type = data["type"]
            msg_input = data["input"]

            if msg_type == "system":
                logger.debug("Putting message into system queue: {}", msg_input)
                system_queue.put(msg_input)
            elif msg_type == "demo":
                logger.debug("Putting message into input queue: {}", msg_input)
                demo_input_queue.put(msg_input)
            else:
                logger.error("Unknown MQTT message type: {}", msg_type)

        except json.decoder.JSONDecodeError:
            logger.error("MQTT message was not json.")
        except KeyError:
            logger.error("MQTT message did not have correct keys.")

    def on_connect(client, userdata, flags, rc):
        logger.info("MQTT Client connected ({})", rc)

        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe("byu_sss/input")

    def on_disconnect(client, userdata, rc):
        logger.info("MQTT Client disconnected ({})", rc)

    with open("mqtt_config.yaml") as f:
        config = safe_load(f)

    if not config["host"] or not config["port"]:
        raise ValueError("mqtt_config.yaml not set up")

    client = mqtt.Client()
    client.username_pw_set(config["username"], config["password"])

    if config["tls"]:
        client.tls_set()

    client.on_message = on_message
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    def process():
        logger.debug("In process func")
        while True:
            try:
                logger.debug("Connecting to broker")
                client.connect(config["host"], config["port"])
                logger.debug("...done")

                while True:
                    client.loop(timeout=0.01)
                    yield

            except (ConnectionRefusedError, socket.gaierror, Exception) as e:
                logger.warning("Unable to connect to broker... trying again later.")

                for _ in range(100):
                    yield
                continue

    return process()

Last update: August 28, 2023
Created: July 13, 2022