Files
claudetools/.claude/skills/bitdefender/scripts/gz_client.py
Howard Enos befd2650c8 fix(bitdefender): fifth-pass - companies lists full fleet, drop unused import
Convergence-pass LOW/NIT cleanup:
- cmd_companies uses list_all_companies() so a >100-company tenant isn't truncated
  in the listing (was page-1 only); matches sweep/inventory.
- removed unused 'field' import from dataclasses.
Deliberately NOT changed: id validation on delete-package/report-delete/blocklist-
remove/quarantine-remove/restore - those ids are not pinned 24-hex format, so
validating could reject valid input; they are --confirm-gated and bad ids match
the expected-error markers (no mislog). 81/81 selftest.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 14:05:32 -07:00

1426 lines
60 KiB
Python

#!/usr/bin/env python3
"""GravityZone Cloud Public API client for the bitdefender skill.
Standalone (does not import the api/ service). Reuses the proven JSON-RPC
shape, HTTP Basic auth (api_key as username, empty password), and the ACG
hardcoded tenant IDs.
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
script has no hard third-party dependency.
Credentials: never hardcoded. Loaded at runtime from the SOPS vault, or from
the GRAVITYZONE_API_KEY env var (testing override).
Cache: only the IDENTITY/STRUCTURE tier is cached (company/endpoint/policy
id<->name maps + endpoint fqdn, package list). No secrets are ever cached;
infra identifiers (hostnames/FQDNs) are. Volatile status (infected, lastSeen,
online, signature freshness) is NEVER cached and always pulled live. The cache
dir is gitignored.
"""
from __future__ import annotations
import base64
import json
import os
import random
import socket
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any, Optional
# --- optional httpx -----------------------------------------------------------
# urllib (stdlib) is always imported as the fallback transport; httpx is used
# when present for connection pooling/timeouts.
try:
import httpx # type: ignore
_HAS_HTTPX = True
except ImportError: # pragma: no cover - depends on environment
_HAS_HTTPX = False
# Upstream error bodies copied into exception text are truncated to this many
# characters. The GravityZone key never appears in them, but `raw` can call
# arbitrary methods whose responses may reflect other data, so what can end up
# in logs is bounded instead of echoing full bodies.
ERROR_BODY_MAX_CHARS = 500

# --- transient-failure retry policy -------------------------------------------
# The live tenant is rate-limited: real HTTP 429s were observed during sweeps,
# which issue one getManagedEndpointDetails per endpoint across every company.
# 429/5xx/timeout are retried with bounded exponential backoff, and a
# Retry-After header is honored.
RETRY_STATUSES = frozenset((429, 500, 502, 503, 504))
RETRY_MAX_ATTEMPTS = 4  # total tries: the initial one plus up to (MAX-1) retries
RETRY_BASE_DELAY_SECONDS = 1.0
RETRY_MAX_DELAY_SECONDS = 30.0  # ceiling for the exponential backoff
RETRY_AFTER_MAX_SECONDS = 120.0  # larger ceiling for a server-sent Retry-After
class _RetryableHTTP(Exception):
    """Private marker exception: the request failed in a way that may be retried.

    `code` carries either the HTTP status (int) or one of two strings:
    'timeout' (ambiguous - the request may have reached the server) or
    'connect' (failed before sending - no side effect, always safe to retry).
    `headers` defaults to an empty dict and `detail` to an empty string.
    """

    def __init__(self, code, headers=None, detail=""):
        super().__init__(f"transient {code}")
        self.code = code
        self.headers = headers if headers else {}
        self.detail = detail
def _retry_delay(headers, attempt: int) -> float:
    """Return how many seconds to wait before the next retry.

    A Retry-After header wins when present and usable: numeric seconds are
    clamped to [0, RETRY_AFTER_MAX_SECONDS]; an HTTP-date in the future is
    converted to a delay and capped the same way. Anything else (no header,
    unparsable value, NaN, a date in the past) falls back to exponential
    backoff with up to 25% jitter, capped at RETRY_MAX_DELAY_SECONDS.

    Args:
        headers: mapping-like object with ``.get``; objects without ``.get``
            (including None) are treated as "no headers".
        attempt: zero-based index of the attempt that just failed.
    """
    try:
        retry_after = headers.get("Retry-After") or headers.get("retry-after")
    except AttributeError:
        retry_after = None

    if retry_after:
        # A server-mandated Retry-After is honored up to a HIGHER cap than the
        # exponential backoff (don't retry early into another 429).
        try:
            seconds = float(retry_after)
        except (TypeError, ValueError):
            seconds = None

        if seconds is not None:
            # NaN is the only float unequal to itself. It would survive the
            # min/max clamp and make time.sleep() raise ValueError, so treat it
            # as an unusable header and fall through to the backoff.
            if seconds == seconds:
                # Clamp to [0, ceiling]: a negative value must not reach
                # time.sleep() either.
                return min(max(seconds, 0.0), RETRY_AFTER_MAX_SECONDS)
        else:
            try:
                when = parsedate_to_datetime(retry_after)
                if when is not None:
                    if when.tzinfo is None:
                        when = when.replace(tzinfo=timezone.utc)
                    delta = (when - datetime.now(timezone.utc)).total_seconds()
                    if delta > 0:
                        return min(delta, RETRY_AFTER_MAX_SECONDS)
            except (TypeError, ValueError):
                pass

    backoff = min(RETRY_BASE_DELAY_SECONDS * (2 ** attempt), RETRY_MAX_DELAY_SECONDS)
    return backoff + random.uniform(0.0, backoff * 0.25)
# --- constants ----------------------------------------------------------------
# JSON-RPC endpoint; the GRAVITYZONE_API_BASE_URL environment variable overrides
# the built-in cloud URL.
GRAVITYZONE_API_BASE_URL = os.environ.get(
    "GRAVITYZONE_API_BASE_URL",
    "https://cloud.gravityzone.bitdefender.com/api/v1.0/jsonrpc",
)
GRAVITYZONE_TIMEOUT_SECONDS = 60.0
GRAVITYZONE_CONNECT_TIMEOUT_SECONDS = 10.0

# Hard ceiling on paginated loops: a misbehaving API that always returns a full
# page must never spin forever (the sweep also fans out N detail calls per page).
MAX_PAGINATION_PAGES = 1000

# Hardcoded ACG tenant identifiers.
ACG_ROOT_COMPANY_ID = "5c4280716c0318f3478b456a"
ACG_COMPANIES_CONTAINER_ID = "5c4280716c0318f3478b456e"

# Where the API key lives in the SOPS vault.
VAULT_ENTRY = "msp-tools/gravityzone.sops.yaml"
VAULT_FIELD = "credentials.api_key"

CACHE_TTL_SECONDS = 24 * 60 * 60

# Cache files live in a ".cache" directory two levels above this file's folder.
SKILL_DIR = Path(__file__).resolve().parent.parent
CACHE_DIR = SKILL_DIR / ".cache"
CACHE_FILE = CACHE_DIR / "inventory.json"
CACHE_LOCK_FILE = CACHE_DIR / "inventory.lock"

# Best-effort advisory lock for read-modify-write of the cache. Short timeout:
# losing a write-through update is acceptable; hanging the CLI is not.
CACHE_LOCK_TIMEOUT_SECONDS = 5.0
CACHE_LOCK_STALE_SECONDS = 30.0
class GravityZoneError(RuntimeError):
    """Error type used by this module for transport and JSON-RPC failures."""
@dataclass
class GZEndpointSummary:
    """Flat per-endpoint record produced by the security sweep."""

    endpoint_id: str
    name: str
    company_id: str
    infected: bool
    # True = a malware detection is currently active on the endpoint (tracks
    # with `infected`); this is NOT an "engine on" indicator.
    threat_detected: bool
    signature_outdated: bool
    product_outdated: bool
    last_seen: Optional[str]
    agent_version: Optional[str]
    state: int
