#!/usr/bin/env python3
"""
YouTube Sync Web Interface.

Provides a web UI for managing YouTube channel downloads: the channel list,
sync settings, the cookies file handed to the downloader, the sync log and
manual sync runs.
"""

import json
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, urlparse

from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)

app = Flask(__name__)
# NOTE(review): the fallback key is a fixed, public string. Set SECRET_KEY in
# the environment for any deployment where the session cookie matters.
app.secret_key = os.environ.get('SECRET_KEY', 'youtube-sync-secret-key-change-me')

# Configuration
DOWNLOAD_DIR = os.environ.get('DOWNLOAD_DIR', '/downloads')
CONFIG_DIR = os.environ.get('CONFIG_DIR', '/config')
CHANNELS_FILE = os.path.join(CONFIG_DIR, 'channels.txt')
COOKIES_FILE = os.path.join(CONFIG_DIR, 'cookies.txt')
SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
LOG_FILE = '/var/log/youtube-sync.log'

# A bare channel ID: "UC" followed by 22 word/dash characters (24 in total).
_CHANNEL_ID_RE = re.compile(r'^UC[\w-]{22}$')
# Characters accepted in an ID taken from a /channel/<id> URL path. Anything
# else (notably "|") would corrupt the "ID|Name" lines of channels.txt.
_URL_CHANNEL_ID_RE = re.compile(r'^[\w-]+$')
# Hosts for which yt-dlp is asked to resolve a handle/custom URL.
_YOUTUBE_HOSTS = ('youtube.com', 'www.youtube.com', 'm.youtube.com')


def get_settings():
    """Load settings from SETTINGS_FILE, falling back to defaults.

    Defaults come from the environment (SYNC_SCHEDULE, MAX_QUALITY,
    SLEEP_INTERVAL, TZ). Stored values are merged over the defaults, so a
    settings file that lacks a key still yields a complete dict.

    Returns:
        dict with at least 'sync_schedule', 'max_quality', 'sleep_interval'
        and 'timezone'.
    """
    defaults = {
        'sync_schedule': os.environ.get('SYNC_SCHEDULE', '0 2 * * *'),
        'max_quality': os.environ.get('MAX_QUALITY', '1080'),
        'sleep_interval': os.environ.get('SLEEP_INTERVAL', '2'),
        'timezone': os.environ.get('TZ', 'America/Phoenix'),
    }
    try:
        with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
            stored = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or malformed file: best effort, use defaults.
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return defaults
    if not isinstance(stored, dict):
        return defaults
    return {**defaults, **stored}


def save_settings(settings):
    """Write the settings dict to SETTINGS_FILE as indented JSON."""
    with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
        json.dump(settings, f, indent=2)


