This commit is contained in:
William Bell
2025-12-27 16:45:42 +00:00
8 changed files with 940 additions and 419 deletions

1
.gitignore vendored
View File

@@ -176,3 +176,4 @@ cython_debug/
music
logs
data

382
app.py
View File

@@ -1,286 +1,164 @@
import pyray as pr
import math
from ctypes import c_float
from gapless_player import GaplessPlayer, build_jellyfin_audio_url, server, client
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
# --- Configuration Constants ---
INITIAL_SCREEN_WIDTH = 800
INITIAL_SCREEN_HEIGHT = 600
TARGET_FPS = 60
# Copyright (c) 2017 Adafruit Industries
# Author: James DeVito
# Ported to RGB Display by Melissa LeBlanc-Williams
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# --- State Variables ---
# Mutable app-wide state shared by the setup code and the main loop.
state = {
    "screen_width": INITIAL_SCREEN_WIDTH,    # kept in sync on window resize
    "screen_height": INITIAL_SCREEN_HEIGHT,
    "current_time": 120.0,   # NOTE(review): demo placeholders — the loop reads player.position; confirm these are still needed
    "total_time": 300.0,
    "is_playing": True,
    # 3D Camera State
    "camera": None,          # pr.Camera3D, created by setup_3d_environment()
    "render_texture": None,  # offscreen render target for the 3D scene
    # Assets
    "album_texture": None,   # owned here; unloaded at shutdown
    "album_model": None
}
# --- Utility Functions ---
def format_time_mm_ss(seconds):
    """Convert a time in seconds to an 'MM:SS' string.

    Negative inputs are clamped to zero so a transiently negative playback
    position (e.g. mid-seek) cannot render as '-1:55' — Python's floor
    division would otherwise produce that for negative values.
    """
    total_seconds = max(0, int(seconds))
    minutes, remainder = divmod(total_seconds, 60)
    return f"{minutes:02d}:{remainder:02d}"
# --- Dynamic Layout Functions ---
def get_3d_render_area(screen_width, screen_height):
    """Compute the viewport rectangle for the 3D cover-flow scene.

    The area keeps a fixed 2:1 aspect ratio, is centred horizontally, sits
    with its top edge at 15% of the screen height, and is fitted inside a
    box of 70% screen width by 50% screen height.
    """
    aspect = 2.0 / 1.0  # width : height
    avail_w = screen_width * 0.7
    avail_h = screen_height * 0.5
    # Fit the 2:1 box inside the available area, letterbox style.
    if (avail_w / avail_h) > aspect:
        h = avail_h
        w = h * aspect
    else:
        w = avail_w
        h = w / aspect
    x = (screen_width - w) / 2
    y = screen_height * 0.15
    return pr.Rectangle(x, y, w, h)
def get_progress_bar_rect(screen_width, screen_height):
    """Rectangle for the playback progress bar: 70% wide, 3% tall,
    horizontally centred, with its top at 75% of the screen height."""
    bar_w = screen_width * 0.7
    bar_h = screen_height * 0.03
    bar_x = (screen_width - bar_w) / 2
    bar_y = screen_height * 0.75
    return pr.Rectangle(bar_x, bar_y, bar_w, bar_h)
def draw_progress_bar(rect, current_time, total_time):
    """Draw the progress bar: grey background, red fill, dark border and a
    centred 'MM:SS / MM:SS' label.

    FIX: the fill ratio is now clamped to [0, 1]; previously a position that
    briefly exceeded the reported duration (e.g. during the gapless track
    hand-over) made the red fill draw outside *rect*.
    """
    if total_time > 0:
        progress_ratio = min(max(current_time / total_time, 0.0), 1.0)
    else:
        progress_ratio = 0.0  # unknown duration: show an empty bar
    pr.draw_rectangle_rec(rect, pr.Color(100, 100, 100, 255))
    progress_width = rect.width * progress_ratio
    pr.draw_rectangle(int(rect.x), int(rect.y), int(progress_width), int(rect.height), pr.Color(200, 50, 50, 255))
    pr.draw_rectangle_lines_ex(rect, 2, pr.Color(50, 50, 50, 255))
    time_text = f"{format_time_mm_ss(current_time)} / {format_time_mm_ss(total_time)}"
    # Centre the label horizontally inside the bar.
    text_width = pr.measure_text(time_text, int(rect.height * 0.7))
    pr.draw_text(time_text,
                 int(rect.x + rect.width / 2 - text_width / 2),
                 int(rect.y + rect.height * 0.15),
                 int(rect.height * 0.7),
                 pr.WHITE)
# --- ASSET MANAGEMENT ---
def load_album_assets():
    """Load the album-cover texture, build the flat 'card' model, and bind the
    texture to its albedo slot.

    Returns (texture, model); the caller owns both and must unload them.

    FIX: the original used a bare ``except:`` — narrowed to ``Exception`` so
    KeyboardInterrupt/SystemExit are not swallowed.
    NOTE(review): raylib's load_image typically returns an empty image rather
    than raising on a missing file, so this fallback may never trigger —
    confirm and consider an explicit existence check.
    """
    try:
        image = pr.load_image("music/albumcover.png")
    except Exception:
        print("WARNING: 'music/albumcover.png' not found. Using placeholder.")
        image = pr.gen_image_checked(512, 512, 32, 32, pr.DARKGRAY, pr.WHITE)
    # Flip once on the CPU so the texture is not upside-down on the mesh.
    pr.image_flip_vertical(image)
    texture = pr.load_texture_from_image(image)
    pr.unload_image(image)  # pixel data now lives on the GPU
    # A zero-depth cube acts as the flat album "card".
    mesh = pr.gen_mesh_cube(1.5, 1.5, 0.0)
    model = pr.load_model_from_mesh(mesh)
    # Albedo/diffuse map slot; older bindings only expose the raw index 0.
    map_index = 0
    if hasattr(pr.MaterialMapIndex, 'MATERIAL_MAP_ALBEDO'):
        map_index = pr.MaterialMapIndex.MATERIAL_MAP_ALBEDO
    model.materials[0].maps[map_index].texture = texture
    return texture, model
