Files
youtube-sync-docker/app.py
ComputerGuru ef903c86d1 fix: wire settings.json to actually drive runtime behavior
Settings page saved to /config/settings.json but nothing downstream read
that file. Schedule changes were silently ignored; max_quality and
sleep_interval changes were silently ignored. The "Settings saved
successfully" flash was a lie.

Fix:
- sync.sh reads max_quality + sleep_interval from settings.json on each
  run (jq -er ... // empty, falling back to env vars on missing/malformed
  file)
- entrypoint.sh reads sync_schedule from settings.json before setting up
  cron, and writes the crond PID to /var/run/crond.pid so Flask can
  SIGHUP it
- app.py adds apply_schedule(): rewrites /etc/crontabs/root, signals
  crond via the recorded PID, restarts crond if the PID is stale, drops
  the crontab when schedule is set to "manual". save_settings_route
  invokes it only when the schedule actually changed; any failure
  flashes a warning so the save still succeeds with the user informed
- bare `except: pass` in get_settings replaced with explicit exception
  types + stderr warning so debugging malformed settings is possible
- sync.sh: one bad channel no longer aborts the whole loop under set -e
- Dockerfile adds jq for the JSON reads in sync.sh / entrypoint.sh
- README: two stale github.com URLs fixed to Gitea; new Running Tests
  section under Building From Source
- tests/test_settings.py: 3 pytest cases covering get_settings()'s
  three branches (missing file, valid file, malformed JSON)

Settings hierarchy unchanged: env-var defaults seed the UI; settings.json
wins when present and parseable.

Timezone (TZ) is not applied live - tzdata is locked in at process start.
Same behavior as before; not in scope for this commit.
2026-05-31 19:21:43 -07:00

467 lines
16 KiB
Python

#!/usr/bin/env python3
"""
YouTube Sync Web Interface
Provides a web UI for managing YouTube channel downloads
"""
import os
import signal
import subprocess
import sys
import json
import re
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
# Flask application object; every route handler in this module registers on it.
app = Flask(__name__)
# Key Flask uses to sign the session cookie (flash() messages are stored there).
# When SECRET_KEY is unset this falls back to a fixed, publicly visible string,
# so real deployments should always provide SECRET_KEY.
app.secret_key = os.environ.get('SECRET_KEY', 'youtube-sync-secret-key-change-me')
# Configuration
# Both directories can be overridden through the environment; the channel,
# cookie and settings paths are derived from CONFIG_DIR.
DOWNLOAD_DIR = os.environ.get('DOWNLOAD_DIR', '/downloads')
CONFIG_DIR = os.environ.get('CONFIG_DIR', '/config')
CHANNELS_FILE = os.path.join(CONFIG_DIR, 'channels.txt')
COOKIES_FILE = os.path.join(CONFIG_DIR, 'cookies.txt')
SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
# Log that both the cron line written by apply_schedule() and manual syncs
# started by start_sync() append to; get_logs() tails it for the Logs page.
LOG_FILE = '/var/log/youtube-sync.log'
# Crontab rewritten by apply_schedule() whenever the schedule changes.
CRONTAB_FILE = '/etc/crontabs/root'
# File holding the crond PID that apply_schedule() signals with SIGHUP.
CROND_PID_FILE = '/var/run/crond.pid'
def get_settings():
    """Load settings from SETTINGS_FILE, layered over env-var defaults.

    Returns:
        dict: always contains ``sync_schedule``, ``max_quality``,
        ``sleep_interval`` and ``timezone``. Values stored in the settings
        file win; any key the file does not provide falls back to its
        environment-variable default.

    A missing file is not an error. An unreadable file, malformed JSON, or
    JSON whose top level is not an object is reported on stderr and the
    defaults are returned, so the UI keeps working and the user can re-save
    from the Settings page to rewrite a clean file.
    """
    defaults = {
        'sync_schedule': os.environ.get('SYNC_SCHEDULE', '0 2 * * *'),
        'max_quality': os.environ.get('MAX_QUALITY', '1080'),
        'sleep_interval': os.environ.get('SLEEP_INTERVAL', '2'),
        'timezone': os.environ.get('TZ', 'America/Phoenix'),
    }
    try:
        with open(SETTINGS_FILE, 'r') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # Nothing saved yet: the env-var defaults seed the UI.
        return defaults
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(
            f"[WARNING] Could not read {SETTINGS_FILE}: {e}; using env-var defaults",
            file=sys.stderr,
        )
        return defaults
    if not isinstance(loaded, dict):
        # Valid JSON but not an object (e.g. a list): callers index by key,
        # so treat it like a malformed file.
        print(
            f"[WARNING] {SETTINGS_FILE} does not contain a JSON object; "
            "using env-var defaults",
            file=sys.stderr,
        )
        return defaults
    # Stored values override defaults; keys missing from the file keep theirs.
    return {**defaults, **loaded}
