organize and improve examples

This commit is contained in:
Kavish Devar
2024-09-28 15:25:09 +05:30
parent cd7a8e4b46
commit 33051ec551
9 changed files with 118 additions and 94 deletions

View File

@@ -28,29 +28,18 @@ cd aln
Pair your AirPods with your machine before running this script!
:warning: **Note:** DO NOT FORGET TO EDIT THE `AIRPODS_MAC` VARIABLE IN `main.py`/`standalone.py` WITH YOUR AIRPODS MAC ADDRESS!
## 4. Run!
You can either choose the more polished version of the script, which currently only supports
- fetching the battery percentage,
- and in-ear status (but not actually controlling the media with that information)
or the more experimental versions of the script (features listed in respective sections).
# Versions
## Polished version
## Non-Daemon based
### This version is the most polished version of the script. It can do the following:
- fetch the battery percentage,
- fetch in-ear status (but not actually controlling the media with that information).
- control ANC modes
```bash
python3 main.py
python3 examples/logger-and-anc.py
```
## Experimental versions
### Standalone version (without module dependency)
- Controlling the media with the in-ear status
- Remove the device as an audio sink when the AirPods are not in your ears.
- Try to connect with the AirPods if media is playing and the AirPods are not connected.
```bash
python3 standalone.py
```
### Daemonizing
## As a daemon (using a UNIX socket)
If you want to run a deamon for multiple programs to read/write airpods data, you can use the `airpods_daemon.py` script.
- This creates a standard UNIX socket at `/tmp/airpods_daemon.sock` and listens for commands
- and sends battery/in-ear info
@@ -59,27 +48,38 @@ You can run it as follows:
```bash
python3 airpods_daemon.py
```
#### Scripts to interact with the daemon
## Interacting with the daemon
- Sending data to the daemon
You can send data to the daemon using the `example_daemon_send.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets.
You can send data to the daemon using the `set-anc.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets.
This package includes a demo script that sends a command to turn off the ANC. You can run it as follows:
```bash
python3 example_daemon_send.py
python3 examples/daemon/set-anc.py
```
- Reading data from the daemon
You can listen to the daemon's output by running the `example_daemon_read.py` script. This script listens to the UNIX socket and prints the data it receives. Currenty, it only prints the battery percentage and the in-ear status.
```bash
python3 example_daemon_read.py
python3 examples/daemon/example_daemon_read.py
```
- Controlling the media with the in-ear status (and get battery status)
This script is basically the standalone script, but interacts with the UNIX socket created by the daemon instead. It can control the media with the in-ear status and remove the device as an audio sink when the AirPods are not in your ears.
```bash
python3 ear-detection.py
python3 examples/daemon/ear-detection.py
```
## Standalone version (without module dependency, mainly for testing, and reverse engineering purposes)
- Controlling the media with the in-ear status.
- Remove the device as an audio sink when the AirPods are not in your ears.
- Try to connect with the AirPods if media is playing and the AirPods are not connected.
- Control ANC modes.
```bash
python3 examples/standalone.py
```

View File

@@ -10,6 +10,7 @@ class NotificationListener:
BATTERY_UPDATED = 0x01
ANC_UPDATED = 0x02
EAR_DETECTION_UPDATED = 0x03
UNKNOWN = 0x00
def __init__(self, socket: BluetoothSocket, callback: callable):
self.socket = socket
@@ -25,11 +26,14 @@ class NotificationListener:
break
if self.BatteryNotification.isBatteryData(data):
self.BatteryNotification.setBattery(data)
self.callback(self.BATTERY_UPDATED)
self.callback(self.BATTERY_UPDATED, data)
pass
if self.EarDetectionNotification.isEarDetectionData(data):
self.EarDetectionNotification.setEarDetection(data)
self.callback(self.EAR_DETECTION_UPDATED)
self.callback(self.EAR_DETECTION_UPDATED, data)
else:
self.callback(self.UNKNOWN, data)
pass
pass
pass

View File

@@ -11,7 +11,7 @@ class Notifications:
BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED
ANC_UPDATED = NotificationListener.ANC_UPDATED
EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED
UNKNOWN = NotificationListener.UNKNOWN
def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable):
self.socket = socket
self.notificationListener = NotificationListener(self.socket, callback)

View File

