feat(skills): add /mailprotector — CloudFilter held-mail search + release

Live Mailprotector CloudFilter REST client (emailservice.io/api/v1,
Bearer auth via vault msp-tools/mailprotector.sops.yaml). Lists mail-flow
logs and held/quarantined messages across client domains and releases them
(POST messages/{id}/deliver, deliver_many). Read-only by default; every
release/rule-add/config-change gated behind --confirm. Mirrors the
packetdial skill pattern. Built after diagnosing a Dataforth held-outbound
message that never reached ACG.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-05 07:03:47 -07:00
parent bf58675142
commit ce9744832d
4 changed files with 1086 additions and 0 deletions

View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""CLI for the mailprotector skill — Mailprotector CloudFilter REST API.
Read subcommands run freely. Write subcommands (release, release-many, add-rule,
enable-release, raw with a non-GET method) refuse to run unless --confirm is
passed; without it they print what they WOULD do and exit non-zero.
NOTE: find-user is a READ even though it is a POST under the hood — it is NOT
gated.
Read examples:
py mp.py status
py mp.py domains
py mp.py domain <domain_id>
py mp.py customers <reseller_id>
py mp.py users <scope> <id>
py mp.py find-user user@client.com
py mp.py logs <scope> <id> --sender boss@vendor.com --decision quarantine_spam
py mp.py messages <scope> <id> --recipient ceo@client.com
py mp.py config <scope> <id>
py mp.py rules <scope> <id>
Write examples (all require --confirm):
py mp.py release <message_id> --confirm
py mp.py release-many <scope> <id> --ids 111,222,333 --confirm
py mp.py add-rule <scope> <id> --value vendor.com --type allow --confirm
py mp.py enable-release <scope> <id> --confirm
Escape hatch (raw request against any path; non-GET requires --confirm):
py mp.py raw GET domains/123/logs
py mp.py raw POST messages/999/deliver --body '{...}' --confirm
`scope` values are validated against:
resellers, customers, domains, user_groups, users
"""
from __future__ import annotations
import argparse
import json
import sys
from mp_client import MailprotectorClient, MailprotectorError, VALID_SCOPES
def _emit(obj) -> None:
print(json.dumps(obj, indent=2, ensure_ascii=False, default=str))
def _parse_body(raw: str | None) -> dict | None:
if raw is None:
return None
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise SystemExit(f"--body is not valid JSON: {exc}")
if not isinstance(parsed, dict):
raise SystemExit("--body must be a JSON object")
return parsed
def _require_confirm(args, action: str, detail: str) -> None:
if not getattr(args, "confirm", False):
print(f"[DRY RUN] Would {action}: {detail}")
print("Refusing to perform a write without --confirm. Re-run with --confirm.")
raise SystemExit(2)
def _add_log_filters(sp) -> None:
"""Attach the shared logs/messages filter flags to a subparser."""
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--sender")
sp.add_argument("--recipient")
sp.add_argument("--subject")
sp.add_argument(
"--decision",
choices=[
"default",
"deliver",
"quarantine_spam",
"quarantine_virus",
"quarantine_policy",
"bounce",
"encrypt",
"delete",
],
)
sp.add_argument(
"--sort-field",
dest="sort_field",
choices=[
"@timestamp",
"prime.direction",
"prime.from_header_raw",
"prime.recipient",
"prime.subject",
"prime.decision",
"prime.score",
],
)
sp.add_argument(
"--sort-direction", dest="sort_direction", choices=["desc", "asc"]
)
sp.add_argument("--page", type=int)
sp.add_argument("--page-size", dest="page_size", type=int)
def main(argv=None) -> int:
p = argparse.ArgumentParser(
prog="mp.py", description="Mailprotector CloudFilter REST API CLI"
)
p.add_argument("--json", action="store_true", help="emit raw JSON (default)")
sub = p.add_subparsers(dest="cmd", required=True)
# --- read ---
sub.add_parser("status", help="validate token (GET /domains per_page=1)")
sp = sub.add_parser("domains", help="list domains (global or scoped)")
sp.add_argument("--scope", choices=VALID_SCOPES)
sp.add_argument("--id", help="entity id (required if --scope given)")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("domain", help="one domain")
sp.add_argument("domain_id")
sp = sub.add_parser("customers", help="customers under a reseller")
sp.add_argument("reseller_id")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("customer", help="one customer")
sp.add_argument("customer_id")
sp = sub.add_parser("users", help="users under an entity")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("user", help="one user")
sp.add_argument("user_id")
sp = sub.add_parser("find-user", help="find a user/alias by email address")
sp.add_argument("address")
sp = sub.add_parser("logs", help="mail-flow logs for an entity")
_add_log_filters(sp)
sp = sub.add_parser("messages", help="held/quarantined messages for an entity")
_add_log_filters(sp)
sp = sub.add_parser("config", help="entity configuration")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp = sub.add_parser("rules", help="allow/block rules for an entity")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
# --- write (gated) ---
sp = sub.add_parser("release", help="release one held message")
sp.add_argument("message_id")
sp.add_argument("--recipients", help="optional csv of override recipients")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("release-many", help="bulk-release held messages")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--ids", help="csv of message ids to release")
sp.add_argument("--all", action="store_true", help="release all selected")
sp.add_argument("--recipients", help="optional csv of override recipients")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("add-rule", help="add an allow/block rule")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--value", required=True)
sp.add_argument("--type", dest="rule_type", required=True, choices=["allow", "block"])
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser(
"enable-release", help="enable spam release on an entity (allow_spam_release)"
)
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--confirm", action="store_true")
# --- raw escape hatch ---
sp = sub.add_parser("raw", help="raw request against any path")
sp.add_argument("method", choices=["GET", "POST", "PUT", "PATCH", "DELETE"])
sp.add_argument("path", help="relative path, e.g. domains/123/logs")
sp.add_argument("--body")
sp.add_argument("--confirm", action="store_true")
args = p.parse_args(argv)
client = MailprotectorClient()
try:
if args.cmd == "status":
result = client.status()
_emit({"status": "ok", "auth": "valid", "sample": result})
elif args.cmd == "domains":
if args.scope and not args.id:
raise SystemExit("--id is required when --scope is given")
_emit(
client.domains(
scope=args.scope,
entity_id=args.id,
page=args.page,
per_page=args.per_page,
)
)
elif args.cmd == "domain":
_emit(client.domain(args.domain_id))
elif args.cmd == "customers":
_emit(
client.customers(
args.reseller_id, page=args.page, per_page=args.per_page
)
)
elif args.cmd == "customer":
_emit(client.customer(args.customer_id))
elif args.cmd == "users":
_emit(
client.users(
args.scope, args.id, page=args.page, per_page=args.per_page
)
)
elif args.cmd == "user":
_emit(client.user(args.user_id))
elif args.cmd == "find-user":
_emit(client.find_user(args.address))
elif args.cmd == "logs":
_emit(
client.logs(
args.scope,
args.id,
sort_direction=args.sort_direction,
sort_field=args.sort_field,
page=args.page,
page_size=args.page_size,
sender=args.sender,
recipient=args.recipient,
subject=args.subject,
decision=args.decision,
)
)
elif args.cmd == "messages":
_emit(
client.messages(
args.scope,
args.id,
sort_direction=args.sort_direction,
sort_field=args.sort_field,
page=args.page,
page_size=args.page_size,
sender=args.sender,
recipient=args.recipient,
subject=args.subject,
decision=args.decision,
)
)
elif args.cmd == "config":
_emit(client.configuration(args.scope, args.id))
elif args.cmd == "rules":
_emit(client.allow_block_rules(args.scope, args.id))
elif args.cmd == "release":
detail = args.message_id
if args.recipients:
detail += f" -> {args.recipients}"
_require_confirm(args, "RELEASE held message", detail)
_emit(client.release_message(args.message_id, recipients=args.recipients))
elif args.cmd == "release-many":
if not args.ids and not args.all:
raise SystemExit("release-many requires --ids <csv> or --all")
target = "ALL selected" if args.all else f"ids={args.ids}"
_require_confirm(
args, "BULK RELEASE held messages", f"{args.scope}/{args.id}: {target}"
)
_emit(
client.release_many(
args.scope,
args.id,
ids=args.ids,
all_selected=args.all,
recipients=args.recipients,
)
)
elif args.cmd == "add-rule":
_require_confirm(
args,
f"add {args.rule_type} rule",
f"{args.scope}/{args.id}: {args.value}",
)
_emit(
client.add_rule(args.scope, args.id, args.value, args.rule_type)
)
elif args.cmd == "enable-release":
_require_confirm(
args,
"enable spam release (allow_spam_release=true)",
f"{args.scope}/{args.id}",
)
_emit(client.enable_release(args.scope, args.id))
elif args.cmd == "raw":
body = _parse_body(args.body)
if args.method != "GET":
_require_confirm(args, f"{args.method} {args.path}", json.dumps(body))
_emit(client.request(args.method, args.path, json_body=body))
else:
p.error(f"unknown command {args.cmd}")
except MailprotectorError as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,443 @@
#!/usr/bin/env python3
"""Client for the mailprotector skill — Mailprotector CloudFilter REST API.
Talks to the live Mailprotector CloudFilter platform at emailservice.io. This is
the reseller email-security gateway (CloudFilter delivery + INKY annotation) that
ACG layers on top of client Exchange mail flow. Held / quarantined mail, mail-flow
logs, allow/block rules, and message release all live behind this API.
Auth: Bearer token. The API key is used directly as the bearer token:
Authorization: Bearer <api_key>
Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault
entry `msp-tools/mailprotector.sops.yaml`, or from an environment override.
Resolution order:
1. MAILPROTECTOR_API_KEY env
2. vault credentials.api_key (read via bash <root>/.claude/scripts/vault.sh)
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
skill works on a bare Python install.
"""
from __future__ import annotations
import json
import os
import subprocess
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Optional
try:
import httpx # type: ignore
_HAS_HTTPX = True
except ImportError: # pragma: no cover
_HAS_HTTPX = False
SKILL_DIR = Path(__file__).resolve().parent.parent # .../.claude/skills/mailprotector
ERROR_BODY_MAX_CHARS = 1500
DEFAULT_TIMEOUT = 60.0
DEFAULT_CONNECT_TIMEOUT = 15.0
API_BASE_URL = os.environ.get(
"MAILPROTECTOR_API_BASE_URL", "https://emailservice.io/api/v1"
)
VAULT_ENTRY = "msp-tools/mailprotector.sops.yaml"
# The five entity types that have logs / messages / configuration sub-resources.
VALID_SCOPES = ("resellers", "customers", "domains", "user_groups", "users")
class MailprotectorError(Exception):
"""Any failure talking to the Mailprotector API or loading credentials."""
# --- repo-root + credential loading -------------------------------------------
def _resolve_claudetools_root() -> Path:
"""Resolve the ClaudeTools repo root: env var, then identity.json, then derived.
Final fallback is derived from this file's location so it works on the
Mac/Linux fleet, not only the Windows default.
"""
# SKILL_DIR = .../.claude/skills/mailprotector ; root is three levels up.
derived_root = SKILL_DIR.parent.parent.parent
env_root = os.environ.get("CLAUDETOOLS_ROOT")
if env_root:
return Path(env_root)
identity_path = derived_root / ".claude" / "identity.json"
if identity_path.exists():
try:
data = json.loads(identity_path.read_text(encoding="utf-8"))
root = data.get("claudetools_root")
if root:
return Path(root)
except (json.JSONDecodeError, OSError):
pass
return derived_root
def _vault_field(field: str) -> Optional[str]:
"""Read a single field from the mailprotector vault entry. None if absent.
Soft failure: a missing field (vault exits non-zero) returns None so the
caller can surface a clean setup error. A missing vault wrapper or bash
raises, since that is an environment problem the user must fix.
"""
root = _resolve_claudetools_root()
vault_script = root / ".claude" / "scripts" / "vault.sh"
if not vault_script.exists():
raise MailprotectorError(
f"vault wrapper not found at {vault_script}; set MAILPROTECTOR_API_KEY "
"instead."
)
try:
completed = subprocess.run(
["bash", str(vault_script), "get-field", VAULT_ENTRY, field],
capture_output=True,
text=True,
timeout=60,
)
except FileNotFoundError as exc:
raise MailprotectorError(
"'bash' not found on PATH. Install Git Bash or set MAILPROTECTOR_API_KEY."
) from exc
except subprocess.TimeoutExpired as exc:
raise MailprotectorError("vault call timed out.") from exc
if completed.returncode != 0:
return None
value = completed.stdout.strip()
return value or None
def load_api_key() -> str:
"""Resolve the Mailprotector API key (bearer token).
Resolution order:
1. MAILPROTECTOR_API_KEY env
2. vault credentials.api_key
Raises MailprotectorError with setup guidance if nothing resolves.
"""
env_key = os.environ.get("MAILPROTECTOR_API_KEY")
if env_key:
return env_key.strip()
api_key = _vault_field("credentials.api_key")
if api_key:
return api_key
raise MailprotectorError(
"No Mailprotector / CloudFilter credentials found.\n"
f" Expected vault entry: {VAULT_ENTRY} with:\n"
" credentials.api_key (Bearer token for emailservice.io)\n"
" Or set the MAILPROTECTOR_API_KEY environment variable for testing.\n"
" Provision a reseller API key in the Mailprotector CloudFilter portal,\n"
" then store it in the SOPS vault.\n"
" See .claude/skills/mailprotector/SKILL.md for the full setup steps."
)
def validate_scope(scope: str) -> str:
"""Ensure a scope is one of the five valid entity types. Raises otherwise."""
if scope not in VALID_SCOPES:
raise MailprotectorError(
f"Invalid scope '{scope}'. Must be one of: {', '.join(VALID_SCOPES)}"
)
return scope
# --- client -------------------------------------------------------------------
class MailprotectorClient:
def __init__(
self,
api_base_url: str = API_BASE_URL,
timeout: float = DEFAULT_TIMEOUT,
connect_timeout: float = DEFAULT_CONNECT_TIMEOUT,
):
self.api_base_url = api_base_url.rstrip("/")
self.timeout = timeout
self.connect_timeout = connect_timeout
self._api_key: Optional[str] = None
# -- auth ------------------------------------------------------------------
@property
def api_key(self) -> str:
if self._api_key is None:
self._api_key = load_api_key()
return self._api_key
# -- core transport --------------------------------------------------------
def request(
self,
method: str,
path: str,
params: Optional[dict] = None,
json_body: Optional[dict] = None,
) -> Any:
"""One REST call against the API base. `path` is relative (e.g. 'domains')."""
url = f"{self.api_base_url}/{path.lstrip('/')}"
if params:
# Drop None-valued params so optional filters stay off the query string.
clean = {k: v for k, v in params.items() if v is not None}
if clean:
url = f"{url}?{urllib.parse.urlencode(clean, doseq=True)}"
data = json.dumps(json_body).encode("utf-8") if json_body is not None else None
headers = {"Accept": "application/json"}
if data is not None:
headers["Content-Type"] = "application/json"
return self._http(
method, url, data=data, headers=headers,
auth_header=f"Bearer {self.api_key}",
)
def _http(
self,
method: str,
url: str,
data: Optional[bytes] = None,
headers: Optional[dict] = None,
auth_header: Optional[str] = None,
) -> Any:
hdrs = dict(headers or {})
if auth_header:
hdrs["Authorization"] = auth_header
if _HAS_HTTPX:
try:
timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
with httpx.Client(timeout=timeout) as client:
resp = client.request(method, url, content=data, headers=hdrs)
resp.raise_for_status()
return self._parse(resp.content)
except httpx.TimeoutException as exc:
raise MailprotectorError(f"request timed out: {exc}") from exc
except httpx.HTTPStatusError as exc:
detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
raise MailprotectorError(
f"HTTP {exc.response.status_code} {method} {url}: {detail}"
) from exc
except httpx.HTTPError as exc:
raise MailprotectorError(f"request failed: {exc}") from exc
# stdlib fallback
req = urllib.request.Request(url, data=data, method=method, headers=hdrs)
try:
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
return self._parse(resp.read())
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
raise MailprotectorError(f"HTTP {exc.code} {method} {url}: {detail}") from exc
except urllib.error.URLError as exc:
raise MailprotectorError(f"request failed: {exc}") from exc
@staticmethod
def _parse(raw: bytes) -> Any:
if not raw:
return None
try:
return json.loads(raw.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return raw.decode("utf-8", errors="replace")
@staticmethod
def _q(value: str) -> str:
"""URL-quote a path segment (an id), keeping it safe in a path position."""
return urllib.parse.quote(str(value), safe="")
# ======================================================================
# READ METHODS (safe — always live)
# ======================================================================
def status(self) -> Any:
"""Token validation probe: smallest possible authenticated GET."""
return self.request("GET", "domains", params={"per_page": 1})
def domains(
self,
scope: Optional[str] = None,
entity_id: Optional[str] = None,
page: int = 1,
per_page: int = 25,
) -> Any:
"""List domains, globally or scoped under an entity."""
params = {"page": page, "per_page": per_page}
if scope and entity_id:
validate_scope(scope)
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/domains", params=params
)
return self.request("GET", "domains", params=params)
def domain(self, domain_id: str) -> Any:
return self.request("GET", f"domains/{self._q(domain_id)}")
def customers(self, reseller_id: str, page: int = 1, per_page: int = 25) -> Any:
return self.request(
"GET",
f"resellers/{self._q(reseller_id)}/customers",
params={"page": page, "per_page": per_page},
)
def customer(self, customer_id: str) -> Any:
return self.request("GET", f"customers/{self._q(customer_id)}")
def users(
self, scope: str, entity_id: str, page: int = 1, per_page: int = 25
) -> Any:
validate_scope(scope)
return self.request(
"GET",
f"{scope}/{self._q(entity_id)}/users",
params={"page": page, "per_page": per_page},
)
def user(self, user_id: str) -> Any:
return self.request("GET", f"users/{self._q(user_id)}")
def find_user(self, address: str) -> Any:
"""Find a user / alias by email address.
This is a READ despite being a POST — it is NOT gated.
"""
return self.request(
"POST", "users/find_by_address", json_body={"address": address}
)
def logs(
self,
scope: str,
entity_id: str,
sort_direction: Optional[str] = None,
sort_field: Optional[str] = None,
page: Optional[int] = None,
page_size: Optional[int] = None,
sender: Optional[str] = None,
recipient: Optional[str] = None,
subject: Optional[str] = None,
decision: Optional[str] = None,
) -> Any:
"""Mail-flow logs for an entity (passes through the standard log filters)."""
validate_scope(scope)
params = {
"sort_direction": sort_direction,
"sort_field": sort_field,
"page": page,
"page_size": page_size,
"sender": sender,
"recipient": recipient,
"subject": subject,
"decision": decision,
}
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/logs", params=params
)
def messages(
self,
scope: str,
entity_id: str,
sort_direction: Optional[str] = None,
sort_field: Optional[str] = None,
page: Optional[int] = None,
page_size: Optional[int] = None,
sender: Optional[str] = None,
recipient: Optional[str] = None,
subject: Optional[str] = None,
decision: Optional[str] = None,
) -> Any:
"""Held / quarantined messages for an entity (same filters as logs)."""
validate_scope(scope)
params = {
"sort_direction": sort_direction,
"sort_field": sort_field,
"page": page,
"page_size": page_size,
"sender": sender,
"recipient": recipient,
"subject": subject,
"decision": decision,
}
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/messages", params=params
)
def configuration(self, scope: str, entity_id: str) -> Any:
"""Entity configuration (includes permissions.messages.allow_spam_release)."""
validate_scope(scope)
return self.request("GET", f"{scope}/{self._q(entity_id)}/configuration")
def allow_block_rules(self, scope: str, entity_id: str) -> Any:
validate_scope(scope)
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/allow_block_rules"
)
# ======================================================================
# WRITE METHODS (gated — the CLI requires --confirm before calling these)
# ======================================================================
def release_message(
self, message_id: str, recipients: Optional[str] = None
) -> Any:
"""Release (deliver) one held message. POST /messages/{id}/deliver."""
body: dict = {"include_original_recipients": 1}
if recipients:
body["recipients"] = recipients
return self.request(
"POST", f"messages/{self._q(message_id)}/deliver", json_body=body
)
def release_many(
self,
scope: str,
entity_id: str,
ids: Optional[str] = None,
all_selected: bool = False,
recipients: Optional[str] = None,
) -> Any:
"""Bulk-release held messages under an entity. POST .../messages/deliver_many."""
validate_scope(scope)
body: dict = {
"include_original_recipients": 1,
"all_selected": all_selected,
"ids": ids or "",
}
if recipients:
body["recipients"] = recipients
return self.request(
"POST",
f"{scope}/{self._q(entity_id)}/messages/deliver_many",
json_body=body,
)
def add_rule(
self, scope: str, entity_id: str, value: str, rule_type: str
) -> Any:
"""Add an allow / block rule on an entity. POST .../allow_block_rules."""
validate_scope(scope)
if rule_type not in ("allow", "block"):
raise MailprotectorError("rule_type must be 'allow' or 'block'")
return self.request(
"POST",
f"{scope}/{self._q(entity_id)}/allow_block_rules",
json_body={"value": value, "rule_type": rule_type},
)
def enable_release(self, scope: str, entity_id: str) -> Any:
"""Enable spam release on an entity. PUT .../configuration.
Sets permissions.messages.allow_spam_release = true. Without this, the
entity's held spam cannot be released.
"""
validate_scope(scope)
return self.request(
"PUT",
f"{scope}/{self._q(entity_id)}/configuration",
json_body={"permissions": {"messages": {"allow_spam_release": True}}},
)