#!/usr/bin/env python3
"""
YouTube Sync Web Interface

Provides a web UI for managing YouTube channel downloads: the channel list,
sync settings (including the cron schedule), the cookies file, log viewing
and manual sync triggering.
"""

import os
import signal
import subprocess
import sys
import json
import re
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, parse_qs

from flask import Flask, render_template, request, jsonify, redirect, url_for, flash

app = Flask(__name__)
# NOTE(review): the fallback key below is a fixed, publicly visible string.
# Set SECRET_KEY in the environment for any deployment where session
# integrity matters.
app.secret_key = os.environ.get('SECRET_KEY', 'youtube-sync-secret-key-change-me')

# Configuration
DOWNLOAD_DIR = os.environ.get('DOWNLOAD_DIR', '/downloads')
CONFIG_DIR = os.environ.get('CONFIG_DIR', '/config')
CHANNELS_FILE = os.path.join(CONFIG_DIR, 'channels.txt')
COOKIES_FILE = os.path.join(CONFIG_DIR, 'cookies.txt')
SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
LOG_FILE = '/var/log/youtube-sync.log'
CRONTAB_FILE = '/etc/crontabs/root'
CROND_PID_FILE = '/var/run/crond.pid'

# A bare channel ID: "UC" followed by 22 word/dash characters.
_CHANNEL_ID_RE = re.compile(r'^UC[\w-]{22}$')
# Hosts for which a non-/channel/ URL is resolved through yt-dlp.
_YOUTUBE_HOSTS = frozenset({'youtube.com', 'www.youtube.com', 'm.youtube.com'})

# Characters permitted inside one cron time field (numbers, names, *, /, ",", -).
_CRON_FIELD_RE = re.compile(r'[0-9A-Za-z*/,-]+')
# Single-token schedules passed through unchanged. Whether crond honours
# them depends on the cron implementation installed in the container.
_CRON_MACROS = frozenset({
    '@reboot', '@yearly', '@annually', '@monthly',
    '@weekly', '@daily', '@midnight', '@hourly',
})


def _warn(message):
    """Write a warning line to stderr in the format used across this module."""
    print(f"[WARNING] {message}", file=sys.stderr)


def get_settings():
    """
    Load settings from SETTINGS_FILE, falling back to env-var defaults.

    Values found in the file override the defaults key by key, so a file
    that is missing some keys still yields a complete settings dict.
    A missing, unreadable, malformed or non-object file results in the
    defaults being returned (with a warning on stderr where applicable).
    """
    defaults = {
        'sync_schedule': os.environ.get('SYNC_SCHEDULE', '0 2 * * *'),
        'max_quality': os.environ.get('MAX_QUALITY', '1080'),
        'sleep_interval': os.environ.get('SLEEP_INTERVAL', '2'),
        'timezone': os.environ.get('TZ', 'America/Phoenix'),
    }

    try:
        with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # No settings saved yet: defaults apply silently.
        return defaults
    except (OSError, ValueError) as e:
        # Malformed or unreadable settings.json: log and fall back to defaults
        # rather than crashing the UI. The user can re-save from the Settings
        # page to rewrite a clean file. (json.JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses.)
        _warn(f"Could not read {SETTINGS_FILE}: {e}; using env-var defaults")
        return defaults

    if not isinstance(loaded, dict):
        # Valid JSON but not an object; callers rely on dict access.
        _warn(f"Could not read {SETTINGS_FILE}: expected a JSON object; "
              f"using env-var defaults")
        return defaults

    return {**defaults, **loaded}


def save_settings(settings):
    """Write the settings dict to SETTINGS_FILE as indented JSON."""
    with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
        json.dump(settings, f, indent=2)


def _normalize_schedule(schedule):
    """
    Validate a schedule string and return it in canonical single-line form.

    Accepted values are "manual", one of the @macros in _CRON_MACROS, or
    exactly five whitespace-separated cron fields made only of the
    characters allowed by _CRON_FIELD_RE. Returns None for anything else.

    The value ends up in root's crontab, so rejecting extra fields and
    collapsing all whitespace (including line breaks) prevents a crafted
    value from adding its own command or an additional crontab line.
    """
    if not isinstance(schedule, str):
        return None

    fields = schedule.split()
    if len(fields) == 1:
        token = fields[0]
        return token if token == 'manual' or token in _CRON_MACROS else None
    if len(fields) == 5 and all(_CRON_FIELD_RE.fullmatch(f) for f in fields):
        return ' '.join(fields)
    return None


