Receive adaptive anc notifications

This commit is contained in:
Kavish Devar
2024-10-06 21:21:03 +05:30
parent 028d83b2a4
commit 2aeb2b02a7
21 changed files with 215 additions and 63 deletions

View File

@@ -24,15 +24,8 @@ A clear and concise description of what you expected to happen.
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]
**Smartphone (please complete the following information):**
- Device: [e.g. iPhone6]
- OS: [e.g. iOS8.1]
- Browser [e.g. stock browser, safari]
- Version [e.g. 22]
- Distro: [e.g. Ubuntu, KDE Neon, Arch]
- Version [e.g. 22.04]
**Additional context**
Add any other context about the problem here.

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

12
.idea/AirPodsLikeNormal.iml generated Normal file
View File

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/AirPodsLikeNormal.iml" filepath="$PROJECT_DIR$/.idea/AirPodsLikeNormal.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@@ -4,10 +4,10 @@ class Pro2:
self.name = 'AirPods Pro 2'
self.capabilites = {
Capabilites.NOISE_CANCELLATION: [
Capabilites.NoiseCancellation.Off,
Capabilites.NoiseCancellation.On,
Capabilites.NoiseCancellation.Transparency,
Capabilites.NoiseCancellation.Adaptive,
Capabilites.NoiseCancellation.OFF,
Capabilites.NoiseCancellation.ON,
Capabilites.NoiseCancellation.TRANSPARENCY,
Capabilites.NoiseCancellation.ADAPTIVE,
],
Capabilites.CONVERSATION_AWARENESS: True,
Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY: True

View File

@@ -1,8 +1,8 @@
class NoiseCancellation:
Off = b"\x01"
On = b"\x02"
Transparency = b"\x03"
Adaptive = b"\x04"
OFF = b"\x01"
ON = b"\x02"
TRANSPARENCY = b"\x03"
ADAPTIVE = b"\x04"
class ConversationAwareness:
Off = b"\x02"

50
aln/Notifications/ANC.py Normal file
View File

@@ -0,0 +1,50 @@
from ..enums import enums
from ..Capabilites import Capabilites
class ANCNotification:
NOTIFICATION_PREFIX = enums.NOISE_CANCELLATION_PREFIX
OFF = Capabilites.NoiseCancellation.OFF
ON = Capabilites.NoiseCancellation.ON
TRANSPARENCY = Capabilites.NoiseCancellation.TRANSPARENCY
ADAPTIVE = Capabilites.NoiseCancellation.ADAPTIVE
def __init__(self):
pass
def isANCData(self, data: bytes):
# 04 00 04 00 09 00 0D 01 00 00 00
if len(data) != 11:
return False
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
return True
else:
return False
def setANC(self, data: bytes):
self.status = data[7]
pass
def getANC(self, returnString: bool = False, fromInt: int = None):
if fromInt is not None:
fromInt = bytes([fromInt])
if fromInt == self.OFF:
return "Off"
elif fromInt == self.ON:
return "On"
elif fromInt == self.TRANSPARENCY:
return "Transparency"
elif fromInt == self.ADAPTIVE:
return "Adaptive"
pass
if returnString:
return self.status
else:
if self.status == self.OFF:
return "Off"
elif self.status == self.ON:
return "On"
elif self.status == self.TRANSPARENCY:
return "Transparency"
elif self.status == self.ADAPTIVE:
return "Adaptive"
pass

View File

@@ -2,6 +2,7 @@ from bluetooth import BluetoothSocket
import threading
from .Battery import BatteryNotification
from .EarDetection import EarDetectionNotification
from .ANC import ANCNotification
import logging
logging = logging.getLogger(__name__)
@@ -16,6 +17,7 @@ class NotificationListener:
self.socket = socket
self.BatteryNotification = BatteryNotification()
self.EarDetectionNotification = EarDetectionNotification()
self.ANCNotification = ANCNotification()
self.callback = callback
pass
@@ -31,6 +33,9 @@ class NotificationListener:
if self.EarDetectionNotification.isEarDetectionData(data):
self.EarDetectionNotification.setEarDetection(data)
self.callback(self.EAR_DETECTION_UPDATED, data)
if self.ANCNotification.isANCData(data):
self.ANCNotification.setANC(data)
self.callback(self.ANC_UPDATED, data)
else:
self.callback(self.UNKNOWN, data)
pass

View File

@@ -17,11 +17,13 @@ class Notifications:
self.notificationListener = NotificationListener(self.socket, callback)
self.BatteryNotification = self.notificationListener.BatteryNotification
self.EarDetectionNotification = self.notificationListener.EarDetectionNotification
self.ANCNotification = self.notificationListener.ANCNotification
pass
def initialize(self):
try:
self.socket.send(enums.REQUEST_NOTIFICATIONS)
self.socket.send(enums.SET_SPECIFIC_FEATURES)
self.notificationListener.start()
except bluetooth.btcommon.BluetoothError as e:

View File

@@ -1,7 +1,4 @@
from .Notifications import Notifications
from .Capabilites import Capabilites
from .enums import enums
import bluetooth
import logging
@@ -25,6 +22,7 @@ class Connection:
self.notifications = Notifications(self.socket, callback)
self.notificationListener = self.notifications.notificationListener
self.BatteryNotification = self.notifications.BatteryNotification
self.ANCNotification = self.notifications.ANCNotification
self.notifications.initialize()
def send(self, data: bytes):
@@ -46,6 +44,10 @@ class Connection:
logging = logging.getLogger("In-Ear Status")
logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}')
pass
elif notification_type == Notifications.ANC_UPDATED:
logging = logging.getLogger("ANC Status")
logging.debug(f'{self.notificationListener.ANCNotification.getANC()}')
pass
elif notification_type == Notifications.UNKNOWN:
logging = logging.getLogger("Unknown Notification")
hex_data = ' '.join(f'{byte:02x}' for byte in data)

