Files
claudetools/.claude/skills/bitdefender/scripts/gz_client.py
Howard Enos db6aa3683f fix(bitdefender): all-clients sweep, quarantine path, EDR controls, self-test
Several bugs found and fixed during live testing against the ACG GravityZone
tenant:
- security_sweep_all_clients: iterate each company (the companies container is
  not a valid endpoint parent; passing it 400'd the whole sweep)
- list_quarantine: use service-scoped path quarantine/computers with companyId
  (bare quarantine module 404'd; param is companyId not parentId)
- rename GZEndpointSummary.detection_active -> threat_detected with corrected
  semantics (True = active threat, tracks with infected; not an engine-on flag)
- status: readable sectioned table renderer for the nested apiKey/license dict
- portable CLAUDETOOLS_ROOT resolution (derive from file path, not a Windows
  literal) so it works on the Mac/Linux fleet

Adds scripts/selftest.py: a 29-check read-only harness (all passing) covering
every read command, --json, error exit codes, and destructive-action gating.
EDR/incident commands (blocklist, isolate/unisolate, blocklist-add/remove) and
raw destructive-method gating are included from this session's work.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 07:29:55 -07:00

763 lines
29 KiB
Python

#!/usr/bin/env python3
"""GravityZone Cloud Public API client for the bitdefender skill.
Standalone (does not import the api/ service). Reuses the proven JSON-RPC
shape, HTTP Basic auth (api_key as username, empty password), and the ACG
hardcoded tenant IDs.
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
script has no hard third-party dependency.
Credentials: never hardcoded. Loaded at runtime from the SOPS vault, or from
the GRAVITYZONE_API_KEY env var (testing override).
Cache: only the IDENTITY/STRUCTURE tier is cached (company/endpoint/policy
id<->name maps, package list). Volatile status (infected, lastSeen, online,
signature freshness) is NEVER cached and always pulled live.
"""
from __future__ import annotations
import base64
import json
import os
import subprocess
import sys
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# --- optional httpx -----------------------------------------------------------
# urllib (stdlib) is always imported as the fallback transport. httpx is used
# when it is installed: its code path in GravityZoneClient._post applies a
# separate connect timeout, whereas the urllib path only passes `timeout`.
# NOTE: _post opens a fresh httpx.Client per call, so connections are not
# pooled across requests.
try:
    import httpx  # type: ignore
    _HAS_HTTPX = True
except ImportError:  # pragma: no cover - depends on environment
    _HAS_HTTPX = False
# Cap upstream error bodies surfaced in exceptions. The GravityZone key never
# appears here, but `raw` can call arbitrary methods whose responses may reflect
# other data — bound the blast radius rather than echo full bodies into logs.
ERROR_BODY_MAX_CHARS = 500
# --- constants ----------------------------------------------------------------
# JSON-RPC base URL; the module name (e.g. "network") is appended per request.
# Overridable through the GRAVITYZONE_API_BASE_URL environment variable.
GRAVITYZONE_API_BASE_URL = os.environ.get(
    "GRAVITYZONE_API_BASE_URL",
    "https://cloud.gravityzone.bitdefender.com/api/v1.0/jsonrpc",
)
# Default request timeout and (httpx only) connect timeout, in seconds.
GRAVITYZONE_TIMEOUT_SECONDS = 60.0
GRAVITYZONE_CONNECT_TIMEOUT_SECONDS = 10.0
# Hardcoded ACG tenant ids. The root company is added as an extra endpoint
# parent by refresh_inventory; the companies container is the parent that
# list_companies enumerates.
ACG_ROOT_COMPANY_ID = "5c4280716c0318f3478b456a"
ACG_COMPANIES_CONTAINER_ID = "5c4280716c0318f3478b456e"
# Vault entry and field passed to `vault.sh get-field` by load_api_key.
VAULT_ENTRY = "msp-tools/gravityzone.sops.yaml"
VAULT_FIELD = "credentials.api_key"
# Identity/structure cache lifetime: 86400 s = 24 hours.
CACHE_TTL_SECONDS = 86400
# This file lives in <skill>/scripts/, so two parents up is the skill directory.
SKILL_DIR = Path(__file__).resolve().parent.parent
CACHE_DIR = SKILL_DIR / ".cache"
CACHE_FILE = CACHE_DIR / "inventory.json"
class GravityZoneError(RuntimeError):
    """Raised for transport or JSON-RPC errors.

    This is the one exception type the module raises deliberately: it is used
    for credential-loading failures, HTTP/transport failures, JSON-RPC
    ``error`` responses, and client-side argument limits (e.g. too many
    endpoint ids in one isolate call).
    """
@dataclass
class GZEndpointSummary:
    """Per-endpoint security posture row produced by security_sweep.

    Every field is filled from a live getManagedEndpointDetails response,
    falling back to the endpoint list item for name and company id.
    """

    # Endpoint id as returned by the endpoint list call.
    endpoint_id: str
    # Display name (endpoint details first, list item as fallback).
    name: str
    # Owning company id; security_sweep_all_clients back-fills it when empty.
    company_id: str
    # malwareStatus.infected from the endpoint details.
    infected: bool
    # True = a malware detection is currently active on the endpoint (tracks with infected); NOT an "engine on" indicator.
    threat_detected: bool
    # agent.signatureOutdated / agent.productOutdated from the endpoint details.
    signature_outdated: bool
    product_outdated: bool
    # Raw `lastSeen` value from the endpoint details (None when absent).
    last_seen: Optional[str]
    # agent.productVersion from the endpoint details (None when absent).
    agent_version: Optional[str]
    # Raw `state` code from the endpoint details; 0 when the field is missing.
    # NOTE(review): the meaning of each code is not defined in this file — confirm
    # against the GravityZone API reference.
    state: int
# --- credential loading -------------------------------------------------------
def _resolve_claudetools_root() -> Path:
    """Resolve the ClaudeTools repo root: env var, then identity.json, then derived path.

    Resolution order:
      1. ``CLAUDETOOLS_ROOT`` environment variable, when non-empty.
      2. ``claudetools_root`` from ``<derived root>/.claude/identity.json``.
      3. The root derived from this file's location
         (.claude/skills/bitdefender/scripts -> repo root), so it works on the
         Mac/Linux fleet machines, not only the Windows default.

    identity.json is strictly best-effort: a missing or unreadable file,
    invalid JSON, invalid UTF-8, a top-level value that is not an object, or
    a ``claudetools_root`` that is not a non-empty string all fall through to
    the derived root instead of raising.

    Returns:
        The resolved repo root. The path is not checked for existence.
    """
    env_root = os.environ.get("CLAUDETOOLS_ROOT")
    if env_root:
        return Path(env_root)

    # SKILL_DIR = .../.claude/skills/bitdefender ; root is three levels up.
    derived_root = SKILL_DIR.parent.parent.parent
    identity_path = derived_root / ".claude" / "identity.json"
    try:
        data = json.loads(identity_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return derived_root

    # A syntactically valid identity.json may still hold a list/str/number.
    root = data.get("claudetools_root") if isinstance(data, dict) else None
    if isinstance(root, str) and root:
        return Path(root)
    return derived_root
def load_api_key() -> str:
    """Load the GravityZone API key.

    Order: GRAVITYZONE_API_KEY env override, then the SOPS vault wrapper
    (``<repo root>/.claude/scripts/vault.sh get-field ...`` run through bash).
    Never returns an empty key — raises if it cannot resolve one.

    The env override is stripped *before* it is tested, so a whitespace-only
    value counts as "not set" and falls through to the vault instead of
    yielding an empty key.

    Returns:
        The API key with surrounding whitespace removed.

    Raises:
        GravityZoneError: the vault wrapper is missing, bash is not on PATH,
            the vault call timed out or exited non-zero, or it printed an
            empty key.
    """
    env_key = (os.environ.get("GRAVITYZONE_API_KEY") or "").strip()
    if env_key:
        return env_key

    root = _resolve_claudetools_root()
    vault_script = root / ".claude" / "scripts" / "vault.sh"
    if not vault_script.exists():
        raise GravityZoneError(
            f"Cannot load API key: vault wrapper not found at {vault_script} "
            "and GRAVITYZONE_API_KEY is not set."
        )
    try:
        completed = subprocess.run(
            ["bash", str(vault_script), "get-field", VAULT_ENTRY, VAULT_FIELD],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except FileNotFoundError as exc:
        raise GravityZoneError(
            "Cannot load API key: 'bash' not found on PATH. Install Git Bash or "
            "set GRAVITYZONE_API_KEY."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise GravityZoneError("Cannot load API key: vault call timed out.") from exc
    if completed.returncode != 0:
        # Only stderr is surfaced; stdout (which would hold the key) is not.
        raise GravityZoneError(
            "Cannot load API key from vault "
            f"(exit {completed.returncode}): {completed.stderr.strip()}"
        )
    key = completed.stdout.strip()
    if not key:
        raise GravityZoneError("Vault returned an empty API key.")
    return key
# --- client -------------------------------------------------------------------
class GravityZoneClient:
def __init__(
    self,
    api_key: Optional[str] = None,
    api_base_url: str = GRAVITYZONE_API_BASE_URL,
    timeout: float = GRAVITYZONE_TIMEOUT_SECONDS,
    connect_timeout: float = GRAVITYZONE_CONNECT_TIMEOUT_SECONDS,
):
    """Configure the client; performs no I/O.

    Args:
        api_key: GravityZone API key. When None (or empty) the key is
            resolved on first access of the ``api_key`` property.
        api_base_url: JSON-RPC base URL; trailing slashes are removed so a
            module name can be appended with a single "/".
        timeout: Request timeout in seconds.
        connect_timeout: Connect timeout in seconds (applied by the httpx
            transport only).
    """
    normalized_base = api_base_url.rstrip("/")
    self.api_base_url = normalized_base
    # May still be None here; the api_key property fills it in lazily.
    self._api_key = api_key
    self.timeout, self.connect_timeout = timeout, connect_timeout
@property
def api_key(self) -> str:
    """The API key, loaded through load_api_key() on first use and then kept."""
    current = self._api_key
    if current:
        return current
    # Nothing supplied at construction time (or an empty value): resolve once
    # and remember the result on the instance.
    self._api_key = load_api_key()
    return self._api_key
# -- core transport --------------------------------------------------------
def _jsonrpc_request(self, module: str, method: str, params: dict) -> Any:
    """Make one JSON-RPC call. Returns body['result'] or raises GravityZoneError.

    Args:
        module: API module path appended to the base URL (e.g. "network").
        method: JSON-RPC method name.
        params: JSON-RPC params object.

    Returns:
        ``body["result"]`` (None when absent) for an object response; a
        non-object response body is returned unchanged.

    Raises:
        GravityZoneError: the response carries a non-null ``error`` member,
            or the transport (``_post``) failed.
    """
    url = f"{self.api_base_url}/{module}"
    payload = {"id": "1", "jsonrpc": "2.0", "method": method, "params": params}
    body = self._post(url, payload)
    if not isinstance(body, dict):
        return body

    err = body.get("error")
    if err is None:
        return body.get("result")

    detail = None
    if isinstance(err, dict):
        # JSON-RPC allows `data` to be any JSON value; only an object can
        # carry `details`. Anything else falls back to `message` instead of
        # raising AttributeError while building the error text.
        data = err.get("data")
        if isinstance(data, dict):
            detail = data.get("details")
        detail = detail or err.get("message")
    raise GravityZoneError(
        f"GravityZone API error [{module}.{method}]: {detail or err}"
    )
def _post(self, url: str, payload: dict) -> Any:
    """POST ``payload`` as JSON with HTTP Basic auth and return the decoded body.

    Uses httpx when it was importable at module load, otherwise stdlib
    urllib. The API key is the Basic-auth username with an empty password.

    Every transport-level failure is reported as GravityZoneError so callers
    need a single ``except`` clause. That includes a response body that is
    not valid JSON (e.g. an HTML page from a proxy) and, on the urllib path,
    socket errors/timeouts raised while the body is being read — neither of
    which is a URLError.

    Args:
        url: Full endpoint URL.
        payload: JSON-serializable request object.

    Returns:
        The JSON-decoded response body.

    Raises:
        GravityZoneError: timeout, HTTP error status, connection failure, or
            a non-JSON response. Bodies quoted in the message are capped at
            ERROR_BODY_MAX_CHARS.
    """
    data = json.dumps(payload).encode("utf-8")

    if _HAS_HTTPX:
        try:
            timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
            with httpx.Client(timeout=timeout) as client:
                resp = client.post(
                    url,
                    content=data,
                    auth=(self.api_key, ""),
                    headers={"Content-Type": "application/json"},
                )
                resp.raise_for_status()
                try:
                    return resp.json()
                except ValueError as exc:
                    snippet = (resp.text or "")[:ERROR_BODY_MAX_CHARS]
                    raise GravityZoneError(
                        f"GravityZone returned a non-JSON response: {snippet}"
                    ) from exc
        except httpx.TimeoutException as exc:
            raise GravityZoneError(f"GravityZone request timed out: {exc}") from exc
        except httpx.HTTPStatusError as exc:
            detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
            raise GravityZoneError(
                f"GravityZone HTTP {exc.response.status_code}: {detail}"
            ) from exc
        except httpx.HTTPError as exc:
            raise GravityZoneError(f"GravityZone request failed: {exc}") from exc

    # stdlib fallback
    token = base64.b64encode(f"{self.api_key}:".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
        raise GravityZoneError(f"GravityZone HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise GravityZoneError(f"GravityZone request failed: {exc}") from exc
    except OSError as exc:
        # Timeouts/resets while reading the body surface as bare socket
        # errors rather than URLError. (HTTPError and URLError are OSError
        # subclasses, hence this clause comes last.)
        raise GravityZoneError(f"GravityZone request failed: {exc}") from exc

    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        snippet = raw.decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
        raise GravityZoneError(
            f"GravityZone returned a non-JSON response: {snippet}"
        ) from exc
# ======================================================================
# READ METHODS (always live)
# ======================================================================
def get_api_status(self) -> dict:
    """Return API-key details plus license info as a nested status dict.

    The result always has ``apiKey`` (general.getApiKeyDetails). It then has
    either ``license`` (licensing.getLicenseInfo) or, when that second call
    fails, ``_licenseWarning`` with the error text. The two responses are
    kept under separate keys so their fields cannot collide.

    Raises:
        GravityZoneError: only when the getApiKeyDetails call fails.
    """
    call = self._jsonrpc_request
    report: dict = {"apiKey": call("general", "getApiKeyDetails", {}) or {}}
    try:
        license_info = call("licensing", "getLicenseInfo", {})
    except GravityZoneError as exc:
        # License lookup is optional: keep the key details and record why
        # the license section is missing.
        report["_licenseWarning"] = str(exc)
    else:
        report["license"] = license_info or {}
    return report
def get_own_company(self) -> dict:
    """Call companies.getCompanyDetails with no parameters.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    details = self._jsonrpc_request("companies", "getCompanyDetails", {})
    return details or {}
def list_companies(self, page: int = 1, per_page: int = 100) -> dict:
    """List one page of companies under the ACG companies container.

    Fetches network.getNetworkInventoryItems for ACG_COMPANIES_CONTAINER_ID
    and keeps only the items whose ``type`` is 1.

    Args:
        page: 1-based page number forwarded upstream.
        per_page: Page size forwarded upstream.

    Returns:
        ``{"total": <number of kept items>, "items": [...]}``.
        NOTE(review): ``total`` is the count of kept items on THIS page, not
        the upstream total, so it cannot be used to decide whether more
        pages exist.
    """
    request = {
        "parentId": ACG_COMPANIES_CONTAINER_ID,
        "page": page,
        "perPage": per_page,
    }
    response = self._jsonrpc_request(
        "network", "getNetworkInventoryItems", request
    ) or {}
    companies = [
        entry for entry in response.get("items", []) if entry.get("type") == 1
    ]
    return {"total": len(companies), "items": companies}
def list_endpoints(
    self, parent_id: Optional[str] = None, page: int = 1, per_page: int = 100
) -> dict:
    """Return one page of network.getEndpointsList.

    Args:
        parent_id: Company/group id to list under; ``parentId`` is omitted
            from the request when this is None or empty.
        page: 1-based page number.
        per_page: Page size.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    scope = {"parentId": parent_id} if parent_id else {}
    request: dict = {"page": page, "perPage": per_page, **scope}
    return self._jsonrpc_request("network", "getEndpointsList", request) or {}
def get_endpoint_details(self, endpoint_id: str) -> dict:
    """Return network.getManagedEndpointDetails for one endpoint.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    request = {"endpointId": endpoint_id}
    details = self._jsonrpc_request("network", "getManagedEndpointDetails", request)
    return details or {}
def list_policies(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of policies.getPoliciesList.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("policies", "getPoliciesList", paging)
    return listing or {}
def get_policy_details(self, policy_id: str) -> dict:
    """Return policies.getPolicyDetails for one policy.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    request = {"policyId": policy_id}
    details = self._jsonrpc_request("policies", "getPolicyDetails", request)
    return details or {}
def list_packages(self, page: int = 1, per_page: int = 100) -> dict:
    """Return one page of packages.getPackagesList.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("packages", "getPackagesList", paging)
    return listing or {}
def list_quarantine(
    self, company_id: str, page: int = 1, per_page: int = 100
) -> dict:
    """Return one page of quarantined items for a company.

    Uses the service-scoped module path 'quarantine/computers' (the bare
    'quarantine' module 404s) and scopes with 'companyId' — NOT 'parentId'.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    request = {"companyId": company_id, "page": page, "perPage": per_page}
    listing = self._jsonrpc_request(
        "quarantine/computers", "getQuarantineItemsList", request
    )
    return listing or {}
def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
    """Live security posture sweep of all endpoints under a parent.

    Pages through list_endpoints(parent_id) 100 at a time and fetches the
    details of every endpoint (one extra request per endpoint). Nothing is
    read from or written to the cache.

    Best-effort per endpoint: list items without an id, and endpoints whose
    details call raises GravityZoneError, are skipped and therefore absent
    from the result.

    ``name`` and ``company_id`` are always strings: an explicit JSON null in
    both the details and the list item becomes "" rather than None, so the
    final name-based sort cannot fail on a nameless endpoint.

    Args:
        parent_id: Company/group id whose endpoints are swept.

    Returns:
        Summaries sorted with infected endpoints first, then outdated
        signatures, then outdated product, then by case-insensitive name.

    Raises:
        GravityZoneError: when an endpoint *list* call fails.
    """
    summaries: list[GZEndpointSummary] = []
    per_page = 100
    page = 1
    while True:
        data = self.list_endpoints(parent_id, page=page, per_page=per_page)
        items = data.get("items", [])
        if not items:
            break
        for item in items:
            endpoint_id = item.get("id", "")
            if not endpoint_id:
                continue
            try:
                detail = self.get_endpoint_details(endpoint_id)
            except GravityZoneError:
                # One unreadable endpoint must not abort the whole sweep.
                continue
            malware = detail.get("malwareStatus") or {}
            agent = detail.get("agent") or {}
            summaries.append(
                GZEndpointSummary(
                    endpoint_id=endpoint_id,
                    # `or ""` (not a .get default) also covers explicit nulls.
                    name=detail.get("name") or item.get("name") or "",
                    company_id=detail.get("companyId") or item.get("companyId") or "",
                    infected=bool(malware.get("infected", False)),
                    threat_detected=bool(malware.get("detection", False)),
                    signature_outdated=bool(agent.get("signatureOutdated", False)),
                    product_outdated=bool(agent.get("productOutdated", False)),
                    last_seen=detail.get("lastSeen"),
                    agent_version=agent.get("productVersion"),
                    state=detail.get("state", 0),
                )
            )
        # Stop once the pages fetched so far cover the reported total.
        if page * per_page >= data.get("total", 0):
            break
        page += 1

    # `not flag` sorts True flags first: the worst posture floats to the top.
    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            s.name.lower(),
        )
    )
    return summaries
def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
    """Sweep every client company. The companies container is NOT a valid
    endpoint parent, so iterate each company and sweep it individually.

    Companies are read page by page (100 per page) rather than from the
    first page only, so tenants with more than 100 companies are covered.
    list_companies does not expose the upstream total, so paging continues
    while a page comes back full and contributes at least one unseen company
    id. The "unseen" check also guarantees termination if upstream ever
    repeats a page.

    Best-effort: a company whose sweep raises GravityZoneError is skipped,
    and a failure while fetching a *follow-up* company page ends paging with
    what was collected so far. A failure on the first page propagates.

    Returns:
        All summaries, with empty ``company_id`` values back-filled from the
        company being swept, sorted infected-first, then outdated
        signatures, outdated product, and case-insensitive name.

    Raises:
        GravityZoneError: when the first company page cannot be listed.
    """
    per_page = 100
    summaries: list[GZEndpointSummary] = []
    seen_company_ids: set[str] = set()
    page = 1
    while True:
        try:
            companies = self.list_companies(page=page, per_page=per_page).get("items", [])
        except GravityZoneError:
            if page == 1:
                raise
            break

        fresh_ids = []
        for company in companies:
            cid = company.get("id")
            if cid and cid not in seen_company_ids:
                seen_company_ids.add(cid)
                fresh_ids.append(cid)

        for cid in fresh_ids:
            try:
                company_summaries = self.security_sweep(cid)
            except GravityZoneError:
                continue
            for s in company_summaries:
                if not s.company_id:
                    s.company_id = cid
            summaries.extend(company_summaries)

        # A short page is the last page; a page with nothing new means
        # further requests cannot make progress.
        if len(companies) < per_page or not fresh_ids:
            break
        page += 1

    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            (s.name or "").lower(),
        )
    )
    return summaries
# ======================================================================
# MANAGEMENT METHODS (verified only)
# ======================================================================
def create_package(
    self,
    package_name: str,
    company_id: Optional[str] = None,
    description: Optional[str] = None,
    language: Optional[str] = None,
    modules: Optional[dict] = None,
    scan_mode: Optional[dict] = None,
    settings: Optional[dict] = None,
    roles: Optional[dict] = None,
    deployment_options: Optional[dict] = None,
) -> Any:
    """Create an installation package (packages.createPackage).

    Only ``packageName`` is always sent. ``companyId`` is sent when
    ``company_id`` is truthy; every other optional field is sent when it is
    not None (so empty dicts/strings are forwarded).

    After a successful create, the cached package list is updated
    (write-through). The cache is a convenience only: if updating it fails
    with an OSError (read-only or full disk, permissions), a warning goes to
    stderr and the upstream result is still returned, because the package
    has already been created and reporting failure would invite a retry.

    Returns:
        The raw upstream result of createPackage.

    Raises:
        GravityZoneError: when the createPackage call itself fails.
    """
    params: dict = {"packageName": package_name}
    if company_id:
        params["companyId"] = company_id
    optional_fields = (
        ("description", description),
        ("language", language),
        ("modules", modules),
        ("scanMode", scan_mode),
        ("settings", settings),
        ("roles", roles),
        ("deploymentOptions", deployment_options),
    )
    params.update({key: value for key, value in optional_fields if value is not None})

    result = self._jsonrpc_request("packages", "createPackage", params)

    # Write-through: refresh package list in cache (best-effort).
    try:
        self._cache_add_package(package_name, result)
    except OSError as exc:
        print(
            f"[WARNING] Package created, but the local cache was not updated: {exc}",
            file=sys.stderr,
        )
    return result
def get_installation_links(
    self, package_name: str, company_id: Optional[str] = None
) -> Any:
    """Return packages.getInstallationLinks for a package.

    ``companyId`` is included in the request only when ``company_id`` is
    truthy. The raw upstream result is returned unchanged.
    """
    scope = {"companyId": company_id} if company_id else {}
    request: dict = {"packageName": package_name, **scope}
    return self._jsonrpc_request("packages", "getInstallationLinks", request)
def delete_package(
    self, package_name: str, company_id: Optional[str] = None
) -> Any:
    """Delete a package by name (packages.deletePackage).

    ``companyId`` is included in the request only when ``company_id`` is
    truthy. The raw upstream result is returned unchanged.
    """
    scope = {"companyId": company_id} if company_id else {}
    request: dict = {"packageName": package_name, **scope}
    return self._jsonrpc_request("packages", "deletePackage", request)
def create_scan_task(
    self,
    target_ids: list[str],
    scan_type: int,
    name: Optional[str] = None,
    custom_scan_settings: Optional[dict] = None,
) -> Any:
    """Create a scan task (network.createScanTask).

    Args:
        target_ids: Sent as ``targetIds``.
        scan_type: Sent as ``type``.
        name: Optional task name; sent only when not None.
        custom_scan_settings: Sent as ``customScanSettings`` only when not
            None.

    Returns:
        The raw upstream result.
    """
    request: dict = {"targetIds": target_ids, "type": scan_type}
    extras = {"name": name, "customScanSettings": custom_scan_settings}
    for key, value in extras.items():
        if value is not None:
            request[key] = value
    return self._jsonrpc_request("network", "createScanTask", request)
def move_endpoints(self, endpoint_ids: list[str], group_id: str) -> Any:
    """Move endpoints into a group (network.moveEndpoints).

    Returns:
        The raw upstream result.
    """
    request = {"endpointIds": endpoint_ids, "groupId": group_id}
    return self._jsonrpc_request("network", "moveEndpoints", request)
def delete_endpoint(self, endpoint_id: str) -> Any:
    """Delete one endpoint (network.deleteEndpoint).

    Returns:
        The raw upstream result.
    """
    request = {"endpointId": endpoint_id}
    return self._jsonrpc_request("network", "deleteEndpoint", request)
def create_custom_group(self, name: str, parent_id: Optional[str] = None) -> Any:
    """Create a custom group (network.createCustomGroup).

    ``parentId`` is sent only when ``parent_id`` is truthy.

    After a successful create, the new group is recorded in the local cache
    (write-through). createCustomGroup returns the new group id as a string;
    a dict result with ``id`` or ``groupId`` is accepted as well. The cache
    is a convenience only: if updating it fails with an OSError, a warning
    goes to stderr and the upstream result is still returned, because the
    group already exists upstream.

    Returns:
        The raw upstream result of createCustomGroup.

    Raises:
        GravityZoneError: when the createCustomGroup call itself fails.
    """
    params: dict = {"name": name}
    if parent_id:
        params["parentId"] = parent_id
    result = self._jsonrpc_request("network", "createCustomGroup", params)

    if isinstance(result, dict):
        group_id = result.get("id") or result.get("groupId")
    elif isinstance(result, str):
        group_id = result
    else:
        group_id = None

    if group_id:
        try:
            self._cache_add_group(group_id, name)
        except OSError as exc:
            print(
                f"[WARNING] Group created, but the local cache was not updated: {exc}",
                file=sys.stderr,
            )
    return result
def delete_custom_group(self, group_id: str) -> Any:
    """Delete a custom group (network.deleteCustomGroup).

    Returns:
        The raw upstream result.
    """
    request = {"groupId": group_id}
    return self._jsonrpc_request("network", "deleteCustomGroup", request)
def move_custom_group(self, group_id: str, new_parent_id: str) -> Any:
    """Re-parent a custom group (network.moveCustomGroup).

    Args:
        group_id: Sent as ``groupId``.
        new_parent_id: Sent as ``newParentId``.

    Returns:
        The raw upstream result.
    """
    request = {"groupId": group_id, "newParentId": new_parent_id}
    return self._jsonrpc_request("network", "moveCustomGroup", request)
# ======================================================================
# INCIDENTS / EDR (module `/incidents`)
# ----------------------------------------------------------------------
# READ methods are always live and never cached (blocklist + incident
# status are volatile). State-changing methods (isolate, restore,
# add/remove blocklist) return the raw upstream result; the CLI caller is
# responsible for gating them behind --confirm.
# ======================================================================
# -- READ (safe) -----------------------------------------------------------
def list_blocklist(
    self,
    company_id: Optional[str] = None,
    page: int = 1,
    per_page: int = 100,
) -> dict:
    """List blocklisted hash items. VERIFIED LIVE (incidents.getBlocklistItems).

    Upstream returns {total, page, perPage, pagesCount, items:[{id, source,
    sourceInfo, hashType, hash, companyId}]}.

    Args:
        company_id: Scope to one company; leave None (or empty) for the
            whole partner tenant — ``companyId`` is then not sent.
        page: 1-based page number.
        per_page: Page size.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.
    """
    scope = {"companyId": company_id} if company_id else {}
    request: dict = {"page": page, "perPage": per_page, **scope}
    listing = self._jsonrpc_request("incidents", "getBlocklistItems", request)
    return listing or {}
def list_incidents(
    self,
    parent_id: str,
    page: int = 1,
    per_page: int = 500,
    filters: Optional[dict] = None,
) -> dict:
    """List incidents under a company/group (incidents.getIncidentsList).

    Args:
        parent_id: Company/group id. REQUIRED for this method.
        page: 1-based page number.
        per_page: Page size. Defaults to 500 because live testing returned
            "Invalid value for 'perPage' parameter. The value should be
            between 500 and 10000" for smaller values, so the 100 used as
            the default elsewhere is rejected here.
        filters: Optional filter object; sent only when not None.

    Returns:
        The upstream result, or ``{}`` when the result is empty/None.

    NOTE (UNVERIFIED / possibly unavailable on this tenant): live re-testing
    on 2026-05-30 returned "Method not found" for `getIncidentsList` on the
    `/incidents` module, while `getBlocklistItems` on the SAME module
    succeeded in the same session (so neither rate limiting nor a bad key
    explains it). The method may be gated behind an EDR/incidents license
    feature that is OFF on this tenant, or be named differently in this API
    version. Treat the result as best-effort and confirm against the
    official Bitdefender API reference / console before relying on it.
    """
    request: dict = {"parentId": parent_id, "page": page, "perPage": per_page}
    if filters is not None:
        request["filters"] = filters
    listing = self._jsonrpc_request("incidents", "getIncidentsList", request)
    return listing or {}
# -- STATE-CHANGING (caller MUST gate behind --confirm) --------------------
def isolate_endpoints(self, endpoint_ids: list[str]) -> Any:
    """Isolate endpoints from the network (incidents.createIsolateEndpointTask).

    v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
    task ids. STATE-CHANGING — gate behind --confirm at the call site.

    NOTE(review): the default GRAVITYZONE_API_BASE_URL in this file targets
    `/api/v1.0/`, while the parameter shape above is described as v1.2 —
    confirm against the Bitdefender API reference that the array form is
    accepted on the URL actually in use.

    Raises:
        GravityZoneError: more than 1000 ids were supplied (checked locally,
            before any request is sent), or the upstream call failed.
    """
    count = len(endpoint_ids)
    if count > 1000:
        raise GravityZoneError(
            f"isolate_endpoints accepts at most 1000 endpoint ids per call (got {count})."
        )
    request = {"endpointIds": endpoint_ids}
    return self._jsonrpc_request("incidents", "createIsolateEndpointTask", request)
def restore_endpoints_from_isolation(self, endpoint_ids: list[str]) -> Any:
    """Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).

    v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
    task ids. STATE-CHANGING — gate behind --confirm at the call site.

    NOTE(review): the default GRAVITYZONE_API_BASE_URL in this file targets
    `/api/v1.0/`, while the parameter shape above is described as v1.2 —
    confirm against the Bitdefender API reference that the array form is
    accepted on the URL actually in use.

    Raises:
        GravityZoneError: more than 1000 ids were supplied (checked locally,
            before any request is sent), or the upstream call failed.
    """
    count = len(endpoint_ids)
    if count > 1000:
        raise GravityZoneError(
            "restore_endpoints_from_isolation accepts at most 1000 endpoint ids "
            f"per call (got {count})."
        )
    request = {"endpointIds": endpoint_ids}
    return self._jsonrpc_request(
        "incidents", "createRestoreEndpointFromIsolationTask", request
    )
def add_to_blocklist(
    self,
    company_id: str,
    hash_list: list[str],
    hash_type: int = 1,
    source_info: str = "",
    operating_systems: Optional[list[str]] = None,
) -> Any:
    """Add hashes to the blocklist (incidents.addToBlocklist).

    STATE-CHANGING — gate behind --confirm at the call site.

    Args:
        company_id: Sent as ``companyId``.
        hash_list: Array of hash strings, sent as ``hashList``.
        hash_type: Int sent as ``hashType`` (1 is the common value seen
            live; see the GravityZone console / API docs for the mapping).
        source_info: Free-text description, sent as ``sourceInfo``.
        operating_systems: Sent as ``operatingSystems`` only when not None.

    Returns:
        The raw upstream result.
    """
    os_scope = (
        {} if operating_systems is None else {"operatingSystems": operating_systems}
    )
    request: dict = {
        "companyId": company_id,
        "hashType": hash_type,
        "hashList": hash_list,
        "sourceInfo": source_info,
        **os_scope,
    }
    return self._jsonrpc_request("incidents", "addToBlocklist", request)
def remove_from_blocklist(self, hash_item_id: str) -> Any:
    """Remove one blocklist entry (incidents.removeFromBlocklist).

    STATE-CHANGING — gate behind --confirm at the call site.

    UNVERIFIED: `hashItemId` is the candidate parameter name (carrying the
    `id` field of an item from getBlocklistItems). Confirm it against the
    official Bitdefender API reference before relying on this call.

    Returns:
        The raw upstream result.
    """
    request = {"hashItemId": hash_item_id}
    return self._jsonrpc_request("incidents", "removeFromBlocklist", request)
# ======================================================================
# CACHE LAYER (identity / structure only — never volatile status)
# ======================================================================
def _read_cache(self) -> Optional[dict]:
    """Load the inventory cache from CACHE_FILE.

    Returns:
        The cached dict, or None when there is no usable cache: the file is
        missing or unreadable, is not valid UTF-8 / JSON, or its top-level
        value is not a JSON object. Callers treat None as "no cache" and use
        dict methods on anything else, so a non-object file must not be
        passed through.
    """
    try:
        cache = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    return cache if isinstance(cache, dict) else None
def _write_cache(self, cache: dict) -> None:
    """Write ``cache`` to CACHE_FILE as indented, key-sorted UTF-8 JSON.

    CACHE_DIR (and any missing parents) is created first. Filesystem errors
    are not caught here.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cache, indent=2, sort_keys=True)
    CACHE_FILE.write_text(serialized, encoding="utf-8")
def _cache_is_fresh(self, cache: dict) -> bool:
    """Return True when ``cache`` is younger than its TTL.

    The age is measured from ``cache["fetched_at"]`` (an ISO-8601 string; a
    naive timestamp is taken to be UTC) and compared with
    ``cache["ttl_seconds"]``, defaulting to CACHE_TTL_SECONDS.

    The cache file is plain JSON on disk and may be damaged or hand-edited,
    so malformed metadata means "stale" rather than an exception: a missing,
    non-string or unparseable ``fetched_at``, or a non-numeric
    ``ttl_seconds``, all return False and so trigger a refresh.
    """
    fetched = cache.get("fetched_at")
    if not fetched or not isinstance(fetched, str):
        return False
    try:
        ts = datetime.fromisoformat(fetched)
    except ValueError:
        return False

    ttl = cache.get("ttl_seconds", CACHE_TTL_SECONDS)
    # bool is an int subclass; a JSON true/false here is not a duration.
    if isinstance(ttl, bool) or not isinstance(ttl, (int, float)):
        return False

    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    age = (datetime.now(timezone.utc) - ts).total_seconds()
    return age < ttl
def refresh_inventory(self) -> dict:
    """Full identity/structure pull. Writes and returns the cache.

    Collects, 100 items per request and across ALL pages:
      * companies   -> {company_id: name}
      * endpoints   -> {endpoint_id: {name, company_id, fqdn}} for every
        company plus ACG_ROOT_COMPANY_ID (identity tier only — no status)
      * policies    -> {policy_id: name}
      * packages    -> list of upstream package items

    Failure handling is best-effort everywhere except the first company
    page: a GravityZoneError while listing endpoints, policies or packages
    ends that listing and keeps the items gathered so far.

    list_companies does not expose the upstream total, so company paging
    continues while a page comes back full and adds at least one new id.

    Returns:
        The cache dict that was written, including ``fetched_at`` (UTC,
        ISO-8601) and ``ttl_seconds``.

    Raises:
        GravityZoneError: when the first company page cannot be listed.
        OSError: when the cache file cannot be written.
    """
    per_page = 100

    def fetch_all(fetch_page) -> list:
        """Collect items from every page of a call that reports `total`."""
        collected: list = []
        page = 1
        while True:
            try:
                data = fetch_page(page)
            except GravityZoneError:
                break
            items = data.get("items", [])
            if not items:
                break
            collected.extend(items)
            if page * per_page >= data.get("total", 0):
                break
            page += 1
        return collected

    companies_map: dict[str, str] = {}
    page = 1
    while True:
        try:
            companies = self.list_companies(page=page, per_page=per_page).get("items", [])
        except GravityZoneError:
            if page == 1:
                raise
            break
        added = 0
        for c in companies:
            cid = c.get("id")
            if cid and cid not in companies_map:
                companies_map[cid] = c.get("name", "")
                added += 1
        # A short page is the last one; a page adding nothing new means
        # further requests cannot make progress.
        if len(companies) < per_page or not added:
            break
        page += 1

    # Endpoints per company (identity tier only — no status fields).
    endpoints_map: dict[str, dict] = {}
    for cid in [*companies_map, ACG_ROOT_COMPANY_ID]:
        endpoints = fetch_all(
            lambda page, cid=cid: self.list_endpoints(cid, page=page, per_page=per_page)
        )
        for ep in endpoints:
            eid = ep.get("id")
            if not eid:
                continue
            endpoints_map[eid] = {
                "name": ep.get("name", ""),
                "company_id": ep.get("companyId", cid),
                "fqdn": ep.get("fqdn") or ep.get("FQDN") or "",
            }

    policies_map: dict[str, str] = {}
    for p in fetch_all(lambda page: self.list_policies(page=page, per_page=per_page)):
        pid = p.get("id")
        if pid:
            policies_map[pid] = p.get("name", "")

    packages = fetch_all(lambda page: self.list_packages(page=page, per_page=per_page))

    cache = {
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "ttl_seconds": CACHE_TTL_SECONDS,
        "companies": companies_map,
        "endpoints": endpoints_map,
        "policies": policies_map,
        "packages": packages,
    }
    self._write_cache(cache)
    return cache
def get_inventory(self, refresh: bool = False) -> dict:
    """Return cached identity/structure, refreshing if stale or forced.

    Args:
        refresh: When True, skip the cache and always pull a fresh
            inventory.

    Returns:
        The on-disk cache when it exists, is non-empty and is still fresh;
        otherwise the result of refresh_inventory().
    """
    if refresh:
        return self.refresh_inventory()
    cached = self._read_cache()
    if cached and self._cache_is_fresh(cached):
        return cached
    return self.refresh_inventory()
def _cache_add_group(self, group_id: str, name: str) -> None:
    """Record a newly created group in the on-disk cache (write-through).

    Does nothing when no cache exists yet — the next full refresh writes
    one. Otherwise the group is stored under the cache's ``groups`` map
    (created on demand, as is an empty ``companies`` map) and the cache is
    written back.
    """
    cache = self._read_cache()
    if cache is None:
        return  # no cache yet — next refresh picks it up
    cache.setdefault("companies", {})
    # Groups live in the inventory tree; keep them in their own 'groups' map.
    groups = cache.setdefault("groups", {})
    groups[group_id] = name
    self._write_cache(cache)
def _cache_add_package(self, package_name: str, create_result: Any) -> None:
    """Record a newly created package in the on-disk cache (write-through).

    Does nothing when no cache exists yet. Otherwise a ``{"id", "name"}``
    entry is appended to the cache's ``packages`` list unless an entry with
    the same name is already there, and the cache is written back in either
    case.

    Args:
        package_name: Name the package was created with.
        create_result: Raw createPackage result. The id is taken from a
            string result as-is or from ``result["id"]`` of a dict; any
            other shape stores ``None`` as the id.
    """
    cache = self._read_cache()
    if cache is None:
        return
    known = cache.setdefault("packages", [])

    if isinstance(create_result, dict):
        new_id = create_result.get("id")
    elif isinstance(create_result, str):
        new_id = create_result
    else:
        new_id = None

    already_listed = False
    for entry in known:
        if isinstance(entry, dict) and entry.get("name") == package_name:
            already_listed = True
            break
    if not already_listed:
        known.append({"id": new_id, "name": package_name})
    self._write_cache(cache)
def main() -> int:
    """Minimal self-check: load key (no network call).

    Returns:
        0 after printing an [OK] line naming the active transport, or 1
        after printing the GravityZoneError text to stderr.
    """
    try:
        # Reading the property is what triggers the vault/env lookup.
        GravityZoneClient().api_key
    except GravityZoneError as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1
    transport = "httpx" if _HAS_HTTPX else "urllib"
    print("[OK] API key loaded; transport =", transport)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())