#!/usr/bin/env python3
"""
TickTick OAuth 2.0 Authentication Script
Performs the one-time OAuth flow to obtain access and refresh tokens from TickTick.
Reads client credentials from the SOPS vault, opens a browser for user authorization,
captures the callback on a local HTTP server, exchanges the code for tokens, and
saves them to a local JSON file (.tokens.json, not encrypted) with owner-only permissions.
Usage:
python ticktick_auth.py
"""
import json
import os
import secrets
import stat
import subprocess
import sys
import threading
import time
import webbrowser
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from urllib.parse import urlencode, urlparse, parse_qs
from urllib.request import Request, urlopen
from html import escape as html_escape
from urllib.error import URLError, HTTPError
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Location of the vault helper script and the SOPS entry that holds the
# TickTick OAuth client credentials.
VAULT_SCRIPT = "D:/vault/scripts/vault.sh"
VAULT_ENTRY = "services/ticktick.sops.yaml"

# TickTick OAuth 2.0 endpoints and the scopes requested during authorization.
AUTH_URL = "https://ticktick.com/oauth/authorize"
TOKEN_URL = "https://ticktick.com/oauth/token"
SCOPES = "tasks:read tasks:write"

# Local callback listener. REDIRECT_URI is derived from CALLBACK_PORT so the
# URI sent to TickTick and the port the local server binds cannot drift apart.
# NOTE(review): presumably this URI must match the one registered for the
# TickTick app -- confirm before changing the port.
CALLBACK_PORT = 9876
REDIRECT_URI = f"http://localhost:{CALLBACK_PORT}/callback"
CALLBACK_TIMEOUT_SECONDS = 60

# Tokens are written next to this script.
TOKEN_FILE = Path(__file__).resolve().parent / ".tokens.json"
# ---------------------------------------------------------------------------
# Vault credential retrieval
# ---------------------------------------------------------------------------
def vault_get_field(field: str) -> str:
    """Fetch one field of the TickTick vault entry via the vault helper script.

    Runs ``bash VAULT_SCRIPT get-field VAULT_ENTRY <field>`` and returns the
    whitespace-stripped standard output.

    Args:
        field: Dotted field name passed through to the vault script.

    Returns:
        The non-empty field value.

    Exits the process with status 1 (after printing an ``[ERROR]`` line) when
    bash or the script cannot be launched, the command takes longer than
    15 seconds, it exits non-zero, or it prints nothing.
    """
    command = ["bash", VAULT_SCRIPT, "get-field", VAULT_ENTRY, field]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=15,
        )
    except FileNotFoundError:
        print(f"[ERROR] Could not find bash or vault script at {VAULT_SCRIPT}")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        print(f"[ERROR] Vault command timed out while retrieving {field}")
        sys.exit(1)

    if completed.returncode != 0:
        print(f"[ERROR] Vault returned non-zero exit code for field '{field}'")
        details = completed.stderr.strip()
        if details:
            # Relay whatever the vault script reported to help diagnosis.
            print(f" {details}")
        sys.exit(1)

    secret_value = completed.stdout.strip()
    if secret_value:
        return secret_value

    print(f"[ERROR] Vault returned empty value for field '{field}'")
    sys.exit(1)
# ---------------------------------------------------------------------------
# Callback HTTP server
# ---------------------------------------------------------------------------
class _CallbackState:
    """Result slot shared by the callback HTTP handler and the main thread.

    The handler stores either ``authorization_code`` or ``error`` and then
    sets ``received``; the main thread waits on ``received`` before reading
    the other two attributes.
    """

    def __init__(self) -> None:
        # Signalled once a code or an error has been recorded.
        self.received = threading.Event()
        # Human-readable failure description, or None if nothing failed.
        self.error: str | None = None
        # Authorization code taken from the callback query string, if any.
        self.authorization_code: str | None = None
