#!/usr/bin/env python3
"""Qt front-end for adjusting hearing-aid settings over Bluetooth (ATT + AACP).

Needs https://github.com/google/bumble on Windows.
See https://github.com/google/bumble/blob/main/docs/mkdocs/src/platforms/windows.md for usage.
You need to associate WinUSB with your Bluetooth interface. Once done, you can
roll back to the original driver from Device Manager.

NOTE(review): this file was restored from a whitespace-collapsed copy in which
three regions had been lost (parts of ``parse_hearing_aid_settings``, all of
``AACPManager``, the start of ``ATTManager``, the tail of
``_send_hearing_aid_settings`` and ``reset_settings``).  Those regions are
marked with ``NOTE(review)`` below and must be confirmed against upstream.
"""

import asyncio
import argparse
import logging
import signal
import struct
import sys
import threading
import platform
from queue import Queue
from typing import Any, Callable, Dict, List, Optional

from PyQt5.QtGui import QDoubleValidator
from colorama import Fore, Style, init as colorama_init

colorama_init(autoreset=True)

from PyQt5.QtWidgets import (
    QApplication, QWidget, QVBoxLayout, QLabel, QSlider, QCheckBox,
    QPushButton, QLineEdit, QGridLayout, QComboBox
)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject

handler = logging.StreamHandler()


class ColorFormatter(logging.Formatter):
    """Log formatter that prefixes each message with a coloured [LEVEL:logger] tag."""

    COLORS = {
        logging.DEBUG: Fore.BLUE,
        logging.INFO: Fore.GREEN,
        logging.WARNING: Fore.YELLOW,
        logging.ERROR: Fore.RED,
        logging.CRITICAL: Fore.MAGENTA,
    }

    def format(self, record):
        """Return ``"[LEVEL:name] message"`` with the prefix coloured by level."""
        color = self.COLORS.get(record.levelno, "")
        prefix = f"{color}[{record.levelname}:{record.name}]{Style.RESET_ALL}"
        return f"{prefix} {record.getMessage()}"


