Convergence-pass LOW/NIT cleanup: - cmd_companies uses list_all_companies() so a >100-company tenant isn't truncated in the listing (was page-1 only); matches sweep/inventory. - removed unused 'field' import from dataclasses. Deliberately NOT changed: id validation on delete-package/report-delete/blocklist- remove/quarantine-remove/restore - those ids are not pinned 24-hex format, so validating could reject valid input; they are --confirm-gated and bad ids match the expected-error markers (no mislog). 81/81 selftest. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1426 lines
60 KiB
Python
1426 lines
60 KiB
Python
#!/usr/bin/env python3
|
|
"""GravityZone Cloud Public API client for the bitdefender skill.
|
|
|
|
Standalone (does not import the api/ service). Reuses the proven JSON-RPC
|
|
shape, HTTP Basic auth (api_key as username, empty password), and the ACG
|
|
hardcoded tenant IDs.
|
|
|
|
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
|
|
script has no hard third-party dependency.
|
|
|
|
Credentials: never hardcoded. Loaded at runtime from the SOPS vault, or from
|
|
the GRAVITYZONE_API_KEY env var (testing override).
|
|
|
|
Cache: only the IDENTITY/STRUCTURE tier is cached (company/endpoint/policy
|
|
id<->name maps + endpoint fqdn, package list). No secrets are ever cached;
|
|
infra identifiers (hostnames/FQDNs) are. Volatile status (infected, lastSeen,
|
|
online, signature freshness) is NEVER cached and always pulled live. The cache
|
|
dir is gitignored.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import random
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from email.utils import parsedate_to_datetime
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# --- optional httpx -----------------------------------------------------------
|
|
# urllib (stdlib) is always imported as the fallback transport; httpx is used
|
|
# when present for connection pooling/timeouts.
|
|
try:
|
|
import httpx # type: ignore
|
|
|
|
_HAS_HTTPX = True
|
|
except ImportError: # pragma: no cover - depends on environment
|
|
_HAS_HTTPX = False
|
|
|
|
# Cap upstream error bodies surfaced in exceptions. The GravityZone key never
|
|
# appears here, but `raw` can call arbitrary methods whose responses may reflect
|
|
# other data - bound the blast radius rather than echo full bodies into logs.
|
|
ERROR_BODY_MAX_CHARS = 500
|
|
|
|
# --- transient-failure retry policy -------------------------------------------
|
|
# The live tenant is rate-limited (real HTTP 429s observed during sweeps, which
|
|
# fan out one getManagedEndpointDetails per endpoint across every company). Retry
|
|
# 429/5xx/timeout with bounded exponential backoff, honoring Retry-After.
|
|
RETRY_STATUSES = frozenset({429, 500, 502, 503, 504})
|
|
RETRY_MAX_ATTEMPTS = 4 # total tries = 1 initial + up to (MAX-1) retries
|
|
RETRY_BASE_DELAY_SECONDS = 1.0
|
|
RETRY_MAX_DELAY_SECONDS = 30.0 # cap on the exponential backoff
|
|
RETRY_AFTER_MAX_SECONDS = 120.0 # higher cap for a server-mandated Retry-After
|
|
|
|
|
|
class _RetryableHTTP(Exception):
|
|
"""Internal signal that a request failed transiently and may be retried.
|
|
`code` is the HTTP status (int) or a string: 'timeout' (ambiguous - may have
|
|
reached the server) or 'connect' (pre-send - no side effect, always safe)."""
|
|
|
|
def __init__(self, code, headers=None, detail=""):
|
|
self.code = code
|
|
self.headers = headers or {}
|
|
self.detail = detail
|
|
super().__init__(f"transient {code}")
|
|
|
|
|
|
def _retry_delay(headers, attempt: int) -> float:
|
|
"""Seconds to wait before the next retry: honor a Retry-After header when
|
|
present (numeric seconds or an HTTP-date), else exponential backoff + jitter."""
|
|
ra = None
|
|
try:
|
|
ra = headers.get("Retry-After") or headers.get("retry-after")
|
|
except AttributeError:
|
|
ra = None
|
|
if ra:
|
|
# An explicit server-mandated Retry-After is honored up to a HIGHER cap
|
|
# than the exponential backoff (don't retry early into another 429).
|
|
try:
|
|
# clamp to [0, ceiling]: a malformed negative Retry-After must not
|
|
# reach time.sleep() (which raises ValueError on a negative value).
|
|
return min(max(float(ra), 0.0), RETRY_AFTER_MAX_SECONDS)
|
|
except (TypeError, ValueError):
|
|
try:
|
|
dt = parsedate_to_datetime(ra)
|
|
if dt is not None:
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
delta = (dt - datetime.now(timezone.utc)).total_seconds()
|
|
if delta > 0:
|
|
return min(delta, RETRY_AFTER_MAX_SECONDS)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
backoff = min(RETRY_BASE_DELAY_SECONDS * (2 ** attempt), RETRY_MAX_DELAY_SECONDS)
|
|
return backoff + random.uniform(0.0, backoff * 0.25)
|
|
|
|
# --- constants ----------------------------------------------------------------
|
|
GRAVITYZONE_API_BASE_URL = os.environ.get(
|
|
"GRAVITYZONE_API_BASE_URL",
|
|
"https://cloud.gravityzone.bitdefender.com/api/v1.0/jsonrpc",
|
|
)
|
|
GRAVITYZONE_TIMEOUT_SECONDS = 60.0
|
|
GRAVITYZONE_CONNECT_TIMEOUT_SECONDS = 10.0
|
|
|
|
# Hard ceiling on paginated loops: a misbehaving API that always returns a full
|
|
# page must never spin forever (the sweep also fans out N detail calls per page).
|
|
MAX_PAGINATION_PAGES = 1000
|
|
|
|
ACG_ROOT_COMPANY_ID = "5c4280716c0318f3478b456a"
|
|
ACG_COMPANIES_CONTAINER_ID = "5c4280716c0318f3478b456e"
|
|
|
|
VAULT_ENTRY = "msp-tools/gravityzone.sops.yaml"
|
|
VAULT_FIELD = "credentials.api_key"
|
|
|
|
CACHE_TTL_SECONDS = 86400
|
|
SKILL_DIR = Path(__file__).resolve().parent.parent
|
|
CACHE_DIR = SKILL_DIR / ".cache"
|
|
CACHE_FILE = CACHE_DIR / "inventory.json"
|
|
CACHE_LOCK_FILE = CACHE_DIR / "inventory.lock"
|
|
# Best-effort advisory lock for read-modify-write of the cache. Short timeout:
|
|
# losing a write-through update is acceptable; hanging the CLI is not.
|
|
CACHE_LOCK_TIMEOUT_SECONDS = 5.0
|
|
CACHE_LOCK_STALE_SECONDS = 30.0
|
|
|
|
|
|
class GravityZoneError(RuntimeError):
|
|
"""Raised for transport or JSON-RPC errors."""
|
|
|
|
|
|
@dataclass
|
|
class GZEndpointSummary:
|
|
endpoint_id: str
|
|
name: str
|
|
company_id: str
|
|
infected: bool
|
|
# True = a malware detection is currently active on the endpoint (tracks with infected); NOT an "engine on" indicator.
|
|
threat_detected: bool
|
|
signature_outdated: bool
|
|
product_outdated: bool
|
|
last_seen: Optional[str]
|
|
agent_version: Optional[str]
|
|
state: int
|
|
|
|
|
|
# --- credential loading -------------------------------------------------------
|
|
def _resolve_claudetools_root() -> Path:
|
|
"""Resolve the ClaudeTools repo root: env var, then identity.json, then derived path.
|
|
|
|
Final fallback is derived from this file's location
|
|
(.claude/skills/bitdefender/scripts -> repo root) so it works on the
|
|
Mac/Linux fleet machines, not only the Windows default.
|
|
"""
|
|
# SKILL_DIR = .../.claude/skills/bitdefender ; root is three levels up.
|
|
derived_root = SKILL_DIR.parent.parent.parent
|
|
|
|
env_root = os.environ.get("CLAUDETOOLS_ROOT")
|
|
if env_root:
|
|
return Path(env_root)
|
|
|
|
identity_path = derived_root / ".claude" / "identity.json"
|
|
if identity_path.exists():
|
|
try:
|
|
data = json.loads(identity_path.read_text(encoding="utf-8"))
|
|
root = data.get("claudetools_root")
|
|
if root:
|
|
return Path(root)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
return derived_root
|
|
|
|
|
|
def load_api_key() -> str:
|
|
"""Load the GravityZone API key.
|
|
|
|
Order: GRAVITYZONE_API_KEY env override, then the SOPS vault wrapper.
|
|
Never returns an empty key - raises if it cannot resolve one.
|
|
"""
|
|
env_key = os.environ.get("GRAVITYZONE_API_KEY")
|
|
if env_key:
|
|
return env_key.strip()
|
|
|
|
root = _resolve_claudetools_root()
|
|
vault_script = root / ".claude" / "scripts" / "vault.sh"
|
|
if not vault_script.exists():
|
|
raise GravityZoneError(
|
|
f"Cannot load API key: vault wrapper not found at {vault_script} "
|
|
"and GRAVITYZONE_API_KEY is not set."
|
|
)
|
|
|
|
try:
|
|
completed = subprocess.run(
|
|
["bash", str(vault_script), "get-field", VAULT_ENTRY, VAULT_FIELD],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=60,
|
|
)
|
|
except FileNotFoundError as exc:
|
|
raise GravityZoneError(
|
|
"Cannot load API key: 'bash' not found on PATH. Install Git Bash or "
|
|
"set GRAVITYZONE_API_KEY."
|
|
) from exc
|
|
except subprocess.TimeoutExpired as exc:
|
|
raise GravityZoneError("Cannot load API key: vault call timed out.") from exc
|
|
|
|
if completed.returncode != 0:
|
|
raise GravityZoneError(
|
|
"Cannot load API key from vault "
|
|
f"(exit {completed.returncode}): {completed.stderr.strip()}"
|
|
)
|
|
|
|
key = completed.stdout.strip()
|
|
if not key:
|
|
raise GravityZoneError("Vault returned an empty API key.")
|
|
return key
|
|
|
|
|
|
# --- client -------------------------------------------------------------------
|
|
class GravityZoneClient:
|
|
def __init__(
|
|
self,
|
|
api_key: Optional[str] = None,
|
|
api_base_url: str = GRAVITYZONE_API_BASE_URL,
|
|
timeout: float = GRAVITYZONE_TIMEOUT_SECONDS,
|
|
connect_timeout: float = GRAVITYZONE_CONNECT_TIMEOUT_SECONDS,
|
|
):
|
|
self.api_base_url = api_base_url.rstrip("/")
|
|
self._api_key = api_key # lazily loaded if None
|
|
self.timeout = timeout
|
|
self.connect_timeout = connect_timeout
|
|
self._httpx_client = None # reused across calls (pooling) when httpx present
|
|
|
|
def close(self) -> None:
|
|
"""Close the pooled httpx client, if one was opened."""
|
|
if self._httpx_client is not None:
|
|
try:
|
|
self._httpx_client.close()
|
|
finally:
|
|
self._httpx_client = None
|
|
|
|
def __enter__(self) -> "GravityZoneClient":
|
|
return self
|
|
|
|
def __exit__(self, *exc) -> None:
|
|
self.close()
|
|
|
|
@property
|
|
def _client(self):
|
|
"""Lazily create and reuse a single httpx.Client so a multi-call sweep
|
|
shares one connection pool instead of a TLS handshake per request."""
|
|
if self._httpx_client is None:
|
|
timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
|
|
self._httpx_client = httpx.Client(timeout=timeout)
|
|
return self._httpx_client
|
|
|
|
@property
|
|
def api_key(self) -> str:
|
|
if not self._api_key:
|
|
self._api_key = load_api_key()
|
|
return self._api_key
|
|
|
|
# -- core transport --------------------------------------------------------
|
|
def _jsonrpc_request(self, module: str, method: str, params: dict) -> Any:
|
|
"""Make one JSON-RPC call. Returns body['result'] or raises GravityZoneError."""
|
|
url = f"{self.api_base_url}/{module}"
|
|
payload = {"id": "1", "jsonrpc": "2.0", "method": method, "params": params}
|
|
# Read methods (get*/list*) are safe to retry on timeout/5xx; state-changing
|
|
# methods are NOT (a timeout can fire after the server committed -> a retry
|
|
# would double-execute, e.g. createScanTask/createPackage). 429 is a
|
|
# pre-processing rate-limit reject and is always safe (handled in _post).
|
|
idempotent = method.lower().startswith(("get", "list"))
|
|
body = self._post(url, payload, idempotent=idempotent)
|
|
|
|
if isinstance(body, dict) and "error" in body and body["error"] is not None:
|
|
err = body["error"]
|
|
detail = None
|
|
if isinstance(err, dict):
|
|
detail = (err.get("data") or {}).get("details") or err.get("message")
|
|
raise GravityZoneError(
|
|
f"GravityZone API error [{module}.{method}]: {detail or err}"
|
|
)
|
|
if isinstance(body, dict):
|
|
return body.get("result")
|
|
return body
|
|
|
|
def _post(self, url: str, payload: dict, idempotent: bool = False) -> Any:
|
|
"""POST with bounded retry on transient failures. Failures with NO
|
|
possible side effect (429 pre-processing reject, 'connect' pre-send
|
|
failure) are always retried; an ambiguous timeout/5xx (may have committed
|
|
server-side) is retried ONLY for idempotent (read) calls, so a
|
|
non-idempotent write is never silently re-executed."""
|
|
data = json.dumps(payload).encode("utf-8")
|
|
for attempt in range(RETRY_MAX_ATTEMPTS):
|
|
try:
|
|
return self._post_once(url, data)
|
|
except _RetryableHTTP as exc:
|
|
retry_safe = (exc.code in (429, "connect")) or idempotent
|
|
if not retry_safe or attempt >= RETRY_MAX_ATTEMPTS - 1:
|
|
note = "" if retry_safe else " (non-idempotent; not retried)"
|
|
raise GravityZoneError(
|
|
f"GravityZone HTTP {exc.code}{note}: {exc.detail}".rstrip(": ")
|
|
) from exc
|
|
delay = _retry_delay(exc.headers, attempt)
|
|
print(
|
|
f"[WARNING] GravityZone {exc.code} - retry "
|
|
f"{attempt + 1}/{RETRY_MAX_ATTEMPTS - 1} in {delay:.1f}s",
|
|
file=sys.stderr,
|
|
)
|
|
time.sleep(delay)
|
|
# unreachable: the loop either returns or raises on the final attempt
|
|
raise GravityZoneError("GravityZone request failed: retries exhausted")
|
|
|
|
def _post_once(self, url: str, data: bytes) -> Any:
|
|
"""One POST. Returns parsed JSON, raises _RetryableHTTP on a transient
|
|
failure, or GravityZoneError on a terminal one."""
|
|
if _HAS_HTTPX:
|
|
try:
|
|
resp = self._client.post(
|
|
url, content=data, auth=(self.api_key, ""),
|
|
headers={"Content-Type": "application/json"})
|
|
resp.raise_for_status()
|
|
except (httpx.ConnectError, httpx.ConnectTimeout) as exc:
|
|
# Pre-send: the connection never established -> no side effect,
|
|
# always safe to retry (must precede TimeoutException since
|
|
# ConnectTimeout subclasses it).
|
|
raise _RetryableHTTP("connect", detail=str(exc)) from exc
|
|
except httpx.TimeoutException as exc:
|
|
raise _RetryableHTTP("timeout", detail=str(exc)) from exc
|
|
except httpx.HTTPStatusError as exc:
|
|
code = exc.response.status_code
|
|
detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
|
|
if code in RETRY_STATUSES:
|
|
raise _RetryableHTTP(code, exc.response.headers, detail) from exc
|
|
raise GravityZoneError(
|
|
f"GravityZone HTTP {code}: {detail}") from exc
|
|
except httpx.HTTPError as exc:
|
|
raise GravityZoneError(
|
|
f"GravityZone request failed: {exc}") from exc
|
|
try:
|
|
return resp.json()
|
|
except ValueError as exc:
|
|
body = (resp.text or "")[:ERROR_BODY_MAX_CHARS]
|
|
raise GravityZoneError(
|
|
f"GravityZone returned a non-JSON response: {body}") from exc
|
|
|
|
# stdlib fallback
|
|
token = base64.b64encode(f"{self.api_key}:".encode("utf-8")).decode("ascii")
|
|
req = urllib.request.Request(
|
|
url,
|
|
data=data,
|
|
method="POST",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Basic {token}",
|
|
},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
|
|
raw = resp.read()
|
|
except urllib.error.HTTPError as exc:
|
|
detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
|
|
if exc.code in RETRY_STATUSES:
|
|
raise _RetryableHTTP(exc.code, getattr(exc, "headers", None),
|
|
detail) from exc
|
|
raise GravityZoneError(f"GravityZone HTTP {exc.code}: {detail}") from exc
|
|
except TimeoutError as exc:
|
|
raise _RetryableHTTP("timeout", detail=str(exc)) from exc
|
|
except urllib.error.URLError as exc:
|
|
# Classify conservatively: only a KNOWN pre-send failure (connection
|
|
# refused / DNS failure) is the always-safe 'connect'. Anything else -
|
|
# a connect/read timeout, or an ambiguous post-send reset like
|
|
# RemoteDisconnected/ConnectionResetError that urllib also wraps in
|
|
# URLError - is 'timeout' so a non-idempotent write is NOT retried.
|
|
reason = getattr(exc, "reason", None)
|
|
if isinstance(reason, (ConnectionRefusedError, socket.gaierror)):
|
|
code = "connect"
|
|
else:
|
|
code = "timeout"
|
|
raise _RetryableHTTP(code, detail=str(exc)) from exc
|
|
try:
|
|
return json.loads(raw.decode("utf-8"))
|
|
except ValueError as exc:
|
|
body = raw.decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
|
|
raise GravityZoneError(
|
|
f"GravityZone returned a non-JSON response: {body}") from exc
|
|
|
|
# ======================================================================
|
|
# READ METHODS (always live)
|
|
# ======================================================================
|
|
def get_api_status(self) -> dict:
|
|
api_key_details = self._jsonrpc_request("general", "getApiKeyDetails", {}) or {}
|
|
# Nest the two responses to avoid silent key collisions on the merge.
|
|
status: dict = {"apiKey": api_key_details}
|
|
try:
|
|
status["license"] = (
|
|
self._jsonrpc_request("licensing", "getLicenseInfo", {}) or {}
|
|
)
|
|
except GravityZoneError as exc:
|
|
status["_licenseWarning"] = str(exc)
|
|
return status
|
|
|
|
def get_own_company(self) -> dict:
|
|
return self._jsonrpc_request("companies", "getCompanyDetails", {}) or {}
|
|
|
|
def get_company_details(self, company_id: Optional[str] = None) -> dict:
|
|
"""Company detail (companies.getCompanyDetails). No id -> own company."""
|
|
params: dict = {}
|
|
if company_id:
|
|
params["companyId"] = company_id
|
|
return self._jsonrpc_request("companies", "getCompanyDetails", params) or {}
|
|
|
|
def get_company_by_user(self, username: str) -> dict:
|
|
"""Company that owns a given user (companies.getCompanyDetailsByUser)."""
|
|
return self._jsonrpc_request(
|
|
"companies", "getCompanyDetailsByUser", {"username": username}
|
|
) or {}
|
|
|
|
def create_company(
|
|
self,
|
|
company_type: int,
|
|
name: str,
|
|
parent_id: Optional[str] = None,
|
|
extra: Optional[dict] = None,
|
|
) -> Any:
|
|
"""Create a company (companies.createCompany). STATE-CHANGING.
|
|
|
|
Required (verified): `type` (0=Partner, 1=Customer) and `name`. Documented
|
|
optional params: parentId, address, country, state, phone, industry,
|
|
canBeManagedByAbove, assignedProductType, and a nested `licenseSubscription`
|
|
{type (3=monthly subscription, 4=monthly trial), reservedSlots,
|
|
endSubscription, autoRenewPeriod, ...}. Pass those via `extra`. Gate at
|
|
the call site behind --confirm.
|
|
Docs: bitdefender.com/business/support/en/77211-126236-createcompany.html
|
|
"""
|
|
params: dict = {"type": company_type, "name": name}
|
|
if parent_id is not None:
|
|
params["parentId"] = parent_id
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request("companies", "createCompany", params)
|
|
|
|
def suspend_company(self, company_id: str) -> Any:
|
|
"""Suspend a company (companies.suspendCompany). STATE-CHANGING. Gated."""
|
|
return self._jsonrpc_request(
|
|
"companies", "suspendCompany", {"companyId": company_id}
|
|
)
|
|
|
|
def activate_company(self, company_id: str) -> Any:
|
|
"""Activate a suspended company (companies.activateCompany). STATE-CHANGING."""
|
|
return self._jsonrpc_request(
|
|
"companies", "activateCompany", {"companyId": company_id}
|
|
)
|
|
|
|
def delete_company(self, company_id: str) -> Any:
|
|
"""Delete a company (companies.deleteCompany). STATE-CHANGING. Gated."""
|
|
return self._jsonrpc_request(
|
|
"companies", "deleteCompany", {"companyId": company_id}
|
|
)
|
|
|
|
def list_companies(self, page: int = 1, per_page: int = 100) -> dict:
|
|
result = self._jsonrpc_request(
|
|
"network",
|
|
"getNetworkInventoryItems",
|
|
{
|
|
"parentId": ACG_COMPANIES_CONTAINER_ID,
|
|
"page": page,
|
|
"perPage": per_page,
|
|
},
|
|
) or {}
|
|
items = [i for i in result.get("items", []) if i.get("type") == 1]
|
|
return {"total": len(items), "items": items}
|
|
|
|
def list_all_companies(self) -> list[dict]:
|
|
"""Every company under the ACG container, paginated (type==1 only).
|
|
list_companies() returns only one page; callers that must see the WHOLE
|
|
fleet (sweep-all, inventory refresh) use this so a >100-company tenant is
|
|
not silently truncated."""
|
|
companies: list[dict] = []
|
|
page = 1
|
|
per_page = 100
|
|
while True:
|
|
result = self._jsonrpc_request(
|
|
"network", "getNetworkInventoryItems",
|
|
{"parentId": ACG_COMPANIES_CONTAINER_ID,
|
|
"page": page, "perPage": per_page},
|
|
) or {}
|
|
raw = result.get("items", [])
|
|
companies.extend(i for i in raw if i.get("type") == 1)
|
|
if len(raw) < per_page: # short raw page = last page
|
|
break
|
|
page += 1
|
|
if page > MAX_PAGINATION_PAGES:
|
|
print(f"[WARNING] list_all_companies hit the {MAX_PAGINATION_PAGES}-"
|
|
"page ceiling; companies may be truncated.", file=sys.stderr)
|
|
break
|
|
return companies
|
|
|
|
def list_endpoints(
|
|
self, parent_id: Optional[str] = None, page: int = 1, per_page: int = 100
|
|
) -> dict:
|
|
params: dict = {"page": page, "perPage": per_page}
|
|
if parent_id:
|
|
params["parentId"] = parent_id
|
|
return self._jsonrpc_request("network", "getEndpointsList", params) or {}
|
|
|
|
def get_endpoint_details(self, endpoint_id: str) -> dict:
|
|
return self._jsonrpc_request(
|
|
"network", "getManagedEndpointDetails", {"endpointId": endpoint_id}
|
|
) or {}
|
|
|
|
def list_policies(self, page: int = 1, per_page: int = 100) -> dict:
|
|
return self._jsonrpc_request(
|
|
"policies", "getPoliciesList", {"page": page, "perPage": per_page}
|
|
) or {}
|
|
|
|
def get_policy_details(self, policy_id: str) -> dict:
|
|
return self._jsonrpc_request(
|
|
"policies", "getPolicyDetails", {"policyId": policy_id}
|
|
) or {}
|
|
|
|
def list_packages(self, page: int = 1, per_page: int = 100) -> dict:
|
|
return self._jsonrpc_request(
|
|
"packages", "getPackagesList", {"page": page, "perPage": per_page}
|
|
) or {}
|
|
|
|
def list_quarantine(
|
|
self, company_id: str, page: int = 1, per_page: int = 100
|
|
) -> dict:
|
|
# Service-scoped path 'quarantine/computers' (bare 'quarantine' 404s);
|
|
# the param is 'companyId', NOT 'parentId'.
|
|
return self._jsonrpc_request(
|
|
"quarantine/computers",
|
|
"getQuarantineItemsList",
|
|
{"companyId": company_id, "page": page, "perPage": per_page},
|
|
) or {}
|
|
|
|
def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
|
|
"""Live security posture sweep of all endpoints under a parent."""
|
|
summaries: list[GZEndpointSummary] = []
|
|
page = 1
|
|
per_page = 100
|
|
|
|
while True:
|
|
data = self.list_endpoints(parent_id, page=page, per_page=per_page)
|
|
items = data.get("items", [])
|
|
if not items:
|
|
break
|
|
|
|
for item in items:
|
|
endpoint_id = item.get("id", "")
|
|
if not endpoint_id:
|
|
continue
|
|
try:
|
|
detail = self.get_endpoint_details(endpoint_id)
|
|
except GravityZoneError:
|
|
continue
|
|
|
|
malware = detail.get("malwareStatus", {}) or {}
|
|
agent = detail.get("agent", {}) or {}
|
|
summaries.append(
|
|
GZEndpointSummary(
|
|
endpoint_id=endpoint_id,
|
|
name=detail.get("name") or item.get("name", ""),
|
|
company_id=detail.get("companyId") or item.get("companyId", ""),
|
|
infected=bool(malware.get("infected", False)),
|
|
threat_detected=bool(malware.get("detection", False)),
|
|
signature_outdated=bool(agent.get("signatureOutdated", False)),
|
|
product_outdated=bool(agent.get("productOutdated", False)),
|
|
last_seen=detail.get("lastSeen"),
|
|
agent_version=agent.get("productVersion"),
|
|
state=detail.get("state", 0),
|
|
)
|
|
)
|
|
|
|
# Stop at the last page. A short page (< per_page) means no more;
|
|
# do NOT rely on `total` - some responses omit it, which would
|
|
# truncate the sweep after page 1.
|
|
if len(items) < per_page:
|
|
break
|
|
page += 1
|
|
if page > MAX_PAGINATION_PAGES:
|
|
print(f"[WARNING] security_sweep hit the {MAX_PAGINATION_PAGES}-page "
|
|
"ceiling; results may be truncated.", file=sys.stderr)
|
|
break
|
|
|
|
summaries.sort(
|
|
key=lambda s: (
|
|
not s.infected,
|
|
not s.signature_outdated,
|
|
not s.product_outdated,
|
|
s.name.lower(),
|
|
)
|
|
)
|
|
return summaries
|
|
|
|
def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
|
|
"""Sweep every client company. The companies container is NOT a valid
|
|
endpoint parent, so iterate each company and sweep it individually."""
|
|
summaries: list[GZEndpointSummary] = []
|
|
companies = self.list_all_companies()
|
|
for company in companies:
|
|
cid = company.get("id")
|
|
if not cid:
|
|
continue
|
|
try:
|
|
company_summaries = self.security_sweep(cid)
|
|
except GravityZoneError:
|
|
continue
|
|
for s in company_summaries:
|
|
if not s.company_id:
|
|
s.company_id = cid
|
|
summaries.extend(company_summaries)
|
|
|
|
summaries.sort(
|
|
key=lambda s: (
|
|
not s.infected,
|
|
not s.signature_outdated,
|
|
not s.product_outdated,
|
|
s.name.lower(),
|
|
)
|
|
)
|
|
return summaries
|
|
|
|
# ======================================================================
|
|
# MANAGEMENT METHODS (verified only)
|
|
# ======================================================================
|
|
def create_package(
|
|
self,
|
|
package_name: str,
|
|
company_id: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
language: Optional[str] = None,
|
|
modules: Optional[dict] = None,
|
|
scan_mode: Optional[dict] = None,
|
|
settings: Optional[dict] = None,
|
|
roles: Optional[dict] = None,
|
|
deployment_options: Optional[dict] = None,
|
|
) -> Any:
|
|
params: dict = {"packageName": package_name}
|
|
if company_id:
|
|
params["companyId"] = company_id
|
|
if description is not None:
|
|
params["description"] = description
|
|
if language is not None:
|
|
params["language"] = language
|
|
if modules is not None:
|
|
params["modules"] = modules
|
|
if scan_mode is not None:
|
|
params["scanMode"] = scan_mode
|
|
if settings is not None:
|
|
params["settings"] = settings
|
|
if roles is not None:
|
|
params["roles"] = roles
|
|
if deployment_options is not None:
|
|
params["deploymentOptions"] = deployment_options
|
|
|
|
result = self._jsonrpc_request("packages", "createPackage", params)
|
|
# Write-through: refresh package list in cache.
|
|
self._cache_add_package(package_name, result)
|
|
return result
|
|
|
|
def get_installation_links(
|
|
self, package_name: str, company_id: Optional[str] = None
|
|
) -> Any:
|
|
params: dict = {"packageName": package_name}
|
|
if company_id:
|
|
params["companyId"] = company_id
|
|
return self._jsonrpc_request("packages", "getInstallationLinks", params)
|
|
|
|
def delete_package(self, package_id: str) -> Any:
|
|
"""Delete an installation package (packages.deletePackage).
|
|
VERIFIED LIVE 2026-06-21: the param is `packageId` (NOT packageName/
|
|
companyId - those error "not expected"). STATE-CHANGING - gate at call site."""
|
|
return self._jsonrpc_request(
|
|
"packages", "deletePackage", {"packageId": package_id}
|
|
)
|
|
|
|
def create_scan_task(
|
|
self,
|
|
target_ids: list[str],
|
|
scan_type: int,
|
|
name: Optional[str] = None,
|
|
custom_scan_settings: Optional[dict] = None,
|
|
) -> Any:
|
|
params: dict = {"targetIds": target_ids, "type": scan_type}
|
|
if name is not None:
|
|
params["name"] = name
|
|
if custom_scan_settings is not None:
|
|
params["customScanSettings"] = custom_scan_settings
|
|
return self._jsonrpc_request("network", "createScanTask", params)
|
|
|
|
def move_endpoints(self, endpoint_ids: list[str], group_id: str) -> Any:
|
|
return self._jsonrpc_request(
|
|
"network",
|
|
"moveEndpoints",
|
|
{"endpointIds": endpoint_ids, "groupId": group_id},
|
|
)
|
|
|
|
def delete_endpoint(self, endpoint_id: str) -> Any:
|
|
return self._jsonrpc_request(
|
|
"network", "deleteEndpoint", {"endpointId": endpoint_id}
|
|
)
|
|
|
|
def create_custom_group(self, name: str, parent_id: Optional[str] = None) -> Any:
|
|
# API param is `groupName` (verified live 2026-06-21), NOT `name`.
|
|
params: dict = {"groupName": name}
|
|
if parent_id:
|
|
params["parentId"] = parent_id
|
|
result = self._jsonrpc_request("network", "createCustomGroup", params)
|
|
# Write-through: createCustomGroup returns the new group id (string).
|
|
group_id = result if isinstance(result, str) else None
|
|
if isinstance(result, dict):
|
|
group_id = result.get("id") or result.get("groupId")
|
|
if group_id:
|
|
self._cache_add_group(group_id, name)
|
|
return result
|
|
|
|
def delete_custom_group(self, group_id: str) -> Any:
|
|
return self._jsonrpc_request(
|
|
"network", "deleteCustomGroup", {"groupId": group_id}
|
|
)
|
|
|
|
def move_custom_group(self, group_id: str, new_parent_id: str) -> Any:
|
|
# API param is `parentId` (verified live 2026-06-21), NOT `newParentId`.
|
|
return self._jsonrpc_request(
|
|
"network",
|
|
"moveCustomGroup",
|
|
{"groupId": group_id, "parentId": new_parent_id},
|
|
)
|
|
|
|
# ======================================================================
|
|
# INCIDENTS / EDR (module `/incidents`)
|
|
# ----------------------------------------------------------------------
|
|
# READ methods are always live and never cached (blocklist + incident
|
|
# status are volatile). State-changing methods (isolate, restore,
|
|
# add/remove blocklist) return the raw upstream result; the CLI caller is
|
|
# responsible for gating them behind --confirm.
|
|
# ======================================================================
|
|
|
|
# -- READ (safe) -----------------------------------------------------------
|
|
def list_blocklist(
|
|
self,
|
|
company_id: Optional[str] = None,
|
|
page: int = 1,
|
|
per_page: int = 100,
|
|
) -> dict:
|
|
"""List blocklisted hash items. VERIFIED LIVE (incidents.getBlocklistItems).
|
|
|
|
Returns {total, page, perPage, pagesCount, items:[{id, source,
|
|
sourceInfo, hashType, hash, companyId}]}. `companyId` scopes to one
|
|
company; omit for the whole partner tenant.
|
|
"""
|
|
params: dict = {"page": page, "perPage": per_page}
|
|
if company_id:
|
|
params["companyId"] = company_id
|
|
return self._jsonrpc_request("incidents", "getBlocklistItems", params) or {}
|
|
|
|
def list_incidents(
|
|
self,
|
|
parent_id: str,
|
|
page: int = 1,
|
|
per_page: int = 500,
|
|
filters: Optional[dict] = None,
|
|
) -> dict:
|
|
"""List incidents under a company/group.
|
|
|
|
`parent_id` (a company/group id) is REQUIRED for this method.
|
|
|
|
perPage default is 500: live testing surfaced an
|
|
"Invalid value for 'perPage' parameter. The value should be between 500
|
|
and 10000" error for smaller values, so 100 (the default elsewhere) is
|
|
rejected here.
|
|
|
|
NOTE (UNVERIFIED / possibly unavailable on this tenant): live re-testing
|
|
on 2026-05-30 returned "Method not found" for `getIncidentsList` on the
|
|
`/incidents` module, even though `getBlocklistItems` on the SAME module
|
|
succeeds in the same request (so this is not rate-limiting or a bad key).
|
|
The method may be gated by an EDR/incidents license feature that is OFF
|
|
on this tenant, or named differently in this API version. Treat the
|
|
return value as best-effort and confirm against the official Bitdefender
|
|
API reference / console before relying on incident data.
|
|
"""
|
|
params: dict = {"parentId": parent_id, "page": page, "perPage": per_page}
|
|
if filters is not None:
|
|
params["filters"] = filters
|
|
return self._jsonrpc_request("incidents", "getIncidentsList", params) or {}
|
|
|
|
# -- STATE-CHANGING (caller MUST gate behind --confirm) --------------------
|
|
def isolate_endpoints(self, endpoint_ids: list[str]) -> Any:
|
|
"""Isolate endpoints from the network (incidents.createIsolateEndpointTask).
|
|
|
|
VERIFIED LIVE 2026-06-21: the API takes a SINGLE `endpointId` per call
|
|
(an `endpointIds` array errors "not expected"), so we loop. Returns the
|
|
single task result for one id, else a list. STATE-CHANGING - gate behind
|
|
--confirm at the call site.
|
|
"""
|
|
results = []
|
|
for eid in endpoint_ids:
|
|
results.append(self._jsonrpc_request(
|
|
"incidents", "createIsolateEndpointTask", {"endpointId": eid}
|
|
))
|
|
return results if len(results) != 1 else results[0]
|
|
|
|
def restore_endpoints_from_isolation(self, endpoint_ids: list[str]) -> Any:
|
|
"""Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).
|
|
|
|
VERIFIED LIVE 2026-06-21: a SINGLE `endpointId` per call (not an array),
|
|
so we loop. Returns the single task result for one id, else a list.
|
|
Note: fails if the isolation task is still in progress - wait + retry.
|
|
STATE-CHANGING - gate behind --confirm at the call site.
|
|
"""
|
|
results = []
|
|
for eid in endpoint_ids:
|
|
results.append(self._jsonrpc_request(
|
|
"incidents",
|
|
"createRestoreEndpointFromIsolationTask",
|
|
{"endpointId": eid},
|
|
))
|
|
return results if len(results) != 1 else results[0]
|
|
|
|
def add_to_blocklist(
|
|
self,
|
|
company_id: str,
|
|
hash_list: list[str],
|
|
hash_type: int = 1,
|
|
source_info: str = "",
|
|
operating_systems: Optional[list[str]] = None,
|
|
) -> Any:
|
|
"""Add hashes to the blocklist (incidents.addToBlocklist).
|
|
|
|
`hash_type` is an int (1 is the common value seen live; see the
|
|
GravityZone console / API docs for the full mapping). `hash_list` is an
|
|
array of hash strings. `source_info` is a free-text description.
|
|
STATE-CHANGING - gate behind --confirm at the call site.
|
|
"""
|
|
params: dict = {
|
|
"companyId": company_id,
|
|
"hashType": hash_type,
|
|
"hashList": hash_list,
|
|
"sourceInfo": source_info,
|
|
}
|
|
if operating_systems is not None:
|
|
params["operatingSystems"] = operating_systems
|
|
return self._jsonrpc_request("incidents", "addToBlocklist", params)
|
|
|
|
def remove_from_blocklist(self, hash_item_id: str) -> Any:
|
|
"""Remove one blocklist entry (incidents.removeFromBlocklist).
|
|
|
|
STATE-CHANGING - gate behind --confirm at the call site.
|
|
|
|
UNVERIFIED: the param name `hashItemId` is the candidate (the `id`
|
|
field from getBlocklistItems). Confirm against the official Bitdefender
|
|
API reference before relying on it.
|
|
"""
|
|
return self._jsonrpc_request(
|
|
"incidents", "removeFromBlocklist", {"hashItemId": hash_item_id}
|
|
)
|
|
|
|
# ======================================================================
|
|
# POLICY ASSIGNMENT (state-changing; gate behind --confirm at call site)
|
|
# ----------------------------------------------------------------------
|
|
# NOTE: getPolicyDetails (above) returns the FULL granular module config
|
|
# (verified live 2026-06-21 - the earlier "shallow only" claim was wrong).
|
|
# The Public API still has NO create/edit/clone policy method - authoring
|
|
# stays in the console - but assigning an EXISTING policy is supported here.
|
|
# ======================================================================
|
|
def assign_policy(
|
|
self,
|
|
policy_id: str,
|
|
target_ids: list[str],
|
|
force_inheritance: bool = False,
|
|
) -> Any:
|
|
"""Assign an existing policy to endpoints/groups (network.assignPolicy).
|
|
|
|
VERIFIED LIVE 2026-06-21: assigning a `policyId` REQUIRES sending
|
|
`inheritFromAbove=false` in the same call. An inheriting target defaults
|
|
inheritFromAbove to true, so omitting it makes the API reject the call
|
|
with a misleading "inheritFromAbove should not be used with policyId"
|
|
error. `forcePolicyInheritance` optionally pushes the policy down to
|
|
sub-items. (To make a target INHERIT instead, call with
|
|
{targetIds, inheritFromAbove:true} and no policyId via `raw`.)
|
|
STATE-CHANGING - gate at the call site behind --confirm.
|
|
Docs: bitdefender.com/business/support/en/77212-924802-assignpolicy.html
|
|
"""
|
|
params: dict = {
|
|
"policyId": policy_id,
|
|
"targetIds": target_ids,
|
|
"inheritFromAbove": False,
|
|
}
|
|
if force_inheritance:
|
|
params["forcePolicyInheritance"] = True
|
|
return self._jsonrpc_request("network", "assignPolicy", params)
|
|
|
|
def list_scan_tasks(
|
|
self,
|
|
page: int = 1,
|
|
per_page: int = 100,
|
|
name: Optional[str] = None,
|
|
status: Optional[int] = None,
|
|
) -> dict:
|
|
"""List scan tasks (network.getScanTasksList). VERIFIED LIVE."""
|
|
params: dict = {"page": page, "perPage": per_page}
|
|
if name is not None:
|
|
params["name"] = name
|
|
if status is not None:
|
|
params["status"] = status
|
|
return self._jsonrpc_request("network", "getScanTasksList", params) or {}
|
|
|
|
def get_endpoint_tags(self) -> Any:
|
|
"""List endpoint tags defined in the tenant (network.getEndpointTags). Read."""
|
|
return self._jsonrpc_request("network", "getEndpointTags", {})
|
|
|
|
def set_endpoint_label(self, endpoint_id: str, label: str) -> Any:
|
|
"""Set an endpoint's label (network.setEndpointLabel). STATE-CHANGING.
|
|
Requires `endpointId` and `label` (both verified). Gate at the call site."""
|
|
return self._jsonrpc_request(
|
|
"network", "setEndpointLabel",
|
|
{"endpointId": endpoint_id, "label": label},
|
|
)
|
|
|
|
def reconfigure_client(
|
|
self, target_ids: list[str], extra: Optional[dict] = None
|
|
) -> Any:
|
|
"""Reconfigure installed agents (network.createReconfigureClientTask).
|
|
STATE-CHANGING. Requires `targetIds` (verified); the reconfigure body
|
|
(which modules/roles/scanMode to change) is documented and passed via
|
|
`extra`. Gate at the call site behind --confirm.
|
|
"""
|
|
params: dict = {"targetIds": target_ids}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request(
|
|
"network", "createReconfigureClientTask", params
|
|
)
|
|
|
|
# ======================================================================
|
|
# REPORTS (module `/reports`) - VERIFIED LIVE
|
|
# ======================================================================
|
|
def list_reports(self, page: int = 1, per_page: int = 100) -> dict:
|
|
"""List saved reports (reports.getReportsList)."""
|
|
return self._jsonrpc_request(
|
|
"reports", "getReportsList", {"page": page, "perPage": per_page}
|
|
) or {}
|
|
|
|
def get_report_links(self, report_id: str) -> Any:
|
|
"""Get download links for a generated report (reports.getDownloadLinks).
|
|
|
|
Param name `reportId` is the candidate - confirm against the console if
|
|
it errors.
|
|
"""
|
|
return self._jsonrpc_request(
|
|
"reports", "getDownloadLinks", {"reportId": report_id}
|
|
)
|
|
|
|
# ======================================================================
|
|
# ACCOUNTS (module `/accounts`) - VERIFIED LIVE (read)
|
|
# ======================================================================
|
|
def list_accounts(self, page: int = 1, per_page: int = 100) -> dict:
|
|
"""List GravityZone console accounts/users (accounts.getAccountsList)."""
|
|
return self._jsonrpc_request(
|
|
"accounts", "getAccountsList", {"page": page, "perPage": per_page}
|
|
) or {}
|
|
|
|
def get_notifications_settings(self) -> dict:
|
|
"""Notification configuration (accounts.getNotificationsSettings)."""
|
|
return self._jsonrpc_request(
|
|
"accounts", "getNotificationsSettings", {}
|
|
) or {}
|
|
|
|
def get_account_details(self, account_id: Optional[str] = None) -> dict:
|
|
"""Account detail (accounts.getAccountDetails). With no id, returns the
|
|
API key owner's own account (verified live)."""
|
|
params: dict = {}
|
|
if account_id:
|
|
params["accountId"] = account_id
|
|
return self._jsonrpc_request("accounts", "getAccountDetails", params) or {}
|
|
|
|
def create_account(
|
|
self,
|
|
email: str,
|
|
password: Optional[str] = None,
|
|
username: Optional[str] = None,
|
|
role: Optional[int] = None,
|
|
profile: Optional[dict] = None,
|
|
rights: Optional[dict] = None,
|
|
phone_number: Optional[dict] = None,
|
|
extra: Optional[dict] = None,
|
|
) -> Any:
|
|
"""Create a console account (accounts.createAccount). STATE-CHANGING.
|
|
|
|
`email` is required (verified). Documented params: userName, password,
|
|
role (int), profile{fullName,language,timezone},
|
|
phoneNumber{countryCode,subscriberNumber}, rights{manageInventory,
|
|
managePoliciesRead/Write,...}. `extra` merges any additional documented
|
|
fields verbatim. Gate at the call site behind --confirm.
|
|
Docs: bitdefender.com/business/support/en/77212-125284-createaccount.html
|
|
"""
|
|
params: dict = {"email": email}
|
|
if username is not None:
|
|
params["userName"] = username
|
|
if password is not None:
|
|
params["password"] = password
|
|
if role is not None:
|
|
params["role"] = role
|
|
if profile is not None:
|
|
params["profile"] = profile
|
|
if rights is not None:
|
|
params["rights"] = rights
|
|
if phone_number is not None:
|
|
params["phoneNumber"] = phone_number
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request("accounts", "createAccount", params)
|
|
|
|
def update_account(self, account_id: str, fields: dict) -> Any:
|
|
"""Update a console account (accounts.updateAccount). STATE-CHANGING.
|
|
|
|
`accountId` is required (verified); `fields` are the documented
|
|
account attributes to change (same shape as create, minus email).
|
|
Gate at the call site behind --confirm.
|
|
"""
|
|
params: dict = {"accountId": account_id}
|
|
params.update(fields or {})
|
|
return self._jsonrpc_request("accounts", "updateAccount", params)
|
|
|
|
def delete_account(self, account_id: str) -> Any:
|
|
"""Delete a console account (accounts.deleteAccount). STATE-CHANGING.
|
|
`accountId` required (verified). Gate at the call site behind --confirm."""
|
|
return self._jsonrpc_request(
|
|
"accounts", "deleteAccount", {"accountId": account_id}
|
|
)
|
|
|
|
def configure_notifications_settings(self, settings: dict) -> Any:
|
|
"""Set notification settings (accounts.configureNotificationsSettings).
|
|
STATE-CHANGING - there are NO required params, so an empty payload is
|
|
accepted; the caller must pass the intended settings object. Gate at the
|
|
call site behind --confirm."""
|
|
return self._jsonrpc_request(
|
|
"accounts", "configureNotificationsSettings", settings or {}
|
|
)
|
|
|
|
# ======================================================================
|
|
# PUSH EVENT SERVICE (module `/push`)
|
|
# ----------------------------------------------------------------------
|
|
# `get`/`stats` are read (but error when the service was never configured -
|
|
# that is an EXPECTED state, not a failure; the CLI handles it cleanly).
|
|
# `set` is STATE-CHANGING (it configures where GravityZone POSTs security
|
|
# events) - gate behind --confirm at the call site.
|
|
# ======================================================================
|
|
def get_push_settings(self) -> dict:
|
|
"""Current push event service settings (push.getPushEventSettings)."""
|
|
return self._jsonrpc_request("push", "getPushEventSettings", {}) or {}
|
|
|
|
def get_push_stats(self) -> dict:
|
|
"""Push event service delivery stats (push.getPushEventStats)."""
|
|
return self._jsonrpc_request("push", "getPushEventStats", {}) or {}
|
|
|
|
def set_push_settings(
|
|
self,
|
|
status: int,
|
|
service_type: str = "jsonRPC",
|
|
url: Optional[str] = None,
|
|
require_valid_ssl: bool = True,
|
|
authorization: Optional[str] = None,
|
|
subscribe_event_types: Optional[dict] = None,
|
|
) -> Any:
|
|
"""Configure the GravityZone Push event service (push.setPushEventSettings).
|
|
|
|
`status` (1=on / 0=off) is REQUIRED (verified via validation probe
|
|
2026-06-21). When enabling, `serviceSettings.url` is the receiver
|
|
endpoint GravityZone POSTs events to. The nested shape
|
|
(serviceType/serviceSettings/subscribeToEventTypes) follows Bitdefender's
|
|
documented push API and is UNVERIFIED beyond `status` on this tenant -
|
|
confirm the first successful enable against the live response.
|
|
STATE-CHANGING - gate at the call site behind --confirm.
|
|
"""
|
|
params: dict = {"status": status, "serviceType": service_type}
|
|
service_settings: dict = {"requireValidSslCertificate": require_valid_ssl}
|
|
if url is not None:
|
|
service_settings["url"] = url
|
|
if authorization is not None:
|
|
service_settings["authorization"] = authorization
|
|
params["serviceSettings"] = service_settings
|
|
if subscribe_event_types is not None:
|
|
params["subscribeToEventTypes"] = subscribe_event_types
|
|
return self._jsonrpc_request("push", "setPushEventSettings", params)
|
|
|
|
def send_test_push_event(self, event_type: str, extra: Optional[dict] = None) -> Any:
|
|
"""Send a test push event (push.sendTestPushEvent). Requires `eventType`
|
|
(verified). Fires against the configured receiver - STATE-ADJACENT, gate
|
|
at the call site behind --confirm."""
|
|
params: dict = {"eventType": event_type}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request("push", "sendTestPushEvent", params)
|
|
|
|
# ======================================================================
|
|
# PACKAGES (detail) - read
|
|
# ======================================================================
|
|
def get_package_details(self, package_id: str) -> dict:
|
|
"""Installation package detail (packages.getPackageDetails). `packageId`
|
|
required (verified)."""
|
|
return self._jsonrpc_request(
|
|
"packages", "getPackageDetails", {"packageId": package_id}
|
|
) or {}
|
|
|
|
# ======================================================================
|
|
# REPORTS (create / delete) - getReportsList + get_report_links above
|
|
# ======================================================================
|
|
def create_report(self, name: str, extra: Optional[dict] = None) -> Any:
|
|
"""Create a report (reports.createReport). `name` required (verified);
|
|
`type`, `targetIds`, recurrence/format etc. passed via `extra`.
|
|
STATE-CHANGING - gate at the call site behind --confirm."""
|
|
params: dict = {"name": name}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request("reports", "createReport", params)
|
|
|
|
def delete_report(self, report_id: str) -> Any:
|
|
"""Delete a report (reports.deleteReport). `reportId` required (verified).
|
|
STATE-CHANGING - gate at the call site behind --confirm."""
|
|
return self._jsonrpc_request(
|
|
"reports", "deleteReport", {"reportId": report_id}
|
|
)
|
|
|
|
# ======================================================================
|
|
# QUARANTINE (remove / restore) - getQuarantineItemsList above
|
|
# ======================================================================
|
|
def remove_quarantine_items(
|
|
self, quarantine_item_ids: list[str], extra: Optional[dict] = None
|
|
) -> Any:
|
|
"""Delete quarantined items (quarantine/computers.createRemoveQuarantineItemTask).
|
|
`quarantineItemsIds` required (verified). STATE-CHANGING - gate behind --confirm."""
|
|
params: dict = {"quarantineItemsIds": quarantine_item_ids}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request(
|
|
"quarantine/computers", "createRemoveQuarantineItemTask", params
|
|
)
|
|
|
|
def restore_quarantine_items(
|
|
self, quarantine_item_ids: list[str], extra: Optional[dict] = None
|
|
) -> Any:
|
|
"""Restore quarantined items (quarantine/computers.createRestoreQuarantineItemTask).
|
|
`quarantineItemsIds` required (verified). `addExclusionInPolicy` etc. via
|
|
`extra`. STATE-CHANGING - gate behind --confirm."""
|
|
params: dict = {"quarantineItemsIds": quarantine_item_ids}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request(
|
|
"quarantine/computers", "createRestoreQuarantineItemTask", params
|
|
)
|
|
|
|
# ======================================================================
|
|
# INCIDENTS - custom rules + incident status/note (read + state-changing)
|
|
# ======================================================================
|
|
def list_custom_rules(self, page: int = 1, per_page: int = 100) -> dict:
|
|
"""List EDR custom rules (incidents.getCustomRulesList). VERIFIED LIVE."""
|
|
return self._jsonrpc_request(
|
|
"incidents", "getCustomRulesList", {"page": page, "perPage": per_page}
|
|
) or {}
|
|
|
|
def create_custom_rule(self, name: str, extra: Optional[dict] = None) -> Any:
|
|
"""Create an EDR custom rule (incidents.createCustomRule). `name` required
|
|
(verified); rule body (settings/companyId/tags) via `extra`.
|
|
STATE-CHANGING - gate behind --confirm."""
|
|
params: dict = {"name": name}
|
|
if extra:
|
|
params.update(extra)
|
|
return self._jsonrpc_request("incidents", "createCustomRule", params)
|
|
|
|
def delete_custom_rule(self, rule_id: str) -> Any:
|
|
"""Delete an EDR custom rule (incidents.deleteCustomRule). `ruleId` required
|
|
(verified). STATE-CHANGING - gate behind --confirm."""
|
|
return self._jsonrpc_request(
|
|
"incidents", "deleteCustomRule", {"ruleId": rule_id}
|
|
)
|
|
|
|
def change_incident_status(self, incident_type: str, fields: dict) -> Any:
|
|
"""Change an incident's status (incidents.changeIncidentStatus). `type`
|
|
required (verified) - the incident type/category - plus the incident id +
|
|
target status in `fields`. STATE-CHANGING - gate behind --confirm."""
|
|
params: dict = {"type": incident_type}
|
|
params.update(fields or {})
|
|
return self._jsonrpc_request("incidents", "changeIncidentStatus", params)
|
|
|
|
def update_incident_note(self, incident_type: str, fields: dict) -> Any:
|
|
"""Update an incident note (incidents.updateIncidentNote). `type` required
|
|
(verified) plus incident id + note text in `fields`. STATE-CHANGING."""
|
|
params: dict = {"type": incident_type}
|
|
params.update(fields or {})
|
|
return self._jsonrpc_request("incidents", "updateIncidentNote", params)
|
|
|
|
# ======================================================================
|
|
# LICENSING (usage) + INTEGRATIONS - read
|
|
# ======================================================================
|
|
def get_monthly_usage(self) -> dict:
|
|
"""Monthly license usage (licensing.getMonthlyUsage). VERIFIED LIVE."""
|
|
return self._jsonrpc_request("licensing", "getMonthlyUsage", {}) or {}
|
|
|
|
def get_configured_integrations(self, page: int = 1, per_page: int = 100) -> dict:
|
|
"""Configured third-party integrations (integrations.getConfiguredIntegrations).
|
|
VERIFIED LIVE."""
|
|
return self._jsonrpc_request(
|
|
"integrations", "getConfiguredIntegrations",
|
|
{"page": page, "perPage": per_page},
|
|
) or {}
|
|
|
|
# ======================================================================
|
|
# CACHE LAYER (identity / structure only - never volatile status)
|
|
# ======================================================================
|
|
def _read_cache(self) -> Optional[dict]:
|
|
if not CACHE_FILE.exists():
|
|
return None
|
|
try:
|
|
return json.loads(CACHE_FILE.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
|
|
def _write_cache(self, cache: dict) -> None:
|
|
"""Atomically replace the cache file (temp write + os.replace) so a crash
|
|
mid-write or a concurrent reader can never see a truncated file."""
|
|
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
payload = json.dumps(cache, indent=2, sort_keys=True)
|
|
fd, tmp = tempfile.mkstemp(dir=str(CACHE_DIR), prefix=".inventory.",
|
|
suffix=".tmp")
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
|
fh.write(payload)
|
|
fh.flush()
|
|
os.fsync(fh.fileno())
|
|
os.replace(tmp, CACHE_FILE) # atomic on the same filesystem
|
|
except BaseException:
|
|
try:
|
|
os.unlink(tmp)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
|
|
@contextmanager
|
|
def _cache_lock(self):
|
|
"""Best-effort cross-platform advisory lock around a read-modify-write of
|
|
the cache, so two concurrent gz.py invocations don't lose each other's
|
|
write-through update. Steals a stale lock; on timeout proceeds unlocked
|
|
(a lost update is tolerable, a hang is not)."""
|
|
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
deadline = time.monotonic() + CACHE_LOCK_TIMEOUT_SECONDS
|
|
acquired = False
|
|
while True:
|
|
try:
|
|
fd = os.open(str(CACHE_LOCK_FILE),
|
|
os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
|
os.close(fd)
|
|
acquired = True
|
|
break
|
|
except FileExistsError:
|
|
try:
|
|
age = time.time() - os.path.getmtime(CACHE_LOCK_FILE)
|
|
if age > CACHE_LOCK_STALE_SECONDS:
|
|
os.unlink(CACHE_LOCK_FILE)
|
|
continue
|
|
except OSError:
|
|
pass
|
|
if time.monotonic() >= deadline:
|
|
break # give up the lock, proceed unlocked
|
|
time.sleep(0.1)
|
|
try:
|
|
yield
|
|
finally:
|
|
if acquired:
|
|
try:
|
|
os.unlink(CACHE_LOCK_FILE)
|
|
except OSError:
|
|
pass
|
|
|
|
def _cache_is_fresh(self, cache: dict) -> bool:
|
|
fetched = cache.get("fetched_at")
|
|
ttl = cache.get("ttl_seconds", CACHE_TTL_SECONDS)
|
|
if not fetched:
|
|
return False
|
|
try:
|
|
ts = datetime.fromisoformat(fetched)
|
|
except ValueError:
|
|
return False
|
|
if ts.tzinfo is None:
|
|
ts = ts.replace(tzinfo=timezone.utc)
|
|
age = (datetime.now(timezone.utc) - ts).total_seconds()
|
|
return age < ttl
|
|
|
|
def refresh_inventory(self) -> dict:
|
|
"""Full identity/structure pull. Writes and returns the cache."""
|
|
companies_map: dict[str, str] = {}
|
|
endpoints_map: dict[str, dict] = {}
|
|
policies_map: dict[str, str] = {}
|
|
|
|
companies = self.list_all_companies()
|
|
for c in companies:
|
|
cid = c.get("id")
|
|
if cid:
|
|
companies_map[cid] = c.get("name", "")
|
|
|
|
# Endpoints per company (identity tier only - no status fields).
|
|
for cid in list(companies_map.keys()) + [ACG_ROOT_COMPANY_ID]:
|
|
page = 1
|
|
while True:
|
|
try:
|
|
data = self.list_endpoints(cid, page=page, per_page=100)
|
|
except GravityZoneError:
|
|
break
|
|
items = data.get("items", [])
|
|
if not items:
|
|
break
|
|
for ep in items:
|
|
eid = ep.get("id")
|
|
if not eid:
|
|
continue
|
|
endpoints_map[eid] = {
|
|
"name": ep.get("name", ""),
|
|
"company_id": ep.get("companyId", cid),
|
|
"fqdn": ep.get("fqdn") or ep.get("FQDN") or "",
|
|
}
|
|
# Last page = a short page; don't rely on `total` (may be omitted).
|
|
if len(items) < 100:
|
|
break
|
|
page += 1
|
|
if page > MAX_PAGINATION_PAGES:
|
|
print("[WARNING] refresh_inventory hit the "
|
|
f"{MAX_PAGINATION_PAGES}-page ceiling for company "
|
|
f"{cid}; inventory may be truncated.", file=sys.stderr)
|
|
break
|
|
|
|
try:
|
|
for p in self.list_policies(per_page=100).get("items", []):
|
|
pid = p.get("id")
|
|
if pid:
|
|
policies_map[pid] = p.get("name", "")
|
|
except GravityZoneError:
|
|
pass
|
|
|
|
packages: list = []
|
|
try:
|
|
packages = self.list_packages(per_page=100).get("items", [])
|
|
except GravityZoneError:
|
|
pass
|
|
|
|
cache = {
|
|
"fetched_at": datetime.now(timezone.utc).isoformat(),
|
|
"ttl_seconds": CACHE_TTL_SECONDS,
|
|
"companies": companies_map,
|
|
"endpoints": endpoints_map,
|
|
"policies": policies_map,
|
|
"packages": packages,
|
|
}
|
|
self._write_cache(cache)
|
|
return cache
|
|
|
|
def get_inventory(self, refresh: bool = False) -> dict:
|
|
"""Return cached identity/structure, refreshing if stale or forced."""
|
|
if not refresh:
|
|
cache = self._read_cache()
|
|
if cache and self._cache_is_fresh(cache):
|
|
return cache
|
|
return self.refresh_inventory()
|
|
|
|
def _cache_add_group(self, group_id: str, name: str) -> None:
|
|
# Best-effort: the cache is only a hint, so a write failure must NEVER
|
|
# turn a successful API mutation (createCustomGroup) into a reported error.
|
|
try:
|
|
with self._cache_lock():
|
|
cache = self._read_cache()
|
|
if cache is None:
|
|
return # no cache yet - next refresh picks it up
|
|
# Groups live in the inventory tree; store under a 'groups' map.
|
|
cache.setdefault("groups", {})[group_id] = name
|
|
self._write_cache(cache)
|
|
except Exception:
|
|
pass
|
|
|
|
def _cache_add_package(self, package_name: str, create_result: Any) -> None:
|
|
# Best-effort (see _cache_add_group): never let a cache failure mask a
|
|
# successful createPackage.
|
|
try:
|
|
with self._cache_lock():
|
|
cache = self._read_cache()
|
|
if cache is None:
|
|
return
|
|
packages = cache.setdefault("packages", [])
|
|
pkg_id = create_result if isinstance(create_result, str) else None
|
|
if isinstance(create_result, dict):
|
|
pkg_id = create_result.get("id")
|
|
if not any(
|
|
(isinstance(p, dict) and p.get("name") == package_name)
|
|
for p in packages
|
|
):
|
|
packages.append({"id": pkg_id, "name": package_name})
|
|
self._write_cache(cache)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main() -> int:
|
|
"""Minimal self-check: load key (no network call)."""
|
|
try:
|
|
client = GravityZoneClient()
|
|
_ = client.api_key # triggers vault load
|
|
print("[OK] API key loaded; transport =",
|
|
"httpx" if _HAS_HTTPX else "urllib")
|
|
return 0
|
|
except GravityZoneError as exc:
|
|
print(f"[ERROR] {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|