# --- CORE 3D RENDERING ---
def setup_3d_environment(render_width, render_height):
    """Create the fixed perspective camera used for the cover-flow scene.

    ``render_width`` / ``render_height`` are currently unused but kept so the
    call sites (which pass the render-texture size) stay unchanged.
    """
    cam = pr.Camera3D()
    cam.position = pr.Vector3(0.0, -0.35, 4.0)  # pulled back to fit the side stacks
    cam.target = pr.Vector3(0.0, 0.0, 0.0)
    cam.up = pr.Vector3(0.0, 1.0, 0.0)
    cam.fovy = 45.0
    cam.projection = pr.CameraProjection.CAMERA_PERSPECTIVE
    return cam
def draw_3d_cover_flow(camera, model):
    """Render the cover-flow scene: the current album front-and-centre plus
    two angled stacks of the same model to its left and right.

    NOTE(review): this span was interleaved with unrelated Adafruit ST7789
    display/button setup lines (the deleted side of a rendered diff); those
    statements do not belong to this function and were removed.
    """
    pr.begin_mode_3d(camera)
    # Current album (centre). pr.WHITE tint keeps the texture's own colours;
    # a coloured tint (e.g. pr.RED) would tint the album art.
    pr.rl_push_matrix()
    pr.rl_translatef(0.0, -0.0, 1.5)   # Spaced out slightly more
    pr.rl_rotatef(5.0, 1.0, 0.0, 0.0)  # Sharper angle
    pr.draw_model(model, pr.Vector3(0.0, 0.0, 0.0), 1.0, pr.WHITE)
    pr.rl_pop_matrix()
    # Left stack: five copies fanned out behind the centre card.
    for i in range(-5, 0):
        pr.rl_push_matrix()
        pr.rl_translatef(-1.5 + 0.15 * i, 0.0, 0.5)  # slight Z offset for depth
        pr.rl_rotatef(50.0, 0.0, 1.0, 0.0)
        pr.draw_model(model, pr.Vector3(0.0, 0.0, 0.0), 1.0, pr.WHITE)
        pr.rl_pop_matrix()
    # Right stack: mirror image of the left one.
    for i in range(1, 6):
        pr.rl_push_matrix()
        pr.rl_translatef(1.5 + 0.15 * i, 0.0, 0.5)
        pr.rl_rotatef(-50.0, 0.0, 1.0, 0.0)
        pr.draw_model(model, pr.Vector3(0.0, 0.0, 0.0), 1.0, pr.WHITE)
        pr.rl_pop_matrix()
    pr.end_mode_3d()