class _CallbackHandler(BaseHTTPRequestHandler):
    """Handle the OAuth redirect callback from TickTick.

    Only ``GET /callback`` is meaningful; any other path gets a 404 and does
    not end the wait. For ``/callback`` the handler records either an
    authorization code or an error on the shared ``state`` object and then
    sets ``state.received`` so the main thread can continue.
    """

    # Both attributes are assigned on the class before the server starts.
    # The annotation is quoted so it is not evaluated at class-creation time.
    state: "_CallbackState"
    expected_csrf: str

    def do_GET(self) -> None:  # noqa: N802 – required method name
        """Process one callback request and record its outcome."""
        parsed = urlparse(self.path)
        if parsed.path != "/callback":
            self._respond(404, "Not found")
            return

        params = parse_qs(parsed.query)

        # The provider reports a denied/failed authorization via ?error=...
        if "error" in params:
            error_msg = params["error"][0]
            description = params.get("error_description", [""])[0]
            self.state.error = (
                f"{error_msg}: {description}" if description else error_msg
            )
            self._respond(400, f"Authorization failed: {self.state.error}")
            self.state.received.set()
            return

        # The state value must round-trip unchanged, otherwise the redirect
        # did not originate from the authorization request we issued.
        returned_state = params.get("state", [None])[0]
        if returned_state != self.expected_csrf:
            self._fail("CSRF state mismatch -- possible request forgery")
            return

        # parse_qs drops blank values, so ``code=`` is treated as missing.
        code = params.get("code", [None])[0]
        if not code:
            self._fail("No authorization code in callback")
            return

        self.state.authorization_code = code
        self._respond(
            200,
            "Authorization successful! You can close this tab and return to the terminal.",
        )
        self.state.received.set()

    def _fail(self, message: str) -> None:
        """Record *message* as the error, answer 400, and wake the waiter."""
        self.state.error = message
        self._respond(400, message)
        self.state.received.set()

    def _respond(self, status: int, body: str) -> None:
        """Send a minimal HTML page with *body* HTML-escaped.

        Args:
            status: HTTP status code for the response.
            body: Plain-text message; escaped before being embedded, since
                it may contain values taken from the request query string.
        """
        page = (
            "<!DOCTYPE html>"
            '<html><head><meta charset="utf-8">'
            "<title>TickTick Auth</title></head>"
            f"<body><p>{html_escape(body)}</p></body></html>"
        )
        payload = page.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    # Silence default request logging
    def log_message(self, format: str, *args: object) -> None:  # noqa: A002
        pass
# ---------------------------------------------------------------------------
# Token exchange
# ---------------------------------------------------------------------------
def exchange_code_for_tokens(
    code: str,
    client_id: str,
    client_secret: str,
) -> dict:
    """Exchange an authorization code for access and refresh tokens.

    Sends a form-encoded POST to ``TOKEN_URL`` and parses the JSON reply.

    Args:
        code: Authorization code received on the callback.
        client_id: OAuth client identifier.
        client_secret: OAuth client secret.

    Returns:
        The decoded JSON object from the token endpoint; it is guaranteed to
        contain an ``access_token`` key.

    Exits the process with status 1 (after printing an ``[ERROR]`` line) on
    an HTTP error status, a connection or read failure, a body that is not
    valid UTF-8 JSON, or a reply without ``access_token``.
    """
    form = urlencode({
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT_URI,
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": SCOPES,
    }).encode("utf-8")
    request = Request(
        TOKEN_URL,
        data=form,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
        method="POST",
    )

    try:
        with urlopen(request, timeout=15) as response:
            raw = response.read()
    except HTTPError as exc:
        error_body = exc.read().decode("utf-8", errors="replace")
        print(f"[ERROR] Token exchange failed (HTTP {exc.code})")
        print(f" Response: {error_body}")
        sys.exit(1)
    except URLError as exc:
        print(f"[ERROR] Could not reach token endpoint: {exc.reason}")
        sys.exit(1)
    except OSError as exc:
        # A timeout or connection reset while reading the body is raised as
        # a plain OSError/TimeoutError, not wrapped in URLError.
        print(f"[ERROR] Could not reach token endpoint: {exc}")
        sys.exit(1)

    try:
        data = json.loads(raw.decode("utf-8"))
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        print("[ERROR] Token endpoint returned invalid JSON")
        sys.exit(1)

    # Valid JSON need not be an object; a bare number would otherwise make
    # the membership test raise TypeError.
    if not isinstance(data, dict) or "access_token" not in data:
        print("[ERROR] Token response missing 'access_token'")
        print(f" Full response: {json.dumps(data, indent=2)}")
        sys.exit(1)
    return data
