mirror of
https://github.com/kavishdevar/librepods.git
synced 2026-01-29 06:10:52 +00:00
474 lines
19 KiB
Python
474 lines
19 KiB
Python
import sys
|
|
import threading
|
|
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QMessageBox
|
|
from PyQt5.QtGui import QIcon, QPixmap, QPainter
|
|
from PyQt5.QtCore import QObject, pyqtSignal, Qt
|
|
from PyQt5.QtGui import QFont, QPalette
|
|
import logging
|
|
import signal
|
|
import subprocess
|
|
import time
|
|
import os
|
|
from aln import Connection, enums
|
|
from aln.Notifications import Notifications
|
|
import argparse
|
|
import dbus
|
|
import dbus.mainloop.glib
|
|
|
|
# Rebind `enums` from the object imported out of `aln` to its inner `enums`
# attribute, so the rest of the file can write `enums.HANDSHAKE` etc. directly.
enums = enums.enums

# basicConfig() already attaches a formatted stream handler to the root logger
# when it has none. Do not add another StreamHandler to the root logger here:
# that would emit every record a second time, without the format below.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
|
|
|
|
# Qt objects that main() stores here so the signal callbacks can reach them.
tray_icon = None
anc_actions = None

# Latest battery report per component; replaced and read under
# battery_status_lock.
battery_status = {
    component: {"status": "Unknown", "level": 0}
    for component in ("LEFT", "RIGHT", "CASE")
}
# Last ANC mode value received from the device (0 until one arrives).
anc_mode = 0
battery_status_lock = threading.Lock()
|
|
|
|
# Files in the user's home directory: the persisted conversational-awareness
# flag and the file holding the last saved AirPods MAC address.
CONVERSATIONAL_AWARENESS_FILE, CONFIG_FILE = map(
    os.path.expanduser,
    ("~/.airpods_conversational_awareness", "~/.config/aln"),
)
|
|
|
|
def load_conversational_awareness_state():
    """Return True when the state file exists and contains "enabled".

    Surrounding whitespace in the file is ignored. A missing file means the
    feature is treated as disabled.
    """
    if not os.path.exists(CONVERSATIONAL_AWARENESS_FILE):
        return False
    with open(CONVERSATIONAL_AWARENESS_FILE, "r") as state_file:
        stored = state_file.read()
        return stored.strip() == "enabled"
|
|
|
|
def save_conversational_awareness_state(enabled):
    """Overwrite the state file with "enabled" or "disabled".

    *enabled* is interpreted by truthiness.
    """
    with open(CONVERSATIONAL_AWARENESS_FILE, "w") as state_file:
        if enabled:
            state_file.write("enabled")
        else:
            state_file.write("disabled")
|
|
|
|
def toggle_conversational_awareness():
    """Flip the persisted conversational-awareness flag and push it to the AirPods.

    The new state is sent over the module-level ``connection`` first and only
    written to disk afterwards, so a failing send does not leave the saved
    flag out of sync with the device. While ``connection`` is still ``None``
    (nothing connected yet) the call is a logged no-op rather than an
    ``AttributeError``.
    """
    if connection is None:
        logging.warning("Cannot toggle Conversational Awareness: AirPods are not connected")
        return

    new_state = not load_conversational_awareness_state()
    connection.send(enums.SET_CONVERSATION_AWARENESS_ON if new_state else enums.SET_CONVERSATION_AWARENESS_OFF)
    save_conversational_awareness_state(new_state)
    logging.info(f"Conversational Awareness {'enabled' if new_state else 'disabled'}")
|
|
|
|
def load_mac_address():
    """Return the stripped contents of CONFIG_FILE, or None if it does not exist."""
    if not os.path.exists(CONFIG_FILE):
        return None
    with open(CONFIG_FILE, "r") as config:
        contents = config.read()
        return contents.strip()
|
|
|
|
def save_mac_address(mac_address):
    """Write *mac_address* to CONFIG_FILE, replacing any previous contents.

    The parent directory of CONFIG_FILE is created first when it is missing;
    otherwise ``open`` raises ``FileNotFoundError`` on a home directory that
    has no such directory yet.
    """
    config_dir = os.path.dirname(CONFIG_FILE)
    if config_dir:
        os.makedirs(config_dir, exist_ok=True)
    with open(CONFIG_FILE, "w") as file:
        file.write(mac_address)