# --- credential loading -------------------------------------------------------
def _resolve_claudetools_root() -> Path:
    """Locate the ClaudeTools repo root.

    Lookup order: the CLAUDETOOLS_ROOT environment variable, then the
    `claudetools_root` value in <derived root>/.claude/identity.json, then the
    path derived from this file's location
    (.claude/skills/bitdefender/scripts -> repo root). The derived fallback is
    what makes this work on the Mac/Linux fleet machines, not only the Windows
    default.
    """
    # SKILL_DIR = .../.claude/skills/bitdefender ; the root is three levels up.
    fallback = SKILL_DIR.parent.parent.parent

    override = os.environ.get("CLAUDETOOLS_ROOT")
    if override:
        return Path(override)

    identity_file = fallback / ".claude" / "identity.json"
    if not identity_file.exists():
        return fallback

    try:
        identity = json.loads(identity_file.read_text(encoding="utf-8"))
        recorded = identity.get("claudetools_root")
    except (json.JSONDecodeError, OSError):
        # Unreadable or malformed identity file: use the derived path.
        return fallback
    return Path(recorded) if recorded else fallback
def load_api_key() -> str:
    """Load the GravityZone API key.

    Order: the GRAVITYZONE_API_KEY environment override, then the SOPS vault
    wrapper (`bash <root>/.claude/scripts/vault.sh get-field ...`).

    Returns:
        The key with surrounding whitespace removed. Never an empty string.

    Raises:
        GravityZoneError: the vault wrapper is missing, `bash` is not on PATH,
            the vault call times out or exits non-zero, or it prints nothing.
    """
    # Strip BEFORE testing: a whitespace-only override is truthy as-is but
    # would otherwise be returned as an empty key.
    env_key = (os.environ.get("GRAVITYZONE_API_KEY") or "").strip()
    if env_key:
        return env_key

    root = _resolve_claudetools_root()
    vault_script = root / ".claude" / "scripts" / "vault.sh"
    if not vault_script.exists():
        raise GravityZoneError(
            f"Cannot load API key: vault wrapper not found at {vault_script} "
            "and GRAVITYZONE_API_KEY is not set."
        )

    try:
        completed = subprocess.run(
            ["bash", str(vault_script), "get-field", VAULT_ENTRY, VAULT_FIELD],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except FileNotFoundError as exc:
        raise GravityZoneError(
            "Cannot load API key: 'bash' not found on PATH. Install Git Bash or "
            "set GRAVITYZONE_API_KEY."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise GravityZoneError("Cannot load API key: vault call timed out.") from exc

    if completed.returncode != 0:
        raise GravityZoneError(
            "Cannot load API key from vault "
            f"(exit {completed.returncode}): {completed.stderr.strip()}"
        )

    key = completed.stdout.strip()
    if not key:
        raise GravityZoneError("Vault returned an empty API key.")
    return key
# --- client -------------------------------------------------------------------
class GravityZoneClient:
def __init__(
    self,
    api_key: Optional[str] = None,
    api_base_url: str = GRAVITYZONE_API_BASE_URL,
    timeout: float = GRAVITYZONE_TIMEOUT_SECONDS,
    connect_timeout: float = GRAVITYZONE_CONNECT_TIMEOUT_SECONDS,
):
    """Store connection settings; no network or vault access happens here.

    Args:
        api_key: explicit key; when None it is loaded lazily on first use.
        api_base_url: JSON-RPC base URL; trailing slashes are removed.
        timeout: overall request timeout in seconds.
        connect_timeout: connection timeout in seconds (used for httpx).
    """
    self._api_key = api_key  # resolved lazily by the `api_key` property if None
    self._httpx_client = None  # pooled httpx client, created on first use
    self.api_base_url = api_base_url.rstrip("/")
    self.timeout = timeout
    self.connect_timeout = connect_timeout
def close(self) -> None:
    """Release the pooled httpx client if one exists; otherwise do nothing.

    The reference is cleared even when the underlying close() raises.
    """
    pooled = self._httpx_client
    if pooled is None:
        return
    try:
        pooled.close()
    finally:
        self._httpx_client = None
def __enter__(self) -> "GravityZoneClient":
    """Enter a `with` block; the client itself is the context value."""
    return self
def __exit__(self, *exc) -> None:
    """Leave a `with` block: close the client. Exceptions are not suppressed."""
    self.close()
@property
def _client(self):
    """Return the shared httpx.Client, creating it on first access.

    Reusing one client lets a multi-call sweep share a single connection pool
    instead of paying a TLS handshake per request.
    """
    pooled = self._httpx_client
    if pooled is None:
        limits = httpx.Timeout(self.timeout, connect=self.connect_timeout)
        pooled = httpx.Client(timeout=limits)
        self._httpx_client = pooled
    return pooled
@property
def api_key(self) -> str:
    """The API key, resolved through load_api_key() the first time it is needed."""
    if self._api_key:
        return self._api_key
    self._api_key = load_api_key()
    return self._api_key
# -- core transport --------------------------------------------------------
def _jsonrpc_request(self, module: str, method: str, params: dict) -> Any:
    """Make one JSON-RPC call.

    Args:
        module: API module path appended to the base URL (e.g. "network").
        method: JSON-RPC method name.
        params: JSON-RPC params object.

    Returns:
        body['result'] when the response is a dict, otherwise the body as-is.

    Raises:
        GravityZoneError: the response carries a non-null `error` member
            (transport failures are raised by `_post`).
    """
    url = f"{self.api_base_url}/{module}"
    payload = {"id": "1", "jsonrpc": "2.0", "method": method, "params": params}
    # Read methods (get*/list*) are safe to retry on timeout/5xx; state-changing
    # methods are NOT (a timeout can fire after the server committed -> a retry
    # would double-execute, e.g. createScanTask/createPackage). 429 is a
    # pre-processing rate-limit reject and is always safe (handled in _post).
    idempotent = method.lower().startswith(("get", "list"))
    body = self._post(url, payload, idempotent=idempotent)

    if not isinstance(body, dict):
        return body

    err = body.get("error")
    if err is not None:
        detail = None
        if isinstance(err, dict):
            # `data` may be a primitive rather than an object; only look for
            # `details` when it really is a dict, so a string/list payload
            # cannot raise AttributeError while the error is being reported.
            data = err.get("data")
            if isinstance(data, dict):
                detail = data.get("details")
            detail = detail or err.get("message")
        raise GravityZoneError(
            f"GravityZone API error [{module}.{method}]: {detail or err}"
        )
    return body.get("result")
def _post(self, url: str, payload: dict, idempotent: bool = False) -> Any:
    """POST `payload` as JSON, retrying transient failures a bounded number of times.

    Failures that cannot have had a side effect (a 429 pre-processing reject,
    or a 'connect' failure before anything was sent) are always retried. An
    ambiguous timeout/5xx, which may have committed server-side, is retried
    ONLY when `idempotent` is true, so a non-idempotent write is never
    silently re-executed.

    Raises:
        GravityZoneError: a failure that must not be retried, or retries
            exhausted.
    """
    body = json.dumps(payload).encode("utf-8")
    last_attempt = RETRY_MAX_ATTEMPTS - 1
    for attempt in range(RETRY_MAX_ATTEMPTS):
        try:
            return self._post_once(url, body)
        except _RetryableHTTP as exc:
            may_retry = idempotent or exc.code in (429, "connect")
            if may_retry and attempt < last_attempt:
                delay = _retry_delay(exc.headers, attempt)
                print(
                    f"[WARNING] GravityZone {exc.code} - retry "
                    f"{attempt + 1}/{RETRY_MAX_ATTEMPTS - 1} in {delay:.1f}s",
                    file=sys.stderr,
                )
                time.sleep(delay)
                continue
            note = "" if may_retry else " (non-idempotent; not retried)"
            raise GravityZoneError(
                f"GravityZone HTTP {exc.code}{note}: {exc.detail}".rstrip(": ")
            ) from exc
    # unreachable: the loop either returns or raises on the final attempt
    raise GravityZoneError("GravityZone request failed: retries exhausted")
def _post_once(self, url: str, data: bytes) -> Any:
    """Send one POST and return the parsed JSON body.

    Uses the pooled httpx client when httpx is installed, otherwise stdlib
    urllib with a hand-built HTTP Basic header (API key as username, empty
    password).

    Raises:
        _RetryableHTTP: transient failure. `code` is the HTTP status for a
            status in RETRY_STATUSES, 'connect' for a failure known to have
            happened before anything was sent, or 'timeout' for anything
            ambiguous (may have reached the server).
        GravityZoneError: terminal failure (other HTTP status, other httpx
            error, or a non-JSON response body).
    """
    if _HAS_HTTPX:
        try:
            resp = self._client.post(
                url, content=data, auth=(self.api_key, ""),
                headers={"Content-Type": "application/json"})
            resp.raise_for_status()
        except (httpx.ConnectError, httpx.ConnectTimeout) as exc:
            # Pre-send: the connection never established -> no side effect,
            # always safe to retry (must precede TimeoutException since
            # ConnectTimeout subclasses it).
            raise _RetryableHTTP("connect", detail=str(exc)) from exc
        except httpx.TimeoutException as exc:
            raise _RetryableHTTP("timeout", detail=str(exc)) from exc
        except httpx.HTTPStatusError as exc:
            code = exc.response.status_code
            detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
            if code in RETRY_STATUSES:
                raise _RetryableHTTP(code, exc.response.headers, detail) from exc
            raise GravityZoneError(
                f"GravityZone HTTP {code}: {detail}") from exc
        except httpx.HTTPError as exc:
            raise GravityZoneError(
                f"GravityZone request failed: {exc}") from exc
        try:
            return resp.json()
        except ValueError as exc:
            body = (resp.text or "")[:ERROR_BODY_MAX_CHARS]
            raise GravityZoneError(
                f"GravityZone returned a non-JSON response: {body}") from exc

    # stdlib fallback
    import http.client  # local import: only the urllib path needs it

    token = base64.b64encode(f"{self.api_key}:".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
        if exc.code in RETRY_STATUSES:
            raise _RetryableHTTP(exc.code, getattr(exc, "headers", None),
                                 detail) from exc
        raise GravityZoneError(f"GravityZone HTTP {exc.code}: {detail}") from exc
    except TimeoutError as exc:
        raise _RetryableHTTP("timeout", detail=str(exc)) from exc
    except urllib.error.URLError as exc:
        # Classify conservatively: only a KNOWN pre-send failure (connection
        # refused / DNS failure) is the always-safe 'connect'. Anything else -
        # a connect/read timeout, or an ambiguous reset that urllib wrapped in
        # URLError - is 'timeout' so a non-idempotent write is NOT retried.
        reason = getattr(exc, "reason", None)
        if isinstance(reason, (ConnectionRefusedError, socket.gaierror)):
            code = "connect"
        else:
            code = "timeout"
        raise _RetryableHTTP(code, detail=str(exc)) from exc
    except (OSError, http.client.HTTPException) as exc:
        # urllib only wraps errors raised while SENDING the request in
        # URLError. Failures while reading the response (connection reset,
        # RemoteDisconnected, IncompleteRead, socket.timeout on Python < 3.10)
        # arrive here unwrapped. The request may already have been processed,
        # so use the ambiguous 'timeout' class: retried for reads only.
        raise _RetryableHTTP("timeout", detail=str(exc)) from exc
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as exc:
        body = raw.decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
        raise GravityZoneError(
            f"GravityZone returned a non-JSON response: {body}") from exc
# ======================================================================
# READ METHODS (always live)
# ======================================================================
def get_api_status(self) -> dict:
    """Return API-key details plus license info in one dict.

    The two responses are nested under "apiKey" and "license" to avoid silent
    key collisions on a merge. If the license call raises GravityZoneError,
    its message is stored under "_licenseWarning" and "license" is omitted.
    """
    key_details = self._jsonrpc_request("general", "getApiKeyDetails", {})
    status: dict = {"apiKey": key_details or {}}
    try:
        license_info = self._jsonrpc_request("licensing", "getLicenseInfo", {})
    except GravityZoneError as exc:
        status["_licenseWarning"] = str(exc)
    else:
        status["license"] = license_info or {}
    return status
def get_own_company(self) -> dict:
    """Call companies.getCompanyDetails with no parameters.

    An empty or None result is normalized to {}.
    """
    own = self._jsonrpc_request("companies", "getCompanyDetails", {})
    return own if own else {}
def get_company_details(self, company_id: Optional[str] = None) -> dict:
    """Company detail (companies.getCompanyDetails).

    Without a `company_id` no parameter is sent, which selects the caller's
    own company. An empty or None result is normalized to {}.
    """
    params: dict = {"companyId": company_id} if company_id else {}
    details = self._jsonrpc_request("companies", "getCompanyDetails", params)
    return details or {}
def get_company_by_user(self, username: str) -> dict:
    """Look up the company that owns `username` (companies.getCompanyDetailsByUser).

    An empty or None result is normalized to {}.
    """
    lookup = {"username": username}
    company = self._jsonrpc_request("companies", "getCompanyDetailsByUser", lookup)
    return company or {}
def create_company(
    self,
    company_type: int,
    name: str,
    parent_id: Optional[str] = None,
    extra: Optional[dict] = None,
) -> Any:
    """Create a company via companies.createCompany. STATE-CHANGING.

    `type` (0=Partner, 1=Customer) and `name` are the verified required
    fields. The documented optional fields - parentId, address, country,
    state, phone, industry, canBeManagedByAbove, assignedProductType and a
    nested `licenseSubscription` {type (3=monthly subscription, 4=monthly
    trial), reservedSlots, endSubscription, autoRenewPeriod, ...} - are
    supplied through `extra`, whose keys are merged last and therefore
    override the explicit arguments. Callers must gate this behind --confirm.
    Docs: bitdefender.com/business/support/en/77211-126236-createcompany.html
    """
    request: dict = {"type": company_type, "name": name}
    if parent_id is not None:
        request["parentId"] = parent_id
    request.update(extra or {})
    return self._jsonrpc_request("companies", "createCompany", request)
def suspend_company(self, company_id: str) -> Any:
    """Suspend a company via companies.suspendCompany. STATE-CHANGING. Gated.

    Returns the upstream result unchanged.
    """
    request = {"companyId": company_id}
    return self._jsonrpc_request("companies", "suspendCompany", request)
def activate_company(self, company_id: str) -> Any:
    """Re-activate a suspended company via companies.activateCompany. STATE-CHANGING.

    Returns the upstream result unchanged.
    """
    request = {"companyId": company_id}
    return self._jsonrpc_request("companies", "activateCompany", request)
def delete_company(self, company_id: str) -> Any:
    """Delete a company via companies.deleteCompany. STATE-CHANGING. Gated.

    Returns the upstream result unchanged.
    """
    request = {"companyId": company_id}
    return self._jsonrpc_request("companies", "deleteCompany", request)
def list_companies(self, page: int = 1, per_page: int = 100) -> dict:
    """Return ONE page of companies under the ACG companies container.

    Calls network.getNetworkInventoryItems and keeps only items whose `type`
    is 1. `total` in the returned dict is the number of kept items on this
    page, not a tenant-wide count.
    """
    query = {
        "parentId": ACG_COMPANIES_CONTAINER_ID,
        "page": page,
        "perPage": per_page,
    }
    response = self._jsonrpc_request("network", "getNetworkInventoryItems", query) or {}
    companies = []
    for entry in response.get("items", []):
        if entry.get("type") == 1:
            companies.append(entry)
    return {"total": len(companies), "items": companies}
def list_all_companies(self) -> list[dict]:
    """Collect every company (type == 1) under the ACG container, across pages.

    list_companies() returns a single page only; callers that need the WHOLE
    fleet (sweep-all, inventory refresh) use this so a tenant with more than
    100 companies is not silently truncated. Paging stops at the first short
    raw page, or at MAX_PAGINATION_PAGES with a warning on stderr.
    """
    page_size = 100
    page_no = 1
    found: list[dict] = []
    while True:
        query = {
            "parentId": ACG_COMPANIES_CONTAINER_ID,
            "page": page_no,
            "perPage": page_size,
        }
        response = self._jsonrpc_request(
            "network", "getNetworkInventoryItems", query
        ) or {}
        raw_items = response.get("items", [])
        found += [entry for entry in raw_items if entry.get("type") == 1]
        # The RAW page length decides termination: filtering out non-company
        # items must not make a full page look like the last one.
        if len(raw_items) < page_size:
            break
        page_no += 1
        if page_no > MAX_PAGINATION_PAGES:
            print(f"[WARNING] list_all_companies hit the {MAX_PAGINATION_PAGES}-"
                  "page ceiling; companies may be truncated.", file=sys.stderr)
            break
    return found
def list_endpoints(
    self, parent_id: Optional[str] = None, page: int = 1, per_page: int = 100
) -> dict:
    """Fetch one page of endpoints (network.getEndpointsList).

    `parentId` is sent only when `parent_id` is truthy. An empty or None
    result is normalized to {}.
    """
    query: dict = {"page": page, "perPage": per_page}
    if parent_id:
        query["parentId"] = parent_id
    listing = self._jsonrpc_request("network", "getEndpointsList", query)
    return listing or {}
def get_endpoint_details(self, endpoint_id: str) -> dict:
    """Fetch one endpoint's details (network.getManagedEndpointDetails).

    An empty or None result is normalized to {}.
    """
    query = {"endpointId": endpoint_id}
    details = self._jsonrpc_request("network", "getManagedEndpointDetails", query)
    return details or {}
def list_policies(self, page: int = 1, per_page: int = 100) -> dict:
    """Fetch one page of policies (policies.getPoliciesList); {} when empty."""
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("policies", "getPoliciesList", paging)
    return listing or {}
def get_policy_details(self, policy_id: str) -> dict:
    """Fetch one policy's details (policies.getPolicyDetails); {} when empty."""
    query = {"policyId": policy_id}
    details = self._jsonrpc_request("policies", "getPolicyDetails", query)
    return details or {}
def list_packages(self, page: int = 1, per_page: int = 100) -> dict:
    """Fetch one page of installation packages (packages.getPackagesList); {} when empty."""
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("packages", "getPackagesList", paging)
    return listing or {}
def list_quarantine(
    self, company_id: str, page: int = 1, per_page: int = 100
) -> dict:
    """Fetch one page of quarantine items for a company; {} when empty.

    Uses the service-scoped module path 'quarantine/computers' (the bare
    'quarantine' path 404s). The scoping parameter is 'companyId', NOT
    'parentId'.
    """
    query = {"companyId": company_id, "page": page, "perPage": per_page}
    listing = self._jsonrpc_request(
        "quarantine/computers", "getQuarantineItemsList", query
    )
    return listing or {}
def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
    """Live security posture sweep of all endpoints under a parent.

    Pages through list_endpoints(parent_id) and fetches details for every
    endpoint that has an id. An endpoint whose detail call raises
    GravityZoneError is left out of the result, but a warning naming it is
    written to stderr so an incomplete sweep is never mistaken for a clean
    one. Errors from list_endpoints itself propagate to the caller.

    Returns:
        Summaries sorted with infected endpoints first, then outdated
        signatures, then outdated product, then by case-insensitive name.
    """
    summaries: list[GZEndpointSummary] = []
    page = 1
    per_page = 100
    while True:
        data = self.list_endpoints(parent_id, page=page, per_page=per_page)
        items = data.get("items", [])
        if not items:
            break
        for item in items:
            endpoint_id = item.get("id", "")
            if not endpoint_id:
                continue
            try:
                detail = self.get_endpoint_details(endpoint_id)
            except GravityZoneError as exc:
                # Best-effort: keep sweeping, but say which endpoint is missing.
                print(
                    f"[WARNING] security_sweep: details for endpoint "
                    f"{endpoint_id} unavailable ({exc}); endpoint omitted "
                    "from results.",
                    file=sys.stderr,
                )
                continue
            malware = detail.get("malwareStatus", {}) or {}
            agent = detail.get("agent", {}) or {}
            summaries.append(
                GZEndpointSummary(
                    endpoint_id=endpoint_id,
                    # A `name`/`companyId` key holding null would otherwise
                    # yield None and crash the sort below on .lower().
                    name=detail.get("name") or item.get("name") or "",
                    company_id=(
                        detail.get("companyId") or item.get("companyId") or ""
                    ),
                    infected=bool(malware.get("infected", False)),
                    threat_detected=bool(malware.get("detection", False)),
                    signature_outdated=bool(agent.get("signatureOutdated", False)),
                    product_outdated=bool(agent.get("productOutdated", False)),
                    last_seen=detail.get("lastSeen"),
                    agent_version=agent.get("productVersion"),
                    state=detail.get("state", 0),
                )
            )
        # Stop at the last page. A short page (< per_page) means no more;
        # do NOT rely on `total` - some responses omit it, which would
        # truncate the sweep after page 1.
        if len(items) < per_page:
            break
        page += 1
        if page > MAX_PAGINATION_PAGES:
            print(f"[WARNING] security_sweep hit the {MAX_PAGINATION_PAGES}-page "
                  "ceiling; results may be truncated.", file=sys.stderr)
            break
    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            s.name.lower(),
        )
    )
    return summaries
def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
    """Sweep every client company and return one combined, sorted list.

    The companies container is NOT a valid endpoint parent, so each company
    from list_all_companies() is swept individually. A company whose sweep
    raises GravityZoneError is skipped, with a warning on stderr so the gap
    in coverage is visible. Summaries lacking a company id are stamped with
    the id of the company they were found under.
    """
    summaries: list[GZEndpointSummary] = []
    for company in self.list_all_companies():
        cid = company.get("id")
        if not cid:
            continue
        try:
            company_summaries = self.security_sweep(cid)
        except GravityZoneError as exc:
            # Best-effort across the fleet: one failing company must not abort
            # the run, but silently dropping it would hide missing coverage.
            print(
                f"[WARNING] security_sweep_all_clients: company {cid} could "
                f"not be swept ({exc}); its endpoints are missing from the "
                "results.",
                file=sys.stderr,
            )
            continue
        for s in company_summaries:
            if not s.company_id:
                s.company_id = cid
        summaries.extend(company_summaries)
    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            # Tolerate a summary without a name instead of failing the sort.
            (s.name or "").lower(),
        )
    )
    return summaries
