feat(skills): add /mailprotector — CloudFilter held-mail search + release

Live Mailprotector CloudFilter REST client (emailservice.io/api/v1,
Bearer auth via vault msp-tools/mailprotector.sops.yaml). Lists mail-flow
logs and held/quarantined messages across client domains and releases them
(POST messages/{id}/deliver, deliver_many). Read-only by default; every
release/rule-add/config-change gated behind --confirm. Mirrors the
packetdial skill pattern. Built after diagnosing a Dataforth held-outbound
message that never reached ACG.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-05 07:03:47 -07:00
parent bf58675142
commit ce9744832d
4 changed files with 1086 additions and 0 deletions

View File

@@ -0,0 +1,166 @@
---
name: mailprotector
description: >-
Manage the Arizona Computer Guru (ACG) Mailprotector CloudFilter email-security
gateway via the live CloudFilter REST API (emailservice.io). Search and release
held / quarantined mail (inbound and outbound), pull mail-flow logs to explain
why a message did or did not deliver, inspect entity configuration and
allow/block rules, find a user or alias by email, and manage allow/block rules.
Read-only by default; every release / rule-add / config-change is gated behind
--confirm. Invoke for: "mailprotector", "cloudfilter", "emailservice.io", "held
mail", "quarantined email", "release email", "outbound quarantine", "why didn't
my email arrive", "email security gateway", "INKY", "mail flow logs", "allow
block rule", "release spam". This skill talks to the LIVE production reseller
CloudFilter platform — treat releases conservatively.
---
# Mailprotector / CloudFilter Skill
Standalone CLI client for the **Mailprotector CloudFilter REST API**
(`emailservice.io`), the reseller email-security platform ACG layers on top of
client mail flow. Read-only by default; every write (release, rule add, config
change) is gated behind `--confirm`.
## The two-layer context (important)
ACG's email security sits in front of client mailboxes as two cooperating layers:
| Layer | What it does |
|---|---|
| **Mailprotector CloudFilter** | The delivery / filtering gateway. Inbound and outbound mail passes through it; spam, virus, and policy hits are **held / quarantined** here. Releasing a held message re-injects it for delivery. This is the API this skill drives. |
| **INKY** | Email annotation / phishing-banner layer. Adds the warning banners and protects against impersonation. Not part of this API surface. |
Both sit **layered on top of the client's own Exchange / M365 mail flow** — so a
"missing email" investigation usually means: was it held at CloudFilter (check
`messages` / `logs`), or did it pass CloudFilter and stall in Exchange?
## Connection
| Item | Value |
|---|---|
| Base URL | `https://emailservice.io/api/v1` (override `MAILPROTECTOR_API_BASE_URL`) |
| Auth | `Authorization: Bearer <api_key>` |
| Vault entry | `msp-tools/mailprotector.sops.yaml`, field `credentials.api_key` |
| Env override | `MAILPROTECTOR_API_KEY` |
Credential resolution order: `MAILPROTECTOR_API_KEY` env -> vault
`credentials.api_key`. The key is never hardcoded; a clear setup error is raised
if neither resolves.
### Scopes
Five entity types carry `logs` / `messages` / `configuration` /
`allow_block_rules` / `users` / `domains` sub-resources. Path form is
`/{scope}/{id}/...`:
```
resellers, customers, domains, user_groups, users
```
The CLI validates `scope` against this set.
## Running the CLI
This machine's Python launcher is `py` (per identity.json); `python` / `python3`
also work. Run from the scripts dir so the two modules resolve.
```bash
cd C:/claudetools/.claude/skills/mailprotector/scripts
py mp.py status # validate token (GET /domains, per_page=1)
py mp.py domains # list domains (global)
py mp.py domains --scope customers --id <id>
py mp.py domain <domain_id>
py mp.py customers <reseller_id>
py mp.py customer <customer_id>
py mp.py users <scope> <id>
py mp.py user <user_id>
py mp.py find-user user@client.com # locate a user / alias by email (a READ)
py mp.py config <scope> <id> # shows permissions.messages.allow_spam_release
py mp.py rules <scope> <id>
```
### Mail-flow logs and held mail (the common investigation)
Both accept the same filters: `--sender --recipient --subject --decision
--sort-field --sort-direction --page --page-size`.
```bash
# Why didn't this arrive? Look at the decision in the flow logs.
py mp.py logs domains <domain_id> --recipient ceo@client.com --decision quarantine_spam
# Held / quarantined mail search.
py mp.py messages domains <domain_id> --sender boss@vendor.com
```
`--decision` values: `default`, `deliver`, `quarantine_spam`,
`quarantine_virus`, `quarantine_policy`, `bounce`, `encrypt`, `delete`.
`--sort-field` values: `@timestamp` (default), `prime.direction`,
`prime.from_header_raw`, `prime.recipient`, `prime.subject`, `prime.decision`,
`prime.score`.
## Writes (gated)
Every mutating command prints a `[DRY RUN]` line and exits non-zero unless you
pass `--confirm`.
```bash
py mp.py release <message_id> --confirm
py mp.py release <message_id> --recipients alt@client.com --confirm
py mp.py release-many <scope> <id> --ids 111,222,333 --confirm
py mp.py release-many <scope> <id> --all --confirm
py mp.py add-rule <scope> <id> --value vendor.com --type allow --confirm
py mp.py enable-release <scope> <id> --confirm
```
## The `allow_spam_release` gotcha
Releasing a held **spam** message fails if the owning entity does not have
`permissions.messages.allow_spam_release = true`. Workflow:
1. `py mp.py config <scope> <id>` — check `allow_spam_release`.
2. If `false`: `py mp.py enable-release <scope> <id> --confirm`.
3. Re-run the `release` / `release-many`.
Virus and policy quarantines are governed separately — only spam release is
gated by this permission.
## Example workflow: find a client's held outbound mail from a sender and release it
```bash
# 1. Find the client's domain.
py mp.py domains --scope customers --id <customer_id>
# 2. Search held messages from the sender (outbound = sender is the client user).
py mp.py messages domains <domain_id> --sender user@client.com --decision quarantine_spam
# 3. If it's spam-held, make sure release is permitted on the domain.
py mp.py config domains <domain_id> # check allow_spam_release
py mp.py enable-release domains <domain_id> --confirm # only if needed
# 4. Release by message id (DRY RUN first — omit --confirm to preview).
py mp.py release <message_id> # [DRY RUN]
py mp.py release <message_id> --confirm # actually release
```
## Raw escape hatch
The named commands cover the common surface; for anything else, hit the path
directly. Non-GET methods still require `--confirm`.
```bash
py mp.py raw GET domains/<id>/logs
py mp.py raw POST messages/<id>/deliver --body '{"include_original_recipients":1}' --confirm
```
## Notes
- This is the **LIVE production reseller CloudFilter platform**. A release
re-delivers real mail to real recipients, and an allow rule can let real spam
or phishing through — confirm the target entity with a read command before any
write, and prefer releasing specific message ids over `--all`.
- Pagination: `page` (default 1) and `per_page` (default 25); reseller
`messages` caps `per_page` at 50. The `X-Pagination` response header carries
the page/total metadata.
- Full endpoint catalog, filter tables, and the global `field[op]=value`
operators live in `references/api.md`.

