diff --git a/Proximity Pairing Message.md b/Proximity Pairing Message.md deleted file mode 100644 index 4f34b61..0000000 --- a/Proximity Pairing Message.md +++ /dev/null @@ -1,164 +0,0 @@ -# Bluetooth Low Energy (BLE) - Apple Proximity Pairing Message - -This document describes how the AirPods BLE "Proximity Pairing Message" is parsed and interpreted in the application. This message is broadcast by Apple devices (such as AirPods) and contains key information about the device's state, battery, and other properties. - -## Overview - -When scanning for BLE devices, the application looks for manufacturer data with Apple's ID (`0x004C`). If the data starts with `0x07`, it is identified as a Proximity Pairing Message. The message contains various fields, each representing a specific property of the AirPods. - -## Proximity Pairing Message Structure - -| Byte Index | Field Name | Description | Example Value(s) | -|------------|-------------------------|---------------------------------------------------------|--------------------------| -| 0 | Prefix | Message type (should be `0x07` for proximity pairing) | `0x07` | -| 1 | Length | Length of the message | `0x12` | -| 2 | Pairing Mode | `0x01` = Paired, `0x00` = Pairing mode | `0x01`, `0x00` | -| 3-4 | Device Model | Big-endian: [3]=high, [4]=low | `0x0E20` (AirPods Pro) | -| 5 | Status | Bitfield, see below | `0x62` | -| 6 | Pods Battery Byte | Nibbles for left/right pod battery | `0xA7` | -| 7 | Flags & Case Battery | Upper nibble: case battery, lower: flags | `0xB3` | -| 8 | Lid Indicator | Bits for lid state and open counter | `0x09` | -| 9 | Device Color | Color code | `0x02` | -| 10 | Connection State | Enum, see below | `0x04` | -| 11-26 | Encrypted Payload | 16 bytes, not parsed | | - -## Field Details - -### Device Model - -| Value (hex) | Model Name | -|-------------|--------------------------| -| 0x0220 | AirPods 1st Gen | -| 0x0F20 | AirPods 2nd Gen | -| 0x1320 | AirPods 3rd Gen | -| 0x1920 | AirPods 4th Gen | -| 0x1B20 | AirPods 4th Gen (ANC) | -| 0x0A20 | AirPods Max | -| 0x1F20 | AirPods Max (USB-C) | -| 0x0E20 | AirPods Pro | -| 0x1420 | AirPods Pro 2nd Gen | -| 0x2420 | AirPods Pro 2nd Gen (USB-C) | - -### Status Byte (Bitfield) - -| Bit | Meaning | Value if Set | -|-----|--------------------------------|-------------| -| 0 | Right Pod In Ear (XOR logic) | true | -| 1 | Right Pod In Ear (XOR logic) | true | -| 2 | Both Pods In Case | true | -| 3 | Left Pod In Ear (XOR logic) | true | -| 4 | One Pod In Case | true | -| 5 | Primary Pod (1=Left, 0=Right) | true/false | -| 6 | This Pod In Case | true | - -### Ear Detection Logic - -The in-ear detection uses XOR logic based on: -- Whether the right pod is primary (`areValuesFlipped`) -- Whether this pod is in the case (`isThisPodInTheCase`) - -```cpp -bool xorFactor = areValuesFlipped ^ deviceInfo.isThisPodInTheCase; -deviceInfo.isLeftPodInEar = xorFactor ? (status & 0x08) != 0 : (status & 0x02) != 0; // Bit 3 or 1 -deviceInfo.isRightPodInEar = xorFactor ? (status & 0x02) != 0 : (status & 0x08) != 0; // Bit 1 or 3 -``` - -### Primary Pod - -Determined by bit 5 of the status byte: -- `1` = Left pod is primary -- `0` = Right pod is primary - -This affects: -1. Battery level interpretation (which nibble corresponds to which pod) -2. Microphone assignment -3. Ear detection logic - -### Microphone Status - -The active microphone is determined by: -```cpp -deviceInfo.isLeftPodMicrophone = primaryLeft ^ deviceInfo.isThisPodInTheCase; -deviceInfo.isRightPodMicrophone = !primaryLeft ^ deviceInfo.isThisPodInTheCase; -``` - -### Pods Battery Byte - -- Upper nibble: one pod battery (depends on primary) -- Lower nibble: other pod battery - -| Value | Meaning | -|-------|----------------| -| 0x0-0x9 | 0-90% (x10) | -| 0xA-0xE | 100% | -| 0xF | Not available| - -### Flags & Case Battery Byte - -- Upper nibble: case battery (same encoding as pods) -- Lower nibble: flags - -#### Flags (Lower Nibble) - -| Bit | Meaning | -|-----|--------------------------| -| 0 | Right Pod Charging (XOR) | -| 1 | Left Pod Charging (XOR) | -| 2 | Case Charging | - -### Lid Indicator - -| Bits | Meaning | -|------|------------------------| -| 0-2 | Lid Open Counter | -| 3 | Lid State (0=Open, 1=Closed) | - -### Device Color - -| Value | Color | -|-------|-------------| -| 0x00 | White | -| 0x01 | Black | -| 0x02 | Red | -| 0x03 | Blue | -| 0x04 | Pink | -| 0x05 | Gray | -| 0x06 | Silver | -| 0x07 | Gold | -| 0x08 | Rose Gold | -| 0x09 | Space Gray | -| 0x0A | Dark Blue | -| 0x0B | Light Blue | -| 0x0C | Yellow | -| 0x0D+ | Unknown | - -### Connection State - -| Value | State | -|-------|--------------| -| 0x00 | Disconnected | -| 0x04 | Idle | -| 0x05 | Music | -| 0x06 | Call | -| 0x07 | Ringing | -| 0x09 | Hanging Up | -| 0xFF | Unknown | - -## Example Message - -| Byte Index | Example Value | Description | -|------------|--------------|----------------------------| -| 0 | 0x07 | Proximity Pairing Message | -| 1 | 0x12 | Length | -| 2 | 0x01 | Paired | -| 3-4 | 0x0E 0x20 | AirPods Pro | -| 5 | 0x62 | Status | -| 6 | 0xA7 | Pods Battery | -| 7 | 0xB3 | Flags & Case Battery | -| 8 | 0x09 | Lid Indicator | -| 9 | 0x02 | Device Color | -| 10 | 0x04 | Connection State (Idle) | - ---- - -For further details, see [`BleManager`](linux/ble/blemanager.cpp) and [`BleScanner`](linux/ble/blescanner.cpp). \ No newline at end of file diff --git a/head-tracking/README.md b/head-tracking/README.md deleted file mode 100644 index 7ded83c..0000000 --- a/head-tracking/README.md +++ /dev/null @@ -1,60 +0,0 @@ -# AirPods Head Tracking Visualizer - -This implements head tracking with AirPods by gathering sensor data over l2cap, processing orientation and acceleration values, and detecting head gestures. The codebase is split into the following components: - -# How to use - -Connect your airpods and change the mac address in `plot.py` to your airpods mac address. Then run the following command to start the program. - -```bash -python plot.py -``` - -Alternatively, you can directly run the `gestures.py` to just detect gestures. - -```bash -python gestures.py -``` - -- **Connection and Data Collection** - The project uses a custom ConnectionManager (imported in multiple files) to connect via Bluetooth to AirPods. Once connected, sensor packets are received in raw hex format. An AirPodsTracker class (in `plot.py`) handles the start/stop of tracking, logging of raw data, and parsing of packets into useful fields. - -- **Orientation Calculation and Visualization** - The `HeadOrientation` class (in `head_orientation.py`) is responsible for: - - **Calibration:** - A set number of samples (default 10) are collected to calculate the neutral (baseline) values for the sensors. For example: - `o1_neutral = np.mean(samples[:, 0])` - - **Calculating Angles:** - For each new packet, the raw orientation values are normalized by subtracting the neutral baseline. Then: - - **Pitch** is computed as: - ``` - pitch = (o2_norm + o3_norm) / 2 / 32000 * 180 - ``` - This averages the deviations from neutral, scales the result to degrees (assuming a sensor range around 32000), thus giving a smooth estimation of up/down tilt. - - **Yaw** is computed as: - ``` - yaw = (o2_norm - o3_norm) / 2 / 32000 * 180 - ``` - Here, the difference between the two sensor axes is used to detect left/right rotation. - - **ASCII Visualization:** - Based on the calculated pitch and yaw, an ASCII art "face" is generated. The algorithm rotates points on a circle using simple trigonometric formulas (with scaling factors based on sensor depth) to build an approximate visual representation of head orientation. - -- **Live Plotting and Interactive Commands** - The code offers both terminal-based plotting and graphical plotting via matplotlib. The AirPodsTracker manages live plotting by maintaining a buffer of recent packets. When in terminal mode, the code uses libraries like `asciichartpy` and `drawille` to render charts; in graphical mode, it creates live-updating plots. - -- **Gesture Detection** - The `GestureDetector` class (in `gestures.py`) processes the head tracking data to detect nodding ("Yes") or head shaking ("No"): - - **Smoothing:** - Raw horizontal and vertical sensor data undergo moving-average smoothing using small fixed-size buffers. This reduces noise and provides a steadier signal. - - **Peak and Trough Detection:** - The code monitors small sections (e.g. the last 4 values) to compute variance and dynamically determine thresholds for direction changes. When a significant reversal (e.g. from increasing to decreasing) is detected that surpasses the dynamic threshold value (derived partly from a fixed threshold and variance), a peak or trough is recorded. - - **Rhythm Consistency:** - Time intervals between detected peaks are captured. The consistency of these intervals (by comparing them to their mean and computing relative variance) is used to evaluate whether the movement is rhythmic—a trait of intentional gestures. - - **Confidence Calculation:** - Multiple factors are considered: - - **Amplitude Factor:** Compares the average detected peak amplitude with a constant (like 600) to provide a normalized measure. - - **Rhythm Factor:** Derived from the consistency of the time intervals of the peaks. - - **Alternation Factor:** Verifies that the signal alternates (for instance, switching between positive and negative values). - - **Isolation Factor:** Checks that movement on the target axis (vertical for nodding, horizontal for shaking) dominates over the non-target axis. - - A weighted sum of these factors forms a confidence score which, if above a predefined threshold (e.g. 0.7), confirms a detected gesture. diff --git a/head-tracking/colors.py b/head-tracking/colors.py deleted file mode 100644 index cc1ba21..0000000 --- a/head-tracking/colors.py +++ /dev/null @@ -1,29 +0,0 @@ -import logging -from logging import Formatter, LogRecord -from typing import Dict - -class Colors: - RESET: str = "\033[0m" - BOLD: str = "\033[1m" - RED: str = "\033[91m" - GREEN: str = "\033[92m" - YELLOW: str = "\033[93m" - BLUE: str = "\033[94m" - MAGENTA: str = "\033[95m" - CYAN: str = "\033[96m" - WHITE: str = "\033[97m" - BG_BLACK: str = "\033[40m" - -class ColorFormatter(Formatter): - FORMATS: Dict[int, str] = { - logging.DEBUG: f"{Colors.BLUE}[%(levelname)s] %(message)s{Colors.RESET}", - logging.INFO: f"{Colors.GREEN}%(message)s{Colors.RESET}", - logging.WARNING: f"{Colors.YELLOW}%(message)s{Colors.RESET}", - logging.ERROR: f"{Colors.RED}[%(levelname)s] %(message)s{Colors.RESET}", - logging.CRITICAL: f"{Colors.RED}{Colors.BOLD}[%(levelname)s] %(message)s{Colors.RESET}" - } - - def format(self, record: LogRecord) -> str: - log_fmt: str = self.FORMATS.get(record.levelno) - formatter: Formatter = Formatter(log_fmt, datefmt="%H:%M:%S") - return formatter.format(record) diff --git a/head-tracking/connection_manager.py b/head-tracking/connection_manager.py deleted file mode 100644 index ae92dc3..0000000 --- a/head-tracking/connection_manager.py +++ /dev/null @@ -1,64 +0,0 @@ -import bluetooth -import logging -from bluetooth import BluetoothSocket -from logging import Logger - -class ConnectionManager: - INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00" - START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00" - STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00" - - def __init__(self, bt_addr: str = "28:2D:7F:C2:05:5B", psm: int = 0x1001, logger: Logger = None) -> None: - self.bt_addr: str = bt_addr - self.psm: int = psm - self.logger: Logger = logger if logger else logging.getLogger(__name__) - self.sock: BluetoothSocket = None - self.connected: bool = False - self.started: bool = False - - def connect(self) -> bool: - self.logger.info(f"Connecting to {self.bt_addr} on PSM {self.psm:#04x}...") - try: - self.sock = BluetoothSocket(bluetooth.L2CAP) - self.sock.connect((self.bt_addr, self.psm)) - self.connected = True - self.logger.info("Connected to AirPods.") - self.sock.send(bytes.fromhex(self.INIT_CMD)) - self.logger.info("Initialization complete.") - except Exception as e: - self.logger.error(f"Connection failed: {e}") - self.connected = False - return self.connected - - def send_start(self) -> bool: - if not self.connected: - self.logger.error("Not connected. Cannot send START command.") - return False - if not self.started: - self.sock.send(bytes.fromhex(self.START_CMD)) - self.started = True - self.logger.info("START command sent.") - else: - self.logger.info("START command has already been sent.") - return True - - def send_stop(self) -> None: - if self.connected and self.started: - try: - self.sock.send(bytes.fromhex(self.STOP_CMD)) - self.logger.info("STOP command sent.") - self.started = False - except Exception as e: - self.logger.error(f"Error sending STOP command: {e}") - else: - self.logger.info("Cannot send STOP; not started or not connected.") - - def disconnect(self) -> None: - if self.sock: - try: - self.sock.close() - self.logger.info("Disconnected from AirPods.") - except Exception as e: - self.logger.error(f"Error during disconnect: {e}") - self.connected = False - self.started = False diff --git a/head-tracking/gestures.py b/head-tracking/gestures.py deleted file mode 100644 index a598409..0000000 --- a/head-tracking/gestures.py +++ /dev/null @@ -1,358 +0,0 @@ -import logging -import statistics -import time -from bluetooth import BluetoothSocket -from collections import deque -from colors import * -from connection_manager import ConnectionManager -from logging import Logger, StreamHandler -from threading import Lock, Thread -from typing import Any, Deque, List, Optional, Tuple - -handler: StreamHandler = StreamHandler() -handler.setFormatter(ColorFormatter()) -log: Logger = logging.getLogger(__name__) -log.setLevel(logging.INFO) -log.addHandler(handler) -log.propagate = False - -class GestureDetector: - INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00" - START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00" - STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00" - - def __init__(self, conn: ConnectionManager = None) -> None: - self.sock: BluetoothSocket = None - self.bt_addr: str = "28:2D:7F:C2:05:5B" - self.psm: int = 0x1001 - self.running: bool = False - self.data_lock: Lock = Lock() - - self.horiz_buffer: Deque[int] = deque(maxlen=100) - self.vert_buffer: Deque[int] = deque(maxlen=100) - - self.horiz_avg_buffer: Deque[float] = deque(maxlen=5) - self.vert_avg_buffer: Deque[float] = deque(maxlen=5) - - self.horiz_peaks: List[int] = [] - self.horiz_troughs: List[int] = [] - self.vert_peaks: List[int] = [] - self.vert_troughs: List[int] = [] - - self.last_peak_time: float = 0 - self.peak_intervals: Deque[float] = deque(maxlen=5) - - self.peak_threshold: int = 400 - self.direction_change_threshold: int = 175 - self.rhythm_consistency_threshold: float = 0.5 - - self.horiz_increasing: Optional[bool] = None - self.vert_increasing: Optional[bool] = None - - self.required_extremes = 3 - self.detection_timeout: int = 15 - - self.min_confidence_threshold: float = 0.7 - - self.conn: ConnectionManager = conn - - def connect(self) -> bool: - try: - log.info(f"Connecting to AirPods at {self.bt_addr}...") - if self.conn is None: - self.conn = ConnectionManager(self.bt_addr, self.psm, logger=log) - if not self.conn.connect(): - return False - else: - if not self.conn.connected: - if not self.conn.connect(): - return False - self.sock = self.conn.sock - log.info(f"{Colors.GREEN}✓ Connected to AirPods via ConnectionManager{Colors.RESET}") - return True - except Exception as e: - log.error(f"{Colors.RED}Connection failed: {e}{Colors.RESET}") - return False - - def process_data(self) -> None: - """Process incoming head tracking data.""" - self.conn.send_start() - log.info(f"{Colors.GREEN}✓ Head tracking activated{Colors.RESET}") - - self.running = True - start_time: float = time.time() - - log.info(f"{Colors.GREEN}Ready! Make a YES or NO gesture{Colors.RESET}") - log.info(f"{Colors.YELLOW}Tip: Use natural, moderate speed head movements{Colors.RESET}") - - while self.running: - if time.time() - start_time > self.detection_timeout: - log.warning(f"{Colors.YELLOW}⚠️ Detection timeout reached. No gesture detected.{Colors.RESET}") - self.running = False - break - - try: - if not self.sock: - log.error("Socket not available.") - break - data: bytes = self.sock.recv(1024) - formatted: str = self.format_hex(data) - if self.is_valid_tracking_packet(formatted): - raw_bytes: bytes = bytes.fromhex(formatted.replace(" ", "")) - horizontal, vertical = self.extract_orientation_values(raw_bytes) - - if horizontal is not None and vertical is not None: - smooth_h, smooth_v = self.apply_smoothing(horizontal, vertical) - - with self.data_lock: - self.horiz_buffer.append(smooth_h) - self.vert_buffer.append(smooth_v) - - self.detect_peaks_and_troughs() - gesture: Optional[str] = self.detect_gestures() - - if gesture: - self.running = False - break - - except Exception as e: - if self.running: - log.error(f"Data processing error: {e}") - break - - def disconnect(self) -> None: - """Disconnect from socket.""" - self.conn.disconnect() - - def format_hex(self, data: bytes) -> str: - """Format binary data to readable hex string.""" - hex_str: str = data.hex() - return ' '.join(hex_str[i:i+2] for i in range(0, len(hex_str), 2)) - - def is_valid_tracking_packet(self, hex_string: str) -> bool: - """Verify packet is a valid head tracking packet.""" - standard_header: str = "04 00 04 00 17 00 00 00 10 00 45 00" - alternate_header: str = "04 00 04 00 17 00 00 00 10 00 44 00" - if not hex_string.startswith(standard_header) and not hex_string.startswith(alternate_header): - return False - - if len(hex_string.split()) < 80: - return False - - return True - - def extract_orientation_values(self, raw_bytes: bytes) -> Tuple[Optional[int], Optional[int]]: - """Extract head orientation data from packet.""" - try: - horizontal: int = int.from_bytes(raw_bytes[51:53], byteorder='little', signed=True) - vertical: int = int.from_bytes(raw_bytes[53:55], byteorder='little', signed=True) - - return horizontal, vertical - except Exception as e: - log.debug(f"Failed to extract orientation: {e}") - return None, None - - def apply_smoothing(self, horizontal: int, vertical: int) -> Tuple[float, float]: - """Apply moving average smoothing (Apple-like filtering).""" - self.horiz_avg_buffer.append(horizontal) - self.vert_avg_buffer.append(vertical) - - smooth_horiz: float = sum(self.horiz_avg_buffer) / len(self.horiz_avg_buffer) - smooth_vert: float = sum(self.vert_avg_buffer) / len(self.vert_avg_buffer) - - return smooth_horiz, smooth_vert - - def detect_peaks_and_troughs(self) -> None: - """Detect motion direction changes with Apple-like refinements.""" - if len(self.horiz_buffer) < 4 or len(self.vert_buffer) < 4: - return - - h_values: List[int] = list(self.horiz_buffer)[-4:] - v_values: List[int] = list(self.vert_buffer)[-4:] - - h_variance: float = statistics.variance(h_values) if len(h_values) > 1 else 0 - v_variance: float = statistics.variance(v_values) if len(v_values) > 1 else 0 - - current: int = self.horiz_buffer[-1] - prev: int = self.horiz_buffer[-2] - - if self.horiz_increasing is None: - self.horiz_increasing = current > prev - - dynamic_h_threshold: float = max(100, min(self.direction_change_threshold, h_variance / 3)) - - if self.horiz_increasing and current < prev - dynamic_h_threshold: - if abs(prev) > self.peak_threshold: - self.horiz_peaks.append((len(self.horiz_buffer)-1, prev, time.time())) - direction: str = "➡️ " if prev > 0 else "⬅️ " - log.info(f"{Colors.CYAN}{direction} Horizontal max: {prev} (threshold: {dynamic_h_threshold:.1f}){Colors.RESET}") - - now: float = time.time() - if self.last_peak_time > 0: - interval: float = now - self.last_peak_time - self.peak_intervals.append(interval) - self.last_peak_time = now - - self.horiz_increasing = False - - elif not self.horiz_increasing and current > prev + dynamic_h_threshold: - if abs(prev) > self.peak_threshold: - self.horiz_troughs.append((len(self.horiz_buffer)-1, prev, time.time())) - direction: str = "➡️ " if prev > 0 else "⬅️ " - log.info(f"{Colors.CYAN}{direction} Horizontal max: {prev} (threshold: {dynamic_h_threshold:.1f}){Colors.RESET}") - - now: float = time.time() - if self.last_peak_time > 0: - interval: float = now - self.last_peak_time - self.peak_intervals.append(interval) - self.last_peak_time = now - - self.horiz_increasing = True - - current: int = self.vert_buffer[-1] - prev: int = self.vert_buffer[-2] - - if self.vert_increasing is None: - self.vert_increasing = current > prev - - dynamic_v_threshold: float = max(100, min(self.direction_change_threshold, v_variance / 3)) - - if self.vert_increasing and current < prev - dynamic_v_threshold: - if abs(prev) > self.peak_threshold: - self.vert_peaks.append((len(self.vert_buffer)-1, prev, time.time())) - direction: str = "⬆️ " if prev > 0 else "⬇️ " - log.info(f"{Colors.MAGENTA}{direction} Vertical max: {prev} (threshold: {dynamic_v_threshold:.1f}){Colors.RESET}") - - now: float = time.time() - if self.last_peak_time > 0: - interval: float = now - self.last_peak_time - self.peak_intervals.append(interval) - self.last_peak_time = now - - self.vert_increasing = False - - elif not self.vert_increasing and current > prev + dynamic_v_threshold: - if abs(prev) > self.peak_threshold: - self.vert_troughs.append((len(self.vert_buffer)-1, prev, time.time())) - direction: str = "⬆️ " if prev > 0 else "⬇️ " - log.info(f"{Colors.MAGENTA}{direction} Vertical max: {prev} (threshold: {dynamic_v_threshold:.1f}){Colors.RESET}") - - now: float = time.time() - if self.last_peak_time > 0: - interval: float = now - self.last_peak_time - self.peak_intervals.append(interval) - self.last_peak_time = now - - self.vert_increasing = True - - def calculate_rhythm_consistency(self) -> float: - """Calculate how consistent the timing between peaks is (Apple-like).""" - if len(self.peak_intervals) < 2: - return 0 - - mean_interval: float = statistics.mean(self.peak_intervals) - if mean_interval == 0: - return 0 - - variances: List[float] = [(i/mean_interval - 1.0) ** 2 for i in self.peak_intervals] - consistency: float = 1.0 - min(1.0, statistics.mean(variances) / self.rhythm_consistency_threshold) - return max(0, consistency) - - def calculate_confidence_score(self, extremes: List[Tuple[int, int, float]], is_vertical: bool = True) -> float: - """Calculate confidence score for gesture detection (Apple-like).""" - if len(extremes) < self.required_extremes: - return 0.0 - - sorted_extremes: List[Tuple[int, int, float]] = sorted(extremes, key=lambda x: x[0]) - - recent: List[Tuple[int, int, float]] = sorted_extremes[-self.required_extremes:] - - avg_amplitude: float = sum(abs(val) for _, val, _ in recent) / len(recent) - amplitude_factor: float = min(1.0, avg_amplitude / 600) - - rhythm_factor: float = self.calculate_rhythm_consistency() - - signs: List[int] = [1 if val > 0 else -1 for _, val, _ in recent] - alternating: bool = all(signs[i] != signs[i-1] for i in range(1, len(signs))) - alternation_factor: float = 1.0 if alternating else 0.5 - - if is_vertical: - vert_amp: float = sum(abs(val) for _, val, _ in recent) / len(recent) - horiz_vals: List[int] = list(self.horiz_buffer)[-len(recent)*2:] - horiz_amp: float = sum(abs(val) for val in horiz_vals) / len(horiz_vals) if horiz_vals else 0 - isolation_factor: float = min(1.0, vert_amp / (horiz_amp + 0.1) * 1.2) - else: - horiz_amp: float = sum(abs(val) for _, val, _ in recent) - vert_vals: List[int] = list(self.vert_buffer)[-len(recent)*2:] - vert_amp: float = sum(abs(val) for val in vert_vals) / len(vert_vals) if vert_vals else 0 - isolation_factor: float = min(1.0, horiz_amp / (vert_amp + 0.1) * 1.2) - - confidence: float = ( - amplitude_factor * 0.4 + - rhythm_factor * 0.2 + - alternation_factor * 0.2 + - isolation_factor * 0.2 - ) - - return confidence - - def detect_gestures(self) -> Optional[str]: - """Recognize head gesture patterns with Apple-like intelligence.""" - if len(self.vert_peaks) + len(self.vert_troughs) >= self.required_extremes: - all_extremes: List[Tuple[int, int, float]] = sorted(self.vert_peaks + self.vert_troughs, key=lambda x: x[0]) - - confidence: float = self.calculate_confidence_score(all_extremes, is_vertical=True) - - log.info(f"Vertical motion confidence: {confidence:.2f} (need {self.min_confidence_threshold:.2f})") - - if confidence >= self.min_confidence_threshold: - log.info(f"{Colors.GREEN}🎯 \"Yes\" Gesture Detected (confidence: {confidence:.2f}){Colors.RESET}") - return "YES" - - if len(self.horiz_peaks) + len(self.horiz_troughs) >= self.required_extremes: - all_extremes: List[Tuple[int, int, float]] = sorted(self.horiz_peaks + self.horiz_troughs, key=lambda x: x[0]) - - confidence: float = self.calculate_confidence_score(all_extremes, is_vertical=False) - - log.info(f"Horizontal motion confidence: {confidence:.2f} (need {self.min_confidence_threshold:.2f})") - - if confidence >= self.min_confidence_threshold: - log.info(f"{Colors.GREEN}🎯 \"No\" gesture detected (confidence: {confidence:.2f}){Colors.RESET}") - return "NO" - - return None - - def start_detection(self) -> None: - """Begin gesture detection process.""" - log.info(f"{Colors.BOLD}{Colors.WHITE}Starting gesture detection...{Colors.RESET}") - - if not self.connect(): - log.error(f"{Colors.RED}Failed to connect to AirPods.{Colors.RESET}") - return - - data_thread: Thread = Thread(target=self.process_data) - data_thread.daemon = True - data_thread.start() - - try: - data_thread.join(timeout=self.detection_timeout + 2) - if data_thread.is_alive(): - log.warning(f"{Colors.YELLOW}⚠️ Timeout reached. Stopping detection.{Colors.RESET}") - self.running = False - except KeyboardInterrupt: - log.info(f"{Colors.YELLOW}Detection canceled by user.{Colors.RESET}") - self.running = False - if __name__ == "__main__": - self.disconnect() - log.info(f"{Colors.GREEN}Gesture detection complete.{Colors.RESET}") - -if __name__ == "__main__": - print(f"{Colors.BG_BLACK}{Colors.CYAN}╔════════════════════════════════════════╗{Colors.RESET}") - print(f"{Colors.BG_BLACK}{Colors.CYAN}║ AirPods Head Gesture Detector ║{Colors.RESET}") - print(f"{Colors.BG_BLACK}{Colors.CYAN}╚════════════════════════════════════════╝{Colors.RESET}") - print(f"\n{Colors.WHITE}This program detects head gestures using AirPods:{Colors.RESET}") - print(f"{Colors.GREEN}• YES: {Colors.WHITE}nodding head up and down{Colors.RESET}") - print(f"{Colors.RED}• NO: {Colors.WHITE}shaking head left and right{Colors.RESET}\n") - - detector: GestureDetector = GestureDetector() - detector.start_detection() \ No newline at end of file diff --git a/head-tracking/head_orientation.py b/head-tracking/head_orientation.py deleted file mode 100644 index 1f90990..0000000 --- a/head-tracking/head_orientation.py +++ /dev/null @@ -1,123 +0,0 @@ -import math -import numpy as np -import logging -import os -from colors import * -from drawille import Canvas -from logging import Logger, StreamHandler -from matplotlib.animation import FuncAnimation -from matplotlib.pyplot import Axes, Figure -from numpy.typing import NDArray -from os import terminal_size as TerminalSize -from typing import Any, Dict, List, Optional, Tuple - -handler: StreamHandler = StreamHandler() -handler.setFormatter(ColorFormatter()) -log: Logger = logging.getLogger(__name__) -log.setLevel(logging.INFO) -log.addHandler(handler) -log.propagate = False - -class HeadOrientation: - def __init__(self, use_terminal: bool = False) -> None: - self.orientation_offset: int = 5500 - self.o1_neutral: int = 19000 - self.o2_neutral: int = 0 - self.o3_neutral: int = 0 - self.calibration_samples: List[List[int]] = [] - self.calibration_complete: bool = False - self.calibration_sample_count: int = 10 - self.fig: Optional[Figure] = None - self.ax: Optional[Axes] = None - self.arrow: Any = None - self.animation: Optional[FuncAnimation] = None - self.use_terminal: bool = use_terminal - - def reset_calibration(self) -> None: - self.calibration_samples = [] - self.calibration_complete = False - - def add_calibration_sample(self, orientation_values: List[int]) -> bool: - if len(self.calibration_samples) < self.calibration_sample_count: - self.calibration_samples.append(orientation_values) - return False - if not self.calibration_complete: - self._calculate_calibration() - return True - return True - - def _calculate_calibration(self) -> None: - if len(self.calibration_samples) < 3: - log.warning("Not enough calibration samples") - return - samples: NDArray[[List[int]]] = np.array(self.calibration_samples) - self.o1_neutral: float = np.mean(samples[:, 0]) - avg_o2: float = np.mean(samples[:, 1]) - avg_o3: float = np.mean(samples[:, 2]) - self.o2_neutral: float = avg_o2 - self.o3_neutral: float = avg_o3 - log.info("Calibration complete: o1_neutral=%.2f, o2_neutral=%.2f, o3_neutral=%.2f", - self.o1_neutral, self.o2_neutral, self.o3_neutral) - self.calibration_complete = True - - def calculate_orientation(self, o1: float, o2: float, o3: float) -> Dict[str, float]: - if not self.calibration_complete: - return {'pitch': 0, 'yaw': 0} - o1_norm: float = o1 - self.o1_neutral - o2_norm: float = o2 - self.o2_neutral - o3_norm: float = o3 - self.o3_neutral - pitch: float = (o2_norm + o3_norm) / 2 / 32000 * 180 - yaw: float = (o2_norm - o3_norm) / 2 / 32000 * 180 - return {'pitch': pitch, 'yaw': yaw} - - def create_face_art(self, pitch: float, yaw: float) -> str: - if self.use_terminal: - try: - ts: TerminalSize = os.get_terminal_size() - width, height = ts.columns, ts.lines * 2 - except Exception: - width, height = 80, 40 - else: - width, height = 80, 40 - center_x, center_y = width // 2, height // 2 - radius: int = (min(width, height) // 2 - 2) // 2 - pitch_rad: float = math.radians(pitch) - yaw_rad: float = math.radians(yaw) - canvas: Canvas = Canvas() - - def rotate_point(x: float, y: float, z: float, pitch_r: float, yaw_r: float) -> Tuple[int, int]: - cos_y, sin_y = math.cos(yaw_r), math.sin(yaw_r) - cos_p, sin_p = math.cos(pitch_r), math.sin(pitch_r) - x1: float = x * cos_y - z * sin_y - z1: float = x * sin_y + z * cos_y - y1: float = y * cos_p - z1 * sin_p - z2: float = y * sin_p + z1 * cos_p - scale: float = 1 + (z2 / width) - return int(center_x + x1 * scale), int(center_y + y1 * scale) - for angle in range(0, 360, 2): - rad: float = math.radians(angle) - x: float = radius * math.cos(rad) - y: float = radius * math.sin(rad) - x1, y1 = rotate_point(x, y, 0, pitch_rad, yaw_rad) - canvas.set(x1, y1) - for eye in [(-radius//2, -radius//3, 2), (radius//2, -radius//3, 2)]: - ex, ey, ez = eye - x1, y1 = rotate_point(ex, ey, ez, pitch_rad, yaw_rad) - for dx in [-1, 0, 1]: - for dy in [-1, 0, 1]: - canvas.set(x1 + dx, y1 + dy) - nx, ny = rotate_point(0, 0, 1, pitch_rad, yaw_rad) - for dx in [-1, 0, 1]: - for dy in [-1, 0, 1]: - canvas.set(nx + dx, ny + dy) - smile_depth: int = radius // 8 - mouth_local_y: int = radius // 4 - mouth_length: int = radius - for x_offset in range(-mouth_length // 2, mouth_length // 2 + 1): - norm: float = abs(x_offset) / (mouth_length / 2) - y_offset: int = int((1 - norm ** 2) * smile_depth) - local_x: int = x_offset - local_y: int = mouth_local_y + y_offset - mx, my = rotate_point(local_x, local_y, 0, pitch_rad, yaw_rad) - canvas.set(mx, my) - return canvas.frame() diff --git a/head-tracking/plot.py b/head-tracking/plot.py deleted file mode 100644 index b1ad79b..0000000 --- a/head-tracking/plot.py +++ /dev/null @@ -1,843 +0,0 @@ -import asciichartpy as acp -import logging -import matplotlib.pyplot as plt -import numpy as np -import os -import struct -import time -from bluetooth import BluetoothSocket -from colors import * -from connection_manager import ConnectionManager -from datetime import datetime as DateTime -from drawille import Canvas -from head_orientation import HeadOrientation -from logging import Logger, StreamHandler -from matplotlib.animation import FuncAnimation -from matplotlib.legend import Legend -from matplotlib.pyplot import Axes, Figure -from numpy.typing import NDArray -from rich.live import Live -from rich.layout import Layout -from rich.panel import Panel -from rich.console import Console -from threading import Lock, Thread -from typing import Any, Dict, List, Optional, TextIO, Tuple, Union - -handler: StreamHandler = StreamHandler() -handler.setFormatter(ColorFormatter()) -logger: Logger = logging.getLogger("airpods-head-tracking") -logger.setLevel(logging.INFO) -logger.addHandler(handler) -logger.propagate = True - -INIT_CMD: str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00" -NOTIF_CMD: str = "04 00 04 00 0F 00 FF FF FE FF" -START_CMD: str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00" -STOP_CMD: str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00" - -KEY_FIELDS: Dict[str, Tuple[int, int]] = { - "orientation 1": (43, 2), - "orientation 2": (45, 2), - "orientation 3": (47, 2), - - "Horizontal Acceleration": (51, 2), - "Vertical Acceleration": (53, 2), - - "unkown 1": (61, 2), - "unkown 2 ": (49, 2), -} - -class AirPodsTracker: - def __init__(self) -> None: - self.sock: BluetoothSocket = None - self.recording: bool = False - self.log_file: Optional[TextIO] = None - self.listener_thread: Optional[Thread] = None - self.bt_addr: str = "28:2D:7F:C2:05:5B" - self.psm: int = 0x1001 - self.raw_packets: List[bytes] = [] - self.parsed_packets: List[bytes] = [] - self.live_data: List[bytes] = [] - self.live_plotting: bool = False - self.animation: FuncAnimation = None - self.fig: Optional[Figure] = None - self.axes: Optional[Axes] = None - self.lines: Dict[str, Any] = {} - self.selected_fields: List[str] = [] - self.data_lock: Lock = Lock() - self.orientation_offset: int = 5500 - self.use_terminal: bool = True # '--terminal' in sys.argv - self.orientation_visualizer: HeadOrientation = HeadOrientation(use_terminal=self.use_terminal) - - self.conn: Optional[ConnectionManager] = None - - def connect(self): - try: - logger.info("Trying to connect to %s on PSM 0x%04X...", self.bt_addr, self.psm) - self.conn = ConnectionManager(self.bt_addr, self.psm, logger=logger) - if not self.conn.connect(): - logger.error("Connection failed via ConnectionManager.") - return False - self.sock = self.conn.sock - self.sock.send(bytes.fromhex(NOTIF_CMD)) - logger.info("Sent initialization command.") - - self.listener_thread = Thread(target=self.listen, daemon=True) - self.listener_thread.start() - return True - except Exception as e: - logger.error("Connection error: %s", e) - return False - - def start_tracking(self, duration: Optional[float] = None) -> None: - if not self.recording: - self.conn.send_start() - filename: str = f"head_tracking_{DateTime.now().strftime('%Y%m%d_%H%M%S')}.log" - self.log_file = open(filename, "w") - self.recording = True - logger.info("Recording started. Saving data to %s", filename) - - if duration is not None and duration > 0: - def auto_stop() -> None: - time.sleep(duration) - if self.recording: - self.stop_tracking() - logger.info("Recording automatically stopped after %s seconds.", duration) - - timer_thread = Thread(target=auto_stop, daemon=True) - timer_thread.start() - logger.info("Will automatically stop recording after %s seconds.", duration) - else: - logger.info("Already recording.") - - def stop_tracking(self) -> None: - if self.recording: - self.conn.send_stop() - self.recording = False - if self.log_file is not None: - self.log_file.close() - self.log_file = None - logger.info("Recording stopped.") - else: - logger.info("Not currently recording.") - - def format_hex(self, data: bytes) -> str: - hex_str: str = data.hex() - return ' '.join(hex_str[i:i + 2] for i in range(0, len(hex_str), 2)) - - def parse_raw_packet(self, hex_string: str) -> bytes: - return bytes.fromhex(hex_string.replace(" ", "")) - - def interpret_bytes(self, raw_bytes: bytes, start: int, length: int, data_type: str = "signed_short") -> Optional[Union[int, float]]: - if start + length > len(raw_bytes): - return None - - match data_type: - case "signed_short": - return int.from_bytes(raw_bytes[start:start + 2], byteorder='little', signed=True) - case "unsigned_short": - return int.from_bytes(raw_bytes[start:start + 2], byteorder='little', signed=False) - case "signed_short_be": - return int.from_bytes(raw_bytes[start:start + 2], byteorder='big', signed=True) - case "float_le": - if start + 4 <= len(raw_bytes): - return struct.unpack('f', raw_bytes[start:start + 4])[0] - case _: - return None - - def normalize_orientation(self, value: Optional[Union[int, float]], field_name: str) -> Optional[Union[int, float]]: - if 'orientation' in field_name.lower(): - return value + self.orientation_offset - - return value - - def parse_packet_all_fields(self, raw_bytes: bytes) -> Dict[str, Union[int, float]]: - packet: Dict[str, Union[int, float]] = {} - - packet["seq_num"] = int.from_bytes(raw_bytes[12:14], byteorder='little') - - for field_name, (start, length) in KEY_FIELDS.items(): - if field_name == "float_val" and start + 4 <= len(raw_bytes): - packet[field_name] = self.interpret_bytes(raw_bytes, start, 4, "float_le") - else: - raw_value = self.interpret_bytes(raw_bytes, start, length, "signed_short") - if raw_value is not None: - packet[field_name] = self.normalize_orientation(raw_value, field_name) - - for i in range(30, min(90, len(raw_bytes) - 1), 2): - field_name: str = f"byte_{i:02d}" - raw_value: Optional[Union[int, float]] = self.interpret_bytes(raw_bytes, i, 2, "signed_short") - if raw_value is not None: - packet[field_name] = self.normalize_orientation(raw_value, field_name) - - return packet - - def apply_dark_theme(self, fig: Figure, axes: List[Axes]) -> None: - fig.patch.set_facecolor('#1e1e1e') - for ax in axes: - ax.set_facecolor('#2d2d2d') - - ax.title.set_color('white') - ax.xaxis.label.set_color('white') - ax.yaxis.label.set_color('white') - ax.tick_params(colors='white') - ax.tick_params(axis='x', colors='white') - ax.tick_params(axis='y', colors='white') - - ax.grid(True, color='#555555', alpha=0.3, linestyle='--') - - for spine in ax.spines.values(): - spine.set_color('#555555') - - legend: Optional[Legend] = ax.get_legend() - if (legend): - legend.get_frame().set_facecolor('#2d2d2d') - legend.get_frame().set_alpha(0.7) - for text in legend.get_texts(): - text.set_color('white') - - def listen(self) -> None: - while True: - try: - data: bytes = self.sock.recv(1024) - formatted: str = self.format_hex(data) - timestamp: str = DateTime.now().isoformat() - - is_valid: bool = self.is_valid_tracking_packet(formatted) - - if not self.live_plotting: - if is_valid: - logger.info("%s - Response: %s...", timestamp, formatted[:60]) - else: - logger.info("%s - Skipped non-tracking packet.", timestamp) - - if is_valid: - if self.recording and self.log_file is not None: - self.log_file.write(formatted + "\n") - self.log_file.flush() - - try: - raw_bytes: bytes = self.parse_raw_packet(formatted) - packet: Dict[str, Union[int, float]] = self.parse_packet_all_fields(raw_bytes) - - with self.data_lock: - self.live_data.append(packet) - if len(self.live_data) > 300: - self.live_data.pop(0) - - except Exception as e: - logger.error(f"Error parsing packet: {e}") - - except Exception as e: - logger.error("Error receiving data: %s", e) - break - - def load_log_file(self, filepath: str) -> bool: - self.raw_packets = [] - self.parsed_packets = [] - try: - with open(filepath, 'r') as f: - for line in f: - line = line.strip() - if line: - try: - raw_bytes: bytes = self.parse_raw_packet(line) - self.raw_packets.append(raw_bytes) - packet: Dict[str, Union[int, float]] = self.parse_packet_all_fields(raw_bytes) - - min_seq_num: int = min( - [parsed_packet["seq_num"] for parsed_packet in self.parsed_packets], default=0 - ) - - if packet["seq_num"] > min_seq_num: - self.parsed_packets.append(packet) - - except Exception as e: - logger.error(f"Error parsing line: {e}") - - logger.info(f"Loaded {len(self.parsed_packets)} packets from {filepath}") - return True - except Exception as e: - logger.error(f"Error loading log file: {e}") - return False - - def extract_field_values(self, field_name: str, data_source: str = 'loaded') -> List[Union[int, float]]: - if data_source == 'loaded': - data: List[Dict[str, Union[int, float]]] = self.parsed_packets - else: - with self.data_lock: - data: List[Dict[str, Union[int, float]]] = self.live_data.copy() - - values: List[Union[int, float]] = [packet.get(field_name, 0) for packet in data if field_name in packet] - - if data_source == 'live' and len(values) > 5: - try: - values: NDArray[Any] = np.array(values, dtype=float) - values = np.convolve(values, np.ones(5) / 5, mode='valid') - except Exception as e: - logger.warning(f"Smoothing error (non-critical): {e}") - - return values - - def is_valid_tracking_packet(self, hex_string: str) -> bool: - standard_header: str = "04 00 04 00 17 00 00 00 10 00" - - if not hex_string.startswith(standard_header): - if self.live_plotting: - logger.warning("Invalid packet header: %s", hex_string[:30]) - return False - - if len(hex_string.split()) < 80: - if self.live_plotting: - logger.warning("Invalid packet length: %s", hex_string[:30]) - return False - - return True - - - def plot_fields(self, field_names: Optional[List[str]] = None) -> None: - if not self.parsed_packets: - logger.error("No data to plot. Load a log file first.") - return - - if field_names is None: - field_names: List[str] = list(KEY_FIELDS.keys()) - - if not self.orientation_visualizer.calibration_complete: - if len(self.parsed_packets) < self.orientation_visualizer.calibration_sample_count: - logger.error("Not enough packets for calibration. Need at least 10 packets.") - return - for packet in self.parsed_packets[:self.orientation_visualizer.calibration_sample_count]: - self.orientation_visualizer.add_calibration_sample([ - packet.get('orientation 1', 0), - packet.get('orientation 2', 0), - packet.get('orientation 3', 0) - ]) - - if self.use_terminal: - self._plot_fields_terminal(field_names) - - else: - acceleration_fields: List[str] = [f for f in field_names if 'acceleration' in f.lower()] - orientation_fields: List[str] = [f for f in field_names if 'orientation' in f.lower()] - other_fields: List[str] = [f for f in field_names if f not in acceleration_fields + orientation_fields] - - fig, axes = plt.subplots(3, 1, figsize=(14, 12), sharex=True) - self.apply_dark_theme(fig, axes) - - acceleration_colors: List[str] = ['#FFFF00', '#00FFFF'] - orientation_colors: List[str] = ['#FF00FF', '#00FF00', '#FFA500'] - other_colors: List[str] = ['#52b788', '#f4a261', '#e76f51', '#2a9d8f'] - - if acceleration_fields: - for i, field in enumerate(acceleration_fields): - values = self.extract_field_values(field) - axes[0].plot(values, label=field, color=acceleration_colors[i % len(acceleration_colors)], linewidth=2) - axes[0].set_title("Acceleration Data", fontsize=14) - axes[0].legend() - - if orientation_fields: - for i, field in enumerate(orientation_fields): - values = self.extract_field_values(field) - axes[1].plot(values, label=field, color=orientation_colors[i % len(orientation_colors)], linewidth=2) - axes[1].set_title("Orientation Data", fontsize=14) - axes[1].legend() - - if other_fields: - for i, field in enumerate(other_fields): - values = self.extract_field_values(field) - axes[2].plot(values, label=field, color=other_colors[i % len(other_colors)], linewidth=2) - axes[2].set_title("Other Fields", fontsize=14) - axes[2].legend() - - plt.xlabel("Packet Index", fontsize=12) - plt.tight_layout() - plt.show() - - def _plot_fields_terminal(self, field_names: List[str]) -> None: - """Internal method for terminal-based plotting""" - terminal_width: int = os.get_terminal_size().columns - plot_width: int = min(terminal_width - 10, 120) - plot_height: int = 15 - - acceleration_fields: List[str] = [f for f in field_names if 'acceleration' in f.lower()] - orientation_fields: List[str] = [f for f in field_names if 'orientation' in f.lower()] - other_fields: List[str] = [f for f in field_names if f not in acceleration_fields + orientation_fields] - - def plot_group(fields: List[str], title: str) -> None: - if not fields: - return - - print(f"\n{title}") - print("=" * len(title)) - - for field in fields: - values: List[float] = self.extract_field_values(field) - if len(values) > plot_width: - values = values[-plot_width:] - - if title == "Acceleration Data": - chart: str = acp.plot(values, {'height': plot_height}) - print(chart) - else: - chart: str = acp.plot(values, {'height': plot_height}) - print(chart) - - print(f"Min: {min(values):.2f}, Max: {max(values):.2f}, " + f"Mean: {np.mean(values):.2f}") - print() - - plot_group(acceleration_fields, "Acceleration Data") - plot_group(orientation_fields, "Orientation Data") - plot_group(other_fields, "Other Fields") - - def create_braille_plot(self, values: List[float], width: int = 80, height: int = 20, y_label: bool = True, fixed_y_min: Optional[float] = None, fixed_y_max: Optional[float] = None) -> str: - canvas: Canvas = Canvas() - if fixed_y_min is None or fixed_y_max is None: - local_min, local_max = min(values), max(values) - else: - local_min, local_max = fixed_y_min, fixed_y_max - y_range: float = local_max - local_min or 1 - x_step: int = max(1, len(values) // width) - for i, v in enumerate(values[::x_step]): - y: int = int(((v - local_min) / y_range) * (height * 2 - 1)) - canvas.set(i, y) - frame: str = canvas.frame() - if y_label: - lines: List[str] = frame.split('\n') - labeled_lines: List[str] = [] - for idx, line in enumerate(lines): - if idx == 0: - labeled_lines.append(f"{local_max:6.0f} {line}") - elif idx == len(lines)-1: - labeled_lines.append(f"{local_min:6.0f} {line}") - else: - labeled_lines.append(" " + line) - frame = "\n".join(labeled_lines) - return frame - - def _start_live_plotting_terminal(self, record_data: bool = False, duration: Optional[float] = None) -> None: - import sys, select, tty, termios - old_settings = termios.tcgetattr(sys.stdin) - tty.setcbreak(sys.stdin.fileno()) - console: Console = Console() - term_width: int = console.width - plot_width: int = round(min(term_width / 2 - 15, 120)) - ori_height: int = 10 - - def make_compact_layout() -> Layout: - layout: Layout = Layout() - layout.split_column( - Layout(name="header", size=3), - Layout(name="main", ratio=1), - ) - layout["main"].split_row( - Layout(name="accelerations", ratio=1), - Layout(name="orientations", ratio=1) - ) - layout["accelerations"].split_column( - Layout(name="vertical", ratio=1), - Layout(name="horizontal", ratio=1) - ) - layout["orientations"].split_column( - Layout(name="face", ratio=1), - Layout(name="raw", ratio=1) - ) - return layout - - layout: Layout = make_compact_layout() - - try: - import time - with Live(layout, refresh_per_second=20, screen=True) as live: - while True: - if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: - ch = sys.stdin.read(1) - if ch == 'p': - self.paused = not self.paused - logger.info("Paused" if self.paused else "Resumed") - if self.paused: - time.sleep(0.1) - rec_str: str = " [red][REC][/red]" if record_data else "" - left: str = "AirPods Head Tracking - v1.0.0" - right: str = "Ctrl+C - Close | p - Pause" + rec_str - status: str = "[bold red]Paused[/bold red]" - header: List[str] = list(" " * term_width) - header[0:len(left)] = list(left) - header[term_width - len(right):] = list(right) - start: int = (term_width - len(status)) // 2 - header[start:start+len(status)] = list(status) - header_text: str = "".join(header) - layout["header"].update(Panel(header_text, style="bold white on black")) - continue - - with self.data_lock: - if len(self.live_data) < 1: - continue - latest: Dict[str, float] = self.live_data[-1] - data: List[Dict[str, float]] = self.live_data[-plot_width:] - - if not self.orientation_visualizer.calibration_complete: - sample: List[float] = [ - latest.get('orientation 1', 0), - latest.get('orientation 2', 0), - latest.get('orientation 3', 0) - ] - self.orientation_visualizer.add_calibration_sample(sample) - time.sleep(0.05) - rec_str: str = " [red][REC][/red]" if record_data else "" - - left: str = "AirPods Head Tracking - v1.0.0" - status: str = "[bold yellow]Calibrating...[/bold yellow]" - right: str = "Ctrl+C - Close | p - Pause" - remaining: int = max(term_width - len(left) - len(right), 0) - header_text: str = f"{left}{status.center(remaining)}{right}{rec_str}" - layout["header"].update(Panel(header_text, style="bold white on black")) - live.refresh() - continue - - o1: float = latest.get('orientation 1', 0) - o2: float = latest.get('orientation 2', 0) - o3: float = latest.get('orientation 3', 0) - orientation: Dict[str, float] = self.orientation_visualizer.calculate_orientation(o1, o2, o3) - pitch: float = orientation['pitch'] - yaw: float = orientation['yaw'] - - h_accel: List[float] = [p.get('Horizontal Acceleration', 0) for p in data] - v_accel: List[float] = [p.get('Vertical Acceleration', 0) for p in data] - if len(h_accel) > plot_width: - h_accel = h_accel[-plot_width:] - if len(v_accel) > plot_width: - v_accel = v_accel[-plot_width:] - global_min: float = min(min(v_accel), min(h_accel)) - global_max: float = max(max(v_accel), max(h_accel)) - config_acc: Dict[str, float] = {'height': 20, 'min': global_min, 'max': global_max} - vert_plot: str = acp.plot(v_accel, config_acc) - horiz_plot: str = acp.plot(h_accel, config_acc) - - rec_str: str = " [red][REC][/red]" if record_data else "" - left: str = "AirPods Head Tracking - v1.0.0" - right: str = "Ctrl+C - Close | p - Pause" + rec_str - status: str = "[bold green]Live[/bold green]" - header: List[str] = list(" " * term_width) - header[0:len(left)] = list(left) - header[term_width - len(right):] = list(right) - start: int = (term_width - len(status)) // 2 - header[start:start+len(status)] = list(status) - header_text: str = "".join(header) - layout["header"].update(Panel(header_text, style="bold white on black")) - - face_art: str = self.orientation_visualizer.create_face_art(pitch, yaw) - layout["accelerations"]["vertical"].update(Panel( - "[bold yellow]Vertical Acceleration[/]\n" + - vert_plot + "\n" + - f"Cur: {v_accel[-1]:6.1f} | Min: {min(v_accel):6.1f} | Max: {max(v_accel):6.1f}", - style="yellow" - )) - layout["accelerations"]["horizontal"].update(Panel( - "[bold cyan]Horizontal Acceleration[/]\n" + - horiz_plot + "\n" + - f"Cur: {h_accel[-1]:6.1f} | Min: {min(h_accel):6.1f} | Max: {max(h_accel):6.1f}", - style="cyan" - )) - layout["orientations"]["face"].update(Panel(face_art, title="[green]Orientation - Visualization[/]", style="green")) - - o2_values: List[float] = [p.get('orientation 2', 0) for p in data[-plot_width:]] - o3_values: List[float] = [p.get('orientation 3', 0) for p in data[-plot_width:]] - o2_values: List[float] = o2_values[:plot_width] - o3_values: List[float] = o3_values[:plot_width] - common_min: float = min(min(o2_values), min(o3_values)) - common_max: float = max(max(o2_values), max(o3_values)) - config_ori: Dict[str, float] = {'height': ori_height, 'min': common_min, 'max': common_max, 'format': "{:6.0f}"} - chart_o2: str = acp.plot(o2_values, config_ori) - chart_o3: str = acp.plot(o3_values, config_ori) - layout["orientations"]["raw"].update(Panel( - "[bold yellow]Orientation 1:[/]\n" + chart_o2 + "\n" + - f"Cur: {o2_values[-1]:6.1f} | Min: {min(o2_values):6.1f} | Max: {max(o2_values):6.1f}\n\n" + - "[bold green]Orientation 2:[/]\n" + chart_o3 + "\n" + - f"Cur: {o3_values[-1]:6.1f} | Min: {min(o3_values):6.1f} | Max: {max(o3_values):6.1f}", - title="[cyan]Orientation Raw[/]", style="yellow" - )) - live.refresh() - time.sleep(0.05) - except KeyboardInterrupt: - logger.info("\nStopped.") - if record_data: - self.stop_tracking() - else: - if self.sock: - self.sock.send(bytes.fromhex(STOP_CMD)) - finally: - termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) - - def _start_live_plotting(self, record_data: bool = False, duration: Optional[float] = None) -> None: - terminal_width: int = os.get_terminal_size().columns - plot_width: int = min(terminal_width - 10, 80) - plot_height: int = 10 - - try: - while True: - os.system('clear' if os.name == 'posix' else 'cls') - with self.data_lock: - if len(self.live_data) == 0: - print("\nWaiting for data...") - time.sleep(0.1) - continue - - data: List[Dict[str, float]] = self.live_data[-plot_width:] - - acceleration_fields: List[str] = [f for f in KEY_FIELDS.keys() if 'acceleration' in f.lower()] - orientation_fields: List[str] = [f for f in KEY_FIELDS.keys() if 'orientation' in f.lower()] - other_fields: List[str] = [f for f in KEY_FIELDS.keys() if f not in acceleration_fields + orientation_fields] - - def plot_group(fields: List[str], title: str) -> None: - if not fields: - return - - print(f"\n{title}") - print("=" * len(title)) - - for field in fields: - values: List[float] = [packet.get(field, 0) for packet in data if field in packet] - if len(values) > 0: - chart: str = acp.plot(values, {'height': plot_height}) - print(chart) - print(f"Current: {values[-1]:.2f}, " + - f"Min: {min(values):.2f}, Max: {max(values):.2f}") - print() - - plot_group(acceleration_fields, "Acceleration Data") - plot_group(orientation_fields, "Orientation Data") - plot_group(other_fields, "Other Fields") - - print("\nPress Ctrl+C to stop plotting") - time.sleep(0.1) - - except KeyboardInterrupt: - logger.info("\nLive plotting stopped.") - self.sock.send(bytes.fromhex(STOP_CMD)) - if record_data: - self.stop_tracking() - self.live_plotting = False - - def start_live_plotting(self, record_data: bool = False, duration: Optional[float] = None) -> None: - if self.sock is None: - if not self.connect(): - logger.error("Could not connect to AirPods. Live plotting aborted.") - return - if not self.recording and record_data: - self.start_tracking(duration) - logger.info("Recording enabled during live plotting") - elif not self.recording: - self.sock.send(bytes.fromhex(START_CMD)) - logger.info("Head tracking started (not recording to file)") - with self.data_lock: - self.live_data = [] - self.live_plotting = True - self.paused = False - if self.use_terminal: - self._start_live_plotting_terminal(record_data, duration) - else: - from matplotlib.gridspec import GridSpec, GridSpecFromSubplotSpec - fig: Figure = plt.figure(figsize=(14, 6)) - gs: GridSpec = GridSpec(1, 2, width_ratios=[1, 1]) - ax_accel: Axes = fig.add_subplot(gs[0]) - subgs: GridSpecFromSubplotSpec = GridSpecFromSubplotSpec(2, 1, subplot_spec=gs[1], height_ratios=[2, 1]) - ax_head_top: Axes = fig.add_subplot(subgs[0], projection='3d') - ax_ori: Axes = fig.add_subplot(subgs[1]) - - ax_accel.set_title("Acceleration Data") - ax_accel.set_xlabel("Packet Index") - ax_accel.set_ylabel("Acceleration") - ax_accel.legend(loc='upper right', framealpha=0.7) - fig.patch.set_facecolor('#1e1e1e') - ax_accel.set_facecolor('#2d2d2d') - self.apply_dark_theme(fig, [ax_accel, ax_head_top, ax_ori]) - plt.ion() - - def update_plot(_: int) -> None: - with self.data_lock: - data: List[Dict[str, float]] = self.live_data.copy() - if len(data) == 0: - return - - latest: Dict[str, float] = data[-1] - - if not self.orientation_visualizer.calibration_complete: - sample: List[float] = [ - latest.get('orientation 1', 0), - latest.get('orientation 2', 0), - latest.get('orientation 3', 0) - ] - self.orientation_visualizer.add_calibration_sample(sample) - ax_head_top.cla() - ax_head_top.text(0.5, 0.5, "Calibrating... please wait", horizontalalignment='center', verticalalignment='center', transform=ax_head_top.transAxes, color='white') - fig.canvas.draw_idle() - return - - h_accel: List[float] = [p.get('Horizontal Acceleration', 0) for p in data] - v_accel: List[float] = [p.get('Vertical Acceleration', 0) for p in data] - x_vals: List[int] = list(range(len(h_accel))) - ax_accel.cla() - ax_accel.plot(x_vals, v_accel, label='Vertical Acceleration', color='#FFFF00', linewidth=2) - ax_accel.plot(x_vals, h_accel, label='Horizontal Acceleration', color='#00FFFF', linewidth=2) - ax_accel.set_title("Acceleration Data") - ax_accel.set_xlabel("Packet Index") - ax_accel.set_ylabel("Acceleration") - ax_accel.legend(loc='upper right', framealpha=0.7) - ax_accel.set_facecolor('#2d2d2d') - ax_accel.title.set_color('white') - ax_accel.xaxis.label.set_color('white') - ax_accel.yaxis.label.set_color('white') - - latest: Dict[str, float] = data[-1] - o1: float = latest.get('orientation 1', 0) - o2: float = latest.get('orientation 2', 0) - o3: float = latest.get('orientation 3', 0) - orientation: Dict[str, float] = self.orientation_visualizer.calculate_orientation(o1, o2, o3) - pitch: float = orientation['pitch'] - yaw: float = orientation['yaw'] - - ax_head_top.cla() - ax_head_top.set_title("Head Orientation") - ax_head_top.set_xlim([-1, 1]) - ax_head_top.set_ylim([-1, 1]) - ax_head_top.set_zlim([-1, 1]) - ax_head_top.set_facecolor('#2d2d2d') - pitch_rad = np.radians(pitch) - yaw_rad = np.radians(yaw) - Rz: NDArray[Any] = np.array([ - [np.cos(yaw_rad), np.sin(yaw_rad), 0], - [-np.sin(yaw_rad), np.cos(yaw_rad), 0], - [0, 0, 1] - ]) - Ry: NDArray[Any] = np.array([ - [np.cos(pitch_rad), 0, np.sin(pitch_rad)], - [0, 1, 0], - [-np.sin(pitch_rad), 0, np.cos(pitch_rad)] - ]) - R: NDArray[Any] = Rz @ Ry - dir_vec: NDArray[Any] = R @ np.array([1, 0, 0]) - ax_head_top.quiver(0, 0, 0, dir_vec[0], dir_vec[1], dir_vec[2], - color='r', length=0.8, linewidth=3) - - ax_ori.cla() - o2_values: List[float] = [p.get('orientation 2', 0) for p in data] - o3_values: List[float] = [p.get('orientation 3', 0) for p in data] - x_range: List[int] = list(range(len(o2_values))) - ax_ori.plot(x_range, o2_values, label='Orientation 1', color='red', linewidth=2) - ax_ori.plot(x_range, o3_values, label='Orientation 2', color='green', linewidth=2) - ax_ori.set_facecolor('#2d2d2d') - ax_ori.tick_params(colors='white') - ax_ori.set_title("Orientation Raw") - ax_ori.legend(facecolor='#2d2d2d', edgecolor='#555555', - labelcolor='white', loc='upper right') - ax_ori.text(0.95, 0.9, f"Pitch: {pitch:.1f}°\nYaw: {yaw:.1f}°", - transform=ax_ori.transAxes, color='white', - ha='right', va='top', bbox=dict(facecolor='#2d2d2d', alpha=0.5)) - fig.canvas.draw_idle() - self.animation = FuncAnimation( - fig, update_plot, - interval=20, - blit=False, - cache_frame_data=False - ) - plt.show(block=True) - self.sock.send(bytes.fromhex(STOP_CMD)) - logger.info("Stopping head tracking AirPods.") - if self.recording and record_data: - self.stop_tracking() - logger.info("Recording stopped after sending close command") - else: - logger.info("Live plotting ended (no recording to stop).") - self.live_plotting = False - self.animation = None - plt.ioff() - - def interactive_mode(self) -> None: - from prompt_toolkit import PromptSession - session: PromptSession = PromptSession("> ") - logger.info("\nAirPods Head Tracking Analyzer") - print("------------------------------") - logger.info("Commands:") - print(" connect - connect to your AirPods") - print(" start [seconds] - start recording head tracking data, optionally for specified duration") - print(" stop - stop recording") - print(" load - load and parse a log file") - print(" plot - plot all sensor data fields") - print(" live [seconds] - start live plotting (without recording), optionally stop recording after seconds") - print(" liver [seconds] - start live plotting with recording, optionally stop recording after seconds") - print(" gestures - start gesture detection") - print(" quit - exit the program") - - while True: - try: - cmd_input: str = session.prompt("> ") - cmd_parts: List[str] = cmd_input.strip().split() - if not cmd_parts: - continue - cmd = cmd_parts[0].lower() - match cmd: - case "connect": - self.connect() - case "start": - duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None - self.start_tracking(duration) - case "stop": - self.stop_tracking() - case "load": - if len(cmd_parts) > 1: - self.load_log_file(cmd_parts[1]) - case "plot": - self.plot_fields() - case "live": - duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None - logger.info("Starting live plotting mode (without recording)%s.", - f" for {duration} seconds" if duration else "") - self.start_live_plotting(record_data=False, duration=duration) - case "liver": - duration = float(cmd_parts[1]) if len(cmd_parts) > 1 else None - logger.info("Starting live plotting mode WITH recording%s.", - f" for {duration} seconds" if duration else "") - self.start_live_plotting(record_data=True, duration=duration) - case "gestures": - from gestures import GestureDetector - if self.conn is not None: - detector: GestureDetector = GestureDetector(conn=self.conn) - else: - detector: GestureDetector = GestureDetector() - detector.start_detection() - case "quit": - logger.info("Exiting.") - if self.conn != None: - self.conn.disconnect() - break - case "help": - logger.info("\nAirPods Head Tracking Analyzer") - logger.info("------------------------------") - logger.info("Commands:") - logger.info(" connect - connect to your AirPods") - logger.info(" start [seconds] - start recording head tracking data, optionally for specified duration") - logger.info(" stop - stop recording") - logger.info(" load - load and parse a log file") - logger.info(" plot - plot all sensor data fields") - logger.info(" live [seconds] - start live plotting (without recording), optionally stop recording after seconds") - logger.info(" liver [seconds] - start live plotting with recording, optionally stop recording after seconds") - logger.info(" gestures - start gesture detection") - logger.info(" quit - exit the program") - case _: - logger.info("Unknown command. Type 'help' to see available commands.") - except KeyboardInterrupt: - logger.info("Use 'quit' to exit.") - except EOFError: - logger.info("Exiting.") - if self.conn != None: - self.conn.disconnect() - break - -if __name__ == "__main__": - import sys - tracker: AirPodsTracker = AirPodsTracker() - tracker.interactive_mode() diff --git a/head-tracking/requirements.txt b/head-tracking/requirements.txt deleted file mode 100644 index 0a66bdc..0000000 --- a/head-tracking/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -drawille -numpy -pybluez -matplotlib -asciichartpy -rich \ No newline at end of file