add tray app and update README

This commit is contained in:
Kavish Devar
2024-09-29 02:48:48 +05:30
parent 8115328ac5
commit 801d300be1
13 changed files with 202 additions and 27 deletions

View File

@@ -1,7 +1,4 @@
# ALN - AirPods like Normal (Linux Only)
![Main Demo (Screenshot 2024-09-27 at 3 06 56AM)](https://github.com/user-attachments/assets/352275c8-e143-42c3-a06a-fc3ac0c937b9)
# Get Started!
## 1. Install the required packages
@@ -40,6 +37,7 @@ python3 examples/logger-and-anc.py
```
## As a daemon (using a UNIX socket)
![Daemon Log Screenshot](imgs/daemon-log.png | width=300)
If you want to run a deamon for multiple programs to read/write airpods data, you can use the `airpods_daemon.py` script.
- This creates a standard UNIX socket at `/tmp/airpods_daemon.sock` and listens for commands
- and sends battery/in-ear info
@@ -50,7 +48,7 @@ python3 airpods_daemon.py
```
## Interacting with the daemon
![Set ANC Screenshot](imgs/set-anc.png | width=300)
- Sending data to the daemon
You can send data to the daemon using the `set-anc.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets.
@@ -61,19 +59,31 @@ python3 examples/daemon/set-anc.py
```
- Reading data from the daemon
You can listen to the daemon's output by running the `example_daemon_read.py` script. This script listens to the UNIX socket and prints the data it receives. Currenty, it only prints the battery percentage and the in-ear status.
![Read Data Screenshot](imgs/read-data.png | width=300)
You can listen to the daemon's output by running the `read-data.py` script. This script listens to the UNIX socket and prints the data it receives. Currenty, it recognizes the battery percentage and the in-ear status and dumps the rest of the data to the terminal.
```bash
python3 examples/daemon/example_daemon_read.py
python3 examples/daemon/read-data.py
```
- Controlling the media with the in-ear status (and get battery status)
![Ear Detection Screenshot](imgs/ear-detection.png | width=300)
This script is basically the standalone script, but interacts with the UNIX socket created by the daemon instead. It can control the media with the in-ear status and remove the device as an audio sink when the AirPods are not in your ears.
```bash
python3 examples/daemon/ear-detection.py
```
- App Indicator/Tray Icon
![Tray Icon Hover Screenshot](imgs/tray-icon-hover.png)
![Tray Icon Menu Screenshot](imgs/tray-icon-menu.png)
This script is a simple tray icon that shows the battery percentage and set ANC modes.
> Note: This script uses QT.
```bash
python3 examples/daemon/tray.py
```
## Standalone version (without module dependency, mainly for testing, and reverse engineering purposes)
- Controlling the media with the in-ear status.
- Remove the device as an audio sink when the AirPods are not in your ears.

View File

@@ -8,15 +8,34 @@ import os
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Colorful logging
logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL))
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
@@ -29,9 +48,11 @@ class MediaController:
self.stop_thread_event = threading.Event()
def playMusic(self):
logging.info("Playing music")
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
def isPlaying(self):
@@ -41,7 +62,7 @@ class MediaController:
primary_status = data[0]
secondary_status = data[1]
logging.debug(f"Handle play/pause called with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
logging.debug(f"Handling play/pause with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
def delayed_action(s):
if not self.stop_thread_event.is_set():
@@ -55,8 +76,10 @@ class MediaController:
if primary_status and secondary_status:
if self.earStatus != "Both out":
s = self.isPlaying()
self.pauseMusic()
if s:
self.pauseMusic()
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
logging.info("Setting profile to off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
self.wasMusicPlayingInSingle = True
@@ -80,6 +103,7 @@ class MediaController:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.isPlaying()
@@ -95,12 +119,14 @@ class MediaController:
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.isPlaying()
self.pauseMusic()
if s:
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
self.earStatus = "Only one in"
return "Only one in"
@@ -124,7 +150,8 @@ def read():
if data["type"] == "ear_detection":
media_controller.handlePlayPause([data['primary'], data['secondary']])
except json.JSONDecodeError as e:
logging.error(f"Error deserializing data: {e}")
# logging.error(f"Error deserializing data: {e}")
pass
else:
break

View File

@@ -5,7 +5,7 @@ import logging
SOCKET_PATH = "/tmp/airpods_daemon.sock"
import logging
import textwrap
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
@@ -27,16 +27,13 @@ class CustomFormatter(logging.Formatter):
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;90m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
# Example usage
logging.info("This is an info message. This is a continuation of the info message. This is a continuation of the info message. This is a continuation of the info message.")
def read():
"""Send a command to the daemon via UNIX domain socket."""
client_socket = None
@@ -66,8 +63,9 @@ def read():
else:
logging.error("Received data is not a dictionary")
except json.JSONDecodeError as e:
logging.warning(f"Error deserializing data: {e}")
logging.warning(f"raw data: {d}")
# logging.warning(f"Error deserializing data: {e}")
# logging.warning(f"raw data: {d}")
pass
except KeyError as e:
logging.error(f"KeyError: {e} in data: {data}")
except TypeError as e:

View File

@@ -24,13 +24,14 @@ class CustomFormatter(logging.Formatter):
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;90m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
def send_command(command):

132
examples/daemon/tray.py Normal file
View File

@@ -0,0 +1,132 @@
import sys
import socket
import json
import signal
import threading
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QMessageBox
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import QObject, pyqtSignal
import logging
SOCKET_PATH = "/tmp/airpods_daemon.sock"
# Initialize battery_status at the module level
battery_status = {
"LEFT": {"status": "Unknown", "level": 0},
"RIGHT": {"status": "Unknown", "level": 0},
"CASE": {"status": "Unknown", "level": 0}
}
# Define a lock
battery_status_lock = threading.Lock()
class BatteryStatusUpdater(QObject):
battery_status_updated = pyqtSignal()
def listen_for_battery_updates(self):
global battery_status
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
while True:
data = client.recv(1024)
if data:
try:
response = json.loads(data.decode('utf-8'))
if response["type"] == "battery":
print(response)
with battery_status_lock:
battery_status = response
self.battery_status_updated.emit()
except json.JSONDecodeError as e:
logging.warning(f"Error deserializing data: {e}")
except KeyError as e:
logging.error(f"KeyError: {e} in data: {response}")
def get_battery_status():
global battery_status
with battery_status_lock:
logging.info(f"Getting battery status: {battery_status}")
left = battery_status["LEFT"]
right = battery_status["RIGHT"]
case = battery_status["CASE"]
return f"Left: {left['level']}% - {left['status'].title().replace('_', ' ')} | Right: {right['level']}% - {right['status'].title().replace('_', ' ')} | Case: {case['level']}% - {case['status'].title().replace('_', ' ')}"
from aln import enums
def set_anc_mode(mode):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
command = enums.SET_NOISE_CANCELLATION_OFF
if mode == "on":
command = enums.SET_NOISE_CANCELLATION_ON
elif mode == "off":
command = enums.SET_NOISE_CANCELLATION_OFF
elif mode == "transparency":
command = enums.SET_NOISE_CANCELLATION_TRANSPARENCY
elif mode == "adaptive":
command = enums.SET_NOISE_CANCELLATION_ADAPTIVE
client.sendall(command)
response = client.recv(1024)
return json.loads(response.decode())
def control_anc(action):
response = set_anc_mode(action)
logging.info(f"ANC action: {action}, Response: {response}")
def signal_handler(sig, frame):
print("Exiting...")
QApplication.quit()
sys.exit(0)
# Register the signal handler for SIGINT
signal.signal(signal.SIGINT, signal_handler)
app = QApplication(sys.argv)
# Create the system tray icon
tray_icon = QSystemTrayIcon(QIcon("icon.png"), app)
tray_icon.setToolTip(get_battery_status())
# Create the menu
menu = QMenu()
# Add ANC control actions
anc_on_action = QAction("ANC On")
anc_on_action.triggered.connect(lambda: control_anc("on"))
menu.addAction(anc_on_action)
anc_off_action = QAction("ANC Off")
anc_off_action.triggered.connect(lambda: control_anc("off"))
menu.addAction(anc_off_action)
anc_transparency_action = QAction("Transparency Mode")
anc_transparency_action.triggered.connect(lambda: control_anc("transparency"))
menu.addAction(anc_transparency_action)
anc_adaptive_action = QAction("Adaptive Mode")
anc_adaptive_action.triggered.connect(lambda: control_anc("adaptive"))
menu.addAction(anc_adaptive_action)
quit = QAction("Quit")
quit.triggered.connect(app.quit)
menu.addAction(quit)
# Add the menu to the tray icon
tray_icon.setContextMenu(menu)
# Show the tray icon
tray_icon.show()
# Create an instance of BatteryStatusUpdater
battery_status_updater = BatteryStatusUpdater()
# Connect the signal to the slot
battery_status_updater.battery_status_updated.connect(lambda: tray_icon.setToolTip(get_battery_status()))
# Start the battery status listener thread
listener_thread = threading.Thread(target=battery_status_updater.listen_for_battery_updates, daemon=True)
listener_thread.start()
# Run the application
sys.exit(app.exec_())

BIN
icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.3 KiB

BIN
imgs/daemon-log.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 648 KiB

BIN
imgs/ear-detection.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 216 KiB

BIN
imgs/read-data.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 330 KiB

BIN
imgs/set-anc.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

BIN
imgs/tray-icon-hover.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

BIN
imgs/tray-icon-menu.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

View File

@@ -1,3 +1,4 @@
import logging.handlers
import socket
import threading
import signal
@@ -25,7 +26,13 @@ running = True
# RotatingFileHandler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = logging.FileHandler(LOG_FILE, maxBytes=2**20, backupCount=5, delay=0)
handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=2**20)
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)
l = logging.getLogger()
l.setLevel(logging.DEBUG)
l.addHandler(handler)
from json import JSONEncoder