|
|
|
|
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    The only option is ``--mac`` (MAC address of the AirPods); its value is
    available as ``.mac`` and is ``None`` when the option is omitted.
    """
    cli = argparse.ArgumentParser(
        description="Standalone tray application for AirPods.",
    )
    cli.add_argument("--mac", help="MAC address of the AirPods")
    namespace = cli.parse_args()
    return namespace
|
|
|
|
def _connected_devices(info_output):
    """Yield ``(address, name)`` for each device block reporting "Connected: yes".

    *info_output* is the filtered ``bluetoothctl info`` text. A line starting
    with "Device " opens a new device; the "Name:" and "Connected:" lines that
    follow belong to it. Lines that merely contain one of the grep keywords
    are ignored, so a device with a missing or extra line cannot shift the
    parsing of the devices after it.
    """
    addr, name, connected = None, "Unknown", False
    for raw_line in info_output.splitlines():
        line = raw_line.strip()
        if line.startswith("Device "):
            # A new device starts: emit the previous one if it was connected.
            if addr and connected:
                yield addr, name
            addr, name, connected = line.split()[1], "Unknown", False
        elif line.startswith("Name:"):
            name = line[len("Name:"):].strip() or "Unknown"
        elif line.startswith("Connected:"):
            connected = line[len("Connected:"):].strip() == "yes"
    if addr and connected:
        yield addr, name


def get_connected_airpods():
    """Return the MAC address of the first connected AirPods, or ``None``.

    Every device known to ``bluetoothctl`` is inspected; for each connected
    one the SDP records are fetched with ``run_sdptool_browse`` and the device
    is accepted when they contain the 128-bit service UUID this file uses to
    recognise AirPods. Logs an error and returns ``None`` when no device
    matches.
    """
    logging.info("Checking for connected AirPods...")
    result = subprocess.run("bluetoothctl devices | cut -f2 -d' ' | while read uuid; do bluetoothctl info $uuid; done | grep -e 'Device\\|Connected\\|Name'", shell=True, capture_output=True, text=True)

    for addr, name in _connected_devices(result.stdout):
        logging.debug(f"Checking services for connected device: {name} ({addr})")
        services_result = run_sdptool_browse(addr)
        if services_result and "UUID 128: 74ec2172-0bad-4d01-8f77-997b2be0722a" in services_result:
            logging.info(f"Found connected AirPods: {name} ({addr})")
            return addr

    logging.error("No connected AirPods found.")
    return None
|
|
|
|
def run_sdptool_browse(addr, retries=5):
    """Return the stdout of ``sdptool browse <addr>``, or ``None`` on failure.

    The command is attempted up to *retries* times, one second apart, for as
    long as its stderr contains "Failed to connect to SDP server". ``None`` is
    returned when every attempt fails that way, when *retries* is 0, or when
    ``sdptool`` cannot be executed at all.

    The address is passed as a separate argument rather than interpolated
    into a shell string, so its contents are never interpreted by a shell.
    """
    for attempt in range(retries):
        try:
            services_result = subprocess.run(["sdptool", "browse", addr], capture_output=True, text=True)
        except OSError as e:
            # sdptool is missing or not executable; retrying cannot help.
            logging.error(f"Could not run sdptool: {e}")
            return None
        if "Failed to connect to SDP server" not in services_result.stderr:
            return services_result.stdout
        logging.warning(f"Failed to connect to SDP server on {addr}, attempt {attempt + 1} of {retries}")
        # Only wait when another attempt will actually follow.
        if attempt + 1 < retries:
            time.sleep(1)
    logging.error(f"Failed to connect to SDP server on {addr} after {retries} attempts")
    return None
|
|
|
|
def set_card_profile(mac_address, profile):
    """Switch the PulseAudio card belonging to *mac_address* to *profile*.

    The card name is ``bluez_card.`` followed by the address with ``:``
    replaced by ``_``. ``pactl`` is invoked with an argument list instead of a
    shell string, so neither value is interpreted by a shell. A missing or
    non-executable ``pactl`` is logged instead of raised, keeping the call
    best-effort.
    """
    card_name = f"bluez_card.{mac_address.replace(':', '_')}"
    try:
        subprocess.run(["pactl", "set-card-profile", card_name, profile])
    except OSError as e:
        logging.error(f"Could not run pactl: {e}")
|
|
|
|
class MediaController:
    """Drive media playback and the audio card profile from ear-detection events.

    The controller remembers the last placement it saw in ``earStatus``
    ("Both out", "Only one in" or "Both in") together with two flags recording
    whether music was playing, and uses them to decide when to pause or resume
    playback through ``playerctl``.
    """

    def __init__(self, mac_address):
        self.mac_address = mac_address
        self.earStatus = "Both out"
        # Playback memory consulted when deciding whether to resume.
        self.wasMusicPlayingInSingle = False
        self.wasMusicPlayingInBoth = False
        # time.time() recorded when the status last changed to "Only one in".
        self.firstEarOutTime = 0
        # When set, a pending delayed action does nothing once it fires.
        self.stop_thread_event = threading.Event()

    def playMusic(self):
        """Resume playback with ``playerctl play``."""
        logging.info("Playing music")
        subprocess.call(("playerctl", "play"))

    def pauseMusic(self):
        """Pause every player with ``playerctl --all-players pause``."""
        logging.info("Pausing music")
        subprocess.call(("playerctl", "--all-players", "pause"))

    def isPlaying(self):
        """Return True when the ``playerctl`` status output contains "Playing"."""
        status_text = subprocess.getoutput("playerctl --all-players status")
        return "Playing" in status_text.strip()

    def handlePlayPause(self, data):
        """Process an ear-detection reading and return the resulting status.

        ``data[0]`` and ``data[1]`` are the primary and secondary readings:
        both truthy maps to "Both out", both falsy to "Both in", and any other
        combination to "Only one in". Playback and profile side effects only
        run when the status differs from the stored ``earStatus``.
        """
        primary, secondary = data[0], data[1]

        logging.debug(f"Handling play/pause with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")

        if primary and secondary:
            new_status, on_change = "Both out", self._enter_both_out
        elif not primary and not secondary:
            new_status, on_change = "Both in", self._enter_both_in
        else:
            # Exactly one of the two readings is truthy.
            new_status, on_change = "Only one in", self._enter_one_in

        if self.earStatus != new_status:
            on_change()
        self.earStatus = new_status
        return new_status

    def _delayed_action(self, s):
        """Timer callback started on entering "Only one in"; *s* is whether music was playing then."""
        if self.stop_thread_event.is_set():
            return
        if self.wasMusicPlayingInSingle:
            self.playMusic()
            self.wasMusicPlayingInBoth = False
        elif self.wasMusicPlayingInBoth or s:
            self.wasMusicPlayingInBoth = True
        self.wasMusicPlayingInSingle = False

    def _enter_both_out(self):
        """Transition into "Both out": pause, turn the card profile off, update the flags."""
        was_playing = self.isPlaying()
        if was_playing:
            self.pauseMusic()
        set_card_profile(self.mac_address, "off")
        logging.info("Setting profile to off")

        if self.earStatus == "Only one in":
            second_followed_quickly = (
                self.firstEarOutTime != 0
                and time.time() - self.firstEarOutTime < 0.3
            )
            if second_followed_quickly:
                self.wasMusicPlayingInSingle = True
                self.wasMusicPlayingInBoth = True
                self.stop_thread_event.set()
            else:
                self.wasMusicPlayingInSingle = was_playing
        elif self.earStatus == "Both in":
            # Playback state is queried again here, after the pause above.
            if self.isPlaying():
                self.wasMusicPlayingInBoth = True
            self.wasMusicPlayingInSingle = False

    def _enter_both_in(self):
        """Transition into "Both in": restore the a2dp profile and resume if music was playing."""
        if self.earStatus == "Both out":
            set_card_profile(self.mac_address, "a2dp-sink")
            logging.info("Setting profile to a2dp-sink")
        elif self.earStatus == "Only one in":
            self.stop_thread_event.set()
            if self.isPlaying():
                self.wasMusicPlayingInBoth = True

        if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
            self.playMusic()
            self.wasMusicPlayingInBoth = True
            self.wasMusicPlayingInSingle = False

    def _enter_one_in(self):
        """Transition into "Only one in": pause and schedule the delayed action after 0.3 s."""
        self.stop_thread_event.clear()
        was_playing = self.isPlaying()
        if was_playing:
            self.pauseMusic()
        pending = threading.Timer(0.3, self._delayed_action, args=[was_playing])
        pending.start()
        self.firstEarOutTime = time.time()
        if self.earStatus == "Both out":
            set_card_profile(self.mac_address, "a2dp-sink")
            logging.info("Setting profile to a2dp-sink")
|
|
|
|
def get_current_volume():
    """Return the default sink's volume as an integer percentage.

    Runs ``pactl get-sink-volume @DEFAULT_SINK@`` and converts the fifth
    whitespace-separated field of the first output line, with any ``%``
    characters stripped from its ends.
    """
    pactl = subprocess.run(["pactl", "get-sink-volume", "@DEFAULT_SINK@"], capture_output=True, text=True)
    first_line = pactl.stdout.splitlines()[0]
    fields = first_line.split()
    return int(fields[4].strip('%'))
|
|
|
|
def set_volume(percent):
    """Set the default sink's volume to *percent* percent using ``pactl``."""
    command = ["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{percent}%"]
    subprocess.run(command)
