fix(bitdefender): all-clients sweep, quarantine path, EDR controls, self-test
Several bugs found and fixed during live testing against the ACG GravityZone tenant: - security_sweep_all_clients: iterate each company (the companies container is not a valid endpoint parent; passing it 400'd the whole sweep) - list_quarantine: use service-scoped path quarantine/computers with companyId (bare quarantine module 404'd; param is companyId not parentId) - rename GZEndpointSummary.detection_active -> threat_detected with corrected semantics (True = active threat, tracks with infected; not an engine-on flag) - status: readable sectioned table renderer for the nested apiKey/license dict - portable CLAUDETOOLS_ROOT resolution (derive from file path, not a Windows literal) so it works on the Mac/Linux fleet Adds scripts/selftest.py: a 29-check read-only harness (all passing) covering every read command, --json, error exit codes, and destructive-action gating. EDR/incident commands (blocklist, isolate/unisolate, blocklist-add/remove) and raw destructive-method gating are included from this session's work. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -74,7 +74,8 @@ class GZEndpointSummary:
|
||||
name: str
|
||||
company_id: str
|
||||
infected: bool
|
||||
detection_active: bool
|
||||
# True = a malware detection is currently active on the endpoint (tracks with infected); NOT an "engine on" indicator.
|
||||
threat_detected: bool
|
||||
signature_outdated: bool
|
||||
product_outdated: bool
|
||||
last_seen: Optional[str]
|
||||
@@ -295,12 +296,14 @@ class GravityZoneClient:
|
||||
) or {}
|
||||
|
||||
def list_quarantine(
|
||||
self, parent_id: str, page: int = 1, per_page: int = 100
|
||||
self, company_id: str, page: int = 1, per_page: int = 100
|
||||
) -> dict:
|
||||
# Service-scoped path 'quarantine/computers' (bare 'quarantine' 404s);
|
||||
# the param is 'companyId', NOT 'parentId'.
|
||||
return self._jsonrpc_request(
|
||||
"quarantine",
|
||||
"quarantine/computers",
|
||||
"getQuarantineItemsList",
|
||||
{"parentId": parent_id, "page": page, "perPage": per_page},
|
||||
{"companyId": company_id, "page": page, "perPage": per_page},
|
||||
) or {}
|
||||
|
||||
def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
|
||||
@@ -332,7 +335,7 @@ class GravityZoneClient:
|
||||
name=detail.get("name") or item.get("name", ""),
|
||||
company_id=detail.get("companyId") or item.get("companyId", ""),
|
||||
infected=bool(malware.get("infected", False)),
|
||||
detection_active=bool(malware.get("detection", False)),
|
||||
threat_detected=bool(malware.get("detection", False)),
|
||||
signature_outdated=bool(agent.get("signatureOutdated", False)),
|
||||
product_outdated=bool(agent.get("productOutdated", False)),
|
||||
last_seen=detail.get("lastSeen"),
|
||||
@@ -356,6 +359,34 @@ class GravityZoneClient:
|
||||
)
|
||||
return summaries
|
||||
|
||||
def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
|
||||
"""Sweep every client company. The companies container is NOT a valid
|
||||
endpoint parent, so iterate each company and sweep it individually."""
|
||||
summaries: list[GZEndpointSummary] = []
|
||||
companies = self.list_companies(per_page=100).get("items", [])
|
||||
for company in companies:
|
||||
cid = company.get("id")
|
||||
if not cid:
|
||||
continue
|
||||
try:
|
||||
company_summaries = self.security_sweep(cid)
|
||||
except GravityZoneError:
|
||||
continue
|
||||
for s in company_summaries:
|
||||
if not s.company_id:
|
||||
s.company_id = cid
|
||||
summaries.extend(company_summaries)
|
||||
|
||||
summaries.sort(
|
||||
key=lambda s: (
|
||||
not s.infected,
|
||||
not s.signature_outdated,
|
||||
not s.product_outdated,
|
||||
s.name.lower(),
|
||||
)
|
||||
)
|
||||
return summaries
|
||||
|
||||
# ======================================================================
|
||||
# MANAGEMENT METHODS (verified only)
|
||||
# ======================================================================
|
||||
@@ -461,6 +492,134 @@ class GravityZoneClient:
|
||||
{"groupId": group_id, "newParentId": new_parent_id},
|
||||
)
|
||||
|
||||
# ======================================================================
|
||||
# INCIDENTS / EDR (module `/incidents`)
|
||||
# ----------------------------------------------------------------------
|
||||
# READ methods are always live and never cached (blocklist + incident
|
||||
# status are volatile). State-changing methods (isolate, restore,
|
||||
# add/remove blocklist) return the raw upstream result; the CLI caller is
|
||||
# responsible for gating them behind --confirm.
|
||||
# ======================================================================
|
||||
|
||||
# -- READ (safe) -----------------------------------------------------------
|
||||
def list_blocklist(
|
||||
self,
|
||||
company_id: Optional[str] = None,
|
||||
page: int = 1,
|
||||
per_page: int = 100,
|
||||
) -> dict:
|
||||
"""List blocklisted hash items. VERIFIED LIVE (incidents.getBlocklistItems).
|
||||
|
||||
Returns {total, page, perPage, pagesCount, items:[{id, source,
|
||||
sourceInfo, hashType, hash, companyId}]}. `companyId` scopes to one
|
||||
company; omit for the whole partner tenant.
|
||||
"""
|
||||
params: dict = {"page": page, "perPage": per_page}
|
||||
if company_id:
|
||||
params["companyId"] = company_id
|
||||
return self._jsonrpc_request("incidents", "getBlocklistItems", params) or {}
|
||||
|
||||
def list_incidents(
|
||||
self,
|
||||
parent_id: str,
|
||||
page: int = 1,
|
||||
per_page: int = 500,
|
||||
filters: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""List incidents under a company/group.
|
||||
|
||||
`parent_id` (a company/group id) is REQUIRED for this method.
|
||||
|
||||
perPage default is 500: live testing surfaced an
|
||||
"Invalid value for 'perPage' parameter. The value should be between 500
|
||||
and 10000" error for smaller values, so 100 (the default elsewhere) is
|
||||
rejected here.
|
||||
|
||||
NOTE (UNVERIFIED / possibly unavailable on this tenant): live re-testing
|
||||
on 2026-05-30 returned "Method not found" for `getIncidentsList` on the
|
||||
`/incidents` module, even though `getBlocklistItems` on the SAME module
|
||||
succeeds in the same request (so this is not rate-limiting or a bad key).
|
||||
The method may be gated by an EDR/incidents license feature that is OFF
|
||||
on this tenant, or named differently in this API version. Treat the
|
||||
return value as best-effort and confirm against the official Bitdefender
|
||||
API reference / console before relying on incident data.
|
||||
"""
|
||||
params: dict = {"parentId": parent_id, "page": page, "perPage": per_page}
|
||||
if filters is not None:
|
||||
params["filters"] = filters
|
||||
return self._jsonrpc_request("incidents", "getIncidentsList", params) or {}
|
||||
|
||||
# -- STATE-CHANGING (caller MUST gate behind --confirm) --------------------
|
||||
def isolate_endpoints(self, endpoint_ids: list[str]) -> Any:
|
||||
"""Isolate endpoints from the network (incidents.createIsolateEndpointTask).
|
||||
|
||||
v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
|
||||
task ids. STATE-CHANGING — gate behind --confirm at the call site.
|
||||
"""
|
||||
if len(endpoint_ids) > 1000:
|
||||
raise GravityZoneError(
|
||||
"isolate_endpoints accepts at most 1000 endpoint ids per call "
|
||||
f"(got {len(endpoint_ids)})."
|
||||
)
|
||||
return self._jsonrpc_request(
|
||||
"incidents", "createIsolateEndpointTask", {"endpointIds": endpoint_ids}
|
||||
)
|
||||
|
||||
def restore_endpoints_from_isolation(self, endpoint_ids: list[str]) -> Any:
|
||||
"""Un-isolate endpoints (incidents.createRestoreEndpointFromIsolationTask).
|
||||
|
||||
v1.2 takes an ARRAY `endpointIds` (max 1000) and returns an array of
|
||||
task ids. STATE-CHANGING — gate behind --confirm at the call site.
|
||||
"""
|
||||
if len(endpoint_ids) > 1000:
|
||||
raise GravityZoneError(
|
||||
"restore_endpoints_from_isolation accepts at most 1000 endpoint "
|
||||
f"ids per call (got {len(endpoint_ids)})."
|
||||
)
|
||||
return self._jsonrpc_request(
|
||||
"incidents",
|
||||
"createRestoreEndpointFromIsolationTask",
|
||||
{"endpointIds": endpoint_ids},
|
||||
)
|
||||
|
||||
def add_to_blocklist(
|
||||
self,
|
||||
company_id: str,
|
||||
hash_list: list[str],
|
||||
hash_type: int = 1,
|
||||
source_info: str = "",
|
||||
operating_systems: Optional[list[str]] = None,
|
||||
) -> Any:
|
||||
"""Add hashes to the blocklist (incidents.addToBlocklist).
|
||||
|
||||
`hash_type` is an int (1 is the common value seen live; see the
|
||||
GravityZone console / API docs for the full mapping). `hash_list` is an
|
||||
array of hash strings. `source_info` is a free-text description.
|
||||
STATE-CHANGING — gate behind --confirm at the call site.
|
||||
"""
|
||||
params: dict = {
|
||||
"companyId": company_id,
|
||||
"hashType": hash_type,
|
||||
"hashList": hash_list,
|
||||
"sourceInfo": source_info,
|
||||
}
|
||||
if operating_systems is not None:
|
||||
params["operatingSystems"] = operating_systems
|
||||
return self._jsonrpc_request("incidents", "addToBlocklist", params)
|
||||
|
||||
def remove_from_blocklist(self, hash_item_id: str) -> Any:
|
||||
"""Remove one blocklist entry (incidents.removeFromBlocklist).
|
||||
|
||||
STATE-CHANGING — gate behind --confirm at the call site.
|
||||
|
||||
UNVERIFIED: the param name `hashItemId` is the candidate (the `id`
|
||||
field from getBlocklistItems). Confirm against the official Bitdefender
|
||||
API reference before relying on it.
|
||||
"""
|
||||
return self._jsonrpc_request(
|
||||
"incidents", "removeFromBlocklist", {"hashItemId": hash_item_id}
|
||||
)
|
||||
|
||||
# ======================================================================
|
||||
# CACHE LAYER (identity / structure only — never volatile status)
|
||||
# ======================================================================
|
||||
|
||||
Reference in New Issue
Block a user