# ======================================================================
# MANAGEMENT METHODS (verified only)
# ======================================================================
def create_package(
    self,
    package_name: str,
    company_id: Optional[str] = None,
    description: Optional[str] = None,
    language: Optional[str] = None,
    modules: Optional[dict] = None,
    scan_mode: Optional[dict] = None,
    settings: Optional[dict] = None,
    roles: Optional[dict] = None,
    deployment_options: Optional[dict] = None,
) -> Any:
    """Create an installation package (packages.createPackage).

    `packageName` is always sent; `companyId` only when truthy; every other
    field only when it is not None. After the call succeeds the package is
    recorded in the local cache (write-through) and the upstream result is
    returned unchanged.
    """
    params: dict = {"packageName": package_name}
    if company_id:
        params["companyId"] = company_id

    # (API field name, supplied value) pairs, in the order they are sent.
    optional_fields = (
        ("description", description),
        ("language", language),
        ("modules", modules),
        ("scanMode", scan_mode),
        ("settings", settings),
        ("roles", roles),
        ("deploymentOptions", deployment_options),
    )
    for api_name, value in optional_fields:
        if value is not None:
            params[api_name] = value

    created = self._jsonrpc_request("packages", "createPackage", params)
    # Write-through: refresh package list in cache.
    self._cache_add_package(package_name, created)
    return created