|
|
|
|
# Sink volume captured when the module is loaded. handle_conversational_awareness()
# overwrites it for statuses 1 and 2 and sets the volume back to it for
# statuses 8 and above.
initial_volume = get_current_volume()
|
|
|
|
def handle_conversational_awareness(status):
    """Adjust the sink volume for a conversational-awareness status code.

    Status handling:
      * 1 or 2 -- remember the current volume in ``initial_volume`` and drop
        to 10 % of it.
      * 3      -- 40 % of the remembered volume.
      * 6      -- 50 % of the remembered volume.
      * 8 or 9 -- restore the remembered volume.
      * 4, 5, 7 -- no volume mapping; nothing is changed.
      * anything outside 1..9 -- logged as invalid and ignored.

    Invalid and unmapped statuses return early. Falling through would reach
    the volume change without a target volume and fail on an unbound
    ``new_volume``.
    """
    global initial_volume

    if status < 1 or status > 9:
        logging.error(f"Invalid status: {status}")
        return

    if status == 1 or status == 2:
        initial_volume = get_current_volume()
        new_volume = max(0, min(int(initial_volume * 0.1), 100))
    elif status == 3:
        new_volume = max(0, min(int(initial_volume * 0.4), 100))
    elif status == 6:
        new_volume = max(0, min(int(initial_volume * 0.5), 100))
    elif status >= 8:
        new_volume = initial_volume
    else:
        # Statuses 4, 5 and 7 carry no volume change.
        return

    ca_logger = logging.getLogger("Conversational Awareness")
    try:
        set_volume(new_volume)
    except Exception as e:
        logging.error(f"Error setting volume: {e}")
    else:
        # Only report the change when setting the volume did not raise.
        ca_logger.info(f"Volume set to {new_volume}% based on conversational awareness status: {status}")
        if status == 9:
            ca_logger.info("Conversation ended. Restored volume to original level.")
