import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

# Endpoint and credentials are read from the environment once, at import time.
GRAVITYZONE_API_BASE_URL = os.environ.get(
    "GRAVITYZONE_API_BASE_URL",
    "https://cloud.gravityzone.bitdefender.com/api/v1.0/jsonrpc",
)
GRAVITYZONE_API_KEY = os.environ.get("GRAVITYZONE_API_KEY", "")
GRAVITYZONE_TIMEOUT_SECONDS = 30.0
GRAVITYZONE_CONNECT_TIMEOUT_SECONDS = 10.0

# NOTE(review): tenant-specific GravityZone object ids; the container id is
# the parent whose children are listed by ``list_client_companies``.
ACG_ROOT_COMPANY_ID = "5c4280716c0318f3478b456a"
ACG_COMPANIES_CONTAINER_ID = "5c4280716c0318f3478b456e"


@dataclass
class GZResult:
    """Outcome of a single GravityZone API call.

    ``success`` tells whether the call succeeded; ``data`` carries the
    JSON-RPC ``result`` on success and ``error`` a human-readable message on
    failure.
    """

    success: bool
    data: Optional[dict] = None
    error: Optional[str] = None


@dataclass
class GZEndpointSummary:
    """Security-relevant snapshot of one endpoint, built by ``security_sweep``."""

    endpoint_id: str
    name: str
    company_id: str
    infected: bool  # from malwareStatus.infected
    detection_active: bool  # from malwareStatus.detection
    signature_outdated: bool  # from agent.signatureOutdated
    product_outdated: bool  # from agent.productOutdated
    last_seen: Optional[str]  # from lastSeen, passed through unparsed
    agent_version: Optional[str]  # from agent.productVersion
    state: int  # from state, 0 when the API omits it


def _summary_sort_key(summary: GZEndpointSummary) -> tuple:
    """Order summaries: infected first, then outdated signatures/product, then name."""
    return (
        not summary.infected,
        not summary.signature_outdated,
        not summary.product_outdated,
        # Guard against a missing name so sorting can never raise.
        (summary.name or "").lower(),
    )


def _describe_rpc_error(err: object) -> str:
    """Turn a JSON-RPC ``error`` member into a non-empty message string.

    Prefers ``error.data.details``, then ``error.message``; tolerates error
    payloads that are not objects or whose ``data`` is not an object.
    """
    if not isinstance(err, dict):
        return str(err)
    data = err.get("data")
    details = data.get("details") if isinstance(data, dict) else None
    return str(details or err.get("message") or "Unknown GravityZone API error")


def _build_endpoint_summary(
    endpoint_id: str, item: dict, detail: dict
) -> GZEndpointSummary:
    """Combine a list entry and its detail record into a ``GZEndpointSummary``."""
    # ``or {}`` also covers an explicit JSON null for these sub-objects.
    malware = detail.get("malwareStatus") or {}
    agent = detail.get("agent") or {}
    return GZEndpointSummary(
        endpoint_id=endpoint_id,
        name=detail.get("name") or item.get("name") or "",
        company_id=item.get("companyId", ""),
        infected=bool(malware.get("infected", False)),
        detection_active=bool(malware.get("detection", False)),
        signature_outdated=bool(agent.get("signatureOutdated", False)),
        product_outdated=bool(agent.get("productOutdated", False)),
        last_seen=detail.get("lastSeen"),
        agent_version=agent.get("productVersion"),
        state=detail.get("state", 0),
    )