View File

@@ -6,14 +6,17 @@ class enums:
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY
SEND_PREFIX = b'\x04\x00\x04\x00'
SETTINGS = b"\x09\x00"
SETTINGS_SEND_SUFFIX = b'\x00\x00\x00'
SUFFIX = b'\x00\x00\x00'
NOTIFICATION_FILTER = b'\x0f'
SPECIFIC_FEATURES = b'\x4d'
SET_SPECIFIC_FEATURES = SEND_PREFIX + SPECIFIC_FEATURES + b"\x00\xff\x00\x00\x00\x00\x00\x00\x00"
REQUEST_NOTIFICATIONS = SEND_PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff"
HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SET_NOISE_CANCELLATION_OFF = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Off + SETTINGS_SEND_SUFFIX
SET_NOISE_CANCELLATION_ON = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.On + SETTINGS_SEND_SUFFIX
SET_NOISE_CANCELLATION_TRANSPARENCY = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Transparency + SETTINGS_SEND_SUFFIX
SET_NOISE_CANCELLATION_ADAPTIVE = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION + Capabilites.NoiseCancellation.Adaptive + SETTINGS_SEND_SUFFIX
SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SETTINGS_SEND_SUFFIX
SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SETTINGS_SEND_SUFFIX
NOISE_CANCELLATION_PREFIX = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION
NOISE_CANCELLATION_OFF = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.OFF + SUFFIX
NOISE_CANCELLATION_ON = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ON + SUFFIX
NOISE_CANCELLATION_TRANSPARENCY = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.TRANSPARENCY + SUFFIX
NOISE_CANCELLATION_ADAPTIVE = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ADAPTIVE + SUFFIX
SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SUFFIX
SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SUFFIX

View File

@@ -7,8 +7,6 @@ import time
import os
import logging
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
@@ -23,10 +21,10 @@ class CustomFormatter(logging.Formatter):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
@@ -49,14 +47,14 @@ class MediaController:
def playMusic(self):
logging.info("Playing music")
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
subprocess.call(("playerctl", "play"))
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
subprocess.call(("playerctl", "--all-players", "pause"))
def isPlaying(self):
return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing"
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
def handlePlayPause(self, data):
primary_status = data[0]

View File

@@ -1,7 +1,7 @@
import socket
import json
import logging
from aln.Notifications.ANC import ANCNotification
SOCKET_PATH = "/tmp/airpods_daemon.sock"
import logging
@@ -56,6 +56,8 @@ def read():
logging.info(f"\033[1;33mReceived battery status: {b} - {battery_data['status']} - {battery_data['level']}\033[1;0m")
elif data["type"] == "ear_detection":
logging.info(f"\033[1;33mReceived ear detection status: {data['primary']} - {data['secondary']}\033[1;0m")
elif data["type"] == "anc":
logging.info(f"\033[1;33mReceived ANC status: {data['status']}\033[1;0m")
elif data["type"] == "unknown":
logging.info(f"Received data: {data['data']}")
else:

