Files
claudetools/temp/gws_investigate.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

1069 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Google Workspace Compromise Investigation Tool
Performs a comprehensive security investigation of a Google Workspace tenant
using domain-wide delegation via a service account. Checks for signs of
account compromise including suspicious sign-ins, email forwarding rules,
OAuth app grants, password changes, admin activity, and Gmail delegation.
Usage:
python gws_investigate.py
python gws_investigate.py --domain example.com --admin admin@example.com
python gws_investigate.py --user compromised@example.com
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
except ImportError:
print("[ERROR] Required packages not installed. Run:")
print(" pip install google-auth google-api-python-client")
sys.exit(1)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Directory containing this script; the key file and the results file are
# resolved relative to it rather than to the current working directory.
SCRIPT_DIR = Path(__file__).resolve().parent
# Service-account JSON key loaded by build_service() for domain-wide delegation.
SERVICE_ACCOUNT_KEY = SCRIPT_DIR / "acg-msp-access-8f72339997e5.json"
# Destination of the JSON dump of everything collected in `results`.
RESULTS_FILE = SCRIPT_DIR / "gws_investigation_results.json"
# Defaults for the --domain and --admin command-line options.
DEFAULT_DOMAIN = "lonestarelectrical.net"
DEFAULT_ADMIN = "sysadmin@lonestarelectrical.net"
# Scopes required for investigation (must be authorized in Google Admin console)
# This full list is what build_service() requests when no explicit `scopes`
# argument is passed (the Directory and Reports clients built in main()).
# NOTE(review): the steps visible in this file only call Directory users/tokens,
# Reports activities and Gmail settings; gmail.readonly, drive.readonly and
# admin.directory.domain.readonly do not appear to be exercised here -- confirm
# before trimming, since the list must match what is authorized for the key.
SCOPES = [
"https://www.googleapis.com/auth/admin.directory.user",
"https://www.googleapis.com/auth/admin.directory.user.security",
"https://www.googleapis.com/auth/admin.reports.audit.readonly",
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.settings.basic",
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
]
# ---------------------------------------------------------------------------
# Globals
# ---------------------------------------------------------------------------
# Module-level accumulator: each investigation step stores its findings here
# under its own key(s), and main() serializes the whole dict to RESULTS_FILE.
results: Dict[str, Any] = {}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def print_separator(title: str) -> None:
    """Print a section banner: a blank line, a 70-char rule, the title, a rule.

    Args:
        title: Heading text shown between the two rules.
    """
    rule = "=" * 70
    # A leading empty field plus sep="\n" yields the blank line before the rule.
    print("", rule, f" {title}", rule, sep="\n")
def build_service(
    api_name: str,
    api_version: str,
    impersonate_email: str,
    scopes: Optional[List[str]] = None,
) -> Any:
    """Create a Google API client that acts as ``impersonate_email``.

    The service-account key at SERVICE_ACCOUNT_KEY is loaded on every call and
    the resulting credentials are re-scoped to the given subject.

    Args:
        api_name: API name passed to the discovery ``build`` call.
        api_version: API version passed to the discovery ``build`` call.
        impersonate_email: Account the service account should act as.
        scopes: OAuth scopes to request; the module-level SCOPES when None.

    Returns:
        The client object returned by ``build``.
    """
    requested_scopes = SCOPES if scopes is None else scopes
    base_credentials = service_account.Credentials.from_service_account_file(
        str(SERVICE_ACCOUNT_KEY),
        scopes=requested_scopes,
    )
    return build(
        api_name,
        api_version,
        credentials=base_credentials.with_subject(impersonate_email),
    )
def paginate(request, key: str = "users", list_next=None) -> List[Dict]:
    """Collect items from a Google API list request, optionally across pages.

    A bare request object cannot produce its own follow-up request, so the
    previous implementation always stopped after the first page. Callers that
    want every page pass the resource's ``*_next`` method as ``list_next``
    (for example ``service.users().list_next``); without it the behaviour is
    unchanged and only the first page is returned.

    Args:
        request: Initial request object exposing ``execute()``, or None.
        key: Name of the response field that holds the list of items.
        list_next: Optional callable ``(previous_request, previous_response)``
            returning the next request, or None when there are no more pages.

    Returns:
        The items gathered so far. An HttpError stops the iteration with a
        warning, and whatever was already collected is returned.
    """
    all_items: List[Dict] = []
    while request is not None:
        try:
            response = request.execute()
        except HttpError as exc:
            print(f" [WARNING] API error during pagination: {exc.status_code} - {exc.reason}")
            break
        all_items.extend(response.get(key, []))
        # Without a way to build the follow-up request, or without a token
        # announcing another page, there is nothing more to fetch.
        if list_next is None or not response.get("nextPageToken"):
            break
        request = list_next(request, response)
    return all_items