button_L = DigitalInOut(board.D27)
button_L.direction = Direction.INPUT
# --- Main Setup and Loop ---
button_R = DigitalInOut(board.D23)
button_R.direction = Direction.INPUT
# Initialization
pr.set_config_flags(pr.ConfigFlags.FLAG_WINDOW_RESIZABLE)
pr.set_config_flags(pr.FLAG_MSAA_4X_HINT)
pr.init_window(state["screen_width"], state["screen_height"], "UgPod")
pr.set_target_fps(TARGET_FPS)
button_U = DigitalInOut(board.D17)
button_U.direction = Direction.INPUT
player = GaplessPlayer()
button_D = DigitalInOut(board.D22)
button_D.direction = Direction.INPUT
print("add queue")
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("dab6efb24bb2372794d2b4fb53a12376")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("58822c0fc47ec63ba798ba4f04ea3cf3")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("6382005f9dbae8d187d80a5cdca3e7a6")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("a5d2453e07a4998ea20e957c44f90be6")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("398d481a7b85287ad200578b5ab997b0")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("f9f32ca67be7f83139cee3c66e1e4965")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("2f651e103b1fd22ea2f202d6f3398b36")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("164b95968ab1a725fff060fa8c351cc8")["Id"], server["AccessToken"], server["UserId"]))
button_C = DigitalInOut(board.D4)
button_C.direction = Direction.INPUT
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("38a6c21561f54d284a6acad89a3ea8b0")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("631aeddb0557fef65f49463abb20ad7f")["Id"], server["AccessToken"], server["UserId"]))
# Turn on the Backlight
backlight = DigitalInOut(board.D26)
backlight.switch_to_output()
backlight.value = True
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("3d611c8664c5b2072edbf46da2a76c89")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("66559c40d5904944a3f97198d0297894")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("84b75eeb5c8e862d002bae05d2671b1b")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("7ef66992426093252696e1d8666a22e4")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("f37982227942d3df031381e653ec5790")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("0e8fc5fcf119de0439f5a15a4f255c5c")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue('music/pink floyd/dark side of the moon/01 Speak to Me.flac')#(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("99067e877d91be1a66eb5a7ff2f4128f")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue('music/pink floyd/dark side of the moon/02 Breathe (In the Air).flac')#(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("916eda422f48efd8705f29e0600a3e60")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue('music/pink floyd/dark side of the moon/03 On the Run.flac')#(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("5e1067d59ed98979ad12a58548b27b83")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue('music/pink floyd/dark side of the moon/04 Time.flac')#(build_jellyfin_audio_url(server["address"], client.jellyfin.get_item("8bcf8240d12aa5c3b14dc3b57f32fef7")["Id"], server["AccessToken"], server["UserId"]))
player.add_to_queue('music/pink floyd/dark side of the moon/05 The Great Gig in the Sky.flac')
player.add_to_queue('music/pink floyd/dark side of the moon/06 Money.flac')
player.add_to_queue('music/pink floyd/dark side of the moon/07 Us and Them.flac')
player.add_to_queue('music/pink floyd/dark side of the moon/08 Any Colour You Like.flac')
player.add_to_queue('music/pink floyd/dark side of the moon/09 Brain Damage.flac')
player.add_to_queue('music/pink floyd/dark side of the moon/10 Eclipse.flac')
print("add queue done")
# Create blank image for drawing.
# Make sure to create image with mode 'RGB' for color.
width = disp.width
height = disp.height
image = Image.new("RGB", (width, height))
# Initial setup
render_rect = get_3d_render_area(state["screen_width"], state["screen_height"])
state["render_texture"] = pr.load_render_texture(int(render_rect.width), int(render_rect.height))
state["camera"] = setup_3d_environment(int(render_rect.width), int(render_rect.height))
# Get drawing object to draw on image.
draw = ImageDraw.Draw(image)
# LOAD THE ASSETS
state["album_texture"], state["album_model"] = load_album_assets()
# Clear display.
draw.rectangle((0, 0, width, height), outline=0, fill=(255, 0, 0))
disp.image(image)
# --- Main Game Loop ---
while not pr.window_should_close():
# 1. Update
current_width = pr.get_screen_width()
current_height = pr.get_screen_height()
# Get drawing object to draw on image.
draw = ImageDraw.Draw(image)
if pr.is_window_resized():
state["screen_width"] = current_width
state["screen_height"] = current_height
render_rect = get_3d_render_area(current_width, current_height)
pr.unload_render_texture(state["render_texture"])
state["render_texture"] = pr.load_render_texture(int(render_rect.width), int(render_rect.height))
# Draw a black filled box to clear the image.
draw.rectangle((0, 0, width, height), outline=0, fill=0)
delta_time = pr.get_frame_time()
udlr_fill = "#00FF00"
udlr_outline = "#00FFFF"
button_fill = "#FF00FF"
button_outline = "#FFFFFF"
if pr.is_key_pressed(pr.KeyboardKey.KEY_SPACE):
if player.playing:
player.pause()
else:
player.play()
fnt = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 30)
if pr.is_key_pressed(pr.KeyboardKey.KEY_LEFT):
player.seek(player.position-5)
if pr.is_key_pressed(pr.KeyboardKey.KEY_RIGHT):
player.seek(player.position+5)
while True:
up_fill = 0
if not button_U.value: # up pressed
up_fill = udlr_fill
draw.polygon([(40, 40), (60, 4), (80, 40)], outline=udlr_outline, fill=up_fill) # Up
down_fill = 0
if not button_D.value: # down pressed
down_fill = udlr_fill
draw.polygon([(60, 120), (80, 84), (40, 84)], outline=udlr_outline, fill=down_fill) # down
# ----------------------------------------------------
# 2. DRAW 3D SCENE
# ----------------------------------------------------
render_rect = get_3d_render_area(current_width, current_height)
left_fill = 0
if not button_L.value: # left pressed
left_fill = udlr_fill
draw.polygon([(0, 60), (36, 42), (36, 81)], outline=udlr_outline, fill=left_fill) # left
pr.begin_texture_mode(state["render_texture"])
pr.clear_background(pr.Color(20, 20, 20, 255))
right_fill = 0
if not button_R.value: # right pressed
right_fill = udlr_fill
draw.polygon([(120, 60), (84, 42), (84, 82)], outline=udlr_outline, fill=right_fill) # right
# Pass the loaded model to the draw function
draw_3d_cover_flow(state["camera"], state["album_model"])
center_fill = 0
if not button_C.value: # center pressed
center_fill = button_fill
draw.rectangle((40, 44, 80, 80), outline=button_outline, fill=center_fill) # center
pr.end_texture_mode()
A_fill = 0
if not button_A.value: # left pressed
A_fill = button_fill
draw.ellipse((140, 80, 180, 120), outline=button_outline, fill=A_fill) # A button
# ----------------------------------------------------
# 3. DRAW 2D GUI
# ----------------------------------------------------
pr.begin_drawing()
pr.clear_background(pr.Color(40, 40, 40, 255))
B_fill = 0
if not button_B.value: # left pressed
B_fill = button_fill
draw.ellipse((190, 40, 230, 80), outline=button_outline, fill=B_fill) # B button
progress_rect = get_progress_bar_rect(current_width, current_height)
# make a random color and print text
rcolor = tuple(int(x * 255) for x in hsv_to_rgb(random.random(), 1, 1))
draw.text((20, 150), "Hello World", font=fnt, fill=rcolor)
rcolor = tuple(int(x * 255) for x in hsv_to_rgb(random.random(), 1, 1))
draw.text((20, 180), "Hello World", font=fnt, fill=rcolor)
rcolor = tuple(int(x * 255) for x in hsv_to_rgb(random.random(), 1, 1))
draw.text((20, 210), "Hello World", font=fnt, fill=rcolor)
title_size = int(current_height * 0.05)
pr.draw_text("UgPod", int(current_width * 0.05), int(current_height * 0.05), title_size, pr.SKYBLUE)
# Display the Image
disp.image(image)
source_rect = pr.Rectangle(0, 0, state["render_texture"].texture.width, -state["render_texture"].texture.height)
pr.draw_texture_pro(state["render_texture"].texture,
source_rect, render_rect, pr.Vector2(0, 0), 0.0, pr.WHITE)
pr.draw_rectangle_lines_ex(render_rect, 3, pr.LIME)
draw_progress_bar(progress_rect, player.position, player.playback_info_to_duration(player.playback_info))
pr.draw_text(f"Status: {'Playing' if player.playing else 'Paused'} (SPACE)",
int(current_width * 0.05), int(current_height * 0.9), int(current_height * 0.03), pr.LIME)
pr.end_drawing()
# --- De-initialization ---
pr.unload_texture(state["album_texture"]) # Unload the texture
pr.unload_model(state["album_model"]) # Unload the model/mesh
pr.unload_render_texture(state["render_texture"])
pr.close_window()
time.sleep(0.1)

View File

@@ -1,14 +1,11 @@
from jellyfin_apiclient_python import JellyfinClient
import json
import uuid
import subprocess
from dotenv import load_dotenv
import os
import time
import ffmpeg
import requests
import threading
from urllib.parse import urlencode, urljoin
import subprocess
import numpy as np
import sounddevice as sd
@@ -18,38 +15,87 @@ import sys
import io
import fcntl
from dataclasses import dataclass
os.makedirs("logs", exist_ok=True)
import numpy as np
from collections import deque
from jelly import server, client
from urllib.parse import urlencode, urljoin
import requests
from pathlib import Path
import mimetypes
@dataclass
class Song:
    """One play-queue entry: stream URL plus display metadata.

    BUG FIX: the original declared ``duration`` twice and carried both
    ``album_cover`` and ``album_cover_path`` (diff residue), leaving the
    dataclass with 8 required fields while every visible call site
    (song_data_to_Song, old_app.py) passes 7 positional arguments.
    Field order here matches those call sites:
    (id, url, name, duration, album_name, album_cover_path, artist_name).
    """
    id: str
    url: str
    name: str
    duration: float          # seconds
    album_name: str
    album_cover_path: str    # local path under data/images/
    artist_name: str
    # Runtime playback bookkeeping — previously monkey-patched onto instances
    # by GaplessPlayer; defaults keep 7-arg positional construction working.
    ffmpeg: object = None    # subprocess.Popen of the decoder, or None
    preload_state: int = 0   # 0 = not loaded, 1 = loading, 2 = ready