View File

@@ -62,12 +62,12 @@ if __name__ == "__main__":
args = parse_arguments()
if args.mode == "off" or args.mode == "1":
command = enums.SET_NOISE_CANCELLATION_OFF
command = enums.NOISE_CANCELLATION_OFF
elif args.mode == "on" or args.mode == "2":
command = enums.SET_NOISE_CANCELLATION_ON
command = enums.NOISE_CANCELLATION_ON
elif args.mode == "transparency" or args.mode == "3":
command = enums.SET_NOISE_CANCELLATION_TRANSPARENCY
command = enums.NOISE_CANCELLATION_TRANSPARENCY
elif args.mode == "adaptive" or args.mode == "4":
command = enums.SET_NOISE_CANCELLATION_ADAPTIVE
command = enums.NOISE_CANCELLATION_ADAPTIVE
send_command(command)

View File

@@ -11,6 +11,35 @@ import subprocess
import time
import os
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
# Initialize battery_status at the module level
@@ -33,14 +62,15 @@ class MediaController:
def playMusic(self):
logging.info("Playing music")
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
subprocess.call(("playerctl", "play"))
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
subprocess.call(("playerctl", "--all-players", "pause"))
def isPlaying(self):
return subprocess.check_output(["playerctl", "status", "--player", "spotify"]).decode("utf-8").strip() == "Playing"
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
def handlePlayPause(self, data):
primary_status = data[0]
@@ -121,7 +151,7 @@ class BatteryStatusUpdater(QObject):
super().__init__()
self.media_controller = MediaController()
def listen_for_battery_updates(self):
def listen_to_socket(self):
global battery_status
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
@@ -133,9 +163,11 @@ class BatteryStatusUpdater(QObject):
if response["type"] == "battery":
with battery_status_lock:
battery_status = response
logging.debug(f"Received battery status: {response}")
self.battery_status_updated.emit()
elif response["type"] == "ear_detection":
self.media_controller.handlePlayPause([response['primary'], response['secondary']])
logging.debug(f"Received ear detection status: {response}")
except json.JSONDecodeError as e:
logging.warning(f"Error deserializing data: {e}")
except KeyError as e:
@@ -148,21 +180,24 @@ def get_battery_status():
left = battery_status["LEFT"]
right = battery_status["RIGHT"]
case = battery_status["CASE"]
return f"Left: {left['level']}% - {left['status'].title().replace('_', ' ')} | Right: {right['level']}% - {right['status'].title().replace('_', ' ')} | Case: {case['level']}% - {case['status'].title().replace('_', ' ')}"
left_status = (left['status'] or 'Unknown').title().replace('_', ' ')
right_status = (right['status'] or 'Unknown').title().replace('_', ' ')
case_status = (case['status'] or 'Unknown').title().replace('_', ' ')
return f"Left: {left['level']}% - {left_status} | Right: {right['level']}% - {right_status} | Case: {case['level']}% - {case_status}"
from aln import enums
def set_anc_mode(mode):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
command = enums.SET_NOISE_CANCELLATION_OFF
command = enums.NOISE_CANCELLATION_OFF
if mode == "on":
command = enums.SET_NOISE_CANCELLATION_ON
command = enums.NOISE_CANCELLATION_ON
elif mode == "off":
command = enums.SET_NOISE_CANCELLATION_OFF
command = enums.NOISE_CANCELLATION_OFF
elif mode == "transparency":
command = enums.SET_NOISE_CANCELLATION_TRANSPARENCY
command = enums.NOISE_CANCELLATION_TRANSPARENCY
elif mode == "adaptive":
command = enums.SET_NOISE_CANCELLATION_ADAPTIVE
command = enums.NOISE_CANCELLATION_ADAPTIVE
client.sendall(command)
response = client.recv(1024)
return json.loads(response.decode())
@@ -172,7 +207,7 @@ def control_anc(action):
logging.info(f"ANC action: {action}, Response: {response}")
def signal_handler(sig, frame):
print("Exiting...")
logging.info("Exiting...")
QApplication.quit()
sys.exit(0)
@@ -222,8 +257,8 @@ battery_status_updater = BatteryStatusUpdater()
battery_status_updater.battery_status_updated.connect(lambda: tray_icon.setToolTip(get_battery_status()))
# Start the battery status listener thread
listener_thread = threading.Thread(target=battery_status_updater.listen_for_battery_updates, daemon=True)
listener_thread = threading.Thread(target=battery_status_updater.listen_to_socket, daemon=True)
listener_thread.start()
# Run the application
sys.exit(app.exec_())
sys.exit(app.exec_())