def paginate_list(service_resource, method_name: str, key: str, **kwargs) -> List[Dict]:
    """Run ``<method_name>`` on a resource and follow ``<method_name>_next``.

    Args:
        service_resource: API resource object exposing the list method.
        method_name: Name of the list method to call (e.g. ``"list"``).
        key: Response field that contains the items of each page.
        **kwargs: Forwarded unchanged to the list method.

    Returns:
        Items from every page that could be fetched. An HttpError ends the
        iteration with a warning and the items gathered so far are returned.
    """
    collected: List[Dict] = []
    follow_up_name = f"{method_name}_next"
    pending = getattr(service_resource, method_name)(**kwargs)
    while pending is not None:
        try:
            page = pending.execute()
        except HttpError as exc:
            print(f" [WARNING] API error: {exc.status_code} - {exc.reason}")
            break
        collected.extend(page.get(key, []))
        try:
            advance = getattr(service_resource, follow_up_name)
            pending = advance(pending, page)
        except AttributeError:
            # Endpoint has no *_next companion, so a single page is all there is.
            break
    return collected
def safe_api_call(func, label: str, default=None):
    """Invoke ``func`` and turn any failure into a printed warning.

    Args:
        func: Zero-argument callable performing the API call.
        label: Human-readable description used in the warning text.
        default: Value returned when the call raises.

    Returns:
        Whatever ``func`` returns, or ``default`` if it raised HttpError or
        any other Exception.
    """
    try:
        outcome = func()
    except HttpError as exc:
        code = exc.status_code
        detail = exc.reason
        # Statuses with a dedicated wording; anything else gets the generic form.
        wording = {
            403: "Access denied (403) - scope may not be authorized",
            404: "Not found (404)",
            400: f"Bad request (400) - {detail}",
        }
        message = wording.get(code, f"HTTP {code} - {detail}")
        print(f" [WARNING] {label}: {message}")
        return default
    except Exception as exc:
        print(f" [WARNING] {label}: {type(exc).__name__} - {exc}")
        return default
    return outcome
# ---------------------------------------------------------------------------
# Investigation Steps
# ---------------------------------------------------------------------------
def step1_list_users(admin_service, domain: str) -> List[Dict]:
    """Print every user of the domain with its security-relevant attributes.

    One record per user is stored in ``results["users"]`` and the aggregate
    counts in ``results["user_summary"]``.

    Args:
        admin_service: Directory API client exposing ``users()``.
        domain: Domain whose users are listed.

    Returns:
        The raw user entries returned by the API, or an empty list when the
        listing produced nothing.
    """
    print_separator("STEP 1: ALL DOMAIN USERS")
    directory_users = paginate_list(
        admin_service.users(),
        "list",
        "users",
        domain=domain,
        projection="full",
        maxResults=500,
        orderBy="email",
    )
    records: List[Dict] = []
    results["users"] = records
    if not directory_users:
        print(" [WARNING] No users returned. Check domain-wide delegation setup.")
        return []

    print(f" Found {len(directory_users)} user(s):\n")
    admins = no_2fa = suspended = 0
    for entry in directory_users:
        email = entry.get("primaryEmail", "N/A")
        name = entry.get("name", {}).get("fullName", "N/A")
        last_login = entry.get("lastLoginTime", "Never")
        is_admin = entry.get("isAdmin", False) or entry.get("isDelegatedAdmin", False)
        is_suspended = entry.get("suspended", False)
        is_2fa = entry.get("isEnrolledIn2Sv", False)
        is_enforced_2fa = entry.get("isEnforcedIn2Sv", False)
        creation = entry.get("creationTime", "N/A")
        org_unit = entry.get("orgUnitPath", "/")

        # Status badges shown next to the name.
        badges = []
        if is_suspended:
            badges.append("[SUSPENDED]")
        if is_admin:
            badges.append("[ADMIN]")
        if not is_2fa:
            badges.append("[NO-2FA]")
        elif is_enforced_2fa:
            badges.append("[2FA-ENFORCED]")
        else:
            badges.append("[2FA-ENROLLED]")
        status_str = " ".join(badges) or "[OK]"

        # Line prefix: suspension takes precedence over the missing-2FA warning.
        if is_suspended:
            marker = "[INFO] "
        elif not is_2fa:
            marker = "[WARNING] "
        else:
            marker = " "

        print(f" {marker} {email}")
        print(f" Name: {name} | Status: {status_str}")
        print(f" Last Login: {last_login} | Created: {creation}")
        print(f" OrgUnit: {org_unit}")

        records.append({
            "email": email,
            "name": name,
            "last_login": last_login,
            "is_admin": is_admin,
            "is_suspended": is_suspended,
            "is_2fa_enrolled": is_2fa,
            "is_2fa_enforced": is_enforced_2fa,
            "creation_time": creation,
            "org_unit": org_unit,
        })
        if is_admin:
            admins += 1
        if not is_2fa:
            no_2fa += 1
        if is_suspended:
            suspended += 1

    total = len(directory_users)
    print(f"\n --- User Summary ---")
    print(f" Total users: {total}")
    print(f" Admins: {admins}")
    print(f" Without 2FA: {no_2fa}")
    print(f" Suspended: {suspended}")
    if no_2fa > 0:
        print(f" [ALERT] {no_2fa} user(s) do NOT have 2FA enabled!")
    results["user_summary"] = {
        "total": total,
        "admins": admins,
        "no_2fa": no_2fa,
        "suspended": suspended,
    }
    return directory_users