|
|
|
|
class BatteryStatusUpdater(QObject):
    """Turn notifications from the connection into Qt signals and module state.

    ``battery_status_updated`` carries the formatted battery summary string;
    ``anc_mode_updated`` carries the ANC mode value read from the listener.
    """

    battery_status_updated = pyqtSignal(str)
    anc_mode_updated = pyqtSignal(int)

    def __init__(self, connection, mac_address):
        super().__init__()
        self.connection = connection
        self.media_controller = MediaController(mac_address)

    def notification_handler(self, notification_type: int, data: bytes):
        """Dispatch one notification, selected by *notification_type*.

        *data* is only logged (as space-separated hex bytes); the values acted
        upon are read from ``self.connection.notificationListener``.
        Notification types other than the four handled here are ignored.
        """
        global battery_status, anc_mode
        logging.debug(f"Received data: {' '.join(f'{byte:02X}' for byte in data)}")

        if notification_type == Notifications.BATTERY_UPDATED:
            battery = self.connection.notificationListener.BatteryNotification.getBattery()
            with battery_status_lock:
                battery_status = {
                    component: {
                        "status": battery[index].get_status(),
                        "level": battery[index].get_level(),
                    }
                    for index, component in enumerate(("LEFT", "RIGHT", "CASE"))
                }
            # get_battery_status() acquires battery_status_lock itself, and a
            # threading.Lock is not reentrant, so call it after releasing.
            summary = get_battery_status()
            self.battery_status_updated.emit(summary)
            logging.debug(f"Updated battery status: {battery_status}")
            return

        if notification_type == Notifications.EAR_DETECTION_UPDATED:
            earDetection = self.connection.notificationListener.EarDetectionNotification.getEarDetection()
            self.media_controller.handlePlayPause(earDetection)
            logging.debug(f"Received ear detection status: {earDetection}")
            return

        if notification_type == Notifications.ANC_UPDATED:
            anc_mode = self.connection.notificationListener.ANCNotification.status
            self.anc_mode_updated.emit(anc_mode)
            logging.debug(f"Received ANC status: {anc_mode}")
            return

        if notification_type == Notifications.CA_UPDATED:
            ca_status = self.connection.notificationListener.ConversationalAwarenessNotification.status
            handle_conversational_awareness(ca_status)
            logging.debug(f"Received CA status: {ca_status}")