def save_settings(settings):
    """Persist *settings* to SETTINGS_FILE as indented JSON.

    The payload is serialised before any file is opened, so a value that
    cannot be encoded never truncates the existing file. It is then written
    to a temporary file next to SETTINGS_FILE and renamed into place, so
    anything re-reading the settings (sync.sh reads them fresh on each run)
    sees either the old or the new content, never a half-written file.

    If the temp-file route is not possible (for example the directory is not
    writable but the file itself is), the function falls back to writing the
    file in place, which is what it always did before.

    Args:
        settings: JSON-serialisable mapping of setting names to values.

    Raises:
        TypeError / ValueError: *settings* cannot be serialised to JSON.
        OSError: neither the atomic nor the in-place write succeeded.
    """
    payload = json.dumps(settings, indent=2)
    tmp_path = f"{SETTINGS_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            f.write(payload)
        os.replace(tmp_path, SETTINGS_FILE)
        return
    except OSError as e:
        print(
            f"[WARNING] Atomic write of {SETTINGS_FILE} failed: {e}; writing in place",
            file=sys.stderr,
        )
        # Best-effort cleanup of a leftover temp file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
    with open(SETTINGS_FILE, 'w') as f:
        f.write(payload)
# A schedule is accepted only if it is an "@nickname" or exactly five
# space/tab-separated cron time fields made of digits, letters (month/day
# names), '*', '/', ',' and '-'. Extra fields, shell metacharacters and line
# breaks are rejected so form input can never add a command to the crontab.
_CRON_SCHEDULE_RE = re.compile(
    r'@[A-Za-z]+|[0-9A-Za-z*/,-]+(?:[ \t]+[0-9A-Za-z*/,-]+){4}'
)


def _read_crond_pid():
    """Return the crond PID recorded in CROND_PID_FILE, or None if unusable."""
    try:
        with open(CROND_PID_FILE, 'r') as f:
            pid = int(f.read().strip())
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        print(
            f"[WARNING] Could not read crond pid file {CROND_PID_FILE}: {e}",
            file=sys.stderr,
        )
        return None
    if pid <= 0:
        # os.kill() gives 0 and negative PIDs process-group meaning; sending
        # SIGHUP there would hit this web process too, so never use them.
        print(
            f"[WARNING] Ignoring invalid crond pid {pid} in {CROND_PID_FILE}",
            file=sys.stderr,
        )
        return None
    return pid