View File

@@ -52,16 +52,16 @@ def input_thread(connection: Connection):
while True:
anc_mode = input()
if anc_mode == '1':
connection.send(enums.SET_NOISE_CANCELLATION_OFF)
connection.send(enums.NOISE_CANCELLATION_OFF)
logging.info('ANC Off')
elif anc_mode == '2':
connection.send(enums.SET_NOISE_CANCELLATION_TRANSPARENCY)
connection.send(enums.NOISE_CANCELLATION_TRANSPARENCY)
logging.info('Transparency On')
elif anc_mode == '3':
connection.send(enums.SET_NOISE_CANCELLATION_ADAPTIVE)
connection.send(enums.NOISE_CANCELLATION_ADAPTIVE)
logging.info('Adaptive Transparency On')
elif anc_mode == '4':
connection.send(enums.SET_NOISE_CANCELLATION_ON)
connection.send(enums.NOISE_CANCELLATION_ON)
logging.info('ANC On')
else:
logging.error('Invalid ANC Mode')

View File

@@ -21,7 +21,7 @@ class initL2CAP():
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
def getMusicStatus(self):
return subprocess.check_output(("playerctl", "status", "--ignore-player", "OnePlus_7")).decode("utf-8").strip()
return subprocess.getoutput("playerctl status --ignore-player OnePlus_7").strip()
# Change to MAC address of your AirPods

View File

@@ -6,10 +6,10 @@ import sys
import logging
from aln import Connection, enums
from aln.Notifications import Notifications
import os
from aln.Notifications.Battery import Battery
import os
import bluetooth
from aln.enums import enums
connection = None
AIRPODS_MAC = '28:2D:7F:C2:05:5B'
@@ -55,7 +55,9 @@ def handle_client(connection, client_socket):
"level": i.get_level()
}
data: str = JSONEncoder().encode(batteryJSON)
elif notif_key == "notif_ear_detection":
# noinspection PyTypeChecker
data: list[int] = data
earDetectionJSON = {
"type": "ear_detection",
@@ -63,6 +65,13 @@ def handle_client(connection, client_socket):
"secondary": data[1]
}
data: str = JSONEncoder().encode(earDetectionJSON)
elif notif_key == "notif_anc":
data: int = data
ancJSON = {
"type": "anc",
"status": data,
}
data: str = JSONEncoder().encode(ancJSON)
elif notif_key == "notif_unknown":
logging.debug(f"Unhandled notification type: {notif_key}")
logging.debug(f"Data: {data}")
@@ -136,7 +145,7 @@ def start_socket_server(connection):
server_socket.close()
logging.info("Socket server stopped")
def stop_daemon(signum, frame):
def stop_daemon(_, __):
"""Signal handler to stop the daemon."""
global running
logging.info("Received termination signal. Stopping daemon...")
@@ -169,6 +178,11 @@ def notification_handler(notification_type: int, data: bytes):
earDetection = connection.notificationListener.EarDetectionNotification.getEarDetection()
globals()["notif_ear_detection"] = earDetection
logger.debug(earDetection)
elif notification_type == Notifications.ANC_UPDATED:
logger = logging.getLogger("ANC Status")
anc = connection.notificationListener.ANCNotification.status
globals()["notif_anc"] = anc
logger.debug(anc)
elif notification_type == Notifications.UNKNOWN:
logger = logging.getLogger("Unknown Notification")
hex_data = ' '.join(f'{byte:02x}' for byte in data)
@@ -190,6 +204,7 @@ def main():
connection.send(enums.HANDSHAKE)
logging.info("Handshake sent")
connection.initialize_notifications(notification_handler)
# Start the socket server to listen for client connections
@@ -219,4 +234,4 @@ if __name__ == "__main__":
os.dup2(logfile.fileno(), sys.stdout.fileno())
os.dup2(logfile.fileno(), sys.stderr.fileno())
main()
main()