mirror of
https://github.com/kavishdevar/librepods.git
synced 2026-05-01 02:24:18 +00:00
implement conversational awareness in tray app
This commit is contained in:
@@ -5,13 +5,14 @@ class NoiseCancellation:
|
|||||||
ADAPTIVE = b"\x04"
|
ADAPTIVE = b"\x04"
|
||||||
|
|
||||||
class ConversationAwareness:
|
class ConversationAwareness:
|
||||||
Off = b"\x02"
|
OFF = b"\x02"
|
||||||
On = b"\x01"
|
ON = b"\x01"
|
||||||
|
|
||||||
class Capabilites:
|
class Capabilites:
|
||||||
NOISE_CANCELLATION = b"\x0d"
|
NOISE_CANCELLATION = b"\x0d"
|
||||||
CONVERSATION_AWARENESS = b"\x28"
|
CONVERSATION_AWARENESS = b"\x28"
|
||||||
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = b"\x01\x02"
|
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = b"\x01\x02"
|
||||||
EAR_DETECTION = b"\x06"
|
EAR_DETECTION = b"\x06"
|
||||||
|
|
||||||
NoiseCancellation = NoiseCancellation
|
NoiseCancellation = NoiseCancellation
|
||||||
ConversationAwareness = ConversationAwareness
|
ConversationAwareness = ConversationAwareness
|
||||||
@@ -20,31 +20,6 @@ class ANCNotification:
|
|||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def setANC(self, data: bytes):
|
def setData(self, data: bytes):
|
||||||
self.status = data[7]
|
self.status = data[7]
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def getANC(self, returnString: bool = False, fromInt: int = None):
|
|
||||||
if fromInt is not None:
|
|
||||||
fromInt = bytes([fromInt])
|
|
||||||
if fromInt == self.OFF:
|
|
||||||
return "Off"
|
|
||||||
elif fromInt == self.ON:
|
|
||||||
return "On"
|
|
||||||
elif fromInt == self.TRANSPARENCY:
|
|
||||||
return "Transparency"
|
|
||||||
elif fromInt == self.ADAPTIVE:
|
|
||||||
return "Adaptive"
|
|
||||||
pass
|
|
||||||
if returnString:
|
|
||||||
return self.status
|
|
||||||
else:
|
|
||||||
if self.status == self.OFF:
|
|
||||||
return "Off"
|
|
||||||
elif self.status == self.ON:
|
|
||||||
return "On"
|
|
||||||
elif self.status == self.TRANSPARENCY:
|
|
||||||
return "Transparency"
|
|
||||||
elif self.status == self.ADAPTIVE:
|
|
||||||
return "Adaptive"
|
|
||||||
pass
|
|
||||||
19
aln/Notifications/ConversationalAwareness.py
Normal file
19
aln/Notifications/ConversationalAwareness.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# 04 00 04 00 4b 00 02 00 01 [01/02/03/0b/09]
|
||||||
|
|
||||||
|
from ..enums import enums
|
||||||
|
|
||||||
|
class ConversationalAwarenessNotification:
|
||||||
|
NOTIFICATION_PREFIX = enums.CONVERSATION_AWARENESS_RECEIVE_PREFIX
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def isConversationalAwarenessData(self, data: bytes):
|
||||||
|
if len(data) != 10:
|
||||||
|
return False
|
||||||
|
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def setData(self, data: bytes):
|
||||||
|
self.status = data[9]
|
||||||
|
pass
|
||||||
@@ -4,7 +4,7 @@ from typing import Literal
|
|||||||
|
|
||||||
class EarDetectionNotification:
|
class EarDetectionNotification:
|
||||||
NOTIFICATION_BIT = Capabilites.EAR_DETECTION
|
NOTIFICATION_BIT = Capabilites.EAR_DETECTION
|
||||||
NOTIFICATION_PREFIX = enums.SEND_PREFIX + NOTIFICATION_BIT
|
NOTIFICATION_PREFIX = enums.PREFIX + NOTIFICATION_BIT
|
||||||
IN_EAR = 0x00
|
IN_EAR = 0x00
|
||||||
OUT_OF_EAR = 0x01
|
OUT_OF_EAR = 0x01
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from bluetooth import BluetoothSocket
|
|||||||
import threading
|
import threading
|
||||||
from .Battery import BatteryNotification
|
from .Battery import BatteryNotification
|
||||||
from .EarDetection import EarDetectionNotification
|
from .EarDetection import EarDetectionNotification
|
||||||
|
from .ConversationalAwareness import ConversationalAwarenessNotification
|
||||||
from .ANC import ANCNotification
|
from .ANC import ANCNotification
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -11,6 +12,7 @@ class NotificationListener:
|
|||||||
BATTERY_UPDATED = 0x01
|
BATTERY_UPDATED = 0x01
|
||||||
ANC_UPDATED = 0x02
|
ANC_UPDATED = 0x02
|
||||||
EAR_DETECTION_UPDATED = 0x03
|
EAR_DETECTION_UPDATED = 0x03
|
||||||
|
CA_UPDATED = 0x04
|
||||||
UNKNOWN = 0x00
|
UNKNOWN = 0x00
|
||||||
|
|
||||||
def __init__(self, socket: BluetoothSocket, callback: callable):
|
def __init__(self, socket: BluetoothSocket, callback: callable):
|
||||||
@@ -18,6 +20,7 @@ class NotificationListener:
|
|||||||
self.BatteryNotification = BatteryNotification()
|
self.BatteryNotification = BatteryNotification()
|
||||||
self.EarDetectionNotification = EarDetectionNotification()
|
self.EarDetectionNotification = EarDetectionNotification()
|
||||||
self.ANCNotification = ANCNotification()
|
self.ANCNotification = ANCNotification()
|
||||||
|
self.ConversationalAwarenessNotification = ConversationalAwarenessNotification()
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -34,8 +37,11 @@ class NotificationListener:
|
|||||||
self.EarDetectionNotification.setEarDetection(data)
|
self.EarDetectionNotification.setEarDetection(data)
|
||||||
self.callback(self.EAR_DETECTION_UPDATED, data)
|
self.callback(self.EAR_DETECTION_UPDATED, data)
|
||||||
if self.ANCNotification.isANCData(data):
|
if self.ANCNotification.isANCData(data):
|
||||||
self.ANCNotification.setANC(data)
|
self.ANCNotification.setData(data)
|
||||||
self.callback(self.ANC_UPDATED, data)
|
self.callback(self.ANC_UPDATED, data)
|
||||||
|
if self.ConversationalAwarenessNotification.isConversationalAwarenessData(data):
|
||||||
|
self.ConversationalAwarenessNotification.setData(data)
|
||||||
|
self.callback(self.CA_UPDATED, data)
|
||||||
else:
|
else:
|
||||||
self.callback(self.UNKNOWN, data)
|
self.callback(self.UNKNOWN, data)
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ enums = enums()
|
|||||||
class Notifications:
|
class Notifications:
|
||||||
BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED
|
BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED
|
||||||
ANC_UPDATED = NotificationListener.ANC_UPDATED
|
ANC_UPDATED = NotificationListener.ANC_UPDATED
|
||||||
|
CA_UPDATED = NotificationListener.CA_UPDATED
|
||||||
EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED
|
EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED
|
||||||
UNKNOWN = NotificationListener.UNKNOWN
|
UNKNOWN = NotificationListener.UNKNOWN
|
||||||
def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable):
|
def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable):
|
||||||
@@ -18,12 +19,13 @@ class Notifications:
|
|||||||
self.BatteryNotification = self.notificationListener.BatteryNotification
|
self.BatteryNotification = self.notificationListener.BatteryNotification
|
||||||
self.EarDetectionNotification = self.notificationListener.EarDetectionNotification
|
self.EarDetectionNotification = self.notificationListener.EarDetectionNotification
|
||||||
self.ANCNotification = self.notificationListener.ANCNotification
|
self.ANCNotification = self.notificationListener.ANCNotification
|
||||||
|
self.ConversationalAwarenessNotification = self.notificationListener.ConversationalAwarenessNotification
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def initialize(self):
|
def initialize(self):
|
||||||
try:
|
try:
|
||||||
self.socket.send(enums.REQUEST_NOTIFICATIONS)
|
|
||||||
self.socket.send(enums.SET_SPECIFIC_FEATURES)
|
self.socket.send(enums.SET_SPECIFIC_FEATURES)
|
||||||
|
self.socket.send(enums.REQUEST_NOTIFICATIONS)
|
||||||
self.notificationListener.start()
|
self.notificationListener.start()
|
||||||
|
|
||||||
except bluetooth.btcommon.BluetoothError as e:
|
except bluetooth.btcommon.BluetoothError as e:
|
||||||
|
|||||||
@@ -23,6 +23,8 @@ class Connection:
|
|||||||
self.notificationListener = self.notifications.notificationListener
|
self.notificationListener = self.notifications.notificationListener
|
||||||
self.BatteryNotification = self.notifications.BatteryNotification
|
self.BatteryNotification = self.notifications.BatteryNotification
|
||||||
self.ANCNotification = self.notifications.ANCNotification
|
self.ANCNotification = self.notifications.ANCNotification
|
||||||
|
self.EarDetectionNotification = self.notifications.EarDetectionNotification
|
||||||
|
self.ConversationalAwarenessNotification = self.notifications.ConversationalAwarenessNotification
|
||||||
self.notifications.initialize()
|
self.notifications.initialize()
|
||||||
|
|
||||||
def send(self, data: bytes):
|
def send(self, data: bytes):
|
||||||
@@ -46,7 +48,11 @@ class Connection:
|
|||||||
pass
|
pass
|
||||||
elif notification_type == Notifications.ANC_UPDATED:
|
elif notification_type == Notifications.ANC_UPDATED:
|
||||||
logging = logging.getLogger("ANC Status")
|
logging = logging.getLogger("ANC Status")
|
||||||
logging.debug(f'{self.notificationListener.ANCNotification.getANC()}')
|
logging.debug(f'{self.notificationListener.ANCNotification.status}')
|
||||||
|
pass
|
||||||
|
elif notification_type == Notifications.CA_UPDATED:
|
||||||
|
logging = logging.getLogger("Conversational Awareness Status")
|
||||||
|
logging.debug(f'{self.notificationListener.ConversationalAwarenessNotification.status}')
|
||||||
pass
|
pass
|
||||||
elif notification_type == Notifications.UNKNOWN:
|
elif notification_type == Notifications.UNKNOWN:
|
||||||
logging = logging.getLogger("Unknown Notification")
|
logging = logging.getLogger("Unknown Notification")
|
||||||
|
|||||||
14
aln/enums.py
14
aln/enums.py
@@ -4,19 +4,21 @@ class enums:
|
|||||||
NOISE_CANCELLATION = Capabilites.NOISE_CANCELLATION
|
NOISE_CANCELLATION = Capabilites.NOISE_CANCELLATION
|
||||||
CONVERSATION_AWARENESS = Capabilites.CONVERSATION_AWARENESS
|
CONVERSATION_AWARENESS = Capabilites.CONVERSATION_AWARENESS
|
||||||
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY
|
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY
|
||||||
SEND_PREFIX = b'\x04\x00\x04\x00'
|
PREFIX = b'\x04\x00\x04\x00'
|
||||||
SETTINGS = b"\x09\x00"
|
SETTINGS = b"\x09\x00"
|
||||||
SUFFIX = b'\x00\x00\x00'
|
SUFFIX = b'\x00\x00\x00'
|
||||||
NOTIFICATION_FILTER = b'\x0f'
|
NOTIFICATION_FILTER = b'\x0f'
|
||||||
SPECIFIC_FEATURES = b'\x4d'
|
SPECIFIC_FEATURES = b'\x4d'
|
||||||
SET_SPECIFIC_FEATURES = SEND_PREFIX + SPECIFIC_FEATURES + b"\x00\xff\x00\x00\x00\x00\x00\x00\x00"
|
SET_SPECIFIC_FEATURES = PREFIX + SPECIFIC_FEATURES + b"\x00\xff\x00\x00\x00\x00\x00\x00\x00"
|
||||||
REQUEST_NOTIFICATIONS = SEND_PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff"
|
REQUEST_NOTIFICATIONS = PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff"
|
||||||
HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||||
NOISE_CANCELLATION_PREFIX = SEND_PREFIX + SETTINGS + NOISE_CANCELLATION
|
NOISE_CANCELLATION_PREFIX = PREFIX + SETTINGS + NOISE_CANCELLATION
|
||||||
NOISE_CANCELLATION_OFF = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.OFF + SUFFIX
|
NOISE_CANCELLATION_OFF = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.OFF + SUFFIX
|
||||||
NOISE_CANCELLATION_ON = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ON + SUFFIX
|
NOISE_CANCELLATION_ON = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ON + SUFFIX
|
||||||
NOISE_CANCELLATION_TRANSPARENCY = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.TRANSPARENCY + SUFFIX
|
NOISE_CANCELLATION_TRANSPARENCY = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.TRANSPARENCY + SUFFIX
|
||||||
NOISE_CANCELLATION_ADAPTIVE = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ADAPTIVE + SUFFIX
|
NOISE_CANCELLATION_ADAPTIVE = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ADAPTIVE + SUFFIX
|
||||||
|
|
||||||
SET_CONVERSATION_AWARENESS_OFF = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.Off + SUFFIX
|
SET_CONVERSATION_AWARENESS_OFF = PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.OFF + SUFFIX
|
||||||
SET_CONVERSATION_AWARENESS_ON = SEND_PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.On + SUFFIX
|
SET_CONVERSATION_AWARENESS_ON = PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.ON + SUFFIX
|
||||||
|
|
||||||
|
CONVERSATION_AWARENESS_RECEIVE_PREFIX = PREFIX + b"\x4b\x00\x02\00"
|
||||||
@@ -57,7 +57,9 @@ def read():
|
|||||||
elif data["type"] == "ear_detection":
|
elif data["type"] == "ear_detection":
|
||||||
logging.info(f"\033[1;33mReceived ear detection status: {data['primary']} - {data['secondary']}\033[1;0m")
|
logging.info(f"\033[1;33mReceived ear detection status: {data['primary']} - {data['secondary']}\033[1;0m")
|
||||||
elif data["type"] == "anc":
|
elif data["type"] == "anc":
|
||||||
logging.info(f"\033[1;33mReceived ANC status: {data['status']}\033[1;0m")
|
logging.info(f"\033[1;33mReceived ANC status: {data['mode']}\033[1;0m")
|
||||||
|
elif data["type"] == "ca":
|
||||||
|
logging.info(f"\033[1;33mReceived Conversational Awareness status: {data['status']}\033[1;0m")
|
||||||
elif data["type"] == "unknown":
|
elif data["type"] == "unknown":
|
||||||
logging.info(f"Received data: {data['data']}")
|
logging.info(f"Received data: {data['data']}")
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ battery_status = {
|
|||||||
"RIGHT": {"status": "Unknown", "level": 0},
|
"RIGHT": {"status": "Unknown", "level": 0},
|
||||||
"CASE": {"status": "Unknown", "level": 0}
|
"CASE": {"status": "Unknown", "level": 0}
|
||||||
}
|
}
|
||||||
|
anc_mode = 0
|
||||||
# Define a lock
|
# Define a lock
|
||||||
battery_status_lock = threading.Lock()
|
battery_status_lock = threading.Lock()
|
||||||
|
|
||||||
@@ -71,7 +71,6 @@ class MediaController:
|
|||||||
def isPlaying(self):
|
def isPlaying(self):
|
||||||
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
|
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
|
||||||
|
|
||||||
|
|
||||||
def handlePlayPause(self, data):
|
def handlePlayPause(self, data):
|
||||||
primary_status = data[0]
|
primary_status = data[0]
|
||||||
secondary_status = data[1]
|
secondary_status = data[1]
|
||||||
@@ -144,8 +143,48 @@ class MediaController:
|
|||||||
self.earStatus = "Only one in"
|
self.earStatus = "Only one in"
|
||||||
return "Only one in"
|
return "Only one in"
|
||||||
|
|
||||||
|
# Function to get current sink volume
|
||||||
|
def get_current_volume():
|
||||||
|
result = subprocess.run(["pactl", "get-sink-volume", "@DEFAULT_SINK@"], capture_output=True, text=True)
|
||||||
|
volume_line = result.stdout.splitlines()[0]
|
||||||
|
volume_percent = int(volume_line.split()[4].strip('%'))
|
||||||
|
return volume_percent
|
||||||
|
|
||||||
|
# Function to set sink volume
|
||||||
|
def set_volume(percent):
|
||||||
|
subprocess.run(["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{percent}%"])
|
||||||
|
|
||||||
|
initial_volume = get_current_volume()
|
||||||
|
|
||||||
|
# Handle conversational awareness
|
||||||
|
def handle_conversational_awareness(status):
|
||||||
|
if status < 1 or status > 9:
|
||||||
|
logging.error(f"Invalid status: {status}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
global initial_volume
|
||||||
|
|
||||||
|
# Volume adjustment logic
|
||||||
|
if status == 1 or status == 2:
|
||||||
|
globals()["initial_volume"] = get_current_volume()
|
||||||
|
new_volume = max(0, min(int(initial_volume * 0.1), 100)) # Reduce to 10% for initial speaking
|
||||||
|
elif status == 3:
|
||||||
|
new_volume = max(0, min(int(initial_volume * 0.4), 100)) # Slightly increase to 40%
|
||||||
|
elif status == 6:
|
||||||
|
new_volume = max(0, min(int(initial_volume * 0.5), 100)) # Set volume to 50%
|
||||||
|
elif status >= 8:
|
||||||
|
new_volume = initial_volume # Fully restore volume
|
||||||
|
|
||||||
|
set_volume(new_volume)
|
||||||
|
logging.getLogger("Conversational Awareness").info(f"Volume set to {new_volume}% based on conversational awareness status: {status}")
|
||||||
|
|
||||||
|
# If status is 9, print conversation end message
|
||||||
|
if status == 9:
|
||||||
|
logging.getLogger("Conversational Awareness").info("Conversation ended. Restored volume to original level.")
|
||||||
|
|
||||||
class BatteryStatusUpdater(QObject):
|
class BatteryStatusUpdater(QObject):
|
||||||
battery_status_updated = pyqtSignal()
|
battery_status_updated = pyqtSignal()
|
||||||
|
anc_mode_updated = pyqtSignal()
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@@ -153,6 +192,7 @@ class BatteryStatusUpdater(QObject):
|
|||||||
|
|
||||||
def listen_to_socket(self):
|
def listen_to_socket(self):
|
||||||
global battery_status
|
global battery_status
|
||||||
|
global anc_mode
|
||||||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
|
||||||
client.connect(SOCKET_PATH)
|
client.connect(SOCKET_PATH)
|
||||||
while True:
|
while True:
|
||||||
@@ -168,6 +208,14 @@ class BatteryStatusUpdater(QObject):
|
|||||||
elif response["type"] == "ear_detection":
|
elif response["type"] == "ear_detection":
|
||||||
self.media_controller.handlePlayPause([response['primary'], response['secondary']])
|
self.media_controller.handlePlayPause([response['primary'], response['secondary']])
|
||||||
logging.debug(f"Received ear detection status: {response}")
|
logging.debug(f"Received ear detection status: {response}")
|
||||||
|
elif response["type"] == "anc":
|
||||||
|
anc_mode = response["mode"]
|
||||||
|
self.anc_mode_updated.emit()
|
||||||
|
logging.debug(f"Received ANC status: {anc_mode}")
|
||||||
|
elif response["type"] == "ca":
|
||||||
|
ca_status = response["status"]
|
||||||
|
handle_conversational_awareness(ca_status)
|
||||||
|
logging.debug(f"Received CA status: {ca_status}")
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logging.warning(f"Error deserializing data: {e}")
|
logging.warning(f"Error deserializing data: {e}")
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
|
|||||||
@@ -69,14 +69,20 @@ def handle_client(connection, client_socket):
|
|||||||
data: int = data
|
data: int = data
|
||||||
ancJSON = {
|
ancJSON = {
|
||||||
"type": "anc",
|
"type": "anc",
|
||||||
"status": data,
|
"mode": data,
|
||||||
}
|
}
|
||||||
data: str = JSONEncoder().encode(ancJSON)
|
data: str = JSONEncoder().encode(ancJSON)
|
||||||
|
elif notif_key == "notif_ca":
|
||||||
|
data: int = data
|
||||||
|
caJSON = {
|
||||||
|
"type": "ca",
|
||||||
|
"status": data,
|
||||||
|
}
|
||||||
|
data: str = JSONEncoder().encode(caJSON)
|
||||||
elif notif_key == "notif_unknown":
|
elif notif_key == "notif_unknown":
|
||||||
logging.debug(f"Unhandled notification type: {notif_key}")
|
logging.debug(f"Unhandled notification type: {notif_key}")
|
||||||
logging.debug(f"Data: {data}")
|
logging.debug(f"Data: {data}")
|
||||||
data: str = JSONEncoder().encode({"type": "unknown", "data": data})
|
data: str = JSONEncoder().encode({"type": "unknown", "data": data})
|
||||||
|
|
||||||
if not client_socket or not isinstance(client_socket, socket.socket):
|
if not client_socket or not isinstance(client_socket, socket.socket):
|
||||||
logging.error("Invalid client socket")
|
logging.error("Invalid client socket")
|
||||||
break
|
break
|
||||||
@@ -183,6 +189,11 @@ def notification_handler(notification_type: int, data: bytes):
|
|||||||
anc = connection.notificationListener.ANCNotification.status
|
anc = connection.notificationListener.ANCNotification.status
|
||||||
globals()["notif_anc"] = anc
|
globals()["notif_anc"] = anc
|
||||||
logger.debug(anc)
|
logger.debug(anc)
|
||||||
|
elif notification_type == Notifications.CA_UPDATED:
|
||||||
|
logger = logging.getLogger("Conversational Awareness Status")
|
||||||
|
ca = connection.notificationListener.ConversationalAwarenessNotification.status
|
||||||
|
globals()["notif_ca"] = ca
|
||||||
|
logger.debug(ca)
|
||||||
elif notification_type == Notifications.UNKNOWN:
|
elif notification_type == Notifications.UNKNOWN:
|
||||||
logger = logging.getLogger("Unknown Notification")
|
logger = logging.getLogger("Unknown Notification")
|
||||||
hex_data = ' '.join(f'{byte:02x}' for byte in data)
|
hex_data = ' '.join(f'{byte:02x}' for byte in data)
|
||||||
|
|||||||
Reference in New Issue
Block a user