Files
claudetools/.claude/skills/bitdefender/scripts/gz.py
Howard Enos 8ba92bf02b feat(bitdefender): GravityZone Cloud Public API skill
Adds a /bitdefender skill that drives the ACG GravityZone partner tenant
via the JSON-RPC Public API. Read + management ops (companies, endpoints,
live security sweep, policies [read-only/shallow], packages, quarantine,
scans, groups, move/delete). Identity-tier JSON cache (24h TTL,
--refresh); volatile status is always pulled live, never cached.

Security hardening: API key loaded from SOPS vault at runtime (never on
disk/logs/argv/cache); destructive deletes gated behind --confirm; `raw`
also gates destructive methods; upstream error bodies truncated. UNVERIFIED
API methods reachable only via `raw`. Reuses the auth/JSON-RPC pattern from
api/services/gravityzone_service.py.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 07:29:55 -07:00

377 lines
13 KiB
Python

#!/usr/bin/env python3
"""CLI for the bitdefender skill — GravityZone Cloud Public API.
Read-only subcommands run freely. Destructive subcommands (delete-endpoint,
delete-package, delete-group) refuse to run unless --confirm is passed; without
it they print what they WOULD do and exit non-zero.
Output: --json emits raw JSON; otherwise a readable table/summary.
Usage examples:
python gz.py status
python gz.py companies
python gz.py endpoints --company 5c4280716c0318f3478b456e
python gz.py endpoint <endpointId>
python gz.py sweep --company <id>
python gz.py policies
python gz.py policy <policyId>
python gz.py packages
python gz.py quarantine --company <id>
python gz.py inventory --refresh
python gz.py create-package --name "Win Default" --company <id>
python gz.py install-links --package "Win Default" --company <id>
python gz.py scan --targets <id1> <id2> --type 2 --name "Full scan"
python gz.py move --endpoints <id1> <id2> --group <groupId>
python gz.py make-group --name "New Group" --parent <parentId>
python gz.py delete-endpoint <id> --confirm
python gz.py raw --module network --method getEndpointsList --params '{"page":1}'
"""
from __future__ import annotations
import argparse
import dataclasses
import json
import sys
from gz_client import GravityZoneClient, GravityZoneError, GZEndpointSummary
def _emit(obj, as_json: bool, table_fn=None) -> None:
    """Print *obj* through ``table_fn`` when possible, otherwise as JSON.

    JSON (indent 2) is used when ``as_json`` is true or when no ``table_fn``
    was supplied; values ``json`` cannot encode natively are routed through
    ``_json_default``. In every other case ``table_fn(obj)`` does the printing.
    """
    use_table = table_fn is not None and not as_json
    if use_table:
        table_fn(obj)
        return
    print(json.dumps(obj, indent=2, default=_json_default))
def _json_default(o):
    """Fallback encoder passed as ``default=`` to ``json.dumps``.

    Dataclass instances are converted to plain dicts with
    ``dataclasses.asdict``; every other object is rendered with ``str``.
    """
    # is_dataclass() is also true for dataclass *types*, and asdict() raises
    # TypeError on a type, so only instances take the asdict() route.
    if dataclasses.is_dataclass(o) and not isinstance(o, type):
        return dataclasses.asdict(o)
    return str(o)
# --- table renderers ----------------------------------------------------------
def _print_kv(d: dict) -> None:
    """Print one `` key: value`` line per entry of *d*.

    Anything without an ``items()`` method (for example a list or ``None``)
    is printed as indented JSON instead, so an unexpected result shape is
    still shown rather than raising ``AttributeError``.
    """
    pairs = getattr(d, "items", None)
    if not callable(pairs):
        # default=str keeps this fallback from raising on exotic values.
        print(json.dumps(d, indent=2, default=str))
        return
    for key, value in pairs():
        print(f" {key}: {value}")
