# Mirror of https://github.com/kavishdevar/librepods.git
# Synced 2026-01-28 22:01:50 +00:00
# 140 lines, 5.7 KiB, Python
import socket
|
|
import json
|
|
import subprocess
|
|
from aln.Notifications import Battery
|
|
import threading
|
|
import time
|
|
import os
|
|
import logging
|
|
|
|
|
|
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Colorful logging: wrap each standard level name in an ANSI SGR colour code
# so the level stands out in a terminal. The mapping is level -> SGR params.
_LEVEL_COLORS = {
    logging.DEBUG: "1;34",
    logging.INFO: "1;32",
    logging.WARNING: "1;33",
    logging.ERROR: "1;31",
    logging.CRITICAL: "1;41",
}
for _level, _color in _LEVEL_COLORS.items():
    # getLevelName() is read before the rename, so it still yields the plain name.
    logging.addLevelName(_level, f"\033[{_color}m{logging.getLevelName(_level)}\033[1;0m")
|
|
|
|
# Filesystem path of the UNIX domain socket that read() connects to.
SOCKET_PATH = "/tmp/airpods_daemon.sock"
|
|
|
|
class MediaController:
    """Pause and resume media playback in response to ear-detection updates.

    The controller remembers the last ear status ("Both out", "Only one in"
    or "Both in") and shells out to ``playerctl`` for playback control and to
    ``pactl set-card-profile`` for switching the audio card profile.

    NOTE(review): the nesting of ``handlePlayPause`` was reconstructed from a
    copy of this file whose indentation had been lost -- confirm against the
    upstream source.
    """

    # Card name passed to ``pactl set-card-profile``.
    CARD_NAME = "bluez_card.28_2D_7F_C2_05_5B"
    # Player excluded from the ``playerctl play`` / ``pause`` commands.
    IGNORED_PLAYER = "OnePlus_7"
    # Player whose ``playerctl status`` decides whether music is playing.
    STATUS_PLAYER = "spotify"
    # Seconds between a single-ear change and the deferred follow-up; the same
    # window decides whether a following "both out" counts as simultaneous.
    SINGLE_EAR_DELAY = 0.3

    def __init__(self):
        self.earStatus = "Both out"
        self.wasMusicPlayingInSingle = False
        self.wasMusicPlayingInBoth = False
        # time.time() of the most recent switch into "Only one in" (0 = never).
        self.firstEarOutTime = 0
        # Set to cancel the effect of a pending deferred single-ear action.
        self.stop_thread_event = threading.Event()

    def _runCommand(self, argv):
        """Run *argv* without a shell; log instead of raising if it cannot start."""
        try:
            return subprocess.call(argv)
        except OSError as exc:
            # e.g. the executable is not installed; playback control is best-effort.
            logging.error("Could not run %s: %s", argv[0], exc)
            return None

    def _setCardProfile(self, profile):
        """Switch ``CARD_NAME`` to *profile* via ``pactl set-card-profile``."""
        self._runCommand(["pactl", "set-card-profile", self.CARD_NAME, profile])

    def playMusic(self):
        """Ask every player except ``IGNORED_PLAYER`` to start playing."""
        self._runCommand(["playerctl", "play", "--ignore-player", self.IGNORED_PLAYER])

    def pauseMusic(self):
        """Ask every player except ``IGNORED_PLAYER`` to pause."""
        self._runCommand(["playerctl", "pause", "--ignore-player", self.IGNORED_PLAYER])

    def isPlaying(self):
        """Return True if ``playerctl status`` for ``STATUS_PLAYER`` is "Playing".

        Returns False when the status cannot be obtained (non-zero exit status
        or ``playerctl`` cannot be started) instead of raising, so a missing
        player does not abort the caller.
        """
        try:
            output = subprocess.check_output(
                ["playerctl", "status", "--player", self.STATUS_PLAYER]
            )
        except (subprocess.CalledProcessError, OSError) as exc:
            logging.debug("Could not query player status: %s", exc)
            return False
        return output.decode("utf-8").strip() == "Playing"

    def _afterSingleEarDelay(self, was_playing):
        """Deferred follow-up to a single-ear change; skipped if cancelled.

        *was_playing* is the playback state sampled just before the pause that
        accompanied the single-ear change.
        """
        if self.stop_thread_event.is_set():
            return
        if self.wasMusicPlayingInSingle:
            self.playMusic()
            self.wasMusicPlayingInBoth = False
        elif self.wasMusicPlayingInBoth or was_playing:
            self.wasMusicPlayingInBoth = True
        self.wasMusicPlayingInSingle = False

    def handlePlayPause(self, data):
        """Update the ear status from *data* and play/pause accordingly.

        Args:
            data: two-item sequence ``[primary, secondary]``. A truthy item is
                treated as that bud being out of the ear.

        Returns:
            The new ear status: "Both out", "Both in" or "Only one in".
        """
        primary_out = bool(data[0])
        secondary_out = bool(data[1])

        logging.debug(
            "Handle play/pause called with data: %s, previousStatus: %s, wasMusicPlaying: %s",
            data,
            self.earStatus,
            self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth,
        )

        if primary_out and secondary_out:
            if self.earStatus != "Both out":
                # Sample the state BEFORE pausing; after the pause the player
                # no longer reports "Playing".
                was_playing = bool(self.isPlaying())
                self.pauseMusic()
                self._setCardProfile("off")
                if self.earStatus == "Only one in":
                    removed_together = (
                        self.firstEarOutTime != 0
                        and time.time() - self.firstEarOutTime < self.SINGLE_EAR_DELAY
                    )
                    if removed_together:
                        self.wasMusicPlayingInSingle = True
                        self.wasMusicPlayingInBoth = True
                        # The deferred single-ear action must not run any more.
                        self.stop_thread_event.set()
                    else:
                        self.wasMusicPlayingInSingle = was_playing
                elif self.earStatus == "Both in":
                    if was_playing:
                        self.wasMusicPlayingInBoth = True
                    self.wasMusicPlayingInSingle = False
            self.earStatus = "Both out"
            return "Both out"

        if not primary_out and not secondary_out:
            if self.earStatus != "Both in":
                if self.earStatus == "Both out":
                    self._setCardProfile("a2dp-sink")
                elif self.earStatus == "Only one in":
                    self.stop_thread_event.set()
                    if self.isPlaying():
                        self.wasMusicPlayingInBoth = True
                if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
                    self.playMusic()
                    self.wasMusicPlayingInBoth = True
                self.wasMusicPlayingInSingle = False
            self.earStatus = "Both in"
            return "Both in"

        # Exactly one of the two items is truthy.
        if self.earStatus != "Only one in":
            self.stop_thread_event.clear()
            was_playing = self.isPlaying()
            self.pauseMusic()
            delayed_thread = threading.Timer(
                self.SINGLE_EAR_DELAY, self._afterSingleEarDelay, args=[was_playing]
            )
            delayed_thread.start()
            self.firstEarOutTime = time.time()
            if self.earStatus == "Both out":
                self._setCardProfile("a2dp-sink")
        self.earStatus = "Only one in"
        return "Only one in"