View File

@@ -0,0 +1,155 @@
# Mailprotector CloudFilter REST API — Reference
Full endpoint catalog and filter tables for the `mailprotector` skill. SKILL.md
stays lean; the detail lives here.
## Connection
| Item | Value |
|---|---|
| Base URL | `https://emailservice.io/api/v1` |
| Override env | `MAILPROTECTOR_API_BASE_URL` |
| Auth | `Authorization: Bearer <api_key>` |
| Key env override | `MAILPROTECTOR_API_KEY` |
| Vault entry | `msp-tools/mailprotector.sops.yaml`, field `credentials.api_key` |
Credential resolution order: `MAILPROTECTOR_API_KEY` env -> vault
`credentials.api_key` (read via `bash <root>/.claude/scripts/vault.sh get-field`).
A clear setup error is raised if neither resolves.
## Scopes
The five entity types that carry `logs`, `messages`, `configuration`,
`users`, `domains`, and `allow_block_rules` sub-resources. Path form is
`/{scope}/{id}/...`:
```
resellers, customers, domains, user_groups, users
```
The CLI validates `scope` against this set.
## Pagination
| Param | Default | Notes |
|---|---|---|
| `page` | 1 | 1-indexed page number |
| `per_page` | 25 | Max **50** on reseller `messages` |
The response includes an `X-Pagination` response header (a JSON document with
the page/total metadata).
## Global list filtering
List endpoints accept `field[op]=value` filters. Operators:
| Op | Meaning |
|---|---|
| `Gt` | greater than |
| `Geq` | greater than or equal |
| `Lt` | less than |
| `Leq` | less than or equal |
| `Eq` | equal |
Example: `created_at[geq]=2026-06-01`.
## Logs / messages filtering
Every `.../logs` and `.../messages` endpoint accepts these params:
| Param | Default | Allowed values |
|---|---|---|
| `sort_direction` | `desc` | `desc`, `asc` |
| `sort_field` | `@timestamp` | `@timestamp`, `prime.direction`, `prime.from_header_raw`, `prime.recipient`, `prime.subject`, `prime.decision`, `prime.score` |
| `page` | 1 | integer |
| `page_size` | (API default) | integer |
| `sender` | (none) | sender filter |
| `recipient` | (none) | recipient filter |
| `subject` | (none) | subject filter |
| `decision` | `all` | `default`, `deliver`, `quarantine_spam`, `quarantine_virus`, `quarantine_policy`, `bounce`, `encrypt`, `delete` |
## READ endpoints
| Method | Path | Client method | CLI |
|---|---|---|---|
| GET | `/domains` | `domains()` | `domains` |
| GET | `/{scope}/{id}/domains` | `domains(scope,id)` | `domains --scope --id` |
| GET | `/domains/{id}` | `domain(id)` | `domain <id>` |
| GET | `/resellers/{id}/customers` | `customers(id)` | `customers <reseller_id>` |
| GET | `/customers/{id}` | `customer(id)` | `customer <id>` |
| GET | `/{scope}/{id}/users` | `users(scope,id)` | `users <scope> <id>` |
| GET | `/users/{id}` | `user(id)` | `user <id>` |
| POST | `/users/find_by_address` | `find_user(address)` | `find-user <address>` |
| GET | `/{scope}/{id}/logs` | `logs(scope,id,...)` | `logs <scope> <id>` |
| GET | `/{scope}/{id}/messages` | `messages(scope,id,...)` | `messages <scope> <id>` |
| GET | `/{scope}/{id}/configuration` | `configuration(scope,id)` | `config <scope> <id>` |
| GET | `/{scope}/{id}/allow_block_rules` | `allow_block_rules(scope,id)` | `rules <scope> <id>` |
**`find_by_address` is a READ** despite being a POST — it looks up a user / alias
by email. It is NOT gated behind `--confirm`.
`status` is a synthetic read: `GET /domains?per_page=1` used purely to validate
the bearer token (HTTP 200 = key good).
## WRITE endpoints (gated behind `--confirm`)
Without `--confirm` the CLI prints `[DRY RUN] Would <action>: <detail>` and exits
with code 2. With `--confirm` it performs the call.
### Release one held message
```
POST /messages/{message_id}/deliver
body: {"include_original_recipients": 1, "recipients": "<optional csv>"}
```
Client: `release_message(message_id, recipients=None)` — CLI: `release <message_id> [--recipients csv] --confirm`
### Bulk release held messages
```
POST /{scope}/{id}/messages/deliver_many
body: {"include_original_recipients": 1, "recipients": "<optional>",
"all_selected": false, "ids": "<csv ids>"}
```
Client: `release_many(scope, id, ids=None, all_selected=False, recipients=None)`
CLI: `release-many <scope> <id> [--ids csv | --all] [--recipients csv] --confirm`
### Add allow / block rule
```
POST /{scope}/{id}/allow_block_rules
body: {"value": "...", "rule_type": "allow" | "block"}
```
Client: `add_rule(scope, id, value, rule_type)` — CLI: `add-rule <scope> <id> --value <v> --type allow|block --confirm`
### Enable spam release on an entity
```
PUT /{scope}/{id}/configuration
body: {"permissions": {"messages": {"allow_spam_release": true}}}
```
Client: `enable_release(scope, id)` — CLI: `enable-release <scope> <id> --confirm`
This is required before an entity's held **spam** can be released. Check the
state first with `config <scope> <id>` and look at
`permissions.messages.allow_spam_release`.
## Raw escape hatch
```
py mp.py raw <METHOD> <path> [--body JSON] [--confirm]
```
Non-GET methods require `--confirm`. Use for any endpoint not wrapped by a named
command.
## The `allow_spam_release` gotcha
Releasing a held **spam** message will fail (or silently no-op) if the owning
entity does not have `permissions.messages.allow_spam_release = true`. The fix:
1. `py mp.py config <scope> <id>` — confirm `allow_spam_release` is `false`.
2. `py mp.py enable-release <scope> <id> --confirm` — flip it to `true`.
3. Re-run the `release` / `release-many`.
Virus and policy quarantines are governed separately — only spam release is
gated by this permission.