def step2_signin_audit(reports_service, domain: str, target_user: Optional[str]) -> None:
    """Print the ``login`` audit events of the last 7 days and summarize them.

    Every event is stored in ``results["signin_logs"]``; totals and the
    per-IP / per-user counts go to ``results["signin_summary"]``.

    Args:
        reports_service: Reports API client exposing ``activities()``.
        domain: Not used by this step; kept for a uniform step signature.
        target_user: Restrict the query to this account; all users when falsy.
    """
    print_separator("STEP 2: SIGN-IN AUDIT LOGS (Last 7 Days)")
    window_start = datetime.now(timezone.utc) - timedelta(days=7)
    query = {
        "userKey": target_user if target_user else "all",
        "applicationName": "login",
        "startTime": window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "maxResults": 1000,
    }
    if target_user:
        print(f" Filtering for user: {target_user}")
    else:
        print(f" Checking all users in domain")

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        **query,
    )
    log_entries: List[Dict] = []
    results["signin_logs"] = log_entries
    if not activities:
        print(" [INFO] No sign-in events found in the last 7 days")
        print(" (This may be normal if Reports API needs time to propagate)")
        return

    ip_counts: Dict[str, int] = {}
    user_counts: Dict[str, int] = {}
    failures: List[Dict] = []
    flagged: List[Dict] = []
    for activity in activities:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")
        ip_counts[ip_address] = ip_counts.get(ip_address, 0) + 1
        user_counts[actor_email] = user_counts.get(actor_email, 0) + 1

        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            # Flatten the parameter list; a parameter carries its value under
            # "value", "boolValue" or "multiValue", tried in that order.
            parameters = {}
            for param in event.get("parameters", []):
                parameters[param.get("name", "")] = param.get(
                    "value", param.get("boolValue", param.get("multiValue", ""))
                )
            failed = event_name == "login_failure"
            suspicious = parameters.get("is_suspicious", False)
            login_type = parameters.get("login_type", "")
            record = {
                "time": event_time,
                "user": actor_email,
                "ip": ip_address,
                "event": event_name,
                "type": event.get("type", "N/A"),
                "login_type": login_type,
                "challenge_method": parameters.get("login_challenge_method", ""),
                "is_suspicious": suspicious,
                "parameters": parameters,
            }
            log_entries.append(record)

            flags = []
            if failed:
                flags.append("FAILED")
                failures.append(record)
            if suspicious:
                flags.append("SUSPICIOUS")
                flagged.append(record)
            flag_str = f" [{' | '.join(flags)}]" if flags else ""
            if suspicious:
                marker = "[ALERT] "
            elif failed:
                marker = "[WARNING] "
            else:
                marker = " "
            print(
                f" {marker} {event_time} | {actor_email} | IP: {ip_address} "
                f"| Event: {event_name} | Type: {login_type}{flag_str}"
            )

    print(f"\n --- Sign-in Summary ---")
    print(f" Total events: {len(activities)}")
    print(f" Unique IPs: {len(ip_counts)}")
    print(f" Unique users: {len(user_counts)}")
    print(f" Failed logins: {len(failures)}")
    print(f" Suspicious events: {len(flagged)}")
    if ip_counts:
        print(f"\n --- IP Address Breakdown ---")
        # Busiest addresses first; ties keep first-seen order.
        for ip, count in sorted(ip_counts.items(), key=lambda pair: -pair[1]):
            print(f" {ip}: {count} event(s)")
    if failures:
        print(f"\n [ALERT] {len(failures)} failed login(s) detected:")
        for entry in failures[:20]:
            print(f" {entry['time']} | {entry['user']} | IP: {entry['ip']}")
    if flagged:
        print(f"\n [ALERT] {len(flagged)} suspicious login event(s) flagged by Google:")
        for entry in flagged[:20]:
            print(f" {entry['time']} | {entry['user']} | IP: {entry['ip']}")
    results["signin_summary"] = {
        "total_events": len(activities),
        "unique_ips": len(ip_counts),
        "unique_users": len(user_counts),
        "failed_logins": len(failures),
        "suspicious_events": len(flagged),
        "ips": ip_counts,
        "users": user_counts,
    }