def _print_company_table(data: dict) -> None:
    """Print a ``Companies: N`` header and one ``id name`` row per item.

    Reads ``data["items"]`` (a missing or null value is treated as empty) and
    ``data["total"]`` (falls back to the number of items).
    """
    items = data.get("items") or []
    print(f"Companies: {data.get('total', len(items))}")
    for c in items:
        # Coerce to str before applying the width spec: a null id would raise
        # TypeError and a numeric id would be right-aligned. A missing or null
        # id is shown as "?".
        raw_id = c.get("id")
        company_id = "?" if raw_id is None else str(raw_id)
        print(f" {company_id:26} {c.get('name', '')}")
def _print_endpoint_table(data: dict) -> None:
    """Print an ``Endpoints: N`` header and one ``id name os`` row per item.

    Reads ``data["items"]`` (a missing or null value is treated as empty) and
    ``data["total"]`` (falls back to the number of items). The OS column uses
    ``operatingSystemVersion`` and falls back to ``os`` when that key is absent.
    """
    items = data.get("items") or []
    print(f"Endpoints: {data.get('total', len(items))}")
    for e in items:
        # Width specs only work on strings here: null would raise TypeError and
        # numbers would be right-aligned, so coerce both padded columns first.
        raw_id, raw_name = e.get("id"), e.get("name")
        endpoint_id = "?" if raw_id is None else str(raw_id)
        name = "" if raw_name is None else str(raw_name)
        os_label = e.get("operatingSystemVersion", e.get("os", ""))
        print(f" {endpoint_id:26} {name:30} {os_label}")
def _print_sweep_table(summaries: list) -> None:
    """Print the sweep header plus one status row per endpoint summary.

    Each summary is read through its ``infected``, ``signature_outdated``,
    ``product_outdated``, ``name``, ``agent_version`` and ``last_seen``
    attributes. The STATUS column lists every raised flag, comma-separated,
    or ``OK`` when none is set; the name is cut to 30 characters.
    """
    # (attribute on the summary, label shown in the STATUS column)
    flag_labels = (
        ("infected", "INFECTED"),
        ("signature_outdated", "SIG-OLD"),
        ("product_outdated", "PROD-OLD"),
    )
    print(f"Endpoints swept: {len(summaries)}")
    print(f" {'STATUS':10} {'NAME':30} {'AGENT':14} {'LAST SEEN'}")
    for s in summaries:
        raised = [label for attr, label in flag_labels if getattr(s, attr)]
        status = ",".join(raised) or "OK"
        name_cell = s.name[:30]
        agent_cell = str(s.agent_version or "-")
        seen_cell = s.last_seen or "-"
        print(f" {status:10} {name_cell:30} {agent_cell:14} {seen_cell}")
def _print_policy_table(data: dict) -> None:
    """Print a ``Policies: N`` header and one ``id name`` row per item.

    Reads ``data["items"]`` (a missing or null value is treated as empty) and
    ``data["total"]`` (falls back to the number of items).
    """
    items = data.get("items") or []
    print(f"Policies: {data.get('total', len(items))}")
    for p in items:
        # Coerce to str before applying the width spec: a null id would raise
        # TypeError and a numeric id would be right-aligned. A missing or null
        # id is shown as "?".
        raw_id = p.get("id")
        policy_id = "?" if raw_id is None else str(raw_id)
        print(f" {policy_id:26} {p.get('name', '')}")
def _print_package_table(data: dict) -> None:
    """Print a ``Packages: N`` header and one ``id name`` row per item.

    ``data["total"]`` is shown when present, otherwise the number of entries
    in ``data["items"]``. Ids are stringified before being padded to 26
    columns; a missing id is shown as ``?``.
    """
    packages = data.get("items", [])
    total = data.get("total", len(packages))
    print(f"Packages: {total}")
    for pkg in packages:
        pkg_id = str(pkg.get("id", "?"))
        pkg_name = pkg.get("name", "")
        print(f" {pkg_id:26} {pkg_name}")