|
|
|
|
def get_battery_status():
    """Return a one-line battery summary for the left bud, right bud and case.

    Each part is rendered as ``<Label>: <level>% <marker>`` and the parts are
    joined with " | ". The marker is "⚡" when the status, title-cased with
    underscores turned into spaces, is "Charging", and empty otherwise. A
    falsy status is treated as "Unknown".
    """
    global battery_status
    charge_markers = {
        "Not Charging": "",
        "Charging": "⚡",
    }

    def render(label, entry):
        state = (entry['status'] or 'Unknown').title().replace('_', ' ')
        return f"{label}: {entry['level']}% {charge_markers.get(state, '')}"

    with battery_status_lock:
        sections = [
            render(label, battery_status[key])
            for label, key in (("Left", "LEFT"), ("Right", "RIGHT"), ("Case", "CASE"))
        ]
        return " | ".join(sections)
|
|
|
|
def create_battery_icon():
    """Return a 64x64 tray icon showing the lower of the two bud battery levels.

    The percentage is drawn centred in bold 20-point Arial on a transparent
    pixmap, in white when the application palette's window colour is dark
    (value below 128) and in black otherwise.
    """
    global battery_status
    with battery_status_lock:
        lowest_level = min(
            battery_status["LEFT"]["level"],
            battery_status["RIGHT"]["level"],
        )

    icon_size = 64
    pixmap = QPixmap(icon_size, icon_size)
    pixmap.fill(Qt.transparent)

    painter = QPainter(pixmap)

    window_colour = QApplication.palette().color(QPalette.Window)
    if window_colour.value() < 128:
        text_color = Qt.white
    else:
        text_color = Qt.black

    painter.setPen(text_color)
    painter.setFont(QFont('Arial', 20, QFont.Bold))
    painter.drawText(pixmap.rect(), Qt.AlignCenter, f"{lowest_level}%")
    painter.end()

    return QIcon(pixmap)
|
|
|
|
def signal_handler(sig, frame):
    """Log the shutdown, ask Qt to quit, then exit the interpreter with status 0.

    *sig* and *frame* follow the ``signal.signal`` handler signature and are
    not used.
    """
    logging.info("Exiting...")
    QApplication.quit()
    raise SystemExit(0)
|
|
|
|
# Module-wide handles assigned by connect_to_airpods(): the active Connection
# and the BatteryStatusUpdater wired to it. Both are None until then.
connection = battery_status_updater = None
|
|
|
|
def update_anc_menu(anc_mode, actions):
    """Check the menu action matching *anc_mode* and uncheck all the others.

    Modes 1, 2, 3 and 4 select ``actions[0]`` to ``actions[3]`` respectively;
    any other value leaves every action unchecked.
    """
    for entry in actions:
        entry.setChecked(False)

    for mode, position in ((1, 0), (2, 1), (3, 2), (4, 3)):
        if anc_mode == mode:
            actions[position].setChecked(True)
            break
