Files
claudetools/.claude/skills/packetdial/scripts/ns_client.py
Mike Swanson d1d1302d55 packetdial: add onboard-domain wrapper (GUI Add-a-Domain -> 3-call API flow)
onboard-domain runs POST /domains -> addresses/validate (gen E911 pidflo) -> addresses/create
from one JSON body (domain fields + optional `emergency` block), gated --confirm. Reverse-
engineered from the OITVOIP wizard screenshots; live-created the real client domain
vwp.91912.service (Valley Wide Plastering) + E911 address, and proved the wrapper with a
throwaway create->delete (no leftovers, vwp intact). Documented GUI->API mapping + the two
manual gaps (voicemail user-defaults, email-send-from-address pending the packetdial.com mailbox)
+ the domain-type "no"-on-create quirk.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 09:34:48 -07:00

601 lines
26 KiB
Python

#!/usr/bin/env python3
"""Client for the packetdial skill — NetSapiens SNAPsolution API v2.
Talks to the live PacketDial / OITVOIP reseller PBX at pbx.packetdial.com
(NetSapiens v44.4). voip.packetdial.com is the customer-facing white-label
portal and is NOT the API host — the programmable surface lives on pbx.
Auth: Bearer token. Two credential shapes are supported, tried in this order:
1. Static API key (`nsr_` reseller-scoped or `nss_` system) used directly as
the bearer token. Preferred for machine-to-machine.
2. OAuth2 password grant: client_id + client_secret + username + password
POSTed to /tokens, which returns a short-lived access_token (JWT).
Credentials are NEVER hardcoded. They are loaded at runtime from the SOPS vault
entry `msp-tools/oitvoip.sops.yaml`, or from environment overrides. As of the
skill's creation that vault entry does not yet exist — see SKILL.md for the
one-time provisioning steps.
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
skill works on a bare Python install.
"""
from __future__ import annotations
import json
import os
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Optional
# httpx is an optional accelerator: when it is not importable the client
# falls back to the stdlib urllib transport.
try:
    import httpx  # type: ignore
except ImportError:  # pragma: no cover
    _HAS_HTTPX = False
else:
    _HAS_HTTPX = True
# This file lives in <skill>/scripts/, so the skill directory is two parents up.
SKILL_DIR = Path(__file__).resolve().parent.parent  # .../.claude/skills/packetdial

# Maximum number of characters of an HTTP error body echoed into exceptions.
ERROR_BODY_MAX_CHARS = 1500
# Overall and connect-phase request timeouts, in seconds.
DEFAULT_TIMEOUT = 60.0
DEFAULT_CONNECT_TIMEOUT = 15.0

# pbx host is the API host; voip host is the customer portal (no API).
API_BASE_URL = os.environ.get(
    "PACKETDIAL_API_BASE_URL", "https://pbx.packetdial.com/ns-api/v2"
)
# The token endpoint defaults to "<API base>/tokens" so that overriding only
# PACKETDIAL_API_BASE_URL (e.g. for a lab host) does not keep sending OAuth
# credentials to the production host. PACKETDIAL_TOKEN_URL still wins when set.
TOKEN_URL = os.environ.get(
    "PACKETDIAL_TOKEN_URL", f"{API_BASE_URL.rstrip('/')}/tokens"
)
# SOPS vault entry that holds the NetSapiens credentials.
VAULT_ENTRY = "msp-tools/oitvoip.sops.yaml"
class PacketDialError(Exception):
    """Raised when credentials cannot be loaded or a NetSapiens API call fails."""