def song_data_to_Song(data, client_data) -> Song:
    """Build a Song from a Jellyfin item dict (*data*) and server credentials
    (*client_data*: needs "address", "UserId", "AccessToken").

    Constructs the ``/Audio/{id}/universal`` FLAC stream URL, downloads the
    album cover into data/images/ (best-effort, skipped when already cached),
    and returns the populated Song.

    BUG FIX: the original wrote ``r.content`` to disk while the
    ``requests.get`` defining ``r`` was commented out — NameError on every
    call. The download is restored, wrapped so a failed fetch cannot block
    queueing the song.
    """
    item_id = data["Id"]
    path = f"/Audio/{item_id}/universal"
    params = {
        "UserId": client_data["UserId"],
        "Container": "flac",
        "AudioCodec": "flac",  # <-- IMPORTANT
        "api_key": client_data["AccessToken"],
    }
    url = urljoin(client_data["address"], path) + "?" + urlencode(params)
    album_cover_url = urljoin(
        client_data["address"], f"/Items/{data['AlbumId']}/Images/Primary"
    )
    ext = ".jpg"  # safe fallback extension for album art
    saved_path = Path("data", "images", data["AlbumId"] + ext).as_posix()
    if not Path(saved_path).exists():  # cache: one download per album
        try:
            r = requests.get(album_cover_url)
            r.raise_for_status()
            with open(saved_path, "wb") as f:
                f.write(r.content)
        except Exception as e:
            # Best-effort: a missing cover must not prevent playback.
            print("WARNING: could not fetch album art:", e)
    return Song(
        item_id,
        url,
        data["Name"],
        data["RunTimeTicks"] / 10_000_000,  # Jellyfin ticks are 100 ns
        data["Album"],
        saved_path,
        data["AlbumArtist"],
    )
