Files
claudetools/.claude/skills/bitdefender/scripts/gz_client.py
Howard Enos 8f17c17258 fix(bitdefender): errorlog rule-compliance + moveCustomGroup param + ASCII-clean code
Finalizing the skill to "done, no errors, all skill rules":
- errorlog compliance: gz.py no longer logs EXPECTED API responses (validation,
  method-not-found, not-configured, rate-limit, expected state) or `raw`/selftest
  runs to errorlog.md. Per CLAUDE.md "do not log expected/handled conditions".
  Verified: selftest + probes leave errorlog unchanged.
- moveCustomGroup: param is `parentId`, not `newParentId` (6th doc-vs-live fix
  caught by a full param-shape audit).
- ASCII-clean code: removed all non-ASCII (em-dashes, U+21D2 arrow) from scripts
  (avoids cp1252 encode errors; aligns with the ASCII-markers rule).
- api-reference updated.

Verified: 18/18 read commands rc=0 live; selftest 75/75; parser builds; ASCII
markers + vault load + errorlog helper present.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 17:06:46 -07:00

1170 lines
48 KiB
Python

#!/usr/bin/env python3
"""GravityZone Cloud Public API client for the bitdefender skill.
Standalone (does not import the api/ service). Reuses the proven JSON-RPC
shape, HTTP Basic auth (api_key as username, empty password), and the ACG
hardcoded tenant IDs.
Transport: prefers httpx if installed, else falls back to stdlib urllib so the
script has no hard third-party dependency.
Credentials: never hardcoded. Loaded at runtime from the SOPS vault, or from
the GRAVITYZONE_API_KEY env var (testing override).
Cache: only the IDENTITY/STRUCTURE tier is cached (company/endpoint/policy
id<->name maps, package list). Volatile status (infected, lastSeen, online,
signature freshness) is NEVER cached and always pulled live.
"""
from __future__ import annotations
import base64
import json
import os
import subprocess
import sys
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# --- optional httpx -----------------------------------------------------------
# urllib (stdlib) is always imported as the fallback transport; httpx is used
# when present for connection pooling/timeouts.
try:
import httpx # type: ignore
_HAS_HTTPX = True
except ImportError: # pragma: no cover - depends on environment
_HAS_HTTPX = False
# Cap upstream error bodies surfaced in exceptions. The GravityZone key never
# appears here, but `raw` can call arbitrary methods whose responses may reflect
# other data - bound the blast radius rather than echo full bodies into logs.
ERROR_BODY_MAX_CHARS = 500
# --- constants ----------------------------------------------------------------
# JSON-RPC base URL; the module name is appended per call. Overridable through
# the GRAVITYZONE_API_BASE_URL environment variable.
GRAVITYZONE_API_BASE_URL = os.environ.get(
    "GRAVITYZONE_API_BASE_URL",
    "https://cloud.gravityzone.bitdefender.com/api/v1.0/jsonrpc",
)
# Default overall and connect timeouts (seconds) for GravityZoneClient.
GRAVITYZONE_TIMEOUT_SECONDS = 60.0
GRAVITYZONE_CONNECT_TIMEOUT_SECONDS = 10.0
# Hardcoded ACG tenant IDs. The companies container is the parentId used when
# listing client companies.
ACG_ROOT_COMPANY_ID = "5c4280716c0318f3478b456a"
ACG_COMPANIES_CONTAINER_ID = "5c4280716c0318f3478b456e"
# Vault entry and field passed to the vault wrapper's `get-field` command.
VAULT_ENTRY = "msp-tools/gravityzone.sops.yaml"
VAULT_FIELD = "credentials.api_key"
# 86400 seconds = 24 hours.
CACHE_TTL_SECONDS = 86400
# This file lives in <skill>/scripts/, so the skill directory is two levels up.
SKILL_DIR = Path(__file__).resolve().parent.parent
CACHE_DIR = SKILL_DIR / ".cache"
CACHE_FILE = CACHE_DIR / "inventory.json"
class GravityZoneError(RuntimeError):
    """Raised for transport or JSON-RPC errors.

    Also raised by load_api_key() when no API key can be resolved, so callers
    can catch this single type for every client-side failure in this module.
    """
@dataclass
class GZEndpointSummary:
    """Security-posture snapshot of one endpoint, as built by security_sweep().

    Values are copied from the getManagedEndpointDetails response, falling
    back to the endpoint list item for name and company id.
    """

    endpoint_id: str
    name: str
    company_id: str
    # From malwareStatus.infected.
    infected: bool
    # True = a malware detection is currently active on the endpoint (tracks with infected); NOT an "engine on" indicator.
    threat_detected: bool
    # From agent.signatureOutdated / agent.productOutdated.
    signature_outdated: bool
    product_outdated: bool
    # Raw lastSeen value from the detail response (None when absent).
    last_seen: Optional[str]
    # From agent.productVersion (None when absent).
    agent_version: Optional[str]
    # Raw `state` value from the detail response; 0 when absent.
    # NOTE(review): meaning of the individual state codes is not shown here - confirm against the API docs.
    state: int