# --- repo-root + credential loading -------------------------------------------
def _resolve_claudetools_root() -> Path:
    """Resolve the ClaudeTools repo root: env var, then identity.json, then derived.

    Resolution order:
      1. CLAUDETOOLS_ROOT environment variable, when non-empty.
      2. The "claudetools_root" string in <derived root>/.claude/identity.json.
      3. The root derived from this file's location, so it works on the
         Mac/Linux fleet, not only the Windows default.

    Reading identity.json is best-effort: a missing, unreadable, undecodable,
    malformed, or unexpectedly shaped file falls through to the derived root
    instead of raising.
    """
    env_root = os.environ.get("CLAUDETOOLS_ROOT")
    if env_root:
        return Path(env_root)

    # SKILL_DIR = .../.claude/skills/packetdial ; root is three levels up.
    derived_root = SKILL_DIR.parent.parent.parent
    identity_path = derived_root / ".claude" / "identity.json"
    try:
        data = json.loads(identity_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return derived_root

    # Valid JSON is not necessarily an object; only trust a non-empty string.
    root = data.get("claudetools_root") if isinstance(data, dict) else None
    if isinstance(root, str) and root:
        return Path(root)
    return derived_root
def _vault_field(field: str) -> Optional[str]:
    """Fetch one field of the oitvoip vault entry via the repo's vault.sh wrapper.

    Returns the stripped stdout of `vault.sh get-field`, or None when the
    wrapper exits non-zero or prints nothing, so the caller can move on to the
    next credential shape.

    Raises:
        PacketDialError: the vault wrapper script is missing, `bash` is not on
            PATH, or the call exceeds its 60 second timeout. These are
            environment problems the user has to fix.
    """
    vault_script = _resolve_claudetools_root() / ".claude" / "scripts" / "vault.sh"
    if not vault_script.exists():
        raise PacketDialError(
            f"vault wrapper not found at {vault_script}; set PACKETDIAL_API_KEY "
            "or PACKETDIAL_* OAuth env vars instead."
        )

    command = ["bash", str(vault_script), "get-field", VAULT_ENTRY, field]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=60)
    except FileNotFoundError as exc:
        raise PacketDialError(
            "'bash' not found on PATH. Install Git Bash or set PACKETDIAL_API_KEY."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise PacketDialError("vault call timed out.") from exc

    # Any non-zero exit is treated as "field absent" (soft failure).
    if proc.returncode != 0:
        return None
    return proc.stdout.strip() or None
def load_credentials() -> dict:
    """Resolve NetSapiens credentials. Returns a dict describing the auth mode.

    Returns either:
        {"mode": "apikey", "api_key": "nsr_..."}
        {"mode": "oauth", "client_id": ..., "client_secret": ...,
         "username": ..., "password": ...}

    Resolution order:
        1. PACKETDIAL_API_KEY env -> apikey mode
        2. PACKETDIAL_CLIENT_ID + ... env -> oauth mode
        3. vault credentials.api_key -> apikey mode
        4. vault credentials.{client_id,client_secret,username,password} -> oauth

    Environment values are stripped before they are tested, so a blank or
    whitespace-only variable counts as unset instead of selecting a mode with
    an empty credential.

    Raises:
        PacketDialError: nothing resolves (message carries provisioning
            guidance), or the vault lookup itself fails.
    """

    def _env(name: str) -> str:
        # Missing and whitespace-only variables both collapse to "".
        return (os.environ.get(name) or "").strip()

    env_key = _env("PACKETDIAL_API_KEY")
    if env_key:
        return {"mode": "apikey", "api_key": env_key}

    env_client = _env("PACKETDIAL_CLIENT_ID")
    if env_client:
        return {
            "mode": "oauth",
            "client_id": env_client,
            "client_secret": _env("PACKETDIAL_CLIENT_SECRET"),
            "username": _env("PACKETDIAL_USERNAME"),
            "password": _env("PACKETDIAL_PASSWORD"),
        }

    api_key = _vault_field("credentials.api_key")
    if api_key:
        return {"mode": "apikey", "api_key": api_key}

    client_id = _vault_field("credentials.client_id")
    if client_id:
        return {
            "mode": "oauth",
            "client_id": client_id,
            "client_secret": _vault_field("credentials.client_secret") or "",
            "username": _vault_field("credentials.username") or "",
            "password": _vault_field("credentials.password") or "",
        }

    raise PacketDialError(
        "No PacketDial / NetSapiens credentials found.\n"
        f" Expected vault entry: {VAULT_ENTRY} with either:\n"
        " credentials.api_key (nsr_ reseller bearer key) -- preferred\n"
        " or OAuth password-grant fields:\n"
        " credentials.client_id\n"
        " credentials.client_secret\n"
        " credentials.username\n"
        " credentials.password\n"
        " Provision a key in pbx.packetdial.com -> Admin > API Keys, then store it.\n"
        " See .claude/skills/packetdial/SKILL.md for the full setup steps."
    )
# --- client -------------------------------------------------------------------
class NetSapiensClient:
    """Client for the NetSapiens API v2 using bearer auth and JSON bodies.

    Read methods issue GET requests. Write methods issue POST/PUT/DELETE; the
    CLI requires --confirm before calling them. Transport failures and non-2xx
    HTTP responses are raised as PacketDialError.
    """

    def __init__(
        self,
        api_base_url: str = API_BASE_URL,
        token_url: str = TOKEN_URL,
        timeout: float = DEFAULT_TIMEOUT,
        connect_timeout: float = DEFAULT_CONNECT_TIMEOUT,
    ):
        """Store endpoints and timeouts; credentials are loaded lazily on first use."""
        self.api_base_url = api_base_url.rstrip("/")
        self.token_url = token_url
        self.timeout = timeout
        self.connect_timeout = connect_timeout
        self._creds: Optional[dict] = None
        self._bearer: Optional[str] = None
        # time.monotonic() deadline of the cached OAuth token.
        self._bearer_expiry: float = 0.0

    # -- auth ------------------------------------------------------------------

    @property
    def creds(self) -> dict:
        """Credentials dict from load_credentials(), resolved once and cached."""
        if self._creds is None:
            self._creds = load_credentials()
        return self._creds

    def _bearer_token(self) -> str:
        """Return a valid bearer token, fetching/refreshing an OAuth one if needed.

        Raises:
            PacketDialError: the token request fails or its response has no
                access_token.
        """
        creds = self.creds
        if creds["mode"] == "apikey":
            return creds["api_key"]

        # oauth: reuse cached token until ~60s before expiry
        if self._bearer and time.monotonic() < self._bearer_expiry - 60:
            return self._bearer

        form = {
            "grant_type": "password",
            "client_id": creds["client_id"],
            "client_secret": creds["client_secret"],
            "username": creds["username"],
            "password": creds["password"],
        }
        body = self._http(
            "POST",
            self.token_url,
            data=urllib.parse.urlencode(form).encode("utf-8"),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            auth_header=None,
        )
        if not isinstance(body, dict) or "access_token" not in body:
            raise PacketDialError(f"OAuth token response missing access_token: {body}")

        self._bearer = body["access_token"]
        try:
            expires_in = float(body.get("expires_in", 3600))
        except (TypeError, ValueError):
            # Unusable lifetime (null / non-numeric): use the token now but do
            # not cache it, rather than crash or guess how long it is valid.
            expires_in = 0.0
        self._bearer_expiry = time.monotonic() + expires_in
        return self._bearer

    # -- core transport --------------------------------------------------------

    @staticmethod
    def _seg(value: Any) -> str:
        """Percent-encode one identifier for use as a single URL path segment.

        Unlike the default urllib.parse.quote(), '/' is encoded too, so an
        identifier can never add or remove path segments. Non-string ids (e.g.
        integers) are stringified first.

        Raises:
            PacketDialError: the identifier is None or empty. An empty segment
                would silently turn an item URL into its collection URL.
        """
        if value is None or value == "" or value == b"":
            raise PacketDialError("empty identifier in API path")
        if not isinstance(value, (str, bytes)):
            value = str(value)
        return urllib.parse.quote(value, safe="")

    def request(
        self,
        method: str,
        path: str,
        params: Optional[dict] = None,
        json_body: Optional[dict] = None,
    ) -> Any:
        """One REST call against the API base. `path` is relative (e.g. 'domains').

        `params` is URL-encoded into the query string (sequence values expand
        to repeated keys). `json_body`, when not None, is sent as a JSON
        request body. Returns the parsed response (see _parse).
        """
        url = f"{self.api_base_url}/{path.lstrip('/')}"
        if params:
            url = f"{url}?{urllib.parse.urlencode(params, doseq=True)}"
        headers = {"Accept": "application/json"}
        data = None
        if json_body is not None:
            data = json.dumps(json_body).encode("utf-8")
            headers["Content-Type"] = "application/json"
        return self._http(
            method, url, data=data, headers=headers,
            auth_header=f"Bearer {self._bearer_token()}",
        )

    def _http(
        self,
        method: str,
        url: str,
        data: Optional[bytes] = None,
        headers: Optional[dict] = None,
        auth_header: Optional[str] = "__use_bearer__",
    ) -> Any:
        """Send one HTTP request via httpx when available, else stdlib urllib.

        An Authorization header is added only when `auth_header` is an explicit
        value. Both None and the "__use_bearer__" default result in no
        Authorization header being sent.

        Raises:
            PacketDialError: timeout, connection failure, or non-2xx status.
        """
        hdrs = dict(headers or {})
        if auth_header and auth_header != "__use_bearer__":
            hdrs["Authorization"] = auth_header
        if _HAS_HTTPX:
            return self._http_httpx(method, url, data, hdrs)
        return self._http_urllib(method, url, data, hdrs)

    def _http_httpx(self, method: str, url: str, data: Optional[bytes], hdrs: dict) -> Any:
        """httpx transport: separate connect timeout, errors mapped to PacketDialError."""
        try:
            timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
            with httpx.Client(timeout=timeout) as client:
                resp = client.request(method, url, content=data, headers=hdrs)
                resp.raise_for_status()
                return self._parse(resp.content)
        except httpx.TimeoutException as exc:
            raise PacketDialError(f"request timed out: {exc}") from exc
        except httpx.HTTPStatusError as exc:
            detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
            raise PacketDialError(
                f"HTTP {exc.response.status_code} {method} {url}: {detail}"
            ) from exc
        except httpx.HTTPError as exc:
            raise PacketDialError(f"request failed: {exc}") from exc

    def _http_urllib(self, method: str, url: str, data: Optional[bytes], hdrs: dict) -> Any:
        """stdlib fallback transport (single timeout; connect_timeout is not applied)."""
        req = urllib.request.Request(url, data=data, method=method, headers=hdrs)
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                return self._parse(resp.read())
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
            raise PacketDialError(f"HTTP {exc.code} {method} {url}: {detail}") from exc
        except OSError as exc:
            # URLError is an OSError, but read-phase timeouts and connection
            # resets are raised as bare OSError subclasses and must be wrapped too.
            reason = getattr(exc, "reason", exc)
            if isinstance(exc, TimeoutError) or isinstance(reason, TimeoutError):
                raise PacketDialError(f"request timed out: {exc}") from exc
            raise PacketDialError(f"request failed: {exc}") from exc

    @staticmethod
    def _parse(raw: bytes) -> Any:
        """Decode a response body: None if empty, parsed JSON if valid, else text."""
        if not raw:
            return None
        try:
            return json.loads(raw.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return raw.decode("utf-8", errors="replace")

    # ======================================================================
    # READ METHODS (safe — always live)
    # ======================================================================

    def version(self) -> Any:
        return self.request("GET", "version")

    def whoami(self) -> Any:
        """Details of the currently authenticated API key."""
        return self.request("GET", "apikeys/~")

    def domains(self, limit: int = 1000) -> Any:
        return self.request("GET", "domains", params={"limit": limit})

    def domain(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}")

    def users(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/users")

    def user(self, domain: str, user: str) -> Any:
        d, u = self._seg(domain), self._seg(user)
        return self.request("GET", f"domains/{d}/users/{u}")

    def phones(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/phones")

    def devices(self, domain: str, user: str) -> Any:
        d, u = self._seg(domain), self._seg(user)
        return self.request("GET", f"domains/{d}/users/{u}/devices")

    def phonenumbers(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/phonenumbers")

    def resellers(self) -> Any:
        return self.request("GET", "resellers")

    def cdrs(self, domain: Optional[str] = None, **filters) -> Any:
        """Call detail records, per-domain when `domain` is given; filters become query params."""
        path = f"domains/{self._seg(domain)}/cdrs" if domain else "cdrs"
        return self.request("GET", path, params=filters or None)

    def apikeys(self) -> Any:
        # NOTE(review): same endpoint as whoami() ("apikeys/~"); confirm
        # whether a listing endpoint was intended here.
        return self.request("GET", "apikeys/~")

    def subscriptions(self) -> Any:
        return self.request("GET", "subscriptions")

    # --- per-domain feature resources (live-verified shapes, 2026-06-22) ---

    def callqueues(self, domain: str) -> Any:
        """ACD call queues in a domain (agents, dispatch type, live queued count)."""
        return self.request("GET", f"domains/{self._seg(domain)}/callqueues")

    def timeframes(self, domain: str) -> Any:
        """Time-based routing schedules (business hours / holidays) for a domain."""
        return self.request("GET", f"domains/{self._seg(domain)}/timeframes")

    def sites(self, domain: str) -> Any:
        """Multi-site definitions within a domain."""
        return self.request("GET", f"domains/{self._seg(domain)}/sites")

    def contacts(self, domain: str) -> Any:
        """Shared/domain contacts (address book)."""
        return self.request("GET", f"domains/{self._seg(domain)}/contacts")

    def autoattendants(self, domain: str) -> Any:
        """Auto-attendants (IVR menus) in a domain."""
        return self.request("GET", f"domains/{self._seg(domain)}/autoattendants")

    def billing(self, domain: str) -> Any:
        """Domain billing/limits snapshot: max + current counts (users, queues, AAs, calls)."""
        return self.request("GET", f"domains/{self._seg(domain)}/billing")

    # ======================================================================
    # WRITE METHODS (gated — the CLI requires --confirm before calling these)
    # ======================================================================

    def create_domain(self, body: dict) -> Any:
        return self.request("POST", "domains", json_body=body)

    def create_user(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/users", json_body=body)

    def update_user(self, domain: str, user: str, body: dict) -> Any:
        d, u = self._seg(domain), self._seg(user)
        return self.request("PUT", f"domains/{d}/users/{u}", json_body=body)

    def delete_user(self, domain: str, user: str) -> Any:
        d, u = self._seg(domain), self._seg(user)
        return self.request("DELETE", f"domains/{d}/users/{u}")

    def create_phone(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/phones", json_body=body)

    def create_phonenumber(self, domain: str, body: dict) -> Any:
        return self.request(
            "POST", f"domains/{self._seg(domain)}/phonenumbers", json_body=body
        )

    # --- call queue writes (live-verified on arizonacomputerguru, 2026-06-22) ---
    # Create body requires {"synchronous":"yes","callqueue":"<id>"} (+ optional config).

    def create_callqueue(self, domain: str, body: dict) -> Any:
        return self.request(
            "POST", f"domains/{self._seg(domain)}/callqueues", json_body=body
        )

    def update_callqueue(self, domain: str, callqueue: str, body: dict) -> Any:
        d, c = self._seg(domain), self._seg(callqueue)
        return self.request("PUT", f"domains/{d}/callqueues/{c}", json_body=body)

    def delete_callqueue(self, domain: str, callqueue: str) -> Any:
        d, c = self._seg(domain), self._seg(callqueue)
        return self.request("DELETE", f"domains/{d}/callqueues/{c}")

    def add_callqueue_agent(self, domain: str, callqueue: str, body: dict) -> Any:
        # body requires {"callqueue-agent-id": "<ext>@<domain>"}.
        d, c = self._seg(domain), self._seg(callqueue)
        return self.request("POST", f"domains/{d}/callqueues/{c}/agents", json_body=body)

    def update_callqueue_agent(self, domain: str, callqueue: str, agent_id: str, body: dict) -> Any:
        d, c, a = self._seg(domain), self._seg(callqueue), self._seg(agent_id)
        return self.request("PUT", f"domains/{d}/callqueues/{c}/agents/{a}", json_body=body)

    def remove_callqueue_agent(self, domain: str, callqueue: str, agent_id: str) -> Any:
        d, c, a = self._seg(domain), self._seg(callqueue), self._seg(agent_id)
        return self.request("DELETE", f"domains/{d}/callqueues/{c}/agents/{a}")

    # --- timeframe writes ---
    # Same path, body-discriminated variants (NetSapiens routes PUT/DELETE by body):
    # create: {"synchronous":"yes","timeframe-name":"X","timeframe-type":"days-of-week|specific-dates|holiday|custom|always"}
    # update PUT body carries the type-specific array(s); pass --body verbatim.
    # delete with no body = delete the whole timeframe; with {"child_id":N} = delete one entry.

    def create_timeframe(self, domain: str, body: dict) -> Any:
        return self.request(
            "POST", f"domains/{self._seg(domain)}/timeframes", json_body=body
        )

    def update_timeframe(self, domain: str, tf_id: str, body: dict) -> Any:
        d, t = self._seg(domain), self._seg(tf_id)
        return self.request("PUT", f"domains/{d}/timeframes/{t}", json_body=body)

    def delete_timeframe(self, domain: str, tf_id: str, body: Optional[dict] = None) -> Any:
        d, t = self._seg(domain), self._seg(tf_id)
        return self.request("DELETE", f"domains/{d}/timeframes/{t}", json_body=body)

    # --- DID (phonenumber) update/delete (completes create-did + read) ---

    def update_phonenumber(self, domain: str, number: str, body: dict) -> Any:
        d, n = self._seg(domain), self._seg(number)
        return self.request("PUT", f"domains/{d}/phonenumbers/{n}", json_body=body)

    def delete_phonenumber(self, domain: str, number: str) -> Any:
        d, n = self._seg(domain), self._seg(number)
        return self.request("DELETE", f"domains/{d}/phonenumbers/{n}")

    # --- per-user SIP devices (write; read via devices()) ---

    def create_device(self, domain: str, user: str, body: dict) -> Any:
        d, u = self._seg(domain), self._seg(user)
        return self.request("POST", f"domains/{d}/users/{u}/devices", json_body=body)

    def update_device(self, domain: str, user: str, device: str, body: dict) -> Any:
        d, u, v = self._seg(domain), self._seg(user), self._seg(device)
        return self.request("PUT", f"domains/{d}/users/{u}/devices/{v}", json_body=body)

    def delete_device(self, domain: str, user: str, device: str) -> Any:
        d, u, v = self._seg(domain), self._seg(user), self._seg(device)
        return self.request("DELETE", f"domains/{d}/users/{u}/devices/{v}")

    # --- E911 addresses (read + write) ---

    def addresses(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/addresses")

    def create_address(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/addresses", json_body=body)

    def update_address(self, domain: str, address_id: str, body: dict) -> Any:
        d, a = self._seg(domain), self._seg(address_id)
        return self.request("PUT", f"domains/{d}/addresses/{a}", json_body=body)

    def delete_address(self, domain: str, address_id: str) -> Any:
        d, a = self._seg(domain), self._seg(address_id)
        return self.request("DELETE", f"domains/{d}/addresses/{a}")

    # --- domain contacts (write; read via contacts()) ---

    def create_contact(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/contacts", json_body=body)

    def update_contact(self, domain: str, contact_id: str, body: dict) -> Any:
        d, c = self._seg(domain), self._seg(contact_id)
        return self.request("PUT", f"domains/{d}/contacts/{c}", json_body=body)

    def delete_contact(self, domain: str, contact_id: str) -> Any:
        d, c = self._seg(domain), self._seg(contact_id)
        return self.request("DELETE", f"domains/{d}/contacts/{c}")

    # --- sites (write; read via sites()) ---

    def create_site(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/sites", json_body=body)

    def update_site(self, domain: str, site: str, body: dict) -> Any:
        d, s = self._seg(domain), self._seg(site)
        return self.request("PUT", f"domains/{d}/sites/{s}", json_body=body)

    # --- auto-attendants (create; read via autoattendants()) ---

    def create_autoattendant(self, domain: str, body: dict) -> Any:
        return self.request(
            "POST", f"domains/{self._seg(domain)}/autoattendants", json_body=body
        )

    # --- SMS numbers (read + write) ---

    def smsnumbers(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/smsnumbers")

    def create_smsnumber(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/smsnumbers", json_body=body)

    def update_smsnumber(self, domain: str, smsnumber: str, body: dict) -> Any:
        d, s = self._seg(domain), self._seg(smsnumber)
        return self.request("PUT", f"domains/{d}/smsnumbers/{s}", json_body=body)

    def delete_smsnumber(self, domain: str, smsnumber: str) -> Any:
        d, s = self._seg(domain), self._seg(smsnumber)
        return self.request("DELETE", f"domains/{d}/smsnumbers/{s}")

    # --- blocked-number filters (read + add/remove) ---

    def blocked_numbers(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/number-filters")

    def block_numbers(self, domain: str, body: dict) -> Any:
        return self.request(
            "POST", f"domains/{self._seg(domain)}/number-filters", json_body=body
        )

    def unblock_numbers(self, domain: str, body: dict) -> Any:
        return self.request(
            "DELETE", f"domains/{self._seg(domain)}/number-filters", json_body=body
        )

    # --- music on hold (read + TTS create/delete; uploads are multipart -> raw) ---

    def moh(self, domain: str) -> Any:
        return self.request("GET", f"domains/{self._seg(domain)}/moh")

    def create_moh(self, domain: str, body: dict) -> Any:
        return self.request("POST", f"domains/{self._seg(domain)}/moh", json_body=body)

    def delete_moh(self, domain: str, index: str) -> Any:
        d, i = self._seg(domain), self._seg(index)
        return self.request("DELETE", f"domains/{d}/moh/{i}")

    # --- dial rules / call data (read) ---

    def dialrules(self, domain: str, dialplan: str) -> Any:
        d, p = self._seg(domain), self._seg(dialplan)
        return self.request("GET", f"domains/{d}/dialplans/{p}/dialrules")

    def recording(self, domain: str, callid: str) -> Any:
        d, c = self._seg(domain), self._seg(callid)
        return self.request("GET", f"domains/{d}/recordings/{c}")

    def transcriptions(self, domain: str, **filters) -> Any:
        return self.request(
            "GET", f"domains/{self._seg(domain)}/transcriptions", params=filters or None
        )

    # ======================================================================
    # ORCHESTRATION — onboard a new client domain (mirrors the GUI "Add a Domain"
    # wizard: Basic + Defaults + Limitations -> POST /domains; Emergency tab ->
    # validate (generates the pidflo) -> create E911 address). Live-proven against
    # vwp.91912.service 2026-06-22.
    # ======================================================================

    def onboard_domain(self, body: dict) -> dict:
        """Create a domain and (optionally) its E911 address in one flow.

        body = the POST /domains fields PLUS an optional "emergency" sub-object
        (the address fields). Requires body["domain"] (full, e.g.
        vwp.91912.service). The caller's dict is not mutated; "synchronous"
        defaults to "yes" when absent.

        Returns {"domain": ...}, plus "address_validate" ({"status": ...}) and
        "address" when an emergency block was supplied.

        Raises:
            PacketDialError: "domain" is missing, "emergency" is not a JSON
                object (both checked before anything is created), an API call
                fails, or the validate call returns an unexpected shape after
                the domain was already created.
        """
        payload = dict(body)
        emergency = payload.pop("emergency", None)
        payload.setdefault("synchronous", "yes")
        domain = payload.get("domain")
        if not domain:
            raise PacketDialError("onboard_domain: body must include 'domain' (e.g. acme.91912.service)")
        # Reject a malformed emergency block up front: failing on it after
        # POST /domains would leave a half-onboarded domain behind.
        if emergency and not isinstance(emergency, dict):
            raise PacketDialError("onboard_domain: 'emergency' must be a JSON object of address fields")

        result = {"domain": self.create_domain(payload)}
        if not emergency:
            return result

        validated = self.request(
            "POST", f"domains/{self._seg(domain)}/addresses/validate",
            json_body=dict(emergency),
        )
        # The validate call may answer with a one-element list or a bare object.
        vobj = validated[0] if isinstance(validated, list) and validated else validated
        if not isinstance(vobj, dict):
            raise PacketDialError(
                f"onboard_domain: domain {domain!r} was created, but address validation "
                f"returned an unexpected response: {str(validated)[:ERROR_BODY_MAX_CHARS]}"
            )

        pidflo = vobj.get("address-formatted-pidflo")
        addr = dict(emergency)
        addr["address-formatted-pidflo"] = pidflo
        addr["emergency-address-id"] = vobj.get("emergency-address-id")

        # ValidationStatus is read from the top level first, then from inside
        # the pidflo when that is itself an object (it may be a plain string).
        nested = pidflo.get("ValidationStatus") if isinstance(pidflo, dict) else None
        result["address_validate"] = {"status": vobj.get("ValidationStatus") or nested}
        result["address"] = self.create_address(domain, addr)
        return result