def _print_quarantine_table(data: dict) -> None:
    """Print a ``Quarantine items: N`` header and one row per item.

    Each row shows ``threatName`` (padded to 30 columns), ``endpointName`` and
    ``detectionTime``. ``data["items"]`` may be missing or null (treated as
    empty); ``data["total"]`` falls back to the number of items.
    """
    items = data.get("items") or []
    print(f"Quarantine items: {data.get('total', len(items))}")
    for q in items:
        # The width spec needs a string: a null threat name would otherwise
        # raise TypeError. A missing or null name is shown as "?".
        raw_threat = q.get("threatName")
        threat = "?" if raw_threat is None else str(raw_threat)
        print(f" {threat:30} {q.get('endpointName', '')} "
              f"{q.get('detectionTime', '')}")
def _print_inventory_table(cache: dict) -> None:
    """Print the cache's ``fetched_at`` value and a size line per section.

    The sections are ``companies``, ``endpoints``, ``policies``, ``packages``
    and ``groups``; a section missing from *cache* is reported as 0.
    """
    print(f"Inventory cached_at: {cache.get('fetched_at')}")
    for section in ("companies", "endpoints", "policies", "packages", "groups"):
        # An empty tuple stands in for any absent section; only its length is used.
        count = len(cache.get(section, ()))
        print(f" {section}: {count}")
# --- command handlers ---------------------------------------------------------
def cmd_status(client, args):
    """Emit ``client.get_api_status()`` as JSON (``args.json``) or key/value lines."""
    api_status = client.get_api_status()
    _emit(api_status, args.json, table_fn=_print_kv)
def cmd_companies(client, args):
    """Emit ``client.list_companies()`` as JSON (``args.json``) or a company table."""
    companies = client.list_companies()
    _emit(companies, args.json, table_fn=_print_company_table)
def cmd_endpoints(client, args):
    """Emit the endpoint listing for ``args.company``.

    ``args.per_page`` is forwarded to ``client.list_endpoints`` as ``per_page``.
    Output is JSON when ``args.json`` is set, otherwise an endpoint table.
    """
    listing = client.list_endpoints(args.company, per_page=args.per_page)
    _emit(listing, args.json, table_fn=_print_endpoint_table)
def cmd_endpoint(client, args):
    """Emit ``client.get_endpoint_details(args.endpoint_id)`` as JSON or key/value lines."""
    details = client.get_endpoint_details(args.endpoint_id)
    _emit(details, args.json, table_fn=_print_kv)
def cmd_sweep(client, args):
    """Run ``client.security_sweep`` and emit the resulting summaries.

    The sweep target is ``args.company`` when given, otherwise the id
    returned by ``_require_company_for_sweep()``. Output is JSON when
    ``args.json`` is set, otherwise the sweep table.
    """
    target = args.company or _require_company_for_sweep()
    summaries = client.security_sweep(target)
    # Go through _emit like every other handler: its default= hook converts
    # dataclass summaries to dicts and stringifies any field json cannot
    # encode natively, where a bare json.dumps would raise TypeError.
    _emit(summaries, args.json, _print_sweep_table)
def _require_company_for_sweep() -> str:
    """Return the default sweep target, announcing the fallback on stderr.

    The value is ``ACG_COMPANIES_CONTAINER_ID``, imported from ``gz_client``
    only when this fallback is actually taken.
    """
    from gz_client import ACG_COMPANIES_CONTAINER_ID as default_target

    notice = "[INFO] No --company given; sweeping the ACG companies container."
    print(notice, file=sys.stderr)
    return default_target
def cmd_policies(client, args):
    """Emit ``client.list_policies()`` as JSON (``args.json``) or a policy table."""
    policies = client.list_policies()
    _emit(policies, args.json, table_fn=_print_policy_table)