# --- credential loading -------------------------------------------------------
def _resolve_claudetools_root() -> Path:
"""Resolve the ClaudeTools repo root: env var, then identity.json, then derived path.
Final fallback is derived from this file's location
(.claude/skills/bitdefender/scripts -> repo root) so it works on the
Mac/Linux fleet machines, not only the Windows default.
"""
# SKILL_DIR = .../.claude/skills/bitdefender ; root is three levels up.
derived_root = SKILL_DIR.parent.parent.parent
env_root = os.environ.get("CLAUDETOOLS_ROOT")
if env_root:
return Path(env_root)
identity_path = derived_root / ".claude" / "identity.json"
if identity_path.exists():
try:
data = json.loads(identity_path.read_text(encoding="utf-8"))
root = data.get("claudetools_root")
if root:
return Path(root)
except (json.JSONDecodeError, OSError):
pass
return derived_root
def load_api_key() -> str:
    """Load the GravityZone API key.

    Order: GRAVITYZONE_API_KEY env override, then the SOPS vault wrapper
    (`bash <root>/.claude/scripts/vault.sh get-field <entry> <field>`).
    Never returns an empty key - raises if it cannot resolve one. An env
    override that is empty or whitespace-only is treated as unset, so it can
    no longer yield an empty key after stripping.

    Raises:
        GravityZoneError: the vault wrapper is missing, `bash` is not on PATH,
            the vault call times out or exits non-zero, or it prints nothing.
    """
    # Strip BEFORE the truthiness check: "   " must not become "".
    env_key = (os.environ.get("GRAVITYZONE_API_KEY") or "").strip()
    if env_key:
        return env_key
    root = _resolve_claudetools_root()
    vault_script = root / ".claude" / "scripts" / "vault.sh"
    if not vault_script.exists():
        raise GravityZoneError(
            f"Cannot load API key: vault wrapper not found at {vault_script} "
            "and GRAVITYZONE_API_KEY is not set."
        )
    try:
        completed = subprocess.run(
            ["bash", str(vault_script), "get-field", VAULT_ENTRY, VAULT_FIELD],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except FileNotFoundError as exc:
        raise GravityZoneError(
            "Cannot load API key: 'bash' not found on PATH. Install Git Bash or "
            "set GRAVITYZONE_API_KEY."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise GravityZoneError("Cannot load API key: vault call timed out.") from exc
    if completed.returncode != 0:
        # Only stderr is surfaced; stdout (which would carry the key) never is.
        raise GravityZoneError(
            "Cannot load API key from vault "
            f"(exit {completed.returncode}): {completed.stderr.strip()}"
        )
    key = completed.stdout.strip()
    if not key:
        raise GravityZoneError("Vault returned an empty API key.")
    return key
# --- client -------------------------------------------------------------------
class GravityZoneClient:
def __init__(
    self,
    api_key: Optional[str] = None,
    api_base_url: str = GRAVITYZONE_API_BASE_URL,
    timeout: float = GRAVITYZONE_TIMEOUT_SECONDS,
    connect_timeout: float = GRAVITYZONE_CONNECT_TIMEOUT_SECONDS,
):
    """Store connection settings; no network or vault access happens here.

    `api_key` may be None, in which case the `api_key` property resolves it
    on first use. Trailing slashes are removed from `api_base_url` because
    the module name is appended as `<base>/<module>` per request.
    """
    self.timeout = timeout
    self.connect_timeout = connect_timeout
    # Resolved lazily by the api_key property when left as None.
    self._api_key = api_key
    self.api_base_url = api_base_url.rstrip("/")
@property
def api_key(self) -> str:
    """API key for HTTP Basic auth, loaded via load_api_key() on first access.

    A key passed to the constructor is returned as-is; otherwise the loaded
    key is stored so later accesses do not reload it.
    """
    current = self._api_key
    if current:
        return current
    self._api_key = load_api_key()
    return self._api_key
# -- core transport --------------------------------------------------------
def _jsonrpc_request(self, module: str, method: str, params: dict) -> Any:
"""Make one JSON-RPC call. Returns body['result'] or raises GravityZoneError."""
url = f"{self.api_base_url}/{module}"
payload = {"id": "1", "jsonrpc": "2.0", "method": method, "params": params}
body = self._post(url, payload)
if isinstance(body, dict) and "error" in body and body["error"] is not None:
err = body["error"]
detail = None
if isinstance(err, dict):
detail = (err.get("data") or {}).get("details") or err.get("message")
raise GravityZoneError(
f"GravityZone API error [{module}.{method}]: {detail or err}"
)
if isinstance(body, dict):
return body.get("result")
return body
def _post(self, url: str, payload: dict) -> Any:
    """POST `payload` as JSON to `url` with HTTP Basic auth; return the decoded body.

    Uses httpx when it was importable, otherwise stdlib urllib. The API key
    is the Basic-auth username with an empty password. Note the urllib path
    applies only `self.timeout`; `self.connect_timeout` is httpx-only.

    Raises:
        GravityZoneError: for every transport-level failure - timeouts, HTTP
            error statuses (body truncated to ERROR_BODY_MAX_CHARS), connection
            errors, and responses that are not valid JSON. No raw ValueError
            or OSError escapes, so callers that catch GravityZoneError (e.g.
            the sweeps) are not aborted by a bad response.
    """
    data = json.dumps(payload).encode("utf-8")
    if _HAS_HTTPX:
        try:
            timeout = httpx.Timeout(self.timeout, connect=self.connect_timeout)
            with httpx.Client(timeout=timeout) as client:
                resp = client.post(url, content=data, auth=(self.api_key, ""),
                                   headers={"Content-Type": "application/json"})
                resp.raise_for_status()
                try:
                    return resp.json()
                except ValueError as exc:
                    # 2xx with a non-JSON body (e.g. a proxy/HTML page).
                    raise GravityZoneError(
                        f"GravityZone returned a non-JSON response: {exc}"
                    ) from exc
        except httpx.TimeoutException as exc:
            raise GravityZoneError(f"GravityZone request timed out: {exc}") from exc
        except httpx.HTTPStatusError as exc:
            detail = (exc.response.text or "")[:ERROR_BODY_MAX_CHARS]
            raise GravityZoneError(
                f"GravityZone HTTP {exc.response.status_code}: {detail}"
            ) from exc
        except httpx.HTTPError as exc:
            raise GravityZoneError(f"GravityZone request failed: {exc}") from exc
    # stdlib fallback
    token = base64.b64encode(f"{self.api_key}:".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")[:ERROR_BODY_MAX_CHARS]
        raise GravityZoneError(f"GravityZone HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise GravityZoneError(f"GravityZone request failed: {exc}") from exc
    except OSError as exc:
        # Socket timeouts / connection resets raised while reading the body
        # are plain OSErrors, not wrapped in URLError.
        raise GravityZoneError(f"GravityZone request failed: {exc}") from exc
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as exc:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        raise GravityZoneError(
            f"GravityZone returned a non-JSON response: {exc}"
        ) from exc
# ======================================================================
# READ METHODS (always live), plus the STATE-CHANGING company lifecycle calls
# ======================================================================
def get_api_status(self) -> dict:
    """Return API-key details and license info as one nested dict.

    Shape: {"apiKey": <general.getApiKeyDetails>, "license":
    <licensing.getLicenseInfo>}. The responses are nested, not merged, so
    their keys cannot silently collide. If the license call raises
    GravityZoneError, "license" is left out and the error text is stored
    under "_licenseWarning"; a failure of the key-details call propagates.
    """
    key_info = self._jsonrpc_request("general", "getApiKeyDetails", {})
    report: dict = {"apiKey": key_info or {}}
    try:
        license_info = self._jsonrpc_request("licensing", "getLicenseInfo", {})
    except GravityZoneError as exc:
        report["_licenseWarning"] = str(exc)
    else:
        report["license"] = license_info or {}
    return report
def get_own_company(self) -> dict:
    """Details of the API key's own company (companies.getCompanyDetails, no params).

    Returns {} when the API yields an empty result.
    """
    details = self._jsonrpc_request("companies", "getCompanyDetails", {})
    return details if details else {}
def get_company_details(self, company_id: Optional[str] = None) -> dict:
    """Company detail (companies.getCompanyDetails). No id -> own company.

    `companyId` is sent only when `company_id` is truthy. Returns {} for an
    empty result.
    """
    query: dict = {"companyId": company_id} if company_id else {}
    details = self._jsonrpc_request("companies", "getCompanyDetails", query)
    return details or {}
def get_company_by_user(self, username: str) -> dict:
    """Company that owns a given user (companies.getCompanyDetailsByUser).

    Returns {} for an empty result.
    """
    query = {"username": username}
    company = self._jsonrpc_request("companies", "getCompanyDetailsByUser", query)
    return company or {}
def create_company(
    self,
    company_type: int,
    name: str,
    parent_id: Optional[str] = None,
    extra: Optional[dict] = None,
) -> Any:
    """Create a company (companies.createCompany). STATE-CHANGING.

    Required (verified): `type` (0=Partner, 1=Customer) and `name`.
    `parentId` is sent when `parent_id` is not None. Any other documented
    field (address, country, state, phone, industry, canBeManagedByAbove,
    assignedProductType, nested `licenseSubscription` {type (3=monthly
    subscription, 4=monthly trial), reservedSlots, endSubscription,
    autoRenewPeriod, ...}) goes in `extra`, which is merged last and so can
    override the named arguments. Gate at the call site behind --confirm.
    Docs: bitdefender.com/business/support/en/77211-126236-createcompany.html
    """
    payload: dict = {"type": company_type, "name": name}
    if parent_id is not None:
        payload["parentId"] = parent_id
    payload.update(extra or {})
    return self._jsonrpc_request("companies", "createCompany", payload)
def suspend_company(self, company_id: str) -> Any:
    """Suspend a company (companies.suspendCompany). STATE-CHANGING. Gated.

    Returns the raw upstream result.
    """
    target = {"companyId": company_id}
    return self._jsonrpc_request("companies", "suspendCompany", target)
def activate_company(self, company_id: str) -> Any:
    """Activate a suspended company (companies.activateCompany). STATE-CHANGING.

    Returns the raw upstream result.
    """
    target = {"companyId": company_id}
    return self._jsonrpc_request("companies", "activateCompany", target)
def delete_company(self, company_id: str) -> Any:
    """Delete a company (companies.deleteCompany). STATE-CHANGING. Gated.

    Returns the raw upstream result.
    """
    target = {"companyId": company_id}
    return self._jsonrpc_request("companies", "deleteCompany", target)
def list_companies(self, page: int = 1, per_page: int = 100) -> dict:
    """List client companies found on one page of the ACG companies container.

    Calls network.getNetworkInventoryItems with the companies container as
    `parentId` and keeps only items whose `type` equals 1. Returns
    {"total": <count of kept items on this page>, "items": [...]}; note that
    "total" is the filtered page count, not the upstream total.
    """
    query = {
        "parentId": ACG_COMPANIES_CONTAINER_ID,
        "page": page,
        "perPage": per_page,
    }
    inventory = self._jsonrpc_request(
        "network", "getNetworkInventoryItems", query
    ) or {}
    companies = list(
        filter(lambda entry: entry.get("type") == 1, inventory.get("items", []))
    )
    return {"total": len(companies), "items": companies}
def list_endpoints(
    self, parent_id: Optional[str] = None, page: int = 1, per_page: int = 100
) -> dict:
    """One page of endpoints (network.getEndpointsList).

    `parentId` is included only when `parent_id` is truthy. Returns {} for
    an empty result.
    """
    scope = {"parentId": parent_id} if parent_id else {}
    query: dict = {"page": page, "perPage": per_page, **scope}
    listing = self._jsonrpc_request("network", "getEndpointsList", query)
    return listing or {}
def get_endpoint_details(self, endpoint_id: str) -> dict:
    """Detail for one managed endpoint (network.getManagedEndpointDetails).

    Returns {} for an empty result.
    """
    query = {"endpointId": endpoint_id}
    detail = self._jsonrpc_request("network", "getManagedEndpointDetails", query)
    return detail or {}
def list_policies(self, page: int = 1, per_page: int = 100) -> dict:
    """One page of policies (policies.getPoliciesList). Returns {} when empty."""
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("policies", "getPoliciesList", paging)
    return listing or {}
def get_policy_details(self, policy_id: str) -> dict:
    """Detail for one policy (policies.getPolicyDetails). Returns {} when empty."""
    query = {"policyId": policy_id}
    detail = self._jsonrpc_request("policies", "getPolicyDetails", query)
    return detail or {}
def list_packages(self, page: int = 1, per_page: int = 100) -> dict:
    """One page of installation packages (packages.getPackagesList). Returns {} when empty."""
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("packages", "getPackagesList", paging)
    return listing or {}
def list_quarantine(
    self, company_id: str, page: int = 1, per_page: int = 100
) -> dict:
    """One page of a company's quarantine items (getQuarantineItemsList).

    Returns {} for an empty result.
    """
    # The module path must be the service-scoped 'quarantine/computers'
    # (bare 'quarantine' 404s), and the scope param is 'companyId', NOT
    # 'parentId'.
    query = {"companyId": company_id, "page": page, "perPage": per_page}
    listing = self._jsonrpc_request(
        "quarantine/computers", "getQuarantineItemsList", query
    )
    return listing or {}
def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
    """Live security posture sweep of all endpoints under a parent.

    Pages through list_endpoints(parent_id) 100 at a time and fetches the
    detail record of every listed endpoint. Paging stops on an empty page or
    once page * per_page reaches the reported `total`.

    Best-effort per endpoint: list items without an id, and endpoints whose
    detail call raises GravityZoneError, are skipped silently. A failure of
    the list call itself propagates.

    Returns:
        Summaries sorted with infected endpoints first, then outdated
        signatures, then outdated product, then by case-insensitive name.
        `name` and `company_id` fall back to the list item and finally to ""
        so an explicit null from the API cannot break the sort.
    """
    summaries: list[GZEndpointSummary] = []
    page = 1
    per_page = 100
    while True:
        data = self.list_endpoints(parent_id, page=page, per_page=per_page)
        items = data.get("items", [])
        if not items:
            break
        for item in items:
            endpoint_id = item.get("id", "")
            if not endpoint_id:
                continue
            try:
                detail = self.get_endpoint_details(endpoint_id)
            except GravityZoneError:
                # Deliberate best-effort: one bad endpoint must not abort the sweep.
                continue
            malware = detail.get("malwareStatus", {}) or {}
            agent = detail.get("agent", {}) or {}
            summaries.append(
                GZEndpointSummary(
                    endpoint_id=endpoint_id,
                    # Trailing `or ""` guards against name/companyId being
                    # present but null on both records.
                    name=detail.get("name") or item.get("name") or "",
                    company_id=detail.get("companyId") or item.get("companyId") or "",
                    infected=bool(malware.get("infected", False)),
                    threat_detected=bool(malware.get("detection", False)),
                    signature_outdated=bool(agent.get("signatureOutdated", False)),
                    product_outdated=bool(agent.get("productOutdated", False)),
                    last_seen=detail.get("lastSeen"),
                    agent_version=agent.get("productVersion"),
                    state=detail.get("state", 0),
                )
            )
        total = data.get("total", 0)
        if page * per_page >= total:
            break
        page += 1
    # `not flag` sorts True flags first (False < True).
    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            s.name.lower(),
        )
    )
    return summaries
def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
    """Sweep every client company. The companies container is NOT a valid
    endpoint parent, so iterate each company and sweep it individually.

    Companies are read from network.getNetworkInventoryItems under the ACG
    companies container (items with type == 1), paging 100 at a time until
    page * per_page reaches the reported `total`. Previously only the first
    page was read, so companies beyond the first 100 inventory items were
    silently never swept. If the response carries no `total`, paging stops
    after the first page (the earlier behavior).

    Best-effort per company: a company whose sweep raises GravityZoneError
    is skipped. Summaries lacking a company id are stamped with the id of the
    company being swept.

    Returns:
        All summaries, sorted infected first, then outdated signatures, then
        outdated product, then by case-insensitive name.
    """
    summaries: list[GZEndpointSummary] = []
    page = 1
    per_page = 100
    while True:
        listing = self._jsonrpc_request(
            "network",
            "getNetworkInventoryItems",
            {
                "parentId": ACG_COMPANIES_CONTAINER_ID,
                "page": page,
                "perPage": per_page,
            },
        ) or {}
        entries = listing.get("items") or []
        if not entries:
            break
        for company in entries:
            # type 1 is the same company filter list_companies applies.
            if company.get("type") != 1:
                continue
            cid = company.get("id")
            if not cid:
                continue
            try:
                company_summaries = self.security_sweep(cid)
            except GravityZoneError:
                # Deliberate best-effort: one failing company must not abort the rest.
                continue
            for s in company_summaries:
                if not s.company_id:
                    s.company_id = cid
            summaries.extend(company_summaries)
        if page * per_page >= (listing.get("total") or 0):
            break
        page += 1
    summaries.sort(
        key=lambda s: (
            not s.infected,
            not s.signature_outdated,
            not s.product_outdated,
            s.name.lower(),
        )
    )
    return summaries
# ======================================================================
# MANAGEMENT METHODS (verified only)
# ======================================================================
def create_package(
    self,
    package_name: str,
    company_id: Optional[str] = None,
    description: Optional[str] = None,
    language: Optional[str] = None,
    modules: Optional[dict] = None,
    scan_mode: Optional[dict] = None,
    settings: Optional[dict] = None,
    roles: Optional[dict] = None,
    deployment_options: Optional[dict] = None,
) -> Any:
    """Create an installation package (packages.createPackage).

    `packageName` is always sent; `companyId` only when truthy; every other
    argument only when it is not None. After a successful call the package
    is written through to the local cache via _cache_add_package. Returns
    the raw upstream result.
    """
    payload: dict = {"packageName": package_name}
    if company_id:
        payload["companyId"] = company_id
    optional_fields = (
        ("description", description),
        ("language", language),
        ("modules", modules),
        ("scanMode", scan_mode),
        ("settings", settings),
        ("roles", roles),
        ("deploymentOptions", deployment_options),
    )
    payload.update(
        {key: value for key, value in optional_fields if value is not None}
    )
    created = self._jsonrpc_request("packages", "createPackage", payload)
    # Write-through: refresh package list in cache.
    self._cache_add_package(package_name, created)
    return created
def get_installation_links(
    self, package_name: str, company_id: Optional[str] = None
) -> Any:
    """Installation links for a package (packages.getInstallationLinks).

    `companyId` is sent only when `company_id` is truthy. Returns the raw
    upstream result.
    """
    scope = {"companyId": company_id} if company_id else {}
    query: dict = {"packageName": package_name, **scope}
    return self._jsonrpc_request("packages", "getInstallationLinks", query)
def delete_package(self, package_id: str) -> Any:
    """Delete an installation package (packages.deletePackage).

    VERIFIED LIVE 2026-06-21: the only accepted param is `packageId`;
    packageName/companyId are rejected as "not expected".
    STATE-CHANGING - gate at call site.
    """
    target = {"packageId": package_id}
    return self._jsonrpc_request("packages", "deletePackage", target)
def create_scan_task(
    self,
    target_ids: list[str],
    scan_type: int,
    name: Optional[str] = None,
    custom_scan_settings: Optional[dict] = None,
) -> Any:
    """Create a scan task (network.createScanTask).

    Sends `targetIds` and `type`; `name` and `customScanSettings` are added
    only when not None. Returns the raw upstream result.
    """
    payload: dict = {"targetIds": target_ids, "type": scan_type}
    optional_fields = (("name", name), ("customScanSettings", custom_scan_settings))
    for key, value in optional_fields:
        if value is not None:
            payload[key] = value
    return self._jsonrpc_request("network", "createScanTask", payload)
def move_endpoints(self, endpoint_ids: list[str], group_id: str) -> Any:
    """Move endpoints into a group (network.moveEndpoints). Returns the raw result."""
    payload = {"endpointIds": endpoint_ids, "groupId": group_id}
    return self._jsonrpc_request("network", "moveEndpoints", payload)
def delete_endpoint(self, endpoint_id: str) -> Any:
    """Delete one endpoint (network.deleteEndpoint). Returns the raw result."""
    target = {"endpointId": endpoint_id}
    return self._jsonrpc_request("network", "deleteEndpoint", target)
def create_custom_group(self, name: str, parent_id: Optional[str] = None) -> Any:
    """Create a custom group (network.createCustomGroup).

    `parentId` is sent only when `parent_id` is truthy. When the result
    yields a group id - either the result itself (a string) or its `id` /
    `groupId` member (a dict) - the group is written through to the local
    cache via _cache_add_group. Returns the raw upstream result.
    """
    # The API param is `groupName` (verified live 2026-06-21), NOT `name`.
    payload: dict = {"groupName": name}
    if parent_id:
        payload["parentId"] = parent_id
    created = self._jsonrpc_request("network", "createCustomGroup", payload)
    # Write-through: createCustomGroup returns the new group id (string).
    if isinstance(created, dict):
        new_id = created.get("id") or created.get("groupId")
    elif isinstance(created, str):
        new_id = created
    else:
        new_id = None
    if new_id:
        self._cache_add_group(new_id, name)
    return created
def delete_custom_group(self, group_id: str) -> Any:
    """Delete a custom group (network.deleteCustomGroup). Returns the raw result."""
    target = {"groupId": group_id}
    return self._jsonrpc_request("network", "deleteCustomGroup", target)
def move_custom_group(self, group_id: str, new_parent_id: str) -> Any:
    """Move a custom group under a new parent (network.moveCustomGroup).

    Returns the raw upstream result.
    """
    # The destination is sent as `parentId` (verified live 2026-06-21),
    # NOT `newParentId`.
    payload = {"groupId": group_id, "parentId": new_parent_id}
    return self._jsonrpc_request("network", "moveCustomGroup", payload)
# ======================================================================
# INCIDENTS / EDR (module `/incidents`)
# ----------------------------------------------------------------------
# READ methods are always live and never cached (blocklist + incident
# status are volatile). State-changing methods (isolate, restore,
# add/remove blocklist) return the raw upstream result; the CLI caller is
# responsible for gating them behind --confirm.
# ======================================================================
# -- READ (safe) -----------------------------------------------------------
def list_blocklist(
    self,
    company_id: Optional[str] = None,
    page: int = 1,
    per_page: int = 100,
) -> dict:
    """List blocklisted hash items. VERIFIED LIVE (incidents.getBlocklistItems).

    The response has the shape {total, page, perPage, pagesCount,
    items:[{id, source, sourceInfo, hashType, hash, companyId}]}. Pass
    `company_id` to scope to one company; leave it out for the whole partner
    tenant. Returns {} for an empty result.
    """
    scope = {"companyId": company_id} if company_id else {}
    query: dict = {"page": page, "perPage": per_page, **scope}
    listing = self._jsonrpc_request("incidents", "getBlocklistItems", query)
    return listing or {}
def list_incidents(
    self,
    parent_id: str,
    page: int = 1,
    per_page: int = 500,
    filters: Optional[dict] = None,
) -> dict:
    """List incidents under a company/group (incidents.getIncidentsList).

    `parent_id` (a company/group id) is REQUIRED. `filters` is sent only
    when not None. Returns {} for an empty result.

    `per_page` defaults to 500 because live testing rejected smaller values
    with "Invalid value for 'perPage' parameter. The value should be between
    500 and 10000", so the 100 used elsewhere does not work here.

    NOTE (UNVERIFIED / possibly unavailable on this tenant): a live re-test
    on 2026-05-30 got "Method not found" for `getIncidentsList` on the
    `/incidents` module while `getBlocklistItems` on the SAME module
    succeeded in the same request - so neither rate-limiting nor a bad key
    explains it. The method may depend on an EDR/incidents license feature
    that is OFF on this tenant, or carry a different name in this API
    version. Treat the result as best-effort and confirm against the
    official Bitdefender API reference / console before relying on it.
    """
    query: dict = {"parentId": parent_id, "page": page, "perPage": per_page}
    if filters is not None:
        query["filters"] = filters
    listing = self._jsonrpc_request("incidents", "getIncidentsList", query)
    return listing or {}
# -- STATE-CHANGING (caller MUST gate behind --confirm) --------------------
def isolate_endpoints(self, endpoint_ids: list[str]) -> Any:
    """Isolate endpoints from the network (incidents.createIsolateEndpointTask).

    VERIFIED LIVE 2026-06-21: the API accepts a SINGLE `endpointId` per call
    (an `endpointIds` array is rejected as "not expected"), so one request is
    issued per id, in order. An error on any call propagates immediately.

    Returns the bare upstream result when exactly one id was given,
    otherwise the list of per-endpoint results ([] for no ids).
    STATE-CHANGING - gate behind --confirm at the call site.
    """
    task_results = [
        self._jsonrpc_request(
            "incidents", "createIsolateEndpointTask", {"endpointId": eid}
        )
        for eid in endpoint_ids
    ]
    if len(task_results) == 1:
        return task_results[0]
    return task_results
def restore_endpoints_from_isolation(self, endpoint_ids: list[str]) -> Any:
    """Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).

    VERIFIED LIVE 2026-06-21: the API accepts a single `endpointId` per call
    (not an array), so one request is issued per id, in order. The call
    fails while the isolation task is still in progress - wait and retry.

    Returns the bare upstream result when exactly one id was given,
    otherwise the list of per-endpoint results ([] for no ids).
    STATE-CHANGING - gate behind --confirm at the call site.
    """
    task_results = [
        self._jsonrpc_request(
            "incidents",
            "createRestoreEndpointFromIsolationTask",
            {"endpointId": eid},
        )
        for eid in endpoint_ids
    ]
    if len(task_results) == 1:
        return task_results[0]
    return task_results
def add_to_blocklist(
    self,
    company_id: str,
    hash_list: list[str],
    hash_type: int = 1,
    source_info: str = "",
    operating_systems: Optional[list[str]] = None,
) -> Any:
    """Add hashes to the blocklist (incidents.addToBlocklist).

    `hash_list` is an array of hash strings and `source_info` a free-text
    description. `hash_type` is an int; 1 is the common value seen live (the
    full mapping is in the GravityZone console / API docs).
    `operatingSystems` is sent only when `operating_systems` is not None.
    STATE-CHANGING - gate behind --confirm at the call site.
    """
    payload: dict = dict(
        companyId=company_id,
        hashType=hash_type,
        hashList=hash_list,
        sourceInfo=source_info,
    )
    if operating_systems is not None:
        payload["operatingSystems"] = operating_systems
    return self._jsonrpc_request("incidents", "addToBlocklist", payload)
def remove_from_blocklist(self, hash_item_id: str) -> Any:
    """Remove one blocklist entry (incidents.removeFromBlocklist).

    STATE-CHANGING - gate behind --confirm at the call site.
    UNVERIFIED: `hashItemId` is only the candidate param name (carrying the
    `id` field from getBlocklistItems). Check the official Bitdefender API
    reference before relying on it.
    """
    target = {"hashItemId": hash_item_id}
    return self._jsonrpc_request("incidents", "removeFromBlocklist", target)
# ======================================================================
# POLICY ASSIGNMENT (state-changing; gate behind --confirm at call site)
# ----------------------------------------------------------------------
# NOTE: getPolicyDetails (above) returns the FULL granular module config
# (verified live 2026-06-21 - the earlier "shallow only" claim was wrong).
# The Public API still has NO create/edit/clone policy method - authoring
# stays in the console - but assigning an EXISTING policy is supported here.
# ======================================================================
def assign_policy(
    self,
    policy_id: str,
    target_ids: list[str],
    force_inheritance: bool = False,
) -> Any:
    """Assign an existing policy to endpoints/groups (network.assignPolicy).

    VERIFIED LIVE 2026-06-21: a `policyId` assignment MUST carry
    `inheritFromAbove=false` in the same call. Inheriting targets default
    inheritFromAbove to true, so leaving it out gets the call rejected with
    a misleading "inheritFromAbove should not be used with policyId" error.
    `forcePolicyInheritance` (sent only when `force_inheritance` is truthy)
    optionally pushes the policy down to sub-items. To make a target INHERIT
    instead, send {targetIds, inheritFromAbove:true} without a policyId via
    `raw`.
    STATE-CHANGING - gate at the call site behind --confirm.
    Docs: bitdefender.com/business/support/en/77212-924802-assignpolicy.html
    """
    payload: dict = dict(
        policyId=policy_id,
        targetIds=target_ids,
        inheritFromAbove=False,
    )
    if force_inheritance:
        payload["forcePolicyInheritance"] = True
    return self._jsonrpc_request("network", "assignPolicy", payload)
def list_scan_tasks(
    self,
    page: int = 1,
    per_page: int = 100,
    name: Optional[str] = None,
    status: Optional[int] = None,
) -> dict:
    """List scan tasks (network.getScanTasksList). VERIFIED LIVE.

    `name` and `status` are sent only when not None. Returns {} for an empty
    result.
    """
    query: dict = {"page": page, "perPage": per_page}
    for key, value in (("name", name), ("status", status)):
        if value is not None:
            query[key] = value
    listing = self._jsonrpc_request("network", "getScanTasksList", query)
    return listing or {}
def get_endpoint_tags(self) -> Any:
    """List endpoint tags defined in the tenant (network.getEndpointTags). Read.

    Returns the raw upstream result (not normalized to {}).
    """
    tags = self._jsonrpc_request("network", "getEndpointTags", {})
    return tags
def set_endpoint_label(self, endpoint_id: str, label: str) -> Any:
    """Set an endpoint's label (network.setEndpointLabel). STATE-CHANGING.

    Both `endpointId` and `label` are required (verified). Gate at the call
    site.
    """
    payload = {"endpointId": endpoint_id, "label": label}
    return self._jsonrpc_request("network", "setEndpointLabel", payload)
def reconfigure_client(
    self, target_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Reconfigure installed agents (network.createReconfigureClientTask).

    STATE-CHANGING. `targetIds` is required (verified). The documented
    reconfigure body (which modules/roles/scanMode to change) is supplied
    through `extra`, which is merged over the base params. Gate at the call
    site behind --confirm.
    """
    payload: dict = {"targetIds": target_ids}
    payload.update(extra or {})
    return self._jsonrpc_request("network", "createReconfigureClientTask", payload)
# ======================================================================
# REPORTS (module `/reports`) - VERIFIED LIVE
# ======================================================================
def list_reports(self, page: int = 1, per_page: int = 100) -> dict:
    """List saved reports (reports.getReportsList). Returns {} when empty."""
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("reports", "getReportsList", paging)
    return listing or {}
def get_report_links(self, report_id: str) -> Any:
    """Get download links for a generated report (reports.getDownloadLinks).

    `reportId` is only the candidate param name - check it against the
    console if the call errors. Returns the raw upstream result.
    """
    query = {"reportId": report_id}
    return self._jsonrpc_request("reports", "getDownloadLinks", query)
# ======================================================================
# ACCOUNTS (module `/accounts`) - VERIFIED LIVE (read)
# ======================================================================
def list_accounts(self, page: int = 1, per_page: int = 100) -> dict:
    """List GravityZone console accounts/users (accounts.getAccountsList).

    Returns {} for an empty result.
    """
    paging = {"page": page, "perPage": per_page}
    listing = self._jsonrpc_request("accounts", "getAccountsList", paging)
    return listing or {}
def get_notifications_settings(self) -> dict:
    """Notification configuration (accounts.getNotificationsSettings).

    Returns {} for an empty result.
    """
    current = self._jsonrpc_request("accounts", "getNotificationsSettings", {})
    return current if current else {}
def get_account_details(self, account_id: Optional[str] = None) -> dict:
    """Account detail (accounts.getAccountDetails).

    Without an id the API returns the API key owner's own account (verified
    live). `accountId` is sent only when `account_id` is truthy. Returns {}
    for an empty result.
    """
    query: dict = {"accountId": account_id} if account_id else {}
    account = self._jsonrpc_request("accounts", "getAccountDetails", query)
    return account or {}
def create_account(
    self,
    email: str,
    password: Optional[str] = None,
    username: Optional[str] = None,
    role: Optional[int] = None,
    profile: Optional[dict] = None,
    rights: Optional[dict] = None,
    phone_number: Optional[dict] = None,
    extra: Optional[dict] = None,
) -> Any:
    """Create a console account (accounts.createAccount). STATE-CHANGING.

    `email` is required (verified). Each named optional argument is sent
    only when not None, under its documented API name: userName, password,
    role (int), profile{fullName,language,timezone}, rights{manageInventory,
    managePoliciesRead/Write,...}, phoneNumber{countryCode,subscriberNumber}.
    `extra` is merged last, verbatim, and can override the named arguments.
    Gate at the call site behind --confirm.
    Docs: bitdefender.com/business/support/en/77212-125284-createaccount.html
    """
    payload: dict = {"email": email}
    optional_fields = (
        ("userName", username),
        ("password", password),
        ("role", role),
        ("profile", profile),
        ("rights", rights),
        ("phoneNumber", phone_number),
    )
    payload.update(
        {key: value for key, value in optional_fields if value is not None}
    )
    payload.update(extra or {})
    return self._jsonrpc_request("accounts", "createAccount", payload)
def update_account(self, account_id: str, fields: dict) -> Any:
    """Update a console account (accounts.updateAccount). STATE-CHANGING.

    `accountId` is required (verified). `fields` holds the documented account
    attributes to change (same shape as create, minus email) and is merged
    over the base params; a falsy `fields` sends only the id. Gate at the
    call site behind --confirm.
    """
    changes = fields or {}
    payload: dict = {"accountId": account_id}
    payload.update(changes)
    return self._jsonrpc_request("accounts", "updateAccount", payload)
def delete_account(self, account_id: str) -> Any:
    """Delete a console account (accounts.deleteAccount). STATE-CHANGING.

    `accountId` is required (verified). Gate at the call site behind
    --confirm.
    """
    target = {"accountId": account_id}
    return self._jsonrpc_request("accounts", "deleteAccount", target)
def configure_notifications_settings(self, settings: dict) -> Any:
    """Set notification settings (accounts.configureNotificationsSettings).

    STATE-CHANGING. The API has NO required params, so even an empty payload
    is accepted - the caller is responsible for passing the intended
    settings object. A falsy `settings` is sent as {}. Gate at the call site
    behind --confirm.
    """
    payload = settings if settings else {}
    return self._jsonrpc_request(
        "accounts", "configureNotificationsSettings", payload
    )
# ======================================================================
# PUSH EVENT SERVICE (module `/push`)
# ----------------------------------------------------------------------
# `get`/`stats` are read (but error when the service was never configured -
# that is an EXPECTED state, not a failure; the CLI handles it cleanly).
# `set` is STATE-CHANGING (it configures where GravityZone POSTs security
# events) - gate behind --confirm at the call site.
# ======================================================================
def get_push_settings(self) -> dict:
    """Current push event service settings (push.getPushEventSettings).

    Returns {} for an empty result.
    """
    current = self._jsonrpc_request("push", "getPushEventSettings", {})
    return current if current else {}
def get_push_stats(self) -> dict:
    """Push event service delivery stats (push.getPushEventStats).

    Returns {} for an empty result.
    """
    stats = self._jsonrpc_request("push", "getPushEventStats", {})
    return stats if stats else {}
def set_push_settings(
    self,
    status: int,
    service_type: str = "jsonRPC",
    url: Optional[str] = None,
    require_valid_ssl: bool = True,
    authorization: Optional[str] = None,
    subscribe_event_types: Optional[dict] = None,
) -> Any:
    """Configure the GravityZone Push event service (push.setPushEventSettings).

    `status` (1=on / 0=off) is REQUIRED (verified via validation probe
    2026-06-21). When enabling, `serviceSettings.url` is the receiver
    endpoint GravityZone POSTs events to. `serviceSettings` is always sent
    with `requireValidSslCertificate`; `url`, `authorization` and
    `subscribeToEventTypes` are added only when not None. The nested shape
    (serviceType/serviceSettings/subscribeToEventTypes) follows Bitdefender's
    documented push API and is UNVERIFIED beyond `status` on this tenant -
    check the first successful enable against the live response.
    STATE-CHANGING - gate at the call site behind --confirm.
    """
    receiver: dict = {"requireValidSslCertificate": require_valid_ssl}
    for key, value in (("url", url), ("authorization", authorization)):
        if value is not None:
            receiver[key] = value
    payload: dict = {
        "status": status,
        "serviceType": service_type,
        "serviceSettings": receiver,
    }
    if subscribe_event_types is not None:
        payload["subscribeToEventTypes"] = subscribe_event_types
    return self._jsonrpc_request("push", "setPushEventSettings", payload)
def send_test_push_event(self, event_type: str, extra: Optional[dict] = None) -> Any:
    """Send a test push event (push.sendTestPushEvent).

    `eventType` is required (verified). Entries in `extra`, when given, are
    merged into the request params after `eventType`. The event fires
    against the configured receiver - STATE-ADJACENT, gate at the call site
    behind --confirm.
    """
    payload: dict = {"eventType": event_type}
    payload.update(extra or {})
    return self._jsonrpc_request("push", "sendTestPushEvent", payload)
# ======================================================================
# PACKAGES (detail) - read
# ======================================================================
def get_package_details(self, package_id: str) -> dict:
    """Fetch installation package detail (packages.getPackageDetails).

    `packageId` is required (verified). A falsy API result is normalised to
    an empty dict.
    """
    params = {"packageId": package_id}
    details = self._jsonrpc_request("packages", "getPackageDetails", params)
    return details if details else {}
# ======================================================================
# REPORTS (create / delete) - getReportsList + get_report_links above
# ======================================================================
def create_report(self, name: str, extra: Optional[dict] = None) -> Any:
    """Create a report (reports.createReport).

    `name` is required (verified); `type`, `targetIds`, recurrence/format
    etc. are passed via `extra` and merged into the request params.
    STATE-CHANGING - gate at the call site behind --confirm.
    """
    payload: dict = {"name": name}
    payload.update(extra or {})
    return self._jsonrpc_request("reports", "createReport", payload)
def delete_report(self, report_id: str) -> Any:
    """Delete a report (reports.deleteReport).

    `reportId` is required (verified).
    STATE-CHANGING - gate at the call site behind --confirm.
    """
    params = {"reportId": report_id}
    return self._jsonrpc_request("reports", "deleteReport", params)
# ======================================================================
# QUARANTINE (remove / restore) - getQuarantineItemsList above
# ======================================================================
def remove_quarantine_items(
    self, quarantine_item_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Delete quarantined items (quarantine/computers.createRemoveQuarantineItemTask).

    `quarantineItemsIds` is required (verified). Entries in `extra`, when
    given, are merged into the request params.
    STATE-CHANGING - gate behind --confirm.
    """
    payload: dict = {"quarantineItemsIds": quarantine_item_ids}
    payload.update(extra or {})
    return self._jsonrpc_request(
        "quarantine/computers", "createRemoveQuarantineItemTask", payload
    )
def restore_quarantine_items(
    self, quarantine_item_ids: list[str], extra: Optional[dict] = None
) -> Any:
    """Restore quarantined items (quarantine/computers.createRestoreQuarantineItemTask).

    `quarantineItemsIds` is required (verified). `addExclusionInPolicy` etc.
    go in `extra` and are merged into the request params.
    STATE-CHANGING - gate behind --confirm.
    """
    payload: dict = {"quarantineItemsIds": quarantine_item_ids}
    payload.update(extra or {})
    return self._jsonrpc_request(
        "quarantine/computers", "createRestoreQuarantineItemTask", payload
    )
# ======================================================================
# INCIDENTS - custom rules + incident status/note (read + state-changing)
# ======================================================================
def list_custom_rules(self, page: int = 1, per_page: int = 100) -> dict:
    """List EDR custom rules (incidents.getCustomRulesList). VERIFIED LIVE.

    Requests one page (`page` / `perPage`). A falsy API result is normalised
    to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    rules = self._jsonrpc_request("incidents", "getCustomRulesList", paging)
    return rules if rules else {}
def create_custom_rule(self, name: str, extra: Optional[dict] = None) -> Any:
    """Create an EDR custom rule (incidents.createCustomRule).

    `name` is required (verified); the rule body (settings/companyId/tags)
    goes in `extra` and is merged into the request params.
    STATE-CHANGING - gate behind --confirm.
    """
    payload: dict = {"name": name}
    payload.update(extra or {})
    return self._jsonrpc_request("incidents", "createCustomRule", payload)
def delete_custom_rule(self, rule_id: str) -> Any:
    """Delete an EDR custom rule (incidents.deleteCustomRule).

    `ruleId` is required (verified). STATE-CHANGING - gate behind --confirm.
    """
    params = {"ruleId": rule_id}
    return self._jsonrpc_request("incidents", "deleteCustomRule", params)
def change_incident_status(self, incident_type: str, fields: dict) -> Any:
    """Change an incident's status (incidents.changeIncidentStatus).

    `type` is required (verified) - the incident type/category. The incident
    id and target status travel in `fields`, which is merged into the
    request params. STATE-CHANGING - gate behind --confirm.
    """
    payload: dict = {"type": incident_type}
    if fields:
        payload.update(fields)
    return self._jsonrpc_request("incidents", "changeIncidentStatus", payload)
def update_incident_note(self, incident_type: str, fields: dict) -> Any:
    """Update an incident note (incidents.updateIncidentNote).

    `type` is required (verified). The incident id and note text travel in
    `fields`, which is merged into the request params. STATE-CHANGING.
    """
    payload: dict = {"type": incident_type}
    if fields:
        payload.update(fields)
    return self._jsonrpc_request("incidents", "updateIncidentNote", payload)
# ======================================================================
# LICENSING (usage) + INTEGRATIONS - read
# ======================================================================
def get_monthly_usage(self) -> dict:
    """Fetch monthly license usage (licensing.getMonthlyUsage). VERIFIED LIVE.

    A falsy API result is normalised to an empty dict.
    """
    usage = self._jsonrpc_request("licensing", "getMonthlyUsage", {})
    return usage if usage else {}
def get_configured_integrations(self, page: int = 1, per_page: int = 100) -> dict:
    """List configured third-party integrations
    (integrations.getConfiguredIntegrations). VERIFIED LIVE.

    Requests one page (`page` / `perPage`). A falsy API result is normalised
    to an empty dict.
    """
    paging = {"page": page, "perPage": per_page}
    found = self._jsonrpc_request(
        "integrations", "getConfiguredIntegrations", paging
    )
    return found if found else {}
# ======================================================================
# CACHE LAYER (identity / structure only - never volatile status)
# ======================================================================
def _read_cache(self) -> Optional[dict]:
    """Load the on-disk inventory cache, or None when it is unusable.

    Returns None when the cache file is missing or unreadable, is not valid
    UTF-8, is not valid JSON, or does not hold a JSON object. Callers treat
    None as "no cache" and fall back to a refresh.
    """
    try:
        # Read directly instead of checking exists() first: a missing file
        # raises FileNotFoundError (an OSError), and there is no window
        # between the check and the read.
        raw = CACHE_FILE.read_text(encoding="utf-8")
        data = json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for a file that is not valid UTF-8.
        return None
    # Callers use dict methods on the result; any other JSON value (list,
    # string, number) is a corrupt cache.
    return data if isinstance(data, dict) else None
def _write_cache(self, cache: dict) -> None:
    """Persist `cache` to CACHE_FILE as indented, key-sorted UTF-8 JSON.

    CACHE_DIR is created first (including parents) when it does not exist.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cache, indent=2, sort_keys=True)
    CACHE_FILE.write_text(serialized, encoding="utf-8")
def _cache_is_fresh(self, cache: dict) -> bool:
    """Return True when `cache` is younger than its time-to-live.

    The age is measured from the ISO-8601 `fetched_at` entry; a timestamp
    without timezone info is interpreted as UTC. The TTL is the cache's own
    `ttl_seconds`, falling back to CACHE_TTL_SECONDS when the key is absent.

    A cache with a missing or malformed `fetched_at` (wrong type or
    unparseable text) or a non-numeric `ttl_seconds` is reported as stale
    rather than raising.
    """
    fetched = cache.get("fetched_at")
    ttl = cache.get("ttl_seconds", CACHE_TTL_SECONDS)
    if not fetched:
        return False
    try:
        ts = datetime.fromisoformat(fetched)
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        age = (datetime.now(timezone.utc) - ts).total_seconds()
        return age < ttl
    except (TypeError, ValueError):
        # ValueError: unparseable timestamp text. TypeError: `fetched_at` is
        # not a string, or `ttl_seconds` is not comparable with a number
        # (for example null or a string).
        return False
def refresh_inventory(self) -> dict:
    """Full identity/structure pull. Writes and returns the cache.

    Collects company, endpoint, policy and package identity data, stores it
    through `_write_cache` and returns the same dict. Per endpoint only
    identity fields (name, company id, FQDN) are kept - no status fields.

    An error from the company listing propagates. Endpoint, policy and
    package listing are best-effort: a GravityZoneError there leaves that
    part of the inventory partial or empty instead of aborting the refresh.
    """
    page_size = 100
    companies_map: dict[str, str] = {}
    endpoints_map: dict[str, dict] = {}
    policies_map: dict[str, str] = {}

    for company in self.list_companies(per_page=page_size).get("items", []):
        cid = company.get("id")
        if cid:
            companies_map[cid] = company.get("name", "")

    # Endpoints per company (identity tier only - no status fields). The
    # root company is always scanned as well; dict.fromkeys() drops the
    # duplicate when the company listing already returned it, so its
    # endpoints are not fetched twice.
    for cid in dict.fromkeys([*companies_map, ACG_ROOT_COMPANY_ID]):
        page = 1
        seen = 0
        while True:
            try:
                data = self.list_endpoints(cid, page=page, per_page=page_size)
            except GravityZoneError:
                break  # best-effort: keep what was collected so far
            items = data.get("items", [])
            if not items:
                break
            for ep in items:
                eid = ep.get("id")
                if not eid:
                    continue
                endpoints_map[eid] = {
                    "name": ep.get("name", ""),
                    "company_id": ep.get("companyId", cid),
                    "fqdn": ep.get("fqdn") or ep.get("FQDN") or "",
                }
            # Stop once every reported item has been received. Counting
            # items (not page * page_size) stays correct when the server
            # returns fewer items per page than requested; a missing or
            # null total ends the scan after the first page.
            seen += len(items)
            if seen >= (data.get("total") or 0):
                break
            page += 1

    try:
        for policy in self.list_policies(per_page=page_size).get("items", []):
            pid = policy.get("id")
            if pid:
                policies_map[pid] = policy.get("name", "")
    except GravityZoneError:
        pass  # best-effort: policies stay empty/partial

    packages: list = []
    try:
        packages = self.list_packages(per_page=page_size).get("items", [])
    except GravityZoneError:
        pass  # best-effort: packages stay empty

    cache = {
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "ttl_seconds": CACHE_TTL_SECONDS,
        "companies": companies_map,
        "endpoints": endpoints_map,
        "policies": policies_map,
        "packages": packages,
    }
    self._write_cache(cache)
    return cache
def get_inventory(self, refresh: bool = False) -> dict:
    """Return the identity/structure inventory, preferring a fresh cache.

    With `refresh` set the cache is bypassed. Otherwise the cached copy is
    returned when it exists, is non-empty and is still fresh; in every other
    case a full refresh is performed and its result returned.
    """
    if refresh:
        return self.refresh_inventory()
    cached = self._read_cache()
    if cached and self._cache_is_fresh(cached):
        return cached
    return self.refresh_inventory()
def _cache_add_group(self, group_id: str, name: str) -> None:
    """Record a group id -> name pair in the on-disk cache.

    Does nothing when no cache exists yet - the next refresh picks the
    group up. Otherwise the entry is stored under the cache's 'groups' map
    (groups live in the inventory tree) and the cache is written back.
    """
    snapshot = self._read_cache()
    if snapshot is not None:
        if "companies" not in snapshot:
            snapshot["companies"] = {}
        groups = snapshot.setdefault("groups", {})
        groups[group_id] = name
        self._write_cache(snapshot)
def _cache_add_package(self, package_name: str, create_result: Any) -> None:
    """Record a newly created package in the on-disk cache.

    Does nothing when no cache exists. The package id is taken from
    `create_result`: its "id" entry when it is a dict, the value itself when
    it is a string, otherwise None. An entry is appended only when no cached
    package dict already carries `package_name`; the cache is written back
    either way.
    """
    snapshot = self._read_cache()
    if snapshot is None:
        return

    if isinstance(create_result, dict):
        pkg_id = create_result.get("id")
    elif isinstance(create_result, str):
        pkg_id = create_result
    else:
        pkg_id = None

    known = snapshot.setdefault("packages", [])
    for entry in known:
        if isinstance(entry, dict) and entry.get("name") == package_name:
            break
    else:
        known.append({"id": pkg_id, "name": package_name})
    self._write_cache(snapshot)
def main() -> int:
    """Minimal self-check: load the API key (no network call).

    Prints an [OK] line naming the active transport and returns 0, or prints
    the GravityZoneError to stderr and returns 1.
    """
    try:
        _ = GravityZoneClient().api_key  # triggers vault load
        transport = "httpx" if _HAS_HTTPX else "urllib"
        print("[OK] API key loaded; transport =", transport)
    except GravityZoneError as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1
    return 0
# Script entry point: run the self-check and use its result as the exit code.
if __name__ == "__main__":
    sys.exit(main())