From 2aeb2b02a72f8405ddd0e53b4cc31cf12ed25095 Mon Sep 17 00:00:00 2001 From: Kavish Devar Date: Sun, 6 Oct 2024 21:21:03 +0530 Subject: [PATCH] Receive adaptive anc notifications --- .github/ISSUE_TEMPLATE/bug_report.md | 11 +--- .idea/.gitignore | 8 +++ .idea/AirPodsLikeNormal.iml | 12 ++++ .../inspectionProfiles/profiles_settings.xml | 6 ++ .idea/misc.xml | 7 +++ .idea/modules.xml | 8 +++ .idea/vcs.xml | 6 ++ aln/AirPods/Pro2.py | 8 +-- aln/Capabilites/__init__.py | 8 +-- aln/Notifications/ANC.py | 50 +++++++++++++++ aln/Notifications/Listener.py | 5 ++ aln/Notifications/__init__.py | 2 + aln/__init__.py | 8 ++- aln/enums.py | 19 +++--- examples/daemon/ear-detection.py | 12 ++-- examples/daemon/read-data.py | 4 +- examples/daemon/set-anc.py | 8 +-- examples/daemon/tray.py | 63 ++++++++++++++----- examples/logger-and-anc.py | 8 +-- examples/standalone.py | 2 +- start-daemon.py | 23 +++++-- 21 files changed, 215 insertions(+), 63 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/AirPodsLikeNormal.iml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 aln/Notifications/ANC.py diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index dd84ea7..cf15bd1 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -24,15 +24,8 @@ A clear and concise description of what you expected to happen. If applicable, add screenshots to help explain your problem. **Desktop (please complete the following information):** - - OS: [e.g. iOS] - - Browser [e.g. chrome, safari] - - Version [e.g. 22] - -**Smartphone (please complete the following information):** - - Device: [e.g. iPhone6] - - OS: [e.g. iOS8.1] - - Browser [e.g. stock browser, safari] - - Version [e.g. 22] + - Distro: [e.g. Ubuntu, KDE Neon, Arch] + - Version [e.g. 22.04] **Additional context** Add any other context about the problem here. diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/AirPodsLikeNormal.iml b/.idea/AirPodsLikeNormal.iml new file mode 100644 index 0000000..8b8c395 --- /dev/null +++ b/.idea/AirPodsLikeNormal.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..9de2865 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..c4f2625 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/aln/AirPods/Pro2.py b/aln/AirPods/Pro2.py index 223dea3..5d6e2db 100644 --- a/aln/AirPods/Pro2.py +++ b/aln/AirPods/Pro2.py @@ -4,10 +4,10 @@ class Pro2: self.name = 'AirPods Pro 2' self.capabilites = { Capabilites.NOISE_CANCELLATION: [ - Capabilites.NoiseCancellation.Off, - Capabilites.NoiseCancellation.On, - Capabilites.NoiseCancellation.Transparency, - Capabilites.NoiseCancellation.Adaptive, + Capabilites.NoiseCancellation.OFF, + Capabilites.NoiseCancellation.ON, + Capabilites.NoiseCancellation.TRANSPARENCY, + Capabilites.NoiseCancellation.ADAPTIVE, ], Capabilites.CONVERSATION_AWARENESS: True, Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY: True diff --git a/aln/Capabilites/__init__.py b/aln/Capabilites/__init__.py index 2452695..088ceb7 100644 --- a/aln/Capabilites/__init__.py +++ b/aln/Capabilites/__init__.py @@ -1,8 +1,8 @@ class NoiseCancellation: - Off = b"\x01" - On = b"\x02" - Transparency = b"\x03" - Adaptive = b"\x04" + OFF = b"\x01" + ON = b"\x02" + TRANSPARENCY = b"\x03" + ADAPTIVE = b"\x04" class ConversationAwareness: Off = b"\x02" diff --git a/aln/Notifications/ANC.py b/aln/Notifications/ANC.py new file mode 100644 index 0000000..cd130cb --- /dev/null +++ b/aln/Notifications/ANC.py @@ -0,0 +1,50 @@ +from ..enums import enums +from ..Capabilites import Capabilites +class ANCNotification: + NOTIFICATION_PREFIX = enums.NOISE_CANCELLATION_PREFIX + OFF = Capabilites.NoiseCancellation.OFF + ON = Capabilites.NoiseCancellation.ON + TRANSPARENCY = Capabilites.NoiseCancellation.TRANSPARENCY + ADAPTIVE = Capabilites.NoiseCancellation.ADAPTIVE + + def __init__(self): + pass + + def isANCData(self, data: bytes): + # 04 00 04 00 09 00 0D 01 00 00 00 + if len(data) != 11: + return False + + if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()): + return True + else: + return False + + def setANC(self, data: bytes): + self.status = data[7] + pass + + def getANC(self, returnString: bool = False, fromInt: int = None): + if fromInt is not None: + fromInt = bytes([fromInt]) + if fromInt == self.OFF: + return "Off" + elif fromInt == self.ON: + return "On" + elif fromInt == self.TRANSPARENCY: + return "Transparency" + elif fromInt == self.ADAPTIVE: + return "Adaptive" + pass + if returnString: + return self.status + else: + if self.status == self.OFF: + return "Off" + elif self.status == self.ON: + return "On" + elif self.status == self.TRANSPARENCY: + return "Transparency" + elif self.status == self.ADAPTIVE: + return "Adaptive" + pass \ No newline at end of file diff --git a/aln/Notifications/Listener.py b/aln/Notifications/Listener.py index 22ed6ea..113f0ac 100644 --- a/aln/Notifications/Listener.py +++ b/aln/Notifications/Listener.py @@ -2,6 +2,7 @@ from bluetooth import BluetoothSocket import threading from .Battery import BatteryNotification from .EarDetection import EarDetectionNotification +from .ANC import ANCNotification import logging logging = logging.getLogger(__name__) @@ -16,6 +17,7 @@ class NotificationListener: self.socket = socket self.BatteryNotification = BatteryNotification() self.EarDetectionNotification = EarDetectionNotification() + self.ANCNotification = ANCNotification() self.callback = callback pass @@ -31,6 +33,9 @@ class NotificationListener: if self.EarDetectionNotification.isEarDetectionData(data): self.EarDetectionNotification.setEarDetection(data) self.callback(self.EAR_DETECTION_UPDATED, data) + if self.ANCNotification.isANCData(data): + self.ANCNotification.setANC(data) + self.callback(self.ANC_UPDATED, data) else: self.callback(self.UNKNOWN, data) pass diff --git a/aln/Notifications/__init__.py b/aln/Notifications/__init__.py index 1d0780d..674454d 100644 --- a/aln/Notifications/__init__.py +++ b/aln/Notifications/__init__.py @@ -17,11 +17,13 @@ class Notifications: self.notificationListener = NotificationListener(self.socket, callback) self.BatteryNotification = self.notificationListener.BatteryNotification self.EarDetectionNotification = self.notificationListener.EarDetectionNotification + self.ANCNotification = self.notificationListener.ANCNotification pass def initialize(self): try: self.socket.send(enums.REQUEST_NOTIFICATIONS) + self.socket.send(enums.SET_SPECIFIC_FEATURES) self.notificationListener.start() except bluetooth.btcommon.BluetoothError as e: diff --git a/aln/__init__.py b/aln/__init__.py index eee78f9..7f83bb1 100644 --- a/aln/__init__.py +++ b/aln/__init__.py @@ -1,7 +1,4 @@ from .Notifications import Notifications -from .Capabilites import Capabilites -from .enums import enums - import bluetooth import logging @@ -25,6 +22,7 @@ class Connection: self.notifications = Notifications(self.socket, callback) self.notificationListener = self.notifications.notificationListener self.BatteryNotification = self.notifications.BatteryNotification + self.ANCNotification = self.notifications.ANCNotification self.notifications.initialize() def send(self, data: bytes): @@ -46,6 +44,10 @@ class Connection: logging = logging.getLogger("In-Ear Status") logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}') pass + elif notification_type == Notifications.ANC_UPDATED: + logging = logging.getLogger("ANC Status") + logging.debug(f'{self.notificationListener.ANCNotification.getANC()}') + pass elif notification_type == Notifications.UNKNOWN: logging = logging.getLogger("Unknown Notification") hex_data = ' '.join(f'{byte:02x}' for byte in data) diff --git a/aln/enums.py b/aln/enums.py index 311238c..20b0d79 100644 --- a/aln/enums.py +++ b/aln/enums.py @@ -6,14 +6,17 @@ class enums: CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY SEND_PREFIX = b'\x04\x00\x04\x00' SETTINGS = b"\x09\x00" - SETTINGS_SEND_SUFFIX = b'\x00\x00\x00' + SUFFIX = b'\x00\x00\x00' NOTIFICATION_FILTER = b'\x0f' + SPECIFIC_FEATURES = b'\x4d' + SET_SPECIFIC_FEATURES = SEND_PREFIX + SPECIFIC_FEATURES + b"\x00\xff\x00\x00\x00\x00\x00\x00\x00" REQUEST_NOTIFICATIONS = SEND_PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff" HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" - - SET_NOISE_CANCELLATION_OFF = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Off + SETTINGS_SEND_SUFFIX - SET_NOISE_CANCELLATION_ON = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.On + SETTINGS_SEND_SUFFIX - SET_NOISE_CANCELLATION_TRANSPARENCY = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Transparency + SETTINGS_SEND_SUFFIX - SET_NOISE_CANCELLATION_ADAPTIVE = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Adaptive + SETTINGS_SEND_SUFFIX - SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SETTINGS_SEND_SUFFIX - SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SETTINGS_SEND_SUFFIX \ No newline at end of file + NOISE_CANCELLATION_PREFIX = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + NOISE_CANCELLATION_OFF = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.OFF + SUFFIX + NOISE_CANCELLATION_ON = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ON + SUFFIX + NOISE_CANCELLATION_TRANSPARENCY = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.TRANSPARENCY + SUFFIX + NOISE_CANCELLATION_ADAPTIVE = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ADAPTIVE + SUFFIX + + SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SUFFIX + SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SUFFIX \ No newline at end of file diff --git a/examples/daemon/ear-detection.py b/examples/daemon/ear-detection.py index 422ed11..37fd5b0 100644 --- a/examples/daemon/ear-detection.py +++ b/examples/daemon/ear-detection.py @@ -7,8 +7,6 @@ import time import os import logging - - class CustomFormatter(logging.Formatter): # Define color codes for different log levels COLORS = { @@ -23,10 +21,10 @@ class CustomFormatter(logging.Formatter): # Apply color to the level name levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8) record.levelname = levelname - + # Format the message formatted_message = super().format(record) - + return formatted_message # Custom formatter with fixed width for level name @@ -49,14 +47,14 @@ class MediaController: def playMusic(self): logging.info("Playing music") - subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7")) + subprocess.call(("playerctl", "play")) def pauseMusic(self): logging.info("Pausing music") - subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7")) + subprocess.call(("playerctl", "--all-players", "pause")) def isPlaying(self): - return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing" + return "Playing" in subprocess.getoutput("playerctl --all-players status").strip() def handlePlayPause(self, data): primary_status = data[0] diff --git a/examples/daemon/read-data.py b/examples/daemon/read-data.py index 25760b0..c6b01f9 100644 --- a/examples/daemon/read-data.py +++ b/examples/daemon/read-data.py @@ -1,7 +1,7 @@ import socket import json import logging - +from aln.Notifications.ANC import ANCNotification SOCKET_PATH = "/tmp/airpods_daemon.sock" import logging @@ -56,6 +56,8 @@ def read(): logging.info(f"\033[1;33mReceived battery status: {b} - {battery_data['status']} - {battery_data['level']}\033[1;0m") elif data["type"] == "ear_detection": logging.info(f"\033[1;33mReceived ear detection status: {data['primary']} - {data['secondary']}\033[1;0m") + elif data["type"] == "anc": + logging.info(f"\033[1;33mReceived ANC status: {data['status']}\033[1;0m") elif data["type"] == "unknown": logging.info(f"Received data: {data['data']}") else: diff --git a/examples/daemon/set-anc.py b/examples/daemon/set-anc.py index ebfe556..d4ee085 100644 --- a/examples/daemon/set-anc.py +++ b/examples/daemon/set-anc.py @@ -62,12 +62,12 @@ if __name__ == "__main__": args = parse_arguments() if args.mode == "off" or args.mode == "1": - command = enums.SET_NOISE_CANCELLATION_OFF + command = enums.NOISE_CANCELLATION_OFF elif args.mode == "on" or args.mode == "2": - command = enums.SET_NOISE_CANCELLATION_ON + command = enums.NOISE_CANCELLATION_ON elif args.mode == "transparency" or args.mode == "3": - command = enums.SET_NOISE_CANCELLATION_TRANSPARENCY + command = enums.NOISE_CANCELLATION_TRANSPARENCY elif args.mode == "adaptive" or args.mode == "4": - command = enums.SET_NOISE_CANCELLATION_ADAPTIVE + command = enums.NOISE_CANCELLATION_ADAPTIVE send_command(command) diff --git a/examples/daemon/tray.py b/examples/daemon/tray.py index dc8e2eb..77bf2c5 100644 --- a/examples/daemon/tray.py +++ b/examples/daemon/tray.py @@ -11,6 +11,35 @@ import subprocess import time import os + +class CustomFormatter(logging.Formatter): + # Define color codes for different log levels + COLORS = { + logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text + logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text + logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text + logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text + logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text + } + + def format(self, record): + # Apply color to the level name + levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8) + record.levelname = levelname + + # Format the message + formatted_message = super().format(record) + + return formatted_message + +# Custom formatter with fixed width for level name +formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s') + +logging.basicConfig(level=logging.DEBUG) + +# Set the custom formatter for the root logger +logging.getLogger().handlers[0].setFormatter(formatter) + SOCKET_PATH = "/tmp/airpods_daemon.sock" # Initialize battery_status at the module level @@ -33,14 +62,15 @@ class MediaController: def playMusic(self): logging.info("Playing music") - subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7")) + subprocess.call(("playerctl", "play")) def pauseMusic(self): logging.info("Pausing music") - subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7")) + subprocess.call(("playerctl", "--all-players", "pause")) def isPlaying(self): - return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing" + return "Playing" in subprocess.getoutput("playerctl --all-players status").strip() + def handlePlayPause(self, data): primary_status = data[0] @@ -121,7 +151,7 @@ class BatteryStatusUpdater(QObject): super().__init__() self.media_controller = MediaController() - def listen_for_battery_updates(self): + def listen_to_socket(self): global battery_status with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: client.connect(SOCKET_PATH) @@ -133,9 +163,11 @@ class BatteryStatusUpdater(QObject): if response["type"] == "battery": with battery_status_lock: battery_status = response + logging.debug(f"Received battery status: {response}") self.battery_status_updated.emit() elif response["type"] == "ear_detection": self.media_controller.handlePlayPause([response['primary'], response['secondary']]) + logging.debug(f"Received ear detection status: {response}") except json.JSONDecodeError as e: logging.warning(f"Error deserializing data: {e}") except KeyError as e: @@ -148,21 +180,24 @@ def get_battery_status(): left = battery_status["LEFT"] right = battery_status["RIGHT"] case = battery_status["CASE"] - return f"Left: {left['level']}% - {left['status'].title().replace('_', ' ')} | Right: {right['level']}% - {right['status'].title().replace('_', ' ')} | Case: {case['level']}% - {case['status'].title().replace('_', ' ')}" - + left_status = (left['status'] or 'Unknown').title().replace('_', ' ') + right_status = (right['status'] or 'Unknown').title().replace('_', ' ') + case_status = (case['status'] or 'Unknown').title().replace('_', ' ') + return f"Left: {left['level']}% - {left_status} | Right: {right['level']}% - {right_status} | Case: {case['level']}% - {case_status}" + from aln import enums def set_anc_mode(mode): with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: client.connect(SOCKET_PATH) - command = enums.SET_NOISE_CANCELLATION_OFF + command = enums.NOISE_CANCELLATION_OFF if mode == "on": - command = enums.SET_NOISE_CANCELLATION_ON + command = enums.NOISE_CANCELLATION_ON elif mode == "off": - command = enums.SET_NOISE_CANCELLATION_OFF + command = enums.NOISE_CANCELLATION_OFF elif mode == "transparency": - command = enums.SET_NOISE_CANCELLATION_TRANSPARENCY + command = enums.NOISE_CANCELLATION_TRANSPARENCY elif mode == "adaptive": - command = enums.SET_NOISE_CANCELLATION_ADAPTIVE + command = enums.NOISE_CANCELLATION_ADAPTIVE client.sendall(command) response = client.recv(1024) return json.loads(response.decode()) @@ -172,7 +207,7 @@ def control_anc(action): logging.info(f"ANC action: {action}, Response: {response}") def signal_handler(sig, frame): - print("Exiting...") + logging.info("Exiting...") QApplication.quit() sys.exit(0) @@ -222,8 +257,8 @@ battery_status_updater = BatteryStatusUpdater() battery_status_updater.battery_status_updated.connect(lambda: tray_icon.setToolTip(get_battery_status())) # Start the battery status listener thread -listener_thread = threading.Thread(target=battery_status_updater.listen_for_battery_updates, daemon=True) +listener_thread = threading.Thread(target=battery_status_updater.listen_to_socket, daemon=True) listener_thread.start() # Run the application -sys.exit(app.exec_()) \ No newline at end of file +sys.exit(app.exec_()) diff --git a/examples/logger-and-anc.py b/examples/logger-and-anc.py index 26e524d..28f4950 100644 --- a/examples/logger-and-anc.py +++ b/examples/logger-and-anc.py @@ -52,16 +52,16 @@ def input_thread(connection: Connection): while True: anc_mode = input() if anc_mode == '1': - connection.send(enums.SET_NOISE_CANCELLATION_OFF) + connection.send(enums.NOISE_CANCELLATION_OFF) logging.info('ANC Off') elif anc_mode == '2': - connection.send(enums.SET_NOISE_CANCELLATION_TRANSPARENCY) + connection.send(enums.NOISE_CANCELLATION_TRANSPARENCY) logging.info('Transparency On') elif anc_mode == '3': - connection.send(enums.SET_NOISE_CANCELLATION_ADAPTIVE) + connection.send(enums.NOISE_CANCELLATION_ADAPTIVE) logging.info('Adaptive Transparency On') elif anc_mode == '4': - connection.send(enums.SET_NOISE_CANCELLATION_ON) + connection.send(enums.NOISE_CANCELLATION_ON) logging.info('ANC On') else: logging.error('Invalid ANC Mode') diff --git a/examples/standalone.py b/examples/standalone.py index 499acf3..bbb8978 100644 --- a/examples/standalone.py +++ b/examples/standalone.py @@ -21,7 +21,7 @@ class initL2CAP(): subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7")) def getMusicStatus(self): - return subprocess.check_output(("playerctl", "status", "--ignore-player", "OnePlus_7")).decode("utf-8").strip() + return subprocess.getoutput("playerctl status --ignore-player OnePlus_7").strip() # Change to MAC address of your AirPods diff --git a/start-daemon.py b/start-daemon.py index 3d22016..c63cb4b 100644 --- a/start-daemon.py +++ b/start-daemon.py @@ -6,10 +6,10 @@ import sys import logging from aln import Connection, enums from aln.Notifications import Notifications -import os from aln.Notifications.Battery import Battery +import os import bluetooth - +from aln.enums import enums connection = None AIRPODS_MAC = '28:2D:7F:C2:05:5B' @@ -55,7 +55,9 @@ def handle_client(connection, client_socket): "level": i.get_level() } data: str = JSONEncoder().encode(batteryJSON) + elif notif_key == "notif_ear_detection": + # noinspection PyTypeChecker data: list[int] = data earDetectionJSON = { "type": "ear_detection", @@ -63,6 +65,13 @@ def handle_client(connection, client_socket): "secondary": data[1] } data: str = JSONEncoder().encode(earDetectionJSON) + elif notif_key == "notif_anc": + data: int = data + ancJSON = { + "type": "anc", + "status": data, + } + data: str = JSONEncoder().encode(ancJSON) elif notif_key == "notif_unknown": logging.debug(f"Unhandled notification type: {notif_key}") logging.debug(f"Data: {data}") @@ -136,7 +145,7 @@ def start_socket_server(connection): server_socket.close() logging.info("Socket server stopped") -def stop_daemon(signum, frame): +def stop_daemon(_, __): """Signal handler to stop the daemon.""" global running logging.info("Received termination signal. Stopping daemon...") @@ -169,6 +178,11 @@ def notification_handler(notification_type: int, data: bytes): earDetection = connection.notificationListener.EarDetectionNotification.getEarDetection() globals()["notif_ear_detection"] = earDetection logger.debug(earDetection) + elif notification_type == Notifications.ANC_UPDATED: + logger = logging.getLogger("ANC Status") + anc = connection.notificationListener.ANCNotification.status + globals()["notif_anc"] = anc + logger.debug(anc) elif notification_type == Notifications.UNKNOWN: logger = logging.getLogger("Unknown Notification") hex_data = ' '.join(f'{byte:02x}' for byte in data) @@ -190,6 +204,7 @@ def main(): connection.send(enums.HANDSHAKE) logging.info("Handshake sent") + connection.initialize_notifications(notification_handler) # Start the socket server to listen for client connections @@ -219,4 +234,4 @@ if __name__ == "__main__": os.dup2(logfile.fileno(), sys.stdout.fileno()) os.dup2(logfile.fileno(), sys.stderr.fileno()) - main() \ No newline at end of file + main()