Compare commits

...

18 Commits
v0.2.5 ... main

Author SHA1 Message Date
Kavish Devar
4ef3e4d4da ci: post nightly release to #android-ci on discord 2026-04-28 18:17:04 +05:30
Kavish Devar
b88b14de15 move things around 2026-04-28 17:27:11 +05:30
Kavish Devar
7cd4dfa3e0 remove head tracking 2026-04-28 17:27:00 +05:30
Kavish Devar
30d16e9977 docs: add some docs
I made this a while back, it is very incomplete, just adding in case anyone's looking for the opcodes
2026-04-28 16:19:30 +05:30
Kavish Devar
ddcf15eefe android: clear root module build dir on each run 2026-04-28 12:21:46 +05:30
Kavish Devar
3e89d7f41b android: version bump 2026-04-28 12:13:14 +05:30
Kavish Devar
9eb6010a25 android: connect to audio when reconnecting to last connected device 2026-04-28 12:12:45 +05:30
Kavish Devar
60e865fc1f android: call connect to audio even if BLUETOOTH_PRIVILEGED/MODIFY_PHONE_STATE is not granted 2026-04-28 12:12:07 +05:30
Kavish Devar
629b7b917e android: fix crash on some devices not properly closing socket 2026-04-28 12:10:47 +05:30
Kavish Devar
d4ee741224 android: add disconnect button 2026-04-28 12:10:00 +05:30
Kavish Devar
4c8b0d720d android: fix crash in reading call control settings 2026-04-28 12:08:22 +05:30
Kavish Devar
e20b0f7fd7 android: fix adaptive audio strength slider not being flipped
why... why, apple?
2026-04-28 12:07:00 +05:30
Kavish Devar
b64ff1d09e android: catch exceptions when closing IslandWindow 2026-04-28 12:04:59 +05:30
Kavish Devar
37056c6de7 android: fix icon size in select list 2026-04-28 12:04:29 +05:30
Kavish Devar
3a636e37a4 android: increase bottom padding on all screens 2026-04-28 12:04:08 +05:30
Kavish Devar
136e3e8995 android: do not log writeRaw in ATTManager if socket unavailable 2026-04-28 12:03:51 +05:30
Kavish Devar
e39c1cfeba android: fix ControlCommand.parseFromBytes outofbounds crash 2026-04-28 12:03:26 +05:30
Kavish Devar
b06d780eee android: fix xposed state being set true onResume 2026-04-27 13:13:40 +05:30
30 changed files with 321 additions and 1800 deletions

View File

@@ -22,6 +22,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
short_sha: ${{ steps.vars.outputs.short_sha }}
app_version: ${{ steps.version.outputs.app_version }}
steps:
- uses: actions/checkout@v4
with:
@@ -51,6 +52,9 @@ jobs:
- name: Build
run: ./gradlew packageReleaseArtifacts
working-directory: android
- name: Get app version
id: version
run: echo "app_version=$(grep 'appVersionName =' android/app/build.gradle.kts | sed 's/.*= "\(.*\)"/\1/')" >> $GITHUB_OUTPUT
- id: vars
run: echo "short_sha=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT
- uses: actions/upload-artifact@v4
@@ -122,9 +126,9 @@ jobs:
- id: changelog
run: |
if [ -z "${{ steps.prev.outputs.tag }}" ]; then
NOTES=$(git log --pretty=format:"- %s (%h)")
NOTES=$(git log --pretty=format:"- %s ([%h](https://github.com/kavishdevar/librepods/commit/%H))")
else
NOTES=$(git log ${{ steps.prev.outputs.tag }}..HEAD --pretty=format:"- %s (%h)")
NOTES=$(git log ${{ steps.prev.outputs.tag }}..HEAD --pretty=format:"- %s ([%h](https://github.com/kavishdevar/librepods/commit/%H))")
fi
echo "notes<<EOF" >> $GITHUB_OUTPUT
echo "$NOTES" >> $GITHUB_OUTPUT
@@ -141,3 +145,49 @@ jobs:
-t "Nightly ${{ needs.build.outputs.short_sha }}" \
--notes "${{ steps.changelog.outputs.notes }}" \
--prerelease
- name: Get timestamp
id: timestamp
run: echo "timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ)" >> $GITHUB_OUTPUT
- name: Post to Discord
run: |
curl -X POST "${{ secrets.DISCORD_ANDROID_CI_URL }}?with_components=true" \
-H "Content-Type: application/json" \
-d '{
"embeds": [
{
"title": "LibrePods Nightly Build",
"description": "Download the latest debug and release APKs.",
"color": 253060,
"fields": [
{
"name": "Changelog",
"value": "${{ steps.changelog.outputs.notes }}",
"inline": false
}
],
"timestamp": "${{ steps.timestamp.outputs.timestamp }}",
"footer": {
"text": "GitHub Actions"
}
}
],
"components": [
{
"type": 1,
"components": [
{
"type": 2,
"label": "Download Release APK",
"style": 5,
"url": "https://github.com/kavishdevar/librepods/releases/download/nightly-${{ needs.build.outputs.short_sha }}/LibrePods-FOSS-v${{ needs.build.outputs.app_version }}-release.apk"
},
{
"type": 2,
"label": "Download Debug APK",
"style": 5,
"url": "https://github.com/kavishdevar/librepods/releases/download/nightly-${{ needs.build.outputs.short_sha }}/LibrePods-FOSS-v${{ needs.build.outputs.app_version }}-debug.apk"
}
]
}
]
}'

View File

@@ -1,164 +0,0 @@
# Bluetooth Low Energy (BLE) - Apple Proximity Pairing Message
This document describes how the AirPods BLE "Proximity Pairing Message" is parsed and interpreted in the application. This message is broadcast by Apple devices (such as AirPods) and contains key information about the device's state, battery, and other properties.
## Overview
When scanning for BLE devices, the application looks for manufacturer data with Apple's ID (`0x004C`). If the data starts with `0x07`, it is identified as a Proximity Pairing Message. The message contains various fields, each representing a specific property of the AirPods.
## Proximity Pairing Message Structure
| Byte Index | Field Name | Description | Example Value(s) |
|------------|-------------------------|---------------------------------------------------------|--------------------------|
| 0 | Prefix | Message type (should be `0x07` for proximity pairing) | `0x07` |
| 1 | Length | Length of the message | `0x12` |
| 2 | Pairing Mode | `0x01` = Paired, `0x00` = Pairing mode | `0x01`, `0x00` |
| 3-4 | Device Model | Big-endian: [3]=high, [4]=low | `0x0E20` (AirPods Pro) |
| 5 | Status | Bitfield, see below | `0x62` |
| 6 | Pods Battery Byte | Nibbles for left/right pod battery | `0xA7` |
| 7 | Flags & Case Battery | Upper nibble: case battery, lower: flags | `0xB3` |
| 8 | Lid Indicator | Bits for lid state and open counter | `0x09` |
| 9 | Device Color | Color code | `0x02` |
| 10 | Connection State | Enum, see below | `0x04` |
| 11-26 | Encrypted Payload | 16 bytes, not parsed | |
## Field Details
### Device Model
| Value (hex) | Model Name |
|-------------|--------------------------|
| 0x0220 | AirPods 1st Gen |
| 0x0F20 | AirPods 2nd Gen |
| 0x1320 | AirPods 3rd Gen |
| 0x1920 | AirPods 4th Gen |
| 0x1B20 | AirPods 4th Gen (ANC) |
| 0x0A20 | AirPods Max |
| 0x1F20 | AirPods Max (USB-C) |
| 0x0E20 | AirPods Pro |
| 0x1420 | AirPods Pro 2nd Gen |
| 0x2420 | AirPods Pro 2nd Gen (USB-C) |
### Status Byte (Bitfield)
| Bit | Meaning | Value if Set |
|-----|--------------------------------|-------------|
| 0 | Right Pod In Ear (XOR logic) | true |
| 1 | Right Pod In Ear (XOR logic) | true |
| 2 | Both Pods In Case | true |
| 3 | Left Pod In Ear (XOR logic) | true |
| 4 | One Pod In Case | true |
| 5 | Primary Pod (1=Left, 0=Right) | true/false |
| 6 | This Pod In Case | true |
### Ear Detection Logic
The in-ear detection uses XOR logic based on:
- Whether the right pod is primary (`areValuesFlipped`)
- Whether this pod is in the case (`isThisPodInTheCase`)
```cpp
bool xorFactor = areValuesFlipped ^ deviceInfo.isThisPodInTheCase;
deviceInfo.isLeftPodInEar = xorFactor ? (status & 0x08) != 0 : (status & 0x02) != 0; // Bit 3 or 1
deviceInfo.isRightPodInEar = xorFactor ? (status & 0x02) != 0 : (status & 0x08) != 0; // Bit 1 or 3
```
### Primary Pod
Determined by bit 5 of the status byte:
- `1` = Left pod is primary
- `0` = Right pod is primary
This affects:
1. Battery level interpretation (which nibble corresponds to which pod)
2. Microphone assignment
3. Ear detection logic
### Microphone Status
The active microphone is determined by:
```cpp
deviceInfo.isLeftPodMicrophone = primaryLeft ^ deviceInfo.isThisPodInTheCase;
deviceInfo.isRightPodMicrophone = !primaryLeft ^ deviceInfo.isThisPodInTheCase;
```
### Pods Battery Byte
- Upper nibble: one pod battery (depends on primary)
- Lower nibble: other pod battery
| Value | Meaning |
|-------|----------------|
| 0x0-0x9 | 0-90% (x10) |
| 0xA-0xE | 100% |
| 0xF | Not available|
### Flags & Case Battery Byte
- Upper nibble: case battery (same encoding as pods)
- Lower nibble: flags
#### Flags (Lower Nibble)
| Bit | Meaning |
|-----|--------------------------|
| 0 | Right Pod Charging (XOR) |
| 1 | Left Pod Charging (XOR) |
| 2 | Case Charging |
### Lid Indicator
| Bits | Meaning |
|------|------------------------|
| 0-2 | Lid Open Counter |
| 3 | Lid State (0=Open, 1=Closed) |
### Device Color
| Value | Color |
|-------|-------------|
| 0x00 | White |
| 0x01 | Black |
| 0x02 | Red |
| 0x03 | Blue |
| 0x04 | Pink |
| 0x05 | Gray |
| 0x06 | Silver |
| 0x07 | Gold |
| 0x08 | Rose Gold |
| 0x09 | Space Gray |
| 0x0A | Dark Blue |
| 0x0B | Light Blue |
| 0x0C | Yellow |
| 0x0D+ | Unknown |
### Connection State
| Value | State |
|-------|--------------|
| 0x00 | Disconnected |
| 0x04 | Idle |
| 0x05 | Music |
| 0x06 | Call |
| 0x07 | Ringing |
| 0x09 | Hanging Up |
| 0xFF | Unknown |
## Example Message
| Byte Index | Example Value | Description |
|------------|--------------|----------------------------|
| 0 | 0x07 | Proximity Pairing Message |
| 1 | 0x12 | Length |
| 2 | 0x01 | Paired |
| 3-4 | 0x0E 0x20 | AirPods Pro |
| 5 | 0x62 | Status |
| 6 | 0xA7 | Pods Battery |
| 7 | 0xB3 | Flags & Case Battery |
| 8 | 0x09 | Lid Indicator |
| 9 | 0x02 | Device Color |
| 10 | 0x04 | Connection State (Idle) |
---
For further details, see [`BleManager`](linux/ble/blemanager.cpp) and [`BleScanner`](linux/ble/blescanner.cpp).

View File