def get_installation_links(
    self, package_name: str, company_id: Optional[str] = None
) -> Any:
    """Fetch installation links for a package (packages.getInstallationLinks).

    `companyId` is sent only when `company_id` is truthy. The upstream result
    is returned unchanged.
    """
    query: dict = {"packageName": package_name}
    if company_id:
        query["companyId"] = company_id
    return self._jsonrpc_request("packages", "getInstallationLinks", query)
def delete_package(self, package_id: str) -> Any:
    """Delete an installation package via packages.deletePackage.

    VERIFIED LIVE 2026-06-21: the only accepted parameter is `packageId`;
    packageName/companyId are rejected as "not expected". STATE-CHANGING -
    the call site is responsible for gating it.
    """
    request = {"packageId": package_id}
    return self._jsonrpc_request("packages", "deletePackage", request)
def create_scan_task(
    self,
    target_ids: list[str],
    scan_type: int,
    name: Optional[str] = None,
    custom_scan_settings: Optional[dict] = None,
) -> Any:
    """Create a scan task (network.createScanTask).

    `targetIds` and `type` are always sent; `name` and `customScanSettings`
    only when not None. The upstream result is returned unchanged.
    """
    request: dict = {"targetIds": target_ids, "type": scan_type}
    for api_name, value in (("name", name), ("customScanSettings", custom_scan_settings)):
        if value is not None:
            request[api_name] = value
    return self._jsonrpc_request("network", "createScanTask", request)
def move_endpoints(self, endpoint_ids: list[str], group_id: str) -> Any:
    """Move endpoints into a group (network.moveEndpoints); returns the raw result."""
    request = {"endpointIds": endpoint_ids, "groupId": group_id}
    return self._jsonrpc_request("network", "moveEndpoints", request)