def cmd_policy(client, args):
    """Emit ``client.get_policy_details(args.policy_id)``.

    A warning that only shallow policy detail is available is written to
    stderr first, before the client is called.
    """
    caveat = ("[WARNING] Public API returns shallow policy detail only "
              "(no granular config).")
    print(caveat, file=sys.stderr)
    details = client.get_policy_details(args.policy_id)
    _emit(details, args.json, table_fn=_print_kv)
def cmd_packages(client, args):
    """Emit ``client.list_packages()`` as JSON (``args.json``) or a package table."""
    packages = client.list_packages()
    _emit(packages, args.json, table_fn=_print_package_table)
def cmd_quarantine(client, args):
    """Emit ``client.list_quarantine(args.company)`` as JSON or a quarantine table."""
    quarantined = client.list_quarantine(args.company)
    _emit(quarantined, args.json, table_fn=_print_quarantine_table)
def cmd_inventory(client, args):
    """Emit ``client.get_inventory(refresh=args.refresh)`` as JSON or a size summary."""
    inventory = client.get_inventory(refresh=args.refresh)
    _emit(inventory, args.json, table_fn=_print_inventory_table)
def cmd_create_package(client, args):
    """Create a package via ``client.create_package`` and emit the outcome.

    ``args.name``, ``args.company``, ``args.description`` and ``args.language``
    are forwarded as ``package_name``, ``company_id``, ``description`` and
    ``language``. The emitted dict has the keys ``created`` and ``result``.
    """
    package_kwargs = {
        "package_name": args.name,
        "company_id": args.company,
        "description": args.description,
        "language": args.language,
    }
    outcome = client.create_package(**package_kwargs)
    report = {"created": args.name, "result": outcome}
    _emit(report, args.json, table_fn=_print_kv)
def cmd_install_links(client, args):
    """Emit ``client.get_installation_links(args.package, args.company)``."""
    links = client.get_installation_links(args.package, args.company)
    _emit(links, args.json, table_fn=_print_kv)
def cmd_scan(client, args):
    """Create a scan task and emit the client's result under ``scanTask``.

    ``args.targets``, ``args.type`` and ``args.name`` are forwarded to
    ``client.create_scan_task`` as ``target_ids``, ``scan_type`` and ``name``.
    """
    task = client.create_scan_task(
        target_ids=args.targets,
        scan_type=args.type,
        name=args.name,
    )
    report = {"scanTask": task}
    _emit(report, args.json, table_fn=_print_kv)
def cmd_move(client, args):
    """Move ``args.endpoints`` into ``args.group`` and emit the outcome.

    The emitted dict has the keys ``moved``, ``to`` and ``result``.
    """
    outcome = client.move_endpoints(args.endpoints, args.group)
    report = {
        "moved": args.endpoints,
        "to": args.group,
        "result": outcome,
    }
    _emit(report, args.json, table_fn=_print_kv)
def cmd_make_group(client, args):
    """Create a custom group named ``args.name`` under ``args.parent``.

    The emitted dict has the keys ``createdGroup`` and ``result``.
    """
    outcome = client.create_custom_group(args.name, args.parent)
    report = {"createdGroup": args.name, "result": outcome}
    _emit(report, args.json, table_fn=_print_kv)
def cmd_raw(client, args):
    """Call an arbitrary API method and print the result as JSON.

    ``args.params`` must decode to a JSON object (an empty value means ``{}``).
    Returns 2, with an ``[ERROR]`` line on stderr, when it is not valid JSON or
    not an object; returns 0 after printing the result. The result is always
    printed as JSON, whatever ``args.json`` says.
    """
    raw_params = args.params
    if raw_params:
        try:
            params = json.loads(raw_params)
        except json.JSONDecodeError as exc:
            print(f"[ERROR] --params is not valid JSON: {exc}", file=sys.stderr)
            return 2
    else:
        params = {}
    if not isinstance(params, dict):
        print("[ERROR] --params must be a JSON object.", file=sys.stderr)
        return 2
    # NOTE(review): this calls the client's private request method directly and
    # no destructive-method check is visible in this function; confirm that
    # any --confirm style gating for `raw` is enforced inside gz_client.
    result = client._jsonrpc_request(args.module, args.method, params)
    print(json.dumps(result, indent=2, default=_json_default))
    return 0
