Files
claudetools/mcp-servers/ticktick/ticktick_auth.py
Mike Swanson b26e185a80 Add TickTick integration, MCP server, and dev project tracking
New integration with TickTick API for project/task management:
- OAuth 2.0 auth flow (mcp-servers/ticktick/ticktick_auth.py)
- MCP server with 9 tools for Claude Code (ticktick_mcp.py)
- FastAPI service with SOPS vault credentials (api/services/ticktick_service.py)
- JWT-protected REST router at /api/ticktick/ (api/routers/ticktick.py)
- Credentials stored in SOPS vault (services/ticktick.sops.yaml)

Dev project tracking (hybrid TickTick + DB):
- New dev_projects table migration (14 columns, status index)
- TickTick "Dev Projects" list for mobile visibility
- First project seeded: TickTick Integration (linked both sides)

Security: .tokens.json gitignored, token file permissions restricted,
HTML-escaped OAuth callback, SOPS vault (not env vars) for secrets.

Also: Installed Tailscale on ACG-5070 for office network access.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 10:08:53 -07:00

314 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
TickTick OAuth 2.0 Authentication Script
Performs the one-time OAuth flow to obtain access and refresh tokens from TickTick.
Reads client credentials from the SOPS vault, opens a browser for user authorization,
captures the callback on a local HTTP server, exchanges the code for tokens, and
saves them to a local JSON file (.tokens.json; plain text, owner-only permissions).
Usage:
python ticktick_auth.py
"""
import json
import os
import secrets
import stat
import subprocess
import sys
import threading
import time
import webbrowser
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from urllib.parse import urlencode, urlparse, parse_qs
from urllib.request import Request, urlopen
from html import escape as html_escape
from urllib.error import URLError, HTTPError
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Vault helper script and the SOPS entry that holds the TickTick client
# credentials (read through vault_get_field).
VAULT_SCRIPT = "D:/vault/scripts/vault.sh"
VAULT_ENTRY = "services/ticktick.sops.yaml"

# TickTick OAuth 2.0 endpoints and the scopes requested during authorization.
AUTH_URL = "https://ticktick.com/oauth/authorize"
TOKEN_URL = "https://ticktick.com/oauth/token"
SCOPES = "tasks:read tasks:write"

# Local callback listener. REDIRECT_URI is derived from CALLBACK_PORT so the
# port the server binds and the port the provider redirects to cannot drift.
# NOTE(review): presumably this URI must match the one registered for the
# TickTick app -- confirm before changing the port.
CALLBACK_PORT = 9876
REDIRECT_URI = f"http://localhost:{CALLBACK_PORT}/callback"
CALLBACK_TIMEOUT_SECONDS = 60

# Tokens are written next to this script.
TOKEN_FILE = Path(__file__).resolve().parent / ".tokens.json"
# ---------------------------------------------------------------------------
# Vault credential retrieval
# ---------------------------------------------------------------------------
def vault_get_field(field: str) -> str:
    """Return one field of the TickTick vault entry, or exit the process.

    Runs ``bash VAULT_SCRIPT get-field VAULT_ENTRY <field>`` with a 15 second
    timeout and returns its stripped stdout.

    Any failure (bash/script not found, timeout, non-zero exit status, or an
    empty value) prints an ``[ERROR]`` line and terminates via ``sys.exit(1)``.
    """
    command = ["bash", VAULT_SCRIPT, "get-field", VAULT_ENTRY, field]

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=15,
        )
    except FileNotFoundError:
        print(f"[ERROR] Could not find bash or vault script at {VAULT_SCRIPT}")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        print(f"[ERROR] Vault command timed out while retrieving {field}")
        sys.exit(1)

    if completed.returncode != 0:
        print(f"[ERROR] Vault returned non-zero exit code for field '{field}'")
        details = completed.stderr.strip()
        if details:
            # Relay whatever the vault script reported on stderr.
            print(f" {details}")
        sys.exit(1)

    secret = completed.stdout.strip()
    if secret:
        return secret

    print(f"[ERROR] Vault returned empty value for field '{field}'")
    sys.exit(1)
# ---------------------------------------------------------------------------
# Callback HTTP server
# ---------------------------------------------------------------------------
class _CallbackState:
    """Result slot written by the HTTP callback handler and read by the main thread.

    Exactly one of ``authorization_code`` / ``error`` is expected to be filled
    in by the handler before it sets ``received``.
    """

    def __init__(self) -> None:
        # Signalled by the handler once a callback outcome has been recorded.
        self.received = threading.Event()
        # Human-readable failure description, if the callback was rejected.
        self.error: str | None = None
        # Authorization code taken from a successful callback.
        self.authorization_code: str | None = None
class _CallbackHandler(BaseHTTPRequestHandler):
    """Handle the OAuth redirect callback from TickTick.

    Both class attributes below are assigned on the class before the server
    starts; each per-request handler instance reads them through ``self``.
    """

    state: "_CallbackState"  # set on the class before the server starts
    expected_csrf: str  # set on the class before the server starts

    def do_GET(self) -> None:  # noqa: N802 required method name
        """Record the outcome of a ``GET /callback`` request in ``self.state``.

        Any other path gets a 404 and leaves the shared state untouched.
        For ``/callback`` the checks run in this order: provider ``error``
        parameter, CSRF ``state`` match, presence of ``code``. Every outcome
        sets ``state.received`` so the waiting thread wakes up.
        """
        parsed = urlparse(self.path)
        if parsed.path != "/callback":
            self._respond(404, "Not found")
            return

        # First result wins: a late or forged request arriving before the
        # server has finished shutting down must not overwrite a recorded
        # code or error.
        if self.state.received.is_set():
            self._respond(409, "Authorization callback already processed")
            return

        params = parse_qs(parsed.query)

        # Check for error response from provider
        if "error" in params:
            error_msg = params["error"][0]
            description = params.get("error_description", [""])[0]
            error = f"{error_msg}: {description}" if description else error_msg
            self._fail(error, f"Authorization failed: {error}")
            return

        # Validate CSRF state parameter. Compared as bytes in constant time;
        # compare_digest rejects non-ASCII str, and the query is untrusted.
        returned_state = params.get("state", [None])[0]
        if returned_state is None or not secrets.compare_digest(
            returned_state.encode("utf-8"),
            self.expected_csrf.encode("utf-8"),
        ):
            self._fail("CSRF state mismatch -- possible request forgery")
            return

        # Extract authorization code
        code = params.get("code", [None])[0]
        if not code:
            self._fail("No authorization code in callback")
            return

        self.state.authorization_code = code
        self._respond(
            200,
            "Authorization successful! You can close this tab and return to the terminal.",
        )
        self.state.received.set()

    def _fail(self, error: str, page_text: str = "") -> None:
        """Store *error*, answer 400 and wake the waiting thread.

        *page_text* is what the browser shows; it defaults to *error*.
        """
        self.state.error = error
        self._respond(400, page_text or error)
        self.state.received.set()

    def _respond(self, status: int, body: str) -> None:
        """Send a minimal HTML page with *body* HTML-escaped inside an ``<h2>``."""
        page = (
            "<!DOCTYPE html><html><head><title>TickTick Auth</title></head>"
            f"<body><h2>{html_escape(body)}</h2></body></html>"
        ).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        # Explicit length so the client knows where the page ends.
        self.send_header("Content-Length", str(len(page)))
        self.end_headers()
        self.wfile.write(page)

    # Silence default request logging
    def log_message(self, format: str, *args: object) -> None:  # noqa: A002
        pass
# ---------------------------------------------------------------------------
# Token exchange
# ---------------------------------------------------------------------------
def exchange_code_for_tokens(
    code: str,
    client_id: str,
    client_secret: str,
) -> dict:
    """Exchange an authorization code for access and refresh tokens.

    POSTs a form-encoded ``authorization_code`` grant to TOKEN_URL (15 second
    timeout) and returns the decoded JSON object.

    Args:
        code: Authorization code received on the callback.
        client_id: OAuth client identifier.
        client_secret: OAuth client secret.

    Returns:
        The token response as a dict; it is guaranteed to contain
        ``access_token``.

    Any failure (HTTP error status, unreachable endpoint, connection error
    while reading, undecodable or non-JSON body, or a response without
    ``access_token``) prints an ``[ERROR]`` message and exits via
    ``sys.exit(1)``.
    """
    body = urlencode({
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT_URI,
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": SCOPES,
    }).encode("utf-8")
    request = Request(
        TOKEN_URL,
        data=body,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
        method="POST",
    )
    try:
        with urlopen(request, timeout=15) as response:
            data = json.loads(response.read().decode("utf-8"))
    except HTTPError as exc:
        # HTTPError must be handled before URLError (it is a subclass).
        error_body = exc.read().decode("utf-8", errors="replace")
        print(f"[ERROR] Token exchange failed (HTTP {exc.code})")
        print(f" Response: {error_body}")
        sys.exit(1)
    except URLError as exc:
        print(f"[ERROR] Could not reach token endpoint: {exc.reason}")
        sys.exit(1)
    except OSError as exc:
        # Timeouts/resets raised while reading the body are plain OSError
        # subclasses; urlopen only wraps errors from opening the request.
        print(f"[ERROR] Connection to token endpoint failed: {exc}")
        sys.exit(1)
    except (UnicodeDecodeError, json.JSONDecodeError):
        print("[ERROR] Token endpoint returned invalid JSON")
        sys.exit(1)
    # A JSON scalar would make the membership test raise TypeError, so the
    # payload type is checked first.
    if not isinstance(data, dict) or "access_token" not in data:
        print("[ERROR] Token response missing 'access_token'")
        print(f" Full response: {json.dumps(data, indent=2)}")
        sys.exit(1)
    return data
# ---------------------------------------------------------------------------
# Token persistence
# ---------------------------------------------------------------------------
def save_tokens(token_data: dict) -> None:
    """Persist tokens to TOKEN_FILE as JSON with owner-only permissions.

    The stored object has the keys ``access_token``, ``refresh_token``
    (empty string when absent), ``token_type`` (``"bearer"`` when absent) and
    ``obtained_at`` (current UTC time, ISO 8601).

    Args:
        token_data: Token response; must contain ``access_token``.

    Raises:
        KeyError: If ``token_data`` has no ``access_token``.
        OSError: If the directory or file cannot be created or written.
    """
    payload = {
        "access_token": token_data["access_token"],
        "refresh_token": token_data.get("refresh_token", ""),
        "token_type": token_data.get("token_type", "bearer"),
        "obtained_at": datetime.now(timezone.utc).isoformat(),
    }
    TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)

    owner_only = stat.S_IRUSR | stat.S_IWUSR
    # Create the file already restricted instead of writing it with default
    # permissions and tightening afterwards, so the tokens are never on disk
    # in a file other users can read.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # The creation mode is ignored for a pre-existing file, so tighten
        # it explicitly before any secret is written.
        try:
            TOKEN_FILE.chmod(owner_only)
        except OSError as exc:
            # Best effort: Windows may not support POSIX permissions.
            print(f"[WARN] Could not restrict permissions on {TOKEN_FILE}: {exc}")
        handle.write(json.dumps(payload, indent=2))
    print(f"[OK] Tokens saved to {TOKEN_FILE}")
# ---------------------------------------------------------------------------
# Main flow
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the interactive OAuth flow end to end and store the tokens.

    Steps: read client credentials from the vault, start the local callback
    server, open the browser on the authorization URL, wait up to
    CALLBACK_TIMEOUT_SECONDS for the callback, exchange the code for tokens
    and save them. Every failure prints an ``[ERROR]`` line and exits with
    status 1.
    """
    print("[INFO] TickTick OAuth 2.0 Authentication")
    print("=" * 50)

    # -- 1. Read credentials from SOPS vault ----------------------------------
    print("[INFO] Reading credentials from SOPS vault ...")
    client_id = vault_get_field("credentials.client_id")
    client_secret = vault_get_field("credentials.client_secret")
    print(f"[OK] Client ID retrieved (ends ...{client_id[-4:]})")

    # -- 2. Prepare CSRF state and authorization URL --------------------------
    csrf_state = secrets.token_urlsafe(32)
    auth_params = urlencode({
        "client_id": client_id,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": SCOPES,
        "state": csrf_state,
    })
    full_auth_url = f"{AUTH_URL}?{auth_params}"

    # -- 3. Start local callback server ---------------------------------------
    callback_state = _CallbackState()
    _CallbackHandler.state = callback_state
    _CallbackHandler.expected_csrf = csrf_state
    try:
        server = HTTPServer(("127.0.0.1", CALLBACK_PORT), _CallbackHandler)
    except OSError as exc:
        print(f"[ERROR] Could not start callback server on port {CALLBACK_PORT}: {exc}")
        sys.exit(1)
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()

    try:
        print(f"[OK] Callback server listening on http://127.0.0.1:{CALLBACK_PORT}/callback")

        # -- 4. Open browser for authorization --------------------------------
        print("[INFO] Opening browser for TickTick authorization ...")
        print("[INFO] If the browser does not open, visit this URL manually:")
        print(f" {full_auth_url}")
        webbrowser.open(full_auth_url)

        # -- 5. Wait for callback ---------------------------------------------
        print(f"[INFO] Waiting up to {CALLBACK_TIMEOUT_SECONDS}s for authorization callback ...")
        received = callback_state.received.wait(timeout=CALLBACK_TIMEOUT_SECONDS)
    finally:
        # Always stop the serve loop and release the listening socket, even
        # if opening the browser or the wait is interrupted.
        server.shutdown()
        server.server_close()

    if not received:
        print(f"[ERROR] Timed out after {CALLBACK_TIMEOUT_SECONDS}s waiting for callback")
        print(" Make sure you completed the authorization in your browser.")
        sys.exit(1)
    if callback_state.error:
        print(f"[ERROR] Authorization failed: {callback_state.error}")
        sys.exit(1)
    code = callback_state.authorization_code
    if not code:
        print("[ERROR] No authorization code received (unknown error)")
        sys.exit(1)
    print("[OK] Authorization code received")

    # -- 6. Exchange code for tokens ------------------------------------------
    print("[INFO] Exchanging authorization code for tokens ...")
    token_data = exchange_code_for_tokens(code, client_id, client_secret)
    print("[OK] Token exchange successful")

    # -- 7. Save tokens -------------------------------------------------------
    save_tokens(token_data)
    print("=" * 50)
    print("[OK] TickTick authentication complete")


if __name__ == "__main__":
    main()