def delete_endpoint(self, endpoint_id: str) -> Any:
    """Delete an endpoint (network.deleteEndpoint); returns the raw result."""
    request = {"endpointId": endpoint_id}
    return self._jsonrpc_request("network", "deleteEndpoint", request)
def create_custom_group(self, name: str, parent_id: Optional[str] = None) -> Any:
    """Create a custom group (network.createCustomGroup).

    The API parameter is `groupName` (verified live 2026-06-21), NOT `name`.
    `parentId` is sent only when `parent_id` is truthy. When a group id can
    be read from the result - the result itself if it is a string, or its
    `id`/`groupId` entry if it is a dict - the group is recorded in the local
    cache (write-through). The upstream result is returned unchanged.
    """
    request: dict = {"groupName": name}
    if parent_id:
        request["parentId"] = parent_id
    created = self._jsonrpc_request("network", "createCustomGroup", request)

    if isinstance(created, dict):
        new_id = created.get("id") or created.get("groupId")
    elif isinstance(created, str):
        # createCustomGroup returns the new group id as a plain string.
        new_id = created
    else:
        new_id = None

    if new_id:
        self._cache_add_group(new_id, name)
    return created
def delete_custom_group(self, group_id: str) -> Any:
    """Delete a custom group (network.deleteCustomGroup); returns the raw result."""
    request = {"groupId": group_id}
    return self._jsonrpc_request("network", "deleteCustomGroup", request)
def move_custom_group(self, group_id: str, new_parent_id: str) -> Any:
    """Move a custom group under a new parent (network.moveCustomGroup).

    The destination is sent as `parentId` (verified live 2026-06-21), NOT
    `newParentId`. The upstream result is returned unchanged.
    """
    request = {"groupId": group_id, "parentId": new_parent_id}
    return self._jsonrpc_request("network", "moveCustomGroup", request)
# ======================================================================
# INCIDENTS / EDR (module `/incidents`)
# ----------------------------------------------------------------------
# READ methods are always live and never cached (blocklist + incident
# status are volatile). State-changing methods (isolate, restore,
# add/remove blocklist) return the raw upstream result; the CLI caller is
# responsible for gating them behind --confirm.
# ======================================================================
# -- READ (safe) -----------------------------------------------------------
def list_blocklist(
    self,
    company_id: Optional[str] = None,
    page: int = 1,
    per_page: int = 100,
) -> dict:
    """Fetch one page of blocklisted hashes. VERIFIED LIVE (incidents.getBlocklistItems).

    The response has the shape {total, page, perPage, pagesCount,
    items:[{id, source, sourceInfo, hashType, hash, companyId}]}. Pass
    `company_id` to scope the listing to one company; leave it out for the
    whole partner tenant. An empty or None result is normalized to {}.
    """
    query: dict = {"page": page, "perPage": per_page}
    if company_id:
        query["companyId"] = company_id
    listing = self._jsonrpc_request("incidents", "getBlocklistItems", query)
    return listing or {}
def list_incidents(
    self,
    parent_id: str,
    page: int = 1,
    per_page: int = 500,
    filters: Optional[dict] = None,
) -> dict:
    """Fetch one page of incidents under a company/group (incidents.getIncidentsList).

    `parent_id` (a company or group id) is REQUIRED by this method.

    `per_page` defaults to 500 because live testing produced "Invalid value
    for 'perPage' parameter. The value should be between 500 and 10000" for
    smaller values; the 100 used as default elsewhere is rejected here.

    NOTE (UNVERIFIED / possibly unavailable on this tenant): a live re-test on
    2026-05-30 got "Method not found" for `getIncidentsList` on the
    `/incidents` module while `getBlocklistItems` on the SAME module worked in
    the same request, which rules out rate-limiting or a bad key. The method
    may be behind an EDR/incidents license feature that is OFF on this tenant,
    or carry a different name in this API version. Treat the result as
    best-effort and check the official Bitdefender API reference / console
    before relying on incident data.

    An empty or None result is normalized to {}.
    """
    query: dict = {"parentId": parent_id, "page": page, "perPage": per_page}
    if filters is not None:
        query["filters"] = filters
    listing = self._jsonrpc_request("incidents", "getIncidentsList", query)
    return listing or {}
# -- STATE-CHANGING (caller MUST gate behind --confirm) --------------------
def isolate_endpoints(self, endpoint_ids: list[str]) -> Any:
    """Isolate endpoints from the network (incidents.createIsolateEndpointTask).

    VERIFIED LIVE 2026-06-21: the API accepts one `endpointId` per call (an
    `endpointIds` array is rejected as "not expected"), so one request is
    issued per id, in order. For exactly one id the single task result is
    returned; otherwise a list of results (empty for no ids).
    STATE-CHANGING - the call site must gate this behind --confirm.
    """
    outcomes = [
        self._jsonrpc_request(
            "incidents", "createIsolateEndpointTask", {"endpointId": endpoint}
        )
        for endpoint in endpoint_ids
    ]
    return outcomes[0] if len(outcomes) == 1 else outcomes
def restore_endpoints_from_isolation(self, endpoint_ids: list[str]) -> Any:
    """Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).

    VERIFIED LIVE 2026-06-21: the API takes a SINGLE `endpointId` per call
    (not an array), so one request is issued per id, in order. For exactly one
    id the single task result is returned; otherwise a list of results.
    Note: the call fails while the isolation task is still in progress - wait
    and retry. STATE-CHANGING - the call site must gate this behind --confirm.
    """
    outcomes = [
        self._jsonrpc_request(
            "incidents",
            "createRestoreEndpointFromIsolationTask",
            {"endpointId": endpoint},
        )
        for endpoint in endpoint_ids
    ]
    return outcomes[0] if len(outcomes) == 1 else outcomes
def add_to_blocklist(
    self,
    company_id: str,
    hash_list: list[str],
    hash_type: int = 1,
    source_info: str = "",
    operating_systems: Optional[list[str]] = None,
) -> Any:
    """Add hashes to the blocklist via incidents.addToBlocklist.

    `hash_list` is an array of hash strings and `source_info` a free-text
    description. `hash_type` is an int; 1 is the common value seen live (the
    GravityZone console / API docs hold the full mapping).
    `operatingSystems` is sent only when `operating_systems` is not None.
    STATE-CHANGING - the call site must gate this behind --confirm.
    """
    request: dict = {
        "companyId": company_id,
        "hashType": hash_type,
        "hashList": hash_list,
        "sourceInfo": source_info,
    }
    if operating_systems is not None:
        request["operatingSystems"] = operating_systems
    return self._jsonrpc_request("incidents", "addToBlocklist", request)
def remove_from_blocklist(self, hash_item_id: str) -> Any:
    """Remove a single blocklist entry via incidents.removeFromBlocklist.

    STATE-CHANGING - the call site must gate this behind --confirm.

    UNVERIFIED: `hashItemId` is only the candidate parameter name (the `id`
    field returned by getBlocklistItems). Check the official Bitdefender API
    reference before relying on it.
    """
    request = {"hashItemId": hash_item_id}
    return self._jsonrpc_request("incidents", "removeFromBlocklist", request)
# ======================================================================
# POLICY ASSIGNMENT (state-changing; gate behind --confirm at call site)
# ----------------------------------------------------------------------
# NOTE: getPolicyDetails (above) returns the FULL granular module config
# (verified live 2026-06-21 - the earlier "shallow only" claim was wrong).
# The Public API still has NO create/edit/clone policy method - authoring
# stays in the console - but assigning an EXISTING policy is supported here.
# ======================================================================
def assign_policy(
    self,
    policy_id: str,
    target_ids: list[str],
    force_inheritance: bool = False,
) -> Any:
    """Assign an existing policy to endpoints/groups via network.assignPolicy.

    VERIFIED LIVE 2026-06-21: sending a `policyId` REQUIRES
    `inheritFromAbove=false` in the same call. An inheriting target defaults
    inheritFromAbove to true, so leaving it out makes the API reject the call
    with a misleading "inheritFromAbove should not be used with policyId"
    error. `forcePolicyInheritance` (sent only when `force_inheritance` is
    truthy) optionally pushes the policy down to sub-items. To make a target
    INHERIT instead, call {targetIds, inheritFromAbove:true} without a
    policyId through `raw`.
    STATE-CHANGING - the call site must gate this behind --confirm.
    Docs: bitdefender.com/business/support/en/77212-924802-assignpolicy.html
    """
    request: dict = {
        "policyId": policy_id,
        "targetIds": target_ids,
        "inheritFromAbove": False,
    }
    if force_inheritance:
        request["forcePolicyInheritance"] = True
    return self._jsonrpc_request("network", "assignPolicy", request)