def _lookup_channel_with_ytdlp(url):
    """Ask yt-dlp for the channel behind ``url``.

    Returns:
        (channel_id, channel_name), or (None, None) when yt-dlp is missing,
        fails, times out or prints something that is not the expected JSON.
    """
    try:
        result = subprocess.run(
            # "--" ends option parsing so the user-supplied URL can never be
            # interpreted as a yt-dlp option.
            ['yt-dlp', '--dump-json', '--playlist-items', '0', '--', url],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: yt-dlp not installed; SubprocessError: timeout;
        # ValueError: output could not be decoded as text.
        return None, None

    if result.returncode != 0 or not result.stdout:
        return None, None

    try:
        # Only the first JSON document of the output is inspected.
        data = json.loads(result.stdout.split('\n')[0])
    except ValueError:
        return None, None
    if not isinstance(data, dict):
        return None, None

    channel_id = data.get('channel_id')
    if not channel_id:
        return None, None
    return channel_id, data.get('channel')


def extract_channel_id(url_or_id):
    """Extract a channel ID from a YouTube URL or validate a direct ID.

    Accepted inputs:
      * a bare channel ID (24 characters starting with "UC");
      * a URL whose path contains ``/channel/<id>``;
      * any other URL on a youtube.com host (``/@handle``, ``/c/name``,
        ``/user/name``), which is resolved by running yt-dlp.

    Returns:
        tuple (channel_id, channel_name_hint or None); (None, None) when no
        ID could be determined.
    """
    candidate = (url_or_id or '').strip()
    if not candidate:
        return None, None

    # If it looks like a channel ID already, accept it as-is.
    if _CHANNEL_ID_RE.match(candidate):
        return candidate, None

    try:
        parsed = urlparse(candidate)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return None, None

    # Format: youtube.com/channel/CHANNEL_ID
    if '/channel/' in parsed.path:
        channel_id = parsed.path.split('/channel/')[-1].split('/')[0]
        if channel_id and _URL_CHANNEL_ID_RE.match(channel_id):
            return channel_id, None

    # Format: youtube.com/@handle or youtube.com/c/name or youtube.com/user/name
    # These require fetching the page to get the actual channel ID.
    if parsed.netloc.lower() in _YOUTUBE_HOSTS:
        return _lookup_channel_with_ytdlp(candidate)

    return None, None


def get_channels():
    """Read the configured channels from CHANNELS_FILE.

    Blank lines and lines starting with "#" are ignored. Each remaining line
    is split at the first "|", so a channel name may itself contain "|".

    Returns:
        list of {'id': ..., 'name': ...} dicts in file order; empty when the
        file does not exist.
    """
    channels = []
    try:
        f = open(CHANNELS_FILE, 'r', encoding='utf-8')
    except FileNotFoundError:
        return channels

    with f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            channel_id, separator, name = line.partition('|')
            if not separator:
                # No "|" at all: not a channel entry.
                continue
            channels.append({'id': channel_id.strip(), 'name': name.strip()})
    return channels


def _single_line(text):
    """Replace line breaks so ``text`` cannot span several file lines."""
    return str(text).replace('\r', ' ').replace('\n', ' ')


def save_channels(channels):
    """Write the channel list to CHANNELS_FILE, replacing its content.

    Args:
        channels: iterable of dicts with 'id' and 'name' keys.
    """
    with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
        f.write('# YouTube Channel Configuration\n')
        f.write('# Format: CHANNEL_ID|Channel Name\n')
        f.write('# One channel per line. Lines starting with # are ignored.\n')
        f.write('#\n')
        for channel in channels:
            # One entry per line: embedded line breaks would inject extra
            # (possibly bogus) entries into the file.
            channel_id = _single_line(channel['id'])
            name = _single_line(channel['name'])
            f.write(f"{channel_id}|{name}\n")


def _scan_videos(channel_dir):
    """Return (count, total_bytes) of the .mp4 files below ``channel_dir``."""
    video_count = 0
    total_size = 0
    for root, _dirs, files in os.walk(channel_dir):
        for file_name in files:
            if not file_name.endswith('.mp4'):
                continue
            try:
                size = os.path.getsize(os.path.join(root, file_name))
            except OSError:
                # File vanished or is unreadable (e.g. a sync is rewriting
                # it right now); skip it rather than fail the whole page.
                continue
            video_count += 1
            total_size += size
    return video_count, total_size


def get_channel_stats():
    """Collect per-channel download statistics.

    For every configured channel the directory DOWNLOAD_DIR/<name> is
    scanned for .mp4 files. The modification time of its ``.downloaded.txt``
    file is reported as the last sync time.

    Returns:
        list of dicts with 'name', 'id', 'video_count', 'total_size'
        (human readable string) and 'last_sync' ('YYYY-MM-DD HH:MM' or
        'Never').
    """
    stats = []
    for channel in get_channels():
        channel_dir = os.path.join(DOWNLOAD_DIR, channel['name'])
        video_count = 0
        total_size = 0
        last_sync = None

        if os.path.exists(channel_dir):
            video_count, total_size = _scan_videos(channel_dir)

            # Get last sync time from .downloaded.txt
            downloaded_file = os.path.join(channel_dir, '.downloaded.txt')
            try:
                last_sync = datetime.fromtimestamp(
                    os.path.getmtime(downloaded_file)
                )
            except OSError:
                last_sync = None

        stats.append({
            'name': channel['name'],
            'id': channel['id'],
            'video_count': video_count,
            'total_size': format_size(total_size),
            'last_sync': (
                last_sync.strftime('%Y-%m-%d %H:%M') if last_sync else 'Never'
            ),
        })
    return stats


def format_size(bytes):
    """Format a byte count as a human readable size, e.g. '1.5 MB'.

    The parameter keeps its original name (which shadows the builtin) so
    that keyword callers keep working.
    """
    size = float(bytes)
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"


def get_logs(lines=100):
    """Return the last ``lines`` lines of LOG_FILE as a list of strings.

    Returns an empty list when the log file does not exist or ``tail``
    cannot be run.
    """
    if not os.path.exists(LOG_FILE):
        return []
    try:
        result = subprocess.run(
            ['tail', '-n', str(lines), LOG_FILE],
            capture_output=True,
            text=True,
            # The log may contain arbitrary bytes from the downloader; do
            # not let a decoding error hide the whole log.
            errors='replace',
        )
    except (OSError, subprocess.SubprocessError):
        return []
    return result.stdout.split('\n')


def is_sync_running():
    """Return True when a process matching 'sync.sh' is running (pgrep -f)."""
    try:
        result = subprocess.run(
            ['pgrep', '-f', 'sync.sh'],
            capture_output=True,
        )
    except (OSError, subprocess.SubprocessError):
        # pgrep unavailable: report "not running", as before.
        return False
    return result.returncode == 0


@app.route('/')
def index():
    """Dashboard page"""
    return render_template(
        'index.html',
        stats=get_channel_stats(),
        settings=get_settings(),
        sync_running=is_sync_running(),
        has_cookies=os.path.exists(COOKIES_FILE),
    )


@app.route('/channels')
def channels():
    """Channel management page"""
    return render_template('channels.html', channels=get_channels())


@app.route('/channels/add', methods=['POST'])
def add_channel():
    """Add a new channel from the submitted ID/URL and optional name."""
    channel_input = request.form.get('channel_id', '').strip()
    channel_name = request.form.get('channel_name', '').strip()

    if not channel_input:
        flash('Channel ID or URL is required', 'error')
        return redirect(url_for('channels'))

    # Extract channel ID from URL or validate direct ID
    channel_id, auto_name = extract_channel_id(channel_input)
    if not channel_id:
        flash('Invalid channel ID or URL. Please check and try again.', 'error')
        return redirect(url_for('channels'))

    # Use auto-detected name if no name was provided
    if not channel_name and auto_name:
        channel_name = auto_name
    elif not channel_name:
        flash('Channel name is required (could not auto-detect from URL)', 'error')
        return redirect(url_for('channels'))

    # Check for duplicates
    channel_list = get_channels()
    for existing in channel_list:
        if existing['id'] == channel_id:
            flash(f'Channel already exists: {existing["name"]}', 'error')
            return redirect(url_for('channels'))

    channel_list.append({'id': channel_id, 'name': channel_name})
    save_channels(channel_list)
    flash(f'Added channel: {channel_name} ({channel_id})', 'success')
    return redirect(url_for('channels'))


# NOTE(review): this destructive action is reachable via GET. Switching it to
# POST would also require changing the templates, which are not visible here.
@app.route('/channels/delete/<int:index>')
def delete_channel(index):
    """Delete the channel at position ``index`` of the channel list."""
    channel_list = get_channels()
    if 0 <= index < len(channel_list):
        channel_name = channel_list[index]['name']
        del channel_list[index]
        save_channels(channel_list)
        flash(f'Deleted channel: {channel_name}', 'success')
    else:
        flash('Channel not found', 'error')
    return redirect(url_for('channels'))


@app.route('/settings')
def settings():
    """Settings page"""
    return render_template('settings.html', settings=get_settings())


@app.route('/settings/save', methods=['POST'])
def save_settings_route():
    """Save the settings submitted from the settings form."""
    new_settings = {
        'sync_schedule': request.form.get('sync_schedule', '0 2 * * *'),
        'max_quality': request.form.get('max_quality', '1080'),
        'sleep_interval': request.form.get('sleep_interval', '2'),
        'timezone': request.form.get('timezone', 'America/Phoenix'),
    }
    save_settings(new_settings)
    flash('Settings saved successfully', 'success')
    return redirect(url_for('settings'))


@app.route('/cookies', methods=['GET', 'POST'])
def cookies():
    """Cookie management page; a POST stores the uploaded cookies file."""
    if request.method == 'POST':
        upload = request.files.get('cookies_file')
        if upload is None or upload.filename == '':
            flash('No file selected', 'error')
            return redirect(url_for('cookies'))

        # The client-supplied filename is never used as a path; the upload
        # always lands at the fixed COOKIES_FILE location.
        upload.save(COOKIES_FILE)
        flash('Cookies file uploaded successfully', 'success')
        return redirect(url_for('index'))

    return render_template(
        'cookies.html',
        has_cookies=os.path.exists(COOKIES_FILE),
    )


# NOTE(review): destructive action reachable via GET, see delete_channel.
@app.route('/cookies/delete')
def delete_cookies():
    """Delete the cookies file if it exists."""
    try:
        os.remove(COOKIES_FILE)
    except FileNotFoundError:
        # Nothing to delete; no message, as before.
        pass
    else:
        flash('Cookies file deleted', 'success')
    return redirect(url_for('cookies'))


@app.route('/logs')
def logs():
    """Logs page"""
    return render_template('logs.html', logs=get_logs(200))


@app.route('/api/sync/start', methods=['POST'])
def start_sync():
    """Start a manual sync in the background.

    Returns JSON: 409 when a sync is already running, 500 when the sync
    script could not be started, otherwise a success status.
    """
    if is_sync_running():
        return jsonify({'status': 'error', 'message': 'Sync already running'}), 409

    try:
        # The child process receives its own copy of the log descriptor, so
        # this process can (and should) close its handle right away.
        with open(LOG_FILE, 'a') as log_file:
            subprocess.Popen(
                ['/app/sync.sh'],
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )
    except Exception as e:  # API boundary: report any failure as JSON
        return jsonify({'status': 'error', 'message': str(e)}), 500
    return jsonify({'status': 'success', 'message': 'Sync started'})


@app.route('/api/sync/status')
def sync_status():
    """Get sync status"""
    return jsonify({'running': is_sync_running()})


@app.route('/api/stats')
def api_stats():
    """Get channel statistics"""
    return jsonify(get_channel_stats())


if __name__ == '__main__':
    # Ensure directories exist
    os.makedirs(CONFIG_DIR, exist_ok=True)
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    # Run Flask app
    app.run(host='0.0.0.0', port=8080, debug=False)