|
|
|
|
def _handle_message(media_controller, message):
    """Forward one decoded daemon message to *media_controller* if relevant.

    Only JSON objects whose ``"type"`` is ``"ear_detection"`` and that carry
    both ``"primary"`` and ``"secondary"`` are acted on; anything else is
    skipped so an unrelated or malformed message cannot end the listener.
    """
    if not isinstance(message, dict) or message.get("type") != "ear_detection":
        return
    if "primary" not in message or "secondary" not in message:
        logging.warning("Ignoring incomplete ear_detection message: %s", message)
        return
    media_controller.handlePlayPause([message["primary"], message["secondary"]])


def read():
    """Listen on the daemon's UNIX domain socket and react to its messages.

    Connects to ``SOCKET_PATH`` and then blocks, receiving UTF-8 JSON from the
    daemon until it closes the connection. Each ``ear_detection`` message is
    passed to a ``MediaController``. Connection-level errors are logged, not
    raised, and the socket is always closed on the way out.
    """
    json_whitespace = " \t\r\n"
    client_socket = None
    try:
        # Create a socket connection to the daemon
        client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        logging.info("Connecting to daemon...")
        client_socket.connect(SOCKET_PATH)

        media_controller = MediaController()
        decoder = json.JSONDecoder()

        # Receive data
        while True:
            chunk = client_socket.recv(1024)
            if not chunk:
                # Empty read: the daemon closed the connection.
                break
            try:
                # A stream socket may deliver several JSON objects in one
                # recv(); decode them one after another instead of requiring
                # the chunk to be exactly one document.
                pending = chunk.decode('utf-8').strip(json_whitespace)
                while pending:
                    message, consumed = decoder.raw_decode(pending)
                    pending = pending[consumed:].lstrip(json_whitespace)
                    _handle_message(media_controller, message)
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                # Drop the undecodable remainder of this chunk and keep listening.
                logging.error(f"Error deserializing data: {e}")

    except Exception as e:
        logging.error(f"Error communicating with daemon: {e}")
    finally:
        if client_socket:
            client_socket.close()
            logging.warning("Socket closed")
|
|
|
|
# Start the listener only when this file is executed as a script.
if __name__ == "__main__":
    read()
|