View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""CLI for the mailprotector skill — Mailprotector CloudFilter REST API.
Read subcommands run freely. Write subcommands (release, release-many, add-rule,
enable-release, raw with a non-GET method) refuse to run unless --confirm is
passed; without it they print what they WOULD do and exit non-zero.
NOTE: find-user is a READ even though it is a POST under the hood — it is NOT
gated.
Read examples:
py mp.py status
py mp.py domains
py mp.py domain <domain_id>
py mp.py customers <reseller_id>
py mp.py users <scope> <id>
py mp.py find-user user@client.com
py mp.py logs <scope> <id> --sender boss@vendor.com --decision quarantine_spam
py mp.py messages <scope> <id> --recipient ceo@client.com
py mp.py config <scope> <id>
py mp.py rules <scope> <id>
Write examples (all require --confirm):
py mp.py release <message_id> --confirm
py mp.py release-many <scope> <id> --ids 111,222,333 --confirm
py mp.py add-rule <scope> <id> --value vendor.com --type allow --confirm
py mp.py enable-release <scope> <id> --confirm
Escape hatch (raw request against any path; non-GET requires --confirm):
py mp.py raw GET domains/123/logs
py mp.py raw POST messages/999/deliver --body '{...}' --confirm
`scope` values are validated against:
resellers, customers, domains, user_groups, users
"""
from __future__ import annotations
import argparse
import json
import sys
from mp_client import MailprotectorClient, MailprotectorError, VALID_SCOPES
def _emit(obj) -> None:
print(json.dumps(obj, indent=2, ensure_ascii=False, default=str))
def _parse_body(raw: str | None) -> dict | None:
if raw is None:
return None
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise SystemExit(f"--body is not valid JSON: {exc}")
if not isinstance(parsed, dict):
raise SystemExit("--body must be a JSON object")
return parsed
def _require_confirm(args, action: str, detail: str) -> None:
if not getattr(args, "confirm", False):
print(f"[DRY RUN] Would {action}: {detail}")
print("Refusing to perform a write without --confirm. Re-run with --confirm.")
raise SystemExit(2)
def _add_log_filters(sp) -> None:
"""Attach the shared logs/messages filter flags to a subparser."""
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--sender")
sp.add_argument("--recipient")
sp.add_argument("--subject")
sp.add_argument(
"--decision",
choices=[
"default",
"deliver",
"quarantine_spam",
"quarantine_virus",
"quarantine_policy",
"bounce",
"encrypt",
"delete",
],
)
sp.add_argument(
"--sort-field",
dest="sort_field",
choices=[
"@timestamp",
"prime.direction",
"prime.from_header_raw",
"prime.recipient",
"prime.subject",
"prime.decision",
"prime.score",
],
)
sp.add_argument(
"--sort-direction", dest="sort_direction", choices=["desc", "asc"]
)
sp.add_argument("--page", type=int)
sp.add_argument("--page-size", dest="page_size", type=int)
def main(argv=None) -> int:
p = argparse.ArgumentParser(
prog="mp.py", description="Mailprotector CloudFilter REST API CLI"
)
p.add_argument("--json", action="store_true", help="emit raw JSON (default)")
sub = p.add_subparsers(dest="cmd", required=True)
# --- read ---
sub.add_parser("status", help="validate token (GET /domains per_page=1)")
sp = sub.add_parser("domains", help="list domains (global or scoped)")
sp.add_argument("--scope", choices=VALID_SCOPES)
sp.add_argument("--id", help="entity id (required if --scope given)")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("domain", help="one domain")
sp.add_argument("domain_id")
sp = sub.add_parser("customers", help="customers under a reseller")
sp.add_argument("reseller_id")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("customer", help="one customer")
sp.add_argument("customer_id")
sp = sub.add_parser("users", help="users under an entity")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--page", type=int, default=1)
sp.add_argument("--per-page", dest="per_page", type=int, default=25)
sp = sub.add_parser("user", help="one user")
sp.add_argument("user_id")
sp = sub.add_parser("find-user", help="find a user/alias by email address")
sp.add_argument("address")
sp = sub.add_parser("logs", help="mail-flow logs for an entity")
_add_log_filters(sp)
sp = sub.add_parser("messages", help="held/quarantined messages for an entity")
_add_log_filters(sp)
sp = sub.add_parser("config", help="entity configuration")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp = sub.add_parser("rules", help="allow/block rules for an entity")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
# --- write (gated) ---
sp = sub.add_parser("release", help="release one held message")
sp.add_argument("message_id")
sp.add_argument("--recipients", help="optional csv of override recipients")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("release-many", help="bulk-release held messages")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--ids", help="csv of message ids to release")
sp.add_argument("--all", action="store_true", help="release all selected")
sp.add_argument("--recipients", help="optional csv of override recipients")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("add-rule", help="add an allow/block rule")
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--value", required=True)
sp.add_argument("--type", dest="rule_type", required=True, choices=["allow", "block"])
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser(
"enable-release", help="enable spam release on an entity (allow_spam_release)"
)
sp.add_argument("scope", choices=VALID_SCOPES)
sp.add_argument("id")
sp.add_argument("--confirm", action="store_true")
# --- raw escape hatch ---
sp = sub.add_parser("raw", help="raw request against any path")
sp.add_argument("method", choices=["GET", "POST", "PUT", "PATCH", "DELETE"])
sp.add_argument("path", help="relative path, e.g. domains/123/logs")
sp.add_argument("--body")
sp.add_argument("--confirm", action="store_true")
args = p.parse_args(argv)
client = MailprotectorClient()
try:
if args.cmd == "status":
result = client.status()
_emit({"status": "ok", "auth": "valid", "sample": result})
elif args.cmd == "domains":
if args.scope and not args.id:
raise SystemExit("--id is required when --scope is given")
_emit(
client.domains(
scope=args.scope,
entity_id=args.id,
page=args.page,
per_page=args.per_page,
)
)
elif args.cmd == "domain":
_emit(client.domain(args.domain_id))
elif args.cmd == "customers":
_emit(
client.customers(
args.reseller_id, page=args.page, per_page=args.per_page
)
)
elif args.cmd == "customer":
_emit(client.customer(args.customer_id))
elif args.cmd == "users":
_emit(
client.users(
args.scope, args.id, page=args.page, per_page=args.per_page
)
)
elif args.cmd == "user":
_emit(client.user(args.user_id))
elif args.cmd == "find-user":
_emit(client.find_user(args.address))
elif args.cmd == "logs":
_emit(
client.logs(
args.scope,
args.id,
sort_direction=args.sort_direction,
sort_field=args.sort_field,
page=args.page,
page_size=args.page_size,
sender=args.sender,
recipient=args.recipient,
subject=args.subject,
decision=args.decision,
)
)
elif args.cmd == "messages":
_emit(
client.messages(
args.scope,
args.id,
sort_direction=args.sort_direction,
sort_field=args.sort_field,
page=args.page,
page_size=args.page_size,
sender=args.sender,
recipient=args.recipient,
subject=args.subject,
decision=args.decision,
)
)
elif args.cmd == "config":
_emit(client.configuration(args.scope, args.id))
elif args.cmd == "rules":
_emit(client.allow_block_rules(args.scope, args.id))
elif args.cmd == "release":
detail = args.message_id
if args.recipients:
detail += f" -> {args.recipients}"
_require_confirm(args, "RELEASE held message", detail)
_emit(client.release_message(args.message_id, recipients=args.recipients))
elif args.cmd == "release-many":
if not args.ids and not args.all:
raise SystemExit("release-many requires --ids <csv> or --all")
target = "ALL selected" if args.all else f"ids={args.ids}"
_require_confirm(
args, "BULK RELEASE held messages", f"{args.scope}/{args.id}: {target}"
)
_emit(
client.release_many(
args.scope,
args.id,
ids=args.ids,
all_selected=args.all,
recipients=args.recipients,
)
)
elif args.cmd == "add-rule":
_require_confirm(
args,
f"add {args.rule_type} rule",
f"{args.scope}/{args.id}: {args.value}",
)
_emit(
client.add_rule(args.scope, args.id, args.value, args.rule_type)
)
elif args.cmd == "enable-release":
_require_confirm(
args,
"enable spam release (allow_spam_release=true)",
f"{args.scope}/{args.id}",
)
_emit(client.enable_release(args.scope, args.id))
elif args.cmd == "raw":
body = _parse_body(args.body)
if args.method != "GET":
_require_confirm(args, f"{args.method} {args.path}", json.dumps(body))
_emit(client.request(args.method, args.path, json_body=body))
else:
p.error(f"unknown command {args.cmd}")
except MailprotectorError as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,443 @@
#!/usr/bin/env python3
"""Client for the mailprotector skill — Mailprotector CloudFilter REST API.
Talks to the live Mailprotector CloudFilter platform at emailservice.io. This is
the reseller email-security gateway (CloudFilter delivery + INKY annotation) that
ACG layers on top of client Exchange mail flow. Held / quarantined mail, mail-flow
logs, allow/block rules, and message release all live behind this API.
Auth: Bearer token. The API key is used directly as the bearer token:
Authorization: Bearer <api_key>
Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault
entry `msp-tools/mailprotector.sops.yaml`, or from an environment override.
Resolution order:
1. MAILPROTECTOR_API_KEY env
2. vault credentials.api_key (read via bash <root>/.claude/scripts/vault.sh)
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
skill works on a bare Python install.
"""
from __future__ import annotations
import json
import os
import subprocess
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Optional
try:
import httpx # type: ignore
_HAS_HTTPX = True
except ImportError: # pragma: no cover
_HAS_HTTPX = False
SKILL_DIR = Path(__file__).resolve().parent.parent # .../.claude/skills/mailprotector
ERROR_BODY_MAX_CHARS = 1500
DEFAULT_TIMEOUT = 60.0
DEFAULT_CONNECT_TIMEOUT = 15.0
API_BASE_URL = os.environ.get(
"MAILPROTECTOR_API_BASE_URL", "https://emailservice.io/api/v1"
)
VAULT_ENTRY = "msp-tools/mailprotector.sops.yaml"
# The five entity types that have logs / messages / configuration sub-resources.
VALID_SCOPES = ("resellers", "customers", "domains", "user_groups", "users")
class MailprotectorError(Exception):
"""Any failure talking to the Mailprotector API or loading credentials."""
# --- repo-root + credential loading -------------------------------------------
def _resolve_claudetools_root() -> Path:
"""Resolve the ClaudeTools repo root: env var, then identity.json, then derived.
Final fallback is derived from this file's location so it works on the
Mac/Linux fleet, not only the Windows default.
"""
# SKILL_DIR = .../.claude/skills/mailprotector ; root is three levels up.
derived_root = SKILL_DIR.parent.parent.parent
env_root = os.environ.get("CLAUDETOOLS_ROOT")
if env_root:
return Path(env_root)
identity_path = derived_root / ".claude" / "identity.json"
if identity_path.exists():
try:
data = json.loads(identity_path.read_text(encoding="utf-8"))
root = data.get("claudetools_root")
if root:
return Path(root)
except (json.JSONDecodeError, OSError):
pass
return derived_root
def _vault_field(field: str) -> Optional[str]:
"""Read a single field from the mailprotector vault entry. None if absent.
Soft failure: a missing field (vault exits non-zero) returns None so the
caller can surface a clean setup error. A missing vault wrapper or bash
raises, since that is an environment problem the user must fix.
"""
root = _resolve_claudetools_root()
vault_script = root / ".claude" / "scripts" / "vault.sh"
if not vault_script.exists():
raise MailprotectorError(
f"vault wrapper not found at {vault_script}; set MAILPROTECTOR_API_KEY "
"instead."
)
try:
completed = subprocess.run(
["bash", str(vault_script), "get-field", VAULT_ENTRY, field],
capture_output=True,
text=True,
timeout=60,
)
except FileNotFoundError as exc:
raise MailprotectorError(
"'bash' not found on PATH. Install Git Bash or set MAILPROTECTOR_API_KEY."
) from exc
except subprocess.TimeoutExpired as exc:
raise MailprotectorError("vault call timed out.") from exc
if completed.returncode != 0:
return None
value = completed.stdout.strip()
return value or None
def load_api_key() -> str:
"""Resolve the Mailprotector API key (bearer token).
Resolution order:
1. MAILPROTECTOR_API_KEY env
2. vault credentials.api_key
Raises MailprotectorError with setup guidance if nothing resolves.
"""
env_key = os.environ.get("MAILPROTECTOR_API_KEY")
if env_key:
return env_key.strip()
api_key = _vault_field("credentials.api_key")
if api_key:
return api_key
raise MailprotectorError(
"No Mailprotector / CloudFilter credentials found.\n"
f" Expected vault entry: {VAULT_ENTRY} with:\n"
" credentials.api_key (Bearer token for emailservice.io)\n"
" Or set the MAILPROTECTOR_API_KEY environment variable for testing.\n"
" Provision a reseller API key in the Mailprotector CloudFilter portal,\n"
" then store it in the SOPS vault.\n"
" See .claude/skills/mailprotector/SKILL.md for the full setup steps."
)
def validate_scope(scope: str) -> str:
"""Ensure a scope is one of the five valid entity types. Raises otherwise."""
if scope not in VALID_SCOPES:
raise MailprotectorError(
f"Invalid scope '{scope}'. Must be one of: {', '.join(VALID_SCOPES)}"
)
return scope
# --- client -------------------------------------------------------------------
class MailprotectorClient:
def __init__(
self,
api_base_url: str = API_BASE_URL,
timeout: float = DEFAULT_TIMEOUT,
connect_timeout: float = DEFAULT_CONNECT_TIMEOUT,
):
self.api_base_url = api_base_url.rstrip("/")
self.timeout = timeout
self.connect_timeout = connect_timeout
self._api_key: Optional[str] = None
# -- auth ------------------------------------------------------------------
@property
def api_key(self) -> str:
if self._api_key is None:
self._api_key = load_api_key()
return self._api_key
# -- core transport --------------------------------------------------------
def request(
self,
method: str,
path: str,
params: Optional[dict] = None,
json_body: Optional[dict] = None,
) -> Any:
"""One REST call against the API base. `path` is relative (e.g. 'domains')."""
url = f"{self.api_base_url}/{path.lstrip('/')}"
if params:
# Drop None-valued params so optional filters stay off the query string.
clean = {k: v for k, v in params.items() if v is not None}
if clean:
url = f"{url}?{urllib.parse.urlencode(clean, doseq=True)}"
data = json.dumps(json_body).encode("utf-8") if json_body is not None else None
headers = {"Accept": "application/json"}
if data is not None:
headers["Content-Type"] = "application/json"
return self._http(
method, url, data=data, headers=headers,
auth_header=f"Bearer {self.api_key}",
)
def _http(
self,
method: str,
url: str,
data: Optional[bytes] = None,
headers: Optional[dict] = None,
auth_header: Optional[str] = None,
) -> Any:
hdrs = dict(headers or {})
if auth_header:
hdrs["Authorization"] = auth_header
if _HAS_HTTPX:
try:
timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
with httpx.Client(timeout=timeout) as client:
resp = client.request(method, url, content=data, headers=hdrs)
resp.raise_for_status()
return self._parse(resp.content)
except httpx.TimeoutException as exc:
raise MailprotectorError(f"request timed out: {exc}") from exc
except httpx.HTTPStatusError as exc:
detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
raise MailprotectorError(
f"HTTP {exc.response.status_code} {method} {url}: {detail}"
) from exc
except httpx.HTTPError as exc:
raise MailprotectorError(f"request failed: {exc}") from exc
# stdlib fallback
req = urllib.request.Request(url, data=data, method=method, headers=hdrs)
try:
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
return self._parse(resp.read())
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
raise MailprotectorError(f"HTTP {exc.code} {method} {url}: {detail}") from exc
except urllib.error.URLError as exc:
raise MailprotectorError(f"request failed: {exc}") from exc
@staticmethod
def _parse(raw: bytes) -> Any:
if not raw:
return None
try:
return json.loads(raw.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return raw.decode("utf-8", errors="replace")
@staticmethod
def _q(value: str) -> str:
"""URL-quote a path segment (an id), keeping it safe in a path position."""
return urllib.parse.quote(str(value), safe="")
# ======================================================================
# READ METHODS (safe — always live)
# ======================================================================
def status(self) -> Any:
"""Token validation probe: smallest possible authenticated GET."""
return self.request("GET", "domains", params={"per_page": 1})
def domains(
self,
scope: Optional[str] = None,
entity_id: Optional[str] = None,
page: int = 1,
per_page: int = 25,
) -> Any:
"""List domains, globally or scoped under an entity."""
params = {"page": page, "per_page": per_page}
if scope and entity_id:
validate_scope(scope)
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/domains", params=params
)
return self.request("GET", "domains", params=params)
def domain(self, domain_id: str) -> Any:
return self.request("GET", f"domains/{self._q(domain_id)}")
def customers(self, reseller_id: str, page: int = 1, per_page: int = 25) -> Any:
return self.request(
"GET",
f"resellers/{self._q(reseller_id)}/customers",
params={"page": page, "per_page": per_page},
)
def customer(self, customer_id: str) -> Any:
return self.request("GET", f"customers/{self._q(customer_id)}")
def users(
self, scope: str, entity_id: str, page: int = 1, per_page: int = 25
) -> Any:
validate_scope(scope)
return self.request(
"GET",
f"{scope}/{self._q(entity_id)}/users",
params={"page": page, "per_page": per_page},
)
def user(self, user_id: str) -> Any:
return self.request("GET", f"users/{self._q(user_id)}")
def find_user(self, address: str) -> Any:
"""Find a user / alias by email address.
This is a READ despite being a POST — it is NOT gated.
"""
return self.request(
"POST", "users/find_by_address", json_body={"address": address}
)
def logs(
self,
scope: str,
entity_id: str,
sort_direction: Optional[str] = None,
sort_field: Optional[str] = None,
page: Optional[int] = None,
page_size: Optional[int] = None,
sender: Optional[str] = None,
recipient: Optional[str] = None,
subject: Optional[str] = None,
decision: Optional[str] = None,
) -> Any:
"""Mail-flow logs for an entity (passes through the standard log filters)."""
validate_scope(scope)
params = {
"sort_direction": sort_direction,
"sort_field": sort_field,
"page": page,
"page_size": page_size,
"sender": sender,
"recipient": recipient,
"subject": subject,
"decision": decision,
}
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/logs", params=params
)
def messages(
self,
scope: str,
entity_id: str,
sort_direction: Optional[str] = None,
sort_field: Optional[str] = None,
page: Optional[int] = None,
page_size: Optional[int] = None,
sender: Optional[str] = None,
recipient: Optional[str] = None,
subject: Optional[str] = None,
decision: Optional[str] = None,
) -> Any:
"""Held / quarantined messages for an entity (same filters as logs)."""
validate_scope(scope)
params = {
"sort_direction": sort_direction,
"sort_field": sort_field,
"page": page,
"page_size": page_size,
"sender": sender,
"recipient": recipient,
"subject": subject,
"decision": decision,
}
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/messages", params=params
)
def configuration(self, scope: str, entity_id: str) -> Any:
"""Entity configuration (includes permissions.messages.allow_spam_release)."""
validate_scope(scope)
return self.request("GET", f"{scope}/{self._q(entity_id)}/configuration")
def allow_block_rules(self, scope: str, entity_id: str) -> Any:
validate_scope(scope)
return self.request(
"GET", f"{scope}/{self._q(entity_id)}/allow_block_rules"
)
# ======================================================================
# WRITE METHODS (gated — the CLI requires --confirm before calling these)
# ======================================================================
def release_message(
self, message_id: str, recipients: Optional[str] = None
) -> Any:
"""Release (deliver) one held message. POST /messages/{id}/deliver."""
body: dict = {"include_original_recipients": 1}
if recipients:
body["recipients"] = recipients
return self.request(
"POST", f"messages/{self._q(message_id)}/deliver", json_body=body
)
def release_many(
self,
scope: str,
entity_id: str,
ids: Optional[str] = None,
all_selected: bool = False,
recipients: Optional[str] = None,
) -> Any:
"""Bulk-release held messages under an entity. POST .../messages/deliver_many."""
validate_scope(scope)
body: dict = {
"include_original_recipients": 1,
"all_selected": all_selected,
"ids": ids or "",
}
if recipients:
body["recipients"] = recipients
return self.request(
"POST",
f"{scope}/{self._q(entity_id)}/messages/deliver_many",
json_body=body,
)
def add_rule(
self, scope: str, entity_id: str, value: str, rule_type: str
) -> Any:
"""Add an allow / block rule on an entity. POST .../allow_block_rules."""
validate_scope(scope)
if rule_type not in ("allow", "block"):
raise MailprotectorError("rule_type must be 'allow' or 'block'")
return self.request(
"POST",
f"{scope}/{self._q(entity_id)}/allow_block_rules",
json_body={"value": value, "rule_type": rule_type},
)
def enable_release(self, scope: str, entity_id: str) -> Any:
"""Enable spam release on an entity. PUT .../configuration.
Sets permissions.messages.allow_spam_release = true. Without this, the
entity's held spam cannot be released.
"""
validate_scope(scope)
return self.request(
"PUT",
f"{scope}/{self._q(entity_id)}/configuration",
json_body={"permissions": {"messages": {"allow_spam_release": True}}},
)