diff --git a/aln/AirPods/Pro2.py b/aln/AirPods/Pro2.py new file mode 100644 index 0000000..223dea3 --- /dev/null +++ b/aln/AirPods/Pro2.py @@ -0,0 +1,14 @@ +from ..Capabilites import Capabilites +class Pro2: + def __init__(self): + self.name = 'AirPods Pro 2' + self.capabilites = { + Capabilites.NOISE_CANCELLATION: [ + Capabilites.NoiseCancellation.Off, + Capabilites.NoiseCancellation.On, + Capabilites.NoiseCancellation.Transparency, + Capabilites.NoiseCancellation.Adaptive, + ], + Capabilites.CONVERSATION_AWARENESS: True, + Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY: True + } \ No newline at end of file diff --git a/aln/AirPods/__init__.py b/aln/AirPods/__init__.py new file mode 100644 index 0000000..98a0b81 --- /dev/null +++ b/aln/AirPods/__init__.py @@ -0,0 +1,3 @@ +from . import Pro2 + +Pro2 = Pro2.Pro2 \ No newline at end of file diff --git a/aln/Capabilites/__init__.py b/aln/Capabilites/__init__.py new file mode 100644 index 0000000..2452695 --- /dev/null +++ b/aln/Capabilites/__init__.py @@ -0,0 +1,17 @@ +class NoiseCancellation: + Off = b"\x01" + On = b"\x02" + Transparency = b"\x03" + Adaptive = b"\x04" + +class ConversationAwareness: + Off = b"\x02" + On = b"\x01" + +class Capabilites: + NOISE_CANCELLATION = b"\x0d" + CONVERSATION_AWARENESS = b"\x28" + CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = b"\x01\x02" + EAR_DETECTION = b"\x06" + NoiseCancellation = NoiseCancellation + ConversationAwareness = ConversationAwareness \ No newline at end of file diff --git a/aln/Notifications/Battery.py b/aln/Notifications/Battery.py new file mode 100644 index 0000000..e9e3b3a --- /dev/null +++ b/aln/Notifications/Battery.py @@ -0,0 +1,64 @@ +class BatteryComponent: + LEFT = 4 + RIGHT = 2 + CASE = 8 + pass +class BatteryStatus: + CHARGING = 1 + NOT_CHARGING = 2 + DISCONNECTED = 4 + pass + +class Battery: + + def get_name(self, cls, value): + for key, val in cls.__dict__.items(): + if val == value: + return key + return None + + def get_component(self): + return self.get_name(BatteryComponent, self.component) + + def get_level(self): + return self.level + + def get_status(self): + return self.get_name(BatteryStatus, self.status) + + def __init__(self, component: int, level: int, status: int): + self.component = component + self.level = level + self.status = status + pass + +class BatteryNotification: + def __init__(self): + self.first = Battery(BatteryComponent.LEFT, 0, BatteryStatus.DISCONNECTED) + self.second = Battery(BatteryComponent.RIGHT, 0, BatteryStatus.DISCONNECTED) + self.case = Battery(BatteryComponent.CASE, 0, BatteryStatus.DISCONNECTED) + pass + + def isBatteryData(self, data): + if len(data) != 22: + return False + if data[0] == 0x04 and data[1] == 0x00 and data[2] == 0x04 and data[3] == 0x00 and data[4] == 0x04 and data[5] == 0x00: + return True + else: + return False + def setBattery(self, data): + self.count = data[6] + self.first = Battery(data[7], data[9], data[10]) + self.second = Battery(data[12], data[14], data[15]) + self.case = Battery(data[17], data[19], data[20]) + pass + + def getBattery(self): + if self.first.component == BatteryComponent.LEFT: + self.left = self.first + self.right = self.second + else: + self.left = self.second + self.right = self.first + self.case = self.case + return [self.left, self.right, self.case] \ No newline at end of file diff --git a/aln/Notifications/EarDetection.py b/aln/Notifications/EarDetection.py new file mode 100644 index 0000000..028bf46 --- /dev/null +++ b/aln/Notifications/EarDetection.py @@ -0,0 +1,27 @@ +from ..Capabilites import Capabilites +from ..enums import enums +import logging +from typing import Literal + +class EarDetectionNotification: + NOTIFICATION_BIT = Capabilites.EAR_DETECTION + NOTIFICATION_PREFIX = enums.SEND_PREFIX + NOTIFICATION_BIT + IN_EAR = 0x00 + OUT_OF_EAR = 0x01 + def __init__(self): + pass + + def isEarDetectionData(self, data: bytes): + if len(data) != 8: + return False + if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()): + return True + + def setEarDetection(self, data: bytes): + self.first = data[6] + self.second = data[7] + + def getEarDetection(self): + return [self.first, self.second] + pass + \ No newline at end of file diff --git a/aln/Notifications/Listener.py b/aln/Notifications/Listener.py new file mode 100644 index 0000000..e7ecf5a --- /dev/null +++ b/aln/Notifications/Listener.py @@ -0,0 +1,43 @@ +from bluetooth import BluetoothSocket +import threading +from .Battery import BatteryNotification +from .EarDetection import EarDetectionNotification +import logging + +logging = logging.getLogger(__name__) + +class NotificationListener: + BATTERY_UPDATED = 0x01 + ANC_UPDATED = 0x02 + EAR_DETECTION_UPDATED = 0x03 + + def __init__(self, socket: BluetoothSocket, callback: callable): + self.socket = socket + self.BatteryNotification = BatteryNotification() + self.EarDetectionNotification = EarDetectionNotification() + self.callback = callback + pass + + def __start(self): + while True: + data = self.socket.recv(1024) + if len(data) == 0: + break + if self.BatteryNotification.isBatteryData(data): + self.BatteryNotification.setBattery(data) + self.callback(self.BATTERY_UPDATED) + pass + if self.EarDetectionNotification.isEarDetectionData(data): + self.EarDetectionNotification.setEarDetection(data) + self.callback(self.EAR_DETECTION_UPDATED) + pass + pass + + def start(self): + threading.Thread(target=self.__start).start() + pass + + def stop(self): + self.socket.close() + pass + \ No newline at end of file diff --git a/aln/Notifications/__init__.py b/aln/Notifications/__init__.py new file mode 100644 index 0000000..9a4abe5 --- /dev/null +++ b/aln/Notifications/__init__.py @@ -0,0 +1,36 @@ +from .Listener import NotificationListener +from ..enums import enums +import bluetooth +import logging + +logging = logging.getLogger(__name__) + +enums = enums() + +class Notifications: + BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED + ANC_UPDATED = NotificationListener.ANC_UPDATED + EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED + + def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable): + self.socket = socket + self.notificationListener = NotificationListener(self.socket, callback) + self.BatteryNotification = self.notificationListener.BatteryNotification + self.EarDetectionNotification = self.notificationListener.EarDetectionNotification + pass + + def initialize(self): + try: + self.socket.send(enums.REQUEST_NOTIFICATIONS) + self.notificationListener.start() + + except bluetooth.btcommon.BluetoothError as e: + logging.error(f'Failed to send data to {self.mac_address}: {e}') + return False + return True + + def __del__(self): + self.notificationListener.stop() + self.socket.close() + pass + pass \ No newline at end of file diff --git a/aln/__init__.py b/aln/__init__.py new file mode 100644 index 0000000..ad80fb3 --- /dev/null +++ b/aln/__init__.py @@ -0,0 +1,56 @@ +from .Notifications import Notifications +from .Capabilites import Capabilites +from .enums import enums + +import bluetooth +import logging + +logging = logging.getLogger("Connection Handler") + +class Connection: + def __init__(self, mac_address: str): + self.mac_address = mac_address + self.socket = bluetooth.BluetoothSocket(bluetooth.L2CAP) + def connect(self): + try: + self.socket.connect((self.mac_address, 0x1001)) + except bluetooth.btcommon.BluetoothError as e: + logging.error(f'Failed to connect to {self.mac_address}: {e}') + return False + return True + + def initialize_notifications(self): + self.notifications = Notifications(self.socket, self.notification_callback) + self.notificationListener = self.notifications.notificationListener + self.BatteryNotification = self.notifications.BatteryNotification + self.notifications.initialize() + + def send(self, data: bytes): + try: + logging.debug(f'Sending data to {self.mac_address}: {data.hex()}') + self.socket.send(data) + logging.debug(f'Sent data to {self.mac_address}') + except bluetooth.btcommon.BluetoothError as e: + logging.error(f'Failed to send data to {self.mac_address}: {e}') + return False + return True + + def notification_callback(self, notification_type: int): + import logging + logging = logging.getLogger("Notification Callback") + if notification_type == Notifications.BATTERY_UPDATED: + for i in self.notificationListener.BatteryNotification.getBattery(): + logging.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}') + pass + elif notification_type == Notifications.EAR_DETECTION_UPDATED: + logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}') + pass + pass + + def disconnect(self): + self.socket.close() + pass + + def __del__(self): + self.socket.close() + pass \ No newline at end of file diff --git a/aln/enums.py b/aln/enums.py new file mode 100644 index 0000000..311238c --- /dev/null +++ b/aln/enums.py @@ -0,0 +1,19 @@ +from .Capabilites import Capabilites + +class enums: + NOISE_CANCELLATION = Capabilites.NOISE_CANCELLATION + CONVERSATION_AWARENESS = Capabilites.CONVERSATION_AWARENESS + CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY + SEND_PREFIX = b'\x04\x00\x04\x00' + SETTINGS = b"\x09\x00" + SETTINGS_SEND_SUFFIX = b'\x00\x00\x00' + NOTIFICATION_FILTER = b'\x0f' + REQUEST_NOTIFICATIONS = SEND_PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff" + HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + SET_NOISE_CANCELLATION_OFF = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Off + SETTINGS_SEND_SUFFIX + SET_NOISE_CANCELLATION_ON = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.On + SETTINGS_SEND_SUFFIX + SET_NOISE_CANCELLATION_TRANSPARENCY = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Transparency + SETTINGS_SEND_SUFFIX + SET_NOISE_CANCELLATION_ADAPTIVE = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Adaptive + SETTINGS_SEND_SUFFIX + SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SETTINGS_SEND_SUFFIX + SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SETTINGS_SEND_SUFFIX \ No newline at end of file diff --git a/examples/connect_and_listen.py b/examples/connect_and_listen.py new file mode 100644 index 0000000..0193564 --- /dev/null +++ b/examples/connect_and_listen.py @@ -0,0 +1,94 @@ +from aln import Connection +from aln import enums +import logging +import threading +import time +import sys + +class CustomFormatter(logging.Formatter): + def format(self, record): + # Format the log message with spaces around colons without altering the original message + formatted_message = record.getMessage().replace(':', ': ') + record.message = formatted_message + return super().format(record) + +class ConsoleHandler(logging.StreamHandler): + def __init__(self, stream=None): + super().__init__(stream) + self.terminator = '\n' + self.log_lines = [] + + def emit(self, record): + try: + msg = self.format(record) + self.log_lines.append(msg) + self.display_logs() + except Exception: + self.handleError(record) + + def display_logs(self): + sys.stdout.write('\033[H\033[J') # Clear the screen + for line in self.log_lines[-10:]: # Display the last 10 log lines + sys.stdout.write(line + self.terminator) + sys.stdout.write('1: ANC Off\n') + sys.stdout.write('2: Transparency\n') + sys.stdout.write('3: Adaptive Transparency\n') + sys.stdout.write('4: ANC On\n') + sys.stdout.write('Select ANC Mode: ') + sys.stdout.flush() + +def input_thread(connection): + while True: + anc_mode = input() + if anc_mode == '1': + connection.send(enums.SET_NOISE_CANCELLATION_OFF) + elif anc_mode == '2': + connection.send(enums.SET_NOISE_CANCELLATION_TRANSPARENCY) + elif anc_mode == '3': + connection.send(enums.SET_NOISE_CANCELLATION_ADAPTIVE) + elif anc_mode == '4': + connection.send(enums.SET_NOISE_CANCELLATION_ON) + else: + logging.error('Invalid ANC Mode') + +def main(): + # Set up logging + handler = ConsoleHandler() + + log_format = ( + "%(asctime)s - %(levelname)s - %(name)s - %(message)s" + ) + + logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG)) + logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO)) + logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING)) + logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR)) + logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL)) + + formatter = CustomFormatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s') + handler.setFormatter(formatter) + logging.basicConfig(level=logging.DEBUG, handlers=[handler]) + + connection = Connection('28:2D:7F:C2:05:5B') + connection.connect() + logging.info('Sending Handshake') + connection.send(enums.HANDSHAKE) + logging.info('Initializing Notifications') + connection.initialize_notifications() + logging.info('Initialized Notifications') + + # Start the input thread + thread = threading.Thread(target=input_thread, args=(connection,)) + thread.daemon = True + thread.start() + + try: + # Keep the main thread alive to handle logging + while True: + time.sleep(1) + except KeyboardInterrupt: + logging.info('Program interrupted. Exiting...') + connection.disconnect() # Ensure the connection is properly closed + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b1bb6ef --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["aln"] + +[project] +name = "aln" +version = "0.0.1" +authors = [ + { name="Kavish Devar", email="mail@kavishdevar.me" }, +] +description = "Use your AirPods the way they were intended." +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +Homepage = "https://github.com/kavishdevar/aln" +Issues = "https://github.com/kavishdevar/aln/issues" \ No newline at end of file