#!/usr/bin/env python3 """Client for the packetdial skill — NetSapiens SNAPsolution API v2. Talks to the live PacketDial / OITVOIP reseller PBX at pbx.packetdial.com (NetSapiens v44.4). voip.packetdial.com is the customer-facing white-label portal and is NOT the API host — the programmable surface lives on pbx. Auth: Bearer token. Two credential shapes are supported, tried in this order: 1. Static API key (`nsr_` reseller-scoped or `nss_` system) used directly as the bearer token. Preferred for machine-to-machine. 2. OAuth2 password grant: client_id + client_secret + username + password POSTed to /tokens, which returns a short-lived access_token (JWT). Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault entry `msp-tools/oitvoip.sops.yaml`, or from environment overrides. As of the skill's creation that vault entry does not yet exist — see SKILL.md for the one-time provisioning steps. Transport: prefers httpx if installed, else falls back to stdlib urllib so the skill works on a bare Python install. """ from __future__ import annotations import json import os import subprocess import time import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any, Optional try: import httpx # type: ignore _HAS_HTTPX = True except ImportError: # pragma: no cover _HAS_HTTPX = False SKILL_DIR = Path(__file__).resolve().parent.parent # .../.claude/skills/packetdial ERROR_BODY_MAX_CHARS = 1500 DEFAULT_TIMEOUT = 60.0 DEFAULT_CONNECT_TIMEOUT = 15.0 # pbx host is the API host; voip host is the customer portal (no API). API_BASE_URL = os.environ.get( "PACKETDIAL_API_BASE_URL", "https://pbx.packetdial.com/ns-api/v2" ) TOKEN_URL = os.environ.get( "PACKETDIAL_TOKEN_URL", "https://pbx.packetdial.com/ns-api/v2/tokens" ) VAULT_ENTRY = "msp-tools/oitvoip.sops.yaml" class PacketDialError(Exception): """Any failure talking to the NetSapiens API or loading credentials.""" # --- repo-root + credential loading ------------------------------------------- def _resolve_claudetools_root() -> Path: """Resolve the ClaudeTools repo root: env var, then identity.json, then derived. Final fallback is derived from this file's location so it works on the Mac/Linux fleet, not only the Windows default. """ # SKILL_DIR = .../.claude/skills/packetdial ; root is three levels up. derived_root = SKILL_DIR.parent.parent.parent env_root = os.environ.get("CLAUDETOOLS_ROOT") if env_root: return Path(env_root) identity_path = derived_root / ".claude" / "identity.json" if identity_path.exists(): try: data = json.loads(identity_path.read_text(encoding="utf-8")) root = data.get("claudetools_root") if root: return Path(root) except (json.JSONDecodeError, OSError): pass return derived_root def _vault_field(field: str) -> Optional[str]: """Read a single field from the oitvoip vault entry. Returns None if absent. Soft failure: a missing field (vault exits non-zero) returns None so the caller can try the next credential shape. A missing vault wrapper or bash raises, since that is an environment problem the user must fix. """ root = _resolve_claudetools_root() vault_script = root / ".claude" / "scripts" / "vault.sh" if not vault_script.exists(): raise PacketDialError( f"vault wrapper not found at {vault_script}; set PACKETDIAL_API_KEY " "or PACKETDIAL_* OAuth env vars instead." ) try: completed = subprocess.run( ["bash", str(vault_script), "get-field", VAULT_ENTRY, field], capture_output=True, text=True, timeout=60, ) except FileNotFoundError as exc: raise PacketDialError( "'bash' not found on PATH. Install Git Bash or set PACKETDIAL_API_KEY." ) from exc except subprocess.TimeoutExpired as exc: raise PacketDialError("vault call timed out.") from exc if completed.returncode != 0: return None value = completed.stdout.strip() return value or None def load_credentials() -> dict: """Resolve NetSapiens credentials. Returns a dict describing the auth mode. Returns either: {"mode": "apikey", "api_key": "nsr_..."} {"mode": "oauth", "client_id": ..., "client_secret": ..., "username": ..., "password": ...} Resolution order: 1. PACKETDIAL_API_KEY env -> apikey mode 2. PACKETDIAL_CLIENT_ID + ... env -> oauth mode 3. vault credentials.api_key -> apikey mode 4. vault credentials.{client_id,client_secret,username,password} -> oauth Raises PacketDialError with provisioning guidance if nothing resolves. """ env_key = os.environ.get("PACKETDIAL_API_KEY") if env_key: return {"mode": "apikey", "api_key": env_key.strip()} env_client = os.environ.get("PACKETDIAL_CLIENT_ID") if env_client: return { "mode": "oauth", "client_id": env_client.strip(), "client_secret": (os.environ.get("PACKETDIAL_CLIENT_SECRET") or "").strip(), "username": (os.environ.get("PACKETDIAL_USERNAME") or "").strip(), "password": (os.environ.get("PACKETDIAL_PASSWORD") or "").strip(), } api_key = _vault_field("credentials.api_key") if api_key: return {"mode": "apikey", "api_key": api_key} client_id = _vault_field("credentials.client_id") if client_id: return { "mode": "oauth", "client_id": client_id, "client_secret": _vault_field("credentials.client_secret") or "", "username": _vault_field("credentials.username") or "", "password": _vault_field("credentials.password") or "", } raise PacketDialError( "No PacketDial / NetSapiens credentials found.\n" f" Expected vault entry: {VAULT_ENTRY} with either:\n" " credentials.api_key (nsr_ reseller bearer key) -- preferred\n" " or OAuth password-grant fields:\n" " credentials.client_id\n" " credentials.client_secret\n" " credentials.username\n" " credentials.password\n" " Provision a key in pbx.packetdial.com -> Admin > API Keys, then store it.\n" " See .claude/skills/packetdial/SKILL.md for the full setup steps." ) # --- client ------------------------------------------------------------------- class NetSapiensClient: def __init__( self, api_base_url: str = API_BASE_URL, token_url: str = TOKEN_URL, timeout: float = DEFAULT_TIMEOUT, connect_timeout: float = DEFAULT_CONNECT_TIMEOUT, ): self.api_base_url = api_base_url.rstrip("/") self.token_url = token_url self.timeout = timeout self.connect_timeout = connect_timeout self._creds: Optional[dict] = None self._bearer: Optional[str] = None self._bearer_expiry: float = 0.0 # -- auth ------------------------------------------------------------------ @property def creds(self) -> dict: if self._creds is None: self._creds = load_credentials() return self._creds def _bearer_token(self) -> str: """Return a valid bearer token, fetching/refreshing an OAuth one if needed.""" creds = self.creds if creds["mode"] == "apikey": return creds["api_key"] # oauth: reuse cached token until ~60s before expiry if self._bearer and time.monotonic() < self._bearer_expiry - 60: return self._bearer form = { "grant_type": "password", "client_id": creds["client_id"], "client_secret": creds["client_secret"], "username": creds["username"], "password": creds["password"], } body = self._http( "POST", self.token_url, data=urllib.parse.urlencode(form).encode("utf-8"), headers={"Content-Type": "application/x-www-form-urlencoded"}, auth_header=None, ) if not isinstance(body, dict) or "access_token" not in body: raise PacketDialError(f"OAuth token response missing access_token: {body}") self._bearer = body["access_token"] expires_in = float(body.get("expires_in", 3600)) self._bearer_expiry = time.monotonic() + expires_in return self._bearer # -- core transport -------------------------------------------------------- def request( self, method: str, path: str, params: Optional[dict] = None, json_body: Optional[dict] = None, ) -> Any: """One REST call against the API base. `path` is relative (e.g. 'domains').""" url = f"{self.api_base_url}/{path.lstrip('/')}" if params: url = f"{url}?{urllib.parse.urlencode(params, doseq=True)}" data = json.dumps(json_body).encode("utf-8") if json_body is not None else None headers = {"Accept": "application/json"} if data is not None: headers["Content-Type"] = "application/json" return self._http( method, url, data=data, headers=headers, auth_header=f"Bearer {self._bearer_token()}", ) def _http( self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[dict] = None, auth_header: Optional[str] = "__use_bearer__", ) -> Any: hdrs = dict(headers or {}) if auth_header and auth_header != "__use_bearer__": hdrs["Authorization"] = auth_header if _HAS_HTTPX: try: timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout) with httpx.Client(timeout=timeout) as client: resp = client.request(method, url, content=data, headers=hdrs) resp.raise_for_status() return self._parse(resp.content) except httpx.TimeoutException as exc: raise PacketDialError(f"request timed out: {exc}") from exc except httpx.HTTPStatusError as exc: detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS] raise PacketDialError( f"HTTP {exc.response.status_code} {method} {url}: {detail}" ) from exc except httpx.HTTPError as exc: raise PacketDialError(f"request failed: {exc}") from exc # stdlib fallback req = urllib.request.Request(url, data=data, method=method, headers=hdrs) try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: return self._parse(resp.read()) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS] raise PacketDialError(f"HTTP {exc.code} {method} {url}: {detail}") from exc except urllib.error.URLError as exc: raise PacketDialError(f"request failed: {exc}") from exc @staticmethod def _parse(raw: bytes) -> Any: if not raw: return None try: return json.loads(raw.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError): return raw.decode("utf-8", errors="replace") # ====================================================================== # READ METHODS (safe — always live) # ====================================================================== def version(self) -> Any: return self.request("GET", "version") def whoami(self) -> Any: """Details of the currently authenticated API key.""" return self.request("GET", "apikeys/~") def domains(self, limit: int = 1000) -> Any: return self.request("GET", "domains", params={"limit": limit}) def domain(self, domain: str) -> Any: return self.request("GET", f"domains/{urllib.parse.quote(domain)}") def users(self, domain: str) -> Any: return self.request("GET", f"domains/{urllib.parse.quote(domain)}/users") def user(self, domain: str, user: str) -> Any: d, u = urllib.parse.quote(domain), urllib.parse.quote(user) return self.request("GET", f"domains/{d}/users/{u}") def phones(self, domain: str) -> Any: return self.request("GET", f"domains/{urllib.parse.quote(domain)}/phones") def devices(self, domain: str, user: str) -> Any: d, u = urllib.parse.quote(domain), urllib.parse.quote(user) return self.request("GET", f"domains/{d}/users/{u}/devices") def phonenumbers(self, domain: str) -> Any: return self.request("GET", f"domains/{urllib.parse.quote(domain)}/phonenumbers") def resellers(self) -> Any: return self.request("GET", "resellers") def cdrs(self, domain: Optional[str] = None, **filters) -> Any: if domain: return self.request( "GET", f"domains/{urllib.parse.quote(domain)}/cdrs", params=filters or None ) return self.request("GET", "cdrs", params=filters or None) def apikeys(self) -> Any: return self.request("GET", "apikeys/~") def subscriptions(self) -> Any: return self.request("GET", "subscriptions") # ====================================================================== # WRITE METHODS (gated — the CLI requires --confirm before calling these) # ====================================================================== def create_domain(self, body: dict) -> Any: return self.request("POST", "domains", json_body=body) def create_user(self, domain: str, body: dict) -> Any: return self.request( "POST", f"domains/{urllib.parse.quote(domain)}/users", json_body=body ) def update_user(self, domain: str, user: str, body: dict) -> Any: d, u = urllib.parse.quote(domain), urllib.parse.quote(user) return self.request("PUT", f"domains/{d}/users/{u}", json_body=body) def delete_user(self, domain: str, user: str) -> Any: d, u = urllib.parse.quote(domain), urllib.parse.quote(user) return self.request("DELETE", f"domains/{d}/users/{u}") def create_phone(self, domain: str, body: dict) -> Any: return self.request( "POST", f"domains/{urllib.parse.quote(domain)}/phones", json_body=body ) def create_phonenumber(self, domain: str, body: dict) -> Any: return self.request( "POST", f"domains/{urllib.parse.quote(domain)}/phonenumbers", json_body=body )