#!/usr/bin/env python3 """Client for the mailprotector skill — Mailprotector CloudFilter REST API. Talks to the live Mailprotector CloudFilter platform at emailservice.io. This is the reseller email-security gateway (CloudFilter delivery + INKY annotation) that ACG layers on top of client Exchange mail flow. Held / quarantined mail, mail-flow logs, allow/block rules, and message release all live behind this API. Auth: Bearer token. The API key is used directly as the bearer token: Authorization: Bearer Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault entry `msp-tools/mailprotector.sops.yaml`, or from an environment override. Resolution order: 1. MAILPROTECTOR_API_KEY env 2. vault credentials.api_key (read via bash /.claude/scripts/vault.sh) Transport: prefers httpx if installed, else falls back to stdlib urllib so the skill works on a bare Python install. """ from __future__ import annotations import json import os import subprocess import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any, Optional try: import httpx # type: ignore _HAS_HTTPX = True except ImportError: # pragma: no cover _HAS_HTTPX = False SKILL_DIR = Path(__file__).resolve().parent.parent # .../.claude/skills/mailprotector ERROR_BODY_MAX_CHARS = 1500 DEFAULT_TIMEOUT = 60.0 DEFAULT_CONNECT_TIMEOUT = 15.0 API_BASE_URL = os.environ.get( "MAILPROTECTOR_API_BASE_URL", "https://emailservice.io/api/v1" ) VAULT_ENTRY = "msp-tools/mailprotector.sops.yaml" # The five entity types that have logs / messages / configuration sub-resources. VALID_SCOPES = ("resellers", "customers", "domains", "user_groups", "users") class MailprotectorError(Exception): """Any failure talking to the Mailprotector API or loading credentials.""" # --- repo-root + credential loading ------------------------------------------- def _resolve_claudetools_root() -> Path: """Resolve the ClaudeTools repo root: env var, then identity.json, then derived. Final fallback is derived from this file's location so it works on the Mac/Linux fleet, not only the Windows default. """ # SKILL_DIR = .../.claude/skills/mailprotector ; root is three levels up. derived_root = SKILL_DIR.parent.parent.parent env_root = os.environ.get("CLAUDETOOLS_ROOT") if env_root: return Path(env_root) identity_path = derived_root / ".claude" / "identity.json" if identity_path.exists(): try: data = json.loads(identity_path.read_text(encoding="utf-8")) root = data.get("claudetools_root") if root: return Path(root) except (json.JSONDecodeError, OSError): pass return derived_root def _vault_field(field: str) -> Optional[str]: """Read a single field from the mailprotector vault entry. None if absent. Soft failure: a missing field (vault exits non-zero) returns None so the caller can surface a clean setup error. A missing vault wrapper or bash raises, since that is an environment problem the user must fix. """ root = _resolve_claudetools_root() vault_script = root / ".claude" / "scripts" / "vault.sh" if not vault_script.exists(): raise MailprotectorError( f"vault wrapper not found at {vault_script}; set MAILPROTECTOR_API_KEY " "instead." ) try: completed = subprocess.run( ["bash", str(vault_script), "get-field", VAULT_ENTRY, field], capture_output=True, text=True, timeout=60, ) except FileNotFoundError as exc: raise MailprotectorError( "'bash' not found on PATH. Install Git Bash or set MAILPROTECTOR_API_KEY." ) from exc except subprocess.TimeoutExpired as exc: raise MailprotectorError("vault call timed out.") from exc if completed.returncode != 0: return None value = completed.stdout.strip() return value or None def load_api_key() -> str: """Resolve the Mailprotector API key (bearer token). Resolution order: 1. MAILPROTECTOR_API_KEY env 2. vault credentials.api_key Raises MailprotectorError with setup guidance if nothing resolves. """ env_key = os.environ.get("MAILPROTECTOR_API_KEY") if env_key: return env_key.strip() api_key = _vault_field("credentials.api_key") if api_key: return api_key raise MailprotectorError( "No Mailprotector / CloudFilter credentials found.\n" f" Expected vault entry: {VAULT_ENTRY} with:\n" " credentials.api_key (Bearer token for emailservice.io)\n" " Or set the MAILPROTECTOR_API_KEY environment variable for testing.\n" " Provision a reseller API key in the Mailprotector CloudFilter portal,\n" " then store it in the SOPS vault.\n" " See .claude/skills/mailprotector/SKILL.md for the full setup steps." ) def validate_scope(scope: str) -> str: """Ensure a scope is one of the five valid entity types. Raises otherwise.""" if scope not in VALID_SCOPES: raise MailprotectorError( f"Invalid scope '{scope}'. Must be one of: {', '.join(VALID_SCOPES)}" ) return scope # --- client ------------------------------------------------------------------- class MailprotectorClient: def __init__( self, api_base_url: str = API_BASE_URL, timeout: float = DEFAULT_TIMEOUT, connect_timeout: float = DEFAULT_CONNECT_TIMEOUT, ): self.api_base_url = api_base_url.rstrip("/") self.timeout = timeout self.connect_timeout = connect_timeout self._api_key: Optional[str] = None # -- auth ------------------------------------------------------------------ @property def api_key(self) -> str: if self._api_key is None: self._api_key = load_api_key() return self._api_key # -- core transport -------------------------------------------------------- def request( self, method: str, path: str, params: Optional[dict] = None, json_body: Optional[dict] = None, ) -> Any: """One REST call against the API base. `path` is relative (e.g. 'domains').""" url = f"{self.api_base_url}/{path.lstrip('/')}" if params: # Drop None-valued params so optional filters stay off the query string. clean = {k: v for k, v in params.items() if v is not None} if clean: url = f"{url}?{urllib.parse.urlencode(clean, doseq=True)}" data = json.dumps(json_body).encode("utf-8") if json_body is not None else None headers = {"Accept": "application/json"} if data is not None: headers["Content-Type"] = "application/json" return self._http( method, url, data=data, headers=headers, auth_header=f"Bearer {self.api_key}", ) def _http( self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[dict] = None, auth_header: Optional[str] = None, ) -> Any: hdrs = dict(headers or {}) if auth_header: hdrs["Authorization"] = auth_header if _HAS_HTTPX: try: timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout) with httpx.Client(timeout=timeout) as client: resp = client.request(method, url, content=data, headers=hdrs) resp.raise_for_status() return self._parse(resp.content) except httpx.TimeoutException as exc: raise MailprotectorError(f"request timed out: {exc}") from exc except httpx.HTTPStatusError as exc: detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS] raise MailprotectorError( f"HTTP {exc.response.status_code} {method} {url}: {detail}" ) from exc except httpx.HTTPError as exc: raise MailprotectorError(f"request failed: {exc}") from exc # stdlib fallback req = urllib.request.Request(url, data=data, method=method, headers=hdrs) try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: return self._parse(resp.read()) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS] raise MailprotectorError(f"HTTP {exc.code} {method} {url}: {detail}") from exc except urllib.error.URLError as exc: raise MailprotectorError(f"request failed: {exc}") from exc @staticmethod def _parse(raw: bytes) -> Any: if not raw: return None try: return json.loads(raw.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError): return raw.decode("utf-8", errors="replace") @staticmethod def _q(value: str) -> str: """URL-quote a path segment (an id), keeping it safe in a path position.""" return urllib.parse.quote(str(value), safe="") # ====================================================================== # READ METHODS (safe — always live) # ====================================================================== def status(self) -> Any: """Token validation probe: smallest possible authenticated GET.""" return self.request("GET", "domains", params={"per_page": 1}) def domains( self, scope: Optional[str] = None, entity_id: Optional[str] = None, page: int = 1, per_page: int = 25, ) -> Any: """List domains, globally or scoped under an entity.""" params = {"page": page, "per_page": per_page} if scope and entity_id: validate_scope(scope) return self.request( "GET", f"{scope}/{self._q(entity_id)}/domains", params=params ) return self.request("GET", "domains", params=params) def domain(self, domain_id: str) -> Any: return self.request("GET", f"domains/{self._q(domain_id)}") def customers(self, reseller_id: str, page: int = 1, per_page: int = 25) -> Any: return self.request( "GET", f"resellers/{self._q(reseller_id)}/customers", params={"page": page, "per_page": per_page}, ) def customer(self, customer_id: str) -> Any: return self.request("GET", f"customers/{self._q(customer_id)}") def users( self, scope: str, entity_id: str, page: int = 1, per_page: int = 25 ) -> Any: validate_scope(scope) return self.request( "GET", f"{scope}/{self._q(entity_id)}/users", params={"page": page, "per_page": per_page}, ) def user(self, user_id: str) -> Any: return self.request("GET", f"users/{self._q(user_id)}") def find_user(self, address: str) -> Any: """Find a user / alias by email address. This is a READ despite being a POST — it is NOT gated. """ return self.request( "POST", "users/find_by_address", json_body={"address": address} ) def logs( self, scope: str, entity_id: str, sort_direction: Optional[str] = None, sort_field: Optional[str] = None, page: Optional[int] = None, page_size: Optional[int] = None, sender: Optional[str] = None, recipient: Optional[str] = None, subject: Optional[str] = None, decision: Optional[str] = None, ) -> Any: """Mail-flow logs for an entity (passes through the standard log filters).""" validate_scope(scope) params = { "sort_direction": sort_direction, "sort_field": sort_field, "page": page, "page_size": page_size, "sender": sender, "recipient": recipient, "subject": subject, "decision": decision, } return self.request( "GET", f"{scope}/{self._q(entity_id)}/logs", params=params ) def messages( self, scope: str, entity_id: str, sort_direction: Optional[str] = None, sort_field: Optional[str] = None, page: Optional[int] = None, page_size: Optional[int] = None, sender: Optional[str] = None, recipient: Optional[str] = None, subject: Optional[str] = None, decision: Optional[str] = None, ) -> Any: """Held / quarantined messages for an entity (same filters as logs).""" validate_scope(scope) params = { "sort_direction": sort_direction, "sort_field": sort_field, "page": page, "page_size": page_size, "sender": sender, "recipient": recipient, "subject": subject, "decision": decision, } return self.request( "GET", f"{scope}/{self._q(entity_id)}/messages", params=params ) def configuration(self, scope: str, entity_id: str) -> Any: """Entity configuration (includes permissions.messages.allow_spam_release).""" validate_scope(scope) return self.request("GET", f"{scope}/{self._q(entity_id)}/configuration") def allow_block_rules(self, scope: str, entity_id: str) -> Any: validate_scope(scope) return self.request( "GET", f"{scope}/{self._q(entity_id)}/allow_block_rules" ) # ====================================================================== # WRITE METHODS (gated — the CLI requires --confirm before calling these) # ====================================================================== def release_message( self, message_id: str, recipients: Optional[str] = None ) -> Any: """Release (deliver) one held message. POST /messages/{id}/deliver.""" body: dict = {"include_original_recipients": 1} if recipients: body["recipients"] = recipients return self.request( "POST", f"messages/{self._q(message_id)}/deliver", json_body=body ) def release_many( self, scope: str, entity_id: str, ids: Optional[str] = None, all_selected: bool = False, recipients: Optional[str] = None, ) -> Any: """Bulk-release held messages under an entity. POST .../messages/deliver_many.""" validate_scope(scope) body: dict = { "include_original_recipients": 1, "all_selected": all_selected, "ids": ids or "", } if recipients: body["recipients"] = recipients return self.request( "POST", f"{scope}/{self._q(entity_id)}/messages/deliver_many", json_body=body, ) def add_rule( self, scope: str, entity_id: str, value: str, rule_type: str ) -> Any: """Add an allow / block rule on an entity. POST .../allow_block_rules.""" validate_scope(scope) if rule_type not in ("allow", "block"): raise MailprotectorError("rule_type must be 'allow' or 'block'") return self.request( "POST", f"{scope}/{self._q(entity_id)}/allow_block_rules", json_body={"value": value, "rule_type": rule_type}, ) def enable_release(self, scope: str, entity_id: str) -> Any: """Enable spam release on an entity. PUT .../configuration. Sets permissions.messages.allow_spam_release = true. Without this, the entity's held spam cannot be released. """ validate_scope(scope) return self.request( "PUT", f"{scope}/{self._q(entity_id)}/configuration", json_body={"permissions": {"messages": {"allow_spam_release": True}}}, )