# Ensure the album-art cache directory exists; makedirs creates the
# intermediate "data" directory too, so one call covers both.
os.makedirs("data/images", exist_ok=True)
class GaplessPlayer:
def __init__(self, samplerate:int=44100, channels:int=2):
def __init__(self, samplerate: int = 96000, channels: int = 2):
self.samplerate = samplerate
self.channels = channels
self.proc = None
self.next_proc = None
self.current_song: Song = None
self.next_file: Song = None
self.next_preload_state = 0
self.closed = False
self.playing = False
self.position = 0.0
self.song_list = []
self.song_list: list[Song] = []
self.current_song_in_list = -1
@@ -59,28 +105,42 @@ class GaplessPlayer:
samplerate=self.samplerate,
channels=self.channels,
dtype="int16",
callback=self._callback
callback=self._callback,
)
self.stream.start()
self.oscilloscope_data_points = deque(maxlen=samplerate//60)
def _open_ffmpeg(self, url, seek=0):
self.song+=1
def get_current_song(self):
    """Return the Song at the current queue index, or None when the index
    is out of range (e.g. before playback starts or past the queue end)."""
    idx = self.current_song_in_list
    if 0 <= idx < len(self.song_list):
        return self.song_list[idx]
    return None
def _open_ffmpeg(self, song, seek=0):
proc = subprocess.Popen(
[
"ffmpeg",
# "-re",
"-ss", str(seek),
"-i", url,
"-f", "s16le",
"-ac", str(self.channels),
"-ar", str(self.samplerate),
"-loglevel", "verbose",
"-"
"-ss",
str(seek),
"-i",
song.url,
"-f",
"s16le",
"-ac",
str(self.channels),
"-ar",
str(self.samplerate),
"-loglevel",
"verbose",
"-",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
)
print("yo")
# --- make stdout non-blocking ---
fd = proc.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
@@ -90,172 +150,179 @@ class GaplessPlayer:
def seek(self, pos):
    """Seek within the current song.

    *pos* (seconds) is clamped to [0, duration]. The running decoder is
    killed and, when playing, respawned at the new offset; when paused the
    respawn is deferred to play().

    NOTE(review): this span interleaved the deleted-side ``self.proc`` /
    ``playback_info`` implementation with the new per-Song one; the
    reconstruction keeps only the new side — confirm against the repo.
    """
    with self.lock:
        song = self.get_current_song()
        if song is None:
            return  # nothing queued: no-op
        pos = min(max(0, pos), song.duration)
        if song.ffmpeg:
            song.ffmpeg.kill()
            song.ffmpeg = None
        if self.playing:
            song.ffmpeg = self._open_ffmpeg(song, pos)
        self.position = pos
def close(self):
    """Shut down audio output permanently; the sounddevice callback must
    not run again after this."""
    self.closed = True
    self.stream.close()
def get_stream_info(self, url):
    """Run ffprobe on *url* and return its parsed JSON metadata
    ("format" + "streams" sections), or None on any failure.

    (The duration itself is extracted later by playback_info_to_duration.)
    """
    probe_cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        url,
    ]
    try:
        completed = subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return json.loads(completed.stdout)
    except Exception as err:
        print("ffprobe error:", err)
        return None
def add_to_queue(self, song: Song):
    """Append *song* to the play queue, resetting its runtime fields.

    BUG FIX: the original first line was
    ``self.current_song_in_list.append(song)`` — appending to an int
    (AttributeError); it was leftover diff residue and is removed.
    """
    song.ffmpeg = None
    song.preload_state = 0
    self.song_list.append(song)
def play(self):
    """Resume playback, (re)spawning the decoder for the current song at the
    saved position if it is not already running.

    NOTE(review): the deleted-side ``self.proc``/``self.current_file`` lines
    that were interleaved here are removed; only the per-Song logic remains.
    """
    with self.lock:
        if not self.playing:
            current_song = self.get_current_song()
            if current_song and not current_song.ffmpeg:
                current_song.ffmpeg = self._open_ffmpeg(current_song, self.position)
            self.playing = True
def pause(self):
    """Pause playback.

    The decoder process is deliberately left alive (the kill is commented
    out) so play() resumes without respawning ffmpeg — NOTE(review): the
    deleted-side implementation killed ``self.proc`` here; confirm keeping
    the process alive is intended (it idles once the pipe buffer fills).
    """
    with self.lock:
        # current_song = self.get_current_song()
        # if current_song and current_song.ffmpeg:
        #     current_song.ffmpeg.kill()
        #     current_song.ffmpeg = None
        self.playing = False
def _start_next(self):
    """Advance to the next queue entry: kill the finished song's decoder,
    reset the position and bump the queue index.

    NOTE(review): the deleted-side ``next_proc``/``next_file``/
    ``playback_info`` hand-over lines interleaved here are removed; in the
    per-Song design the next song's decoder already lives on the Song itself.
    """
    current_song = self.get_current_song()
    if current_song and current_song.ffmpeg:
        current_song.ffmpeg.kill()
        current_song.ffmpeg = None
    self.position = 0.0
    self.current_song_in_list += 1
def preload_next(self):
    # NOTE(review): stale — references self.song_queue and the next_* /
    # next_preload_state attributes that the rest of the class no longer
    # uses (superseded by preload_next_threaded()/load_song() operating on
    # Song objects). Appears to be the deleted side of the diff; confirm
    # and remove.
    self.next_file = self.song_queue.get()
    self.next_playback_info = self.get_stream_info(self.next_file)
    self.next_proc = self._open_ffmpeg(self.next_file)
    self.next_preload_state = 2
def get_next_song(self):
    """Return the upcoming Song in the queue, or None when there is none."""
    upcoming = self.current_song_in_list + 1
    return self.song_list[upcoming] if 0 <= upcoming < len(self.song_list) else None
def forward_song(self):
    """Skip ahead one entry: tear down the current decoder, then advance
    the index (it may land one past the end, meaning 'queue exhausted')."""
    song = self.get_current_song()
    if song and song.ffmpeg:
        song.ffmpeg.kill()
        song.ffmpeg = None
    if self.current_song_in_list < len(self.song_list):
        self.current_song_in_list += 1
def load_song(self, song: Song):
    """Spawn the ffmpeg decoder for *song* and mark it fully preloaded."""
    if not song:
        return
    song.ffmpeg = self._open_ffmpeg(song)
    song.preload_state = 2
def preload_next_threaded(self):
    """Kick off (at most once) a background thread that preloads the next
    queued song's decoder.

    NOTE(review): the deleted-side guard using ``self.next_preload_state``
    that was interleaved here is removed; the per-Song ``preload_state``
    field now serves as the claim flag that prevents duplicate threads.
    """
    next_song = self.get_next_song()
    if not next_song or next_song.preload_state:
        return
    next_song.preload_state = 1  # claim before spawning to avoid a double start
    threading.Thread(target=self.load_song, args=(next_song,)).start()
def playback_info_to_duration(self, info):
    """Extract a duration in seconds from an ffprobe JSON dict; 0.0 when
    unavailable. Checks per-stream durations first, then the container's."""
    if info is None: return 0.0
    if "streams" in info:
        for s in info["streams"]:
            if "duration" in s:
                return float(s["duration"])
    if "format" in info and "duration" in info["format"]:
        return float(info["format"]["duration"])
    return 0.0
    # NOTE(review): unreachable — looks like the added side of a diff hunk
    # fused onto the old body; confirm which return is intended and delete
    # the other.
    return None
def _callback(self, outdata, frames, t, status):
with self.lock:
needed = frames * self.channels * 2
data = b''
data = b""
if self.playing:
if self.proc is None:
if self.next_preload_state==2:
current_song = self.get_current_song()
if not current_song or current_song.ffmpeg is None:
next_song = self.get_next_song()
if next_song:
if next_song.preload_state == 2:
self._start_next()
elif self.next_preload_state == 0:
elif next_song.preload_state == 0:
self.preload_next_threaded()
else:
elif current_song:
try:
data = self.proc.stdout.read(needed) or b''
data = current_song.ffmpeg.stdout.read(needed) or b""
except BlockingIOError:
pass
self.position += len(data) / (self.samplerate * self.channels * 2)
if self.position >= self.playback_info_to_duration(self.playback_info)-10:
if self.position >= current_song.duration - 10:
self.preload_next_threaded()
if self.proc.poll() is not None and len(data)<needed:
if round(self.position, 2) >= self.playback_info_to_duration(self.playback_info)-0.1:
else:
next_song = self.get_next_song()
if next_song and next_song.ffmpeg:
if next_song.ffmpeg.poll() is None:
next_song.ffmpeg.kill()
next_song.ffmpeg = None
next_song.preload_state = 0
if current_song.ffmpeg.poll() is not None and len(data) < needed:
if round(self.position, 2) >= current_song.duration - 0.1:
self._start_next()
if self.proc is not None and self.proc.poll() is None:
current_song = self.get_current_song()
if (
current_song
and current_song.ffmpeg is not None
and current_song.ffmpeg.poll() is None
):
try:
new_data = self.proc.stdout.read(needed-len(data)) or b''
new_data = (
current_song.ffmpeg.stdout.read(
needed - len(data)
)
or b""
)
except BlockingIOError:
new_data = b''
self.position += len(new_data) / (self.samplerate * self.channels * 2)
new_data = b""
self.position += len(new_data) / (
self.samplerate * self.channels * 2
)
data += new_data
else:
self.proc = self._open_ffmpeg(self.current_file, self.position)
# if current_song.ffmpeg and current_song.ffmpeg.poll() is not None:
# current_song.ffmpeg.kill()
# current_song.ffmpeg = None
current_song.ffmpeg = self._open_ffmpeg(
current_song, self.position
)
samples = np.frombuffer(data, dtype=np.int16)
left = samples[0::2]
right = samples[1::2]
norm = 32769.0
x = left / norm
y = right / norm
points = list(zip(x, y))
# step = max(1, len(points) // 1000)
# points = points[::step]
self.oscilloscope_data_points.extend(points)
outdata[: len(data)] = data
outdata[len(data):] = b'\x00'*(needed-len(data))
def build_jellyfin_audio_url(
base_url: str,
item_id: str,
api_key: str,
user_id: str,
container: str = "flac",
audio_codec: str = "flac",
bitrate: int | None = None,
media_source_id: str | None = None,
) -> str:
"""
Build a Jellyfin audio stream URL using urllib.parse.
"""
path = f"/Audio/{item_id}/universal"
params = {
"UserId": user_id,
"Container": container,
"AudioCodec": audio_codec, # <-- IMPORTANT
"api_key": api_key,
outdata[len(data) :] = b"\x00" * (needed - len(data))
def save_state(self, path):
    """Persist the queue (song ids), current index and position as JSON."""
    snapshot = {
        "queue": [entry.id for entry in self.song_list],
        "current_song": self.current_song_in_list,
        "position": self.position,
    }
    with open(path, "w") as f:
        json.dump(snapshot, f)
def load_state(self, path):
    """Restore a queue previously written by save_state().

    Re-fetches each song's metadata from Jellyfin, then restores the index
    and seeks to the saved position. Best-effort: any failure (missing or
    corrupt file, server error) leaves the player unchanged.

    FIX: removed the ``build_jellyfin_audio_url`` / client-setup lines that
    a diff rendering had interleaved into this body; narrowed the bare
    ``except:`` to ``Exception``; the new queue is built in a local list so
    a mid-loop failure cannot leave ``song_list`` half-populated.
    """
    try:
        with open(path, "r") as f:
            data = json.load(f)
        restored = []
        for song_id in data["queue"]:
            song = song_data_to_Song(client.jellyfin.get_item(song_id), server)
            song.ffmpeg = None
            song.preload_state = 0
            restored.append(song)
        self.song_list = restored
        self.current_song_in_list = data['current_song']
        self.seek(data['position'])
    except Exception:
        return

44
jelly.py Normal file
View File

@@ -0,0 +1,44 @@
from jellyfin_apiclient_python import JellyfinClient
import os
from dotenv import load_dotenv
import json
load_dotenv()
# album_covers = {}
# client = JellyfinClient()
# client.config.app("UgPod", "0.0.1", "UgPod prototype", "UgPod_prototype_1")
# client.config.data["auth.ssl"] = True
# try:
# with open("data/auth.json", "r") as f:
# credentials = json.load(f)
# client.authenticate(credentials, discover=False)
# # 🔴 THIS IS THE MISSING STEP
# server = credentials["Servers"][0]
# client.config.data["auth.server"] = server["Id"]
# client.config.data["auth.servers"] = credentials["Servers"]
# client.start()
# server = credentials["Servers"][0]
# assert server["Address"].startswith("http")
# print("Server address:", server["Address"])
# print("Server ID:", server["Id"])
# except:
# print("authenticating")
# client.auth.connect_to_address(os.getenv("host"))
# client.auth.login(os.getenv("URL"), os.getenv("username"), os.getenv("password"))
# credentials = client.auth.credentials.get_credentials()
# # with open("data/auth.json", 'w') as f:
# # json.dump(credentials, f)
# server = credentials["Servers"][0]
# NOTE(review): placeholders — gapless_player.load_state() calls
# client.jellyfin.get_item(...) and indexes server["address"]; with None
# both will raise at runtime. The real initialization is the commented
# block above — confirm whether it should be re-enabled.
client = None
server = None

451
old_app.py Normal file
View File

@@ -0,0 +1,451 @@
import pyray as pr
import math
from ctypes import c_float
from gapless_player import GaplessPlayer, Song, song_data_to_Song
from scrolling_text import ScrollingText
import numpy as np
import threading
import time
import os
from jelly import server, client
# # --- Configuration Constants ---
# INITIAL_SCREEN_WIDTH = 240
# INITIAL_SCREEN_HEIGHT = 240
# TARGET_FPS =60
# # --- State Variables ---
# state = {
# "screen_width": INITIAL_SCREEN_WIDTH,
# "screen_height": INITIAL_SCREEN_HEIGHT,
# }
# # --- Utility Functions ---
# def format_time_mm_ss(seconds):
# """Converts a time in seconds to an 'MM:SS' string format."""
# seconds = int(seconds)
# minutes = seconds // 60
# seconds_remainder = seconds % 60
# return f"{minutes:02d}:{seconds_remainder:02d}"
# def get_progress_bar_rect(screen_width, screen_height):
# width = screen_width
# height = screen_height*0.021
# x = (screen_width - width) / 2
# y = screen_height - height
# return pr.Rectangle(x, y, width, height)
# def draw_progress_bar(rect, current_time, total_time):
# if total_time > 0:
# progress_ratio = current_time / total_time
# else:
# progress_ratio = 0.0
# pr.draw_rectangle_rec(rect, pr.Color(100, 100, 100, 255))
# progress_width = rect.width * progress_ratio
# pr.draw_rectangle(
# int(rect.x),
# int(rect.y)+1,
# int(progress_width),
# int(rect.height),
# pr.Color(200, 50, 50, 255),
# )
# # pr.draw_rectangle_lines_ex(rect, 2, pr.Color(50, 50, 50, 255))
# time_text = f"{format_time_mm_ss(current_time)} / {format_time_mm_ss(total_time)}"
# text_width = pr.measure_text(time_text, int(rect.height * 0.7))
# # pr.draw_text(
# # time_text,
# # int(rect.x + rect.width / 2 - text_width / 2),
# # int(rect.y + rect.height * 0.15),
# # int(rect.height * 0.7),
# # pr.WHITE,
# # )
# pr.set_config_flags(pr.ConfigFlags.FLAG_WINDOW_RESIZABLE)
# # pr.set_config_flags(pr.FLAG_MSAA_4X_HINT)
# #pr.set_config_flags(pr.FLAG_FULLSCREEN_MODE)
# pr.init_window(state["screen_width"], state["screen_height"], "UgPod")
# pr.set_target_fps(TARGET_FPS)
# Create the gapless player and seed its queue with two local test tracks.
player = GaplessPlayer()
print("add queue")

# (file path, title, duration in seconds) — shared album/artist metadata below.
_seed_tracks = [
    ("music/pink floyd/dark side of the moon/06 Money.flac", "Money", 382.200000),
    ("music/pink floyd/dark side of the moon/07 Us and Them.flac", "Us and Them", 470.333333),
]
for _track_path, _track_title, _track_duration in _seed_tracks:
    player.add_to_queue(
        Song(
            "bruhh",
            _track_path,
            _track_title,
            _track_duration,
            "The Dark Side Of The Moon",
            "",
            "Pink Floyd",
        )
    )
# albums = client.jellyfin.user_items(
# params={
# "IncludeItemTypes": "MusicAlbum",
# "SearchTerm": "Dawn FM", # album name
# "Recursive": True,
# },
# )
# album = albums["Items"][0] # pick the album you want
# album_id = album["Id"]
# tracks = client.jellyfin.user_items(
# params={
# "ParentId": album_id,
# "IncludeItemTypes": "Audio",
# "SortBy": "IndexNumber",
# "SortOrder": "Ascending",
# },
# )
# for track in tracks["Items"]:
# player.add_to_queue(
# song_data_to_Song(
# track, server
# )
# )
# print("add queue done")
# player.load_state("data/player.json")
# close_event = threading.Event()
# def save_state_loop():
# while not close_event.wait(10):
# player.save_state("data/player.lock.json")
# os.rename("data/player.lock.json", "data/player.json")
# # save_state_thread = threading.Thread(target=save_state_loop)
# # save_state_thread.start()
# current_path = None
# texture = None
# def load_texture(path):
# global texture, current_path
# if not path:
# return
# if path == current_path:
# return
# if texture is not None:
# pr.unload_texture(texture)
# texture = pr.load_texture(path)
# current_path = path
# def draw_play_pause_button(pos: pr.Vector2, size: pr.Vector2, is_playing: bool) -> bool:
# clicked = False
# rect = pr.Rectangle(pos.x, pos.y, size.x, size.y)
# # Optional hover background
# if pr.check_collision_point_rec(pr.get_mouse_position(), rect):
# pr.draw_rectangle_rec(rect, pr.fade(pr.BLACK, 0.4))
# if pr.is_mouse_button_pressed(pr.MOUSE_LEFT_BUTTON):
# clicked = True
# cx = pos.x + size.x / 2
# cy = pos.y + size.y / 2
# icon_padding = size.x * 0.25
# icon_size = size.x - icon_padding * 2
# if is_playing:
# # PAUSE (two bars centered, same visual weight as play)
# bar_width = icon_size * 0.25
# bar_height = icon_size
# left_x = cx - bar_width - bar_width * 0.4
# right_x = cx + bar_width * 0.4
# top_y = 1+cy - bar_height / 2
# pr.draw_rectangle(
# int(left_x),
# int(top_y),
# int(bar_width),
# int(bar_height),
# pr.WHITE,
# )
# pr.draw_rectangle(
# int(right_x),
# int(top_y),
# int(bar_width),
# int(bar_height),
# pr.WHITE,
# )
# else:
# # PLAY (centered triangle)
# p1 = pr.Vector2(cx - icon_size / 2, cy - icon_size / 2)
# p2 = pr.Vector2(cx - icon_size / 2, cy + icon_size / 2)
# p3 = pr.Vector2(cx + icon_size / 2, cy)
# pr.draw_triangle(p1, p2, p3, pr.WHITE)
# return clicked
# title = ScrollingText(
# "",
# 15
# )
# # --- Main Game Loop ---
# while not pr.window_should_close():
# # 1. Update
# current_width = pr.get_screen_width()
# current_height = pr.get_screen_height()
# if pr.is_key_pressed(pr.KEY_F11):
# pr.toggle_fullscreen()
# if pr.is_key_pressed(pr.KeyboardKey.KEY_SPACE):
# if player.playing:
# player.pause()
# else:
# player.play()
# if pr.is_key_pressed(pr.KeyboardKey.KEY_LEFT):
# player.seek(player.position - 5)
# if pr.is_key_pressed(pr.KeyboardKey.KEY_RIGHT):
# player.seek(player.position + 5)
# pr.begin_drawing()
# pr.clear_background(pr.Color(40, 40, 40, 255))
# dt = pr.get_frame_time()
# progress_rect = get_progress_bar_rect(current_width, current_height)
# # pr.draw_text(
# # "UgPod",
# # int(current_width * 0.05),
# # int(current_height * 0.05),
# # int(current_height * 0.05),
# # pr.SKYBLUE,
# # )
# current_song = player.get_current_song()
# draw_progress_bar(
# progress_rect,
# player.position,
# (current_song and current_song.duration) or 0.0,
# )
# if current_song:
# load_texture(current_song.album_cover_path)
# title_font_size = int(current_height*0.05)
# album_cover_size = int(min(current_width, current_height*0.7))
# title.speed = title_font_size*2.5
# title_size = pr.Vector2(current_width-int(current_height * 0.01)*2, title_font_size)
# title.update(dt,title_size)
# title.set_text(f"{current_song.name} - {current_song.artist_name}", title_font_size)
# title.draw(pr.Vector2(int(current_height * 0.01),int(current_height * 0.8)),title_size)
# # pr.draw_text(
# # ,
# # ,
# # int(current_height * 0.03),
# # pr.WHITE,
# # )
# points = player.oscilloscope_data_points
# if texture is not None:
# scale = min(album_cover_size / texture.width, album_cover_size / texture.height)
# dest_rect = pr.Rectangle(
# current_width//2 - album_cover_size//2,
# (current_height*0.8)//2 - album_cover_size//2,
# texture.width * scale,
# texture.height * scale,
# )
# src_rect = pr.Rectangle(0, 0, texture.width, texture.height)
# pr.draw_texture_pro(
# texture, src_rect, dest_rect, pr.Vector2(0, 0), 0.0, pr.WHITE
# )
# else:
# clip = pr.Rectangle(int(current_width//2 - album_cover_size//2),
# int((current_height*0.8)//2 - album_cover_size//2),
# int(album_cover_size),
# int(album_cover_size))
# pr.begin_scissor_mode(
# int(clip.x),
# int(clip.y),
# int(clip.width),
# int(clip.height),
# )
# pr.draw_rectangle(
# int(clip.x),
# int(clip.y),
# int(clip.width),
# int(clip.height), pr.BLACK)
# # cx = current_width * 0.5+1
# # cy = current_height * 0.4+1
# # MAX_LEN = album_cover_size * 0.25 # tune this
# # MIN_ALPHA = 10
# # MAX_ALPHA = 255
# # for i in range(len(points) - 1):
# # x1 = cx + points[i][0] * album_cover_size * 0.5
# # y1 = cy + -points[i][1] * album_cover_size * 0.5
# # x2 = cx + points[i+1][0] * album_cover_size * 0.5
# # y2 = cy + -points[i+1][1] * album_cover_size * 0.5
# # dx = x2 - x1
# # dy = y2 - y1
# # length = (dx * dx + dy * dy) ** 0.5
# # # 1.0 = short line, 0.0 = long line
# # t = max(0.0, min(1.0, 1.0 - (length / MAX_LEN)))*math.pow(i/len(points), 2)
# # alpha = int(MIN_ALPHA + t * (MAX_ALPHA - MIN_ALPHA))
# # color = pr.Color(255, 255, 255, alpha)
# # pr.draw_line(int(x1), int(y1), int(x2), int(y2), color)
# # draw background square
# if len(points) >= 2:
# samples = np.fromiter(
# ((p[0] + p[1]) * 0.5 for p in points),
# dtype=np.float32
# )
# # Guard: FFT must have meaningful size
# if samples.size > 128:
# rect_x = int(current_width // 2 - album_cover_size // 2)
# rect_y = int((current_height * 0.8) // 2 - album_cover_size // 2)
# # ---- FFT ----
# FFT_SIZE = min(samples.size, 2048)
# window = np.hanning(FFT_SIZE)
# fft = np.fft.rfft(samples[:FFT_SIZE] * window)
# magnitudes = np.abs(fft)
# # remove DC component (important for visuals)
# magnitudes[0] = 0.0
# # ---- LOG BINNING ----
# num_bars = album_cover_size//10
# num_bins = magnitudes.size
# # logarithmic bin edges (low end stretched)
# log_min = 1
# log_max = math.log10(num_bins)
# log_edges = np.logspace(
# math.log10(log_min),
# log_max,
# num_bars + 1
# ).astype(int)
# bar_values = np.zeros(num_bars, dtype=np.float32)
# for i in range(num_bars):
# start = log_edges[i]
# end = log_edges[i + 1]
# if end <= start:
# continue
# bar_values[i] = np.mean(magnitudes[start:end])
# # ---- STATIC SCALING ----
# # Instead of normalizing to the max of the frame, we scale by the FFT size.
# # For a Hanning windowed FFT, dividing by (FFT_SIZE / 4) maps
# # maximum possible volume roughly to 1.0.
# bar_values = bar_values / (FFT_SIZE / 4.0)
# # ---- DRAW ----
# def map_to_screen(val):
# return rect_x + (math.log10(max(1, val)) / log_max) * album_cover_size
# spacing = 0
# for i in range(num_bars):
# # 1. Calculate integer pixel boundaries first
# # This ensures the right edge of one bar is exactly the left edge of the next
# x_start_int = int(map_to_screen(log_edges[i]))
# x_end_int = int(map_to_screen(log_edges[i+1]))
# # 2. Width is the difference between these fixed integer points
# w = (x_end_int - x_start_int) - spacing
# value = bar_values[i]
# h = int(min(1.0, value) * album_cover_size)
# # 3. Anchor to bottom
# y = (rect_y + album_cover_size) - h
# alpha = min(1.0, ((value+1)**2)-1)
# r = 255
# g = 0
# b = 0
# # Keep alpha at 255 (fully opaque)
# color = pr.Color(r, g, b, int(255 * alpha))
# # 4. Draw the bar
# # Use max(1, w) to ensure high-frequency bars don't disappear on small screens
# pr.draw_rectangle(
# x_start_int,
# int(y),
# max(1, int(w)),
# h,
# color
# )
# pr.end_scissor_mode()
# pos = pr.Vector2(current_width * 0.5 - current_height * 0.05, current_height * 0.9-progress_rect.height)
# size = pr.Vector2(current_height * 0.1, current_height * 0.1)
# if draw_play_pause_button(pos, size, player.playing):
# if player.playing:
# player.pause()
# else:
# player.play()
# pr.end_drawing()
# # Cleanup
# if texture is not None:
# pr.unload_texture(texture)
# pr.close_window()
# close_event.set()
# # save_state_thread.join()
# Start playback immediately; the interactive UI loop above is commented out,
# so this is the only thing that kicks the queue off.
player.play()
# while True:
# time.sleep(1)

View File

@@ -11,6 +11,7 @@ ffmpeg-python==0.2.0
frozenlist==1.8.0
future==1.0.0
idna==3.11
inflection==0.5.1
jellyfin-apiclient-python==1.11.0
multidict==6.7.0
numpy==2.3.5
@@ -18,8 +19,7 @@ propcache==0.4.1
pycparser==2.23
pyee==13.0.0
python-dotenv==1.2.1
python-vlc==3.0.21203
raylib==5.5.0.3
raylib==5.5.0.4
requests==2.32.5
sounddevice==0.5.3
typing_extensions==4.15.0

2
run-fb.sh Executable file
View File

@@ -0,0 +1,2 @@
export LIBGL_ALWAYS_SOFTWARE=1
python3 app.py

78
scrolling_text.py Normal file
View File

@@ -0,0 +1,78 @@
import pyray as pr
class ScrollingText:
    """One-line marquee text with a pause before each scroll loop.

    When the rendered text is wider than the target box it scrolls left,
    wraps seamlessly with a gap of 2.5 font sizes, and pauses for
    ``pause_time`` seconds before each pass.
    """

    def __init__(
        self,
        text: str,
        font_size: int,
        speed: float = 40.0,  # scroll speed, pixels per second
        pause_time: float = 1.0,  # seconds to wait before scrolling starts
        color=pr.WHITE,
    ):
        self.font_size = font_size
        self.speed = speed
        self.pause_time = pause_time
        self.color = color
        self.text = None  # ensures the first set_text() always takes effect
        self.set_text(text, font_size)

    def set_text(self, text: str, font_size: int):
        """Replace the displayed text and re-measure it.

        No-op when neither the text nor the font size changed, so callers
        may safely invoke this every frame.
        """
        unchanged = text == self.text and font_size == self.font_size
        if unchanged:
            return
        self.text = text
        self.font_size = font_size
        self.text_width = pr.measure_text(text, font_size)
        self.reset()

    def reset(self):
        """Return to the resting (not scrolling) state at offset zero."""
        self.offset = 0.0
        self.timer = 0.0
        self.scrolling = False

    def update(self, dt: float, size: pr.Vector2):
        """Advance the animation by ``dt`` seconds for a box of ``size``."""
        if self.text_width <= size.x:
            # Text fits entirely: stay parked.
            return self.reset()
        self.timer += dt
        if not self.scrolling:
            # Still waiting out the initial pause.
            if self.timer >= self.pause_time:
                self.scrolling = True
                self.timer = 0.0
            return
        self.offset += self.speed * dt
        wrap_distance = self.text_width + self.font_size * 2.5
        if self.offset >= wrap_distance:
            self.reset()

    def draw(self, pos: pr.Vector2, size: pr.Vector2):
        """Draw the text clipped to the rectangle at ``pos`` of ``size``."""
        pr.begin_scissor_mode(int(pos.x), int(pos.y), int(size.x), int(size.y))
        text_y = int(pos.y + (size.y - self.font_size) / 2)
        pr.draw_text(
            self.text,
            int(pos.x - self.offset),
            text_y,
            self.font_size,
            self.color,
        )
        if self.text_width > size.x:
            # Trailing copy so the wrap-around has no visible gap.
            pr.draw_text(
                self.text,
                int(pos.x - self.offset + self.text_width + self.font_size * 2.5),
                text_y,
                self.font_size,
                self.color,
            )
        pr.end_scissor_mode()