sync: auto-sync from HOWARD-HOME at 2026-06-02 15:03:53
Author: Howard Enos Machine: HOWARD-HOME Timestamp: 2026-06-02 15:03:53
This commit is contained in:
169
.claude/skills/packetdial/scripts/ns.py
Normal file
169
.claude/skills/packetdial/scripts/ns.py
Normal file
@@ -0,0 +1,169 @@
|
||||
#!/usr/bin/env python3
|
||||
"""CLI for the packetdial skill — NetSapiens SNAPsolution API v2.
|
||||
|
||||
Read subcommands run freely. Write subcommands (create-*, update-*, delete-*,
|
||||
raw with a non-GET method) refuse to run unless --confirm is passed; without it
|
||||
they print what they WOULD do and exit non-zero.
|
||||
|
||||
Output: --json emits raw JSON (default for most commands); a few commands render
|
||||
a readable table when --json is omitted.
|
||||
|
||||
Read examples:
|
||||
python ns.py status
|
||||
python ns.py whoami
|
||||
python ns.py domains
|
||||
python ns.py domain <domain>
|
||||
python ns.py users <domain>
|
||||
python ns.py user <domain> <user>
|
||||
python ns.py phones <domain>
|
||||
python ns.py dids <domain>
|
||||
python ns.py devices <domain> <user>
|
||||
python ns.py cdrs --domain <domain> --start 2026-06-01 --end 2026-06-02
|
||||
python ns.py resellers
|
||||
|
||||
Write examples (all require --confirm):
|
||||
python ns.py create-domain --body '{"domain":"acme","description":"Acme Inc"}' --confirm
|
||||
python ns.py create-user <domain> --body '{"user":"101","name-first-name":"Jane"}' --confirm
|
||||
python ns.py delete-user <domain> <user> --confirm
|
||||
|
||||
Escape hatch (raw request against any of the 239 v2 paths):
|
||||
python ns.py raw GET domains/acme/users
|
||||
python ns.py raw POST domains/acme/users --body '{...}' --confirm
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
|
||||
from ns_client import NetSapiensClient, PacketDialError
|
||||
|
||||
|
||||
def _emit(obj) -> None:
|
||||
print(json.dumps(obj, indent=2, ensure_ascii=False, default=str))
|
||||
|
||||
|
||||
def _parse_body(raw: str | None) -> dict | None:
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
parsed = json.loads(raw)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SystemExit(f"--body is not valid JSON: {exc}")
|
||||
if not isinstance(parsed, dict):
|
||||
raise SystemExit("--body must be a JSON object")
|
||||
return parsed
|
||||
|
||||
|
||||
def _require_confirm(args, action: str, detail: str) -> None:
|
||||
if not getattr(args, "confirm", False):
|
||||
print(f"[DRY RUN] Would {action}: {detail}")
|
||||
print("Refusing to perform a write without --confirm. Re-run with --confirm.")
|
||||
raise SystemExit(2)
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
p = argparse.ArgumentParser(prog="ns.py", description="PacketDial / NetSapiens v2 CLI")
|
||||
p.add_argument("--json", action="store_true", help="emit raw JSON (default for most)")
|
||||
sub = p.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
# --- read ---
|
||||
sub.add_parser("status", help="API version + authenticated key identity")
|
||||
sub.add_parser("whoami", help="details of the authenticated API key")
|
||||
sub.add_parser("domains", help="list all domains")
|
||||
sp = sub.add_parser("domain", help="one domain"); sp.add_argument("domain")
|
||||
sp = sub.add_parser("users", help="users in a domain"); sp.add_argument("domain")
|
||||
sp = sub.add_parser("user", help="one user"); sp.add_argument("domain"); sp.add_argument("user")
|
||||
sp = sub.add_parser("phones", help="phones (devices) in a domain"); sp.add_argument("domain")
|
||||
sp = sub.add_parser("dids", help="phone numbers in a domain"); sp.add_argument("domain")
|
||||
sp = sub.add_parser("devices", help="devices for a user"); sp.add_argument("domain"); sp.add_argument("user")
|
||||
sub.add_parser("resellers", help="list resellers")
|
||||
sp = sub.add_parser("cdrs", help="call detail records")
|
||||
sp.add_argument("--domain"); sp.add_argument("--start"); sp.add_argument("--end")
|
||||
sp.add_argument("--limit", type=int, default=100)
|
||||
|
||||
# --- write (gated) ---
|
||||
sp = sub.add_parser("create-domain"); sp.add_argument("--body", required=True); sp.add_argument("--confirm", action="store_true")
|
||||
sp = sub.add_parser("create-user"); sp.add_argument("domain"); sp.add_argument("--body", required=True); sp.add_argument("--confirm", action="store_true")
|
||||
sp = sub.add_parser("update-user"); sp.add_argument("domain"); sp.add_argument("user"); sp.add_argument("--body", required=True); sp.add_argument("--confirm", action="store_true")
|
||||
sp = sub.add_parser("delete-user"); sp.add_argument("domain"); sp.add_argument("user"); sp.add_argument("--confirm", action="store_true")
|
||||
sp = sub.add_parser("create-phone"); sp.add_argument("domain"); sp.add_argument("--body", required=True); sp.add_argument("--confirm", action="store_true")
|
||||
sp = sub.add_parser("create-did"); sp.add_argument("domain"); sp.add_argument("--body", required=True); sp.add_argument("--confirm", action="store_true")
|
||||
|
||||
# --- raw escape hatch ---
|
||||
sp = sub.add_parser("raw", help="raw request against any v2 path")
|
||||
sp.add_argument("method", choices=["GET", "POST", "PUT", "PATCH", "DELETE"])
|
||||
sp.add_argument("path", help="relative path, e.g. domains/acme/users")
|
||||
sp.add_argument("--body"); sp.add_argument("--confirm", action="store_true")
|
||||
|
||||
args = p.parse_args(argv)
|
||||
client = NetSapiensClient()
|
||||
|
||||
try:
|
||||
if args.cmd == "status":
|
||||
_emit({"version": client.version(), "apiKey": client.whoami()})
|
||||
elif args.cmd == "whoami":
|
||||
_emit(client.whoami())
|
||||
elif args.cmd == "domains":
|
||||
_emit(client.domains())
|
||||
elif args.cmd == "domain":
|
||||
_emit(client.domain(args.domain))
|
||||
elif args.cmd == "users":
|
||||
_emit(client.users(args.domain))
|
||||
elif args.cmd == "user":
|
||||
_emit(client.user(args.domain, args.user))
|
||||
elif args.cmd == "phones":
|
||||
_emit(client.phones(args.domain))
|
||||
elif args.cmd == "dids":
|
||||
_emit(client.phonenumbers(args.domain))
|
||||
elif args.cmd == "devices":
|
||||
_emit(client.devices(args.domain, args.user))
|
||||
elif args.cmd == "resellers":
|
||||
_emit(client.resellers())
|
||||
elif args.cmd == "cdrs":
|
||||
filters = {"limit": args.limit}
|
||||
if args.start:
|
||||
filters["start-date"] = args.start
|
||||
if args.end:
|
||||
filters["end-date"] = args.end
|
||||
_emit(client.cdrs(domain=args.domain, **filters))
|
||||
|
||||
elif args.cmd == "create-domain":
|
||||
body = _parse_body(args.body)
|
||||
_require_confirm(args, "create domain", json.dumps(body))
|
||||
_emit(client.create_domain(body))
|
||||
elif args.cmd == "create-user":
|
||||
body = _parse_body(args.body)
|
||||
_require_confirm(args, "create user", f"{args.domain}: {json.dumps(body)}")
|
||||
_emit(client.create_user(args.domain, body))
|
||||
elif args.cmd == "update-user":
|
||||
body = _parse_body(args.body)
|
||||
_require_confirm(args, "update user", f"{args.domain}/{args.user}: {json.dumps(body)}")
|
||||
_emit(client.update_user(args.domain, args.user, body))
|
||||
elif args.cmd == "delete-user":
|
||||
_require_confirm(args, "DELETE user", f"{args.domain}/{args.user}")
|
||||
_emit(client.delete_user(args.domain, args.user))
|
||||
elif args.cmd == "create-phone":
|
||||
body = _parse_body(args.body)
|
||||
_require_confirm(args, "create phone", f"{args.domain}: {json.dumps(body)}")
|
||||
_emit(client.create_phone(args.domain, body))
|
||||
elif args.cmd == "create-did":
|
||||
body = _parse_body(args.body)
|
||||
_require_confirm(args, "create phone number", f"{args.domain}: {json.dumps(body)}")
|
||||
_emit(client.create_phonenumber(args.domain, body))
|
||||
|
||||
elif args.cmd == "raw":
|
||||
body = _parse_body(args.body)
|
||||
if args.method != "GET":
|
||||
_require_confirm(args, f"{args.method} {args.path}", json.dumps(body))
|
||||
_emit(client.request(args.method, args.path, json_body=body))
|
||||
else:
|
||||
p.error(f"unknown command {args.cmd}")
|
||||
except PacketDialError as exc:
|
||||
print(f"[ERROR] {exc}", file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
381
.claude/skills/packetdial/scripts/ns_client.py
Normal file
381
.claude/skills/packetdial/scripts/ns_client.py
Normal file
@@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Client for the packetdial skill — NetSapiens SNAPsolution API v2.
|
||||
|
||||
Talks to the live PacketDial / OITVOIP reseller PBX at pbx.packetdial.com
|
||||
(NetSapiens v44.4). voip.packetdial.com is the customer-facing white-label
|
||||
portal and is NOT the API host — the programmable surface lives on pbx.
|
||||
|
||||
Auth: Bearer token. Two credential shapes are supported, tried in this order:
|
||||
1. Static API key (`nsr_` reseller-scoped or `nss_` system) used directly as
|
||||
the bearer token. Preferred for machine-to-machine.
|
||||
2. OAuth2 password grant: client_id + client_secret + username + password
|
||||
POSTed to /tokens, which returns a short-lived access_token (JWT).
|
||||
|
||||
Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault
|
||||
entry `msp-tools/oitvoip.sops.yaml`, or from environment overrides. As of the
|
||||
skill's creation that vault entry does not yet exist — see SKILL.md for the
|
||||
one-time provisioning steps.
|
||||
|
||||
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
|
||||
skill works on a bare Python install.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
try:
|
||||
import httpx # type: ignore
|
||||
|
||||
_HAS_HTTPX = True
|
||||
except ImportError: # pragma: no cover
|
||||
_HAS_HTTPX = False
|
||||
|
||||
SKILL_DIR = Path(__file__).resolve().parent.parent # .../.claude/skills/packetdial
|
||||
ERROR_BODY_MAX_CHARS = 1500
|
||||
DEFAULT_TIMEOUT = 60.0
|
||||
DEFAULT_CONNECT_TIMEOUT = 15.0
|
||||
|
||||
# pbx host is the API host; voip host is the customer portal (no API).
|
||||
API_BASE_URL = os.environ.get(
|
||||
"PACKETDIAL_API_BASE_URL", "https://pbx.packetdial.com/ns-api/v2"
|
||||
)
|
||||
TOKEN_URL = os.environ.get(
|
||||
"PACKETDIAL_TOKEN_URL", "https://pbx.packetdial.com/ns-api/v2/tokens"
|
||||
)
|
||||
|
||||
VAULT_ENTRY = "msp-tools/oitvoip.sops.yaml"
|
||||
|
||||
|
||||
class PacketDialError(Exception):
|
||||
"""Any failure talking to the NetSapiens API or loading credentials."""
|
||||
|
||||
|
||||
# --- repo-root + credential loading -------------------------------------------
|
||||
def _resolve_claudetools_root() -> Path:
|
||||
"""Resolve the ClaudeTools repo root: env var, then identity.json, then derived.
|
||||
|
||||
Final fallback is derived from this file's location so it works on the
|
||||
Mac/Linux fleet, not only the Windows default.
|
||||
"""
|
||||
# SKILL_DIR = .../.claude/skills/packetdial ; root is three levels up.
|
||||
derived_root = SKILL_DIR.parent.parent.parent
|
||||
|
||||
env_root = os.environ.get("CLAUDETOOLS_ROOT")
|
||||
if env_root:
|
||||
return Path(env_root)
|
||||
|
||||
identity_path = derived_root / ".claude" / "identity.json"
|
||||
if identity_path.exists():
|
||||
try:
|
||||
data = json.loads(identity_path.read_text(encoding="utf-8"))
|
||||
root = data.get("claudetools_root")
|
||||
if root:
|
||||
return Path(root)
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
|
||||
return derived_root
|
||||
|
||||
|
||||
def _vault_field(field: str) -> Optional[str]:
|
||||
"""Read a single field from the oitvoip vault entry. Returns None if absent.
|
||||
|
||||
Soft failure: a missing field (vault exits non-zero) returns None so the
|
||||
caller can try the next credential shape. A missing vault wrapper or bash
|
||||
raises, since that is an environment problem the user must fix.
|
||||
"""
|
||||
root = _resolve_claudetools_root()
|
||||
vault_script = root / ".claude" / "scripts" / "vault.sh"
|
||||
if not vault_script.exists():
|
||||
raise PacketDialError(
|
||||
f"vault wrapper not found at {vault_script}; set PACKETDIAL_API_KEY "
|
||||
"or PACKETDIAL_* OAuth env vars instead."
|
||||
)
|
||||
try:
|
||||
completed = subprocess.run(
|
||||
["bash", str(vault_script), "get-field", VAULT_ENTRY, field],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
raise PacketDialError(
|
||||
"'bash' not found on PATH. Install Git Bash or set PACKETDIAL_API_KEY."
|
||||
) from exc
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
raise PacketDialError("vault call timed out.") from exc
|
||||
|
||||
if completed.returncode != 0:
|
||||
return None
|
||||
value = completed.stdout.strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def load_credentials() -> dict:
|
||||
"""Resolve NetSapiens credentials. Returns a dict describing the auth mode.
|
||||
|
||||
Returns either:
|
||||
{"mode": "apikey", "api_key": "nsr_..."}
|
||||
{"mode": "oauth", "client_id": ..., "client_secret": ...,
|
||||
"username": ..., "password": ...}
|
||||
|
||||
Resolution order:
|
||||
1. PACKETDIAL_API_KEY env -> apikey mode
|
||||
2. PACKETDIAL_CLIENT_ID + ... env -> oauth mode
|
||||
3. vault credentials.api_key -> apikey mode
|
||||
4. vault credentials.{client_id,client_secret,username,password} -> oauth
|
||||
|
||||
Raises PacketDialError with provisioning guidance if nothing resolves.
|
||||
"""
|
||||
env_key = os.environ.get("PACKETDIAL_API_KEY")
|
||||
if env_key:
|
||||
return {"mode": "apikey", "api_key": env_key.strip()}
|
||||
|
||||
env_client = os.environ.get("PACKETDIAL_CLIENT_ID")
|
||||
if env_client:
|
||||
return {
|
||||
"mode": "oauth",
|
||||
"client_id": env_client.strip(),
|
||||
"client_secret": (os.environ.get("PACKETDIAL_CLIENT_SECRET") or "").strip(),
|
||||
"username": (os.environ.get("PACKETDIAL_USERNAME") or "").strip(),
|
||||
"password": (os.environ.get("PACKETDIAL_PASSWORD") or "").strip(),
|
||||
}
|
||||
|
||||
api_key = _vault_field("credentials.api_key")
|
||||
if api_key:
|
||||
return {"mode": "apikey", "api_key": api_key}
|
||||
|
||||
client_id = _vault_field("credentials.client_id")
|
||||
if client_id:
|
||||
return {
|
||||
"mode": "oauth",
|
||||
"client_id": client_id,
|
||||
"client_secret": _vault_field("credentials.client_secret") or "",
|
||||
"username": _vault_field("credentials.username") or "",
|
||||
"password": _vault_field("credentials.password") or "",
|
||||
}
|
||||
|
||||
raise PacketDialError(
|
||||
"No PacketDial / NetSapiens credentials found.\n"
|
||||
f" Expected vault entry: {VAULT_ENTRY} with either:\n"
|
||||
" credentials.api_key (nsr_ reseller bearer key) -- preferred\n"
|
||||
" or OAuth password-grant fields:\n"
|
||||
" credentials.client_id\n"
|
||||
" credentials.client_secret\n"
|
||||
" credentials.username\n"
|
||||
" credentials.password\n"
|
||||
" Provision a key in pbx.packetdial.com -> Admin > API Keys, then store it.\n"
|
||||
" See .claude/skills/packetdial/SKILL.md for the full setup steps."
|
||||
)
|
||||
|
||||
|
||||
# --- client -------------------------------------------------------------------
|
||||
class NetSapiensClient:
|
||||
def __init__(
|
||||
self,
|
||||
api_base_url: str = API_BASE_URL,
|
||||
token_url: str = TOKEN_URL,
|
||||
timeout: float = DEFAULT_TIMEOUT,
|
||||
connect_timeout: float = DEFAULT_CONNECT_TIMEOUT,
|
||||
):
|
||||
self.api_base_url = api_base_url.rstrip("/")
|
||||
self.token_url = token_url
|
||||
self.timeout = timeout
|
||||
self.connect_timeout = connect_timeout
|
||||
self._creds: Optional[dict] = None
|
||||
self._bearer: Optional[str] = None
|
||||
self._bearer_expiry: float = 0.0
|
||||
|
||||
# -- auth ------------------------------------------------------------------
|
||||
@property
|
||||
def creds(self) -> dict:
|
||||
if self._creds is None:
|
||||
self._creds = load_credentials()
|
||||
return self._creds
|
||||
|
||||
def _bearer_token(self) -> str:
|
||||
"""Return a valid bearer token, fetching/refreshing an OAuth one if needed."""
|
||||
creds = self.creds
|
||||
if creds["mode"] == "apikey":
|
||||
return creds["api_key"]
|
||||
|
||||
# oauth: reuse cached token until ~60s before expiry
|
||||
if self._bearer and time.monotonic() < self._bearer_expiry - 60:
|
||||
return self._bearer
|
||||
|
||||
form = {
|
||||
"grant_type": "password",
|
||||
"client_id": creds["client_id"],
|
||||
"client_secret": creds["client_secret"],
|
||||
"username": creds["username"],
|
||||
"password": creds["password"],
|
||||
}
|
||||
body = self._http(
|
||||
"POST",
|
||||
self.token_url,
|
||||
data=urllib.parse.urlencode(form).encode("utf-8"),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
auth_header=None,
|
||||
)
|
||||
if not isinstance(body, dict) or "access_token" not in body:
|
||||
raise PacketDialError(f"OAuth token response missing access_token: {body}")
|
||||
self._bearer = body["access_token"]
|
||||
expires_in = float(body.get("expires_in", 3600))
|
||||
self._bearer_expiry = time.monotonic() + expires_in
|
||||
return self._bearer
|
||||
|
||||
# -- core transport --------------------------------------------------------
|
||||
def request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
params: Optional[dict] = None,
|
||||
json_body: Optional[dict] = None,
|
||||
) -> Any:
|
||||
"""One REST call against the API base. `path` is relative (e.g. 'domains')."""
|
||||
url = f"{self.api_base_url}/{path.lstrip('/')}"
|
||||
if params:
|
||||
url = f"{url}?{urllib.parse.urlencode(params, doseq=True)}"
|
||||
data = json.dumps(json_body).encode("utf-8") if json_body is not None else None
|
||||
headers = {"Accept": "application/json"}
|
||||
if data is not None:
|
||||
headers["Content-Type"] = "application/json"
|
||||
return self._http(
|
||||
method, url, data=data, headers=headers,
|
||||
auth_header=f"Bearer {self._bearer_token()}",
|
||||
)
|
||||
|
||||
def _http(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
data: Optional[bytes] = None,
|
||||
headers: Optional[dict] = None,
|
||||
auth_header: Optional[str] = "__use_bearer__",
|
||||
) -> Any:
|
||||
hdrs = dict(headers or {})
|
||||
if auth_header and auth_header != "__use_bearer__":
|
||||
hdrs["Authorization"] = auth_header
|
||||
|
||||
if _HAS_HTTPX:
|
||||
try:
|
||||
timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
|
||||
with httpx.Client(timeout=timeout) as client:
|
||||
resp = client.request(method, url, content=data, headers=hdrs)
|
||||
resp.raise_for_status()
|
||||
return self._parse(resp.content)
|
||||
except httpx.TimeoutException as exc:
|
||||
raise PacketDialError(f"request timed out: {exc}") from exc
|
||||
except httpx.HTTPStatusError as exc:
|
||||
detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
|
||||
raise PacketDialError(
|
||||
f"HTTP {exc.response.status_code} {method} {url}: {detail}"
|
||||
) from exc
|
||||
except httpx.HTTPError as exc:
|
||||
raise PacketDialError(f"request failed: {exc}") from exc
|
||||
|
||||
# stdlib fallback
|
||||
req = urllib.request.Request(url, data=data, method=method, headers=hdrs)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
|
||||
return self._parse(resp.read())
|
||||
except urllib.error.HTTPError as exc:
|
||||
detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
|
||||
raise PacketDialError(f"HTTP {exc.code} {method} {url}: {detail}") from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise PacketDialError(f"request failed: {exc}") from exc
|
||||
|
||||
@staticmethod
|
||||
def _parse(raw: bytes) -> Any:
|
||||
if not raw:
|
||||
return None
|
||||
try:
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
return raw.decode("utf-8", errors="replace")
|
||||
|
||||
# ======================================================================
|
||||
# READ METHODS (safe — always live)
|
||||
# ======================================================================
|
||||
def version(self) -> Any:
|
||||
return self.request("GET", "version")
|
||||
|
||||
def whoami(self) -> Any:
|
||||
"""Details of the currently authenticated API key."""
|
||||
return self.request("GET", "apikeys/~")
|
||||
|
||||
def domains(self, limit: int = 1000) -> Any:
|
||||
return self.request("GET", "domains", params={"limit": limit})
|
||||
|
||||
def domain(self, domain: str) -> Any:
|
||||
return self.request("GET", f"domains/{urllib.parse.quote(domain)}")
|
||||
|
||||
def users(self, domain: str) -> Any:
|
||||
return self.request("GET", f"domains/{urllib.parse.quote(domain)}/users")
|
||||
|
||||
def user(self, domain: str, user: str) -> Any:
|
||||
d, u = urllib.parse.quote(domain), urllib.parse.quote(user)
|
||||
return self.request("GET", f"domains/{d}/users/{u}")
|
||||
|
||||
def phones(self, domain: str) -> Any:
|
||||
return self.request("GET", f"domains/{urllib.parse.quote(domain)}/phones")
|
||||
|
||||
def devices(self, domain: str, user: str) -> Any:
|
||||
d, u = urllib.parse.quote(domain), urllib.parse.quote(user)
|
||||
return self.request("GET", f"domains/{d}/users/{u}/devices")
|
||||
|
||||
def phonenumbers(self, domain: str) -> Any:
|
||||
return self.request("GET", f"domains/{urllib.parse.quote(domain)}/phonenumbers")
|
||||
|
||||
def resellers(self) -> Any:
|
||||
return self.request("GET", "resellers")
|
||||
|
||||
def cdrs(self, domain: Optional[str] = None, **filters) -> Any:
|
||||
if domain:
|
||||
return self.request(
|
||||
"GET", f"domains/{urllib.parse.quote(domain)}/cdrs", params=filters or None
|
||||
)
|
||||
return self.request("GET", "cdrs", params=filters or None)
|
||||
|
||||
def apikeys(self) -> Any:
|
||||
return self.request("GET", "apikeys/~")
|
||||
|
||||
def subscriptions(self) -> Any:
|
||||
return self.request("GET", "subscriptions")
|
||||
|
||||
# ======================================================================
|
||||
# WRITE METHODS (gated — the CLI requires --confirm before calling these)
|
||||
# ======================================================================
|
||||
def create_domain(self, body: dict) -> Any:
|
||||
return self.request("POST", "domains", json_body=body)
|
||||
|
||||
def create_user(self, domain: str, body: dict) -> Any:
|
||||
return self.request(
|
||||
"POST", f"domains/{urllib.parse.quote(domain)}/users", json_body=body
|
||||
)
|
||||
|
||||
def update_user(self, domain: str, user: str, body: dict) -> Any:
|
||||
d, u = urllib.parse.quote(domain), urllib.parse.quote(user)
|
||||
return self.request("PUT", f"domains/{d}/users/{u}", json_body=body)
|
||||
|
||||
def delete_user(self, domain: str, user: str) -> Any:
|
||||
d, u = urllib.parse.quote(domain), urllib.parse.quote(user)
|
||||
return self.request("DELETE", f"domains/{d}/users/{u}")
|
||||
|
||||
def create_phone(self, domain: str, body: dict) -> Any:
|
||||
return self.request(
|
||||
"POST", f"domains/{urllib.parse.quote(domain)}/phones", json_body=body
|
||||
)
|
||||
|
||||
def create_phonenumber(self, domain: str, body: dict) -> Any:
|
||||
return self.request(
|
||||
"POST", f"domains/{urllib.parse.quote(domain)}/phonenumbers", json_body=body
|
||||
)
|
||||
Reference in New Issue
Block a user