linux-rust: add conversational awareness

This commit is contained in:
Kavish Devar
2025-10-21 19:14:09 +05:30
parent c0ae061cc7
commit b0561e96df
2 changed files with 318 additions and 0 deletions

View File

@@ -133,6 +133,11 @@ impl AirPodsDevice {
AACPEvent::ControlCommand(status) => {
debug!("Received ControlCommand event: {:?}", status);
}
AACPEvent::ConversationalAwareness(status) => {
debug!("Received ConversationalAwareness event: {}", status);
let controller = mc_clone.lock().await;
controller.handle_conversational_awareness(status).await;
}
_ => {}
}
}

View File

@@ -11,9 +11,14 @@ use libpulse_binding::context::{Context, FlagSet as ContextFlagSet};
use libpulse_binding::operation::State as OperationState;
use std::cell::RefCell;
use std::rc::Rc;
use dbus::arg::RefArg;
use libpulse_binding::def::Retval;
use libpulse_binding::callbacks::ListResult;
use libpulse_binding::context::introspect::{SinkInfo};
use libpulse_binding::proplist::Proplist;
use libpulse_binding::{
volume::{ChannelVolumes, Volume},
};
#[derive(Clone)]
struct OwnedCardProfileInfo {
@@ -27,6 +32,13 @@ struct OwnedCardInfo {
profiles: Vec<OwnedCardProfileInfo>,
}
#[derive(Clone)]
struct OwnedSinkInfo {
name: Option<String>,
proplist: Proplist,
volume: ChannelVolumes,
}
struct MediaControllerState {
connected_device_mac: String,
is_playing: bool,
@@ -38,6 +50,8 @@ struct MediaControllerState {
i_paused_the_media: bool,
ear_detection_enabled: bool,
disconnect_when_not_wearing: bool,
conv_original_volume: Option<u32>,
conv_conversation_started: bool,
}
impl MediaControllerState {
@@ -53,6 +67,8 @@ impl MediaControllerState {
i_paused_the_media: false,
ear_detection_enabled: true,
disconnect_when_not_wearing: true,
conv_original_volume: None,
conv_conversation_started: false,
}
}
}
@@ -237,6 +253,7 @@ impl MediaController {
async fn pause(&self) {
debug!("Pausing playback");
let paused_services = tokio::task::spawn_blocking(|| {
debug!("Listing DBus names for media players");
let conn = Connection::new_session().unwrap();
@@ -589,6 +606,185 @@ impl MediaController {
warn!("Failed to deactivate A2DP profile");
}
}
pub async fn handle_conversational_awareness(&self, status: u8) {
debug!("Entering handle_conversational_awareness with status: {}", status);
let mac;
{
let state = self.state.lock().await;
mac = state.connected_device_mac.clone();
}
if mac.is_empty() {
debug!("No connected device MAC, skipping conversational awareness");
return;
}
let sink_name = get_sink_name_by_mac(&mac).await;
let sink = match sink_name {
Some(s) => s,
None => {
warn!("Could not find sink for MAC {}, skipping conversational awareness", mac);
return;
}
};
let current_volume_opt = tokio::task::spawn_blocking({
let sink = sink.clone();
move || {
get_sink_volume_percent_by_name_sync(&sink)
}
}).await.unwrap_or(None);
match status {
1 => {
let original = current_volume_opt.unwrap_or(0);
debug!("Conversation start (1). Current volume: {}", original);
{
let mut state = self.state.lock().await;
if !state.conv_conversation_started {
state.conv_original_volume = Some(original);
state.conv_conversation_started = true;
} else {
debug!("Conversation already started; not overwriting conv_original_volume");
}
}
if original > 25 {
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, 25)
}).await.unwrap_or(false);
info!("Conversation start: lowered volume to 25% (original {})", original);
} else {
debug!("Original volume {} <= 25, not reducing to 25", original);
}
}
2 => {
let original = {
let state = self.state.lock().await;
state.conv_original_volume
}.clone();
if let Some(orig) = original {
debug!("Conversation reduce (2). Original: {}", orig);
if orig > 15 {
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, 15)
}).await.unwrap_or(false);
info!("Conversation reduce: lowered volume to 15% (original {})", orig);
} else {
debug!("Original {} <= 15, not reducing to 15", orig);
}
} else {
debug!("No original volume known for status 2, skipping");
}
}
3 => {
let maybe_orig = {
let state = self.state.lock().await;
(state.conv_conversation_started, state.conv_original_volume)
};
if !maybe_orig.0 {
debug!("Received status 3 but conversation was not started; ignoring increase");
return;
}
if let Some(orig) = maybe_orig.1 {
let target = if orig > 25 { 25 } else { orig };
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, target)
}).await.unwrap_or(false);
info!("Conversation partial increase (3): set volume to {} (original {})", target, orig);
} else if let Some(orig_from_current) = current_volume_opt {
let target = if orig_from_current > 25 { 25 } else { orig_from_current };
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, target)
}).await.unwrap_or(false);
info!("Conversation partial increase (3) with fallback current: set volume to {} (measured {})", target, orig_from_current);
} else {
debug!("No original volume known for status 3, skipping");
}
}
4 => {
let mut maybe_original = None;
{
let mut state = self.state.lock().await;
if state.conv_conversation_started {
maybe_original = state.conv_original_volume;
state.conv_original_volume = None;
state.conv_conversation_started = false;
} else {
debug!("Received status 4 but conversation was not started; ignoring restore");
return;
}
}
if let Some(orig) = maybe_original {
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, orig)
}).await.unwrap_or(false);
info!("Conversation end (4): restored volume to original {}", orig);
} else {
debug!("No stored original volume to restore to on status 4");
}
}
6 => {
let mut maybe_original = None;
{
let mut state = self.state.lock().await;
if state.conv_conversation_started {
maybe_original = state.conv_original_volume;
state.conv_original_volume = None;
state.conv_conversation_started = false;
} else {
debug!("Received status 6 but conversation was not started; ignoring restore");
return;
}
}
if let Some(orig) = maybe_original {
let sink_clone = sink.clone();
tokio::task::spawn_blocking(move || {
transition_sink_volume(&sink_clone, orig)
}).await.unwrap_or(false);
info!("Conversation end (6): restored volume to original {}", orig);
} else {
debug!("No stored original volume to restore to on status 6");
}
}
_ => {
debug!("Unknown conversational awareness status: {}", status);
}
}
}
}
fn get_sink_volume_percent_by_name_sync(sink_name: &str) -> Option<u32> {
match Command::new("pactl").args(&["get-sink-volume", sink_name]).output() {
Ok(output) if output.status.success() => {
if let Ok(s) = String::from_utf8(output.stdout) {
if let Some(pct_idx) = s.find('%') {
let mut start = pct_idx;
let bytes = s.as_bytes();
while start > 0 {
let b = bytes[start - 1];
if b.is_ascii_digit() {
start -= 1;
} else {
break;
}
}
if start < pct_idx {
if let Ok(num) = s[start..pct_idx].trim().parse::<u32>() {
return Some(num);
}
}
}
}
None
}
_ => None,
}
}
fn set_card_profile_sync(card_index: u32, profile_name: &str) -> bool {
@@ -614,3 +810,120 @@ fn set_card_profile_sync(card_index: u32, profile_name: &str) -> bool {
true
}
pub fn transition_sink_volume(sink_name: &str, target_volume: u32) -> bool {
let mut mainloop = Mainloop::new().unwrap();
let mut context = Context::new(&mut mainloop, "LibrePods-transition_sink_volume").unwrap();
context.connect(None, ContextFlagSet::NOAUTOSPAWN, None).unwrap();
loop {
match mainloop.iterate(false) {
_ if context.get_state() == libpulse_binding::context::State::Ready => break,
_ if context.get_state() == libpulse_binding::context::State::Failed || context.get_state() == libpulse_binding::context::State::Terminated => return false,
_ => {},
}
}
let mut introspector = context.introspect();
let sink_info_option = Rc::new(RefCell::new(None));
let op = introspector.get_sink_info_by_name(sink_name, {
let sink_info_option = sink_info_option.clone();
move |result: ListResult<&SinkInfo>| {
if let ListResult::Item(item) = result {
let owned_item = OwnedSinkInfo {
name: item.name.as_ref().map(|s| s.to_string()),
proplist: item.proplist.clone(),
volume: item.volume,
};
*sink_info_option.borrow_mut() = Some(owned_item);
}
}
});
while op.get_state() == OperationState::Running {
mainloop.iterate(false);
}
if let Some(sink_info) = sink_info_option.borrow().as_ref() {
let channels = sink_info.volume.len();
let mut new_volumes = ChannelVolumes::default();
let raw = (((target_volume as f64) / 100.0) * Volume::NORMAL.0.as_f64().unwrap()).round() as u32;
let vol = Volume(raw);
new_volumes.set(channels, vol);
let op = introspector.set_sink_volume_by_name(sink_name, &new_volumes, None);
while op.get_state() == OperationState::Running {
mainloop.iterate(false);
}
mainloop.quit(Retval(0));
true
} else {
error!("Sink not found: {}", sink_name);
false
}
}
async fn get_sink_name_by_mac(mac: &str) -> Option<String> {
debug!("Entering get_sink_name_by_mac for MAC: {}", mac);
if mac.is_empty() {
debug!("MAC is empty, returning None");
return None;
}
let mac_clone = mac.to_string();
tokio::task::spawn_blocking(move || {
let mut mainloop = Mainloop::new().unwrap();
let mut context = Context::new(&mut mainloop, "LibrePods-get_sink_name_by_mac").unwrap();
context.connect(None, ContextFlagSet::NOAUTOSPAWN, None).unwrap();
loop {
match mainloop.iterate(false) {
_ if context.get_state() == libpulse_binding::context::State::Ready => break,
_ if context.get_state() == libpulse_binding::context::State::Failed || context.get_state() == libpulse_binding::context::State::Terminated => return None,
_ => {},
}
}
let introspector = context.introspect();
let sink_info_list = Rc::new(RefCell::new(Some(Vec::new())));
let op = introspector.get_sink_info_list({
let sink_info_list = sink_info_list.clone();
move |result: ListResult<&SinkInfo>| {
if let ListResult::Item(item) = result {
let owned_item = OwnedSinkInfo {
name: item.name.as_ref().map(|s| s.to_string()),
proplist: item.proplist.clone(),
volume: item.volume,
};
sink_info_list.borrow_mut().as_mut().unwrap().push(owned_item);
}
}
});
while op.get_state() == OperationState::Running {
mainloop.iterate(false);
}
mainloop.quit(Retval(0));
if let Some(list) = sink_info_list.borrow().as_ref() {
for sink in list {
if let Some(device_string) = sink.proplist.get_str("device.string") {
if device_string.to_uppercase().contains(&mac_clone.to_uppercase()) {
if let Some(name) = &sink.name {
info!("Found sink name for MAC {}: {}", mac_clone, name);
return Some(name.to_string());
}
}
}
if let Some(bluez_path) = sink.proplist.get_str("bluez.path") {
let mac_from_path = bluez_path.split('/').last().unwrap_or("").replace("dev_", "").replace('_', ":");
if mac_from_path.eq_ignore_ascii_case(&mac_clone) {
if let Some(name) = &sink.name {
info!("Found sink name for MAC {}: {}", mac_clone, name);
return Some(name.to_string());
}
}
}
}
}
error!("No matching sink found for MAC address: {}", mac_clone);
None
}).await.unwrap_or(None)
}