Files
youtube-sync-docker/app.py
azcomputerguru 71ba63ffe3 Fix @handle URL resolution using --flat-playlist
- Updated extract_channel_id() to use --flat-playlist instead of --playlist-items 0
- Append /videos to URLs to ensure playlist metadata is available
- Extract playlist_channel_id and playlist_channel from flat-playlist output
- Increased timeout to 15 seconds for network fetch
- All URL formats now working: direct IDs, /channel/ URLs, @handle URLs

Tested and verified in Docker container.
2026-05-08 19:29:48 -04:00

366 lines
12 KiB
Python

#!/usr/bin/env python3
"""
YouTube Sync Web Interface
Provides a web UI for managing YouTube channel downloads
"""
import os
import subprocess
import json
import re
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', 'youtube-sync-secret-key-change-me')
# Configuration
DOWNLOAD_DIR = os.environ.get('DOWNLOAD_DIR', '/downloads')
CONFIG_DIR = os.environ.get('CONFIG_DIR', '/config')
CHANNELS_FILE = os.path.join(CONFIG_DIR, 'channels.txt')
COOKIES_FILE = os.path.join(CONFIG_DIR, 'cookies.txt')
SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
LOG_FILE = '/var/log/youtube-sync.log'
def get_settings():
"""Load settings from file or return defaults"""
defaults = {
'sync_schedule': os.environ.get('SYNC_SCHEDULE', '0 2 * * *'),
'max_quality': os.environ.get('MAX_QUALITY', '1080'),
'sleep_interval': os.environ.get('SLEEP_INTERVAL', '2'),
'timezone': os.environ.get('TZ', 'America/Phoenix')
}
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, 'r') as f:
return json.load(f)
except:
pass
return defaults
def save_settings(settings):
"""Save settings to file"""
with open(SETTINGS_FILE, 'w') as f:
json.dump(settings, f, indent=2)
def extract_channel_id(url_or_id):
"""
Extract channel ID from various YouTube URL formats or validate direct ID
Returns tuple: (channel_id, channel_name_hint or None)
"""
# If it looks like a channel ID already (24 characters starting with UC)
if re.match(r'^UC[\w-]{22}$', url_or_id.strip()):
return url_or_id.strip(), None
# Parse as URL
try:
parsed = urlparse(url_or_id)
# Format: youtube.com/channel/CHANNEL_ID
if '/channel/' in parsed.path:
channel_id = parsed.path.split('/channel/')[-1].split('/')[0]
if channel_id:
return channel_id, None
# Format: youtube.com/@handle or youtube.com/c/name or youtube.com/user/name
# These require fetching the page to get the actual channel ID
if parsed.netloc in ['youtube.com', 'www.youtube.com', 'm.youtube.com']:
# Use yt-dlp with --flat-playlist to extract channel ID without downloading
# Append /videos if not already present to ensure we get playlist metadata
fetch_url = url_or_id
if not fetch_url.endswith('/videos'):
fetch_url = fetch_url.rstrip('/') + '/videos'
try:
result = subprocess.run(
['yt-dlp', '--flat-playlist', '--dump-json', '--playlist-items', '1', fetch_url],
capture_output=True,
text=True,
timeout=15
)
if result.returncode == 0 and result.stdout:
# Get first line of JSON output
lines = [l for l in result.stdout.split('\n') if l.strip()]
if lines:
data = json.loads(lines[0])
# flat-playlist returns playlist_channel_id instead of channel_id
channel_id = data.get('playlist_channel_id') or data.get('channel_id')
channel_name = data.get('playlist_channel') or data.get('channel')
if channel_id:
return channel_id, channel_name
except:
pass
except:
pass
return None, None
def get_channels():
"""Read channels from channels.txt"""
channels = []
if not os.path.exists(CHANNELS_FILE):
return channels
with open(CHANNELS_FILE, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split('|')
if len(parts) == 2:
channels.append({
'id': parts[0].strip(),
'name': parts[1].strip()
})
return channels
def save_channels(channels):
"""Write channels to channels.txt"""
with open(CHANNELS_FILE, 'w') as f:
f.write('# YouTube Channel Configuration\n')
f.write('# Format: CHANNEL_ID|Channel Name\n')
f.write('# One channel per line. Lines starting with # are ignored.\n')
f.write('#\n')
for channel in channels:
f.write(f"{channel['id']}|{channel['name']}\n")
def get_channel_stats():
"""Get statistics for each channel"""
stats = []
for channel in get_channels():
channel_dir = os.path.join(DOWNLOAD_DIR, channel['name'])
video_count = 0
total_size = 0
last_sync = None
if os.path.exists(channel_dir):
# Count videos (mp4 files)
for root, dirs, files in os.walk(channel_dir):
for file in files:
if file.endswith('.mp4'):
video_count += 1
file_path = os.path.join(root, file)
total_size += os.path.getsize(file_path)
# Get last sync time from .downloaded.txt
downloaded_file = os.path.join(channel_dir, '.downloaded.txt')
if os.path.exists(downloaded_file):
last_sync = datetime.fromtimestamp(os.path.getmtime(downloaded_file))
stats.append({
'name': channel['name'],
'id': channel['id'],
'video_count': video_count,
'total_size': format_size(total_size),
'last_sync': last_sync.strftime('%Y-%m-%d %H:%M') if last_sync else 'Never'
})
return stats
def format_size(bytes):
"""Format bytes into human readable size"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes < 1024.0:
return f"{bytes:.1f} {unit}"
bytes /= 1024.0
return f"{bytes:.1f} PB"
def get_logs(lines=100):
"""Get last N lines from log file"""
if not os.path.exists(LOG_FILE):
return []
try:
result = subprocess.run(
['tail', '-n', str(lines), LOG_FILE],
capture_output=True,
text=True
)
return result.stdout.split('\n')
except:
return []
def is_sync_running():
"""Check if sync script is currently running"""
try:
result = subprocess.run(
['pgrep', '-f', 'sync.sh'],
capture_output=True
)
return result.returncode == 0
except:
return False
@app.route('/')
def index():
"""Dashboard page"""
stats = get_channel_stats()
settings = get_settings()
sync_running = is_sync_running()
has_cookies = os.path.exists(COOKIES_FILE)
return render_template('index.html',
stats=stats,
settings=settings,
sync_running=sync_running,
has_cookies=has_cookies)
@app.route('/channels')
def channels():
"""Channel management page"""
channels = get_channels()
return render_template('channels.html', channels=channels)
@app.route('/channels/add', methods=['POST'])
def add_channel():
"""Add a new channel"""
channel_input = request.form.get('channel_id', '').strip()
channel_name = request.form.get('channel_name', '').strip()
if not channel_input:
flash('Channel ID or URL is required', 'error')
return redirect(url_for('channels'))
# Extract channel ID from URL or validate direct ID
channel_id, auto_name = extract_channel_id(channel_input)
if not channel_id:
flash('Invalid channel ID or URL. Please check and try again.', 'error')
return redirect(url_for('channels'))
# Use auto-detected name if no name was provided
if not channel_name and auto_name:
channel_name = auto_name
elif not channel_name:
flash('Channel name is required (could not auto-detect from URL)', 'error')
return redirect(url_for('channels'))
# Check for duplicates
channels = get_channels()
for existing in channels:
if existing['id'] == channel_id:
flash(f'Channel already exists: {existing["name"]}', 'error')
return redirect(url_for('channels'))
channels.append({'id': channel_id, 'name': channel_name})
save_channels(channels)
flash(f'Added channel: {channel_name} ({channel_id})', 'success')
return redirect(url_for('channels'))
@app.route('/channels/delete/<int:index>')
def delete_channel(index):
"""Delete a channel"""
channels = get_channels()
if 0 <= index < len(channels):
channel_name = channels[index]['name']
del channels[index]
save_channels(channels)
flash(f'Deleted channel: {channel_name}', 'success')
else:
flash('Channel not found', 'error')
return redirect(url_for('channels'))
@app.route('/settings')
def settings():
"""Settings page"""
settings = get_settings()
return render_template('settings.html', settings=settings)
@app.route('/settings/save', methods=['POST'])
def save_settings_route():
"""Save settings"""
settings = {
'sync_schedule': request.form.get('sync_schedule', '0 2 * * *'),
'max_quality': request.form.get('max_quality', '1080'),
'sleep_interval': request.form.get('sleep_interval', '2'),
'timezone': request.form.get('timezone', 'America/Phoenix')
}
save_settings(settings)
flash('Settings saved successfully', 'success')
return redirect(url_for('settings'))
@app.route('/cookies', methods=['GET', 'POST'])
def cookies():
"""Cookie management page"""
if request.method == 'POST':
if 'cookies_file' not in request.files:
flash('No file selected', 'error')
return redirect(url_for('cookies'))
file = request.files['cookies_file']
if file.filename == '':
flash('No file selected', 'error')
return redirect(url_for('cookies'))
file.save(COOKIES_FILE)
flash('Cookies file uploaded successfully', 'success')
return redirect(url_for('index'))
has_cookies = os.path.exists(COOKIES_FILE)
return render_template('cookies.html', has_cookies=has_cookies)
@app.route('/cookies/delete')
def delete_cookies():
"""Delete cookies file"""
if os.path.exists(COOKIES_FILE):
os.remove(COOKIES_FILE)
flash('Cookies file deleted', 'success')
return redirect(url_for('cookies'))
@app.route('/logs')
def logs():
"""Logs page"""
log_lines = get_logs(200)
return render_template('logs.html', logs=log_lines)
@app.route('/api/sync/start', methods=['POST'])
def start_sync():
"""Start manual sync"""
if is_sync_running():
return jsonify({'status': 'error', 'message': 'Sync already running'}), 409
try:
# Start sync in background
subprocess.Popen(
['/app/sync.sh'],
stdout=open(LOG_FILE, 'a'),
stderr=subprocess.STDOUT
)
return jsonify({'status': 'success', 'message': 'Sync started'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/sync/status')
def sync_status():
"""Get sync status"""
return jsonify({
'running': is_sync_running()
})
@app.route('/api/stats')
def api_stats():
"""Get channel statistics"""
return jsonify(get_channel_stats())
if __name__ == '__main__':
# Ensure directories exist
os.makedirs(CONFIG_DIR, exist_ok=True)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
# Run Flask app
app.run(host='0.0.0.0', port=8080, debug=False)