|
|
|
|
@@ -41,7 +41,7 @@ except ImportError: # pragma: no cover - depends on environment
|
|
|
|
|
|
|
|
|
|
# Cap upstream error bodies surfaced in exceptions. The GravityZone key never
|
|
|
|
|
# appears here, but `raw` can call arbitrary methods whose responses may reflect
|
|
|
|
|
# other data — bound the blast radius rather than echo full bodies into logs.
|
|
|
|
|
# other data - bound the blast radius rather than echo full bodies into logs.
|
|
|
|
|
ERROR_BODY_MAX_CHARS = 500
|
|
|
|
|
|
|
|
|
|
# --- constants ----------------------------------------------------------------
|
|
|
|
|
@@ -115,7 +115,7 @@ def load_api_key() -> str:
|
|
|
|
|
"""Load the GravityZone API key.
|
|
|
|
|
|
|
|
|
|
Order: GRAVITYZONE_API_KEY env override, then the SOPS vault wrapper.
|
|
|
|
|
Never returns an empty key — raises if it cannot resolve one.
|
|
|
|
|
Never returns an empty key - raises if it cannot resolve one.
|
|
|
|
|
"""
|
|
|
|
|
env_key = os.environ.get("GRAVITYZONE_API_KEY")
|
|
|
|
|
if env_key:
|
|
|
|
|
@@ -255,7 +255,7 @@ class GravityZoneClient:
|
|
|
|
|
return self._jsonrpc_request("companies", "getCompanyDetails", {}) or {}
|
|
|
|
|
|
|
|
|
|
def get_company_details(self, company_id: Optional[str] = None) -> dict:
|
|
|
|
|
"""Company detail (companies.getCompanyDetails). No id ⇒ own company."""
|
|
|
|
|
"""Company detail (companies.getCompanyDetails). No id -> own company."""
|
|
|
|
|
params: dict = {}
|
|
|
|
|
if company_id:
|
|
|
|
|
params["companyId"] = company_id
|
|
|
|
|
@@ -491,7 +491,7 @@ class GravityZoneClient:
|
|
|
|
|
def delete_package(self, package_id: str) -> Any:
|
|
|
|
|
"""Delete an installation package (packages.deletePackage).
|
|
|
|
|
VERIFIED LIVE 2026-06-21: the param is `packageId` (NOT packageName/
|
|
|
|
|
companyId — those error "not expected"). STATE-CHANGING — gate at call site."""
|
|
|
|
|
companyId - those error "not expected"). STATE-CHANGING - gate at call site."""
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
"packages", "deletePackage", {"packageId": package_id}
|
|
|
|
|
)
|
|
|
|
|
@@ -542,10 +542,11 @@ class GravityZoneClient:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def move_custom_group(self, group_id: str, new_parent_id: str) -> Any:
|
|
|
|
|
# API param is `parentId` (verified live 2026-06-21), NOT `newParentId`.
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
"network",
|
|
|
|
|
"moveCustomGroup",
|
|
|
|
|
{"groupId": group_id, "newParentId": new_parent_id},
|
|
|
|
|
{"groupId": group_id, "parentId": new_parent_id},
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
@@ -610,10 +611,10 @@ class GravityZoneClient:
|
|
|
|
|
"""Isolate endpoints from the network (incidents.createIsolateEndpointTask).
|
|
|
|
|
|
|
|
|
|
v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
|
|
|
|
|
task ids. STATE-CHANGING — gate behind --confirm at the call site.
|
|
|
|
|
task ids. STATE-CHANGING - gate behind --confirm at the call site.
|
|
|
|
|
"""
|
|
|
|
|
# VERIFIED LIVE 2026-06-21: the API takes a SINGLE `endpointId` per call
|
|
|
|
|
# (NOT an `endpointIds` array — that errors "not expected"). Loop for many.
|
|
|
|
|
# (NOT an `endpointIds` array - that errors "not expected"). Loop for many.
|
|
|
|
|
results = []
|
|
|
|
|
for eid in endpoint_ids:
|
|
|
|
|
results.append(self._jsonrpc_request(
|
|
|
|
|
@@ -625,10 +626,10 @@ class GravityZoneClient:
|
|
|
|
|
"""Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).
|
|
|
|
|
|
|
|
|
|
v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
|
|
|
|
|
task ids. STATE-CHANGING — gate behind --confirm at the call site.
|
|
|
|
|
task ids. STATE-CHANGING - gate behind --confirm at the call site.
|
|
|
|
|
"""
|
|
|
|
|
# VERIFIED LIVE 2026-06-21: single `endpointId` per call (not an array).
|
|
|
|
|
# Note: fails if the isolation task is still in progress — wait + retry.
|
|
|
|
|
# Note: fails if the isolation task is still in progress - wait + retry.
|
|
|
|
|
results = []
|
|
|
|
|
for eid in endpoint_ids:
|
|
|
|
|
results.append(self._jsonrpc_request(
|
|
|
|
|
@@ -651,7 +652,7 @@ class GravityZoneClient:
|
|
|
|
|
`hash_type` is an int (1 is the common value seen live; see the
|
|
|
|
|
GravityZone console / API docs for the full mapping). `hash_list` is an
|
|
|
|
|
array of hash strings. `source_info` is a free-text description.
|
|
|
|
|
STATE-CHANGING — gate behind --confirm at the call site.
|
|
|
|
|
STATE-CHANGING - gate behind --confirm at the call site.
|
|
|
|
|
"""
|
|
|
|
|
params: dict = {
|
|
|
|
|
"companyId": company_id,
|
|
|
|
|
@@ -666,7 +667,7 @@ class GravityZoneClient:
|
|
|
|
|
def remove_from_blocklist(self, hash_item_id: str) -> Any:
|
|
|
|
|
"""Remove one blocklist entry (incidents.removeFromBlocklist).
|
|
|
|
|
|
|
|
|
|
STATE-CHANGING — gate behind --confirm at the call site.
|
|
|
|
|
STATE-CHANGING - gate behind --confirm at the call site.
|
|
|
|
|
|
|
|
|
|
UNVERIFIED: the param name `hashItemId` is the candidate (the `id`
|
|
|
|
|
field from getBlocklistItems). Confirm against the official Bitdefender
|
|
|
|
|
@@ -680,9 +681,9 @@ class GravityZoneClient:
|
|
|
|
|
# POLICY ASSIGNMENT (state-changing; gate behind --confirm at call site)
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
# NOTE: getPolicyDetails (above) returns the FULL granular module config
|
|
|
|
|
# (verified live 2026-06-21 — the earlier "shallow only" claim was wrong).
|
|
|
|
|
# The Public API still has NO create/edit/clone policy method — authoring
|
|
|
|
|
# stays in the console — but assigning an EXISTING policy is supported here.
|
|
|
|
|
# (verified live 2026-06-21 - the earlier "shallow only" claim was wrong).
|
|
|
|
|
# The Public API still has NO create/edit/clone policy method - authoring
|
|
|
|
|
# stays in the console - but assigning an EXISTING policy is supported here.
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def assign_policy(
|
|
|
|
|
self,
|
|
|
|
|
@@ -699,7 +700,7 @@ class GravityZoneClient:
|
|
|
|
|
error. `forcePolicyInheritance` optionally pushes the policy down to
|
|
|
|
|
sub-items. (To make a target INHERIT instead, call with
|
|
|
|
|
{targetIds, inheritFromAbove:true} and no policyId via `raw`.)
|
|
|
|
|
STATE-CHANGING — gate at the call site behind --confirm.
|
|
|
|
|
STATE-CHANGING - gate at the call site behind --confirm.
|
|
|
|
|
Docs: bitdefender.com/business/support/en/77212-924802-assignpolicy.html
|
|
|
|
|
"""
|
|
|
|
|
params: dict = {
|
|
|
|
|
@@ -754,7 +755,7 @@ class GravityZoneClient:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# REPORTS (module `/reports`) — VERIFIED LIVE
|
|
|
|
|
# REPORTS (module `/reports`) - VERIFIED LIVE
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def list_reports(self, page: int = 1, per_page: int = 100) -> dict:
|
|
|
|
|
"""List saved reports (reports.getReportsList)."""
|
|
|
|
|
@@ -765,7 +766,7 @@ class GravityZoneClient:
|
|
|
|
|
def get_report_links(self, report_id: str) -> Any:
|
|
|
|
|
"""Get download links for a generated report (reports.getDownloadLinks).
|
|
|
|
|
|
|
|
|
|
Param name `reportId` is the candidate — confirm against the console if
|
|
|
|
|
Param name `reportId` is the candidate - confirm against the console if
|
|
|
|
|
it errors.
|
|
|
|
|
"""
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
@@ -773,7 +774,7 @@ class GravityZoneClient:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# ACCOUNTS (module `/accounts`) — VERIFIED LIVE (read)
|
|
|
|
|
# ACCOUNTS (module `/accounts`) - VERIFIED LIVE (read)
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def list_accounts(self, page: int = 1, per_page: int = 100) -> dict:
|
|
|
|
|
"""List GravityZone console accounts/users (accounts.getAccountsList)."""
|
|
|
|
|
@@ -852,7 +853,7 @@ class GravityZoneClient:
|
|
|
|
|
|
|
|
|
|
def configure_notifications_settings(self, settings: dict) -> Any:
|
|
|
|
|
"""Set notification settings (accounts.configureNotificationsSettings).
|
|
|
|
|
STATE-CHANGING — there are NO required params, so an empty payload is
|
|
|
|
|
STATE-CHANGING - there are NO required params, so an empty payload is
|
|
|
|
|
accepted; the caller must pass the intended settings object. Gate at the
|
|
|
|
|
call site behind --confirm."""
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
@@ -862,10 +863,10 @@ class GravityZoneClient:
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# PUSH EVENT SERVICE (module `/push`)
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
|
|
|
# `get`/`stats` are read (but error when the service was never configured —
|
|
|
|
|
# `get`/`stats` are read (but error when the service was never configured -
|
|
|
|
|
# that is an EXPECTED state, not a failure; the CLI handles it cleanly).
|
|
|
|
|
# `set` is STATE-CHANGING (it configures where GravityZone POSTs security
|
|
|
|
|
# events) — gate behind --confirm at the call site.
|
|
|
|
|
# events) - gate behind --confirm at the call site.
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def get_push_settings(self) -> dict:
|
|
|
|
|
"""Current push event service settings (push.getPushEventSettings)."""
|
|
|
|
|
@@ -890,9 +891,9 @@ class GravityZoneClient:
|
|
|
|
|
2026-06-21). When enabling, `serviceSettings.url` is the receiver
|
|
|
|
|
endpoint GravityZone POSTs events to. The nested shape
|
|
|
|
|
(serviceType/serviceSettings/subscribeToEventTypes) follows Bitdefender's
|
|
|
|
|
documented push API and is UNVERIFIED beyond `status` on this tenant —
|
|
|
|
|
documented push API and is UNVERIFIED beyond `status` on this tenant -
|
|
|
|
|
confirm the first successful enable against the live response.
|
|
|
|
|
STATE-CHANGING — gate at the call site behind --confirm.
|
|
|
|
|
STATE-CHANGING - gate at the call site behind --confirm.
|
|
|
|
|
"""
|
|
|
|
|
params: dict = {"status": status, "serviceType": service_type}
|
|
|
|
|
service_settings: dict = {"requireValidSslCertificate": require_valid_ssl}
|
|
|
|
|
@@ -907,7 +908,7 @@ class GravityZoneClient:
|
|
|
|
|
|
|
|
|
|
def send_test_push_event(self, event_type: str, extra: Optional[dict] = None) -> Any:
|
|
|
|
|
"""Send a test push event (push.sendTestPushEvent). Requires `eventType`
|
|
|
|
|
(verified). Fires against the configured receiver — STATE-ADJACENT, gate
|
|
|
|
|
(verified). Fires against the configured receiver - STATE-ADJACENT, gate
|
|
|
|
|
at the call site behind --confirm."""
|
|
|
|
|
params: dict = {"eventType": event_type}
|
|
|
|
|
if extra:
|
|
|
|
|
@@ -915,7 +916,7 @@ class GravityZoneClient:
|
|
|
|
|
return self._jsonrpc_request("push", "sendTestPushEvent", params)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# PACKAGES (detail) — read
|
|
|
|
|
# PACKAGES (detail) - read
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def get_package_details(self, package_id: str) -> dict:
|
|
|
|
|
"""Installation package detail (packages.getPackageDetails). `packageId`
|
|
|
|
|
@@ -925,12 +926,12 @@ class GravityZoneClient:
|
|
|
|
|
) or {}
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# REPORTS (create / delete) — getReportsList + get_report_links above
|
|
|
|
|
# REPORTS (create / delete) - getReportsList + get_report_links above
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def create_report(self, name: str, extra: Optional[dict] = None) -> Any:
|
|
|
|
|
"""Create a report (reports.createReport). `name` required (verified);
|
|
|
|
|
`type`, `targetIds`, recurrence/format etc. passed via `extra`.
|
|
|
|
|
STATE-CHANGING — gate at the call site behind --confirm."""
|
|
|
|
|
STATE-CHANGING - gate at the call site behind --confirm."""
|
|
|
|
|
params: dict = {"name": name}
|
|
|
|
|
if extra:
|
|
|
|
|
params.update(extra)
|
|
|
|
|
@@ -938,19 +939,19 @@ class GravityZoneClient:
|
|
|
|
|
|
|
|
|
|
def delete_report(self, report_id: str) -> Any:
|
|
|
|
|
"""Delete a report (reports.deleteReport). `reportId` required (verified).
|
|
|
|
|
STATE-CHANGING — gate at the call site behind --confirm."""
|
|
|
|
|
STATE-CHANGING - gate at the call site behind --confirm."""
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
"reports", "deleteReport", {"reportId": report_id}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# QUARANTINE (remove / restore) — getQuarantineItemsList above
|
|
|
|
|
# QUARANTINE (remove / restore) - getQuarantineItemsList above
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def remove_quarantine_items(
|
|
|
|
|
self, quarantine_item_ids: list[str], extra: Optional[dict] = None
|
|
|
|
|
) -> Any:
|
|
|
|
|
"""Delete quarantined items (quarantine/computers.createRemoveQuarantineItemTask).
|
|
|
|
|
`quarantineItemsIds` required (verified). STATE-CHANGING — gate behind --confirm."""
|
|
|
|
|
`quarantineItemsIds` required (verified). STATE-CHANGING - gate behind --confirm."""
|
|
|
|
|
params: dict = {"quarantineItemsIds": quarantine_item_ids}
|
|
|
|
|
if extra:
|
|
|
|
|
params.update(extra)
|
|
|
|
|
@@ -963,7 +964,7 @@ class GravityZoneClient:
|
|
|
|
|
) -> Any:
|
|
|
|
|
"""Restore quarantined items (quarantine/computers.createRestoreQuarantineItemTask).
|
|
|
|
|
`quarantineItemsIds` required (verified). `addExclusionInPolicy` etc. via
|
|
|
|
|
`extra`. STATE-CHANGING — gate behind --confirm."""
|
|
|
|
|
`extra`. STATE-CHANGING - gate behind --confirm."""
|
|
|
|
|
params: dict = {"quarantineItemsIds": quarantine_item_ids}
|
|
|
|
|
if extra:
|
|
|
|
|
params.update(extra)
|
|
|
|
|
@@ -972,7 +973,7 @@ class GravityZoneClient:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# INCIDENTS — custom rules + incident status/note (read + state-changing)
|
|
|
|
|
# INCIDENTS - custom rules + incident status/note (read + state-changing)
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def list_custom_rules(self, page: int = 1, per_page: int = 100) -> dict:
|
|
|
|
|
"""List EDR custom rules (incidents.getCustomRulesList). VERIFIED LIVE."""
|
|
|
|
|
@@ -983,7 +984,7 @@ class GravityZoneClient:
|
|
|
|
|
def create_custom_rule(self, name: str, extra: Optional[dict] = None) -> Any:
|
|
|
|
|
"""Create an EDR custom rule (incidents.createCustomRule). `name` required
|
|
|
|
|
(verified); rule body (settings/companyId/tags) via `extra`.
|
|
|
|
|
STATE-CHANGING — gate behind --confirm."""
|
|
|
|
|
STATE-CHANGING - gate behind --confirm."""
|
|
|
|
|
params: dict = {"name": name}
|
|
|
|
|
if extra:
|
|
|
|
|
params.update(extra)
|
|
|
|
|
@@ -991,15 +992,15 @@ class GravityZoneClient:
|
|
|
|
|
|
|
|
|
|
def delete_custom_rule(self, rule_id: str) -> Any:
|
|
|
|
|
"""Delete an EDR custom rule (incidents.deleteCustomRule). `ruleId` required
|
|
|
|
|
(verified). STATE-CHANGING — gate behind --confirm."""
|
|
|
|
|
(verified). STATE-CHANGING - gate behind --confirm."""
|
|
|
|
|
return self._jsonrpc_request(
|
|
|
|
|
"incidents", "deleteCustomRule", {"ruleId": rule_id}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def change_incident_status(self, incident_type: str, fields: dict) -> Any:
|
|
|
|
|
"""Change an incident's status (incidents.changeIncidentStatus). `type`
|
|
|
|
|
required (verified) — the incident type/category — plus the incident id +
|
|
|
|
|
target status in `fields`. STATE-CHANGING — gate behind --confirm."""
|
|
|
|
|
required (verified) - the incident type/category - plus the incident id +
|
|
|
|
|
target status in `fields`. STATE-CHANGING - gate behind --confirm."""
|
|
|
|
|
params: dict = {"type": incident_type}
|
|
|
|
|
params.update(fields or {})
|
|
|
|
|
return self._jsonrpc_request("incidents", "changeIncidentStatus", params)
|
|
|
|
|
@@ -1012,7 +1013,7 @@ class GravityZoneClient:
|
|
|
|
|
return self._jsonrpc_request("incidents", "updateIncidentNote", params)
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# LICENSING (usage) + INTEGRATIONS — read
|
|
|
|
|
# LICENSING (usage) + INTEGRATIONS - read
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def get_monthly_usage(self) -> dict:
|
|
|
|
|
"""Monthly license usage (licensing.getMonthlyUsage). VERIFIED LIVE."""
|
|
|
|
|
@@ -1027,7 +1028,7 @@ class GravityZoneClient:
|
|
|
|
|
) or {}
|
|
|
|
|
|
|
|
|
|
# ======================================================================
|
|
|
|
|
# CACHE LAYER (identity / structure only — never volatile status)
|
|
|
|
|
# CACHE LAYER (identity / structure only - never volatile status)
|
|
|
|
|
# ======================================================================
|
|
|
|
|
def _read_cache(self) -> Optional[dict]:
|
|
|
|
|
if not CACHE_FILE.exists():
|
|
|
|
|
@@ -1069,7 +1070,7 @@ class GravityZoneClient:
|
|
|
|
|
if cid:
|
|
|
|
|
companies_map[cid] = c.get("name", "")
|
|
|
|
|
|
|
|
|
|
# Endpoints per company (identity tier only — no status fields).
|
|
|
|
|
# Endpoints per company (identity tier only - no status fields).
|
|
|
|
|
for cid in list(companies_map.keys()) + [ACG_ROOT_COMPANY_ID]:
|
|
|
|
|
page = 1
|
|
|
|
|
while True:
|
|
|
|
|
@@ -1130,7 +1131,7 @@ class GravityZoneClient:
|
|
|
|
|
def _cache_add_group(self, group_id: str, name: str) -> None:
|
|
|
|
|
cache = self._read_cache()
|
|
|
|
|
if cache is None:
|
|
|
|
|
return # no cache yet — next refresh picks it up
|
|
|
|
|
return # no cache yet - next refresh picks it up
|
|
|
|
|
cache.setdefault("companies", {})
|
|
|
|
|
# Groups live in the inventory tree; store under a 'groups' map.
|
|
|
|
|
cache.setdefault("groups", {})[group_id] = name
|
|
|
|
|
|