# --- destructive (gated) ------------------------------------------------------
def _gated(action_desc: str, confirm: bool) -> bool:
    """Return True only when a destructive action has been confirmed.

    When ``confirm`` is false, a refusal plus a description of what would have
    happened (``action_desc``) is written to stderr and False is returned.
    """
    if confirm:
        return True
    # Diagnostics go to stderr, like the other [WARNING]/[INFO]/[ERROR]
    # messages in this file, so stdout stays clean for --json consumers.
    print("[WARNING] Refusing destructive action without --confirm.",
          file=sys.stderr)
    print(f"[INFO] Would: {action_desc}", file=sys.stderr)
    return False
def cmd_delete_endpoint(client, args):
    """Delete ``args.endpoint_id`` once ``args.confirm`` is set.

    Returns 3 without calling the client when ``_gated`` refuses; otherwise
    emits a dict with ``deletedEndpoint`` and ``result`` and returns 0.
    """
    target = args.endpoint_id
    if _gated(f"delete endpoint {target}", args.confirm):
        outcome = client.delete_endpoint(target)
        report = {"deletedEndpoint": target, "result": outcome}
        _emit(report, args.json, table_fn=_print_kv)
        return 0
    return 3
def cmd_delete_package(client, args):
    """Delete the package ``args.package`` once ``args.confirm`` is set.

    ``args.company`` is forwarded as the second argument of
    ``client.delete_package``. Returns 3 without calling the client when
    ``_gated`` refuses; otherwise emits a dict with ``deletedPackage`` and
    ``result`` and returns 0.
    """
    package = args.package
    if _gated(f"delete package '{package}'", args.confirm):
        outcome = client.delete_package(package, args.company)
        report = {"deletedPackage": package, "result": outcome}
        _emit(report, args.json, table_fn=_print_kv)
        return 0
    return 3
def cmd_delete_group(client, args):
    """Delete the custom group ``args.group`` once ``args.confirm`` is set.

    Returns 3 without calling the client when ``_gated`` refuses; otherwise
    emits a dict with ``deletedGroup`` and ``result`` and returns 0.
    """
    group = args.group
    if _gated(f"delete custom group {group}", args.confirm):
        outcome = client.delete_custom_group(group)
        report = {"deletedGroup": group, "result": outcome}
        _emit(report, args.json, table_fn=_print_kv)
        return 0
    return 3