def apply_schedule(new_schedule):
    """
    Rewrite the crontab with the new schedule and reload crond so the change
    takes effect without a container restart.

    - new_schedule == "manual": remove the crontab entirely.
    - otherwise: validate the schedule, then write a single line invoking
      /app/sync.sh on it, appending output to LOG_FILE.

    crond is reloaded by SIGHUP'ing the PID recorded in CROND_PID_FILE. If no
    usable PID is recorded, or the recorded process no longer exists, and the
    schedule is not "manual", a new crond is started and its PID recorded.
    NOTE(review): this assumes the installed crond re-reads crontabs on
    SIGHUP -- confirm for the image's cron implementation.

    Raises:
        OSError: the schedule is not a plain cron time spec (errno EINVAL),
            or any underlying file/process call failed. Validation failures
            are raised as OSError rather than ValueError because callers are
            expected to catch OS-level errors and surface a warning.
    """
    import errno  # local import: only needed for the validation error

    if new_schedule == 'manual':
        # Drop the crontab so crond stops firing the job. crond itself is
        # left running; it simply has nothing to do.
        try:
            os.remove(CRONTAB_FILE)
        except FileNotFoundError:
            pass
    else:
        schedule = new_schedule.strip() if isinstance(new_schedule, str) else ''
        if not _CRON_SCHEDULE_RE.fullmatch(schedule):
            # Checked before touching the filesystem so a rejected value
            # leaves the current crontab intact.
            raise OSError(errno.EINVAL, f"invalid cron schedule {new_schedule!r}")
        os.makedirs(os.path.dirname(CRONTAB_FILE), exist_ok=True)
        with open(CRONTAB_FILE, 'w') as f:
            f.write(f"{schedule} /app/sync.sh >> {LOG_FILE} 2>&1\n")

    pid = _read_crond_pid()
    if pid is not None:
        try:
            os.kill(pid, signal.SIGHUP)
        except ProcessLookupError:
            # PID file is stale: fall through and start crond again below
            # if there is a schedule to run.
            print(
                f"[WARNING] crond pid {pid} no longer running; restarting",
                file=sys.stderr,
            )
        else:
            return

    # No live crond. Only start one when there is an active schedule.
    if new_schedule == 'manual':
        return
    proc = subprocess.Popen(
        ['crond', '-f', '-l', '2'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    try:
        with open(CROND_PID_FILE, 'w') as f:
            f.write(str(proc.pid))
    except OSError as e:
        # Non-fatal: cron is running, it just cannot be signalled next time.
        print(
            f"[WARNING] crond started (pid {proc.pid}) but pid file write failed: {e}",
            file=sys.stderr,
        )
def extract_channel_id(url_or_id):
    """
    Extract a channel ID from a YouTube URL, or validate a direct ID.

    Accepted inputs:
    - a bare channel ID ("UC" followed by 22 ID characters);
    - a URL whose path contains /channel/<ID>;
    - a youtube.com / www.youtube.com / m.youtube.com URL (handle, /c/ or
      /user/ form), resolved by asking yt-dlp for the channel's first
      playlist entry. A URL typed without a scheme is treated as https.

    Returns:
        tuple: (channel_id, channel_name_hint or None), or (None, None) when
        nothing usable could be extracted. The name hint is only available
        when yt-dlp resolved the channel.
    """
    channel_id_pattern = r'UC[A-Za-z0-9_-]{22}'
    youtube_hosts = ('youtube.com', 'www.youtube.com', 'm.youtube.com')

    candidate = (url_or_id or '').strip()
    if not candidate:
        return None, None

    # Already a channel ID.
    if re.fullmatch(channel_id_pattern, candidate):
        return candidate, None

    # People often paste "youtube.com/@handle" without a scheme; urlparse
    # would otherwise put the whole thing in .path and find no host.
    if '://' not in candidate:
        candidate = 'https://' + candidate
    try:
        parsed = urlparse(candidate)
        host = (parsed.hostname or '').lower()
    except ValueError:
        return None, None

    # Format: youtube.com/channel/CHANNEL_ID
    if '/channel/' in parsed.path:
        segment = parsed.path.split('/channel/')[-1].split('/')[0]
        # Only hand back something shaped like a channel ID; arbitrary path
        # text (e.g. containing '|') would corrupt channels.txt.
        if re.fullmatch(channel_id_pattern, segment):
            return segment, None

    # Format: youtube.com/@handle or youtube.com/c/name or youtube.com/user/name
    # These need a lookup to find the actual channel ID.
    if host not in youtube_hosts:
        return None, None
    path = parsed.path.rstrip('/')
    if not path:
        return None, None
    # Rebuild the URL without query/fragment so "/videos" lands on the path,
    # which is what makes yt-dlp return playlist metadata.
    if not path.endswith('/videos'):
        path += '/videos'
    fetch_url = f"{parsed.scheme}://{parsed.netloc}{path}"

    try:
        result = subprocess.run(
            ['yt-dlp', '--flat-playlist', '--dump-json', '--playlist-items', '1', fetch_url],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # yt-dlp missing, not executable, or timed out.
        print(f"[WARNING] yt-dlp lookup failed for {fetch_url}: {e}", file=sys.stderr)
        return None, None
    if result.returncode != 0 or not result.stdout:
        return None, None

    # One JSON document per line; the first non-empty line is enough.
    first_line = next((ln for ln in result.stdout.split('\n') if ln.strip()), None)
    if first_line is None:
        return None, None
    try:
        data = json.loads(first_line)
    except ValueError as e:
        print(f"[WARNING] Unparseable yt-dlp output for {fetch_url}: {e}", file=sys.stderr)
        return None, None
    if not isinstance(data, dict):
        return None, None

    # flat-playlist returns playlist_channel_id instead of channel_id
    channel_id = data.get('playlist_channel_id') or data.get('channel_id')
    channel_name = data.get('playlist_channel') or data.get('channel')
    if channel_id:
        return channel_id, channel_name
    return None, None
def get_channels():
    """Return the channels listed in CHANNELS_FILE.

    Each usable line has the form ``CHANNEL_ID|Channel Name``. Blank lines,
    lines starting with ``#`` and lines that do not split into exactly two
    ``|``-separated parts are skipped. A missing file yields an empty list.

    Returns:
        list of ``{'id': ..., 'name': ...}`` dicts in file order.
    """
    if not os.path.exists(CHANNELS_FILE):
        return []
    parsed = []
    with open(CHANNELS_FILE, 'r') as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith('#'):
                continue
            fields = entry.split('|')
            if len(fields) != 2:
                continue
            channel_id, channel_name = fields
            parsed.append({'id': channel_id.strip(), 'name': channel_name.strip()})
    return parsed
def save_channels(channels):
    """Overwrite CHANNELS_FILE with a comment header plus one line per channel.

    Args:
        channels: iterable of dicts with ``'id'`` and ``'name'`` keys; each is
            written as ``id|name``.
    """
    header = (
        '# YouTube Channel Configuration\n',
        '# Format: CHANNEL_ID|Channel Name\n',
        '# One channel per line. Lines starting with # are ignored.\n',
        '#\n',
    )
    with open(CHANNELS_FILE, 'w') as out:
        out.writelines(header)
        out.writelines(f"{entry['id']}|{entry['name']}\n" for entry in channels)
def get_channel_stats():
    """Collect per-channel download statistics for the dashboard.

    For every configured channel, walks ``DOWNLOAD_DIR/<channel name>`` and
    counts ``.mp4`` files and their combined size. The last-sync time is the
    modification time of ``.downloaded.txt`` in that directory.

    Files can appear and disappear while a sync is running, so a file that
    vanishes between being listed and being measured is skipped instead of
    failing the whole page.

    Returns:
        list of dicts with keys ``name``, ``id``, ``video_count``,
        ``total_size`` (human-readable string) and ``last_sync``
        (``YYYY-MM-DD HH:MM`` or ``'Never'``).
    """
    stats = []
    for channel in get_channels():
        channel_dir = os.path.join(DOWNLOAD_DIR, channel['name'])
        video_count = 0
        total_size = 0
        # os.walk yields nothing for a directory that does not exist, so no
        # separate existence check is needed.
        for root, _dirs, files in os.walk(channel_dir):
            for filename in files:
                if not filename.endswith('.mp4'):
                    continue
                try:
                    total_size += os.path.getsize(os.path.join(root, filename))
                except OSError:
                    # Removed or renamed since the directory was listed.
                    continue
                video_count += 1

        # Last sync time comes from .downloaded.txt's mtime, when present.
        last_sync = 'Never'
        try:
            mtime = os.path.getmtime(os.path.join(channel_dir, '.downloaded.txt'))
        except OSError:
            pass
        else:
            last_sync = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M')

        stats.append({
            'name': channel['name'],
            'id': channel['id'],
            'video_count': video_count,
            'total_size': format_size(total_size),
            'last_sync': last_sync,
        })
    return stats
def format_size(bytes):
    """Render a byte count as a one-decimal string with a binary unit.

    The value is divided by 1024 until it drops below 1024, stepping through
    B, KB, MB, GB and TB; anything still larger is reported in PB.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    value = bytes
    position = 0
    while position < len(units):
        if value < 1024.0:
            return f"{value:.1f} {units[position]}"
        value = value / 1024.0
        position += 1
    return f"{value:.1f} PB"
def get_logs(lines=100):
    """Return the last *lines* lines of LOG_FILE.

    Args:
        lines: how many trailing lines to fetch (passed to ``tail -n``).

    Returns:
        list of str, one per log line. Empty when the log does not exist or
        ``tail`` could not be run. Bytes that are not valid text are replaced
        rather than discarding the whole log.
    """
    if not os.path.exists(LOG_FILE):
        return []
    try:
        result = subprocess.run(
            ['tail', '-n', str(lines), LOG_FILE],
            capture_output=True,
            text=True,
            # Logs can contain arbitrary bytes; one bad byte should not
            # blank the whole page.
            errors='replace',
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[WARNING] Could not read {LOG_FILE}: {e}", file=sys.stderr)
        return []
    log_lines = result.stdout.split('\n')
    # Output ending in a newline leaves an empty last element; drop it so the
    # list holds only real lines.
    if log_lines and log_lines[-1] == '':
        log_lines.pop()
    return log_lines
def is_sync_running():
    """Report whether a process whose command line matches ``sync.sh`` exists.

    Uses ``pgrep -f``, which exits 0 when at least one process matches.

    Returns:
        bool: True if a match was found; False if none was found or pgrep
        could not be run. This is a best-effort probe that the UI polls, so
        failures are treated as "not running" rather than raised.
    """
    try:
        result = subprocess.run(
            ['pgrep', '-f', 'sync.sh'],
            capture_output=True,
        )
    except (OSError, subprocess.SubprocessError):
        # pgrep missing or not executable. Only these are swallowed, so
        # KeyboardInterrupt/SystemExit still propagate.
        return False
    return result.returncode == 0
@app.route('/')
def index():
    """Render the dashboard: channel stats, settings, sync state, cookie presence."""
    context = {
        'stats': get_channel_stats(),
        'settings': get_settings(),
        'sync_running': is_sync_running(),
        'has_cookies': os.path.exists(COOKIES_FILE),
    }
    return render_template('index.html', **context)
@app.route('/channels')
def channels():
    """Render the channel management page with the configured channels."""
    return render_template('channels.html', channels=get_channels())
@app.route('/channels/add', methods=['POST'])
def add_channel():
    """Add a channel from the submitted form and redirect to the channel list.

    Form fields:
        channel_id: a channel ID or URL (required).
        channel_name: display name; optional when it can be auto-detected.

    The name is cleaned before it is stored: channels.txt is ``|``-delimited
    with one channel per line, and the name is also used as a directory name
    under DOWNLOAD_DIR, so ``|``, path separators and control characters are
    replaced with spaces and names that resolve to nothing, ``.`` or ``..``
    are rejected. Every outcome is reported through a flash message.
    """
    channel_input = request.form.get('channel_id', '').strip()
    requested_name = request.form.get('channel_name', '').strip()
    if not channel_input:
        flash('Channel ID or URL is required', 'error')
        return redirect(url_for('channels'))

    # Extract channel ID from URL or validate direct ID
    channel_id, auto_name = extract_channel_id(channel_input)
    # An ID containing the field separator or whitespace could not be read
    # back from channels.txt, so treat it as invalid.
    if (
        not channel_id
        or '|' in channel_id
        or any(ch.isspace() for ch in channel_id)
    ):
        flash('Invalid channel ID or URL. Please check and try again.', 'error')
        return redirect(url_for('channels'))

    # Prefer the name the user typed; fall back to the auto-detected one.
    raw_name = requested_name or auto_name or ''
    if not raw_name:
        flash('Channel name is required (could not auto-detect from URL)', 'error')
        return redirect(url_for('channels'))
    # Replace characters that would break the file format or escape the
    # download directory, then collapse runs of whitespace.
    channel_name = ' '.join(re.sub(r'[|/\\\x00-\x1f]', ' ', raw_name).split())
    if channel_name in ('', '.', '..'):
        flash('Channel name contains no usable characters', 'error')
        return redirect(url_for('channels'))

    # Check for duplicates
    channels = get_channels()
    for existing in channels:
        if existing['id'] == channel_id:
            flash(f'Channel already exists: {existing["name"]}', 'error')
            return redirect(url_for('channels'))

    channels.append({'id': channel_id, 'name': channel_name})
    save_channels(channels)
    flash(f'Added channel: {channel_name} ({channel_id})', 'success')
    return redirect(url_for('channels'))
@app.route('/channels/delete/<int:index>')
def delete_channel(index):
    """Remove the channel at position *index* in channels.txt order.

    NOTE(review): this changes state on a GET request and identifies the
    channel by list position; consider whether that is intended.
    """
    channels = get_channels()
    if not (0 <= index < len(channels)):
        flash('Channel not found', 'error')
        return redirect(url_for('channels'))
    removed = channels.pop(index)
    save_channels(channels)
    flash(f'Deleted channel: {removed["name"]}', 'success')
    return redirect(url_for('channels'))
@app.route('/settings')
def settings():
    """Render the settings page with the current settings."""
    return render_template('settings.html', settings=get_settings())
@app.route('/settings/save', methods=['POST'])
def save_settings_route():
    """Save the submitted settings and apply a changed schedule immediately.

    Flow:
    1. Remember the previously stored schedule.
    2. Write the new settings. If the write fails, flash an error and stop;
       nothing else is attempted.
    3. If the schedule changed, call apply_schedule(). A failure there does
       not undo the save; the user gets a warning that the new schedule
       takes effect after a restart.

    max_quality / sleep_interval are read fresh by sync.sh on each run, so no
    action is needed for them. Timezone changes still require a container
    restart; nothing here claims otherwise.
    """
    previous = get_settings()
    # Be tolerant of a settings file whose top level is not an object.
    previous_schedule = (
        previous.get('sync_schedule') if isinstance(previous, dict) else None
    )
    settings = {
        'sync_schedule': request.form.get('sync_schedule', '0 2 * * *'),
        'max_quality': request.form.get('max_quality', '1080'),
        'sleep_interval': request.form.get('sleep_interval', '2'),
        'timezone': request.form.get('timezone', 'America/Phoenix'),
    }

    try:
        save_settings(settings)
    except OSError as e:
        # E.g. read-only or full config volume: tell the user instead of
        # returning a bare 500 page.
        print(f"[WARNING] Could not save settings: {e}", file=sys.stderr)
        flash('Settings could not be saved. Check that the config directory is writable.', 'error')
        return redirect(url_for('settings'))
    flash('Settings saved successfully', 'success')

    if settings['sync_schedule'] != previous_schedule:
        try:
            apply_schedule(settings['sync_schedule'])
        except (OSError, subprocess.SubprocessError) as e:
            # Save succeeded; the schedule reload didn't. Let the user know
            # it'll take effect on next restart rather than silently failing.
            print(
                f"[WARNING] Could not reload cron schedule: {e}",
                file=sys.stderr,
            )
            flash(
                'Schedule saved, but live reload failed. '
                'New schedule will take effect after container restart.',
                'warning',
            )
    return redirect(url_for('settings'))
@app.route('/cookies', methods=['GET', 'POST'])
def cookies():
    """Show the cookie page (GET) or store an uploaded cookies file (POST).

    A POST without a ``cookies_file`` part, or with an empty filename, is
    rejected with a flash message. A valid upload is saved to COOKIES_FILE
    and the user is sent back to the dashboard.
    """
    if request.method != 'POST':
        return render_template('cookies.html', has_cookies=os.path.exists(COOKIES_FILE))

    upload = request.files.get('cookies_file')
    if upload is None or upload.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for('cookies'))

    upload.save(COOKIES_FILE)
    flash('Cookies file uploaded successfully', 'success')
    return redirect(url_for('index'))
@app.route('/cookies/delete')
def delete_cookies():
    """Remove COOKIES_FILE if it exists, then return to the cookie page."""
    cookie_present = os.path.exists(COOKIES_FILE)
    if cookie_present:
        os.remove(COOKIES_FILE)
        flash('Cookies file deleted', 'success')
    return redirect(url_for('cookies'))
@app.route('/logs')
def logs():
    """Render the logs page with the last 200 log lines."""
    return render_template('logs.html', logs=get_logs(200))
@app.route('/api/sync/start', methods=['POST'])
def start_sync():
    """Start a manual sync in the background.

    Returns JSON:
        409 ``{'status': 'error', ...}`` if a sync is already running;
        500 ``{'status': 'error', 'message': <reason>}`` if the script could
        not be launched;
        200 ``{'status': 'success', ...}`` once the script has been spawned.
        The sync itself keeps running after the response is sent.
    """
    if is_sync_running():
        return jsonify({'status': 'error', 'message': 'Sync already running'}), 409
    try:
        # The child gets its own copy of the log descriptor when it is
        # spawned, so this process can close its handle straight away
        # instead of leaking one per sync.
        with open(LOG_FILE, 'a') as log_handle:
            subprocess.Popen(
                ['/app/sync.sh'],
                stdout=log_handle,
                stderr=subprocess.STDOUT,
            )
    except Exception as e:
        # API boundary: always answer with JSON, whatever went wrong.
        print(f"[WARNING] Could not start sync: {e}", file=sys.stderr)
        return jsonify({'status': 'error', 'message': str(e)}), 500
    return jsonify({'status': 'success', 'message': 'Sync started'})
@app.route('/api/sync/status')
def sync_status():
    """Return ``{"running": <bool>}`` describing whether a sync is in progress."""
    return jsonify(running=is_sync_running())
@app.route('/api/stats')
def api_stats():
    """Return the per-channel statistics as JSON."""
    channel_stats = get_channel_stats()
    return jsonify(channel_stats)
if __name__ == '__main__':
    # Make sure the config and download directories are present before serving.
    for required_dir in (CONFIG_DIR, DOWNLOAD_DIR):
        os.makedirs(required_dir, exist_ok=True)
    # Serve on every interface, port 8080, with debug mode off.
    app.run(host='0.0.0.0', port=8080, debug=False)