def list_scan_tasks(
    self,
    page: int = 1,
    per_page: int = 100,
    name: Optional[str] = None,
    status: Optional[int] = None,
) -> dict:
    """Return one page of scan tasks (network.getScanTasksList). VERIFIED LIVE.

    `name` and `status` are optional filters and are only sent when they are
    not None. A falsy API result is normalised to an empty dict.
    """
    optional_filters = {"name": name, "status": status}
    params: dict = {"page": page, "perPage": per_page}
    params.update(
        {key: value for key, value in optional_filters.items() if value is not None}
    )
    listing = self._jsonrpc_request("network", "getScanTasksList", params)
    return listing or {}
def get_endpoint_tags(self) -> Any:
    """Read the endpoint tags defined in the tenant (network.getEndpointTags)."""
    no_params: dict = {}
    return self._jsonrpc_request("network", "getEndpointTags", no_params)
def set_endpoint_label(self, endpoint_id: str, label: str) -> Any:
    """Assign a label to one endpoint (network.setEndpointLabel).

    STATE-CHANGING - gate at the call site. Both `endpointId` and `label`
    are required by the API (verified).
    """
    request = {"endpointId": endpoint_id, "label": label}
    return self._jsonrpc_request("network", "setEndpointLabel", request)
def reconfigure_client(
    self, target_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Reconfigure installed agents (network.createReconfigureClientTask).

    STATE-CHANGING - gate at the call site behind --confirm.

    Args:
        target_ids: Sent as `targetIds` (required, verified).
        extra: The documented reconfigure body (which modules/roles/scanMode
            to change), merged into the request.

    The explicit `target_ids` always wins: a `targetIds` key inside `extra`
    cannot redirect the task to targets other than the ones the caller
    passed (and confirmed).
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override the explicit targets.
    params["targetIds"] = target_ids
    return self._jsonrpc_request(
        "network", "createReconfigureClientTask", params
    )
# ======================================================================
# REPORTS (module `/reports`) - VERIFIED LIVE
# ======================================================================
def list_reports(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of saved reports (reports.getReportsList).

    A falsy API result is normalised to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("reports", "getReportsList", paging)
    return listing or {}
def get_report_links(self, report_id: str) -> Any:
    """Fetch the download links of a generated report (reports.getDownloadLinks).

    The `reportId` parameter name is a candidate only - if the call errors,
    confirm the name against the console.
    """
    request = {"reportId": report_id}
    return self._jsonrpc_request("reports", "getDownloadLinks", request)
# ======================================================================
# ACCOUNTS (module `/accounts`) - VERIFIED LIVE (read)
# ======================================================================
def list_accounts(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of GravityZone console accounts/users
    (accounts.getAccountsList).

    A falsy API result is normalised to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("accounts", "getAccountsList", paging)
    return listing or {}
def get_notifications_settings(self) -> dict:
    """Read the notification configuration (accounts.getNotificationsSettings).

    A falsy API result is normalised to an empty dict.
    """
    settings = self._jsonrpc_request("accounts", "getNotificationsSettings", {})
    return settings or {}
def get_account_details(self, account_id: Optional[str] = None) -> dict:
    """Read account detail (accounts.getAccountDetails).

    When `account_id` is falsy no `accountId` is sent, and the API returns the
    API key owner's own account (verified live). A falsy API result is
    normalised to an empty dict.
    """
    params: dict = {"accountId": account_id} if account_id else {}
    details = self._jsonrpc_request("accounts", "getAccountDetails", params)
    return details or {}
def create_account(
    self,
    email: str,
    password: Optional[str] = None,
    username: Optional[str] = None,
    role: Optional[int] = None,
    profile: Optional[dict] = None,
    rights: Optional[dict] = None,
    phone_number: Optional[dict] = None,
    extra: Optional[dict] = None,
) -> Any:
    """Create a console account (accounts.createAccount). STATE-CHANGING.

    Gate at the call site behind --confirm.

    Args:
        email: Required (verified).
        password: Sent as `password` when not None.
        username: Sent as `userName` when not None.
        role: Sent as `role` (int) when not None.
        profile: Sent as `profile` ({fullName, language, timezone}) when
            not None.
        rights: Sent as `rights` ({manageInventory,
            managePoliciesRead/Write, ...}) when not None.
        phone_number: Sent as `phoneNumber`
            ({countryCode, subscriberNumber}) when not None.
        extra: Any additional documented fields, merged verbatim.

    Explicitly passed arguments always win over a same-named key in `extra`,
    so `extra` can only ADD fields - it cannot silently replace the email (or
    any other value) the caller passed and confirmed.

    Docs: bitdefender.com/business/support/en/77212-125284-createaccount.html
    """
    explicit = {
        "userName": username,
        "password": password,
        "role": role,
        "profile": profile,
        "rights": rights,
        "phoneNumber": phone_number,
    }
    # Start from `extra`, then layer the explicit arguments on top.
    params: dict = dict(extra or {})
    params["email"] = email
    params.update(
        {key: value for key, value in explicit.items() if value is not None}
    )
    return self._jsonrpc_request("accounts", "createAccount", params)
def update_account(self, account_id: str, fields: dict) -> Any:
    """Update a console account (accounts.updateAccount). STATE-CHANGING.

    Gate at the call site behind --confirm.

    Args:
        account_id: Sent as `accountId` (required, verified).
        fields: The documented account attributes to change (same shape as
            create, minus email). May be empty or None.

    The explicit `account_id` always wins: an `accountId` key inside `fields`
    cannot redirect the update to a different account than the one the caller
    passed (and confirmed).
    """
    params: dict = dict(fields or {})
    # Set last so `fields` can never override the target account.
    params["accountId"] = account_id
    return self._jsonrpc_request("accounts", "updateAccount", params)
def delete_account(self, account_id: str) -> Any:
    """Remove a console account (accounts.deleteAccount).

    STATE-CHANGING - gate at the call site behind --confirm. `accountId` is
    required by the API (verified).
    """
    request = {"accountId": account_id}
    return self._jsonrpc_request("accounts", "deleteAccount", request)
def configure_notifications_settings(self, settings: dict) -> Any:
    """Apply notification settings (accounts.configureNotificationsSettings).

    STATE-CHANGING - gate at the call site behind --confirm. The API has NO
    required params, so an empty payload is accepted; the caller is
    responsible for passing the intended settings object. A falsy `settings`
    is sent as an empty dict.
    """
    payload = settings if settings else {}
    return self._jsonrpc_request(
        "accounts", "configureNotificationsSettings", payload
    )
# ======================================================================
# PUSH EVENT SERVICE (module `/push`)
# ----------------------------------------------------------------------
# `get`/`stats` are read (but error when the service was never configured -
# that is an EXPECTED state, not a failure; the CLI handles it cleanly).
# `set` is STATE-CHANGING (it configures where GravityZone POSTs security
# events) - gate behind --confirm at the call site.
# ======================================================================
def get_push_settings(self) -> dict:
    """Read the current push event service settings (push.getPushEventSettings).

    A falsy API result is normalised to an empty dict.
    """
    settings = self._jsonrpc_request("push", "getPushEventSettings", {})
    return settings or {}
def get_push_stats(self) -> dict:
    """Read push event service delivery stats (push.getPushEventStats).

    A falsy API result is normalised to an empty dict.
    """
    stats = self._jsonrpc_request("push", "getPushEventStats", {})
    return stats or {}
def set_push_settings(
    self,
    status: int,
    service_type: str = "jsonRPC",
    url: Optional[str] = None,
    require_valid_ssl: bool = True,
    authorization: Optional[str] = None,
    subscribe_event_types: Optional[dict] = None,
) -> Any:
    """Configure the GravityZone Push event service (push.setPushEventSettings).

    STATE-CHANGING - gate at the call site behind --confirm.

    `status` (1=on / 0=off) is REQUIRED (verified via validation probe
    2026-06-21). When enabling, `serviceSettings.url` is the receiver endpoint
    GravityZone POSTs events to.

    The nested shape (serviceType / serviceSettings / subscribeToEventTypes)
    follows Bitdefender's documented push API and is UNVERIFIED beyond
    `status` on this tenant - confirm the first successful enable against the
    live response.

    `url`, `authorization` and `subscribe_event_types` are only sent when they
    are not None; `requireValidSslCertificate` is always sent.
    """
    service_settings: dict = {"requireValidSslCertificate": require_valid_ssl}
    for key, value in (("url", url), ("authorization", authorization)):
        if value is not None:
            service_settings[key] = value

    params: dict = {
        "status": status,
        "serviceType": service_type,
        "serviceSettings": service_settings,
    }
    if subscribe_event_types is not None:
        params["subscribeToEventTypes"] = subscribe_event_types
    return self._jsonrpc_request("push", "setPushEventSettings", params)
def send_test_push_event(self, event_type: str, extra: Optional[dict] = None) -> Any:
    """Send a test push event (push.sendTestPushEvent).

    Fires against the configured receiver - STATE-ADJACENT, gate at the call
    site behind --confirm.

    Args:
        event_type: Sent as `eventType` (required, verified).
        extra: Additional fields merged into the request.

    The explicit `event_type` always wins over an `eventType` key in `extra`.
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override the requested event type.
    params["eventType"] = event_type
    return self._jsonrpc_request("push", "sendTestPushEvent", params)
# ======================================================================
# PACKAGES (detail) - read
# ======================================================================
def get_package_details(self, package_id: str) -> dict:
    """Read installation package detail (packages.getPackageDetails).

    `packageId` is required by the API (verified). A falsy API result is
    normalised to an empty dict.
    """
    request = {"packageId": package_id}
    details = self._jsonrpc_request("packages", "getPackageDetails", request)
    return details or {}
# ======================================================================
# REPORTS (create / delete) - getReportsList + get_report_links above
# ======================================================================
def create_report(self, name: str, extra: Optional[dict] = None) -> Any:
    """Create a report (reports.createReport).

    STATE-CHANGING - gate at the call site behind --confirm.

    Args:
        name: Report name (required, verified).
        extra: `type`, `targetIds`, recurrence/format etc., merged into the
            request.

    The explicit `name` always wins over a `name` key in `extra`.
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override the explicit report name.
    params["name"] = name
    return self._jsonrpc_request("reports", "createReport", params)
def delete_report(self, report_id: str) -> Any:
    """Remove a report (reports.deleteReport).

    STATE-CHANGING - gate at the call site behind --confirm. `reportId` is
    required by the API (verified).
    """
    request = {"reportId": report_id}
    return self._jsonrpc_request("reports", "deleteReport", request)
# ======================================================================
# QUARANTINE (remove / restore) - getQuarantineItemsList above
# ======================================================================
def remove_quarantine_items(
    self, quarantine_item_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Delete quarantined items
    (quarantine/computers.createRemoveQuarantineItemTask).

    STATE-CHANGING - gate behind --confirm.

    Args:
        quarantine_item_ids: Sent as `quarantineItemsIds` (required, verified).
        extra: Additional fields merged into the request.

    The explicit `quarantine_item_ids` always wins: a `quarantineItemsIds`
    key inside `extra` cannot redirect the deletion to other items than the
    ones the caller passed (and confirmed).
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override which items get deleted.
    params["quarantineItemsIds"] = quarantine_item_ids
    return self._jsonrpc_request(
        "quarantine/computers", "createRemoveQuarantineItemTask", params
    )
def restore_quarantine_items(
    self, quarantine_item_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Restore quarantined items
    (quarantine/computers.createRestoreQuarantineItemTask).

    STATE-CHANGING - gate behind --confirm.

    Args:
        quarantine_item_ids: Sent as `quarantineItemsIds` (required, verified).
        extra: `addExclusionInPolicy` etc., merged into the request.

    The explicit `quarantine_item_ids` always wins: a `quarantineItemsIds`
    key inside `extra` cannot redirect the restore to other items than the
    ones the caller passed (and confirmed).
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override which items get restored.
    params["quarantineItemsIds"] = quarantine_item_ids
    return self._jsonrpc_request(
        "quarantine/computers", "createRestoreQuarantineItemTask", params
    )
# ======================================================================
# INCIDENTS - custom rules + incident status/note (read + state-changing)
# ======================================================================
def list_custom_rules(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of EDR custom rules (incidents.getCustomRulesList).

    VERIFIED LIVE. A falsy API result is normalised to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("incidents", "getCustomRulesList", paging)
    return listing or {}
def create_custom_rule(self, name: str, extra: Optional[dict] = None) -> Any:
    """Create an EDR custom rule (incidents.createCustomRule).

    STATE-CHANGING - gate behind --confirm.

    Args:
        name: Rule name (required, verified).
        extra: The rule body (settings/companyId/tags), merged into the
            request.

    The explicit `name` always wins over a `name` key in `extra`.
    """
    params: dict = dict(extra or {})
    # Set last so `extra` can never override the explicit rule name.
    params["name"] = name
    return self._jsonrpc_request("incidents", "createCustomRule", params)
def delete_custom_rule(self, rule_id: str) -> Any:
    """Remove an EDR custom rule (incidents.deleteCustomRule).

    STATE-CHANGING - gate behind --confirm. `ruleId` is required by the API
    (verified).
    """
    request = {"ruleId": rule_id}
    return self._jsonrpc_request("incidents", "deleteCustomRule", request)
def change_incident_status(self, incident_type: str, fields: dict) -> Any:
    """Change an incident's status (incidents.changeIncidentStatus).

    STATE-CHANGING - gate behind --confirm.

    Args:
        incident_type: Sent as `type` (required, verified) - the incident
            type/category.
        fields: The incident id and target status. May be empty or None.

    The explicit `incident_type` always wins over a `type` key in `fields`.
    """
    params: dict = dict(fields or {})
    # Set last so `fields` can never override the explicit incident type.
    params["type"] = incident_type
    return self._jsonrpc_request("incidents", "changeIncidentStatus", params)
def update_incident_note(self, incident_type: str, fields: dict) -> Any:
    """Update an incident note (incidents.updateIncidentNote). STATE-CHANGING.

    Args:
        incident_type: Sent as `type` (required, verified).
        fields: The incident id and note text. May be empty or None.

    The explicit `incident_type` always wins over a `type` key in `fields`.
    """
    params: dict = dict(fields or {})
    # Set last so `fields` can never override the explicit incident type.
    params["type"] = incident_type
    return self._jsonrpc_request("incidents", "updateIncidentNote", params)
# ======================================================================
# LICENSING (usage) + INTEGRATIONS - read
# ======================================================================
def get_monthly_usage(self) -> dict:
    """Read monthly license usage (licensing.getMonthlyUsage). VERIFIED LIVE.

    A falsy API result is normalised to an empty dict.
    """
    usage = self._jsonrpc_request("licensing", "getMonthlyUsage", {})
    return usage or {}
def get_configured_integrations(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of configured third-party integrations
    (integrations.getConfiguredIntegrations). VERIFIED LIVE.

    A falsy API result is normalised to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request(
        "integrations", "getConfiguredIntegrations", paging
    )
    return listing or {}
# ======================================================================
# CACHE LAYER (identity / structure only - never volatile status)
# ======================================================================
def _read_cache(self) -> Optional[dict]:
    """Load the cache file, or return None when there is no usable cache.

    Returns None when the file is missing or unreadable, when its content is
    not valid UTF-8 / JSON, or when the decoded JSON is not an object. Callers
    use the result with dict methods, so any other JSON type is treated as a
    corrupt cache instead of being handed back.
    """
    try:
        # Read directly instead of checking exists() first: a missing file
        # raises FileNotFoundError (an OSError), which avoids a
        # check-then-read race with a concurrent writer.
        raw = CACHE_FILE.read_text(encoding="utf-8")
        cache = json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    return cache if isinstance(cache, dict) else None
def _write_cache(self, cache: dict) -> None:
    """Replace the cache file atomically.

    The JSON is written to a temp file inside the cache directory, flushed
    and fsync'ed, then moved over the real file with os.replace, so a crash
    mid-write or a concurrent reader can never observe a truncated file. On
    any failure the temp file is removed (best effort) and the error is
    re-raised.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cache, indent=2, sort_keys=True)
    handle, tmp_path = tempfile.mkstemp(
        dir=str(CACHE_DIR), prefix=".inventory.", suffix=".tmp"
    )
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as out:
            out.write(serialized)
            out.flush()
            os.fsync(out.fileno())
        # The temp file lives in the same directory, so the rename is atomic
        # (same filesystem).
        os.replace(tmp_path, CACHE_FILE)
    except BaseException:
        # BaseException on purpose: also clean up on KeyboardInterrupt.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
@contextmanager
def _cache_lock(self):
    """Best-effort cross-platform advisory lock around a read-modify-write of
    the cache, so two concurrent gz.py invocations don't lose each other's
    write-through update. Steals a stale lock; on timeout proceeds unlocked
    (a lost update is tolerable, a hang is not).

    The lock is a file created with O_CREAT | O_EXCL: creation succeeds for
    exactly one caller while the file exists. The body of the `with` block
    runs whether or not the lock was acquired; the lock file is only removed
    on exit by the caller that created it.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    deadline = time.monotonic() + CACHE_LOCK_TIMEOUT_SECONDS
    acquired = False
    while True:
        try:
            # O_EXCL makes creation fail with FileExistsError if the lock
            # file is already there.
            fd = os.open(str(CACHE_LOCK_FILE),
                         os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            acquired = True
            break
        except FileExistsError:
            try:
                # Someone else holds the lock; if its file is older than the
                # stale threshold, remove it and retry immediately.
                age = time.time() - os.path.getmtime(CACHE_LOCK_FILE)
                if age > CACHE_LOCK_STALE_SECONDS:
                    os.unlink(CACHE_LOCK_FILE)
                    continue
            except OSError:
                # The lock file vanished (or could not be inspected/removed)
                # between the failed create and this check; just retry.
                pass
        if time.monotonic() >= deadline:
            break  # give up the lock, proceed unlocked
        time.sleep(0.1)
    try:
        yield
    finally:
        # Only remove the lock file if this call created it.
        if acquired:
            try:
                os.unlink(CACHE_LOCK_FILE)
            except OSError:
                pass
def _cache_is_fresh(self, cache: dict) -> bool:
    """Return True when the cache is younger than its TTL.

    The age is measured from the cache's `fetched_at` ISO-8601 timestamp (a
    naive timestamp is interpreted as UTC) and compared against the cache's
    own `ttl_seconds`, falling back to CACHE_TTL_SECONDS when that key is
    missing or null.

    Malformed metadata - a missing/empty, non-string or unparseable
    `fetched_at`, or a non-numeric `ttl_seconds` - makes the cache count as
    stale (False) instead of raising, so a damaged cache file triggers a
    refresh rather than a crash.
    """
    fetched = cache.get("fetched_at")
    if not fetched:
        return False

    ttl = cache.get("ttl_seconds")
    if ttl is None:
        ttl = CACHE_TTL_SECONDS
    if not isinstance(ttl, (int, float)):
        return False

    try:
        ts = datetime.fromisoformat(fetched)
    except (TypeError, ValueError):
        # TypeError: fetched_at is not a string; ValueError: not ISO-8601.
        return False
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    age = (datetime.now(timezone.utc) - ts).total_seconds()
    return age < ttl
def refresh_inventory(self) -> dict:
    """Pull the full identity/structure tier, write it to the cache file and
    return it.

    The cache holds id -> name maps for companies and policies, an endpoint
    map (id -> name / company_id / fqdn), the package list, plus `fetched_at`
    and `ttl_seconds`. No status fields are stored.

    Endpoint, policy and package pulls are best-effort: a GravityZoneError
    stops that particular pull and whatever was collected so far is kept.
    An error from list_all_companies is not caught here.
    """
    page_size = 100
    companies_map: dict[str, str] = {}
    endpoints_map: dict[str, dict] = {}
    policies_map: dict[str, str] = {}

    for c in self.list_all_companies():
        cid = c.get("id")
        if cid:
            companies_map[cid] = c.get("name", "")

    # Always include the root company, but only once: if it is already in the
    # company list, appending it again would page through its endpoints twice.
    company_ids = list(companies_map)
    if ACG_ROOT_COMPANY_ID not in companies_map:
        company_ids.append(ACG_ROOT_COMPANY_ID)

    # Endpoints per company (identity tier only - no status fields).
    for cid in company_ids:
        page = 1
        while True:
            try:
                data = self.list_endpoints(cid, page=page, per_page=page_size)
            except GravityZoneError:
                # Skip the rest of this company; keep going with the others.
                break
            items = data.get("items", [])
            if not items:
                break
            for ep in items:
                eid = ep.get("id")
                if not eid:
                    continue
                endpoints_map[eid] = {
                    "name": ep.get("name", ""),
                    "company_id": ep.get("companyId", cid),
                    "fqdn": ep.get("fqdn") or ep.get("FQDN") or "",
                }
            # Last page = a short page; don't rely on `total` (may be omitted).
            if len(items) < page_size:
                break
            page += 1
            if page > MAX_PAGINATION_PAGES:
                print("[WARNING] refresh_inventory hit the "
                      f"{MAX_PAGINATION_PAGES}-page ceiling for company "
                      f"{cid}; inventory may be truncated.", file=sys.stderr)
                break

    try:
        for p in self.list_policies(per_page=page_size).get("items", []):
            pid = p.get("id")
            if pid:
                policies_map[pid] = p.get("name", "")
    except GravityZoneError:
        pass

    packages: list = []
    try:
        packages = self.list_packages(per_page=page_size).get("items", [])
    except GravityZoneError:
        pass

    cache = {
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "ttl_seconds": CACHE_TTL_SECONDS,
        "companies": companies_map,
        "endpoints": endpoints_map,
        "policies": policies_map,
        "packages": packages,
    }
    self._write_cache(cache)
    return cache
def get_inventory(self, refresh: bool = False) -> dict:
    """Return the identity/structure inventory, using the cache when possible.

    A forced refresh (`refresh=True`) skips the cache entirely. Otherwise the
    cached copy is returned only if one exists, is non-empty and is still
    fresh; in every other case a full refresh is performed.
    """
    if refresh:
        return self.refresh_inventory()
    cached = self._read_cache()
    if cached and self._cache_is_fresh(cached):
        return cached
    return self.refresh_inventory()
def _cache_add_group(self, group_id: str, name: str) -> None:
    """Record a newly created group in the cache's `groups` map (best effort).

    The cache is only a hint, so every failure here is swallowed on purpose:
    a cache write problem must NEVER turn a successful API mutation
    (createCustomGroup) into a reported error. When no cache exists yet
    nothing is written - the next refresh picks the group up.
    """
    try:
        with self._cache_lock():
            snapshot = self._read_cache()
            if snapshot is not None:
                # Groups live in the inventory tree; keep them in their own map.
                groups = snapshot.setdefault("groups", {})
                groups[group_id] = name
                self._write_cache(snapshot)
    except Exception:
        return
def _cache_add_package(self, package_name: str, create_result: Any) -> None:
    """Record a newly created package in the cache's `packages` list (best
    effort).

    Every failure is swallowed on purpose so a cache problem can never mask a
    successful createPackage. When no cache exists yet nothing is written.

    The package id is taken from `create_result`: its `id` key when it is a
    dict, the value itself when it is a string, otherwise None. An entry is
    only appended when no existing dict entry already has the same name; the
    cache is written back either way.
    """
    try:
        with self._cache_lock():
            snapshot = self._read_cache()
            if snapshot is None:
                return
            known = snapshot.setdefault("packages", [])

            if isinstance(create_result, dict):
                new_id = create_result.get("id")
            elif isinstance(create_result, str):
                new_id = create_result
            else:
                new_id = None

            already_listed = False
            for entry in known:
                if isinstance(entry, dict) and entry.get("name") == package_name:
                    already_listed = True
                    break
            if not already_listed:
                known.append({"id": new_id, "name": package_name})
            self._write_cache(snapshot)
    except Exception:
        pass
def main() -> int:
    """Run the minimal self-check: load the API key without any network call.

    Returns 0 after printing an [OK] line naming the active transport, or 1
    after printing the GravityZoneError message to stderr.
    """
    try:
        client = GravityZoneClient()
        _ = client.api_key  # triggers vault load
    except GravityZoneError as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1
    transport = "httpx" if _HAS_HTTPX else "urllib"
    print("[OK] API key loaded; transport =", transport)
    return 0
# Script entry point: run the self-check and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())