def _read_crond_pid():
    """Return the PID stored in CROND_PID_FILE, or None if absent/unusable."""
    if not os.path.exists(CROND_PID_FILE):
        return None
    try:
        with open(CROND_PID_FILE, 'r') as f:
            pid = int(f.read().strip())
    except (OSError, ValueError) as e:
        _warn(f"Could not read crond pid file {CROND_PID_FILE}: {e}")
        return None
    if pid <= 0:
        # os.kill() with 0 or a negative pid targets process groups, which
        # would deliver the signal to this web process as well.
        _warn(f"Could not read crond pid file {CROND_PID_FILE}: invalid pid {pid}")
        return None
    return pid


def _start_crond():
    """Launch crond in the foreground as a child and record its PID."""
    proc = subprocess.Popen(
        ['crond', '-f', '-l', '2'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    try:
        with open(CROND_PID_FILE, 'w') as f:
            f.write(str(proc.pid))
    except OSError as e:
        # Non-fatal: cron is running, we just can't reload it on the next change.
        _warn(f"crond started (pid {proc.pid}) but pid file write failed: {e}")


def apply_schedule(new_schedule):
    """
    Rewrite /etc/crontabs/root with the new schedule and reload crond so the
    change takes effect without a container restart.

    - new_schedule == "manual": remove the crontab entirely.
    - otherwise: write a single line invoking /app/sync.sh on the given
      schedule.

    crond is reloaded by sending SIGHUP to the PID recorded in
    CROND_PID_FILE. If no live crond is found and the schedule is not
    "manual", a new crond is started.

    Raises:
        ValueError: if new_schedule is not "manual", a known @macro, or a
            five-field cron expression (nothing is written in that case).
        OSError / subprocess.SubprocessError: whatever the underlying OS
            calls raise; callers are expected to catch these and surface a
            warning to the user.
    """
    schedule = _normalize_schedule(new_schedule)
    if schedule is None:
        raise ValueError(f"Invalid cron schedule: {new_schedule!r}")

    if schedule == 'manual':
        # Drop the crontab so crond stops firing the job. Leave crond running —
        # cheaper than killing/restarting it and it'll just idle.
        try:
            os.remove(CRONTAB_FILE)
        except FileNotFoundError:
            pass
    else:
        os.makedirs(os.path.dirname(CRONTAB_FILE), exist_ok=True)
        with open(CRONTAB_FILE, 'w') as f:
            f.write(f"{schedule} /app/sync.sh >> {LOG_FILE} 2>&1\n")

    # Reload crond by SIGHUP so it re-reads the crontab.
    pid = _read_crond_pid()
    if pid is not None:
        try:
            os.kill(pid, signal.SIGHUP)
            return
        except ProcessLookupError:
            # PID file is stale — crond exited. Fall through to start it fresh
            # below if we have a non-manual schedule.
            _warn(f"crond pid {pid} no longer running; restarting")

    # No live crond. Start one if we have an active schedule to run.
    if schedule != 'manual':
        _start_crond()


def _resolve_channel_via_ytdlp(fetch_url):
    """
    Ask yt-dlp for the channel behind fetch_url without downloading anything.

    Returns (channel_id, channel_name), or (None, None) when yt-dlp is
    missing, fails, times out (15 s) or returns nothing usable.
    """
    try:
        result = subprocess.run(
            ['yt-dlp', '--flat-playlist', '--dump-json',
             '--playlist-items', '1', fetch_url],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.SubprocessError) as e:
        _warn(f"yt-dlp lookup failed for {fetch_url}: {e}")
        return None, None

    if result.returncode != 0 or not result.stdout:
        return None, None

    # Only the first non-blank line of output is parsed.
    first_line = next(
        (line for line in result.stdout.split('\n') if line.strip()), None
    )
    if first_line is None:
        return None, None

    try:
        data = json.loads(first_line)
    except json.JSONDecodeError as e:
        _warn(f"yt-dlp returned unparseable JSON for {fetch_url}: {e}")
        return None, None
    if not isinstance(data, dict):
        return None, None

    # flat-playlist returns playlist_channel_id instead of channel_id
    channel_id = data.get('playlist_channel_id') or data.get('channel_id')
    channel_name = data.get('playlist_channel') or data.get('channel')
    if channel_id:
        return channel_id, channel_name
    return None, None


def extract_channel_id(url_or_id):
    """
    Extract a channel ID from various YouTube URL formats or a direct ID.

    Handled inputs, in order:
    - a bare ID (24 characters starting with "UC");
    - any URL whose path contains /channel/<ID>;
    - other youtube.com / www.youtube.com / m.youtube.com URLs (@handle,
      /c/name, /user/name), resolved by running yt-dlp.

    Returns a tuple (channel_id, channel_name_hint or None); (None, None)
    when nothing could be extracted.
    """
    candidate = url_or_id.strip()

    # If it looks like a channel ID already, accept it as-is.
    if _CHANNEL_ID_RE.match(candidate):
        return candidate, None

    try:
        parsed = urlparse(candidate)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. bad bracketed hosts).
        return None, None

    # Format: youtube.com/channel/CHANNEL_ID
    if '/channel/' in parsed.path:
        channel_id = parsed.path.split('/channel/')[-1].split('/')[0]
        if channel_id:
            return channel_id, None

    # Format: youtube.com/@handle or youtube.com/c/name or youtube.com/user/name
    # These require fetching the page to get the actual channel ID.
    if parsed.netloc.lower() in _YOUTUBE_HOSTS:
        # Point at the /videos tab so yt-dlp returns playlist metadata. The
        # query string and fragment are dropped first so that share links
        # such as ".../@handle?si=..." do not end up with "/videos" appended
        # to the query.
        path = parsed.path
        if not path.endswith('/videos'):
            path = path.rstrip('/') + '/videos'
        fetch_url = parsed._replace(
            path=path, params='', query='', fragment=''
        ).geturl()
        return _resolve_channel_via_ytdlp(fetch_url)

    return None, None


def get_channels():
    """
    Read channels from channels.txt.

    Each non-blank, non-comment line has the form "CHANNEL_ID|Channel Name".
    Only the first "|" separates the fields, so names may contain "|".
    Lines without a "|" are skipped. Returns a list of
    {'id': ..., 'name': ...} dicts; an empty list if the file is missing.
    """
    channels = []
    try:
        with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split('|', 1)
                if len(parts) == 2:
                    channels.append({
                        'id': parts[0].strip(),
                        'name': parts[1].strip(),
                    })
    except FileNotFoundError:
        pass
    return channels


def save_channels(channels):
    """Write channels to channels.txt, preceded by a format comment header."""
    with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
        f.write('# YouTube Channel Configuration\n')
        f.write('# Format: CHANNEL_ID|Channel Name\n')
        f.write('# One channel per line. Lines starting with # are ignored.\n')
        f.write('#\n')
        for channel in channels:
            f.write(f"{channel['id']}|{channel['name']}\n")


def _scan_videos(channel_dir):
    """Return (mp4_count, total_bytes) for all .mp4 files under channel_dir."""
    video_count = 0
    total_size = 0
    # os.walk yields nothing for a missing directory, so no existence check.
    for root, _dirs, files in os.walk(channel_dir):
        for filename in files:
            if not filename.endswith('.mp4'):
                continue
            video_count += 1
            try:
                total_size += os.path.getsize(os.path.join(root, filename))
            except OSError:
                # File disappeared or became unreadable mid-scan; still
                # counted, but its size is left out.
                pass
    return video_count, total_size


def get_channel_stats():
    """
    Get statistics for each configured channel.

    For every channel the download directory DOWNLOAD_DIR/<name> is scanned
    for .mp4 files; the last sync time is the modification time of
    .downloaded.txt in that directory. Returns a list of dicts with keys
    name, id, video_count, total_size (human readable) and last_sync
    ('YYYY-MM-DD HH:MM' or 'Never').
    """
    stats = []
    for channel in get_channels():
        channel_dir = os.path.join(DOWNLOAD_DIR, channel['name'])
        video_count, total_size = _scan_videos(channel_dir)

        # Get last sync time from .downloaded.txt
        last_sync = None
        try:
            mtime = os.path.getmtime(os.path.join(channel_dir, '.downloaded.txt'))
        except OSError:
            pass
        else:
            last_sync = datetime.fromtimestamp(mtime)

        stats.append({
            'name': channel['name'],
            'id': channel['id'],
            'video_count': video_count,
            'total_size': format_size(total_size),
            'last_sync': last_sync.strftime('%Y-%m-%d %H:%M') if last_sync else 'Never',
        })
    return stats


def format_size(bytes):
    """
    Format a byte count as a human readable size (1024-based, one decimal).

    The parameter name shadows the builtin but is kept for compatibility
    with keyword callers.
    """
    size = bytes
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"


def get_logs(lines=100):
    """
    Get the last N lines from the log file via `tail`.

    Returns a list of strings (tail's output split on newlines), or an empty
    list if the log file does not exist or tail cannot be run.
    """
    if not os.path.exists(LOG_FILE):
        return []
    try:
        result = subprocess.run(
            ['tail', '-n', str(lines), LOG_FILE],
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        _warn(f"Could not read {LOG_FILE}: {e}")
        return []
    return result.stdout.split('\n')


def is_sync_running():
    """
    Check if the sync script is currently running.

    Uses `pgrep -f sync.sh`; returns False if pgrep itself cannot be run.
    """
    try:
        result = subprocess.run(['pgrep', '-f', 'sync.sh'], capture_output=True)
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0


@app.route('/')
def index():
    """Dashboard page"""
    return render_template(
        'index.html',
        stats=get_channel_stats(),
        settings=get_settings(),
        sync_running=is_sync_running(),
        has_cookies=os.path.exists(COOKIES_FILE),
    )


@app.route('/channels')
def channels():
    """Channel management page"""
    return render_template('channels.html', channels=get_channels())


@app.route('/channels/add', methods=['POST'])
def add_channel():
    """
    Add a new channel from the submitted form.

    Form fields: channel_id (an ID or URL, required) and channel_name
    (optional when the name can be auto-detected from the URL). Always
    redirects back to the channel page with a flash message.
    """
    channel_input = request.form.get('channel_id', '').strip()
    # channels.txt is line-oriented, so line breaks in a name would corrupt it.
    channel_name = re.sub(
        r'[\r\n]+', ' ', request.form.get('channel_name', '')
    ).strip()

    if not channel_input:
        flash('Channel ID or URL is required', 'error')
        return redirect(url_for('channels'))

    # Extract channel ID from URL or validate direct ID
    channel_id, auto_name = extract_channel_id(channel_input)

    # An ID containing "|" or whitespace cannot be stored in channels.txt.
    if (not channel_id or '|' in channel_id
            or any(ch.isspace() for ch in channel_id)):
        flash('Invalid channel ID or URL. Please check and try again.', 'error')
        return redirect(url_for('channels'))

    # Use auto-detected name if no name was provided
    if not channel_name and auto_name:
        channel_name = re.sub(r'[\r\n]+', ' ', auto_name).strip()
    if not channel_name:
        flash('Channel name is required (could not auto-detect from URL)', 'error')
        return redirect(url_for('channels'))

    # Check for duplicates
    existing_channels = get_channels()
    for existing in existing_channels:
        if existing['id'] == channel_id:
            flash(f'Channel already exists: {existing["name"]}', 'error')
            return redirect(url_for('channels'))

    existing_channels.append({'id': channel_id, 'name': channel_name})
    save_channels(existing_channels)

    flash(f'Added channel: {channel_name} ({channel_id})', 'success')
    return redirect(url_for('channels'))


@app.route('/channels/delete/<int:index>')
def delete_channel(index):
    """
    Delete the channel at position `index` in the list from get_channels().

    The index is taken from the URL as a non-negative integer.
    """
    # NOTE(review): this is a state-changing GET endpoint; switching to POST
    # would require a matching template change, so it is left as-is.
    current = get_channels()
    if 0 <= index < len(current):
        channel_name = current[index]['name']
        del current[index]
        save_channels(current)
        flash(f'Deleted channel: {channel_name}', 'success')
    else:
        flash('Channel not found', 'error')
    return redirect(url_for('channels'))


@app.route('/settings')
def settings():
    """Settings page"""
    return render_template('settings.html', settings=get_settings())


@app.route('/settings/save', methods=['POST'])
def save_settings_route():
    """
    Save settings from the submitted form and apply a changed schedule.

    An invalid sync_schedule is rejected before anything is written. On
    success the settings are saved, and if the schedule differs from the
    previously stored one the crontab is rewritten and crond reloaded.
    """
    previous = get_settings()

    schedule = _normalize_schedule(request.form.get('sync_schedule', '0 2 * * *'))
    if schedule is None:
        flash(
            'Invalid sync schedule. Use "manual" or a five-field cron expression.',
            'error',
        )
        return redirect(url_for('settings'))

    new_settings = {
        'sync_schedule': schedule,
        'max_quality': request.form.get('max_quality', '1080'),
        'sleep_interval': request.form.get('sleep_interval', '2'),
        'timezone': request.form.get('timezone', 'America/Phoenix'),
    }
    save_settings(new_settings)
    flash('Settings saved successfully', 'success')

    # Apply the new cron schedule immediately if it changed. max_quality /
    # sleep_interval are read fresh by sync.sh on each run, so no action needed
    # for them. Timezone changes still require a container restart (system tzdata
    # is locked in at container start); we don't claim otherwise.
    if new_settings['sync_schedule'] != previous.get('sync_schedule'):
        try:
            apply_schedule(new_settings['sync_schedule'])
        except (OSError, subprocess.SubprocessError) as e:
            # Save succeeded; the schedule reload didn't. Let the user know
            # it'll take effect on next restart rather than silently failing.
            _warn(f"Could not reload cron schedule: {e}")
            flash(
                'Schedule saved, but live reload failed. '
                'New schedule will take effect after container restart.',
                'warning',
            )

    return redirect(url_for('settings'))


@app.route('/cookies', methods=['GET', 'POST'])
def cookies():
    """Cookie management page: shows status (GET) or stores an upload (POST)."""
    if request.method == 'POST':
        upload = request.files.get('cookies_file')
        if upload is None or upload.filename == '':
            flash('No file selected', 'error')
            return redirect(url_for('cookies'))

        # The upload always lands at the fixed COOKIES_FILE path; the
        # client-supplied filename is never used for the destination.
        upload.save(COOKIES_FILE)
        flash('Cookies file uploaded successfully', 'success')
        return redirect(url_for('index'))

    return render_template('cookies.html', has_cookies=os.path.exists(COOKIES_FILE))


@app.route('/cookies/delete')
def delete_cookies():
    """Delete cookies file"""
    try:
        os.remove(COOKIES_FILE)
    except FileNotFoundError:
        # Nothing to delete; report nothing, as before.
        pass
    else:
        flash('Cookies file deleted', 'success')
    return redirect(url_for('cookies'))


@app.route('/logs')
def logs():
    """Logs page"""
    return render_template('logs.html', logs=get_logs(200))


@app.route('/api/sync/start', methods=['POST'])
def start_sync():
    """
    Start a manual sync in the background.

    Responds 409 if a sync is already running, 500 if the script could not
    be started, otherwise a JSON success message.
    """
    if is_sync_running():
        return jsonify({'status': 'error', 'message': 'Sync already running'}), 409

    try:
        # The child inherits the log descriptor; closing our copy right after
        # the spawn avoids leaking one open file per manual sync.
        with open(LOG_FILE, 'a') as log_handle:
            subprocess.Popen(
                ['/app/sync.sh'],
                stdout=log_handle,
                stderr=subprocess.STDOUT,
            )
    except (OSError, subprocess.SubprocessError) as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500

    return jsonify({'status': 'success', 'message': 'Sync started'})


@app.route('/api/sync/status')
def sync_status():
    """Get sync status"""
    return jsonify({'running': is_sync_running()})


@app.route('/api/stats')
def api_stats():
    """Get channel statistics"""
    return jsonify(get_channel_stats())


if __name__ == '__main__':
    # Ensure directories exist
    os.makedirs(CONFIG_DIR, exist_ok=True)
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    # Run Flask app
    app.run(host='0.0.0.0', port=8080, debug=False)