@@ -1,6 +1,6 @@
import java.util.Properties
val appVersionName = "0.2.5"
val appVersionName = "0.2.6"
plugins {
alias(libs.plugins.android.application)
@@ -30,7 +30,7 @@ android {
applicationId = "me.kavishdevar.librepods"
minSdk = 33
targetSdk = 37
versionCode = 42
versionCode = 46
versionName = appVersionName
}
buildTypes {
@@ -177,6 +177,8 @@ fun registerRootModuleZipTask(
rename { "LibrePods.apk" }
}
delete(layout.buildDirectory.dir("outputs/rootModuleZips"))
archiveFileName.set("LibrePods-FOSS-v$appVersionName-$buildType.zip")
destinationDirectory.set(layout.buildDirectory.dir("outputs/rootModuleZips"))
}

View File

@@ -207,10 +207,7 @@ class AACPManager {
identifier: ControlCommandIdentifiers, value: ByteArray
) {
val existingStatus = getControlCommandStatus(identifier)
if (existingStatus == value) {
controlCommandStatusList.remove(existingStatus)
}
if (existingStatus != null) {
if (existingStatus?.value.contentEquals(value)) {
controlCommandStatusList.remove(existingStatus)
}
controlCommandListeners[identifier]?.forEach { listener ->
@@ -414,7 +411,13 @@ class AACPManager {
}
Opcodes.CONTROL_COMMAND -> {
val controlCommand = ControlCommand.fromByteArray(packet)
val controlCommand = try {
ControlCommand.fromByteArray(packet)
} catch (e: Exception) {
Log.w(TAG, "Failed to parse control command: ${e.message}")
callback?.onUnknownPacketReceived(packet)
return
}
setControlCommandStatusValue(
ControlCommandIdentifiers.fromByte(controlCommand.identifier) ?: return,
controlCommand.value
@@ -1078,25 +1081,25 @@ class AACPManager {
companion object {
fun fromByteArray(data: ByteArray): ControlCommand {
if (data.size < 4) {
throw IllegalArgumentException("Data array too short to parse ControlCommand")
var offset = 0
while (data.size - offset >= 4 &&
data[offset] == 0x04.toByte() &&
data[offset + 1] == 0x00.toByte() &&
data[offset + 2] == 0x04.toByte() &&
data[offset + 3] == 0x00.toByte()
) {
offset += 4
}
if (data[0] == 0x04.toByte() && data[1] == 0x00.toByte() && data[2] == 0x04.toByte() && data[3] == 0x00.toByte()) {
val newData = ByteArray(data.size - 4)
System.arraycopy(data, 4, newData, 0, data.size - 4)
return fromByteArray(newData)
if (data.size - offset < 7) {
throw IllegalArgumentException("Too short for ControlCommand")
}
if (data[0] != Opcodes.CONTROL_COMMAND) {
throw IllegalArgumentException("Data array does not start with CONTROL_COMMAND opcode")
if (data[offset] != Opcodes.CONTROL_COMMAND) {
throw IllegalArgumentException("Invalid opcode")
}
val identifier = data[2]
val value = ByteArray(4)
System.arraycopy(data, 3, value, 0, 4)
val trimmedValue = value.dropLastWhile { it == 0x00.toByte() }.toByteArray()
val finalValue = if (trimmedValue.isEmpty()) byteArrayOf(0x00) else trimmedValue
return ControlCommand(identifier, finalValue)
val identifier = data[offset + 2]
val value = data.copyOfRange(offset + 3, offset + 7)
val trimmed = value.dropLastWhile { it == 0x00.toByte() }.toByteArray()
return ControlCommand(identifier, if (trimmed.isEmpty()) byteArrayOf(0x00) else trimmed)
}
}
}
@@ -1122,7 +1125,13 @@ class AACPManager {
Log.d(TAG, "Sending packet: ${packet.joinToString(" ") { "%02X".format(it) }}")
if (packet[4] == Opcodes.CONTROL_COMMAND) {
val controlCommand = ControlCommand.fromByteArray(packet)
val controlCommand = try {
ControlCommand.fromByteArray(packet)
} catch (e: Exception) {
Log.w(TAG, "Invalid control command: ${e.message}")
callback?.onUnknownPacketReceived(packet)
return false
}
Log.d(
TAG, "Control command: ${controlCommand.identifier.toHexString()} - ${
controlCommand.value.joinToString(" ") { "%02X".format(it) }

View File

@@ -172,6 +172,7 @@ class ATTManager(private val adapter: BluetoothAdapter, private val device: Blue
}
private fun writeRaw(pdu: ByteArray) {
if (output == null) return
output?.write(pdu)
output?.flush()
Log.d(TAG, "writeRaw: ${pdu.joinToString(" ") { String.format("%02X", it) }}")

View File

@@ -77,7 +77,7 @@ fun StyledScaffold(
.clip(RoundedCornerShape(52.dp))
) { paddingValues ->
val topPadding = paddingValues.calculateTopPadding()
val bottomPadding = paddingValues.calculateBottomPadding()
val bottomPadding = paddingValues.calculateBottomPadding() + 16.dp
val startPadding = paddingValues.calculateLeftPadding(LocalLayoutDirection.current)
val endPadding = paddingValues.calculateRightPadding(LocalLayoutDirection.current)

View File

@@ -28,6 +28,7 @@ import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.heightIn
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.wrapContentWidth
@@ -128,7 +129,7 @@ fun StyledSelectList(
contentDescription = "Icon",
tint = Color(0xFF007AFF),
modifier = Modifier
.heightIn(min = 48.dp)
.height(48.dp)
.wrapContentWidth()
)
}

View File

@@ -712,8 +712,16 @@ class IslandWindow(private val context: Context) {
}
isClosing = false
// Make sure all animations are canceled
springAnimation.cancel()
flingAnimator.cancel()
try {
springAnimation.cancel()
} catch (e: Exception) {
e("IslandWindow", "Error cancelling spring animation $e")
}
try {
flingAnimator.cancel()
} catch (e: Exception) {
e("IslandWindow", "Error cancelling fling animation $e")
}
}
fun forceClose() {

View File

@@ -51,10 +51,10 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import me.kavishdevar.librepods.R
import me.kavishdevar.librepods.bluetooth.AACPManager
import me.kavishdevar.librepods.presentation.components.StyledButton
import me.kavishdevar.librepods.presentation.components.StyledScaffold
import me.kavishdevar.librepods.presentation.components.StyledSlider
import me.kavishdevar.librepods.bluetooth.AACPManager
import me.kavishdevar.librepods.presentation.viewmodel.AirPodsViewModel
@Composable
@@ -95,11 +95,7 @@ fun AdaptiveStrengthScreen(viewModel: AirPodsViewModel, navController: NavContro
}
}
val sliderValue = remember {
mutableFloatStateOf(
state.controlStates[AACPManager.Companion.ControlCommandIdentifiers.AUTO_ANC_STRENGTH]?.getOrNull(
0
)?.toFloat() ?: 50f
)
mutableFloatStateOf(100f - (state.controlStates[AACPManager.Companion.ControlCommandIdentifiers.AUTO_ANC_STRENGTH]?.getOrNull(0)?.toFloat() ?: 50f))
}
var job by remember { mutableStateOf<Job?>(null) }
val scope = rememberCoroutineScope()

View File

@@ -32,6 +32,7 @@ import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.heightIn
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.shape.RoundedCornerShape
@@ -240,7 +241,7 @@ fun AirPodsSettingsScreen(viewModel: AirPodsViewModel, navController: NavControl
item(key = "spacer_call") { Spacer(modifier = Modifier.height(16.dp)) }
item(key = "call_control") {
val bytes = state.controlStates[AACPManager.Companion.ControlCommandIdentifiers.CALL_MANAGEMENT_CONFIG]?.take(2)?.toByteArray() ?: byteArrayOf(0x00, 0x00)
val flipped = bytes[1] == 0x02.toByte()
val flipped = try { bytes[1] == 0x02.toByte() } catch (e: Exception) { false }
CallControlSettings(
hazeState = hazeState,
flipped = flipped,
@@ -430,6 +431,30 @@ fun AirPodsSettingsScreen(viewModel: AirPodsViewModel, navController: NavControl
)
}
item(key = "spacer_disconnect") { Spacer(modifier = Modifier.height(28.dp)) }
item(key = "disconnect_button") {
StyledButton(
onClick = viewModel::disconnect,
backdrop = rememberLayerBackdrop(),
isInteractive = false,
modifier = Modifier
.fillMaxWidth()
.heightIn(min = 56.dp)
) {
Text(
text = stringResource(R.string.disconnect),
style = TextStyle(
fontSize = 16.sp,
fontWeight = FontWeight.Normal,
color = if (isSystemInDarkTheme()) Color(0xFF0091FF) else Color(0xFF0088FF),
fontFamily = FontFamily(Font(R.font.sf_pro))
),
textAlign = TextAlign.Start,
modifier = Modifier.fillMaxWidth()
)
}
}
// item(key = "spacer_debug") { Spacer(modifier = Modifier.height(16.dp)) }
// item(key = "debug") { NavigationButton("debug", "Debug", navController) }
item(key = "spacer_bottom") { Spacer(Modifier.height(bottomPadding)) }

View File

@@ -23,6 +23,8 @@ import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.content.SharedPreferences
import android.content.pm.PackageManager
import android.widget.Toast
import androidx.core.content.edit
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
@@ -529,7 +531,7 @@ class AirPodsViewModel(
actualModel = fakeInstance.actualModelNumber,
serialNumbers = listOf("DEMO", "DEMO", "DEMO"),
version3 = "Demo Firmware",
// isPremium = true
isPremium = true
)
}
}
@@ -540,5 +542,9 @@ class AirPodsViewModel(
fun disconnect() {
service.disconnectAirPods()
if (appContext.checkSelfPermission("android.permission.BLUETOOTH_PRIVILEGED") != PackageManager.PERMISSION_GRANTED) {
Toast.makeText(appContext, "App has disconnected, disconnect from Android Settings.",
Toast.LENGTH_LONG).show()
}
}
}

View File

@@ -2768,33 +2768,43 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
while (socket.isConnected) {
socket.let { it ->
val buffer = ByteArray(1024)
val bytesRead = it.inputStream.read(buffer)
var data: ByteArray
if (bytesRead > 0) {
data = buffer.copyOfRange(0, bytesRead)
sendBroadcast(Intent(AirPodsNotifications.AIRPODS_DATA).apply {
putExtra("data", buffer.copyOfRange(0, bytesRead))
setPackage(packageName)
})
val bytes = buffer.copyOfRange(0, bytesRead)
val formattedHex = bytes.joinToString(" ") { "%02X".format(it) }
try {
val buffer = ByteArray(1024)
val bytesRead = it.inputStream.read(buffer)
var data: ByteArray
if (bytesRead > 0) {
data = buffer.copyOfRange(0, bytesRead)
sendBroadcast(Intent(AirPodsNotifications.AIRPODS_DATA).apply {
putExtra("data", buffer.copyOfRange(0, bytesRead))
setPackage(packageName)
})
val bytes = buffer.copyOfRange(0, bytesRead)
val formattedHex = bytes.joinToString(" ") { "%02X".format(it) }
// CrossDevice.sendReceivedPacket(bytes)
updateNotificationContent(
true,
sharedPreferences.getString("name", device.name),
batteryNotification.getBattery()
)
updateNotificationContent(
true,
sharedPreferences.getString("name", device.name),
batteryNotification.getBattery()
)
aacpManager.receivePacket(data)
aacpManager.receivePacket(data)
if (!isHeadTrackingData(data)) {
Log.d("AirPodsData", "Data received: $formattedHex")
logPacket(data, "AirPods")
if (!isHeadTrackingData(data)) {
Log.d("AirPodsData", "Data received: $formattedHex")
logPacket(data, "AirPods")
}
} else if (bytesRead == -1) {
Log.d("AirPods Service", "Socket closed (bytesRead = -1)")
sendBroadcast(Intent(AirPodsNotifications.AIRPODS_DISCONNECTED).apply {
setPackage(packageName)
})
aacpManager.disconnected()
return@launch
}
} else if (bytesRead == -1) {
Log.d("AirPods Service", "Socket closed (bytesRead = -1)")
} catch (e: Exception) {
Log.w(TAG, "Error reading data, we have probably disconnected.")
e.printStackTrace()
sendBroadcast(Intent(AirPodsNotifications.AIRPODS_DISCONNECTED).apply {
setPackage(packageName)
})
@@ -2884,6 +2894,11 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
override fun onServiceDisconnected(profile: Int) {}
}, BluetoothProfile.A2DP)
try {
device?.disconnect()
} catch (e: Exception) {
Log.w(TAG, "device.disconnect() failed, $e")
}
}
if (checkSelfPermission("android.permission.MODIFY_PHONE_STATE") == PackageManager.PERMISSION_GRANTED){
bluetoothAdapter.getProfileProxy(this, object : BluetoothProfile.ServiceListener {
@@ -2901,7 +2916,6 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
}, BluetoothProfile.HEADSET)
}
Log.d(TAG, "Disconnected AirPods upon user request")
}
val earDetectionNotification = AirPodsNotifications.EarDetection()
@@ -2991,17 +3005,22 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
fun connectAudio(context: Context, device: BluetoothDevice?) {
val bluetoothAdapter = context.getSystemService(BluetoothManager::class.java).adapter
if (context.checkSelfPermission("android.permission.BLUETOOTH_PRIVILEGED") == PackageManager.PERMISSION_GRANTED) {
bluetoothAdapter?.getProfileProxy(context, object : BluetoothProfile.ServiceListener {
override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
if (profile == BluetoothProfile.A2DP) {
try {
val policyMethod = proxy.javaClass.getMethod(
"setConnectionPolicy", BluetoothDevice::class.java, Int::class.java
)
Log.d(TAG, "calling A2DP.setConnectionPolicy for ${device?.address} to 100")
policyMethod.invoke(proxy, device, 100)
if (context.checkSelfPermission("android.permission.BLUETOOTH_PRIVILEGED") == PackageManager.PERMISSION_GRANTED) {
val policyMethod = proxy.javaClass.getMethod(
"setConnectionPolicy",
BluetoothDevice::class.java,
Int::class.java
)
Log.d(TAG, "calling A2DP.setConnectionPolicy for ${device?.address} to 100")
policyMethod.invoke(proxy, device, 100)
}
else {
Log.d(TAG, "not setting connection policy for A2DP, no BLUETOOTH_PRIVILEGED permission")
}
val connectMethod =
proxy.javaClass.getMethod("connect", BluetoothDevice::class.java)
connectMethod.invoke(
@@ -3020,21 +3039,26 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
override fun onServiceDisconnected(profile: Int) {}
}, BluetoothProfile.A2DP)
} else {
Log.d(TAG, "not connecting A2DP, no BLUETOOTH_PRIVILEGED permission")
}
if (checkSelfPermission("android.permission.MODIFY_PHONE_STATE") == PackageManager.PERMISSION_GRANTED) {
bluetoothAdapter?.getProfileProxy(context, object : BluetoothProfile.ServiceListener {
override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
if (profile == BluetoothProfile.HEADSET) {
try {
val policyMethod = proxy.javaClass.getMethod(
"setConnectionPolicy",
BluetoothDevice::class.java,
Int::class.java
)
Log.d(TAG, "calling HEADSET.setConnectionPolicy for ${device?.address} to 100")
policyMethod.invoke(proxy, device, 100)
if (checkSelfPermission("android.permission.MODIFY_PHONE_STATE") == PackageManager.PERMISSION_GRANTED) {
val policyMethod = proxy.javaClass.getMethod(
"setConnectionPolicy",
BluetoothDevice::class.java,
Int::class.java
)
Log.d(
TAG,
"calling HEADSET.setConnectionPolicy for ${device?.address} to 100"
)
policyMethod.invoke(proxy, device, 100)
} else {
Log.d(TAG, "not setting connection policy for HEADSET, no MODIFIY_PHONE_STATE permission")
}
val connectMethod =
proxy.javaClass.getMethod("connect", BluetoothDevice::class.java)
connectMethod.invoke(proxy, device)
@@ -3048,9 +3072,6 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
override fun onServiceDisconnected(profile: Int) {}
}, BluetoothProfile.HEADSET)
} else {
Log.d(TAG, "not connecting HEADSET, no MODIFIY_PHONE_STATE permission")
}
}
fun setName(name: String) {
@@ -3150,6 +3171,7 @@ class AirPodsService : Service(), SharedPreferences.OnSharedPreferenceChangeList
CoroutineScope(Dispatchers.IO).launch {
Log.d(TAG, "connecting to $macAddress")
connectToSocket(bluetoothAdapter, device!!, manual = true)
connectAudio(this@AirPodsService, device!!)
}
}
}

View File

@@ -23,7 +23,7 @@ class LibrePodsApplication: Application(), XposedServiceHelper.OnServiceListener
override fun onResume(owner: LifecycleOwner) {
BillingManager.provider.queryPurchases()
XposedState.isAvailable = true
XposedState.isAvailable = XposedServiceHolder.service != null
XposedState.bluetoothScopeEnabled = XposedServiceHolder.service?.scope?.contains("com.google.android.bluetooth") == true || XposedServiceHolder.service?.scope?.contains("com.android.bluetooth") == true
}

View File

@@ -1,11 +0,0 @@
#!/bin/sh
set -eux
cd root-module
rm -f ../btl2capfix.zip
# COPYFILE_DISABLE env is a macOS fix to avoid parasitic files in ZIPs: https://superuser.com/a/260264
export COPYFILE_DISABLE=1
curl -L -o ./radare2-5.9.9-android-aarch64.tar.gz "https://github.com/devnoname120/radare2/releases/download/5.9.8-android-aln/radare2-5.9.9-android-aarch64-aln.tar.gz"
zip -r ../btl2capfix.zip . -x \*.DS_Store \*__MACOSX \*DEBIAN ._\* .gitignore

View File

@@ -16,53 +16,53 @@ Bytes that are not used are set to `0x00`. From what I've observed, the `data3`
## Identifiers and details
| Command identifier | Description |
|--------------|---------------------|
| 0x01 | Mic Mode |
| 0x05 | Button Send Mode |
| 0x06 | Owns connection |
| 0x0A | Ear Detection |
| 0x12 | VoiceTrigger for Siri |
| 0x14 | SingleClickMode |
| 0x15 | DoubleClickMode |
| 0x16 | ClickHoldMode |
| 0x17 | DoubleClickInterval |
| 0x18 | ClickHoldInterval |
| 0x1A | ListeningModeConfigs |
| 0x1B | OneBudANCMode |
| 0x1C | CrownRotationDirection |
| 0x0D | ListeningMode |
| 0x1E | AutoAnswerMode |
| 0x1F | Chime Volume |
| 0x20 | Connect Automatically |
| 0x23 | VolumeSwipeInterval |
| 0x24 | Call Management Config |
| 0x25 | VolumeSwipeMode |
| 0x26 | Adaptive Volume Config |
| 0x27 | Software Mute config |
| 0x28 | Conversation Detect config |
| 0x29 | SSL |
| 0x2C | Hearing Aid Enrolled and Hearing Aid Enabled |
| 0x2E | AutoANC Strength |
| 0x2F | HPS Gain Swipe |
| 0x30 | HRM enable/disable state |
| 0x31 | In Case Tone config |
| 0x32 | Siri Multitone config |
| 0x33 | Hearing Assist config |
| 0x34 | Allow Off Option for Listening Mode config |
| 0x35 | Sleep Detection config |
| 0x36 | Allow Auto Connect |
| 0x37 | PPE Toggle config |
| 0x38 | Personal Protective Equipment Cap Level config |
| 0x39 | Raw Gestures config |
| 0x3A | Temporary Pairing Config |
| 0x3B | Dynamic End of Charge config |
| 0x3C | System Siri message config |
| 0x3D | Hearing Aid Generic config |
| 0x3E | Uplink EQ Bud config |
| 0x3F | Uplink EQ Source config |
| 0x40 | In Case Tone Volume |
| 0x41 | Disable Button Input config |
| Command identifier | Description |
| ------------------ | ---------------------------------------------- |
| 0x01 | Mic Mode |
| 0x05 | Button Send Mode |
| 0x06 | Owns connection |
| 0x0A | Ear Detection |
| 0x12 | VoiceTrigger for Siri |
| 0x14 | SingleClickMode |
| 0x15 | DoubleClickMode |
| 0x16 | ClickHoldMode |
| 0x17 | DoubleClickInterval |
| 0x18 | ClickHoldInterval |
| 0x1A | ListeningModeConfigs |
| 0x1B | OneBudANCMode |
| 0x1C | CrownRotationDirection |
| 0x0D | ListeningMode |
| 0x1E | AutoAnswerMode |
| 0x1F | Chime Volume |
| 0x20 | Connect Automatically |
| 0x23 | VolumeSwipeInterval |
| 0x24 | Call Management Config |
| 0x25 | VolumeSwipeMode |
| 0x26 | Adaptive Volume Config |
| 0x27 | Software Mute config |
| 0x28 | Conversation Detect config |
| 0x29 | SSL |
| 0x2C | Hearing Aid Enrolled and Hearing Aid Enabled |
| 0x2E | AutoANC Strength |
| 0x2F | HPS Gain Swipe |
| 0x30 | HRM enable/disable state |
| 0x31 | In Case Tone config |
| 0x32 | Siri Multitone config |
| 0x33 | Hearing Assist config |
| 0x34 | Allow Off Option for Listening Mode config |
| 0x35 | Sleep Detection config |
| 0x36 | Allow Auto Connect |
| 0x37 | PPE Toggle config |
| 0x38 | Personal Protective Equipment Cap Level config |
| 0x39 | Raw Gestures config |
| 0x3A | Temporary Pairing Config |
| 0x3B | Dynamic End of Charge config |
| 0x3C | System Siri message config |
| 0x3D | Hearing Aid Generic config |
| 0x3E | Uplink EQ Bud config |
| 0x3F | Uplink EQ Source config |
| 0x40 | In Case Tone Volume |
| 0x41 | Disable Button Input config |
## Command Details

26
docs/device-info.md Normal file
View File

@@ -0,0 +1,26 @@
---
opcode: 0x001D
title: Device Information
description: Information about AirPods, such as model, firmware version, and serial number. This can not be requested from the accessory; it is only sent by the accessory to the host upon connection.
---
## Device information
The device information packet is sent by the accessory to the host upon connection. It contains various details about the AirPods, including model number, software version, and serial number.
Each `null` indicates the start of a new string field.
The data is in this order:
- Name
- Model number
- Manufacturer (always "Apple Inc.")
- Serial number
- Version 1
- Version 2
- Hardware revision (?) (I have `1.0.0`)
- Updater app version (?) (I have `com.apple.accessory.updater.app.71`)
- Serial number (Left Bud)
- Serial number (Right Bud)
- Version (?) (I have `8454371`)
- A few more bytes, I don't know what they are

33
docs/opcodes.md Normal file
View File

@@ -0,0 +1,33 @@
# AACP opcodes
AACP (Apple Accessory Communication Protocol) uses various opcodes to define different types of actions and commands. Each opcode is a 16-bit integer that specifies the kind of operation being performed. The opcode is sent in little-endian format as part of the AACP packet structure.
| Opcode (Hex) | Destination | Description |
| ------------ | ----------- | ------------------------------------------------------------------ |
| 0x0001 | Accessory | Unknown |
| 0x0004 | Host | [Battery report](/docs/battery_report.md) |
| 0x0006 | Host | [Ear detection](/docs/ear-detection_report.md) |
| 0x0009 | Both | [Control commands](/docs/control_commands.md) |
| 0x000D | Accessory | [Audio source req](/docs/audio-source.md) |
| 0x000E | Host | [Audio source resp](/docs/audio-source.md) |
| 0x000F | Accessory | [Notification register](/docs/notification-register.md) |
| 0x0010 | Accessory | [Smart routing relay](/docs/smart-routing-relay.md#send) |
| 0x0011 | Host | [Smart routing response](/docs/smart-routing-relay.md#receive) |
| 0x0014 | Accessory | Send connected device MAC |
| 0x0017 | Both | Multiple things - undocumented |
| 0x0019 | Host | [Stem press](/docs/stem-press.md) |
| 0x001B | Accessory | [Timestamp](/docs/timestamp.md) |
| 0x001D | Host | [Device Information](/docs/device-info.md) |
| 0x001E | Accessory | [Rename device](/docs/rename.md) |
| 0x0022 | Accessory | Unknown |
| 0x0029 | Accessory | [Host capabilities](/docs/host-capabilities.md#another-opcode) (?) |
| 0x002B | Host | Paired devices (?) |
| 0x002D | Accessory | [List of connected dev. req](/docs/connected-devices.md#send) |
| 0x002E | Host | [List of connected devices](/docs/connected-devices.md#receive) |
| 0x0030 | Accessory | [BLE keys req](/docs/ble-keys.md) |
| 0x0031 | Host | [BLE keys response](/docs/ble-keys.md) |
| 0x004B | Host | [Conversation awareness](/docs/conversational-awareness.md) |
| 0x004D | Accessory | [Host capabilities](/docs/host-capabilities.md) |
| 0x004F | Both | Information req/res (doesn't work, even with apple's DID) |
| 0x0053 | Both | [EQ data](/docs/eq.md) |

View File

@@ -1,6 +1,6 @@
{
"version": "v0.0.3",
"versionCode": 3,
"zipUrl": "https://github.com/kavishdevar/librepods/releases/download/v0.0.3/btl2capfix-v0.0.3.zip",
"version": "v0.2.6",
"versionCode": 46,
"zipUrl": "https://github.com/kavishdevar/librepods/releases/download/v0.2.3/LibrePods-FOSS-v0.2.3-release.zip",
"changelog": "https://raw.githubusercontent.com/kavishdevar/librepods/main/CHANGELOG.md"
}

View File

@@ -1,60 +0,0 @@
# AirPods Head Tracking Visualizer
This implements head tracking with AirPods by gathering sensor data over l2cap, processing orientation and acceleration values, and detecting head gestures. The codebase is split into the following components:
# How to use
Connect your airpods and change the mac address in `plot.py` to your airpods mac address. Then run the following command to start the program.
```bash
python plot.py
```
Alternatively, you can directly run the `gestures.py` to just detect gestures.
```bash
python gestures.py
```
- **Connection and Data Collection**
The project uses a custom ConnectionManager (imported in multiple files) to connect via Bluetooth to AirPods. Once connected, sensor packets are received in raw hex format. An AirPodsTracker class (in `plot.py`) handles the start/stop of tracking, logging of raw data, and parsing of packets into useful fields.
- **Orientation Calculation and Visualization**
The `HeadOrientation` class (in `head_orientation.py`) is responsible for:
- **Calibration:**
A set number of samples (default 10) are collected to calculate the neutral (baseline) values for the sensors. For example:
`o1_neutral = np.mean(samples[:, 0])`
- **Calculating Angles:**
For each new packet, the raw orientation values are normalized by subtracting the neutral baseline. Then:
- **Pitch** is computed as:
```
pitch = (o2_norm + o3_norm) / 2 / 32000 * 180
```
This averages the deviations from neutral, scales the result to degrees (assuming a sensor range around 32000), thus giving a smooth estimation of up/down tilt.
- **Yaw** is computed as:
```
yaw = (o2_norm - o3_norm) / 2 / 32000 * 180
```
Here, the difference between the two sensor axes is used to detect left/right rotation.
- **ASCII Visualization:**
Based on the calculated pitch and yaw, an ASCII art "face" is generated. The algorithm rotates points on a circle using simple trigonometric formulas (with scaling factors based on sensor depth) to build an approximate visual representation of head orientation.
- **Live Plotting and Interactive Commands**
The code offers both terminal-based plotting and graphical plotting via matplotlib. The AirPodsTracker manages live plotting by maintaining a buffer of recent packets. When in terminal mode, the code uses libraries like `asciichartpy` and `drawille` to render charts; in graphical mode, it creates live-updating plots.
- **Gesture Detection**
The `GestureDetector` class (in `gestures.py`) processes the head tracking data to detect nodding ("Yes") or head shaking ("No"):
- **Smoothing:**
Raw horizontal and vertical sensor data undergo moving-average smoothing using small fixed-size buffers. This reduces noise and provides a steadier signal.
- **Peak and Trough Detection:**
The code monitors small sections (e.g. the last 4 values) to compute variance and dynamically determine thresholds for direction changes. When a significant reversal (e.g. from increasing to decreasing) is detected that surpasses the dynamic threshold value (derived partly from a fixed threshold and variance), a peak or trough is recorded.
- **Rhythm Consistency:**
Time intervals between detected peaks are captured. The consistency of these intervals (by comparing them to their mean and computing relative variance) is used to evaluate whether the movement is rhythmic—a trait of intentional gestures.
- **Confidence Calculation:**
Multiple factors are considered:
- **Amplitude Factor:** Compares the average detected peak amplitude with a constant (like 600) to provide a normalized measure.
- **Rhythm Factor:** Derived from the consistency of the time intervals of the peaks.
- **Alternation Factor:** Verifies that the signal alternates (for instance, switching between positive and negative values).
- **Isolation Factor:** Checks that movement on the target axis (vertical for nodding, horizontal for shaking) dominates over the non-target axis.
A weighted sum of these factors forms a confidence score which, if above a predefined threshold (e.g. 0.7), confirms a detected gesture.

View File

@@ -1,29 +0,0 @@
import logging
from logging import Formatter, LogRecord
from typing import Dict
class Colors:
RESET: str = "\033[0m"
BOLD: str = "\033[1m"
RED: str = "\033[91m"
GREEN: str = "\033[92m"
YELLOW: str = "\033[93m"
BLUE: str = "\033[94m"
MAGENTA: str = "\033[95m"
CYAN: str = "\033[96m"
WHITE: str = "\033[97m"
BG_BLACK: str = "\033[40m"
class ColorFormatter(Formatter):
FORMATS: Dict[int, str] = {
logging.DEBUG: f"{Colors.BLUE}[%(levelname)s] %(message)s{Colors.RESET}",
logging.INFO: f"{Colors.GREEN}%(message)s{Colors.RESET}",
logging.WARNING: f"{Colors.YELLOW}%(message)s{Colors.RESET}",
logging.ERROR: f"{Colors.RED}[%(levelname)s] %(message)s{Colors.RESET}",
logging.CRITICAL: f"{Colors.RED}{Colors.BOLD}[%(levelname)s] %(message)s{Colors.RESET}"
}
def format(self, record: LogRecord) -> str:
log_fmt: str = self.FORMATS.get(record.levelno)
formatter: Formatter = Formatter(log_fmt, datefmt="%H:%M:%S")
return formatter.format(record)

View File

@@ -1,64 +0,0 @@
import bluetooth
import logging
from bluetooth import BluetoothSocket
from logging import Logger
class ConnectionManager:
INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00"
START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00"
STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00"
def __init__(self, bt_addr: str = "28:2D:7F:C2:05:5B", psm: int = 0x1001, logger: Logger = None) -> None:
self.bt_addr: str = bt_addr
self.psm: int = psm
self.logger: Logger = logger if logger else logging.getLogger(__name__)
self.sock: BluetoothSocket = None
self.connected: bool = False
self.started: bool = False
def connect(self) -> bool:
self.logger.info(f"Connecting to {self.bt_addr} on PSM {self.psm:#04x}...")
try:
self.sock = BluetoothSocket(bluetooth.L2CAP)
self.sock.connect((self.bt_addr, self.psm))
self.connected = True
self.logger.info("Connected to AirPods.")
self.sock.send(bytes.fromhex(self.INIT_CMD))
self.logger.info("Initialization complete.")
except Exception as e:
self.logger.error(f"Connection failed: {e}")
self.connected = False
return self.connected
def send_start(self) -> bool:
if not self.connected:
self.logger.error("Not connected. Cannot send START command.")
return False
if not self.started:
self.sock.send(bytes.fromhex(self.START_CMD))
self.started = True
self.logger.info("START command sent.")
else:
self.logger.info("START command has already been sent.")
return True
def send_stop(self) -> None:
if self.connected and self.started:
try:
self.sock.send(bytes.fromhex(self.STOP_CMD))
self.logger.info("STOP command sent.")
self.started = False
except Exception as e:
self.logger.error(f"Error sending STOP command: {e}")
else:
self.logger.info("Cannot send STOP; not started or not connected.")
def disconnect(self) -> None:
if self.sock:
try:
self.sock.close()
self.logger.info("Disconnected from AirPods.")
except Exception as e:
self.logger.error(f"Error during disconnect: {e}")
self.connected = False
self.started = False

View File

@@ -1,358 +0,0 @@
import logging
import statistics
import time
from bluetooth import BluetoothSocket
from collections import deque
from colors import *
from connection_manager import ConnectionManager
from logging import Logger, StreamHandler
from threading import Lock, Thread
from typing import Any, Deque, List, Optional, Tuple
handler: StreamHandler = StreamHandler()
handler.setFormatter(ColorFormatter())
log: Logger = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False
class GestureDetector:
INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00"
START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00"
STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00"
def __init__(self, conn: ConnectionManager = None) -> None:
self.sock: BluetoothSocket = None
self.bt_addr: str = "28:2D:7F:C2:05:5B"
self.psm: int = 0x1001
self.running: bool = False
self.data_lock: Lock = Lock()
self.horiz_buffer: Deque[int] = deque(maxlen=100)
self.vert_buffer: Deque[int] = deque(maxlen=100)
self.horiz_avg_buffer: Deque[float] = deque(maxlen=5)
self.vert_avg_buffer: Deque[float] = deque(maxlen=5)
self.horiz_peaks: List[int] = []
self.horiz_troughs: List[int] = []
self.vert_peaks: List[int] = []
self.vert_troughs: List[int] = []
self.last_peak_time: float = 0
self.peak_intervals: Deque[float] = deque(maxlen=5)
self.peak_threshold: int = 400
self.direction_change_threshold: int = 175
self.rhythm_consistency_threshold: float = 0.5
self.horiz_increasing: Optional[bool] = None
self.vert_increasing: Optional[bool] = None
self.required_extremes = 3
self.detection_timeout: int = 15
self.min_confidence_threshold: float = 0.7
self.conn: ConnectionManager = conn
def connect(self) -> bool:
try:
log.info(f"Connecting to AirPods at {self.bt_addr}...")
if self.conn is None:
self.conn = ConnectionManager(self.bt_addr, self.psm, logger=log)
if not self.conn.connect():
return False
else:
if not self.conn.connected:
if not self.conn.connect():
return False
self.sock = self.conn.sock
log.info(f"{Colors.GREEN}✓ Connected to AirPods via ConnectionManager{Colors.RESET}")
return True
except Exception as e:
log.error(f"{Colors.RED}Connection failed: {e}{Colors.RESET}")
return False
def process_data(self) -> None:
"""Process incoming head tracking data."""
self.conn.send_start()
log.info(f"{Colors.GREEN}✓ Head tracking activated{Colors.RESET}")
self.running = True
start_time: float = time.time()
log.info(f"{Colors.GREEN}Ready! Make a YES or NO gesture{Colors.RESET}")
log.info(f"{Colors.YELLOW}Tip: Use natural, moderate speed head movements{Colors.RESET}")
while self.running:
if time.time() - start_time > self.detection_timeout:
log.warning(f"{Colors.YELLOW}⚠️ Detection timeout reached. No gesture detected.{Colors.RESET}")
self.running = False
break
try:
if not self.sock:
log.error("Socket not available.")
break
data: bytes = self.sock.recv(1024)
formatted: str = self.format_hex(data)
if self.is_valid_tracking_packet(formatted):
raw_bytes: bytes = bytes.fromhex(formatted.replace(" ", ""))
horizontal, vertical = self.extract_orientation_values(raw_bytes)
if horizontal is not None and vertical is not None:
smooth_h, smooth_v = self.apply_smoothing(horizontal, vertical)
with self.data_lock:
self.horiz_buffer.append(smooth_h)
self.vert_buffer.append(smooth_v)
self.detect_peaks_and_troughs()
gesture: Optional[str] = self.detect_gestures()
if gesture:
self.running = False
break
except Exception as e:
if self.running:
log.error(f"Data processing error: {e}")
break
def disconnect(self) -> None:
"""Disconnect from socket."""
self.conn.disconnect()
def format_hex(self, data: bytes) -> str:
"""Format binary data to readable hex string."""
hex_str: str = data.hex()
return ' '.join(hex_str[i:i+2] for i in range(0, len(hex_str), 2))
def is_valid_tracking_packet(self, hex_string: str) -> bool:
"""Verify packet is a valid head tracking packet."""
standard_header: str = "04 00 04 00 17 00 00 00 10 00 45 00"
alternate_header: str = "04 00 04 00 17 00 00 00 10 00 44 00"
if not hex_string.startswith(standard_header) and not hex_string.startswith(alternate_header):
return False
if len(hex_string.split()) < 80:
return False
return True
def extract_orientation_values(self, raw_bytes: bytes) -> Tuple[Optional[int], Optional[int]]:
"""Extract head orientation data from packet."""
try:
horizontal: int = int.from_bytes(raw_bytes[51:53], byteorder='little', signed=True)
vertical: int = int.from_bytes(raw_bytes[53:55], byteorder='little', signed=True)
return horizontal, vertical
except Exception as e:
log.debug(f"Failed to extract orientation: {e}")
return None, None
def apply_smoothing(self, horizontal: int, vertical: int) -> Tuple[float, float]:
"""Apply moving average smoothing (Apple-like filtering)."""
self.horiz_avg_buffer.append(horizontal)
self.vert_avg_buffer.append(vertical)
smooth_horiz: float = sum(self.horiz_avg_buffer) / len(self.horiz_avg_buffer)
smooth_vert: float = sum(self.vert_avg_buffer) / len(self.vert_avg_buffer)
return smooth_horiz, smooth_vert
def detect_peaks_and_troughs(self) -> None:
"""Detect motion direction changes with Apple-like refinements."""
if len(self.horiz_buffer) < 4 or len(self.vert_buffer) < 4:
return
h_values: List[int] = list(self.horiz_buffer)[-4:]
v_values: List[int] = list(self.vert_buffer)[-4:]
h_variance: float = statistics.variance(h_values) if len(h_values) > 1 else 0
v_variance: float = statistics.variance(v_values) if len(v_values) > 1 else 0
current: int = self.horiz_buffer[-1]
prev: int = self.horiz_buffer[-2]
if self.horiz_increasing is None:
self.horiz_increasing = current > prev
dynamic_h_threshold: float = max(100, min(self.direction_change_threshold, h_variance / 3))
if self.horiz_increasing and current < prev - dynamic_h_threshold:
if abs(prev) > self.peak_threshold:
self.horiz_peaks.append((len(self.horiz_buffer)-1, prev, time.time()))
direction: str = "➡️ " if prev > 0 else "⬅️ "
log.info(f"{Colors.CYAN}{direction} Horizontal max: {prev} (threshold: {dynamic_h_threshold:.1f}){Colors.RESET}")
now: float = time.time()
if self.last_peak_time > 0:
interval: float = now - self.last_peak_time
self.peak_intervals.append(interval)
self.last_peak_time = now
self.horiz_increasing = False
elif not self.horiz_increasing and current > prev + dynamic_h_threshold:
if abs(prev) > self.peak_threshold:
self.horiz_troughs.append((len(self.horiz_buffer)-1, prev, time.time()))
direction: str = "➡️ " if prev > 0 else "⬅️ "
log.info(f"{Colors.CYAN}{direction} Horizontal max: {prev} (threshold: {dynamic_h_threshold:.1f}){Colors.RESET}")
now: float = time.time()
if self.last_peak_time > 0:
interval: float = now - self.last_peak_time
self.peak_intervals.append(interval)
self.last_peak_time = now
self.horiz_increasing = True
current: int = self.vert_buffer[-1]
prev: int = self.vert_buffer[-2]
if self.vert_increasing is None:
self.vert_increasing = current > prev
dynamic_v_threshold: float = max(100, min(self.direction_change_threshold, v_variance / 3))
if self.vert_increasing and current < prev - dynamic_v_threshold:
if abs(prev) > self.peak_threshold:
self.vert_peaks.append((len(self.vert_buffer)-1, prev, time.time()))
direction: str = "⬆️ " if prev > 0 else "⬇️ "
log.info(f"{Colors.MAGENTA}{direction} Vertical max: {prev} (threshold: {dynamic_v_threshold:.1f}){Colors.RESET}")
now: float = time.time()
if self.last_peak_time > 0:
interval: float = now - self.last_peak_time
self.peak_intervals.append(interval)
self.last_peak_time = now
self.vert_increasing = False
elif not self.vert_increasing and current > prev + dynamic_v_threshold:
if abs(prev) > self.peak_threshold:
self.vert_troughs.append((len(self.vert_buffer)-1, prev, time.time()))
direction: str = "⬆️ " if prev > 0 else "⬇️ "
log.info(f"{Colors.MAGENTA}{direction} Vertical max: {prev} (threshold: {dynamic_v_threshold:.1f}){Colors.RESET}")
now: float = time.time()
if self.last_peak_time > 0:
interval: float = now - self.last_peak_time
self.peak_intervals.append(interval)
self.last_peak_time = now
self.vert_increasing = True
def calculate_rhythm_consistency(self) -> float:
"""Calculate how consistent the timing between peaks is (Apple-like)."""
if len(self.peak_intervals) < 2:
return 0
mean_interval: float = statistics.mean(self.peak_intervals)
if mean_interval == 0:
return 0
variances: List[float] = [(i/mean_interval - 1.0) ** 2 for i in self.peak_intervals]
consistency: float = 1.0 - min(1.0, statistics.mean(variances) / self.rhythm_consistency_threshold)
return max(0, consistency)
def calculate_confidence_score(self, extremes: List[Tuple[int, int, float]], is_vertical: bool = True) -> float:
"""Calculate confidence score for gesture detection (Apple-like)."""
if len(extremes) < self.required_extremes:
return 0.0
sorted_extremes: List[Tuple[int, int, float]] = sorted(extremes, key=lambda x: x[0])
recent: List[Tuple[int, int, float]] = sorted_extremes[-self.required_extremes:]
avg_amplitude: float = sum(abs(val) for _, val, _ in recent) / len(recent)
amplitude_factor: float = min(1.0, avg_amplitude / 600)
rhythm_factor: float = self.calculate_rhythm_consistency()
signs: List[int] = [1 if val > 0 else -1 for _, val, _ in recent]
alternating: bool = all(signs[i] != signs[i-1] for i in range(1, len(signs)))
alternation_factor: float = 1.0 if alternating else 0.5
if is_vertical:
vert_amp: float = sum(abs(val) for _, val, _ in recent) / len(recent)
horiz_vals: List[int] = list(self.horiz_buffer)[-len(recent)*2:]
horiz_amp: float = sum(abs(val) for val in horiz_vals) / len(horiz_vals) if horiz_vals else 0
isolation_factor: float = min(1.0, vert_amp / (horiz_amp + 0.1) * 1.2)
else:
horiz_amp: float = sum(abs(val) for _, val, _ in recent)
vert_vals: List[int] = list(self.vert_buffer)[-len(recent)*2:]
vert_amp: float = sum(abs(val) for val in vert_vals) / len(vert_vals) if vert_vals else 0
isolation_factor: float = min(1.0, horiz_amp / (vert_amp + 0.1) * 1.2)
confidence: float = (
amplitude_factor * 0.4 +
rhythm_factor * 0.2 +
alternation_factor * 0.2 +
isolation_factor * 0.2
)
return confidence
def detect_gestures(self) -> Optional[str]:
"""Recognize head gesture patterns with Apple-like intelligence."""
if len(self.vert_peaks) + len(self.vert_troughs) >= self.required_extremes:
all_extremes: List[Tuple[int, int, float]] = sorted(self.vert_peaks + self.vert_troughs, key=lambda x: x[0])
confidence: float = self.calculate_confidence_score(all_extremes, is_vertical=True)
log.info(f"Vertical motion confidence: {confidence:.2f} (need {self.min_confidence_threshold:.2f})")
if confidence >= self.min_confidence_threshold:
log.info(f"{Colors.GREEN}🎯 \"Yes\" Gesture Detected (confidence: {confidence:.2f}){Colors.RESET}")
return "YES"
if len(self.horiz_peaks) + len(self.horiz_troughs) >= self.required_extremes:
all_extremes: List[Tuple[int, int, float]] = sorted(self.horiz_peaks + self.horiz_troughs, key=lambda x: x[0])
confidence: float = self.calculate_confidence_score(all_extremes, is_vertical=False)
log.info(f"Horizontal motion confidence: {confidence:.2f} (need {self.min_confidence_threshold:.2f})")
if confidence >= self.min_confidence_threshold:
log.info(f"{Colors.GREEN}🎯 \"No\" gesture detected (confidence: {confidence:.2f}){Colors.RESET}")
return "NO"
return None
def start_detection(self) -> None:
"""Begin gesture detection process."""
log.info(f"{Colors.BOLD}{Colors.WHITE}Starting gesture detection...{Colors.RESET}")
if not self.connect():
log.error(f"{Colors.RED}Failed to connect to AirPods.{Colors.RESET}")
return
data_thread: Thread = Thread(target=self.process_data)
data_thread.daemon = True
data_thread.start()
try:
data_thread.join(timeout=self.detection_timeout + 2)
if data_thread.is_alive():
log.warning(f"{Colors.YELLOW}⚠️ Timeout reached. Stopping detection.{Colors.RESET}")
self.running = False
except KeyboardInterrupt:
log.info(f"{Colors.YELLOW}Detection canceled by user.{Colors.RESET}")
self.running = False
if __name__ == "__main__":
self.disconnect()
log.info(f"{Colors.GREEN}Gesture detection complete.{Colors.RESET}")
if __name__ == "__main__":
print(f"{Colors.BG_BLACK}{Colors.CYAN}╔════════════════════════════════════════╗{Colors.RESET}")
print(f"{Colors.BG_BLACK}{Colors.CYAN}║ AirPods Head Gesture Detector ║{Colors.RESET}")
print(f"{Colors.BG_BLACK}{Colors.CYAN}╚════════════════════════════════════════╝{Colors.RESET}")
print(f"\n{Colors.WHITE}This program detects head gestures using AirPods:{Colors.RESET}")
print(f"{Colors.GREEN}• YES: {Colors.WHITE}nodding head up and down{Colors.RESET}")
print(f"{Colors.RED}• NO: {Colors.WHITE}shaking head left and right{Colors.RESET}\n")
detector: GestureDetector = GestureDetector()
detector.start_detection()

View File

@@ -1,123 +0,0 @@
import math
import numpy as np
import logging
import os
from colors import *
from drawille import Canvas
from logging import Logger, StreamHandler
from matplotlib.animation import FuncAnimation
from matplotlib.pyplot import Axes, Figure
from numpy.typing import NDArray
from os import terminal_size as TerminalSize
from typing import Any, Dict, List, Optional, Tuple
handler: StreamHandler = StreamHandler()
handler.setFormatter(ColorFormatter())
log: Logger = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False
class HeadOrientation:
def __init__(self, use_terminal: bool = False) -> None:
self.orientation_offset: int = 5500
self.o1_neutral: int = 19000
self.o2_neutral: int = 0
self.o3_neutral: int = 0
self.calibration_samples: List[List[int]] = []
self.calibration_complete: bool = False
self.calibration_sample_count: int = 10
self.fig: Optional[Figure] = None
self.ax: Optional[Axes] = None
self.arrow: Any = None
self.animation: Optional[FuncAnimation] = None
self.use_terminal: bool = use_terminal
def reset_calibration(self) -> None:
self.calibration_samples = []
self.calibration_complete = False
def add_calibration_sample(self, orientation_values: List[int]) -> bool:
if len(self.calibration_samples) < self.calibration_sample_count:
self.calibration_samples.append(orientation_values)
return False
if not self.calibration_complete:
self._calculate_calibration()
return True
return True
def _calculate_calibration(self) -> None:
if len(self.calibration_samples) < 3:
log.warning("Not enough calibration samples")
return
samples: NDArray[[List[int]]] = np.array(self.calibration_samples)
self.o1_neutral: float = np.mean(samples[:, 0])
avg_o2: float = np.mean(samples[:, 1])
avg_o3: float = np.mean(samples[:, 2])
self.o2_neutral: float = avg_o2
self.o3_neutral: float = avg_o3
log.info("Calibration complete: o1_neutral=%.2f, o2_neutral=%.2f, o3_neutral=%.2f",
self.o1_neutral, self.o2_neutral, self.o3_neutral)
self.calibration_complete = True
def calculate_orientation(self, o1: float, o2: float, o3: float) -> Dict[str, float]:
if not self.calibration_complete:
return {'pitch': 0, 'yaw': 0}
o1_norm: float = o1 - self.o1_neutral
o2_norm: float = o2 - self.o2_neutral
o3_norm: float = o3 - self.o3_neutral
pitch: float = (o2_norm + o3_norm) / 2 / 32000 * 180
yaw: float = (o2_norm - o3_norm) / 2 / 32000 * 180
return {'pitch': pitch, 'yaw': yaw}
def create_face_art(self, pitch: float, yaw: float) -> str:
if self.use_terminal:
try:
ts: TerminalSize = os.get_terminal_size()
width, height = ts.columns, ts.lines * 2
except Exception:
width, height = 80, 40
else:
width, height = 80, 40
center_x, center_y = width // 2, height // 2
radius: int = (min(width, height) // 2 - 2) // 2
pitch_rad: float = math.radians(pitch)
yaw_rad: float = math.radians(yaw)
canvas: Canvas = Canvas()
def rotate_point(x: float, y: float, z: float, pitch_r: float, yaw_r: float) -> Tuple[int, int]:
cos_y, sin_y = math.cos(yaw_r), math.sin(yaw_r)
cos_p, sin_p = math.cos(pitch_r), math.sin(pitch_r)
x1: float = x * cos_y - z * sin_y
z1: float = x * sin_y + z * cos_y
y1: float = y * cos_p - z1 * sin_p
z2: float = y * sin_p + z1 * cos_p
scale: float = 1 + (z2 / width)
return int(center_x + x1 * scale), int(center_y + y1 * scale)
for angle in range(0, 360, 2):
rad: float = math.radians(angle)
x: float = radius * math.cos(rad)
y: float = radius * math.sin(rad)
x1, y1 = rotate_point(x, y, 0, pitch_rad, yaw_rad)
canvas.set(x1, y1)
for eye in [(-radius//2, -radius//3, 2), (radius//2, -radius//3, 2)]:
ex, ey, ez = eye
x1, y1 = rotate_point(ex, ey, ez, pitch_rad, yaw_rad)
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
canvas.set(x1 + dx, y1 + dy)
nx, ny = rotate_point(0, 0, 1, pitch_rad, yaw_rad)
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
canvas.set(nx + dx, ny + dy)
smile_depth: int = radius // 8
mouth_local_y: int = radius // 4
mouth_length: int = radius
for x_offset in range(-mouth_length // 2, mouth_length // 2 + 1):
norm: float = abs(x_offset) / (mouth_length / 2)
y_offset: int = int((1 - norm ** 2) * smile_depth)
local_x: int = x_offset
local_y: int = mouth_local_y + y_offset
mx, my = rotate_point(local_x, local_y, 0, pitch_rad, yaw_rad)
canvas.set(mx, my)
return canvas.frame()

View File

@@ -1,843 +0,0 @@
import asciichartpy as acp
import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import struct
import time
from bluetooth import BluetoothSocket
from colors import *
from connection_manager import ConnectionManager
from datetime import datetime as DateTime
from drawille import Canvas
from head_orientation import HeadOrientation
from logging import Logger, StreamHandler
from matplotlib.animation import FuncAnimation
from matplotlib.legend import Legend
from matplotlib.pyplot import Axes, Figure
from numpy.typing import NDArray
from rich.live import Live
from rich.layout import Layout
from rich.panel import Panel
from rich.console import Console
from threading import Lock, Thread
from typing import Any, Dict, List, Optional, TextIO, Tuple, Union
handler: StreamHandler = StreamHandler()
handler.setFormatter(ColorFormatter())
logger: Logger = logging.getLogger("airpods-head-tracking")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = True
INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00"
NOTIF_CMD: str = "04 00 04 00 0F 00 FF FF FE FF"
START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00"
STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00"
KEY_FIELDS: Dict[str, Tuple[int, int]] = {
"orientation 1": (43, 2),
"orientation 2": (45, 2),
"orientation 3": (47, 2),
"Horizontal Acceleration": (51, 2),
"Vertical Acceleration": (53, 2),
"unkown 1": (61, 2),
"unkown 2 ": (49, 2),
}
class AirPodsTracker:
def __init__(self) -> None:
self.sock: BluetoothSocket = None
self.recording: bool = False
self.log_file: Optional[TextIO] = None
self.listener_thread: Optional[Thread] = None
self.bt_addr: str = "28:2D:7F:C2:05:5B"
self.psm: int = 0x1001
self.raw_packets: List[bytes] = []
self.parsed_packets: List[bytes] = []
self.live_data: List[bytes] = []
self.live_plotting: bool = False
self.animation: FuncAnimation = None
self.fig: Optional[Figure] = None
self.axes: Optional[Axes] = None
self.lines: Dict[str, Any] = {}
self.selected_fields: List[str] = []
self.data_lock: Lock = Lock()
self.orientation_offset: int = 5500
self.use_terminal: bool = True # '--terminal' in sys.argv
self.orientation_visualizer: HeadOrientation = HeadOrientation(use_terminal=self.use_terminal)
self.conn: Optional[ConnectionManager] = None
def connect(self):
try:
logger.info("Trying to connect to %s on PSM 0x%04X...", self.bt_addr, self.psm)
self.conn = ConnectionManager(self.bt_addr, self.psm, logger=logger)
if not self.conn.connect():
logger.error("Connection failed via ConnectionManager.")
return False
self.sock = self.conn.sock
self.sock.send(bytes.fromhex(NOTIF_CMD))
logger.info("Sent initialization command.")
self.listener_thread = Thread(target=self.listen, daemon=True)
self.listener_thread.start()
return True
except Exception as e:
logger.error("Connection error: %s", e)
return False
def start_tracking(self, duration: Optional[float] = None) -> None:
if not self.recording:
self.conn.send_start()
filename: str = f"head_tracking_{DateTime.now().strftime('%Y%m%d_%H%M%S')}.log"
self.log_file = open(filename, "w")
self.recording = True
logger.info("Recording started. Saving data to %s", filename)
if duration is not None and duration > 0:
def auto_stop() -> None:
time.sleep(duration)
if self.recording:
self.stop_tracking()
logger.info("Recording automatically stopped after %s seconds.", duration)
timer_thread = Thread(target=auto_stop, daemon=True)
timer_thread.start()
logger.info("Will automatically stop recording after %s seconds.", duration)
else:
logger.info("Already recording.")
def stop_tracking(self) -> None:
if self.recording:
self.conn.send_stop()
self.recording = False
if self.log_file is not None:
self.log_file.close()
self.log_file = None
logger.info("Recording stopped.")
else:
logger.info("Not currently recording.")
def format_hex(self, data: bytes) -> str:
hex_str: str = data.hex()
return ' '.join(hex_str[i:i + 2] for i in range(0, len(hex_str), 2))
def parse_raw_packet(self, hex_string: str) -> bytes:
return bytes.fromhex(hex_string.replace(" ", ""))
def interpret_bytes(self, raw_bytes: bytes, start: int, length: int, data_type: str = "signed_short") -> Optional[Union[int, float]]:
if start + length > len(raw_bytes):
return None
match data_type:
case "signed_short":
return int.from_bytes(raw_bytes[start:start + 2], byteorder='little', signed=True)
case "unsigned_short":
return int.from_bytes(raw_bytes[start:start + 2], byteorder='little', signed=False)
case "signed_short_be":
return int.from_bytes(raw_bytes[start:start + 2], byteorder='big', signed=True)
case "float_le":
if start + 4 <= len(raw_bytes):
return struct.unpack('<f', raw_bytes[start:start + 4])[0]
case "float_be":
if start + 4 <= len(raw_bytes):
return struct.unpack('>f', raw_bytes[start:start + 4])[0]
case _:
return None
def normalize_orientation(self, value: Optional[Union[int, float]], field_name: str) -> Optional[Union[int, float]]:
if 'orientation' in field_name.lower():
return value + self.orientation_offset
return value
def parse_packet_all_fields(self, raw_bytes: bytes) -> Dict[str, Union[int, float]]:
packet: Dict[str, Union[int, float]] = {}
packet["seq_num"] = int.from_bytes(raw_bytes[12:14], byteorder='little')
for field_name, (start, length) in KEY_FIELDS.items():
if field_name == "float_val" and start + 4 <= len(raw_bytes):
packet[field_name] = self.interpret_bytes(raw_bytes, start, 4, "float_le")
else:
raw_value = self.interpret_bytes(raw_bytes, start, length, "signed_short")
if raw_value is not None:
packet[field_name] = self.normalize_orientation(raw_value, field_name)
for i in range(30, min(90, len(raw_bytes) - 1), 2):
field_name: str = f"byte_{i:02d}"
raw_value: Optional[Union[int, float]] = self.interpret_bytes(raw_bytes, i, 2, "signed_short")
if raw_value is not None:
packet[field_name] = self.normalize_orientation(raw_value, field_name)
return packet
def apply_dark_theme(self, fig: Figure, axes: List[Axes]) -> None:
fig.patch.set_facecolor('#1e1e1e')
for ax in axes:
ax.set_facecolor('#2d2d2d')
ax.title.set_color('white')
ax.xaxis.label.set_color('white')
ax.yaxis.label.set_color('white')
ax.tick_params(colors='white')
ax.tick_params(axis='x', colors='white')
ax.tick_params(axis='y', colors='white')
ax.grid(True, color='#555555', alpha=0.3, linestyle='--')
for spine in ax.spines.values():
spine.set_color('#555555')
legend: Optional[Legend] = ax.get_legend()
if (legend):
legend.get_frame().set_facecolor('#2d2d2d')
legend.get_frame().set_alpha(0.7)
for text in legend.get_texts():
text.set_color('white')
def listen(self) -> None:
while True:
try:
data: bytes = self.sock.recv(1024)
formatted: str = self.format_hex(data)
timestamp: str = DateTime.now().isoformat()
is_valid: bool = self.is_valid_tracking_packet(formatted)
if not self.live_plotting:
if is_valid:
logger.info("%s - Response: %s...", timestamp, formatted[:60])
else:
logger.info("%s - Skipped non-tracking packet.", timestamp)
if is_valid:
if self.recording and self.log_file is not None:
self.log_file.write(formatted + "\n")
self.log_file.flush()
try:
raw_bytes: bytes = self.parse_raw_packet(formatted)
packet: Dict[str, Union[int, float]] = self.parse_packet_all_fields(raw_bytes)
with self.data_lock:
self.live_data.append(packet)
if len(self.live_data) > 300:
self.live_data.pop(0)
except Exception as e:
logger.error(f"Error parsing packet: {e}")
except Exception as e:
logger.error("Error receiving data: %s", e)
break
def load_log_file(self, filepath: str) -> bool:
self.raw_packets = []
self.parsed_packets = []
try:
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if line:
try:
raw_bytes: bytes = self.parse_raw_packet(line)
self.raw_packets.append(raw_bytes)
packet: Dict[str, Union[int, float]] = self.parse_packet_all_fields(raw_bytes)
min_seq_num: int = min(
[parsed_packet["seq_num"] for parsed_packet in self.parsed_packets], default=0
)
if packet["seq_num"] > min_seq_num:
self.parsed_packets.append(packet)
except Exception as e:
logger.error(f"Error parsing line: {e}")
logger.info(f"Loaded {len(self.parsed_packets)} packets from {filepath}")
return True
except Exception as e:
logger.error(f"Error loading log file: {e}")
return False
def extract_field_values(self, field_name: str, data_source: str = 'loaded') -> List[Union[int, float]]:
if data_source == 'loaded':
data: List[Dict[str, Union[int, float]]] = self.parsed_packets
else:
with self.data_lock:
data: List[Dict[str, Union[int, float]]] = self.live_data.copy()
values: List[Union[int, float]] = [packet.get(field_name, 0) for packet in data if field_name in packet]
if data_source == 'live' and len(values) > 5:
try:
values: NDArray[Any] = np.array(values, dtype=float)
values = np.convolve(values, np.ones(5) / 5, mode='valid')
except Exception as e:
logger.warning(f"Smoothing error (non-critical): {e}")
return values
def is_valid_tracking_packet(self, hex_string: str) -> bool:
standard_header: str = "04 00 04 00 17 00 00 00 10 00"
if not hex_string.startswith(standard_header):
if self.live_plotting:
logger.warning("Invalid packet header: %s", hex_string[:30])
return False
if len(hex_string.split()) < 80:
if self.live_plotting:
logger.warning("Invalid packet length: %s", hex_string[:30])
return False
return True
def plot_fields(self, field_names: Optional[List[str]] = None) -> None:
if not self.parsed_packets:
logger.error("No data to plot. Load a log file first.")
return
if field_names is None:
field_names: List[str] = list(KEY_FIELDS.keys())
if not self.orientation_visualizer.calibration_complete:
if len(self.parsed_packets) < self.orientation_visualizer.calibration_sample_count:
logger.error("Not enough packets for calibration. Need at least 10 packets.")
return
for packet in self.parsed_packets[:self.orientation_visualizer.calibration_sample_count]:
self.orientation_visualizer.add_calibration_sample([
packet.get('orientation 1', 0),
packet.get('orientation 2', 0),
packet.get('orientation 3', 0)
])
if self.use_terminal:
self._plot_fields_terminal(field_names)
else:
acceleration_fields: List[str] = [f for f in field_names if 'acceleration' in f.lower()]
orientation_fields: List[str] = [f for f in field_names if 'orientation' in f.lower()]
other_fields: List[str] = [f for f in field_names if f not in acceleration_fields + orientation_fields]
fig, axes = plt.subplots(3, 1, figsize=(14, 12), sharex=True)
self.apply_dark_theme(fig, axes)
acceleration_colors: List[str] = ['#FFFF00', '#00FFFF']
orientation_colors: List[str] = ['#FF00FF', '#00FF00', '#FFA500']
other_colors: List[str] = ['#52b788', '#f4a261', '#e76f51', '#2a9d8f']
if acceleration_fields:
for i, field in enumerate(acceleration_fields):
values = self.extract_field_values(field)
axes[0].plot(values, label=field, color=acceleration_colors[i % len(acceleration_colors)], linewidth=2)
axes[0].set_title("Acceleration Data", fontsize=14)
axes[0].legend()
if orientation_fields:
for i, field in enumerate(orientation_fields):
values = self.extract_field_values(field)
axes[1].plot(values, label=field, color=orientation_colors[i % len(orientation_colors)], linewidth=2)
axes[1].set_title("Orientation Data", fontsize=14)
axes[1].legend()
if other_fields:
for i, field in enumerate(other_fields):
values = self.extract_field_values(field)
axes[2].plot(values, label=field, color=other_colors[i % len(other_colors)], linewidth=2)
axes[2].set_title("Other Fields", fontsize=14)
axes[2].legend()
plt.xlabel("Packet Index", fontsize=12)
plt.tight_layout()
plt.show()
def _plot_fields_terminal(self, field_names: List[str]) -> None:
"""Internal method for terminal-based plotting"""
terminal_width: int = os.get_terminal_size().columns
plot_width: int = min(terminal_width - 10, 120)
plot_height: int = 15
acceleration_fields: List[str] = [f for f in field_names if 'acceleration' in f.lower()]
orientation_fields: List[str] = [f for f in field_names if 'orientation' in f.lower()]
other_fields: List[str] = [f for f in field_names if f not in acceleration_fields + orientation_fields]
def plot_group(fields: List[str], title: str) -> None:
if not fields:
return
print(f"\n{title}")
print("=" * len(title))
for field in fields:
values: List[float] = self.extract_field_values(field)
if len(values) > plot_width:
values = values[-plot_width:]
if title == "Acceleration Data":
chart: str = acp.plot(values, {'height': plot_height})
print(chart)
else:
chart: str = acp.plot(values, {'height': plot_height})
print(chart)
print(f"Min: {min(values):.2f}, Max: {max(values):.2f}, " + f"Mean: {np.mean(values):.2f}")
print()
plot_group(acceleration_fields, "Acceleration Data")
plot_group(orientation_fields, "Orientation Data")
plot_group(other_fields, "Other Fields")
def create_braille_plot(self, values: List[float], width: int = 80, height: int = 20, y_label: bool = True, fixed_y_min: Optional[float] = None, fixed_y_max: Optional[float] = None) -> str:
canvas: Canvas = Canvas()
if fixed_y_min is None or fixed_y_max is None:
local_min, local_max = min(values), max(values)
else:
local_min, local_max = fixed_y_min, fixed_y_max
y_range: float = local_max - local_min or 1
x_step: int = max(1, len(values) // width)
for i, v in enumerate(values[::x_step]):
y: int = int(((v - local_min) / y_range) * (height * 2 - 1))
canvas.set(i, y)
frame: str = canvas.frame()
if y_label:
lines: List[str] = frame.split('\n')
labeled_lines: List[str] = []
for idx, line in enumerate(lines):
if idx == 0:
labeled_lines.append(f"{local_max:6.0f} {line}")
elif idx == len(lines)-1:
labeled_lines.append(f"{local_min:6.0f} {line}")
else:
labeled_lines.append(" " + line)
frame = "\n".join(labeled_lines)
return frame
def _start_live_plotting_terminal(self, record_data: bool = False, duration: Optional[float] = None) -> None:
import sys, select, tty, termios
old_settings = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin.fileno())
console: Console = Console()
term_width: int = console.width
plot_width: int = round(min(term_width / 2 - 15, 120))
ori_height: int = 10
def make_compact_layout() -> Layout:
layout: Layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="main", ratio=1),
)
layout["main"].split_row(
Layout(name="accelerations", ratio=1),
Layout(name="orientations", ratio=1)
)
layout["accelerations"].split_column(
Layout(name="vertical", ratio=1),
Layout(name="horizontal", ratio=1)
)
layout["orientations"].split_column(
Layout(name="face", ratio=1),
Layout(name="raw", ratio=1)
)
return layout
layout: Layout = make_compact_layout()
try:
import time
with Live(layout, refresh_per_second=20, screen=True) as live:
while True:
if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
ch = sys.stdin.read(1)
if ch == 'p':
self.paused = not self.paused
logger.info("Paused" if self.paused else "Resumed")
if self.paused:
time.sleep(0.1)
rec_str: str = " [red][REC][/red]" if record_data else ""
left: str = "AirPods Head Tracking - v1.0.0"
right: str = "Ctrl+C - Close | p - Pause" + rec_str
status: str = "[bold red]Paused[/bold red]"
header: List[str] = list(" " * term_width)
header[0:len(left)] = list(left)
header[term_width - len(right):] = list(right)
start: int = (term_width - len(status)) // 2
header[start:start+len(status)] = list(status)
header_text: str = "".join(header)
layout["header"].update(Panel(header_text, style="bold white on black"))
continue
with self.data_lock:
if len(self.live_data) < 1:
continue
latest: Dict[str, float] = self.live_data[-1]
data: List[Dict[str, float]] = self.live_data[-plot_width:]
if not self.orientation_visualizer.calibration_complete:
sample: List[float] = [
latest.get('orientation 1', 0),
latest.get('orientation 2', 0),
latest.get('orientation 3', 0)
]
self.orientation_visualizer.add_calibration_sample(sample)
time.sleep(0.05)
rec_str: str = " [red][REC][/red]" if record_data else ""
left: str = "AirPods Head Tracking - v1.0.0"
status: str = "[bold yellow]Calibrating...[/bold yellow]"
right: str = "Ctrl+C - Close | p - Pause"
remaining: int = max(term_width - len(left) - len(right), 0)
header_text: str = f"{left}{status.center(remaining)}{right}{rec_str}"
layout["header"].update(Panel(header_text, style="bold white on black"))
live.refresh()
continue
o1: float = latest.get('orientation 1', 0)
o2: float = latest.get('orientation 2', 0)
o3: float = latest.get('orientation 3', 0)
orientation: Dict[str, float] = self.orientation_visualizer.calculate_orientation(o1, o2, o3)
pitch: float = orientation['pitch']
yaw: float = orientation['yaw']
h_accel: List[float] = [p.get('Horizontal Acceleration', 0) for p in data]
v_accel: List[float] = [p.get('Vertical Acceleration', 0) for p in data]
if len(h_accel) > plot_width:
h_accel = h_accel[-plot_width:]
if len(v_accel) > plot_width:
v_accel = v_accel[-plot_width:]
global_min: float = min(min(v_accel), min(h_accel))
global_max: float = max(max(v_accel), max(h_accel))
config_acc: Dict[str, float] = {'height': 20, 'min': global_min, 'max': global_max}
vert_plot: str = acp.plot(v_accel, config_acc)
horiz_plot: str = acp.plot(h_accel, config_acc)
rec_str: str = " [red][REC][/red]" if record_data else ""
left: str = "AirPods Head Tracking - v1.0.0"
right: str = "Ctrl+C - Close | p - Pause" + rec_str
status: str = "[bold green]Live[/bold green]"
header: List[str] = list(" " * term_width)
header[0:len(left)] = list(left)
header[term_width - len(right):] = list(right)
start: int = (term_width - len(status)) // 2
header[start:start+len(status)] = list(status)
header_text: str = "".join(header)
layout["header"].update(Panel(header_text, style="bold white on black"))
face_art: str = self.orientation_visualizer.create_face_art(pitch, yaw)
layout["accelerations"]["vertical"].update(Panel(
"[bold yellow]Vertical Acceleration[/]\n" +
vert_plot + "\n" +
f"Cur: {v_accel[-1]:6.1f} | Min: {min(v_accel):6.1f} | Max: {max(v_accel):6.1f}",
style="yellow"
))
layout["accelerations"]["horizontal"].update(Panel(
"[bold cyan]Horizontal Acceleration[/]\n" +
horiz_plot + "\n" +
f"Cur: {h_accel[-1]:6.1f} | Min: {min(h_accel):6.1f} | Max: {max(h_accel):6.1f}",
style="cyan"
))
layout["orientations"]["face"].update(Panel(face_art, title="[green]Orientation - Visualization[/]", style="green"))
o2_values: List[float] = [p.get('orientation 2', 0) for p in data[-plot_width:]]
o3_values: List[float] = [p.get('orientation 3', 0) for p in data[-plot_width:]]
o2_values: List[float] = o2_values[:plot_width]
o3_values: List[float] = o3_values[:plot_width]
common_min: float = min(min(o2_values), min(o3_values))
common_max: float = max(max(o2_values), max(o3_values))
config_ori: Dict[str, float] = {'height': ori_height, 'min': common_min, 'max': common_max, 'format': "{:6.0f}"}
chart_o2: str = acp.plot(o2_values, config_ori)
chart_o3: str = acp.plot(o3_values, config_ori)
layout["orientations"]["raw"].update(Panel(
"[bold yellow]Orientation 1:[/]\n" + chart_o2 + "\n" +
f"Cur: {o2_values[-1]:6.1f} | Min: {min(o2_values):6.1f} | Max: {max(o2_values):6.1f}\n\n" +
"[bold green]Orientation 2:[/]\n" + chart_o3 + "\n" +
f"Cur: {o3_values[-1]:6.1f} | Min: {min(o3_values):6.1f} | Max: {max(o3_values):6.1f}",
title="[cyan]Orientation Raw[/]", style="yellow"
))
live.refresh()
time.sleep(0.05)
except KeyboardInterrupt:
logger.info("\nStopped.")
if record_data:
self.stop_tracking()
else:
if self.sock:
self.sock.send(bytes.fromhex(STOP_CMD))
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def _start_live_plotting(self, record_data: bool = False, duration: Optional[float] = None) -> None:
terminal_width: int = os.get_terminal_size().columns
plot_width: int = min(terminal_width - 10, 80)
plot_height: int = 10
try:
while True:
os.system('clear' if os.name == 'posix' else 'cls')
with self.data_lock:
if len(self.live_data) == 0:
print("\nWaiting for data...")
time.sleep(0.1)
continue
data: List[Dict[str, float]] = self.live_data[-plot_width:]
acceleration_fields: List[str] = [f for f in KEY_FIELDS.keys() if 'acceleration' in f.lower()]
orientation_fields: List[str] = [f for f in KEY_FIELDS.keys() if 'orientation' in f.lower()]
other_fields: List[str] = [f for f in KEY_FIELDS.keys() if f not in acceleration_fields + orientation_fields]
def plot_group(fields: List[str], title: str) -> None:
if not fields:
return
print(f"\n{title}")
print("=" * len(title))
for field in fields:
values: List[float] = [packet.get(field, 0) for packet in data if field in packet]
if len(values) > 0:
chart: str = acp.plot(values, {'height': plot_height})
print(chart)
print(f"Current: {values[-1]:.2f}, " +
f"Min: {min(values):.2f}, Max: {max(values):.2f}")
print()
plot_group(acceleration_fields, "Acceleration Data")
plot_group(orientation_fields, "Orientation Data")
plot_group(other_fields, "Other Fields")
print("\nPress Ctrl+C to stop plotting")
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("\nLive plotting stopped.")
self.sock.send(bytes.fromhex(STOP_CMD))
if record_data:
self.stop_tracking()
self.live_plotting = False
def start_live_plotting(self, record_data: bool = False, duration: Optional[float] = None) -> None:
if self.sock is None:
if not self.connect():
logger.error("Could not connect to AirPods. Live plotting aborted.")
return
if not self.recording and record_data:
self.start_tracking(duration)
logger.info("Recording enabled during live plotting")
elif not self.recording:
self.sock.send(bytes.fromhex(START_CMD))
logger.info("Head tracking started (not recording to file)")
with self.data_lock:
self.live_data = []
self.live_plotting = True
self.paused = False
if self.use_terminal:
self._start_live_plotting_terminal(record_data, duration)
else:
from matplotlib.gridspec import GridSpec, GridSpecFromSubplotSpec
fig: Figure = plt.figure(figsize=(14, 6))
gs: GridSpec = GridSpec(1, 2, width_ratios=[1, 1])
ax_accel: Axes = fig.add_subplot(gs[0])
subgs: GridSpecFromSubplotSpec = GridSpecFromSubplotSpec(2, 1, subplot_spec=gs[1], height_ratios=[2, 1])
ax_head_top: Axes = fig.add_subplot(subgs[0], projection='3d')
ax_ori: Axes = fig.add_subplot(subgs[1])
ax_accel.set_title("Acceleration Data")
ax_accel.set_xlabel("Packet Index")
ax_accel.set_ylabel("Acceleration")
ax_accel.legend(loc='upper right', framealpha=0.7)
fig.patch.set_facecolor('#1e1e1e')
ax_accel.set_facecolor('#2d2d2d')
self.apply_dark_theme(fig, [ax_accel, ax_head_top, ax_ori])
plt.ion()
def update_plot(_: int) -> None:
with self.data_lock:
data: List[Dict[str, float]] = self.live_data.copy()
if len(data) == 0:
return
latest: Dict[str, float] = data[-1]
if not self.orientation_visualizer.calibration_complete:
sample: List[float] = [
latest.get('orientation 1', 0),
latest.get('orientation 2', 0),
latest.get('orientation 3', 0)
]
self.orientation_visualizer.add_calibration_sample(sample)
ax_head_top.cla()
ax_head_top.text(0.5, 0.5, "Calibrating... please wait", horizontalalignment='center', verticalalignment='center', transform=ax_head_top.transAxes, color='white')
fig.canvas.draw_idle()
return
h_accel: List[float] = [p.get('Horizontal Acceleration', 0) for p in data]
v_accel: List[float] = [p.get('Vertical Acceleration', 0) for p in data]
x_vals: List[int] = list(range(len(h_accel)))
ax_accel.cla()
ax_accel.plot(x_vals, v_accel, label='Vertical Acceleration', color='#FFFF00', linewidth=2)
ax_accel.plot(x_vals, h_accel, label='Horizontal Acceleration', color='#00FFFF', linewidth=2)
ax_accel.set_title("Acceleration Data")
ax_accel.set_xlabel("Packet Index")
ax_accel.set_ylabel("Acceleration")
ax_accel.legend(loc='upper right', framealpha=0.7)
ax_accel.set_facecolor('#2d2d2d')
ax_accel.title.set_color('white')
ax_accel.xaxis.label.set_color('white')
ax_accel.yaxis.label.set_color('white')
latest: Dict[str, float] = data[-1]
o1: float = latest.get('orientation 1', 0)
o2: float = latest.get('orientation 2', 0)
o3: float = latest.get('orientation 3', 0)
orientation: Dict[str, float] = self.orientation_visualizer.calculate_orientation(o1, o2, o3)
pitch: float = orientation['pitch']
yaw: float = orientation['yaw']
ax_head_top.cla()
ax_head_top.set_title("Head Orientation")
ax_head_top.set_xlim([-1, 1])
ax_head_top.set_ylim([-1, 1])
ax_head_top.set_zlim([-1, 1])
ax_head_top.set_facecolor('#2d2d2d')
pitch_rad = np.radians(pitch)
yaw_rad = np.radians(yaw)
Rz: NDArray[Any] = np.array([
[np.cos(yaw_rad), np.sin(yaw_rad), 0],
[-np.sin(yaw_rad), np.cos(yaw_rad), 0],
[0, 0, 1]
])
Ry: NDArray[Any] = np.array([
[np.cos(pitch_rad), 0, np.sin(pitch_rad)],
[0, 1, 0],
[-np.sin(pitch_rad), 0, np.cos(pitch_rad)]
])
R: NDArray[Any] = Rz @ Ry
dir_vec: NDArray[Any] = R @ np.array([1, 0, 0])
ax_head_top.quiver(0, 0, 0, dir_vec[0], dir_vec[1], dir_vec[2],
color='r', length=0.8, linewidth=3)
ax_ori.cla()
o2_values: List[float] = [p.get('orientation 2', 0) for p in data]
o3_values: List[float] = [p.get('orientation 3', 0) for p in data]
x_range: List[int] = list(range(len(o2_values)))
ax_ori.plot(x_range, o2_values, label='Orientation 1', color='red', linewidth=2)
ax_ori.plot(x_range, o3_values, label='Orientation 2', color='green', linewidth=2)
ax_ori.set_facecolor('#2d2d2d')
ax_ori.tick_params(colors='white')
ax_ori.set_title("Orientation Raw")
ax_ori.legend(facecolor='#2d2d2d', edgecolor='#555555',
labelcolor='white', loc='upper right')
ax_ori.text(0.95, 0.9, f"Pitch: {pitch:.1f}°\nYaw: {yaw:.1f}°",
transform=ax_ori.transAxes, color='white',
ha='right', va='top', bbox=dict(facecolor='#2d2d2d', alpha=0.5))
fig.canvas.draw_idle()
self.animation = FuncAnimation(
fig, update_plot,
interval=20,
blit=False,
cache_frame_data=False
)
plt.show(block=True)
self.sock.send(bytes.fromhex(STOP_CMD))
logger.info("Stopping head tracking AirPods.")
if self.recording and record_data:
self.stop_tracking()
logger.info("Recording stopped after sending close command")
else:
logger.info("Live plotting ended (no recording to stop).")
self.live_plotting = False
self.animation = None
plt.ioff()
def interactive_mode(self) -> None:
from prompt_toolkit import PromptSession
session: PromptSession = PromptSession("> ")
logger.info("\nAirPods Head Tracking Analyzer")
print("------------------------------")
logger.info("Commands:")
print(" connect - connect to your AirPods")
print(" start [seconds] - start recording head tracking data, optionally for specified duration")
print(" stop - stop recording")
print(" load <file> - load and parse a log file")
print(" plot - plot all sensor data fields")
print(" live [seconds] - start live plotting (without recording), optionally stop recording after seconds")
print(" liver [seconds] - start live plotting with recording, optionally stop recording after seconds")
print(" gestures - start gesture detection")
print(" quit - exit the program")
while True:
try:
cmd_input: str = session.prompt("> ")
cmd_parts: List[str] = cmd_input.strip().split()
if not cmd_parts:
continue
cmd = cmd_parts[0].lower()
match cmd:
case "connect":
self.connect()
case "start":
duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None
self.start_tracking(duration)
case "stop":
self.stop_tracking()
case "load":
if len(cmd_parts) > 1:
self.load_log_file(cmd_parts[1])
case "plot":
self.plot_fields()
case "live":
duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None
logger.info("Starting live plotting mode (without recording)%s.",
f" for {duration} seconds" if duration else "")
self.start_live_plotting(record_data=False, duration=duration)
case "liver":
duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None
logger.info("Starting live plotting mode WITH recording%s.",
f" for {duration} seconds" if duration else "")
self.start_live_plotting(record_data=True, duration=duration)
case "gestures":
from gestures import GestureDetector
if self.conn is not None:
detector: GestureDetector = GestureDetector(conn=self.conn)
else:
detector: GestureDetector = GestureDetector()
detector.start_detection()
case "quit":
logger.info("Exiting.")
if self.conn != None:
self.conn.disconnect()
break
case "help":
logger.info("\nAirPods Head Tracking Analyzer")
logger.info("------------------------------")
logger.info("Commands:")
logger.info(" connect - connect to your AirPods")
logger.info(" start [seconds] - start recording head tracking data, optionally for specified duration")
logger.info(" stop - stop recording")
logger.info(" load <file> - load and parse a log file")
logger.info(" plot - plot all sensor data fields")
logger.info(" live [seconds] - start live plotting (without recording), optionally stop recording after seconds")
logger.info(" liver [seconds] - start live plotting with recording, optionally stop recording after seconds")
logger.info(" gestures - start gesture detection")
logger.info(" quit - exit the program")
case _:
logger.info("Unknown command. Type 'help' to see available commands.")
except KeyboardInterrupt:
logger.info("Use 'quit' to exit.")
except EOFError:
logger.info("Exiting.")
if self.conn != None:
self.conn.disconnect()
break
if __name__ == "__main__":
import sys
tracker: AirPodsTracker = AirPodsTracker()
tracker.interactive_mode()

View File

@@ -1,6 +0,0 @@
drawille
numpy
pybluez
matplotlib
asciichartpy
rich

View File

@@ -1,7 +1,7 @@
id=librepods
name=LibrePods
version=v0.2.0
versionCode=34
version=v0.2.6
versionCode=46
author=@kavishdevar
description=Installs LibrePods as a system app for granting BLUETOOTH_PRIVILEGED and MODIFY_PHONE_STATE permission for better integraion with android.
updateJson=https://raw.githubusercontent.com/kavishdevar/librepods/main/update_nonpatch.json
updateJson=https://raw.githubusercontent.com/kavishdevar/librepods/main/extras/update_nonpatch.json

View File

@@ -1,6 +1,6 @@
{
"version": "v0.2.3",
"versionCode": 36,
"zipUrl": "https://github.com/kavishdevar/librepods/releases/download/v0.2.3/LibrePods-FOSS-v0.2.3-release.zip",
"changelog": "https://raw.githubusercontent.com/kavishdevar/librepods/main/CHANGELOG.md"
}
"version": "v0.2.6",
"versionCode": 46,
"zipUrl": "https://github.com/kavishdevar/librepods/releases/download/v0.2.6/LibrePods-FOSS-v0.2.6-release.zip",
"changelog": "https://raw.githubusercontent.com/kavishdevar/librepods/main/extras/CHANGELOG.md"
}