mirror of
https://github.com/kavishdevar/librepods.git
synced 2026-05-28 03:48:39 +00:00
first commit!
This commit is contained in:
14
aln/AirPods/Pro2.py
Normal file
14
aln/AirPods/Pro2.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
from ..Capabilites import Capabilites
|
||||||
|
class Pro2:
|
||||||
|
def __init__(self):
|
||||||
|
self.name = 'AirPods Pro 2'
|
||||||
|
self.capabilites = {
|
||||||
|
Capabilites.NOISE_CANCELLATION: [
|
||||||
|
Capabilites.NoiseCancellation.Off,
|
||||||
|
Capabilites.NoiseCancellation.On,
|
||||||
|
Capabilites.NoiseCancellation.Transparency,
|
||||||
|
Capabilites.NoiseCancellation.Adaptive,
|
||||||
|
],
|
||||||
|
Capabilites.CONVERSATION_AWARENESS: True,
|
||||||
|
Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY: True
|
||||||
|
}
|
||||||
3
aln/AirPods/__init__.py
Normal file
3
aln/AirPods/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from . import Pro2
|
||||||
|
|
||||||
|
Pro2 = Pro2.Pro2
|
||||||
17
aln/Capabilites/__init__.py
Normal file
17
aln/Capabilites/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
class NoiseCancellation:
|
||||||
|
Off = b"\x01"
|
||||||
|
On = b"\x02"
|
||||||
|
Transparency = b"\x03"
|
||||||
|
Adaptive = b"\x04"
|
||||||
|
|
||||||
|
class ConversationAwareness:
|
||||||
|
Off = b"\x02"
|
||||||
|
On = b"\x01"
|
||||||
|
|
||||||
|
class Capabilites:
|
||||||
|
NOISE_CANCELLATION = b"\x0d"
|
||||||
|
CONVERSATION_AWARENESS = b"\x28"
|
||||||
|
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = b"\x01\x02"
|
||||||
|
EAR_DETECTION = b"\x06"
|
||||||
|
NoiseCancellation = NoiseCancellation
|
||||||
|
ConversationAwareness = ConversationAwareness
|
||||||
64
aln/Notifications/Battery.py
Normal file
64
aln/Notifications/Battery.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
class BatteryComponent:
|
||||||
|
LEFT = 4
|
||||||
|
RIGHT = 2
|
||||||
|
CASE = 8
|
||||||
|
pass
|
||||||
|
class BatteryStatus:
|
||||||
|
CHARGING = 1
|
||||||
|
NOT_CHARGING = 2
|
||||||
|
DISCONNECTED = 4
|
||||||
|
pass
|
||||||
|
|
||||||
|
class Battery:
|
||||||
|
|
||||||
|
def get_name(self, cls, value):
|
||||||
|
for key, val in cls.__dict__.items():
|
||||||
|
if val == value:
|
||||||
|
return key
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_component(self):
|
||||||
|
return self.get_name(BatteryComponent, self.component)
|
||||||
|
|
||||||
|
def get_level(self):
|
||||||
|
return self.level
|
||||||
|
|
||||||
|
def get_status(self):
|
||||||
|
return self.get_name(BatteryStatus, self.status)
|
||||||
|
|
||||||
|
def __init__(self, component: int, level: int, status: int):
|
||||||
|
self.component = component
|
||||||
|
self.level = level
|
||||||
|
self.status = status
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BatteryNotification:
|
||||||
|
def __init__(self):
|
||||||
|
self.first = Battery(BatteryComponent.LEFT, 0, BatteryStatus.DISCONNECTED)
|
||||||
|
self.second = Battery(BatteryComponent.RIGHT, 0, BatteryStatus.DISCONNECTED)
|
||||||
|
self.case = Battery(BatteryComponent.CASE, 0, BatteryStatus.DISCONNECTED)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def isBatteryData(self, data):
|
||||||
|
if len(data) != 22:
|
||||||
|
return False
|
||||||
|
if data[0] == 0x04 and data[1] == 0x00 and data[2] == 0x04 and data[3] == 0x00 and data[4] == 0x04 and data[5] == 0x00:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
def setBattery(self, data):
|
||||||
|
self.count = data[6]
|
||||||
|
self.first = Battery(data[7], data[9], data[10])
|
||||||
|
self.second = Battery(data[12], data[14], data[15])
|
||||||
|
self.case = Battery(data[17], data[19], data[20])
|
||||||
|
pass
|
||||||
|
|
||||||
|
def getBattery(self):
|
||||||
|
if self.first.component == BatteryComponent.LEFT:
|
||||||
|
self.left = self.first
|
||||||
|
self.right = self.second
|
||||||
|
else:
|
||||||
|
self.left = self.second
|
||||||
|
self.right = self.first
|
||||||
|
self.case = self.case
|
||||||
|
return [self.left, self.right, self.case]
|
||||||
27
aln/Notifications/EarDetection.py
Normal file
27
aln/Notifications/EarDetection.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
from ..Capabilites import Capabilites
|
||||||
|
from ..enums import enums
|
||||||
|
import logging
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
class EarDetectionNotification:
|
||||||
|
NOTIFICATION_BIT = Capabilites.EAR_DETECTION
|
||||||
|
NOTIFICATION_PREFIX = enums.SEND_PREFIX + NOTIFICATION_BIT
|
||||||
|
IN_EAR = 0x00
|
||||||
|
OUT_OF_EAR = 0x01
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def isEarDetectionData(self, data: bytes):
|
||||||
|
if len(data) != 8:
|
||||||
|
return False
|
||||||
|
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def setEarDetection(self, data: bytes):
|
||||||
|
self.first = data[6]
|
||||||
|
self.second = data[7]
|
||||||
|
|
||||||
|
def getEarDetection(self):
|
||||||
|
return [self.first, self.second]
|
||||||
|
pass
|
||||||
|
|
||||||
43
aln/Notifications/Listener.py
Normal file
43
aln/Notifications/Listener.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
from bluetooth import BluetoothSocket
|
||||||
|
import threading
|
||||||
|
from .Battery import BatteryNotification
|
||||||
|
from .EarDetection import EarDetectionNotification
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class NotificationListener:
|
||||||
|
BATTERY_UPDATED = 0x01
|
||||||
|
ANC_UPDATED = 0x02
|
||||||
|
EAR_DETECTION_UPDATED = 0x03
|
||||||
|
|
||||||
|
def __init__(self, socket: BluetoothSocket, callback: callable):
|
||||||
|
self.socket = socket
|
||||||
|
self.BatteryNotification = BatteryNotification()
|
||||||
|
self.EarDetectionNotification = EarDetectionNotification()
|
||||||
|
self.callback = callback
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __start(self):
|
||||||
|
while True:
|
||||||
|
data = self.socket.recv(1024)
|
||||||
|
if len(data) == 0:
|
||||||
|
break
|
||||||
|
if self.BatteryNotification.isBatteryData(data):
|
||||||
|
self.BatteryNotification.setBattery(data)
|
||||||
|
self.callback(self.BATTERY_UPDATED)
|
||||||
|
pass
|
||||||
|
if self.EarDetectionNotification.isEarDetectionData(data):
|
||||||
|
self.EarDetectionNotification.setEarDetection(data)
|
||||||
|
self.callback(self.EAR_DETECTION_UPDATED)
|
||||||
|
pass
|
||||||
|
pass
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
threading.Thread(target=self.__start).start()
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.socket.close()
|
||||||
|
pass
|
||||||
|
|
||||||
36
aln/Notifications/__init__.py
Normal file
36
aln/Notifications/__init__.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
from .Listener import NotificationListener
|
||||||
|
from ..enums import enums
|
||||||
|
import bluetooth
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
enums = enums()
|
||||||
|
|
||||||
|
class Notifications:
|
||||||
|
BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED
|
||||||
|
ANC_UPDATED = NotificationListener.ANC_UPDATED
|
||||||
|
EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED
|
||||||
|
|
||||||
|
def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable):
|
||||||
|
self.socket = socket
|
||||||
|
self.notificationListener = NotificationListener(self.socket, callback)
|
||||||
|
self.BatteryNotification = self.notificationListener.BatteryNotification
|
||||||
|
self.EarDetectionNotification = self.notificationListener.EarDetectionNotification
|
||||||
|
pass
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
try:
|
||||||
|
self.socket.send(enums.REQUEST_NOTIFICATIONS)
|
||||||
|
self.notificationListener.start()
|
||||||
|
|
||||||
|
except bluetooth.btcommon.BluetoothError as e:
|
||||||
|
logging.error(f'Failed to send data to {self.mac_address}: {e}')
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.notificationListener.stop()
|
||||||
|
self.socket.close()
|
||||||
|
pass
|
||||||
|
pass
|
||||||
56
aln/__init__.py
Normal file
56
aln/__init__.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
from .Notifications import Notifications
|
||||||
|
from .Capabilites import Capabilites
|
||||||
|
from .enums import enums
|
||||||
|
|
||||||
|
import bluetooth
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging = logging.getLogger("Connection Handler")
|
||||||
|
|
||||||
|
class Connection:
|
||||||
|
def __init__(self, mac_address: str):
|
||||||
|
self.mac_address = mac_address
|
||||||
|
self.socket = bluetooth.BluetoothSocket(bluetooth.L2CAP)
|
||||||
|
def connect(self):
|
||||||
|
try:
|
||||||
|
self.socket.connect((self.mac_address, 0x1001))
|
||||||
|
except bluetooth.btcommon.BluetoothError as e:
|
||||||
|
logging.error(f'Failed to connect to {self.mac_address}: {e}')
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def initialize_notifications(self):
|
||||||
|
self.notifications = Notifications(self.socket, self.notification_callback)
|
||||||
|
self.notificationListener = self.notifications.notificationListener
|
||||||
|
self.BatteryNotification = self.notifications.BatteryNotification
|
||||||
|
self.notifications.initialize()
|
||||||
|
|
||||||
|
def send(self, data: bytes):
|
||||||
|
try:
|
||||||
|
logging.debug(f'Sending data to {self.mac_address}: {data.hex()}')
|
||||||
|
self.socket.send(data)
|
||||||
|
logging.debug(f'Sent data to {self.mac_address}')
|
||||||
|
except bluetooth.btcommon.BluetoothError as e:
|
||||||
|
logging.error(f'Failed to send data to {self.mac_address}: {e}')
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def notification_callback(self, notification_type: int):
|
||||||
|
import logging
|
||||||
|
logging = logging.getLogger("Notification Callback")
|
||||||
|
if notification_type == Notifications.BATTERY_UPDATED:
|
||||||
|
for i in self.notificationListener.BatteryNotification.getBattery():
|
||||||
|
logging.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}')
|
||||||
|
pass
|
||||||
|
elif notification_type == Notifications.EAR_DETECTION_UPDATED:
|
||||||
|
logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}')
|
||||||
|
pass
|
||||||
|
pass
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
self.socket.close()
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.socket.close()
|
||||||
|
pass
|
||||||
19
aln/enums.py
Normal file
19
aln/enums.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
from .Capabilites import Capabilites
|
||||||
|
|
||||||
|
class enums:
|
||||||
|
NOISE_CANCELLATION = Capabilites.NOISE_CANCELLATION
|
||||||
|
CONVERSATION_AWARENESS = Capabilites.CONVERSATION_AWARENESS
|
||||||
|
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY
|
||||||
|
SEND_PREFIX = b'\x04\x00\x04\x00'
|
||||||
|
SETTINGS = b"\x09\x00"
|
||||||
|
SETTINGS_SEND_SUFFIX = b'\x00\x00\x00'
|
||||||
|
NOTIFICATION_FILTER = b'\x0f'
|
||||||
|
REQUEST_NOTIFICATIONS = SEND_PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff"
|
||||||
|
HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||||
|
|
||||||
|
SET_NOISE_CANCELLATION_OFF = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Off + SETTINGS_SEND_SUFFIX
|
||||||
|
SET_NOISE_CANCELLATION_ON = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.On + SETTINGS_SEND_SUFFIX
|
||||||
|
SET_NOISE_CANCELLATION_TRANSPARENCY = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Transparency + SETTINGS_SEND_SUFFIX
|
||||||
|
SET_NOISE_CANCELLATION_ADAPTIVE = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Adaptive + SETTINGS_SEND_SUFFIX
|
||||||
|
SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SETTINGS_SEND_SUFFIX
|
||||||
|
SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SETTINGS_SEND_SUFFIX
|
||||||
94
examples/connect_and_listen.py
Normal file
94
examples/connect_and_listen.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
from aln import Connection
|
||||||
|
from aln import enums
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
|
||||||
|
class CustomFormatter(logging.Formatter):
|
||||||
|
def format(self, record):
|
||||||
|
# Format the log message with spaces around colons without altering the original message
|
||||||
|
formatted_message = record.getMessage().replace(':', ': ')
|
||||||
|
record.message = formatted_message
|
||||||
|
return super().format(record)
|
||||||
|
|
||||||
|
class ConsoleHandler(logging.StreamHandler):
|
||||||
|
def __init__(self, stream=None):
|
||||||
|
super().__init__(stream)
|
||||||
|
self.terminator = '\n'
|
||||||
|
self.log_lines = []
|
||||||
|
|
||||||
|
def emit(self, record):
|
||||||
|
try:
|
||||||
|
msg = self.format(record)
|
||||||
|
self.log_lines.append(msg)
|
||||||
|
self.display_logs()
|
||||||
|
except Exception:
|
||||||
|
self.handleError(record)
|
||||||
|
|
||||||
|
def display_logs(self):
|
||||||
|
sys.stdout.write('\033[H\033[J') # Clear the screen
|
||||||
|
for line in self.log_lines[-10:]: # Display the last 10 log lines
|
||||||
|
sys.stdout.write(line + self.terminator)
|
||||||
|
sys.stdout.write('1: ANC Off\n')
|
||||||
|
sys.stdout.write('2: Transparency\n')
|
||||||
|
sys.stdout.write('3: Adaptive Transparency\n')
|
||||||
|
sys.stdout.write('4: ANC On\n')
|
||||||
|
sys.stdout.write('Select ANC Mode: ')
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
def input_thread(connection):
|
||||||
|
while True:
|
||||||
|
anc_mode = input()
|
||||||
|
if anc_mode == '1':
|
||||||
|
connection.send(enums.SET_NOISE_CANCELLATION_OFF)
|
||||||
|
elif anc_mode == '2':
|
||||||
|
connection.send(enums.SET_NOISE_CANCELLATION_TRANSPARENCY)
|
||||||
|
elif anc_mode == '3':
|
||||||
|
connection.send(enums.SET_NOISE_CANCELLATION_ADAPTIVE)
|
||||||
|
elif anc_mode == '4':
|
||||||
|
connection.send(enums.SET_NOISE_CANCELLATION_ON)
|
||||||
|
else:
|
||||||
|
logging.error('Invalid ANC Mode')
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Set up logging
|
||||||
|
handler = ConsoleHandler()
|
||||||
|
|
||||||
|
log_format = (
|
||||||
|
"%(asctime)s - %(levelname)s - %(name)s - %(message)s"
|
||||||
|
)
|
||||||
|
|
||||||
|
logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
|
||||||
|
logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
|
||||||
|
logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
|
||||||
|
logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
|
||||||
|
logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL))
|
||||||
|
|
||||||
|
formatter = CustomFormatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logging.basicConfig(level=logging.DEBUG, handlers=[handler])
|
||||||
|
|
||||||
|
connection = Connection('28:2D:7F:C2:05:5B')
|
||||||
|
connection.connect()
|
||||||
|
logging.info('Sending Handshake')
|
||||||
|
connection.send(enums.HANDSHAKE)
|
||||||
|
logging.info('Initializing Notifications')
|
||||||
|
connection.initialize_notifications()
|
||||||
|
logging.info('Initialized Notifications')
|
||||||
|
|
||||||
|
# Start the input thread
|
||||||
|
thread = threading.Thread(target=input_thread, args=(connection,))
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Keep the main thread alive to handle logging
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info('Program interrupted. Exiting...')
|
||||||
|
connection.disconnect() # Ensure the connection is properly closed
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
25
pyproject.toml
Normal file
25
pyproject.toml
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["aln"]
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "aln"
|
||||||
|
version = "0.0.1"
|
||||||
|
authors = [
|
||||||
|
{ name="Kavish Devar", email="mail@kavishdevar.me" },
|
||||||
|
]
|
||||||
|
description = "Use your AirPods the way they were intended."
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.8"
|
||||||
|
classifiers = [
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"License :: OSI Approved :: MIT License",
|
||||||
|
"Operating System :: OS Independent",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.urls]
|
||||||
|
Homepage = "https://github.com/kavishdevar/aln"
|
||||||
|
Issues = "https://github.com/kavishdevar/aln/issues"
|
||||||
Reference in New Issue
Block a user