change the layout and fix f string bug on raspberry pi python 3

This commit is contained in:
William Bell
2025-12-26 19:47:30 +00:00
parent d069150c54
commit d86587e526
6 changed files with 371 additions and 253 deletions

View File

@@ -1,14 +1,11 @@
from jellyfin_apiclient_python import JellyfinClient
import json
import uuid
import subprocess
from dotenv import load_dotenv
import os
import time
import ffmpeg
import requests
import threading
from urllib.parse import urlencode, urljoin
import subprocess
import numpy as np
import sounddevice as sd
@@ -18,17 +15,67 @@ import sys
import io
import fcntl
from dataclasses import dataclass
import numpy as np
from collections import deque
from jelly import server, client
from urllib.parse import urlencode, urljoin
import requests
from pathlib import Path
import mimetypes
os.makedirs("logs", exist_ok=True)
def song_data_to_Song(data, client_data) -> Song:
# """
# Build a Jellyfin audio stream URL using urllib.parse.
# """
item_id = data["Id"]
path = f"/Audio/{item_id}/universal"
params = {
"UserId": client_data["UserId"],
"Container": "flac",
"AudioCodec": "flac", # <-- IMPORTANT
"api_key": client_data["AccessToken"],
}
query = urlencode(params)
url = urljoin(client_data["address"], path) + "?" + query
album_cover_url = urljoin(
client_data["address"], f"/Items/{data['AlbumId']}/Images/Primary"
)
r = requests.get(album_cover_url)
r.raise_for_status()
content_type = r.headers.get("Content-Type") # e.g. "image/jpeg"
ext = mimetypes.guess_extension(content_type) # ".jpg"
if ext is None:
ext = ".jpg" # safe fallback for album art
saved_path = Path("data", "images", data["AlbumId"] + ext).as_posix()
with open(saved_path, "wb") as f:
f.write(r.content)
return Song(
item_id,
url,
data["Name"],
data["RunTimeTicks"] / 10_000_000,
data["Album"],
saved_path,
data["AlbumArtist"],
)
#os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
os.makedirs("data/images", exist_ok=True)
@dataclass
class Song:
id: str
url: str
name: str
duration: float
@@ -38,18 +85,16 @@ class Song:
class GaplessPlayer:
def __init__(self, samplerate: int = 44100, channels: int = 2):
def __init__(self, samplerate: int = 96000, channels: int = 2):
self.samplerate = samplerate
self.channels = channels
self.next_preload_state = 0
self.closed = False
self.playing = False
self.position = 0.0
self.song_list = []
self.song_list: list[Song] = []
self.current_song_in_list = -1
@@ -62,6 +107,7 @@ class GaplessPlayer:
callback=self._callback,
)
self.stream.start()
self.oscilloscope_data_points = deque(maxlen=samplerate//60)
def get_current_song(self):
if self.current_song_in_list >= 0 and self.current_song_in_list < len(
@@ -92,6 +138,8 @@ class GaplessPlayer:
stderr=subprocess.PIPE,
)
print("yo")
# --- make stdout non-blocking ---
fd = proc.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
@@ -169,7 +217,7 @@ class GaplessPlayer:
if song:
song.ffmpeg = self._open_ffmpeg(song)
song.preload_state = 2
def preload_next_threaded(self):
next_song = self.get_next_song()
if not next_song or next_song.preload_state:
@@ -200,6 +248,13 @@ class GaplessPlayer:
self.position += len(data) / (self.samplerate * self.channels * 2)
if self.position >= current_song.duration - 10:
self.preload_next_threaded()
else:
next_song = self.get_next_song()
if next_song and next_song.ffmpeg:
if next_song.ffmpeg.poll() is None:
next_song.ffmpeg.kill()
next_song.ffmpeg = None
next_song.preload_state = 0
if current_song.ffmpeg.poll() is not None and len(data) < needed:
if round(self.position, 2) >= current_song.duration - 0.1:
self._start_next()
@@ -223,76 +278,52 @@ class GaplessPlayer:
)
data += new_data
else:
# if current_song.ffmpeg and current_song.ffmpeg.poll() is not None:
# current_song.ffmpeg.kill()
# current_song.ffmpeg = None
current_song.ffmpeg = self._open_ffmpeg(
current_song, self.position
)
samples = np.frombuffer(data, dtype=np.int16)
left = samples[0::2]
right = samples[1::2]
norm = 32769.0
x = left / norm
y = right / norm
points = list(zip(x, y))
# step = max(1, len(points) // 1000)
# points = points[::step]
self.oscilloscope_data_points.extend(points)
outdata[: len(data)] = data
outdata[len(data) :] = b"\x00" * (needed - len(data))
def save_state(self, path):
with open(path,"w") as f:
data = {
"queue": [song.id for song in self.song_list],
"current_song": self.current_song_in_list,
"position": self.position
}
json.dump(data, f)
def load_state(self, path):
try:
with open(path,"r") as f:
data = json.load(f)
self.song_list = []
for song in data["queue"]:
songOBJ = song_data_to_Song(client.jellyfin.get_item(song), server)
songOBJ.ffmpeg = None
songOBJ.preload_state = 0
self.song_list.append(songOBJ)
self.current_song_in_list = data['current_song']
self.seek(data['position'])
except:
return
album_covers = []
def song_data_to_Song(data, client_data) -> Song:
# """
# Build a Jellyfin audio stream URL using urllib.parse.
# """
item_id = data["Id"]
path = f"/Audio/{item_id}/universal"
params = {
"UserId": client_data["UserId"],
"Container": "flac",
"AudioCodec": "flac", # <-- IMPORTANT
"api_key": client_data["AccessToken"],
}
query = urlencode(params)
url = urljoin(client_data["address"], path) + "?" + query
album_cover_url = urljoin(
client_data["address"], f"/Items/{data["AlbumId"]}/Images/Primary"
)
r = requests.get(album_cover_url)
r.raise_for_status()
content_type = r.headers.get("Content-Type") # e.g. "image/jpeg"
ext = mimetypes.guess_extension(content_type) # ".jpg"
if ext is None:
ext = ".jpg" # safe fallback for album art
saved_path = Path("data", "images", data["AlbumId"] + ext).as_posix()
with open(saved_path, "wb") as f:
f.write(r.content)
return Song(
url,
data["Name"],
data["RunTimeTicks"] / 10_000_000,
data["Album"],
saved_path,
data["AlbumArtist"],
)
client = JellyfinClient()
load_dotenv()
client.config.app("UgPod", "0.0.1", "UgPod prototype", "UgPod_prototype_1")
client.config.data["auth.ssl"] = True
client.auth.connect_to_address(os.getenv("host"))
client.auth.login(os.getenv("URL"), os.getenv("username"), os.getenv("password"))
credentials = client.auth.credentials.get_credentials()
server = credentials["Servers"][0]
print(json.dumps(server))
# while True:
# duration = player.playback_info_to_duration(player.playback_info)