for @maxwofford! :)

This commit is contained in:
Kavish Devar
2025-01-31 01:59:21 +05:30
parent c84195aec8
commit de53e840ed
3 changed files with 750 additions and 275 deletions

View File

@@ -0,0 +1,339 @@
# AirPods like Normal (ALN) - Bringing Apple-only features to Linux and Android for seamless AirPods functionality!
#
# Copyright (C) 2024 Kavish Devar
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import threading
import bluetooth
import subprocess
import time
import threading
import os
# Bluetooth MAC address of AirPods
AIRPODS_MAC = "28:2D:7F:C2:05:5B"
class initL2CAP():
lastEarStatus = ""
earStatus = ""
wasMusicPlayingInBoth = False
wasMusicPlayingInSingle = False
def pauseMusic(self):
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
def playMusic(self):
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
def getMusicStatus(self):
return subprocess.getoutput("playerctl status --ignore-player OnePlus_7").strip()
# Change to MAC address of your AirPods
connected = False
cmd_off = b"\x04\x00\x04\x00\x09\x00\x0d\x01\x00\x00\x00"
cmd_on = b"\x04\x00\x04\x00\x09\x00\x0d\x02\x00\x00\x00"
cmd_transparency = b"\x04\x00\x04\x00\x09\x00\x0d\x03\x00\x00\x00"
cmd_adaptive = b"\x04\x00\x04\x00\x09\x00\x0d\x04\x00\x00\x00"
cmd_ca_off = b"\x04\x00\x04\x00\x09\x00\x28\x02\x00\x00\x00"
cmd_ca_on = b"\x04\x00\x04\x00\x09\x00\x28\x01\x00\x00\x00"
def start(self):
cmd_handshake = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
# cmd_smth0 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xfe\xff"
cmd_smth1 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xff\xff"
address = "28:2D:7F:C2:05:5B"
aap_service = "74EC2172-0BAD-4D01-8F77-997B2BE0722A"
aap_port = 0x1001
services = bluetooth.find_service(address=address)
service = [s for s in services if s["service-classes"] == [aap_service]]
if not service:
print("Device does not have AAP service")
exit()
self.sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
sock = self.sock
sock.connect((address, aap_port))
print("Connected to AirPods")
self.connected = True
print("Sending handshake...")
print(sock.type)
sock.send(cmd_handshake)
# sock.send(cmd_smth0)
sock.send(cmd_smth1)
threading.Thread(target=self.listen).start()
# battery info: 04 00 04 00 04 00 03 02 01 64 01 01 04 01 64 01 01 08 01 34 02 01
def parse_battery_status(self, data):
if len(data) != 22:
return
self.left_bud_level = data[9]
self.left_bud_status = data[10]
self.right_bud_level = data[14]
self.right_bud_status = data[15]
self.case_level = data[19]
self.case_status = data[20]
# Interpret the status
def interpret_status(status):
if status == 1:
return "Charging"
elif status == 2:
return "Not charging"
elif status == 4:
return "Disconnected"
else:
return "Unknown"
# Print the results
print(f"Left Bud: {self.left_bud_level}% - {interpret_status(self.left_bud_status)}")
print(f"Right Bud: {self.right_bud_level}% - {interpret_status(self.right_bud_status)}")
print(f"Case: {self.case_level}% - {interpret_status(self.case_status)}")
def parse_anc_status(self, data):
# 04 00 04 00 09 00 0d 03 00 00 00
if len(data) != 11 and data.hex().startswith("040004000600"):
return
if data[7] == 1:
return "Off"
elif data[7] == 2:
return "On"
elif data[7] == 3:
return "Transparency"
elif data[7] == 4:
return "Adaptive"
firstEarOutTime = 0
stop_thread_event = threading.Event()
def parse_inear_status(self, data):
if len(data) != 8:
return
second_status = data[6]
first_status = data[7]
def delayed_action(self, s):
print(s)
if not self.stop_thread_event.is_set():
print("Delayed action")
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s == "Playing":
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
if first_status and second_status:
if self.earStatus != "Both out":
s = self.getMusicStatus()
self.pauseMusic()
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
print("Only one in called with both out")
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
else:
if s == "Playing":
self.wasMusicPlayingInSingle = True
else:
self.wasMusicPlayingInSingle = False
# wasMusicPlayingInSingle = True
elif self.earStatus == "Both in":
# should be unreachable
s = self.getMusicStatus()
if s == "Playing":
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
else:
self.wasMusicPlayingInSingle = False
self.earStatus = "Both out"
return "Both out"
elif not first_status and not second_status:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.getMusicStatus()
if s == "Playing":
self.wasMusicPlayingInBoth = True
if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
self.playMusic()
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
self.earStatus = "Both in"
return "Both in"
elif (first_status and not second_status) or (not first_status and second_status):
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.getMusicStatus()
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[self, s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
self.earStatus = "Only one in"
return "Only one in"
def listen(self):
while True:
res = self.sock.recv(1024)
print(f"Response: {res.hex()}")
self.battery_status = self.parse_battery_status(res)
self.inear_status = self.parse_inear_status(res)
# anc_status = parse_anc_status(res)
# if anc_status:
# print("ANC: ", anc_status)
if self.battery_status:
print(self.battery_status)
if self.inear_status:
print(self.inear_status)
# while True:
# print("Select command:")
# print("1. Turn off")
# print("2. Turn on")
# print("3. Toggle transparency")
# print("4. Toggle Adaptive")
# print("5. Conversational Awareness On")
# print("6. Conversational Awareness Off")
# print("0. Exit")
# cmd = input("Enter command: ")
# if cmd == "0":
# break
# elif cmd == "1":
# self.sock.send(cmd_off)
# elif cmd == "2":
# self.sock.send(cmd_on)
# elif cmd == "3":
# self.sock.send(cmd_transparency)
# elif cmd == "4":
# self.sock.send(cmd_adaptive)
# elif cmd == "5":
# self.sock.send(cmd_ca_on)
# elif cmd == "6":
# self.sock.send(cmd_ca_off)
def stop(self):
self.connected = False
self.sock.close()
def is_bluetooth_connected():
try:
result = subprocess.run(["bluetoothctl", "info", AIRPODS_MAC], capture_output=True, text=True)
return "Connected: yes" in result.stdout
except Exception as e:
print(f"Error checking Bluetooth connection status: {e}")
return False
# Connect to Bluetooth device using bluetoothctl if not already connected
def connect_bluetooth_device():
if is_bluetooth_connected():
print("AirPods are already connected.")
return
print("Checking if AirPods are available...")
result = subprocess.run(["bluetoothctl", "devices"], capture_output=True, text=True)
if AIRPODS_MAC in result.stdout:
print("AirPods are available. Connecting...")
subprocess.run(["bluetoothctl", "connect", AIRPODS_MAC])
else:
print("AirPods are not available.")
time.sleep(2) # Wait for the connection to establish
# Switch audio output to AirPods (PulseAudio)
try:
result = subprocess.run(["pactl", "list", "short", "sinks"], capture_output=True, text=True)
sink_name = next((line.split()[1] for line in result.stdout.splitlines() if "bluez_sink" in line), None)
if sink_name:
subprocess.run(["pactl", "set-default-sink", sink_name])
print(f"Switched audio to AirPods: {sink_name}")
else:
print("Failed to switch audio to AirPods.")
except Exception as e:
print(f"Error switching audio: {e}")
# Disconnect from Bluetooth device if connected
def disconnect_bluetooth_device():
if not is_bluetooth_connected():
print("AirPods are already disconnected.")
return
print("Disconnecting from AirPods...")
subprocess.run(["bluetoothctl", "disconnect", AIRPODS_MAC])
l2cap = initL2CAP()
# Function to listen to `playerctl --follow` and react to status changes
def mediaListener():
try:
# Run playerctl --follow in a subprocess
process = subprocess.Popen(
["playerctl", "--follow", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Continuously read from the subprocess stdout
for line in process.stdout:
if line: # Make sure the line is not empty
line = line.strip() # Remove any extraneous whitespace
print(f"Received event from playerctl: {line}")
if "Playing" in line:
print("Media started playing")
connect_bluetooth_device()
if not l2cap.connected:
l2cap.start()
elif "Paused" in line or "Stopped" in line:
print("Media paused or stopped")
# disconnect_bluetooth_device()
# Check for any errors in stderr
stderr = process.stderr.read()
if stderr:
print(f"Error: {stderr}")
except Exception as e:
print(f"An error occurred in mediaListener: {e}")
mediaListener()
# thread = threading.Thread(target=mediaListener)
# thread.start()
# thread.stop()

View File

@@ -1,188 +1,169 @@
# AirPods like Normal (ALN) - Bringing Apple-only features to Linux and Android for seamless AirPods functionality!
#
# Copyright (C) 2024 Kavish Devar
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
import threading
import bluetooth
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QMessageBox
from PyQt5.QtGui import QIcon, QPixmap, QPainter
from PyQt5.QtCore import QObject, pyqtSignal, Qt
from PyQt5.QtGui import QFont, QPalette
import logging
import signal
import subprocess
import time
import threading
import os
from aln import Connection, enums
from aln.Notifications import Notifications
import argparse
import dbus
import dbus.mainloop.glib
# Bluetooth MAC address of AirPods
AIRPODS_MAC = "28:2D:7F:C2:05:5B"
enums = enums.enums
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger().addHandler(logging.StreamHandler())
class initL2CAP():
lastEarStatus = ""
earStatus = ""
wasMusicPlayingInBoth = False
wasMusicPlayingInSingle = False
tray_icon = None
anc_actions = None
def pauseMusic(self):
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
battery_status = {
"LEFT": {"status": "Unknown", "level": 0},
"RIGHT": {"status": "Unknown", "level": 0},
"CASE": {"status": "Unknown", "level": 0}
}
anc_mode = 0
battery_status_lock = threading.Lock()
CONVERSATIONAL_AWARENESS_FILE = os.path.expanduser("~/.airpods_conversational_awareness")
CONFIG_FILE = os.path.expanduser("~/.config/aln")
def load_conversational_awareness_state():
if os.path.exists(CONVERSATIONAL_AWARENESS_FILE):
with open(CONVERSATIONAL_AWARENESS_FILE, "r") as file:
return file.read().strip() == "enabled"
return False
def save_conversational_awareness_state(enabled):
with open(CONVERSATIONAL_AWARENESS_FILE, "w") as file:
file.write("enabled" if enabled else "disabled")
def toggle_conversational_awareness():
current_state = load_conversational_awareness_state()
new_state = not current_state
save_conversational_awareness_state(new_state)
connection.send(enums.SET_CONVERSATION_AWARENESS_ON if new_state else enums.SET_CONVERSATION_AWARENESS_OFF)
logging.info(f"Conversational Awareness {'enabled' if new_state else 'disabled'}")
def load_mac_address():
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r") as file:
return file.read().strip()
return None
def save_mac_address(mac_address):
with open(CONFIG_FILE, "w") as file:
file.write(mac_address)
def parse_arguments():
parser = argparse.ArgumentParser(description="Standalone tray application for AirPods.")
parser.add_argument("--mac", help="MAC address of the AirPods")
return parser.parse_args()
def get_connected_airpods():
logging.info("Checking for connected AirPods...")
result = subprocess.run("bluetoothctl devices | cut -f2 -d' ' | while read uuid; do bluetoothctl info $uuid; done | grep -e 'Device\\|Connected\\|Name'", shell=True, capture_output=True, text=True)
lines = result.stdout.splitlines()
for i in range(0, len(lines), 3):
if "Connected: yes" in lines[i + 2]:
addr = lines[i].split()[1]
name = lines[i + 1].split(": ")[1]
logging.debug(f"Checking services for connected device: {name} ({addr})")
services_result = run_sdptool_browse(addr)
if services_result and "UUID 128: 74ec2172-0bad-4d01-8f77-997b2be0722a" in services_result:
logging.info(f"Found connected AirPods: {name} ({addr})")
return addr
logging.error("No connected AirPods found.")
return None
def run_sdptool_browse(addr, retries=5):
for attempt in range(retries):
services_result = subprocess.run(f"sdptool browse {addr}", shell=True, capture_output=True, text=True)
if "Failed to connect to SDP server" not in services_result.stderr:
return services_result.stdout
logging.warning(f"Failed to connect to SDP server on {addr}, attempt {attempt + 1} of {retries}")
time.sleep(1)
logging.error(f"Failed to connect to SDP server on {addr} after {retries} attempts")
return None
def set_card_profile(mac_address, profile):
os.system(f"pactl set-card-profile bluez_card.{mac_address.replace(':', '_')} {profile}")
class MediaController:
def __init__(self, mac_address):
self.mac_address = mac_address
self.earStatus = "Both out"
self.wasMusicPlayingInSingle = False
self.wasMusicPlayingInBoth = False
self.firstEarOutTime = 0
self.stop_thread_event = threading.Event()
def playMusic(self):
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
logging.info("Playing music")
subprocess.call(("playerctl", "play"))
def getMusicStatus(self):
return subprocess.getoutput("playerctl status --ignore-player OnePlus_7").strip()
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "--all-players", "pause"))
# Change to MAC address of your AirPods
def isPlaying(self):
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
connected = False
def handlePlayPause(self, data):
primary_status = data[0]
secondary_status = data[1]
cmd_off = b"\x04\x00\x04\x00\x09\x00\x0d\x01\x00\x00\x00"
cmd_on = b"\x04\x00\x04\x00\x09\x00\x0d\x02\x00\x00\x00"
cmd_transparency = b"\x04\x00\x04\x00\x09\x00\x0d\x03\x00\x00\x00"
cmd_adaptive = b"\x04\x00\x04\x00\x09\x00\x0d\x04\x00\x00\x00"
cmd_ca_off = b"\x04\x00\x04\x00\x09\x00\x28\x02\x00\x00\x00"
cmd_ca_on = b"\x04\x00\x04\x00\x09\x00\x28\x01\x00\x00\x00"
logging.debug(f"Handling play/pause with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
def start(self):
cmd_handshake = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
# cmd_smth0 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xfe\xff"
cmd_smth1 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xff\xff"
address = "28:2D:7F:C2:05:5B"
aap_service = "74EC2172-0BAD-4D01-8F77-997B2BE0722A"
aap_port = 0x1001
services = bluetooth.find_service(address=address)
service = [s for s in services if s["service-classes"] == [aap_service]]
if not service:
print("Device does not have AAP service")
exit()
self.sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
sock = self.sock
sock.connect((address, aap_port))
print("Connected to AirPods")
self.connected = True
print("Sending handshake...")
print(sock.type)
sock.send(cmd_handshake)
# sock.send(cmd_smth0)
sock.send(cmd_smth1)
threading.Thread(target=self.listen).start()
# battery info: 04 00 04 00 04 00 03 02 01 64 01 01 04 01 64 01 01 08 01 34 02 01
def parse_battery_status(self, data):
if len(data) != 22:
return
self.left_bud_level = data[9]
self.left_bud_status = data[10]
self.right_bud_level = data[14]
self.right_bud_status = data[15]
self.case_level = data[19]
self.case_status = data[20]
# Interpret the status
def interpret_status(status):
if status == 1:
return "Charging"
elif status == 2:
return "Not charging"
elif status == 4:
return "Disconnected"
else:
return "Unknown"
# Print the results
print(f"Left Bud: {self.left_bud_level}% - {interpret_status(self.left_bud_status)}")
print(f"Right Bud: {self.right_bud_level}% - {interpret_status(self.right_bud_status)}")
print(f"Case: {self.case_level}% - {interpret_status(self.case_status)}")
def parse_anc_status(self, data):
# 04 00 04 00 09 00 0d 03 00 00 00
if len(data) != 11 and data.hex().startswith("040004000600"):
return
if data[7] == 1:
return "Off"
elif data[7] == 2:
return "On"
elif data[7] == 3:
return "Transparency"
elif data[7] == 4:
return "Adaptive"
firstEarOutTime = 0
stop_thread_event = threading.Event()
def parse_inear_status(self, data):
if len(data) != 8:
return
second_status = data[6]
first_status = data[7]
def delayed_action(self, s):
print(s)
def delayed_action(s):
if not self.stop_thread_event.is_set():
print("Delayed action")
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s == "Playing":
elif self.wasMusicPlayingInBoth or s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
if first_status and second_status:
self.wasMusicPlayingInSingle = False
if primary_status and secondary_status:
if self.earStatus != "Both out":
s = self.getMusicStatus()
self.pauseMusic()
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
s = self.isPlaying()
if s:
self.pauseMusic()
set_card_profile(self.mac_address, "off")
logging.info("Setting profile to off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
print("Only one in called with both out")
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
else:
if s == "Playing":
self.wasMusicPlayingInSingle = True
if s:
self.wasMusicPlayingInSingle = True
else:
self.wasMusicPlayingInSingle = False
# wasMusicPlayingInSingle = True
elif self.earStatus == "Both in":
# should be unreachable
s = self.getMusicStatus()
if s == "Playing":
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
else:
self.wasMusicPlayingInSingle = False
self.earStatus = "Both out"
return "Both out"
elif not first_status and not second_status:
elif not primary_status and not secondary_status:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
set_card_profile(self.mac_address, "a2dp-sink")
logging.info("Setting profile to a2dp-sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.getMusicStatus()
if s == "Playing":
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
self.playMusic()
@@ -190,150 +171,306 @@ class initL2CAP():
self.wasMusicPlayingInSingle = False
self.earStatus = "Both in"
return "Both in"
elif (first_status and not second_status) or (not first_status and second_status):
elif (primary_status and not secondary_status) or (not primary_status and secondary_status):
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.getMusicStatus()
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[self, s])
s = self.isPlaying()
if s:
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
set_card_profile(self.mac_address, "a2dp-sink")
logging.info("Setting profile to a2dp-sink")
self.earStatus = "Only one in"
return "Only one in"
def listen(self):
while True:
res = self.sock.recv(1024)
print(f"Response: {res.hex()}")
self.battery_status = self.parse_battery_status(res)
self.inear_status = self.parse_inear_status(res)
# anc_status = parse_anc_status(res)
# if anc_status:
# print("ANC: ", anc_status)
if self.battery_status:
print(self.battery_status)
if self.inear_status:
print(self.inear_status)
def get_current_volume():
result = subprocess.run(["pactl", "get-sink-volume", "@DEFAULT_SINK@"], capture_output=True, text=True)
volume_line = result.stdout.splitlines()[0]
volume_percent = int(volume_line.split()[4].strip('%'))
return volume_percent
def set_volume(percent):
subprocess.run(["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{percent}%"])
initial_volume = get_current_volume()
def handle_conversational_awareness(status):
if status < 1 or status > 9:
logging.error(f"Invalid status: {status}")
pass
# while True:
# print("Select command:")
# print("1. Turn off")
# print("2. Turn on")
# print("3. Toggle transparency")
# print("4. Toggle Adaptive")
# print("5. Conversational Awareness On")
# print("6. Conversational Awareness Off")
# print("0. Exit")
# cmd = input("Enter command: ")
# if cmd == "0":
# break
# elif cmd == "1":
# self.sock.send(cmd_off)
# elif cmd == "2":
# self.sock.send(cmd_on)
# elif cmd == "3":
# self.sock.send(cmd_transparency)
# elif cmd == "4":
# self.sock.send(cmd_adaptive)
# elif cmd == "5":
# self.sock.send(cmd_ca_on)
# elif cmd == "6":
# self.sock.send(cmd_ca_off)
def stop(self):
self.connected = False
self.sock.close()
def is_bluetooth_connected():
global initial_volume
if status == 1 or status == 2:
globals()["initial_volume"] = get_current_volume()
new_volume = max(0, min(int(initial_volume * 0.1), 100))
elif status == 3:
new_volume = max(0, min(int(initial_volume * 0.4), 100))
elif status == 6:
new_volume = max(0, min(int(initial_volume * 0.5), 100))
elif status >= 8:
new_volume = initial_volume
try:
result = subprocess.run(["bluetoothctl", "info", AIRPODS_MAC], capture_output=True, text=True)
return "Connected: yes" in result.stdout
set_volume(new_volume)
except Exception as e:
print(f"Error checking Bluetooth connection status: {e}")
return False
logging.error(f"Error setting volume: {e}")
logging.getLogger("Conversational Awareness").info(f"Volume set to {new_volume}% based on conversational awareness status: {status}")
# Connect to Bluetooth device using bluetoothctl if not already connected
def connect_bluetooth_device():
if is_bluetooth_connected():
print("AirPods are already connected.")
return
if status == 9:
logging.getLogger("Conversational Awareness").info("Conversation ended. Restored volume to original level.")
print("Checking if AirPods are available...")
result = subprocess.run(["bluetoothctl", "devices"], capture_output=True, text=True)
if AIRPODS_MAC in result.stdout:
print("AirPods are available. Connecting...")
subprocess.run(["bluetoothctl", "connect", AIRPODS_MAC])
class BatteryStatusUpdater(QObject):
battery_status_updated = pyqtSignal(str)
anc_mode_updated = pyqtSignal(int)
def __init__(self, connection, mac_address):
super().__init__()
self.connection = connection
self.media_controller = MediaController(mac_address)
def notification_handler(self, notification_type: int, data: bytes):
global battery_status, anc_mode
logging.debug(f"Received data: {' '.join(f'{byte:02X}' for byte in data)}")
if notification_type == Notifications.BATTERY_UPDATED:
battery = self.connection.notificationListener.BatteryNotification.getBattery()
with battery_status_lock:
battery_status = {
"LEFT": {"status": battery[0].get_status(), "level": battery[0].get_level()},
"RIGHT": {"status": battery[1].get_status(), "level": battery[1].get_level()},
"CASE": {"status": battery[2].get_status(), "level": battery[2].get_level()}
}
status_str = get_battery_status()
self.battery_status_updated.emit(status_str)
logging.debug(f"Updated battery status: {battery_status}")
elif notification_type == Notifications.EAR_DETECTION_UPDATED:
earDetection = self.connection.notificationListener.EarDetectionNotification.getEarDetection()
self.media_controller.handlePlayPause(earDetection)
logging.debug(f"Received ear detection status: {earDetection}")
elif notification_type == Notifications.ANC_UPDATED:
anc_mode = self.connection.notificationListener.ANCNotification.status
self.anc_mode_updated.emit(anc_mode)
logging.debug(f"Received ANC status: {anc_mode}")
elif notification_type == Notifications.CA_UPDATED:
ca_status = self.connection.notificationListener.ConversationalAwarenessNotification.status
handle_conversational_awareness(ca_status)
logging.debug(f"Received CA status: {ca_status}")
def get_battery_status():
global battery_status
with battery_status_lock:
left = battery_status["LEFT"]
right = battery_status["RIGHT"]
case = battery_status["CASE"]
left_status = (left['status'] or 'Unknown').title().replace('_', ' ')
right_status = (right['status'] or 'Unknown').title().replace('_', ' ')
case_status = (case['status'] or 'Unknown').title().replace('_', ' ')
status_emoji = {
"Not Charging": "",
"Charging": "",
}
left_status_emoji = status_emoji.get(left_status, "")
right_status_emoji = status_emoji.get(right_status, "")
case_status_emoji = status_emoji.get(case_status, "")
return f"Left: {left['level']}% {left_status_emoji} | Right: {right['level']}% {right_status_emoji} | Case: {case['level']}% {case_status_emoji}"
def create_battery_icon():
global battery_status
with battery_status_lock:
left_level = battery_status["LEFT"]["level"]
right_level = battery_status["RIGHT"]["level"]
lowest_level = min(left_level, right_level)
icon_size = 64
pixmap = QPixmap(icon_size, icon_size)
pixmap.fill(Qt.transparent)
painter = QPainter(pixmap)
is_dark_mode = QApplication.palette().color(QPalette.Window).value() < 128
text_color = Qt.white if is_dark_mode else Qt.black
painter.setPen(text_color)
painter.setFont(QFont('Arial', 20, QFont.Bold))
painter.drawText(pixmap.rect(), Qt.AlignCenter, f"{lowest_level}%")
painter.end()
return QIcon(pixmap)
def signal_handler(sig, frame):
logging.info("Exiting...")
QApplication.quit()
sys.exit(0)
connection=None
battery_status_updater = None
def update_anc_menu(anc_mode, actions):
for action in actions:
action.setChecked(False)
if anc_mode == 1:
actions[0].setChecked(True)
elif anc_mode == 2:
actions[1].setChecked(True)
elif anc_mode == 3:
actions[2].setChecked(True)
elif anc_mode == 4:
actions[3].setChecked(True)
def toggle_conversational_awareness_action(action):
toggle_conversational_awareness()
action.setChecked(load_conversational_awareness_state())
def listen_for_device_connections():
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
logging.info("Listening for device connections...")
def device_connected(interface, changed, invalidated, path):
# /org/bluez/hci0/dev_mac_address/*
# repl _ with : in mac_address and check
if 'Connected' in changed and changed['Connected']:
if path.split("/")[-1] == "":
return
addr = path.split("/")[-1].replace("_", ":").replace("dev:", "")
name = changed.get('Name', 'Unknown')
logging.info(f"Device connected: {name} ({addr})")
logging.debug(f"Running command: sdptool browse {addr}")
services_result = run_sdptool_browse(addr)
logging.debug(f"Services result: {services_result}")
if services_result and "UUID 128: 74ec2172-0bad-4d01-8f77-997b2be0722a" in services_result:
logging.info(f"Found connected AirPods: {name} ({addr})")
connect_to_airpods(addr)
bus.add_signal_receiver(device_connected, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path")
def interfaces_added(path, interfaces):
logging.debug(f"Interfaces added: {path}")
if 'org.bluez.Device1' in interfaces and interfaces['org.bluez.Device1'].get('Connected', False):
addr = interfaces['org.bluez.Device1']['Address']
name = interfaces['org.bluez.Device1']['Name']
logging.info(f"Device connected: {name} ({addr})")
if path.endswith("/sep1"):
services_result = run_sdptool_browse(addr)
if services_result and "UUID 128: 74ec2172-0bad-4d01-8f77-997b2be0722a" in services_result:
logging.info(f"Found connected AirPods: {name} ({addr})")
connect_to_airpods(addr)
bus.add_signal_receiver(interfaces_added, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesAdded")
bus.add_signal_receiver(audio_device_changed, dbus_interface="org.PulseAudio.Core1.Device", signal_name="NewPlaybackStream")
from gi.repository import GLib
loop = GLib.MainLoop()
loop.run()
def audio_device_changed(*args, **kwargs):
logging.info("Audio output device changed, checking for connected AirPods...")
mac_address = get_connected_airpods()
if mac_address:
connect_to_airpods(mac_address)
def connect_to_airpods(mac_address):
logging.info(f"Attempting to connect to AirPods at {mac_address}...")
globals()["connection"] = Connection(mac_address)
try:
connection.connect()
connection.send(enums.HANDSHAKE)
globals()["battery_status_updater"] = BatteryStatusUpdater(connection, mac_address)
connection.initialize_notifications(battery_status_updater.notification_handler)
battery_status_updater.battery_status_updated.connect(lambda status: tray_icon.setToolTip(status))
battery_status_updater.battery_status_updated.connect(lambda: tray_icon.setIcon(create_battery_icon()))
battery_status_updater.anc_mode_updated.connect(lambda mode: update_anc_menu(mode, anc_actions))
save_mac_address(mac_address)
logging.info("Connected to AirPods successfully.")
except Exception as e:
logging.error(f"Failed to connect to AirPods: {e}")
def main():
args = parse_arguments()
mac_address = args.mac or load_mac_address()
logging.debug("Starting standalone tray application...")
app = QApplication(sys.argv)
globals()["tray_icon"] = QSystemTrayIcon(create_battery_icon(), app)
tray_icon.setToolTip(get_battery_status())
menu = QMenu()
ca_toggle_action = QAction("Toggle Conversational Awareness")
ca_toggle_action.setCheckable(True)
ca_toggle_action.setChecked(load_conversational_awareness_state())
ca_toggle_action.triggered.connect(lambda: toggle_conversational_awareness_action(ca_toggle_action))
menu.addAction(ca_toggle_action)
anc_on_action = QAction("Noise Cancellation")
anc_on_action.setCheckable(True)
anc_on_action.triggered.connect(lambda: control_anc("on"))
menu.addAction(anc_on_action)
anc_off_action = QAction("Off")
anc_off_action.setCheckable(True)
anc_off_action.triggered.connect(lambda: control_anc("off"))
menu.addAction(anc_off_action)
anc_transparency_action = QAction("Transparency")
anc_transparency_action.setCheckable(True)
anc_transparency_action.triggered.connect(lambda: control_anc("transparency"))
menu.addAction(anc_transparency_action)
anc_adaptive_action = QAction("Adaptive")
anc_adaptive_action.setCheckable(True)
anc_adaptive_action.triggered.connect(lambda: control_anc("adaptive"))
menu.addAction(anc_adaptive_action)
globals()["anc_actions"] = [anc_off_action, anc_on_action, anc_transparency_action, anc_adaptive_action]
quit_action = QAction("Quit")
quit_action.triggered.connect(lambda: signal_handler(signal.SIGINT, None))
menu.addAction(quit_action)
tray_icon.setContextMenu(menu)
tray_icon.show()
logging.info("Standalone tray application started.")
if mac_address:
connect_to_airpods(mac_address)
else:
print("AirPods are not available.")
time.sleep(2) # Wait for the connection to establish
# Switch audio output to AirPods (PulseAudio)
try:
result = subprocess.run(["pactl", "list", "short", "sinks"], capture_output=True, text=True)
sink_name = next((line.split()[1] for line in result.stdout.splitlines() if "bluez_sink" in line), None)
if sink_name:
subprocess.run(["pactl", "set-default-sink", sink_name])
print(f"Switched audio to AirPods: {sink_name}")
mac_address = get_connected_airpods()
if mac_address:
connect_to_airpods(mac_address)
else:
print("Failed to switch audio to AirPods.")
except Exception as e:
print(f"Error switching audio: {e}")
listen_for_device_connections()
signal.signal(signal.SIGINT, signal_handler)
# Disconnect from Bluetooth device if connected
def disconnect_bluetooth_device():
if not is_bluetooth_connected():
print("AirPods are already disconnected.")
return
print("Disconnecting from AirPods...")
subprocess.run(["bluetoothctl", "disconnect", AIRPODS_MAC])
l2cap = initL2CAP()
# Function to listen to `playerctl --follow` and react to status changes
def mediaListener():
try:
# Run playerctl --follow in a subprocess
process = subprocess.Popen(
["playerctl", "--follow", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Continuously read from the subprocess stdout
for line in process.stdout:
if line: # Make sure the line is not empty
line = line.strip() # Remove any extraneous whitespace
print(f"Received event from playerctl: {line}")
if "Playing" in line:
print("Media started playing")
connect_bluetooth_device()
if not l2cap.connected:
l2cap.start()
elif "Paused" in line or "Stopped" in line:
print("Media paused or stopped")
# disconnect_bluetooth_device()
# Check for any errors in stderr
stderr = process.stderr.read()
if stderr:
print(f"Error: {stderr}")
sys.exit(app.exec_())
except Exception as e:
print(f"An error occurred in mediaListener: {e}")
logging.error(f"An error occurred: {e}")
sys.exit(1)
mediaListener()
def control_anc(action):
command = enums.NOISE_CANCELLATION_OFF
if action == "on":
command = enums.NOISE_CANCELLATION_ON
elif action == "off":
command = enums.NOISE_CANCELLATION_OFF
elif action == "transparency":
command = enums.NOISE_CANCELLATION_TRANSPARENCY
elif action == "adaptive":
command = enums.NOISE_CANCELLATION_ADAPTIVE
connection.send(command)
logging.info(f"ANC action: {action}")
# thread = threading.Thread(target=mediaListener)
# thread.start()
# thread.stop()
main()

View File

@@ -215,7 +215,6 @@ def notification_handler(notification_type: int, data: bytes):
hex_data = ' '.join(f'{byte:02x}' for byte in data)
globals()["notif_unknown"] = hex_data
logger.debug(hex_data)
def main():
global running
logging.info("Starting AirPods daemon")
@@ -261,4 +260,4 @@ if __name__ == "__main__":
os.dup2(logfile.fileno(), sys.stdout.fileno())
os.dup2(logfile.fileno(), sys.stderr.fileno())
main()
main()