handler.setFormatter(ColorFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
logger = logging.getLogger("hearing-aid")

# ATT opcodes used by this script.
OPCODE_READ_REQUEST: int = 0x0A
OPCODE_READ_RESPONSE: int = 0x0B
OPCODE_WRITE_REQUEST: int = 0x12
OPCODE_WRITE_RESPONSE: int = 0x13
OPCODE_HANDLE_VALUE_NTF: int = 0x1B

# Attribute handles addressed by name throughout the file.
ATT_HANDLES: Dict[str, int] = {
    'LOUD_SOUND_REDUCTION': 0x1B,
    'HEARING_AID': 0x2A,
}

# The CCCD handle of each attribute is the value handle + 1.
ATT_CCCD_HANDLES: Dict[str, int] = {
    'LOUD_SOUND_REDUCTION': ATT_HANDLES['LOUD_SOUND_REDUCTION'] + 1,
    'HEARING_AID': ATT_HANDLES['HEARING_AID'] + 1,
}

AACP_HEADER = bytes([0x04, 0x00, 0x04, 0x00])
AACP_HANDSHAKE = bytes([0x00, 0x00, 0x04, 0x00, 0x01, 0x00, 0x02, 0x00,
                        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])


class AACPOpcodes:
    """Opcode byte values for AACP packets."""

    SET_FEATURE_FLAGS = 0x4D
    REQUEST_NOTIFICATIONS = 0x0F
    CONTROL_COMMAND = 0x09


class ControlCommandId:
    """Identifiers carried by AACP control-command packets."""

    HEARING_AID = 0x2C
    HPS_GAIN_SWIPE = 0x2F
    HEARING_ASSIST_CONFIG = 0x33
    LISTENING_MODE = 0x0D
    OWNS_CONNECTION = 0x06


class BluezChannel:
    """Adapter giving a blocking socket a ``send_pdu``/``sink`` channel interface.

    A daemon thread reads from the socket and forwards every received chunk to
    ``sink`` on the asyncio loop via ``call_soon_threadsafe``.
    """

    def __init__(self, socket, loop):
        self.socket = socket
        self.loop = loop
        self.sink = None  # callable(data) assigned by the consumer
        self._running = True
        self._thread = threading.Thread(target=self._read_loop, daemon=True)
        self._thread.start()

    def send_pdu(self, pdu):
        """Send *pdu* on the socket; socket errors are logged, not raised."""
        try:
            logger.debug(f"Sending PDU: {pdu.hex() if pdu else 'None'}")
            self.socket.send(pdu)
        except OSError as e:
            logger.error(f"Socket send error: {e}")

    def _read_loop(self):
        """Reader-thread body: receive until EOF, a socket error, or ``stop()``."""
        while self._running:
            try:
                data = self.socket.recv(2048)
                logger.debug(f"Received PDU: {data.hex() if data else 'None'}")
                if not data:
                    break
                if self.sink:
                    self.loop.call_soon_threadsafe(self.sink, data)
            except OSError:
                break

    def stop(self):
        """Stop the reader loop and close the socket (best effort)."""
        self._running = False
        try:
            self.socket.close()
        except Exception as e:  # best-effort close; was a bare ``except:``
            logger.debug(f"Socket close error: {e}")


def _make_reader(ch):
    """Install a queue-backed sink on *ch* and return an ``async`` reader.

    Each call of the returned coroutine function yields the next SDU that the
    channel delivered to its sink.
    """
    recv_q: asyncio.Queue = asyncio.Queue()

    def _sink(sdu):
        try:
            recv_q.put_nowait(sdu)
        except Exception:
            logger.debug("Dropping SDU in sink fallback")

    try:
        ch.sink = _sink
    except Exception:
        logger.debug("Failed to set channel.sink fallback")

    async def _reader_from_sink():
        item = await recv_q.get()
        return item

    return _reader_from_sink


class HearingAidSettings:
    """Plain container for one complete set of hearing-aid parameters."""

    def __init__(self, left_eq: List[float], right_eq: List[float],
                 left_amp: float, right_amp: float,
                 left_tone: float, right_tone: float,
                 left_conv: bool, right_conv: bool,
                 left_anr: float, right_anr: float,
                 net_amp: float, balance: float, own_voice: float) -> None:
        self.left_eq = left_eq
        self.right_eq = right_eq
        self.left_amplification = left_amp
        self.right_amplification = right_amp
        self.left_tone = left_tone
        self.right_tone = right_tone
        self.left_conversation_boost = left_conv
        self.right_conversation_boost = right_conv
        self.left_ambient_noise_reduction = left_anr
        self.right_ambient_noise_reduction = right_anr
        self.net_amplification = net_amp
        self.balance = balance
        self.own_voice_amplification = own_voice


def parse_hearing_aid_settings(data: bytes) -> Optional[HearingAidSettings]:
    """Decode a hearing-aid attribute value into a ``HearingAidSettings``.

    Returns ``None`` (after logging a warning) when *data* is shorter than
    104 bytes.

    NOTE(review): most of this body was lost in the reviewed copy.  The layout
    below (4 header bytes, then per ear 8 EQ floats + amplification + tone +
    conversation boost + noise reduction, then own-voice) is reconstructed
    from the surviving fragments, the 104-byte check and the writer in
    ``_send_hearing_aid_settings``.  The little-endian float format and the
    derivation of net amplification / balance are assumptions -- confirm.
    """
    if len(data) < 104:
        logger.warning("Data too short for parsing")
        return None

    buffer = data
    offset = 4

    def _next_float() -> float:
        # Read one 4-byte float at the cursor and advance it.
        nonlocal offset
        val, = struct.unpack('<f', buffer[offset:offset + 4])
        offset += 4
        return val

    left_eq = [_next_float() for _ in range(8)]
    left_amp = _next_float()
    left_tone = _next_float()
    left_conv = _next_float() > 0.5
    left_anr = _next_float()

    right_eq = [_next_float() for _ in range(8)]
    right_amp = _next_float()
    right_tone = _next_float()
    right_conv = _next_float() > 0.5
    right_anr = _next_float()

    own_voice = _next_float()

    # NOTE(review): assumed derivation of the combined values -- confirm.
    net_amp = max(-1.0, min(1.0, (left_amp + right_amp) / 2))
    balance = max(-1.0, min(1.0, right_amp - left_amp))

    return HearingAidSettings(
        left_eq, right_eq, left_amp, right_amp, left_tone, right_tone,
        left_conv, right_conv, left_anr, right_anr, net_amp, balance, own_voice
    )


class AACPManager:
    """Sends AACP packets and dispatches incoming control commands.

    NOTE(review): the original text of this class was lost entirely in the
    reviewed copy.  Only the method names and call signatures are established
    by the rest of the file; the packet payloads and the incoming-packet
    layout below are assumptions and MUST be checked against upstream.
    """

    # NOTE(review): assumed payloads -- not present in the reviewed source.
    _NOTIFICATION_REQUEST_PAYLOAD = bytes([0xFF, 0xFF, 0xFF, 0xFF])
    _FEATURE_FLAGS_PAYLOAD = bytes([0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

    def __init__(self):
        self.channel = None
        self.running = True
        self.control_cmd_listeners: Dict[int, List[Callable[[bytes], None]]] = {}
        self._recv_q: Optional[asyncio.Queue] = None

    def set_channel(self, channel):
        """Attach *channel* and route everything it receives into a queue."""
        self.channel = channel
        self._recv_q = asyncio.Queue()
        channel.sink = self._recv_q.put_nowait

    def register_control_cmd_listener(self, identifier: int,
                                      listener: Callable[[bytes], None]) -> None:
        """Call *listener(value)* for each control command with *identifier*."""
        self.control_cmd_listeners.setdefault(identifier, []).append(listener)

    async def send_handshake(self):
        self.channel.send_pdu(AACP_HANDSHAKE)

    async def send_notification_request(self):
        self.channel.send_pdu(
            AACP_HEADER
            + bytes([AACPOpcodes.REQUEST_NOTIFICATIONS, 0x00])
            + self._NOTIFICATION_REQUEST_PAYLOAD
        )

    async def send_set_feature_flags(self):
        self.channel.send_pdu(
            AACP_HEADER
            + bytes([AACPOpcodes.SET_FEATURE_FLAGS, 0x00])
            + self._FEATURE_FLAGS_PAYLOAD
        )

    async def send_control_command(self, identifier: int, value: bytes):
        """Send a control command; *value* is zero-padded/truncated to 4 bytes.

        NOTE(review): the 4-byte value field is an assumption -- confirm.
        """
        payload = bytes(value).ljust(4, b'\x00')[:4]
        self.channel.send_pdu(
            AACP_HEADER
            + bytes([AACPOpcodes.CONTROL_COMMAND, 0x00, identifier])
            + payload
        )

    async def listen(self) -> None:
        """Dispatch incoming control commands until cancelled or stopped."""
        logger.info("AACP listener started")
        while self.running:
            try:
                packet = bytes(await self._recv_q.get())
            except asyncio.CancelledError:
                break
            # NOTE(review): assumed layout: header(4) opcode(1) 0x00 id(1) value...
            if (len(packet) < 7 or packet[:4] != AACP_HEADER
                    or packet[4] != AACPOpcodes.CONTROL_COMMAND):
                continue
            identifier, value = packet[6], packet[7:]
            for listener in self.control_cmd_listeners.get(identifier, ()):
                try:
                    listener(value)
                except Exception as e:
                    logger.error(f"AACP listener error: {e}")
        logger.info("AACP listener stopped")

    def stop(self):
        self.running = False


class ATTManager:
    """Minimal ATT client: named-handle reads/writes plus notification fan-out."""

    # NOTE(review): ``__init__`` and ``set_channel`` were lost in the reviewed
    # copy; they are reconstructed from the attributes the surviving methods
    # use (channel, responses, listeners, running, _recv_q).
    def __init__(self):
        self.channel = None
        self.responses: Queue = Queue()  # non-notification PDUs, read by _read_response
        self.listeners: Dict[int, List[Callable[[bytes], None]]] = {}
        self.running = True
        self._recv_q: Optional[asyncio.Queue] = None

    def set_channel(self, channel):
        """Attach *channel* and route everything it receives into a queue."""
        self.channel = channel
        self._recv_q = asyncio.Queue()
        channel.sink = self._recv_q.put_nowait

    def register_listener(self, handle: int, listener: Callable[[bytes], None]) -> None:
        """Call *listener(value)* for every notification on *handle*."""
        if handle not in self.listeners:
            self.listeners[handle] = []
        self.listeners[handle].append(listener)

    async def enable_notifications(self, handle_name: str) -> None:
        """Write ``01 00`` to the CCCD of the named attribute."""
        await self.write_cccd(handle_name, b'\x01\x00')
        logger.info(f"Enabled notifications for handle {handle_name}")

    async def read(self, handle_name: str) -> bytes:
        """Send a read request and return the response without its opcode byte.

        Raises an exception when no response arrives in time.
        """
        handle_value = ATT_HANDLES[handle_name]
        pdu = bytes([OPCODE_READ_REQUEST]) + struct.pack('<H', handle_value)
        self.channel.send_pdu(pdu)
        return await self._read_response()

    async def write(self, handle_name: str, value: bytes) -> None:
        """Send a write request; a missing response is only logged."""
        await self._write_handle(ATT_HANDLES[handle_name], value,
                                 f"No write response received for handle {handle_name}")

    async def write_cccd(self, handle_name: str, value: bytes) -> None:
        """Send a write request to the attribute's CCCD; a missing response is only logged."""
        await self._write_handle(ATT_CCCD_HANDLES[handle_name], value,
                                 f"No CCCD write response received for handle {handle_name}")

    async def _write_handle(self, handle_value: int, value: bytes, warning: str) -> None:
        # Shared body of write()/write_cccd(): opcode + little-endian handle + value.
        pdu = bytes([OPCODE_WRITE_REQUEST]) + struct.pack('<H', handle_value) + value
        self.channel.send_pdu(pdu)
        try:
            await self._read_response(timeout=2.0)
        except Exception:
            logger.warning(warning)

    async def _read_response(self, timeout: float = 2.0) -> bytes:
        """Wait up to *timeout* seconds for the next queued response PDU.

        The blocking ``Queue.get`` runs in the default executor so the event
        loop stays responsive.  Raises ``TimeoutError`` on any failure.
        """
        try:
            response = await asyncio.wait_for(
                asyncio.get_running_loop().run_in_executor(
                    None, lambda: self.responses.get(timeout=timeout)
                ),
                timeout=timeout + 0.5
            )
            return response[1:]  # Skip opcode
        except Exception:
            raise TimeoutError("No response received") from None

    async def listen_notifications(self) -> None:
        """Route received PDUs: notifications to listeners, the rest to ``responses``."""
        logger.info("ATT notification listener started")
        while self.running:
            try:
                pdu = await self._recv_q.get()
                if not isinstance(pdu, (bytes, bytearray)):
                    pdu = bytes(pdu)
                if len(pdu) > 0 and pdu[0] == OPCODE_HANDLE_VALUE_NTF:
                    if len(pdu) < 3:
                        # A truncated notification has no handle; skip it rather
                        # than letting an IndexError end the listener.
                        logger.debug("Ignoring truncated ATT notification")
                        continue
                    handle = pdu[1] | (pdu[2] << 8)
                    value = pdu[3:]
                    for listener in self.listeners.get(handle, ()):
                        try:
                            listener(value)
                        except Exception as e:
                            # One failing listener must not stop notification delivery.
                            logger.error(f"ATT listener error: {e}")
                else:
                    self.responses.put(pdu)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.debug(f"ATT listen error: {e}")
                break
        logger.info("ATT notification listener stopped")

    def stop(self):
        self.running = False


class SignalEmitter(QObject):
    """Qt signals used to hand data from the asyncio thread to GUI slots."""

    update_ui = pyqtSignal(HearingAidSettings)
    update_hearing_aid_toggle = pyqtSignal(bool)
    update_swipe_toggle = pyqtSignal(bool)
    update_loud_sound_reduction_toggle = pyqtSignal(bool)
    update_listening_mode = pyqtSignal(int)
    connected = pyqtSignal()


class HearingAidApp(QWidget):
    """Main window: shows the device settings and pushes edits back to it.

    GUI callbacks schedule coroutines on *loop* (which runs in another thread)
    with ``asyncio.run_coroutine_threadsafe``; data coming back is delivered
    through ``SignalEmitter`` signals.
    """

    def __init__(self, att_manager: ATTManager, aacp_manager: AACPManager,
                 loop: asyncio.AbstractEventLoop) -> None:
        super().__init__()
        self.att_manager = att_manager
        self.aacp_manager = aacp_manager
        self.loop = loop

        self.emitter = SignalEmitter()
        self.emitter.update_ui.connect(self.on_update_ui)
        self.emitter.update_hearing_aid_toggle.connect(self._set_hearing_aid_toggle)
        self.emitter.update_swipe_toggle.connect(self._set_swipe_toggle)
        self.emitter.update_loud_sound_reduction_toggle.connect(self._set_loud_sound_reduction_toggle)
        self.emitter.update_listening_mode.connect(self._set_listening_mode)
        self.emitter.connected.connect(self.on_connected)

        # Coalesce bursts of widget changes into one send 100 ms after the last.
        self.debounce_timer = QTimer()
        self.debounce_timer.setSingleShot(True)
        self.debounce_timer.timeout.connect(self.send_settings)

        self.init_ui()

    def init_ui(self) -> None:
        """Build all widgets and connect their change signals."""
        self.setWindowTitle("LibrePods - Hearing Aid")
        layout = QVBoxLayout()

        # Status label
        self.status_label = QLabel("Connecting...")
        layout.addWidget(self.status_label, alignment=Qt.AlignCenter)

        # Listening Mode combo box
        self.listening_mode_combo = QComboBox()
        self.listening_mode_combo.addItems(["Off", "Noise Cancellation", "Transparency", "Adaptive"])
        self.listening_mode_combo.currentIndexChanged.connect(self.on_listening_mode_changed)
        layout.addWidget(QLabel("Listening Mode"))
        layout.addWidget(self.listening_mode_combo)
        layout.addWidget(QLabel("Switch to Transparency mode for Hearing Aid to work."))
        layout.addSpacing(20)

        self.loud_sound_reduction_checkbox = QCheckBox("Loud Sound Reduction")
        self.loud_sound_reduction_checkbox.stateChanged.connect(self.on_loud_sound_reduction_toggle)
        layout.addWidget(self.loud_sound_reduction_checkbox)

        self.hearing_aid_checkbox = QCheckBox("Hearing Aid")
        self.hearing_aid_checkbox.stateChanged.connect(self.on_hearing_aid_toggle)
        layout.addWidget(self.hearing_aid_checkbox)
        layout.addSpacing(20)

        # EQ Inputs
        eq_layout = QGridLayout()
        self.left_eq_inputs: List[QLineEdit] = []
        self.right_eq_inputs: List[QLineEdit] = []
        eq_labels = ["250Hz", "500Hz", "1kHz", "2kHz", "3kHz", "4kHz", "6kHz", "8kHz"]
        eq_layout.addWidget(QLabel("Frequency"), 0, 0)
        eq_layout.addWidget(QLabel("Left"), 0, 1, alignment=Qt.AlignCenter)
        eq_layout.addWidget(QLabel("Right"), 0, 2, alignment=Qt.AlignCenter)
        validator = QDoubleValidator(0.0, 100.0, 6)
        for i, label in enumerate(eq_labels):
            eq_layout.addWidget(QLabel(label), i + 1, 0)
            left_input = QLineEdit()
            right_input = QLineEdit()
            left_input.setValidator(validator)
            right_input.setValidator(validator)
            left_input.setPlaceholderText("Left")
            right_input.setPlaceholderText("Right")
            self.left_eq_inputs.append(left_input)
            self.right_eq_inputs.append(right_input)
            eq_layout.addWidget(left_input, i + 1, 1)
            eq_layout.addWidget(right_input, i + 1, 2)
        eq_group = QWidget()
        eq_group.setLayout(eq_layout)
        layout.addWidget(QLabel("Loss, in dBHL"))
        layout.addWidget(eq_group)

        # Amplification
        self.amp_slider = QSlider(Qt.Horizontal)
        self.amp_slider.setRange(-100, 100)
        self.amp_slider.setValue(0)
        layout.addWidget(QLabel("Amplification"))
        layout.addWidget(self.amp_slider)

        self.swipe_checkbox = QCheckBox("Swipe to control amplification")
        self.swipe_checkbox.stateChanged.connect(self.on_swipe_toggle)
        layout.addWidget(self.swipe_checkbox)

        # Balance
        self.balance_slider = QSlider(Qt.Horizontal)
        self.balance_slider.setRange(-100, 100)
        self.balance_slider.setValue(0)
        layout.addWidget(QLabel("Balance"))
        layout.addWidget(self.balance_slider)

        # Tone
        self.tone_slider = QSlider(Qt.Horizontal)
        self.tone_slider.setRange(-100, 100)
        self.tone_slider.setValue(0)
        layout.addWidget(QLabel("Tone"))
        layout.addWidget(self.tone_slider)

        # Ambient Noise Reduction
        self.anr_slider = QSlider(Qt.Horizontal)
        self.anr_slider.setRange(0, 100)
        self.anr_slider.setValue(0)
        layout.addWidget(QLabel("Ambient Noise Reduction"))
        layout.addWidget(self.anr_slider)

        # Conversation Boost
        self.conv_checkbox = QCheckBox("Conversation Boost")
        layout.addWidget(self.conv_checkbox)

        # Own Voice Amplification
        self.own_voice_slider = QSlider(Qt.Horizontal)
        self.own_voice_slider.setRange(0, 100)
        self.own_voice_slider.setValue(50)
        layout.addWidget(QLabel("Own Voice Amplification"))
        layout.addWidget(self.own_voice_slider)

        # Reset button
        self.reset_button = QPushButton("Reset adjustments")
        layout.addWidget(self.reset_button)

        # Connect signals for ATT settings
        for input_box in self.left_eq_inputs + self.right_eq_inputs:
            input_box.textChanged.connect(self.on_value_changed)
        self.amp_slider.valueChanged.connect(self.on_value_changed)
        self.balance_slider.valueChanged.connect(self.on_value_changed)
        self.tone_slider.valueChanged.connect(self.on_value_changed)
        self.anr_slider.valueChanged.connect(self.on_value_changed)
        self.conv_checkbox.stateChanged.connect(self.on_value_changed)
        self.own_voice_slider.valueChanged.connect(self.on_value_changed)
        self.reset_button.clicked.connect(self.reset_settings)

        self.setLayout(layout)

    def on_connected(self) -> None:
        """Slot for ``emitter.connected``: register listeners and start the initial load."""
        self.status_label.setText("Connected")
        self.att_manager.register_listener(ATT_HANDLES['HEARING_AID'], self.on_att_notification)
        self.att_manager.register_listener(ATT_HANDLES['LOUD_SOUND_REDUCTION'],
                                           self.on_loud_sound_reduction_notification)
        self.aacp_manager.register_control_cmd_listener(ControlCommandId.HEARING_AID,
                                                        self._on_hearing_aid_cmd)
        self.aacp_manager.register_control_cmd_listener(ControlCommandId.HPS_GAIN_SWIPE,
                                                        self._on_swipe_cmd)
        self.aacp_manager.register_control_cmd_listener(ControlCommandId.LISTENING_MODE,
                                                        self._on_listening_mode_cmd)
        asyncio.run_coroutine_threadsafe(self._initial_setup(), self.loop)

    def on_loud_sound_reduction_notification(self, value: bytes) -> None:
        enabled = value[0] == 0x01 if value else False
        self.emitter.update_loud_sound_reduction_toggle.emit(enabled)

    def _set_loud_sound_reduction_toggle(self, enabled: bool):
        # Signals are blocked so a device-originated update is not echoed back.
        self.loud_sound_reduction_checkbox.blockSignals(True)
        self.loud_sound_reduction_checkbox.setChecked(enabled)
        self.loud_sound_reduction_checkbox.blockSignals(False)

    def on_loud_sound_reduction_toggle(self, state: int):
        enabled = state == Qt.Checked
        asyncio.run_coroutine_threadsafe(self._send_loud_sound_reduction_toggle(enabled), self.loop)

    async def _send_loud_sound_reduction_toggle(self, enabled: bool):
        value = bytes([0x01]) if enabled else bytes([0x00])
        await self.att_manager.write('LOUD_SOUND_REDUCTION', value)

    def _on_hearing_aid_cmd(self, value: bytes):
        enabled = value[0] == 0x01 if value else False
        self.emitter.update_hearing_aid_toggle.emit(enabled)

    def _on_swipe_cmd(self, value: bytes):
        enabled = value[0] == 0x01 if value else False
        self.emitter.update_swipe_toggle.emit(enabled)

    def _on_listening_mode_cmd(self, value: bytes):
        # Modes 1..4 map to combo indices 0..3; anything else selects index 0.
        mode_value = value[0] if value else 0x01
        index = mode_value - 1 if 1 <= mode_value <= 4 else 0
        self.emitter.update_listening_mode.emit(index)

    def _set_hearing_aid_toggle(self, enabled: bool):
        self.hearing_aid_checkbox.blockSignals(True)
        self.hearing_aid_checkbox.setChecked(enabled)
        self.hearing_aid_checkbox.blockSignals(False)

    def _set_swipe_toggle(self, enabled: bool):
        self.swipe_checkbox.blockSignals(True)
        self.swipe_checkbox.setChecked(enabled)
        self.swipe_checkbox.blockSignals(False)

    def _set_listening_mode(self, index: int):
        self.listening_mode_combo.blockSignals(True)
        self.listening_mode_combo.setCurrentIndex(index)
        self.listening_mode_combo.blockSignals(False)

    def on_hearing_aid_toggle(self, state: int):
        enabled = state == Qt.Checked
        asyncio.run_coroutine_threadsafe(self._send_hearing_aid_toggle(enabled), self.loop)

    def on_swipe_toggle(self, state: int):
        enabled = state == Qt.Checked
        asyncio.run_coroutine_threadsafe(self._send_swipe_toggle(enabled), self.loop)

    def on_listening_mode_changed(self, index: int):
        value = index + 1
        asyncio.run_coroutine_threadsafe(self._send_listening_mode(value), self.loop)

    async def _send_hearing_aid_toggle(self, enabled: bool):
        if enabled:
            await self.aacp_manager.send_control_command(ControlCommandId.HEARING_AID, bytes([0x01, 0x01]))
            await self.aacp_manager.send_control_command(ControlCommandId.HEARING_ASSIST_CONFIG, bytes([0x01]))
        else:
            await self.aacp_manager.send_control_command(ControlCommandId.HEARING_AID, bytes([0x02, 0x02]))
            await self.aacp_manager.send_control_command(ControlCommandId.HEARING_ASSIST_CONFIG, bytes([0x02]))

    async def _send_swipe_toggle(self, enabled: bool):
        value = bytes([0x01]) if enabled else bytes([0x02])
        await self.aacp_manager.send_control_command(ControlCommandId.HPS_GAIN_SWIPE, value)

    async def _send_listening_mode(self, value: int):
        await self.aacp_manager.send_control_command(ControlCommandId.LISTENING_MODE, bytes([value]))

    async def _initial_setup(self):
        """Enable notifications and load the current settings into the UI."""
        try:
            await self.att_manager.enable_notifications('HEARING_AID')
            await self.att_manager.enable_notifications('LOUD_SOUND_REDUCTION')

            data = await self.att_manager.read('HEARING_AID')
            settings = parse_hearing_aid_settings(data)
            if settings:
                self.emitter.update_ui.emit(settings)
                logger.info("Initial ATT settings loaded")

            loud_sound_data = await self.att_manager.read('LOUD_SOUND_REDUCTION')
            loud_sound_enabled = loud_sound_data[0] == 0x01 if loud_sound_data else False
            self.emitter.update_loud_sound_reduction_toggle.emit(loud_sound_enabled)
            logger.info("Initial loud sound reduction setting loaded")
        except Exception as e:
            logger.error(f"Initial ATT setup failed: {e}")

    def on_att_notification(self, value: bytes) -> None:
        settings = parse_hearing_aid_settings(value)
        if settings:
            self.emitter.update_ui.emit(settings)

    def on_update_ui(self, settings: HearingAidSettings) -> None:
        """Show *settings* in the widgets without triggering a send back to the device."""
        controls = (self.amp_slider, self.balance_slider, self.tone_slider,
                    self.anr_slider, self.conv_checkbox, self.own_voice_slider)
        for control in controls:
            control.blockSignals(True)

        self.amp_slider.setValue(int(settings.net_amplification * 100))
        self.balance_slider.setValue(int(settings.balance * 100))
        self.tone_slider.setValue(int(settings.left_tone * 100))
        self.anr_slider.setValue(int(settings.left_ambient_noise_reduction * 100))
        self.conv_checkbox.setChecked(settings.left_conversation_boost)
        self.own_voice_slider.setValue(int(settings.own_voice_amplification * 100))

        for inputs, values in ((self.left_eq_inputs, settings.left_eq),
                               (self.right_eq_inputs, settings.right_eq)):
            for i, value in enumerate(values):
                inputs[i].blockSignals(True)
                inputs[i].setText(f"{value:.2f}")
                inputs[i].blockSignals(False)

        for control in controls:
            control.blockSignals(False)

    def on_value_changed(self) -> None:
        self.debounce_timer.start(100)

    def _collect_settings(self) -> HearingAidSettings:
        """Read the current widget values into a ``HearingAidSettings``.

        Raises ``ValueError`` when an EQ field holds text ``float()`` rejects.
        """
        amp = self.amp_slider.value() / 100.0
        balance = self.balance_slider.value() / 100.0
        tone = self.tone_slider.value() / 100.0
        anr = self.anr_slider.value() / 100.0
        conv = self.conv_checkbox.isChecked()
        own_voice = self.own_voice_slider.value() / 100.0

        left_amp = amp + (0.5 - balance) * amp * 2 if balance < 0 else amp
        right_amp = amp + (balance - 0.5) * amp * 2 if balance > 0 else amp

        left_eq = [float(input_box.text() or 0) for input_box in self.left_eq_inputs]
        right_eq = [float(input_box.text() or 0) for input_box in self.right_eq_inputs]

        return HearingAidSettings(
            left_eq, right_eq, left_amp, right_amp, tone, tone,
            conv, conv, anr, anr, amp, balance, own_voice
        )

    def send_settings(self) -> None:
        """Debounce-timer slot: snapshot the widgets here and send asynchronously.

        The snapshot is taken in this slot rather than in the coroutine because
        the coroutine runs on the asyncio thread and should not touch widgets.
        """
        try:
            settings = self._collect_settings()
        except Exception as e:
            logger.error(f"Failed to send settings: {e}")
            return
        asyncio.run_coroutine_threadsafe(self._send_settings_async(settings), self.loop)

    async def _send_settings_async(self, settings: Optional[HearingAidSettings] = None) -> None:
        """Send *settings*; when omitted, fall back to reading the widgets directly."""
        try:
            if settings is None:
                settings = self._collect_settings()
            await self._send_hearing_aid_settings(settings)
        except Exception as e:
            logger.error(f"Failed to send settings: {e}")

    async def _send_hearing_aid_settings(self, settings: HearingAidSettings) -> None:
        """Read the current attribute value, overwrite the settings fields, write it back.

        NOTE(review): everything after the first ``pack_into`` was lost in the
        reviewed copy; the offsets mirror ``parse_hearing_aid_settings`` and
        the final write target is assumed -- confirm against upstream.
        """
        data = await self.att_manager.read('HEARING_AID')
        if len(data) < 104:
            logger.error("Read data too short for sending settings")
            return

        buffer = bytearray(data)
        # Bytes 0 and 1 are kept as read from the device.
        buffer[2] = 0x64

        def _pack_ear(base: int, eq: List[float], amp: float, tone: float,
                      conv: bool, anr: float) -> None:
            # One ear: 8 EQ floats, then amplification, tone, conversation boost, ANR.
            for i in range(8):
                struct.pack_into('<f', buffer, base + i * 4, eq[i])
            struct.pack_into('<f', buffer, base + 32, amp)
            struct.pack_into('<f', buffer, base + 36, tone)
            struct.pack_into('<f', buffer, base + 40, 1.0 if conv else 0.0)
            struct.pack_into('<f', buffer, base + 44, anr)

        _pack_ear(4, settings.left_eq, settings.left_amplification, settings.left_tone,
                  settings.left_conversation_boost, settings.left_ambient_noise_reduction)
        _pack_ear(52, settings.right_eq, settings.right_amplification, settings.right_tone,
                  settings.right_conversation_boost, settings.right_ambient_noise_reduction)
        struct.pack_into('<f', buffer, 100, settings.own_voice_amplification)

        await self.att_manager.write('HEARING_AID', bytes(buffer))

    def reset_settings(self):
        """Return the adjustment controls to the values ``init_ui`` starts with.

        NOTE(review): this method was lost in the reviewed copy; only its
        existence (connected to the "Reset adjustments" button) is known.
        """
        self.amp_slider.setValue(0)
        self.balance_slider.setValue(0)
        self.tone_slider.setValue(0)
        self.anr_slider.setValue(0)
        self.conv_checkbox.setChecked(False)
        self.own_voice_slider.setValue(50)
        self.on_value_changed()

    def closeEvent(self, event) -> None:
        self.att_manager.stop()
        self.aacp_manager.stop()
        event.accept()


# Global shutdown event: set from the GUI/signal side to tell the async side
# to leave its wait and clean up.
SHUTDOWN_EVENT = threading.Event()


async def _cancel_task(task) -> None:
    """Cancel *task* (if any) and wait for it, ignoring whatever it raises."""
    if not task:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    except Exception:
        pass


async def run_bluez(bdaddr: str, att_manager: ATTManager, aacp_manager: AACPManager,
                    app_window: HearingAidApp,
                    shutdown_event: threading.Event = SHUTDOWN_EVENT):
    """Connect with PyBluez L2CAP sockets and serve until *shutdown_event* is set.

    Returns 1 when PyBluez is missing or the connection fails, otherwise 0.
    """
    try:
        import bluetooth
    except ImportError:
        logger.error("PyBluez (bluetooth) not installed. Install it or use --bumble.")
        return 1

    logger.info(f"Connecting to {bdaddr} using bluez sockets...")

    att_sock = None
    aacp_sock = None
    att_channel = None
    aacp_channel = None
    att_listen_task = None
    aacp_listen_task = None

    try:
        # ATT
        att_sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
        att_sock.connect((bdaddr, 31))
        logger.info("Connected to ATT (PSM 31)")

        # AACP
        aacp_sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
        aacp_sock.connect((bdaddr, 4097))
        logger.info("Connected to AACP (PSM 4097)")

        loop = asyncio.get_running_loop()
        att_channel = BluezChannel(att_sock, loop)
        att_manager.set_channel(att_channel)
        aacp_channel = BluezChannel(aacp_sock, loop)
        aacp_manager.set_channel(aacp_channel)

        # AACP Setup
        await aacp_manager.send_handshake()
        await asyncio.sleep(0.1)
        await aacp_manager.send_notification_request()
        await asyncio.sleep(0.1)
        await aacp_manager.send_set_feature_flags()
        await asyncio.sleep(0.1)
        await aacp_manager.send_control_command(ControlCommandId.OWNS_CONNECTION, bytes([0x01]))

        app_window.emitter.connected.emit()

        att_listen_task = asyncio.create_task(att_manager.listen_notifications())
        aacp_listen_task = asyncio.create_task(aacp_manager.listen())

        logger.info("BlueZ connection established. UI is now active.")

        try:
            await loop.run_in_executor(None, shutdown_event.wait)
        except asyncio.CancelledError:
            pass
    except Exception as e:
        logger.error(f"BlueZ connection failed: {e}")
        return 1
    finally:
        # Ensure we attempt a clean shutdown
        await _cancel_task(att_listen_task)
        await _cancel_task(aacp_listen_task)
        # A channel owns its socket; a socket that never got a channel (e.g. the
        # second connect failed) must be closed here or it leaks.
        for channel, sock in ((att_channel, att_sock), (aacp_channel, aacp_sock)):
            try:
                if channel:
                    channel.stop()
                elif sock is not None:
                    sock.close()
            except Exception:
                pass
    return 0


async def run_bumble(bdaddr: str, att_manager: ATTManager, aacp_manager: AACPManager,
                     app_window: HearingAidApp,
                     shutdown_event: threading.Event = SHUTDOWN_EVENT):
    """Connect through the Bumble stack (``usb:0``) and serve until *shutdown_event* is set.

    Returns 1 when Bumble is not installed, otherwise 0.
    """
    try:
        from bumble.l2cap import ClassicChannelSpec, ClassicChannel
        from bumble.transport import open_transport
        from bumble.device import Device, Connection
        from bumble.host import Host
        from bumble.core import PhysicalTransport, UUID
        from bumble.pairing import PairingConfig, PairingDelegate
        from bumble.hci import HCI_Error
        from bumble.keys import JsonKeyStore
        from bumble.sdp import ServiceAttribute, DataElement
    except ImportError:
        logger.error("Bumble not installed")
        return 1

    transport = None
    device = None
    connection = None
    att_listen_task = None
    aacp_listen_task = None

    try:
        async def get_device():
            # Bind the outer names so ``finally`` can close the transport even
            # when a later step in here fails.
            nonlocal transport, device
            logger.info("Opening transport...")
            transport = await open_transport("usb:0")
            device = Device(host=Host(controller_source=transport.source,
                                      controller_sink=transport.sink))
            device.classic_enabled = True
            device.le_enabled = False
            device.keystore = JsonKeyStore.from_device(device, "./keys.json")
            device.pairing_config_factory = lambda conn: PairingConfig(
                sc=True, mitm=False, bonding=True,
                delegate=PairingDelegate(io_capability=PairingDelegate.NO_OUTPUT_NO_INPUT)
            )
            await device.power_on()
            logger.info("Device powered on")

            def on_l2cap_connection(channel: ClassicChannel):
                logger.info("Incoming L2CAP connection on PSM %d", channel.psm)

                async def handle_data():
                    try:
                        reader = _make_reader(channel)
                        while True:
                            data = await reader()
                            logger.info(f"Received PDU on PSM {channel.psm}: {data.hex() if data else 'None'}")
                    except Exception as e:
                        logger.info("L2CAP channel on PSM %d closed: %s", channel.psm, e)

                asyncio.create_task(handle_data())

            att_server_spec = ClassicChannelSpec(psm=31, mtu=512)
            device.create_l2cap_server(att_server_spec, handler=on_l2cap_connection)
            logger.info("L2CAP server registered on PSM 0x%04X", att_server_spec.psm)

            device.sdp_service_records = {
                0x4f491200: [
                    ServiceAttribute(0x0000, DataElement.unsigned_integer_32(0x4f491200)),
                    ServiceAttribute(0x0001, DataElement.sequence([DataElement.uuid(UUID.from_16_bits(0x1200))])),
                    ServiceAttribute(0x0002, DataElement.unsigned_integer_32(0x00000000)),
                    ServiceAttribute(0x0005, DataElement.sequence([DataElement.uuid(UUID.from_16_bits(0x1002))])),
                    ServiceAttribute(0x0006, DataElement.sequence([
                        DataElement.unsigned_integer_16(0x656e),
                        DataElement.unsigned_integer_16(0x006a),
                        DataElement.unsigned_integer_16(0x0100),
                        DataElement.unsigned_integer_16(0x6672),
                        DataElement.unsigned_integer_16(0x006a),
                        DataElement.unsigned_integer_16(0x0110),
                        DataElement.unsigned_integer_16(0x6465),
                        DataElement.unsigned_integer_16(0x006a),
                        DataElement.unsigned_integer_16(0x0120),
                        DataElement.unsigned_integer_16(0x6a61),
                        DataElement.unsigned_integer_16(0x006a),
                        DataElement.unsigned_integer_16(0x0130)
                    ])),
                    ServiceAttribute(0x0008, DataElement.unsigned_integer_8(0xff)),
                    ServiceAttribute(0x0101, DataElement.text_string('PnP Information')),
                    ServiceAttribute(0x0200, DataElement.unsigned_integer_16(0x0102)),
                    ServiceAttribute(0x0201, DataElement.unsigned_integer_16(0x004c)),
                    ServiceAttribute(0x0202, DataElement.unsigned_integer_16(0x0000)),
                    ServiceAttribute(0x0203, DataElement.unsigned_integer_16(0x0f60)),
                    ServiceAttribute(0x0204, DataElement.boolean(True)),
                    ServiceAttribute(0x0205, DataElement.unsigned_integer_16(0x0001)),
                    ServiceAttribute(0xa000, DataElement.unsigned_integer_32(0x00a026c4)),
                    ServiceAttribute(0xafff, DataElement.unsigned_integer_16(0x0001))
                ]
            }
            logger.info("SDP service records set up")
            return transport, device

        async def setup_aacp(conn: Connection):
            spec = ClassicChannelSpec(psm=4097, mtu=2048)
            logger.info("Requesting AACP channel on PSM = 0x%04X", spec.psm)
            if not conn.is_encrypted:
                await conn.encrypt()
                await asyncio.sleep(0.05)
            channel: ClassicChannel = await conn.create_l2cap_channel(spec=spec)
            aacp_manager.set_channel(channel)
            logger.info("AACP channel established")
            await aacp_manager.send_handshake()
            await asyncio.sleep(0.1)
            await aacp_manager.send_notification_request()
            await asyncio.sleep(0.1)
            await aacp_manager.send_set_feature_flags()
            return channel

        async def setup_att(conn: Connection):
            spec = ClassicChannelSpec(psm=31, mtu=512)
            logger.info("Requesting ATT channel on PSM = 0x%04X", spec.psm)
            if not conn.is_encrypted:
                await conn.encrypt()
                await asyncio.sleep(0.05)
            channel: ClassicChannel = await conn.create_l2cap_channel(spec=spec)
            att_manager.set_channel(channel)
            logger.info("ATT channel established")
            return channel

        transport, device = await get_device()
        logger.info("Connecting to %s (BR/EDR)...", bdaddr)
        connection = await device.connect(bdaddr, PhysicalTransport.BR_EDR)
        logger.info("Connected to %s (handle %s)", connection.peer_address, connection.handle)
        logger.info("Authenticating...")
        await connection.authenticate()
        if not connection.is_encrypted:
            logger.info("Encrypting link...")
            await connection.encrypt()

        await setup_aacp(connection)
        await setup_att(connection)

        app_window.emitter.connected.emit()

        att_listen_task = asyncio.create_task(att_manager.listen_notifications())
        aacp_listen_task = asyncio.create_task(aacp_manager.listen())

        logger.info("Connection established. UI is now active.")

        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, shutdown_event.wait)
        except asyncio.CancelledError:
            pass
    except HCI_Error as e:
        if "PAIRING_NOT_ALLOWED_ERROR" in str(e):
            logger.error("Put your device into pairing mode and run the script again")
        else:
            logger.error("HCI error: %s", e)
    except Exception as e:
        logger.error("Unexpected error: %s", e)
    finally:
        logger.info("Shutting down bumble connection...")
        # Cancel and await listening tasks
        await _cancel_task(att_listen_task)
        await _cancel_task(aacp_listen_task)

        # Attempt to cleanly disconnect the remote device
        if connection:
            try:
                await connection.disconnect()
            except Exception:
                pass

        if transport:
            logger.info("Closing transport...")
            try:
                await transport.close()
            except Exception:
                pass
            logger.info("Transport closed")
    return 0


def main():
    """Parse arguments, show the window and run the Bluetooth side in a worker thread."""
    parser = argparse.ArgumentParser()
    parser.add_argument("bdaddr", help="Bluetooth address of the hearing aid device")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--bumble", action="store_true",
                        help="Force use of Bumble stack (default on Windows)")
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)

    qt_app = QApplication(sys.argv)
    loop = asyncio.new_event_loop()

    att_manager = ATTManager()
    aacp_manager = AACPManager()

    window = HearingAidApp(att_manager, aacp_manager, loop)
    window.show()

    def quit_app(signum, frame):
        # Signal the application shutdown to both main thread and the async thread
        SHUTDOWN_EVENT.set()
        try:
            att_manager.stop()
            aacp_manager.stop()
        except Exception:
            pass
        qt_app.quit()

    signal.signal(signal.SIGINT, quit_app)
    signal.signal(signal.SIGTERM, quit_app)

    def run_async():
        asyncio.set_event_loop(loop)
        use_bumble = args.bumble or platform.system() == "Windows"
        try:
            if use_bumble:
                loop.run_until_complete(
                    run_bumble(args.bdaddr, att_manager, aacp_manager, window, SHUTDOWN_EVENT)
                )
            else:
                import subprocess
                ps_output = subprocess.run(["ps", "-A"], capture_output=True, text=True).stdout
                if "librepods" in ps_output:
                    logger.error("LibrePods is running. Please close it before using this script.")
                    loop.call_soon_threadsafe(loop.stop)
                    # NOTE(review): this runs on the worker thread -- confirm
                    # that calling qt_app.quit() off the GUI thread is acceptable.
                    quit_app(None, None)
                    return
                loop.run_until_complete(
                    run_bluez(args.bdaddr, att_manager, aacp_manager, window, SHUTDOWN_EVENT)
                )
        except Exception as e:
            logger.error("Async thread error: %s", e)
        finally:
            loop.call_soon_threadsafe(loop.stop)

    # Keep the async thread non-daemon so cleanup can run
    async_thread = threading.Thread(target=run_async, daemon=False)
    async_thread.start()

    # A no-op timer; presumably here so the interpreter regains control
    # periodically and the SIGINT/SIGTERM handlers above get a chance to run.
    timer = QTimer()
    timer.timeout.connect(lambda: None)
    timer.start(100)

    # Run GUI and wait for async thread cleanup
    exit_code = qt_app.exec_()

    # Ensure shutdown is signaled (in case user closed the window)
    SHUTDOWN_EVENT.set()

    # Wait for async thread to finish gracefully for a short timeout
    async_thread.join(timeout=10)

    sys.exit(exit_code)


if __name__ == "__main__":
    main()