# --- parser -------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for every ``gz.py`` subcommand.

    ``--json`` is accepted both before the subcommand (``gz.py --json status``)
    and after it (``gz.py status --json``); either spelling sets ``args.json``.
    The chosen subcommand name is stored in ``args.command`` and a subcommand
    is required.
    """
    p = argparse.ArgumentParser(
        prog="gz.py",
        description="GravityZone Cloud Public API CLI (ACG MSP tenant).",
    )
    p.add_argument("--json", action="store_true", help="Emit raw JSON output.")

    # Per-subcommand copy of --json. default=SUPPRESS means the subparser only
    # writes `json` when the flag is actually given, so it never overwrites a
    # value already set by the top-level flag.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--json", action="store_true",
                        default=argparse.SUPPRESS,
                        help="Emit raw JSON output.")

    sub = p.add_subparsers(dest="command", required=True)

    def add(name: str, help_text: str) -> argparse.ArgumentParser:
        """Register one subcommand that also understands the shared flags."""
        return sub.add_parser(name, help=help_text, parents=[common])

    add("status", "API key + license status.")
    add("companies", "List client companies.")

    sp = add("endpoints", "List endpoints under a company/group.")
    sp.add_argument("--company", help="Parent company/group id.")
    sp.add_argument("--per-page", type=int, default=100)

    sp = add("endpoint", "Full detail for one endpoint.")
    sp.add_argument("endpoint_id")

    sp = add("sweep", "Live security posture sweep.")
    sp.add_argument("--company", help="Parent id (defaults to ACG container).")

    add("policies", "List policies (id + name).")

    sp = add("policy", "Shallow detail for one policy.")
    sp.add_argument("policy_id")

    add("packages", "List installation packages.")

    sp = add("quarantine", "List quarantine items for a company.")
    sp.add_argument("--company", required=True)

    sp = add("inventory", "Show cached identity/structure.")
    sp.add_argument("--refresh", action="store_true", help="Force a full re-pull.")

    sp = add("create-package", "Create an installer package.")
    sp.add_argument("--name", required=True)
    sp.add_argument("--company")
    sp.add_argument("--description")
    sp.add_argument("--language")

    sp = add("install-links", "Get installer download URLs.")
    sp.add_argument("--package", required=True)
    sp.add_argument("--company")

    sp = add("scan", "Create a scan task.")
    sp.add_argument("--targets", nargs="+", required=True)
    sp.add_argument("--type", type=int, required=True,
                    help="1=Quick 2=Full 3=Memory 4=Custom (verify in console).")
    sp.add_argument("--name")

    sp = add("move", "Move endpoints into a group.")
    sp.add_argument("--endpoints", nargs="+", required=True)
    sp.add_argument("--group", required=True)

    sp = add("make-group", "Create a custom group.")
    sp.add_argument("--name", required=True)
    sp.add_argument("--parent")

    sp = add("raw", "Call any method directly (power use).")
    sp.add_argument("--module", required=True)
    sp.add_argument("--method", required=True)
    sp.add_argument("--params", default="{}", help="JSON object of params.")

    # destructive (gated)
    sp = add("delete-endpoint", "Delete an endpoint (gated).")
    sp.add_argument("endpoint_id")
    sp.add_argument("--confirm", action="store_true")

    sp = add("delete-package", "Delete a package (gated).")
    sp.add_argument("--package", required=True)
    sp.add_argument("--company")
    sp.add_argument("--confirm", action="store_true")

    sp = add("delete-group", "Delete a custom group (gated).")
    sp.add_argument("--group", required=True)
    sp.add_argument("--confirm", action="store_true")

    return p
# Subcommand name -> handler function. Every handler is named
# ``cmd_<subcommand>`` with hyphens spelled as underscores, so the CLI name is
# derived from the function name instead of being typed a second time.
HANDLERS = {
    fn.__name__[len("cmd_"):].replace("_", "-"): fn
    for fn in (
        cmd_status,
        cmd_companies,
        cmd_endpoints,
        cmd_endpoint,
        cmd_sweep,
        cmd_policies,
        cmd_policy,
        cmd_packages,
        cmd_quarantine,
        cmd_inventory,
        cmd_create_package,
        cmd_install_links,
        cmd_scan,
        cmd_move,
        cmd_make_group,
        cmd_raw,
        cmd_delete_endpoint,
        cmd_delete_package,
        cmd_delete_group,
    )
}
def main(argv=None) -> int:
    """Parse *argv*, run the selected handler and return a process exit code.

    Returns the handler's own return value when it is an int and 0 otherwise;
    1 after printing ``[ERROR] ...`` to stderr when a ``GravityZoneError`` is
    raised while building the client or running the handler; 130 on
    ``KeyboardInterrupt``.
    """
    args = build_parser().parse_args(argv)
    handler = HANDLERS[args.command]
    try:
        exit_code = handler(GravityZoneClient(), args)
    except GravityZoneError as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        return 130
    # Handlers that just print return None; that counts as success.
    if isinstance(exit_code, int):
        return exit_code
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())