From 33051ec5512c3b7c2d738863cc81b97ef76ab922 Mon Sep 17 00:00:00 2001 From: Kavish Devar Date: Sat, 28 Sep 2024 15:25:09 +0530 Subject: [PATCH] organize and improve examples --- README.md | 48 ++++++------- aln/Notifications/Listener.py | 8 ++- aln/Notifications/__init__.py | 2 +- aln/__init__.py | 7 +- examples/daemon/ear-detection.py | 57 ++++++++-------- examples/daemon/read-data.py | 21 +++--- airpods_daemon.py => examples/daemon/start.py | 67 ++++++++++++------- main.py => examples/logger-and-anc.py | 2 +- standalone.py => examples/standalone.py | 0 9 files changed, 118 insertions(+), 94 deletions(-) rename airpods_daemon.py => examples/daemon/start.py (69%) rename main.py => examples/logger-and-anc.py (100%) rename standalone.py => examples/standalone.py (100%) diff --git a/README.md b/README.md index f948969..a9dd53f 100644 --- a/README.md +++ b/README.md @@ -28,29 +28,18 @@ cd aln Pair your AirPods with your machine before running this script! :warning: **Note:** DO NOT FORGET TO EDIT THE `AIRPODS_MAC` VARIABLE IN `main.py`/`standalone.py` WITH YOUR AIRPODS MAC ADDRESS! -## 4. Run! -You can either choose the more polished version of the script, which currently only supports -- fetching the battery percentage, -- and in-ear status (but not actually controlling the media with that information) -or the more experimental versions of the script (features listed in respective sections). - # Versions -## Polished version + +## Non-Daemon based +### This version is the most polished version of the script. It can do the following: +- fetch the battery percentage, +- fetch in-ear status (but not actually controlling the media with that information). +- control ANC modes ```bash -python3 main.py +python3 examples/logger-and-anc.py ``` -## Experimental versions - -### Standalone version (without module dependency) -- Controlling the media with the in-ear status -- Remove the device as an audio sink when the AirPods are not in your ears. -- Try to connect with the AirPods if media is playing and the AirPods are not connected. -```bash -python3 standalone.py -``` - -### Daemonizing +## As a daemon (using a UNIX socket) If you want to run a deamon for multiple programs to read/write airpods data, you can use the `airpods_daemon.py` script. - This creates a standard UNIX socket at `/tmp/airpods_daemon.sock` and listens for commands - and sends battery/in-ear info @@ -59,27 +48,38 @@ You can run it as follows: ```bash python3 airpods_daemon.py ``` -#### Scripts to interact with the daemon + +## Interacting with the daemon - Sending data to the daemon -You can send data to the daemon using the `example_daemon_send.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets. +You can send data to the daemon using the `set-anc.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets. This package includes a demo script that sends a command to turn off the ANC. You can run it as follows: ```bash -python3 example_daemon_send.py +python3 examples/daemon/set-anc.py ``` - Reading data from the daemon You can listen to the daemon's output by running the `example_daemon_read.py` script. This script listens to the UNIX socket and prints the data it receives. Currenty, it only prints the battery percentage and the in-ear status. ```bash -python3 example_daemon_read.py +python3 examples/daemon/example_daemon_read.py ``` - Controlling the media with the in-ear status (and get battery status) This script is basically the standalone script, but interacts with the UNIX socket created by the daemon instead. It can control the media with the in-ear status and remove the device as an audio sink when the AirPods are not in your ears. ```bash -python3 ear-detection.py +python3 examples/daemon/ear-detection.py +``` + +## Standalone version (without module dependency, mainly for testing, and reverse engineering purposes) +- Controlling the media with the in-ear status. +- Remove the device as an audio sink when the AirPods are not in your ears. +- Try to connect with the AirPods if media is playing and the AirPods are not connected. +- Control ANC modes. + +```bash +python3 examples/standalone.py ``` \ No newline at end of file diff --git a/aln/Notifications/Listener.py b/aln/Notifications/Listener.py index e7ecf5a..22ed6ea 100644 --- a/aln/Notifications/Listener.py +++ b/aln/Notifications/Listener.py @@ -10,6 +10,7 @@ class NotificationListener: BATTERY_UPDATED = 0x01 ANC_UPDATED = 0x02 EAR_DETECTION_UPDATED = 0x03 + UNKNOWN = 0x00 def __init__(self, socket: BluetoothSocket, callback: callable): self.socket = socket @@ -25,11 +26,14 @@ class NotificationListener: break if self.BatteryNotification.isBatteryData(data): self.BatteryNotification.setBattery(data) - self.callback(self.BATTERY_UPDATED) + self.callback(self.BATTERY_UPDATED, data) pass if self.EarDetectionNotification.isEarDetectionData(data): self.EarDetectionNotification.setEarDetection(data) - self.callback(self.EAR_DETECTION_UPDATED) + self.callback(self.EAR_DETECTION_UPDATED, data) + else: + self.callback(self.UNKNOWN, data) + pass pass pass diff --git a/aln/Notifications/__init__.py b/aln/Notifications/__init__.py index 9a4abe5..1d0780d 100644 --- a/aln/Notifications/__init__.py +++ b/aln/Notifications/__init__.py @@ -11,7 +11,7 @@ class Notifications: BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED ANC_UPDATED = NotificationListener.ANC_UPDATED EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED - + UNKNOWN = NotificationListener.UNKNOWN def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable): self.socket = socket self.notificationListener = NotificationListener(self.socket, callback) diff --git a/aln/__init__.py b/aln/__init__.py index 883babc..eee78f9 100644 --- a/aln/__init__.py +++ b/aln/__init__.py @@ -35,7 +35,7 @@ class Connection: return False return True - def notification_callback(self, notification_type: int): + def notification_callback(self, notification_type: int, data: bytes): import logging if notification_type == Notifications.BATTERY_UPDATED: logging = logging.getLogger("Battery Status") @@ -46,6 +46,11 @@ class Connection: logging = logging.getLogger("In-Ear Status") logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}') pass + elif notification_type == Notifications.UNKNOWN: + logging = logging.getLogger("Unknown Notification") + hex_data = ' '.join(f'{byte:02x}' for byte in data) + logging.debug(f'{hex_data}') + pass pass def disconnect(self): diff --git a/examples/daemon/ear-detection.py b/examples/daemon/ear-detection.py index b0190cd..f11ea85 100644 --- a/examples/daemon/ear-detection.py +++ b/examples/daemon/ear-detection.py @@ -1,51 +1,56 @@ import socket -import pickle +import json import subprocess from aln.Notifications import Battery import threading import time import os +import logging + + +# Configure logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + +# Colorful logging +logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG)) +logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO)) +logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING)) +logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR)) +logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL)) + SOCKET_PATH = "/tmp/airpods_daemon.sock" class MediaController: def __init__(self): - self.wasMusicPlaying = False self.earStatus = "Both out" - self.status = "Stopped" - self.stop_thread_event = threading.Event() self.wasMusicPlayingInSingle = False self.wasMusicPlayingInBoth = False self.firstEarOutTime = 0 + self.stop_thread_event = threading.Event() def playMusic(self): - print("Playing music") subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7")) def pauseMusic(self): - print("Pausing music") - subprocess.call(("playerctl", "pause", "--player", "spotify")) + subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7")) def isPlaying(self): - status = subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() - print(f"Music status: {status}") - return status == "Playing" + return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing" def handlePlayPause(self, data): primary_status = data[0] secondary_status = data[1] - print(f"Handle play/pause called with data: {data}, previousStatus: {self.status}, wasMusicPlaying: {self.wasMusicPlaying}") + logging.debug(f"Handle play/pause called with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}") def delayed_action(s): if not self.stop_thread_event.is_set(): - print("Delayed action") if self.wasMusicPlayingInSingle: self.playMusic() self.wasMusicPlayingInBoth = False elif self.wasMusicPlayingInBoth or s: self.wasMusicPlayingInBoth = True self.wasMusicPlayingInSingle = False - print(self.wasMusicPlayingInSingle, self.wasMusicPlayingInBoth) if primary_status and secondary_status: if self.earStatus != "Both out": @@ -54,7 +59,6 @@ class MediaController: os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off") if self.earStatus == "Only one in": if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3: - print("Only one in called with both out") self.wasMusicPlayingInSingle = True self.wasMusicPlayingInBoth = True self.stop_thread_event.set() @@ -106,7 +110,7 @@ def read(): try: # Create a socket connection to the daemon client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - print("Connecting to daemon...") + logging.info("Connecting to daemon...") client_socket.connect(SOCKET_PATH) media_controller = MediaController() @@ -116,28 +120,21 @@ def read(): d = client_socket.recv(1024) if d: try: - data = pickle.loads(d) - if isinstance(data, str): - print(f"Received data: {data}") - elif isinstance(data, list) and all(isinstance(b, Battery.Battery) for b in data): - for b in data: - print(f"Received battery status: {b.get_component()} is {b.get_status()} at {b.get_level()}%") - elif isinstance(data, list) and len(data) == 2 and all(isinstance(i, int) for i in data): - print(f"Received ear detection status: Is in-ear? Primary: {data[0] == 0}, Secondary: {data[1] == 0}") - media_controller.handlePlayPause(data) - else: - print(f"Received unknown data: {data}") - except pickle.UnpicklingError as e: - print(f"Error deserializing data: {e}") + data: dict = json.loads(d.decode('utf-8')) + if data["type"] == "ear_detection": + logging.debug(f"Ear detection: {data['primary']} - {data['secondary']}") + media_controller.handlePlayPause([data['primary'], data['secondary']]) + except json.JSONDecodeError as e: + logging.error(f"Error deserializing data: {e}") else: break except Exception as e: - print(f"Error communicating with daemon: {e}") + logging.error(f"Error communicating with daemon: {e}") finally: if client_socket: client_socket.close() - print("Socket closed") + logging.warning("Socket closed") if __name__ == "__main__": read() \ No newline at end of file diff --git a/examples/daemon/read-data.py b/examples/daemon/read-data.py index d7bfe38..63519c2 100644 --- a/examples/daemon/read-data.py +++ b/examples/daemon/read-data.py @@ -1,5 +1,5 @@ import socket -import pickle +import json from aln.Notifications import Battery SOCKET_PATH = "/tmp/airpods_daemon.sock" @@ -18,18 +18,15 @@ def read(): d = client_socket.recv(1024) if d: try: - data = pickle.loads(d) - if isinstance(data, str): - print(f"Received data: {data}") - elif isinstance(data, list) and all(isinstance(b, Battery.Battery) for b in data): - for b in data: - print(f"Received battery status: {b.get_component()} is {b.get_status()} at {b.get_level()}%") - elif isinstance(data, list) and len(data) == 2 and all(isinstance(i, int) for i in data): - print(f"Received ear detection status: Is in-ear? Primary: {data[0] == 0}, Secondary: {data[1] == 0}") + data: dict = json.loads(d.decode('utf-8')) + if data["type"] == "battery": + for b in data.keys(): + print(f"Received battery status: {b} - {data[b]}") + elif data["type"] == "ear_detection": + print(f"Ear detection: {data['primary']} - {data['secondary']}") else: - print(f"Received unknown data: {data}") - all(isinstance(b, Battery.Battery) for b in data) - except pickle.UnpicklingError as e: + print(f"Received data: {data}") + except json.JSONDecodeError as e: print(f"Error deserializing data: {e}") else: break diff --git a/airpods_daemon.py b/examples/daemon/start.py similarity index 69% rename from airpods_daemon.py rename to examples/daemon/start.py index b186142..85cc7e0 100644 --- a/airpods_daemon.py +++ b/examples/daemon/start.py @@ -6,7 +6,8 @@ import logging from aln import Connection, enums from aln.Notifications import Notifications import os -import pickle +from aln.Notifications.Battery import Battery +import bluetooth connection = None @@ -22,31 +23,46 @@ running = True logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s') # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s') +from json import JSONEncoder + def handle_client(connection, client_socket): """Handle client requests by forwarding all received data to aln.Connection, send data back to the client.""" def send_status(): while running: try: - data = globals().get("battery") - if data: - if not client_socket or not isinstance(client_socket, socket.socket): - logging.error("Invalid client socket") - break - logging.info(f'Sending battery status: {data}') - client_socket.send(pickle.dumps(data)) - logging.info(f'Sent battery status: {data}') - globals()["battery"] = None - - data = globals().get("earDetection") - if data: - if not client_socket or not isinstance(client_socket, socket.socket): - logging.error("Invalid client socket") - break - logging.info(f'Sending ear detection status: {data}') - client_socket.send(pickle.dumps(data)) - logging.info(f'Sent ear detection status: {data}') - globals()["earDetection"] = None + for notif_key in list(globals().keys()): + if notif_key.startswith("notif_"): + data = globals().get(notif_key) + if data: + if notif_key == "notif_battery": + data: list[Battery] = data + batteryJSON = {"type": "battery"} + for i in data: + batteryJSON[i.get_component()] = { + "status": i.get_status(), + "level": i.get_level() + } + data: str = JSONEncoder().encode(batteryJSON) + elif notif_key == "notif_ear_detection": + data: list[int] = data + earDetectionJSON = { + "type": "ear_detection", + "primary": data[0], + "secondary": data[1] + } + data: str = JSONEncoder().encode(earDetectionJSON) + else: + logging.warning(f"Unhandled notification type: {notif_key}") + continue + + if not client_socket or not isinstance(client_socket, socket.socket): + logging.error("Invalid client socket") + break + logging.info(f'Sending {notif_key} status: {data}') + client_socket.sendall(data.encode('utf-8')) + logging.info(f'Sent {notif_key} status: {data}') + globals()[notif_key] = None except socket.error as e: logging.error(f"Socket error sending status: {e}") break @@ -133,13 +149,13 @@ def notification_handler(notification_type: int): if notification_type == Notifications.BATTERY_UPDATED: logger = logging.getLogger("Battery Status") battery = connection.notificationListener.BatteryNotification.getBattery() - globals()["battery"] = battery + globals()["notif_battery"] = battery for i in battery: logger.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}') elif notification_type == Notifications.EAR_DETECTION_UPDATED: logger = logging.getLogger("In-Ear Status") earDetection = connection.notificationListener.EarDetectionNotification.getEarDetection() - globals()["earDetection"] = earDetection + globals()["notif_ear_detection"] = earDetection logger.debug(earDetection) def main(): @@ -149,7 +165,12 @@ def main(): globals()['connection'] = connection # Connect to the AirPods and send the handshake - connection.connect() + try: + connection.connect() + except bluetooth.btcommon.BluetoothError as e: + logging.error(f"Failed to connect to {AIRPODS_MAC}: {e}") + sys.exit(1) + connection.send(enums.HANDSHAKE) logging.info("Handshake sent") connection.initialize_notifications(notification_handler) diff --git a/main.py b/examples/logger-and-anc.py similarity index 100% rename from main.py rename to examples/logger-and-anc.py index e568752..26e524d 100644 --- a/main.py +++ b/examples/logger-and-anc.py @@ -6,8 +6,8 @@ import time import sys import shutil - AIRPODS_MAC = '28:2D:7F:C2:05:5B' + class CustomFormatter(logging.Formatter): def format(self, record): # Format the log message with spaces around colons without altering the original message diff --git a/standalone.py b/examples/standalone.py similarity index 100% rename from standalone.py rename to examples/standalone.py