def step3_email_forwarding(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Report auto-forwarding settings and forwarding filters per mailbox.

    For every mailbox checked, the auto-forwarding state is stored in
    ``results["forwarding_rules"][<email>]`` and any forwarding filters in
    ``results["forwarding_rules"]["<email>_filters"]``.

    A destination is classified as internal only when the host part of the
    address (after the last ``@``) equals ``domain`` or is a subdomain of it.
    The earlier substring test treated look-alike addresses such as
    ``x@<domain>.evil.com`` or ``<domain>@evil.com`` as internal.

    Args:
        admin_email: Not used by this step; kept for a uniform step signature.
        domain: The organization's own domain, used to tell internal from
            external destinations.
        users: Directory user entries; non-suspended ones are checked when no
            ``target_user`` is given.
        target_user: Check only this mailbox when set.
    """
    print_separator("STEP 3: EMAIL FORWARDING RULES")

    own_domain = domain.strip().lower()

    def is_external_address(address: str) -> bool:
        """Return True unless the address host is the domain or a subdomain."""
        host = address.rsplit("@", 1)[-1].strip(" <>").lower()
        return host != own_domain and not host.endswith("." + own_domain)

    if target_user:
        users_to_check = [target_user]
        print(f" Checking forwarding for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking forwarding for {len(users_to_check)} active user(s)")

    results["forwarding_rules"] = {}
    alerts_found = 0
    for user_email in users_to_check:
        # Gmail settings must be read while impersonating the mailbox owner.
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )
        if gmail_service is None:
            print(f" [WARNING] Could not access Gmail settings for {user_email}")
            continue

        # Check auto-forwarding setting
        forwarding = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .getAutoForwarding(userId=em)
            .execute(),
            f"auto-forwarding for {user_email}",
        )
        if forwarding:
            fwd_email = forwarding.get("emailAddress", "")
            disposition = forwarding.get("disposition", "")
            if forwarding.get("enabled", False) and fwd_email:
                is_external = is_external_address(fwd_email)
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> forwards to {fwd_email} "
                    f"({label}) [Disposition: {disposition}]"
                )
                alerts_found += 1
                results["forwarding_rules"][user_email] = {
                    "enabled": True,
                    "forward_to": fwd_email,
                    "is_external": is_external,
                    "disposition": disposition,
                }
            else:
                results["forwarding_rules"][user_email] = {"enabled": False}

        # Check Gmail filters for forwarding actions
        filters = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .filters()
            .list(userId=em)
            .execute(),
            f"Gmail filters for {user_email}",
        )
        if not filters or "filter" not in filters:
            continue
        for filt in filters["filter"]:
            forward_to = filt.get("action", {}).get("forward", "")
            if not forward_to:
                continue
            is_external = is_external_address(forward_to)
            criteria = filt.get("criteria", {})
            criteria_str = json.dumps(criteria) if criteria else "No criteria"
            marker = "[ALERT] " if is_external else "[WARNING] "
            label = "EXTERNAL" if is_external else "INTERNAL"
            print(
                f" {marker} {user_email} -> filter forwards to {forward_to} "
                f"({label})"
            )
            print(f" Criteria: {criteria_str}")
            alerts_found += 1
            # Filter hits are stored under a separate "<email>_filters" key so
            # they do not overwrite the auto-forwarding entry of the mailbox.
            results["forwarding_rules"].setdefault(f"{user_email}_filters", []).append({
                "forward_to": forward_to,
                "is_external": is_external,
                "criteria": criteria,
                "filter_id": filt.get("id", "N/A"),
            })

    if alerts_found == 0:
        print(" [OK] No email forwarding rules detected")
    else:
        print(f"\n [ALERT] {alerts_found} forwarding rule(s) found - review above")
def step4_oauth_grants(admin_service, users: List[Dict], target_user: Optional[str]) -> None:
    """List the OAuth tokens each user has issued to applications.

    Grants are stored per user in ``results["oauth_grants"]``; grants whose
    scopes contain a sensitive keyword and whose app name does not look like a
    Google app are also collected in ``results["suspicious_oauth_apps"]``.

    Args:
        admin_service: Directory API client exposing ``tokens()``.
        users: Directory user entries; non-suspended ones are checked when no
            ``target_user`` is given.
        target_user: Check only this account when set.
    """
    print_separator("STEP 4: OAUTH APP GRANTS (Third-Party Access)")
    if target_user:
        users_to_check = [target_user]
        print(f" Checking OAuth grants for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking OAuth grants for {len(users_to_check)} active user(s)")

    results["oauth_grants"] = {}
    suspicious_apps: List[Dict] = []

    # NOTE(review): these are matched as substrings of the app's *display
    # name*, not of its client ID, so an app that merely calls itself
    # "Google ..." is treated as safe. Confirm whether an allow-list of client
    # IDs should replace this heuristic.
    known_safe_prefixes = ("google", "chrome", "android")
    # Keywords marking a scope as sensitive (substring match on the scope URL).
    dangerous_scopes = ("mail", "gmail", "drive", "contacts", "calendar", "admin")

    for user_email in users_to_check:
        tokens = safe_api_call(
            lambda em=user_email: admin_service.tokens()
            .list(userKey=em)
            .execute(),
            f"OAuth tokens for {user_email}",
        )
        if tokens is None or "items" not in tokens:
            continue

        user_grants = []
        for token in tokens.get("items", []):
            app_name = token.get("displayText", "Unknown App")
            client_id = token.get("clientId", "N/A")
            scopes = token.get("scopes", [])
            is_native = token.get("nativeApp", False)

            has_dangerous = any(
                keyword in scope.lower() for scope in scopes for keyword in dangerous_scopes
            )
            lowered_name = app_name.lower()
            is_known_safe = any(prefix in lowered_name for prefix in known_safe_prefixes)

            user_grants.append({
                "app_name": app_name,
                "client_id": client_id,
                "scopes": scopes,
                "is_native": is_native,
            })

            if has_dangerous and not is_known_safe:
                marker = "[ALERT] "
                suspicious_apps.append({
                    "user": user_email,
                    "app": app_name,
                    "client_id": client_id,
                    "scopes": scopes,
                })
            elif not is_known_safe:
                marker = "[WARNING] "
            else:
                marker = " "

            # Show at most five scopes inline to keep the line readable.
            scope_str = ", ".join(scopes[:5])
            if len(scopes) > 5:
                scope_str += f" (+{len(scopes) - 5} more)"
            print(f" {marker} {user_email} | App: {app_name}")
            print(f" ClientID: {client_id} | Scopes: {scope_str}")

        if user_grants:
            results["oauth_grants"][user_email] = user_grants

    if not results["oauth_grants"]:
        print(" [OK] No third-party OAuth grants found")
    elif suspicious_apps:
        print(f"\n [ALERT] {len(suspicious_apps)} suspicious OAuth app grant(s):")
        for sa in suspicious_apps:
            print(f" User: {sa['user']} | App: {sa['app']} | Scopes: {', '.join(sa['scopes'][:3])}")
    results["suspicious_oauth_apps"] = suspicious_apps
def step5_password_changes(reports_service, target_user: Optional[str]) -> None:
    """Report password changes recorded during the last 30 days.

    Two audit queries are combined: ``CHANGE_PASSWORD`` events of the
    ``admin`` application and ``password_edit`` events of the ``login``
    application. Matching events are stored in ``results["password_changes"]``.

    Args:
        reports_service: Reports API client exposing ``activities()``.
        target_user: When set, only events whose actor or target is this
            address (case-insensitive) are printed and stored.
    """
    print_separator("STEP 5: PASSWORD CHANGES (Last 30 Days)")
    since = (datetime.now(timezone.utc) - timedelta(days=30)).strftime(
        "%Y-%m-%dT%H:%M:%S.000Z"
    )

    def fetch(application: str, event_filter: str) -> List[Dict]:
        # Both queries differ only in the application and the event name.
        return paginate_list(
            reports_service.activities(),
            "list",
            "items",
            userKey="all",
            applicationName=application,
            startTime=since,
            eventName=event_filter,
            maxResults=500,
        )

    if target_user:
        print(f" Checking password changes involving: {target_user}")
    admin_side = fetch("admin", "CHANGE_PASSWORD")
    changes: List[Dict] = []
    results["password_changes"] = changes
    # Also check user-initiated password changes
    user_side = fetch("login", "password_edit")

    combined = [*admin_side, *user_side]
    if not combined:
        print(" [OK] No password changes detected in the last 30 days")
        return

    for activity in combined:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")
        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            parameters = {}
            for param in event.get("parameters", []):
                parameters[param.get("name", "")] = param.get(
                    "value", param.get("multiValue", "")
                )
            # Without a USER_EMAIL parameter the actor is taken as the target.
            target_email = parameters.get("USER_EMAIL", actor_email)
            # An admin-application change made by someone other than the
            # target is treated as an admin reset; everything else as self-service.
            if event_name == "CHANGE_PASSWORD" and actor_email != target_email:
                marker, change_type = "[ALERT] ", "Admin Reset"
            else:
                marker, change_type = "[INFO] ", "Self-Service"
            if target_user and target_user.lower() not in (
                actor_email.lower(),
                target_email.lower(),
            ):
                continue
            print(
                f" {marker} {event_time} | {change_type} | Actor: {actor_email} "
                f"| Target: {target_email} | IP: {ip_address}"
            )
            changes.append({
                "time": event_time,
                "actor": actor_email,
                "target": target_email,
                "ip": ip_address,
                "type": change_type,
                "event_name": event_name,
            })
    print(f"\n Total password changes: {len(results['password_changes'])}")
def step6_admin_activity(reports_service, target_user: Optional[str]) -> None:
    """Report ``admin`` application audit events of the last 30 days.

    Every retained event is stored in ``results["admin_activity"]`` together
    with a flag telling whether its name is on the critical-event list.

    Args:
        reports_service: Reports API client exposing ``activities()``.
        target_user: When set, only events whose actor or USER_EMAIL parameter
            equals this address (case-insensitive) are kept.
    """
    print_separator("STEP 6: ADMIN ACTIVITY (Last 30 Days)")
    since = (datetime.now(timezone.utc) - timedelta(days=30)).strftime(
        "%Y-%m-%dT%H:%M:%S.000Z"
    )
    # Event names that are highlighted as [ALERT] instead of [INFO].
    critical_events = {
        "CREATE_USER",
        "DELETE_USER",
        "SUSPEND_USER",
        "UNSUSPEND_USER",
        "GRANT_ADMIN_PRIVILEGE",
        "REVOKE_ADMIN_PRIVILEGE",
        "ADD_GROUP_MEMBER",
        "REMOVE_GROUP_MEMBER",
        "CHANGE_PASSWORD",
        "AUTHORIZE_API_CLIENT_ACCESS",
        "TOGGLE_SERVICE_ENABLED",
        "CHANGE_TWO_STEP_VERIFICATION_ENROLLMENT_PERIOD",
        "CHANGE_ALLOWED_TWO_STEP_VERIFICATION_METHODS",
        "TOGGLE_TWO_STEP_VERIFICATION",
        "CHANGE_APPLICATION_SETTING",
        "CREATE_APPLICATION_SETTING",
        "DELETE_APPLICATION_SETTING",
        "CHANGE_DOMAIN_DEFAULT_LOCALE",
        "ADD_DOMAIN_ALIAS",
        "ADD_TRUSTED_DOMAINS",
    }
    # Parameters appended to the printed line when present, with their caption.
    extra_fields = (
        ("DOMAIN_NAME", "Domain"),
        ("GROUP_EMAIL", "Group"),
        ("APP_NAME", "App"),
    )

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        userKey="all",
        applicationName="admin",
        startTime=since,
        maxResults=1000,
    )
    kept: List[Dict] = []
    results["admin_activity"] = kept
    if not activities:
        print(" [INFO] No admin activity found in the last 30 days")
        return

    for activity in activities:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")
        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            event_type = event.get("type", "N/A")
            parameters = {}
            for param in event.get("parameters", []):
                parameters[param.get("name", "")] = param.get(
                    "value", param.get("multiValue", "")
                )
            target_email = parameters.get("USER_EMAIL", "")

            # Filter for target user if specified
            if target_user and target_user.lower() not in (
                actor_email.lower(),
                target_email.lower() if target_email else "",
            ):
                continue

            is_critical = event_name in critical_events
            marker = "[ALERT] " if is_critical else "[INFO] "
            pieces = []
            if target_email:
                pieces.append(f" | Target: {target_email}")
            for param_key, caption in extra_fields:
                if parameters.get(param_key):
                    pieces.append(f" | {caption}: {parameters[param_key]}")
            param_summary = "".join(pieces)
            print(
                f" {marker} {event_time} | {event_name} | Actor: {actor_email} "
                f"| IP: {ip_address}{param_summary}"
            )
            kept.append({
                "time": event_time,
                "event": event_name,
                "type": event_type,
                "actor": actor_email,
                "ip": ip_address,
                "target_user": target_email,
                "parameters": parameters,
                "is_critical": is_critical,
            })

    critical_count = len([entry for entry in kept if entry.get("is_critical")])
    print(f"\n --- Admin Activity Summary ---")
    print(f" Total admin events: {len(results['admin_activity'])}")
    print(f" Critical events: {critical_count}")
    if critical_count > 0:
        print(f" [ALERT] {critical_count} critical admin action(s) detected - review above")
def step7_gmail_delegation(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Report mailboxes that have delegated access to another account.

    The delegate list of every mailbox that has one is stored in
    ``results["gmail_delegation"][<email>]``.

    A delegate is classified as internal only when the host part of its
    address (after the last ``@``) equals ``domain`` or is a subdomain of it.
    The earlier substring test treated look-alike addresses such as
    ``x@<domain>.evil.com`` as internal.

    Args:
        admin_email: Not used by this step; kept for a uniform step signature.
        domain: The organization's own domain, used to tell internal from
            external delegates.
        users: Directory user entries; non-suspended ones are checked when no
            ``target_user`` is given.
        target_user: Check only this mailbox when set.
    """
    print_separator("STEP 7: GMAIL DELEGATION (Delegated Mailbox Access)")

    own_domain = domain.strip().lower()

    def is_external_address(address: str) -> bool:
        """Return True unless the address host is the domain or a subdomain."""
        host = address.rsplit("@", 1)[-1].strip(" <>").lower()
        return host != own_domain and not host.endswith("." + own_domain)

    if target_user:
        users_to_check = [target_user]
        print(f" Checking delegation for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking delegation for {len(users_to_check)} active user(s)")

    results["gmail_delegation"] = {}
    delegation_found = 0
    for user_email in users_to_check:
        # Gmail settings must be read while impersonating the mailbox owner.
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )
        if gmail_service is None:
            continue
        delegates = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .delegates()
            .list(userId=em)
            .execute(),
            f"delegation for {user_email}",
        )
        if not delegates or "delegates" not in delegates:
            continue

        delegate_list = delegates["delegates"]
        results["gmail_delegation"][user_email] = delegate_list
        delegation_found += len(delegate_list)
        for d in delegate_list:
            delegate_email = d.get("delegateEmail", "N/A")
            status = d.get("verificationStatus", "N/A")
            is_external = is_external_address(delegate_email)
            marker = "[ALERT] " if is_external else "[WARNING] "
            label = "EXTERNAL" if is_external else "INTERNAL"
            print(
                f" {marker} {user_email} -> delegated to {delegate_email} "
                f"({label}) [Status: {status}]"
            )

    if delegation_found == 0:
        print(" [OK] No Gmail delegation found")
    else:
        print(f"\n [WARNING] {delegation_found} delegation(s) found - review above")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line, run all investigation steps and summarize.

    The steps run in order 1-7. Steps 2, 5 and 6 are skipped with a warning
    when the Reports client cannot be built. The collected ``results`` are
    written to RESULTS_FILE before the final summary is printed.

    Exits with status 1 when the service-account key file is missing or the
    Directory client cannot be built.
    """
    parser = argparse.ArgumentParser(
        description="Google Workspace Compromise Investigation Tool"
    )
    parser.add_argument(
        "--domain",
        default=DEFAULT_DOMAIN,
        help=f"Target domain (default: {DEFAULT_DOMAIN})",
    )
    parser.add_argument(
        "--admin",
        default=DEFAULT_ADMIN,
        help=f"Admin email to impersonate (default: {DEFAULT_ADMIN})",
    )
    parser.add_argument(
        "--user",
        default=None,
        help="Investigate a specific user (optional)",
    )
    args = parser.parse_args()
    domain = args.domain
    admin_email = args.admin
    target_user = args.user

    # Validate key file exists
    if not SERVICE_ACCOUNT_KEY.exists():
        print(f"[ERROR] Service account key not found: {SERVICE_ACCOUNT_KEY}")
        print(" Ensure the key file is in the same directory as this script.")
        sys.exit(1)

    # Banner
    print("=" * 70)
    print(" GOOGLE WORKSPACE - COMPROMISE INVESTIGATION")
    print(f" Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print(f" Domain: {domain}")
    print(f" Admin: {admin_email}")
    if target_user:
        print(f" Target User: {target_user}")
    print("=" * 70)
    results["investigation_metadata"] = {
        "date": datetime.now(timezone.utc).isoformat(),
        "domain": domain,
        "admin": admin_email,
        "target_user": target_user,
    }

    # Build services
    print("\n[*] Building API service clients...")
    admin_service = safe_api_call(
        lambda: build_service("admin", "directory_v1", admin_email),
        "Admin Directory service",
    )
    if admin_service is None:
        print("[ERROR] Cannot build Admin Directory service. Check delegation setup.")
        sys.exit(1)
    print("[OK] Admin Directory service ready")
    reports_service = safe_api_call(
        lambda: build_service("admin", "reports_v1", admin_email),
        "Admin Reports service",
    )
    if reports_service is None:
        print("[WARNING] Cannot build Reports service - some checks will be skipped")
    else:
        print("[OK] Admin Reports service ready")

    # Run investigation steps
    users = step1_list_users(admin_service, domain)
    if reports_service:
        step2_signin_audit(reports_service, domain, target_user)
    else:
        print_separator("STEP 2: SIGN-IN AUDIT LOGS")
        print(" [WARNING] Skipped - Reports API not available")
    step3_email_forwarding(admin_email, domain, users, target_user)
    step4_oauth_grants(admin_service, users, target_user)
    if reports_service:
        step5_password_changes(reports_service, target_user)
        step6_admin_activity(reports_service, target_user)
    else:
        print_separator("STEP 5: PASSWORD CHANGES")
        print(" [WARNING] Skipped - Reports API not available")
        print_separator("STEP 6: ADMIN ACTIVITY")
        print(" [WARNING] Skipped - Reports API not available")
    step7_gmail_delegation(admin_email, domain, users, target_user)

    # Save results
    print_separator("SAVING RESULTS")
    with open(RESULTS_FILE, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print(f" Results saved to: {RESULTS_FILE}")

    # Final summary
    print_separator("INVESTIGATION SUMMARY")
    findings: List[str] = []

    # User findings
    user_summary = results.get("user_summary", {})
    if user_summary.get("no_2fa", 0) > 0:
        findings.append(
            f"[ALERT] {user_summary['no_2fa']} user(s) without 2FA enabled"
        )

    # Sign-in findings
    signin_summary = results.get("signin_summary", {})
    if signin_summary.get("failed_logins", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['failed_logins']} failed login attempt(s) in last 7 days"
        )
    if signin_summary.get("suspicious_events", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['suspicious_events']} suspicious login event(s) "
            f"flagged by Google"
        )

    # Forwarding findings. Auto-forwarding entries are dicts keyed by mailbox;
    # filter-based forwards are lists stored under "<email>_filters". Both
    # shapes are counted so external filter forwards are not silently omitted.
    fwd_rules = results.get("forwarding_rules", {})
    external_fwd = 0
    for entry in fwd_rules.values():
        if isinstance(entry, dict):
            if entry.get("is_external", False):
                external_fwd += 1
        elif isinstance(entry, list):
            external_fwd += sum(
                1
                for rule in entry
                if isinstance(rule, dict) and rule.get("is_external", False)
            )
    if external_fwd > 0:
        findings.append(
            f"[ALERT] {external_fwd} external email forwarding rule(s) detected"
        )

    # OAuth findings
    suspicious_oauth = results.get("suspicious_oauth_apps", [])
    if suspicious_oauth:
        findings.append(
            f"[ALERT] {len(suspicious_oauth)} suspicious OAuth app grant(s) with "
            f"sensitive scopes"
        )

    # Password findings
    pw_changes = results.get("password_changes", [])
    admin_resets = [p for p in pw_changes if p.get("type") == "Admin Reset"]
    if admin_resets:
        findings.append(
            f"[WARNING] {len(admin_resets)} admin-initiated password reset(s) in last 30 days"
        )

    # Admin activity findings
    admin_acts = results.get("admin_activity", [])
    critical_admin = [a for a in admin_acts if a.get("is_critical")]
    if critical_admin:
        findings.append(
            f"[ALERT] {len(critical_admin)} critical admin action(s) in last 30 days"
        )

    # Delegation findings
    delegation = results.get("gmail_delegation", {})
    total_delegates = sum(len(v) for v in delegation.values() if isinstance(v, list))
    if total_delegates > 0:
        findings.append(
            f"[WARNING] {total_delegates} Gmail delegation(s) found"
        )

    print(f"""
Domain: {domain}
Admin: {admin_email}
Target User: {target_user or 'All users'}
Investigation Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}
Total Users: {user_summary.get('total', 'N/A')}
KEY FINDINGS:
=============
""")
    if findings:
        for finding in findings:
            print(f" {finding}")
    else:
        print(" [OK] No critical findings detected. Review detailed output above for context.")
    print(f"""
RECOMMENDED ACTIONS:
====================
1. Disable any external email forwarding rules immediately
2. Revoke suspicious OAuth app grants (Admin Console > Security > API Controls)
3. Force password reset for compromised accounts
4. Enable 2FA enforcement for all users without it
5. Review and remove any unauthorized Gmail delegations
6. Check Google Drive sharing for external data exposure
7. Review mobile device access (Admin Console > Devices)
8. Enable enhanced pre-delivery message scanning
9. Review DLP rules and alert configurations
10. Monitor account activity for the next 30 days
Investigation script: {Path(__file__).resolve()}
Raw results: {RESULTS_FILE}
""")


if __name__ == "__main__":
    main()