import sys import socket import struct import threading from queue import Queue import logging import signal # Configure logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QSlider, QCheckBox, QPushButton, QLineEdit, QFormLayout, QGridLayout from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject OPCODE_READ_REQUEST = 0x0A OPCODE_WRITE_REQUEST = 0x12 OPCODE_HANDLE_VALUE_NTF = 0x1B ATT_HANDLES = { 'TRANSPARENCY': 0x18, 'LOUD_SOUND_REDUCTION': 0x1B, 'HEARING_AID': 0x2A, } ATT_CCCD_HANDLES = { 'TRANSPARENCY': ATT_HANDLES['TRANSPARENCY'] + 1, 'LOUD_SOUND_REDUCTION': ATT_HANDLES['LOUD_SOUND_REDUCTION'] + 1, 'HEARING_AID': ATT_HANDLES['HEARING_AID'] + 1, } PSM_ATT = 31 class ATTManager: def __init__(self, mac_address): self.mac_address = mac_address self.sock = None self.responses = Queue() self.listeners = {} self.notification_thread = None self.running = False # Avoid logging full MAC address to prevent sensitive data exposure mac_tail = ':'.join(mac_address.split(':')[-2:]) if isinstance(mac_address, str) and ':' in mac_address else '[redacted]' logging.info(f"ATTManager initialized") def connect(self): logging.info("Attempting to connect to ATT socket") self.sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) self.sock.connect((self.mac_address, PSM_ATT)) self.sock.settimeout(0.1) self.running = True self.notification_thread = threading.Thread(target=self._listen_notifications) self.notification_thread.start() logging.info("Connected to ATT socket") def disconnect(self): logging.info("Disconnecting from ATT socket") self.running = False if self.sock: logging.info("Closing socket") self.sock.close() if self.notification_thread: logging.info("Stopping notification thread") self.notification_thread.join(timeout=1.0) logging.info("Disconnected from ATT socket") def register_listener(self, handle, listener): if handle not in self.listeners: self.listeners[handle] = [] self.listeners[handle].append(listener) logging.debug(f"Registered listener for handle {handle}") def unregister_listener(self, handle, listener): if handle in self.listeners: self.listeners[handle].remove(listener) logging.debug(f"Unregistered listener for handle {handle}") def enable_notifications(self, handle): self.write_cccd(handle, b'\x01\x00') logging.info(f"Enabled notifications for handle {handle.name}") def read(self, handle): handle_value = ATT_HANDLES[handle.name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_READ_REQUEST, lsb, msb]) logging.debug(f"Sending read request for handle {handle.name}: {pdu.hex()}") self._write_raw(pdu) response = self._read_response() logging.debug(f"Read response for handle {handle.name}: {response.hex()}") return response def write(self, handle, value): handle_value = ATT_HANDLES[handle.name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_WRITE_REQUEST, lsb, msb]) + value logging.debug(f"Sending write request for handle {handle.name}: {pdu.hex()}") self._write_raw(pdu) try: self._read_response() logging.debug(f"Write response received for handle {handle.name}") except: logging.warning(f"No write response received for handle {handle.name}") def write_cccd(self, handle, value): handle_value = ATT_CCCD_HANDLES[handle.name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_WRITE_REQUEST, lsb, msb]) + value logging.debug(f"Sending CCCD write request for handle {handle.name}: {pdu.hex()}") self._write_raw(pdu) try: self._read_response() logging.debug(f"CCCD write response received for handle {handle.name}") except: logging.warning(f"No CCCD write response received for handle {handle.name}") def _write_raw(self, pdu): self.sock.send(pdu) logging.debug(f"Sent PDU: {pdu.hex()}") def _read_pdu(self): try: data = self.sock.recv(512) logging.debug(f"Received PDU: {data.hex()}") return data except socket.timeout: return None except: raise def _read_response(self, timeout=2.0): try: response = self.responses.get(timeout=timeout)[1:] # Skip opcode logging.debug(f"Response received: {response.hex()}") return response except: logging.error("No response received within timeout") raise Exception("No response received") def _listen_notifications(self): logging.info("Starting notification listener thread") while self.running: try: pdu = self._read_pdu() except: break if pdu is None: continue if len(pdu) > 0 and pdu[0] == OPCODE_HANDLE_VALUE_NTF: logging.debug(f"Notification PDU received: {pdu.hex()}") handle = pdu[1] | (pdu[2] << 8) value = pdu[3:] logging.debug(f"Notification for handle {handle}: {value.hex()}") if handle in self.listeners: for listener in self.listeners[handle]: listener(value) else: self.responses.put(pdu) logging.info("Notification listener thread stopped, trying to reconnect") if self.running: try: self.connect() except Exception as e: logging.error(f"Reconnection failed: {e}") class HearingAidSettings: def __init__(self, left_eq, right_eq, left_amp, right_amp, left_tone, right_tone, left_conv, right_conv, left_anr, right_anr, net_amp, balance, own_voice): self.left_eq = left_eq self.right_eq = right_eq self.left_amplification = left_amp self.right_amplification = right_amp self.left_tone = left_tone self.right_tone = right_tone self.left_conversation_boost = left_conv self.right_conversation_boost = right_conv self.left_ambient_noise_reduction = left_anr self.right_ambient_noise_reduction = right_anr self.net_amplification = net_amp self.balance = balance self.own_voice_amplification = own_voice logging.debug(f"HearingAidSettings created: amp={net_amp}, balance={balance}, tone={left_tone}, anr={left_anr}, conv={left_conv}") def parse_hearing_aid_settings(data): logging.debug(f"Parsing hearing aid settings from data: {data.hex()}") if len(data) < 104: logging.warning("Data too short for parsing") return None buffer = data offset = 0 offset += 4 logging.info(f"Parsing hearing aid settings, starting read at offset 4, value: {buffer[offset]:02x}") left_eq = [] for i in range(8): val, = struct.unpack(' 0.5 offset += 4 left_anr, = struct.unpack(' 0.5 offset += 4 right_anr, = struct.unpack(' 0 else amp left_eq = [float(input_box.text() or 0) for input_box in self.left_eq_inputs] right_eq = [float(input_box.text() or 0) for input_box in self.right_eq_inputs] settings = HearingAidSettings( left_eq, right_eq, left_amp, right_amp, tone, tone, conv, conv, anr, anr, amp, balance, own_voice ) threading.Thread(target=send_hearing_aid_settings, args=(self.att_manager, settings)).start() def reset_settings(self): logging.debug("Resetting settings to defaults") self.amp_slider.setValue(0) self.balance_slider.setValue(0) self.tone_slider.setValue(0) self.anr_slider.setValue(50) self.conv_checkbox.setChecked(False) self.own_voice_slider.setValue(50) self.on_value_changed() def closeEvent(self, event): logging.info("Closing app") self.att_manager.disconnect() event.accept() if __name__ == "__main__": mac = None if len(sys.argv) != 2: logging.error("Usage: python hearing-aid-adjustments.py ") sys.exit(1) mac = sys.argv[1] mac_regex = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$' import re if not re.match(mac_regex, mac): logging.error("Invalid MAC address format") sys.exit(1) logging.info(f"Starting app") app = QApplication(sys.argv) def quit_app(signum, frame): app.quit() signal.signal(signal.SIGINT, quit_app) window = HearingAidApp(mac) window.show() sys.exit(app.exec_())