@@ -35,7 +35,7 @@ class Connection:
return False
return True
def notification_callback(self, notification_type: int):
def notification_callback(self, notification_type: int, data: bytes):
import logging
if notification_type == Notifications.BATTERY_UPDATED:
logging = logging.getLogger("Battery Status")
@@ -46,6 +46,11 @@ class Connection:
logging = logging.getLogger("In-Ear Status")
logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}')
pass
elif notification_type == Notifications.UNKNOWN:
logging = logging.getLogger("Unknown Notification")
hex_data = ' '.join(f'{byte:02x}' for byte in data)
logging.debug(f'{hex_data}')
pass
pass
def disconnect(self):

View File

@@ -1,51 +1,56 @@
import socket
import pickle
import json
import subprocess
from aln.Notifications import Battery
import threading
import time
import os
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Colorful logging
logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL))
SOCKET_PATH = "/tmp/airpods_daemon.sock"
class MediaController:
def __init__(self):
self.wasMusicPlaying = False
self.earStatus = "Both out"
self.status = "Stopped"
self.stop_thread_event = threading.Event()
self.wasMusicPlayingInSingle = False
self.wasMusicPlayingInBoth = False
self.firstEarOutTime = 0
self.stop_thread_event = threading.Event()
def playMusic(self):
print("Playing music")
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
def pauseMusic(self):
print("Pausing music")
subprocess.call(("playerctl", "pause", "--player", "spotify"))
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
def isPlaying(self):
status = subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip()
print(f"Music status: {status}")
return status == "Playing"
return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing"
def handlePlayPause(self, data):
primary_status = data[0]
secondary_status = data[1]
print(f"Handle play/pause called with data: {data}, previousStatus: {self.status}, wasMusicPlaying: {self.wasMusicPlaying}")
logging.debug(f"Handle play/pause called with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
def delayed_action(s):
if not self.stop_thread_event.is_set():
print("Delayed action")
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
print(self.wasMusicPlayingInSingle, self.wasMusicPlayingInBoth)
if primary_status and secondary_status:
if self.earStatus != "Both out":
@@ -54,7 +59,6 @@ class MediaController:
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
print("Only one in called with both out")
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
@@ -106,7 +110,7 @@ def read():
try:
# Create a socket connection to the daemon
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
print("Connecting to daemon...")
logging.info("Connecting to daemon...")
client_socket.connect(SOCKET_PATH)
media_controller = MediaController()
@@ -116,28 +120,21 @@ def read():
d = client_socket.recv(1024)
if d:
try:
data = pickle.loads(d)
if isinstance(data, str):
print(f"Received data: {data}")
elif isinstance(data, list) and all(isinstance(b, Battery.Battery) for b in data):
for b in data:
print(f"Received battery status: {b.get_component()} is {b.get_status()} at {b.get_level()}%")
elif isinstance(data, list) and len(data) == 2 and all(isinstance(i, int) for i in data):
print(f"Received ear detection status: Is in-ear? Primary: {data[0] == 0}, Secondary: {data[1] == 0}")
media_controller.handlePlayPause(data)
else:
print(f"Received unknown data: {data}")
except pickle.UnpicklingError as e:
print(f"Error deserializing data: {e}")
data: dict = json.loads(d.decode('utf-8'))
if data["type"] == "ear_detection":
logging.debug(f"Ear detection: {data['primary']} - {data['secondary']}")
media_controller.handlePlayPause([data['primary'], data['secondary']])
except json.JSONDecodeError as e:
logging.error(f"Error deserializing data: {e}")
else:
break
except Exception as e:
print(f"Error communicating with daemon: {e}")
logging.error(f"Error communicating with daemon: {e}")
finally:
if client_socket:
client_socket.close()
print("Socket closed")
logging.warning("Socket closed")
if __name__ == "__main__":
read()

View File

@@ -1,5 +1,5 @@
import socket
import pickle
import json
from aln.Notifications import Battery
SOCKET_PATH = "/tmp/airpods_daemon.sock"
@@ -18,18 +18,15 @@ def read():
d = client_socket.recv(1024)
if d:
try:
data = pickle.loads(d)
if isinstance(data, str):
print(f"Received data: {data}")
elif isinstance(data, list) and all(isinstance(b, Battery.Battery) for b in data):
for b in data:
print(f"Received battery status: {b.get_component()} is {b.get_status()} at {b.get_level()}%")
elif isinstance(data, list) and len(data) == 2 and all(isinstance(i, int) for i in data):
print(f"Received ear detection status: Is in-ear? Primary: {data[0] == 0}, Secondary: {data[1] == 0}")
data: dict = json.loads(d.decode('utf-8'))
if data["type"] == "battery":
for b in data.keys():
print(f"Received battery status: {b} - {data[b]}")
elif data["type"] == "ear_detection":
print(f"Ear detection: {data['primary']} - {data['secondary']}")
else:
print(f"Received unknown data: {data}")
all(isinstance(b, Battery.Battery) for b in data)
except pickle.UnpicklingError as e:
print(f"Received data: {data}")
except json.JSONDecodeError as e:
print(f"Error deserializing data: {e}")
else:
break

View File

@@ -6,7 +6,8 @@ import logging
from aln import Connection, enums
from aln.Notifications import Notifications
import os
import pickle
from aln.Notifications.Battery import Battery
import bluetooth
connection = None
@@ -22,31 +23,46 @@ running = True
logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s')
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s')
from json import JSONEncoder
def handle_client(connection, client_socket):
"""Handle client requests by forwarding all received data to aln.Connection, send data back to the client."""
def send_status():
while running:
try:
data = globals().get("battery")
if data:
if not client_socket or not isinstance(client_socket, socket.socket):
logging.error("Invalid client socket")
break
logging.info(f'Sending battery status: {data}')
client_socket.send(pickle.dumps(data))
logging.info(f'Sent battery status: {data}')
globals()["battery"] = None
data = globals().get("earDetection")
if data:
if not client_socket or not isinstance(client_socket, socket.socket):
logging.error("Invalid client socket")
break
logging.info(f'Sending ear detection status: {data}')
client_socket.send(pickle.dumps(data))
logging.info(f'Sent ear detection status: {data}')
globals()["earDetection"] = None
for notif_key in list(globals().keys()):
if notif_key.startswith("notif_"):
data = globals().get(notif_key)
if data:
if notif_key == "notif_battery":
data: list[Battery] = data
batteryJSON = {"type": "battery"}
for i in data:
batteryJSON[i.get_component()] = {
"status": i.get_status(),
"level": i.get_level()
}
data: str = JSONEncoder().encode(batteryJSON)
elif notif_key == "notif_ear_detection":
data: list[int] = data
earDetectionJSON = {
"type": "ear_detection",
"primary": data[0],
"secondary": data[1]
}
data: str = JSONEncoder().encode(earDetectionJSON)
else:
logging.warning(f"Unhandled notification type: {notif_key}")
continue
if not client_socket or not isinstance(client_socket, socket.socket):
logging.error("Invalid client socket")
break
logging.info(f'Sending {notif_key} status: {data}')
client_socket.sendall(data.encode('utf-8'))
logging.info(f'Sent {notif_key} status: {data}')
globals()[notif_key] = None
except socket.error as e:
logging.error(f"Socket error sending status: {e}")
break
@@ -133,13 +149,13 @@ def notification_handler(notification_type: int):
if notification_type == Notifications.BATTERY_UPDATED:
logger = logging.getLogger("Battery Status")
battery = connection.notificationListener.BatteryNotification.getBattery()
globals()["battery"] = battery
globals()["notif_battery"] = battery
for i in battery:
logger.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}')
elif notification_type == Notifications.EAR_DETECTION_UPDATED:
logger = logging.getLogger("In-Ear Status")
earDetection = connection.notificationListener.EarDetectionNotification.getEarDetection()
globals()["earDetection"] = earDetection
globals()["notif_ear_detection"] = earDetection
logger.debug(earDetection)
def main():
@@ -149,7 +165,12 @@ def main():
globals()['connection'] = connection
# Connect to the AirPods and send the handshake
connection.connect()
try:
connection.connect()
except bluetooth.btcommon.BluetoothError as e:
logging.error(f"Failed to connect to {AIRPODS_MAC}: {e}")
sys.exit(1)
connection.send(enums.HANDSHAKE)
logging.info("Handshake sent")
connection.initialize_notifications(notification_handler)

View File

@@ -6,8 +6,8 @@ import time
import sys
import shutil
AIRPODS_MAC = '28:2D:7F:C2:05:5B'
class CustomFormatter(logging.Formatter):
def format(self, record):
# Format the log message with spaces around colons without altering the original message