# ---------------------------------------------------------------------------
# Token persistence
# ---------------------------------------------------------------------------
def save_tokens(token_data: dict) -> None:
    """Persist tokens to ``TOKEN_FILE`` as JSON readable only by the owner.

    The file is created with mode 0600 and, in case it already existed with
    wider permissions, tightened again *before* the tokens are written, so
    the secrets are never on disk under looser permissions.

    Args:
        token_data: Token endpoint response. ``access_token`` is required;
            ``refresh_token`` defaults to ``""`` and ``token_type`` to
            ``"bearer"`` when absent.

    Raises:
        KeyError: If ``token_data`` has no ``access_token``.
        OSError: If the directory or file cannot be created or written.
    """
    payload = {
        "access_token": token_data["access_token"],
        "refresh_token": token_data.get("refresh_token", ""),
        "token_type": token_data.get("token_type", "bearer"),
        "obtained_at": datetime.now(timezone.utc).isoformat(),
    }
    serialized = json.dumps(payload, indent=2)
    owner_only = stat.S_IRUSR | stat.S_IWUSR

    TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
    # The mode argument only applies when the file is newly created.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # The file is empty at this point (O_TRUNC); restrict a pre-existing
        # file before any secret is written to it.
        try:
            TOKEN_FILE.chmod(owner_only)
        except OSError:
            pass  # Windows may not support POSIX permissions
        handle.write(serialized)
    print(f"[OK] Tokens saved to {TOKEN_FILE}")
# ---------------------------------------------------------------------------
# Main flow
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the interactive OAuth flow from credential lookup to token save.

    Steps: read the client credentials from the vault, start the local
    callback server, open the browser on the authorization URL, wait for the
    callback, exchange the code for tokens, and write them to disk.

    Exits the process with status 1 if the callback server cannot bind, the
    callback does not arrive within ``CALLBACK_TIMEOUT_SECONDS``, or the
    callback reports an error or carries no code. The helpers it calls exit
    the same way on their own failures.
    """
    print("[INFO] TickTick OAuth 2.0 Authentication")
    print("=" * 50)

    # -- 1. Read credentials from SOPS vault ----------------------------------
    print("[INFO] Reading credentials from SOPS vault ...")
    client_id = vault_get_field("credentials.client_id")
    client_secret = vault_get_field("credentials.client_secret")
    print(f"[OK] Client ID retrieved (ends ...{client_id[-4:]})")

    # -- 2. Prepare CSRF state and authorization URL --------------------------
    csrf_state = secrets.token_urlsafe(32)
    auth_params = urlencode({
        "client_id": client_id,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": SCOPES,
        "state": csrf_state,
    })
    full_auth_url = f"{AUTH_URL}?{auth_params}"

    # -- 3. Start local callback server ---------------------------------------
    callback_state = _CallbackState()
    _CallbackHandler.state = callback_state
    _CallbackHandler.expected_csrf = csrf_state
    try:
        server = HTTPServer(("127.0.0.1", CALLBACK_PORT), _CallbackHandler)
    except OSError as exc:
        print(f"[ERROR] Could not start callback server on port {CALLBACK_PORT}: {exc}")
        sys.exit(1)
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()

    try:
        print(f"[OK] Callback server listening on http://127.0.0.1:{CALLBACK_PORT}/callback")

        # -- 4. Open browser for authorization --------------------------------
        print("[INFO] Opening browser for TickTick authorization ...")
        print("[INFO] If the browser does not open, visit this URL manually:")
        print(f" {full_auth_url}")
        webbrowser.open(full_auth_url)

        # -- 5. Wait for callback ---------------------------------------------
        print(f"[INFO] Waiting up to {CALLBACK_TIMEOUT_SECONDS}s for authorization callback ...")
        received = callback_state.received.wait(timeout=CALLBACK_TIMEOUT_SECONDS)
    finally:
        # Stop the serve loop and release the listening socket even when the
        # wait is interrupted (for example by Ctrl-C).
        server.shutdown()
        server.server_close()

    if not received:
        print(f"[ERROR] Timed out after {CALLBACK_TIMEOUT_SECONDS}s waiting for callback")
        print(" Make sure you completed the authorization in your browser.")
        sys.exit(1)
    if callback_state.error:
        print(f"[ERROR] Authorization failed: {callback_state.error}")
        sys.exit(1)
    code = callback_state.authorization_code
    if not code:
        print("[ERROR] No authorization code received (unknown error)")
        sys.exit(1)
    print("[OK] Authorization code received")

    # -- 6. Exchange code for tokens ------------------------------------------
    print("[INFO] Exchanging authorization code for tokens ...")
    token_data = exchange_code_for_tokens(code, client_id, client_secret)
    print("[OK] Token exchange successful")

    # -- 7. Save tokens -------------------------------------------------------
    save_tokens(token_data)
    print("=" * 50)
    print("[OK] TickTick authentication complete")


# Run the flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()