class GravityZoneService:
    """Async client for the GravityZone JSON-RPC API.

    Every public method returns a ``GZResult`` (or a list of summaries) and
    reports transport/API failures through the result rather than raising.
    """

    def __init__(
        self,
        api_base_url: str = GRAVITYZONE_API_BASE_URL,
        api_key: str = GRAVITYZONE_API_KEY,
        timeout: float = GRAVITYZONE_TIMEOUT_SECONDS,
        connect_timeout: float = GRAVITYZONE_CONNECT_TIMEOUT_SECONDS,
    ):
        """Store connection settings.

        Args:
            api_base_url: JSON-RPC base URL; a trailing slash is stripped.
            api_key: API key, sent as the HTTP basic-auth user name.
            timeout: Overall timeout in seconds for each request.
            connect_timeout: Connection-establishment timeout in seconds.
        """
        self.api_base_url = api_base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = httpx.Timeout(timeout, connect=connect_timeout)

    async def _jsonrpc_request(
        self, module: str, method: str, params: dict
    ) -> GZResult:
        """POST one JSON-RPC call to ``<base>/<module>`` and wrap the outcome.

        Args:
            module: API module path segment (e.g. ``"network"``).
            method: JSON-RPC method name.
            params: JSON-RPC ``params`` object.

        Returns:
            ``GZResult(success=True, data=<result>)`` on success, otherwise a
            failed ``GZResult`` describing the timeout, HTTP error, transport
            failure, malformed body, or JSON-RPC error.
        """
        url = f"{self.api_base_url}/{module}"
        payload = {"id": "1", "jsonrpc": "2.0", "method": method, "params": params}
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(url, json=payload, auth=(self.api_key, ""))
                response.raise_for_status()
                body = response.json()
        except httpx.TimeoutException as exc:
            return GZResult(success=False, error=f"GravityZone API timeout: {exc}")
        except httpx.HTTPStatusError as exc:
            return GZResult(
                success=False,
                error=f"GravityZone HTTP error {exc.response.status_code}: {exc.response.text}",
            )
        except Exception as exc:
            # Boundary catch-all: callers expect a GZResult, never an exception.
            return GZResult(success=False, error=f"GravityZone request failed: {exc}")

        # Valid JSON is not necessarily a JSON-RPC response object.
        if not isinstance(body, dict):
            return GZResult(
                success=False,
                error=f"GravityZone returned an unexpected response body: {body!r}",
            )

        # An absent or null ``error`` member means the call succeeded.
        err = body.get("error")
        if err is not None:
            return GZResult(success=False, error=_describe_rpc_error(err))
        return GZResult(success=True, data=body.get("result"))

    async def get_api_status(self) -> GZResult:
        """Return API-key details merged with license info.

        A failure of ``getApiKeyDetails`` is returned as-is. A failure of
        ``getLicenseInfo`` is only logged, and the key details are still
        returned as a success.
        """
        key_result = await self._jsonrpc_request("general", "getApiKeyDetails", {})
        if not key_result.success:
            # No point querying the license when the key itself is unusable.
            return key_result

        combined = {**(key_result.data or {})}
        license_result = await self._jsonrpc_request("licensing", "getLicenseInfo", {})
        if license_result.success:
            combined.update(license_result.data or {})
        else:
            logger.warning(
                "GravityZone getLicenseInfo failed: %s", license_result.error
            )
        return GZResult(success=True, data=combined)

    async def get_own_company(self) -> GZResult:
        """Call ``companies.getCompanyDetails`` with no parameters."""
        return await self._jsonrpc_request("companies", "getCompanyDetails", {})

    async def list_client_companies(self, page: int = 1, per_page: int = 100) -> GZResult:
        """List inventory items of ``type == 1`` under the companies container.

        Returns:
            On success, ``data`` is ``{"total": <count on this page>,
            "items": [...]}`` where ``total`` counts only the filtered items of
            the requested page, not the whole inventory.
        """
        result = await self._jsonrpc_request(
            "network",
            "getNetworkInventoryItems",
            {
                "parentId": ACG_COMPANIES_CONTAINER_ID,
                "page": page,
                "perPage": per_page,
            },
        )
        if not result.success:
            return result

        items = (result.data or {}).get("items") or []
        # NOTE(review): type 1 presumably marks company nodes — confirm.
        companies = [item for item in items if item.get("type") == 1]
        return GZResult(
            success=True,
            data={"total": len(companies), "items": companies},
        )

    async def list_endpoints(
        self, parent_id: str, page: int = 1, per_page: int = 50
    ) -> GZResult:
        """Call ``network.getEndpointsList`` for one page under ``parent_id``."""
        return await self._jsonrpc_request(
            "network",
            "getEndpointsList",
            {"parentId": parent_id, "page": page, "perPage": per_page},
        )

    async def get_endpoint_details(self, endpoint_id: str) -> GZResult:
        """Call ``network.getManagedEndpointDetails`` for ``endpoint_id``."""
        return await self._jsonrpc_request(
            "network",
            "getManagedEndpointDetails",
            {"endpointId": endpoint_id},
        )

    async def list_quarantine_items(
        self, parent_id: str, page: int = 1, per_page: int = 50
    ) -> GZResult:
        """Call ``quarantine.getQuarantineItemsList`` for one page under ``parent_id``."""
        return await self._jsonrpc_request(
            "quarantine",
            "getQuarantineItemsList",
            {"parentId": parent_id, "page": page, "perPage": per_page},
        )

    async def security_sweep(self, parent_id: str) -> list[GZEndpointSummary]:
        """Summarize every endpoint under ``parent_id``.

        Pages through ``list_endpoints`` (100 per page), fetches details for
        each endpoint and logs a warning for infected ones. A failed page
        request stops the sweep with what was collected so far; a failed
        detail request skips only that endpoint.

        Returns:
            Summaries sorted with infected endpoints first, then outdated
            signatures, then outdated product, then by case-insensitive name.
        """
        summaries: list[GZEndpointSummary] = []
        page = 1
        per_page = 100

        while True:
            result = await self.list_endpoints(parent_id, page=page, per_page=per_page)
            if not result.success:
                logger.warning(
                    "GravityZone security_sweep list_endpoints failed for "
                    "%s page %s: %s",
                    parent_id,
                    page,
                    result.error,
                )
                break

            data = result.data or {}
            items = data.get("items") or []
            if not items:
                break

            for item in items:
                endpoint_id = item.get("id", "")
                detail_result = await self.get_endpoint_details(endpoint_id)
                if not detail_result.success:
                    logger.warning(
                        "GravityZone getManagedEndpointDetails failed for %s: %s",
                        endpoint_id,
                        detail_result.error,
                    )
                    continue

                detail = detail_result.data or {}
                summary = _build_endpoint_summary(endpoint_id, item, detail)
                if summary.infected:
                    logger.warning(
                        "GravityZone: infected endpoint detected — id=%s name=%s",
                        endpoint_id,
                        detail.get("name", ""),
                    )
                summaries.append(summary)

            # Stop once the pages fetched so far cover the reported total.
            total = data.get("total", 0)
            if page * per_page >= total:
                break
            page += 1

        summaries.sort(key=_summary_sort_key)
        return summaries

    async def security_sweep_all_clients(self) -> list[GZEndpointSummary]:
        """Run ``security_sweep`` for each client company and merge the results.

        Summaries lacking a ``company_id`` are back-filled with the id of the
        company being swept. Returns an empty list when the company listing
        fails.

        NOTE: only the first page (up to 100 entries) of
        ``list_client_companies`` is consulted.
        """
        companies_result = await self.list_client_companies(per_page=100)
        if not companies_result.success:
            logger.warning(
                "GravityZone security_sweep_all_clients: list_client_companies failed: "
                "%s",
                companies_result.error,
            )
            return []

        companies = (companies_result.data or {}).get("items") or []
        all_summaries: list[GZEndpointSummary] = []
        for company in companies:
            company_id = company.get("id", "")
            if not company_id:
                continue
            company_summaries = await self.security_sweep(company_id)
            for summary in company_summaries:
                if not summary.company_id:
                    summary.company_id = company_id
            all_summaries.extend(company_summaries)

        all_summaries.sort(key=_summary_sort_key)
        return all_summaries


# Lazily created, process-wide service instance.
_gravityzone_service: Optional[GravityZoneService] = None


def get_gravityzone_service() -> GravityZoneService:
    """Return the shared ``GravityZoneService``, creating it on first use."""
    global _gravityzone_service
    if _gravityzone_service is None:
        _gravityzone_service = GravityZoneService()
    return _gravityzone_service