From b735a461c706a6e373293dcf97417955ef6869d3 Mon Sep 17 00:00:00 2001 From: Kavish Devar Date: Fri, 27 Sep 2024 02:52:59 +0530 Subject: [PATCH] added standalone app for experimenting --- aln/__init__.py | 5 +- examples/connect_and_listen.py => main.py | 11 +- standalone.py | 311 ++++++++++++++++++++++ 3 files changed, 323 insertions(+), 4 deletions(-) rename examples/connect_and_listen.py => main.py (88%) create mode 100644 standalone.py diff --git a/aln/__init__.py b/aln/__init__.py index ad80fb3..cd56f28 100644 --- a/aln/__init__.py +++ b/aln/__init__.py @@ -27,9 +27,7 @@ class Connection: def send(self, data: bytes): try: - logging.debug(f'Sending data to {self.mac_address}: {data.hex()}') self.socket.send(data) - logging.debug(f'Sent data to {self.mac_address}') except bluetooth.btcommon.BluetoothError as e: logging.error(f'Failed to send data to {self.mac_address}: {e}') return False @@ -37,12 +35,13 @@ class Connection: def notification_callback(self, notification_type: int): import logging - logging = logging.getLogger("Notification Callback") if notification_type == Notifications.BATTERY_UPDATED: + logging = logging.getLogger("Battery Status") for i in self.notificationListener.BatteryNotification.getBattery(): logging.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}') pass elif notification_type == Notifications.EAR_DETECTION_UPDATED: + logging = logging.getLogger("In-Ear Status") logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}') pass pass diff --git a/examples/connect_and_listen.py b/main.py similarity index 88% rename from examples/connect_and_listen.py rename to main.py index 0193564..8cfb7ec 100644 --- a/examples/connect_and_listen.py +++ b/main.py @@ -4,6 +4,7 @@ import logging import threading import time import sys +import shutil class CustomFormatter(logging.Formatter): def format(self, record): @@ -28,8 +29,16 @@ class ConsoleHandler(logging.StreamHandler): def display_logs(self): sys.stdout.write('\033[H\033[J') # Clear the screen - for line in self.log_lines[-10:]: # Display the last 10 log lines + terminal_height, _ = shutil.get_terminal_size() + log_display_lines = self.log_lines[-(terminal_height - 5):] # Display the last terminal_height - 5 log lines + empty_lines = terminal_height - 5 - len(log_display_lines) + + for _ in range(empty_lines): + sys.stdout.write('\n') # Fill empty space with new lines + + for line in log_display_lines: sys.stdout.write(line + self.terminator) + sys.stdout.write('1: ANC Off\n') sys.stdout.write('2: Transparency\n') sys.stdout.write('3: Adaptive Transparency\n') diff --git a/standalone.py b/standalone.py new file mode 100644 index 0000000..65c2bb1 --- /dev/null +++ b/standalone.py @@ -0,0 +1,311 @@ +import threading +import bluetooth +import subprocess +import time +import threading +import os + +# Bluetooth MAC address of AirPods +AIRPODS_MAC = "28:2D:7F:C2:05:5B" + +class initL2CAP(): + lastEarStatus = "" + earStatus = "" + wasMusicPlayingInBoth = False + wasMusicPlayingInSingle = False + + def pauseMusic(self): + subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7")) + + def playMusic(self): + subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7")) + + def getMusicStatus(self): + return subprocess.check_output(("playerctl", "status", "--ignore-player", "OnePlus_7")).decode("utf-8").strip() + + # Change to MAC address of your AirPods + + connected = False + + cmd_off = b"\x04\x00\x04\x00\x09\x00\x0d\x01\x00\x00\x00" + cmd_on = b"\x04\x00\x04\x00\x09\x00\x0d\x02\x00\x00\x00" + cmd_transparency = b"\x04\x00\x04\x00\x09\x00\x0d\x03\x00\x00\x00" + cmd_adaptive = b"\x04\x00\x04\x00\x09\x00\x0d\x04\x00\x00\x00" + cmd_ca_off = b"\x04\x00\x04\x00\x09\x00\x28\x02\x00\x00\x00" + cmd_ca_on = b"\x04\x00\x04\x00\x09\x00\x28\x01\x00\x00\x00" + + def start(self): + cmd_handshake = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" + # cmd_smth0 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xfe\xff" + cmd_smth1 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xff\xff" + + address = "28:2D:7F:C2:05:5B" + + aap_service = "74EC2172-0BAD-4D01-8F77-997B2BE0722A" + aap_port = 0x1001 + + services = bluetooth.find_service(address=address) + + service = [s for s in services if s["service-classes"] == [aap_service]] + + if not service: + print("Device does not have AAP service") + exit() + + self.sock = bluetooth.BluetoothSocket(bluetooth.L2CAP) + sock = self.sock + sock.connect((address, aap_port)) + + print("Connected to AirPods") + self.connected = True + print("Sending handshake...") + print(sock.type) + + sock.send(cmd_handshake) + # sock.send(cmd_smth0) + sock.send(cmd_smth1) + + threading.Thread(target=self.listen).start() + + # battery info: 04 00 04 00 04 00 03 02 01 64 01 01 04 01 64 01 01 08 01 34 02 01 + + def parse_battery_status(self, data): + if len(data) != 22: + return + self.left_bud_level = data[9] + self.left_bud_status = data[10] + + self.right_bud_level = data[14] + self.right_bud_status = data[15] + + self.case_level = data[19] + self.case_status = data[20] + + # Interpret the status + def interpret_status(status): + return "Charging" if status == 1 else "Not Charging" + + # Print the results + print(f"Left Bud: {self.left_bud_level}% - {interpret_status(self.left_bud_status)}") + print(f"Right Bud: {self.right_bud_level}% - {interpret_status(self.right_bud_status)}") + print(f"Case: {self.case_level}% - {interpret_status(self.case_status)}") + + + def parse_anc_status(self, data): + # 04 00 04 00 09 00 0d 03 00 00 00 + if len(data) != 11 and data.hex().startswith("040004000600"): + return + if data[7] == 1: + return "Off" + elif data[7] == 2: + return "On" + elif data[7] == 3: + return "Transparency" + elif data[7] == 4: + return "Adaptive" + + firstEarOutTime = 0 + stop_thread_event = threading.Event() + + def parse_inear_status(self, data): + if len(data) != 8: + return + + second_status = data[6] + first_status = data[7] + + def delayed_action(self, s): + print(s) + if not self.stop_thread_event.is_set(): + print("Delayed action") + if self.wasMusicPlayingInSingle: + self.playMusic() + self.wasMusicPlayingInBoth = False + elif self.wasMusicPlayingInBoth or s == "Playing": + self.wasMusicPlayingInBoth = True + self.wasMusicPlayingInSingle = False + + if first_status and second_status: + if self.earStatus != "Both out": + s = self.getMusicStatus() + self.pauseMusic() + os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off") + if self.earStatus == "Only one in": + if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3: + print("Only one in called with both out") + self.wasMusicPlayingInSingle = True + self.wasMusicPlayingInBoth = True + self.stop_thread_event.set() + else: + if s == "Playing": + self.wasMusicPlayingInSingle = True + else: + self.wasMusicPlayingInSingle = False + # wasMusicPlayingInSingle = True + elif self.earStatus == "Both in": + # should be unreachable + s = self.getMusicStatus() + if s == "Playing": + self.wasMusicPlayingInBoth = True + self.wasMusicPlayingInSingle = False + else: + self.wasMusicPlayingInSingle = False + self.earStatus = "Both out" + return "Both out" + elif not first_status and not second_status: + if self.earStatus != "Both in": + if self.earStatus == "Both out": + os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink") + elif self.earStatus == "Only one in": + self.stop_thread_event.set() + s = self.getMusicStatus() + if s == "Playing": + self.wasMusicPlayingInBoth = True + if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth: + self.playMusic() + self.wasMusicPlayingInBoth = True + self.wasMusicPlayingInSingle = False + self.earStatus = "Both in" + return "Both in" + elif (first_status and not second_status) or (not first_status and second_status): + if self.earStatus != "Only one in": + self.stop_thread_event.clear() + s = self.getMusicStatus() + self.pauseMusic() + delayed_thread = threading.Timer(0.3, delayed_action, args=[self, s]) + delayed_thread.start() + self.firstEarOutTime = time.time() + if self.earStatus == "Both out": + os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink") + self.earStatus = "Only one in" + + return "Only one in" + + def listen(self): + while True: + res = self.sock.recv(1024) + print(f"Response: {res.hex()}") + self.battery_status = self.parse_battery_status(res) + self.inear_status = self.parse_inear_status(res) + # anc_status = parse_anc_status(res) + # if anc_status: + # print("ANC: ", anc_status) + if self.battery_status: + print(self.battery_status) + if self.inear_status: + print(self.inear_status) + + + # while True: + # print("Select command:") + # print("1. Turn off") + # print("2. Turn on") + # print("3. Toggle transparency") + # print("4. Toggle Adaptive") + # print("5. Conversational Awareness On") + # print("6. Conversational Awareness Off") + # print("0. Exit") + + # cmd = input("Enter command: ") + + # if cmd == "0": + # break + # elif cmd == "1": + # self.sock.send(cmd_off) + # elif cmd == "2": + # self.sock.send(cmd_on) + # elif cmd == "3": + # self.sock.send(cmd_transparency) + # elif cmd == "4": + # self.sock.send(cmd_adaptive) + # elif cmd == "5": + # self.sock.send(cmd_ca_on) + # elif cmd == "6": + # self.sock.send(cmd_ca_off) + + def stop(self): + self.connected = False + self.sock.close() + + + +def is_bluetooth_connected(): + try: + result = subprocess.run(["bluetoothctl", "info", AIRPODS_MAC], capture_output=True, text=True) + return "Connected: yes" in result.stdout + except Exception as e: + print(f"Error checking Bluetooth connection status: {e}") + return False + +# Connect to Bluetooth device using bluetoothctl if not already connected +def connect_bluetooth_device(): + if is_bluetooth_connected(): + print("AirPods are already connected.") + return + + print("Connecting to AirPods...") + subprocess.run(["bluetoothctl", "connect", AIRPODS_MAC]) + + time.sleep(2) # Wait for the connection to establish + + # Switch audio output to AirPods (PulseAudio) + try: + result = subprocess.run(["pactl", "list", "short", "sinks"], capture_output=True, text=True) + sink_name = next((line.split()[1] for line in result.stdout.splitlines() if "bluez_sink" in line), None) + if sink_name: + subprocess.run(["pactl", "set-default-sink", sink_name]) + print(f"Switched audio to AirPods: {sink_name}") + + else: + print("Failed to switch audio to AirPods.") + except Exception as e: + print(f"Error switching audio: {e}") + +# Disconnect from Bluetooth device if connected +def disconnect_bluetooth_device(): + if not is_bluetooth_connected(): + print("AirPods are already disconnected.") + return + + print("Disconnecting from AirPods...") + subprocess.run(["bluetoothctl", "disconnect", AIRPODS_MAC]) + +l2cap = initL2CAP() + +# Function to listen to `playerctl --follow` and react to status changes +def mediaListener(): + try: + # Run playerctl --follow in a subprocess + process = subprocess.Popen( + ["playerctl", "--follow", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + + # Continuously read from the subprocess stdout + for line in process.stdout: + if line: # Make sure the line is not empty + line = line.strip() # Remove any extraneous whitespace + print(f"Received event from playerctl: {line}") + + if "Playing" in line: + print("Media started playing") + connect_bluetooth_device() + if not l2cap.connected: + l2cap.start() + elif "Paused" in line or "Stopped" in line: + print("Media paused or stopped") + # disconnect_bluetooth_device() + + # Check for any errors in stderr + stderr = process.stderr.read() + if stderr: + print(f"Error: {stderr}") + + except Exception as e: + print(f"An error occurred in mediaListener: {e}") + +mediaListener() + +# thread = threading.Thread(target=mediaListener) +# thread.start() + +# thread.stop() \ No newline at end of file