|
|
|
|
def toggle_conversational_awareness_action(action):
    """Toggle the feature, then make *action*'s check mark reflect the saved state."""
    toggle_conversational_awareness()
    persisted = load_conversational_awareness_state()
    action.setChecked(persisted)
|
|
|
|
def listen_for_device_connections():
    """Watch the system D-Bus for connecting devices and attach to AirPods.

    Three signal receivers are registered: ``PropertiesChanged`` (a device's
    ``Connected`` property turning true), ``InterfacesAdded`` (a new
    ``org.bluez.Device1`` object that is already connected) and PulseAudio's
    ``NewPlaybackStream``. A device counts as AirPods when its SDP records
    contain the 128-bit service UUID below.

    This function then runs a GLib main loop and does not return.
    """
    airpods_service = "UUID 128: 74ec2172-0bad-4d01-8f77-997b2be0722a"

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    logging.info("Listening for device connections...")

    def device_connected(interface, changed, invalidated, path):
        # /org/bluez/hci0/dev_mac_address/*
        if 'Connected' in changed and changed['Connected']:
            if path.split("/")[-1] == "":
                return
            # The last path component looks like dev_AA_BB_...; turn it back
            # into a colon-separated address.
            addr = path.split("/")[-1].replace("_", ":").replace("dev:", "")
            name = changed.get('Name', 'Unknown')
            logging.info(f"Device connected: {name} ({addr})")
            logging.debug(f"Running command: sdptool browse {addr}")
            services_result = run_sdptool_browse(addr)
            logging.debug(f"Services result: {services_result}")
            if services_result and airpods_service in services_result:
                logging.info(f"Found connected AirPods: {name} ({addr})")
                connect_to_airpods(addr)

    bus.add_signal_receiver(device_connected, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path")

    def interfaces_added(path, interfaces):
        logging.debug(f"Interfaces added: {path}")
        if 'org.bluez.Device1' in interfaces and interfaces['org.bluez.Device1'].get('Connected', False):
            device = interfaces['org.bluez.Device1']
            addr = device['Address']
            # A device may not report a Name; fall back to the same
            # placeholder device_connected() uses instead of raising KeyError.
            name = device.get('Name', 'Unknown')
            logging.info(f"Device connected: {name} ({addr})")
            if path.endswith("/sep1"):
                services_result = run_sdptool_browse(addr)
                if services_result and airpods_service in services_result:
                    logging.info(f"Found connected AirPods: {name} ({addr})")
                    connect_to_airpods(addr)

    bus.add_signal_receiver(interfaces_added, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesAdded")

    bus.add_signal_receiver(audio_device_changed, dbus_interface="org.PulseAudio.Core1.Device", signal_name="NewPlaybackStream")

    from gi.repository import GLib
    loop = GLib.MainLoop()
    loop.run()
|
|
|
|
def audio_device_changed(*args, **kwargs):
    """Signal callback: look for connected AirPods and connect to them if found.

    The signal's arguments are accepted but not used.
    """
    logging.info("Audio output device changed, checking for connected AirPods...")
    mac_address = get_connected_airpods()
    if not mac_address:
        return
    connect_to_airpods(mac_address)
|
|
|
|
def connect_to_airpods(mac_address):
    """Open a connection to the AirPods at *mac_address* and wire up the tray.

    Stores the new ``Connection`` and ``BatteryStatusUpdater`` in the module
    globals, sends the handshake, registers the notification handler, and
    connects the updater's signals to the tray tooltip, tray icon and ANC
    menu. On success the address is saved with ``save_mac_address``. Any
    exception raised after the ``Connection`` object is created is logged,
    not propagated.
    """
    global connection, battery_status_updater

    logging.info(f"Attempting to connect to AirPods at {mac_address}...")
    connection = Connection(mac_address)
    try:
        connection.connect()
        connection.send(enums.HANDSHAKE)
        battery_status_updater = BatteryStatusUpdater(connection, mac_address)
        connection.initialize_notifications(battery_status_updater.notification_handler)

        # tray_icon and anc_actions are looked up when the signal fires, so
        # the lambdas always see the current module-level objects.
        battery_signal = battery_status_updater.battery_status_updated
        battery_signal.connect(lambda status: tray_icon.setToolTip(status))
        battery_signal.connect(lambda: tray_icon.setIcon(create_battery_icon()))
        battery_status_updater.anc_mode_updated.connect(lambda mode: update_anc_menu(mode, anc_actions))

        save_mac_address(mac_address)
        logging.info("Connected to AirPods successfully.")
    except Exception as e:
        logging.error(f"Failed to connect to AirPods: {e}")
|
|
|
|
def main():
    """Build the tray icon and its menu, connect to the AirPods, run the Qt loop.

    The MAC address comes from ``--mac`` or, failing that, from the saved
    config file. Without either, already-connected AirPods are searched for;
    if none are found the function starts listening for device connections.
    The process exits with the Qt loop's return code, or 1 if running the
    loop raises.
    """
    global tray_icon, anc_actions

    args = parse_arguments()
    mac_address = args.mac or load_mac_address()

    logging.debug("Starting standalone tray application...")
    app = QApplication(sys.argv)

    tray_icon = QSystemTrayIcon(create_battery_icon(), app)
    tray_icon.setToolTip(get_battery_status())

    menu = QMenu()

    def add_checkable(label, on_triggered):
        # Create a checkable entry, hook up its handler and append it to the menu.
        entry = QAction(label)
        entry.setCheckable(True)
        entry.triggered.connect(on_triggered)
        menu.addAction(entry)
        return entry

    ca_toggle_action = QAction("Toggle Conversational Awareness")
    ca_toggle_action.setCheckable(True)
    ca_toggle_action.setChecked(load_conversational_awareness_state())
    ca_toggle_action.triggered.connect(lambda: toggle_conversational_awareness_action(ca_toggle_action))
    menu.addAction(ca_toggle_action)

    anc_on_action = add_checkable("Noise Cancellation", lambda: control_anc("on"))
    anc_off_action = add_checkable("Off", lambda: control_anc("off"))
    anc_transparency_action = add_checkable("Transparency", lambda: control_anc("transparency"))
    anc_adaptive_action = add_checkable("Adaptive", lambda: control_anc("adaptive"))

    # Order matters: update_anc_menu() indexes this list by ANC mode.
    anc_actions = [anc_off_action, anc_on_action, anc_transparency_action, anc_adaptive_action]

    quit_action = QAction("Quit")
    quit_action.triggered.connect(lambda: signal_handler(signal.SIGINT, None))
    menu.addAction(quit_action)

    tray_icon.setContextMenu(menu)
    tray_icon.show()

    logging.info("Standalone tray application started.")

    if not mac_address:
        mac_address = get_connected_airpods()
    if mac_address:
        connect_to_airpods(mac_address)
    else:
        # NOTE(review): listen_for_device_connections() runs a GLib main loop
        # and does not return, so the Qt loop below is not reached on this
        # path -- confirm this is intended.
        listen_for_device_connections()

    signal.signal(signal.SIGINT, signal_handler)

    try:
        exit_code = app.exec_()
        sys.exit(exit_code)
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        sys.exit(1)
|
|
|
|
def control_anc(action):
    """Send the noise-control command named by *action* to the AirPods.

    *action* is one of "on", "off", "transparency" or "adaptive"; any other
    value falls back to the "off" command. While the module-level
    ``connection`` is still ``None`` (nothing connected yet) the request is
    logged and dropped instead of failing with ``AttributeError``.
    """
    if connection is None:
        logging.warning(f"Cannot apply ANC action {action}: AirPods are not connected")
        return

    command_names = {
        "on": "NOISE_CANCELLATION_ON",
        "off": "NOISE_CANCELLATION_OFF",
        "transparency": "NOISE_CANCELLATION_TRANSPARENCY",
        "adaptive": "NOISE_CANCELLATION_ADAPTIVE",
    }
    command = getattr(enums, command_names.get(action, "NOISE_CANCELLATION_OFF"))
    connection.send(command)
    logging.info(f"ANC action: {action}")
|
|
|
|
# Start the tray application only when this file is executed as a script, so
# importing the module does not launch the application.
if __name__ == "__main__":
    main()