#!/usr/bin/env python3
"""
Google Workspace Compromise Investigation Tool

Performs a comprehensive security investigation of a Google Workspace tenant
using domain-wide delegation via a service account. Checks for signs of
account compromise including suspicious sign-ins, email forwarding rules,
OAuth app grants, password changes, admin activity, and Gmail delegation.

Usage:
    python gws_investigate.py
    python gws_investigate.py --domain example.com --admin admin@example.com
    python gws_investigate.py --user compromised@example.com

Environment:
    GWS_SERVICE_ACCOUNT_KEY   Optional path to the service account JSON key.
                              When unset or empty, the key file located next
                              to this script is used.
"""

import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    from google.oauth2 import service_account
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
except ImportError:
    print("[ERROR] Required packages not installed. Run:")
    print(" pip install google-auth google-api-python-client")
    sys.exit(1)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

SCRIPT_DIR = Path(__file__).resolve().parent

# The key path can be overridden through the environment so the credential
# file does not have to live next to the script; the default is unchanged.
SERVICE_ACCOUNT_KEY = Path(
    os.environ.get("GWS_SERVICE_ACCOUNT_KEY")
    or SCRIPT_DIR / "acg-msp-access-8f72339997e5.json"
)
RESULTS_FILE = SCRIPT_DIR / "gws_investigation_results.json"

DEFAULT_DOMAIN = "lonestarelectrical.net"
DEFAULT_ADMIN = "sysadmin@lonestarelectrical.net"

# Scopes required for investigation (must be authorized in Google Admin console)
SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.user",
    "https://www.googleapis.com/auth/admin.directory.user.security",
    "https://www.googleapis.com/auth/admin.reports.audit.readonly",
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.settings.basic",
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/admin.directory.domain.readonly",
]
# ---------------------------------------------------------------------------
# Globals
# ---------------------------------------------------------------------------

# Shared accumulator: every investigation step stores its findings under its
# own key(s) in this dict.
results: Dict[str, Any] = {}

# Substrings that mark an OAuth scope as granting access to sensitive data.
_DANGEROUS_SCOPE_KEYWORDS = (
    "mail",
    "gmail",
    "drive",
    "contacts",
    "calendar",
    "admin",
)

# Identity-only scopes. They must be excluded before the keyword match above,
# otherwise "mail" matches "email" / "userinfo.email" and every
# sign-in-with-Google app is reported as having mailbox access.
_IDENTITY_SCOPE_NAMES = frozenset(
    {"openid", "email", "profile", "userinfo.email", "userinfo.profile"}
)

# NOTE(review): these fragments are matched against the app's *display name*,
# which is chosen by the app publisher. A malicious app can call itself
# "Google ..." and be treated as safe. Consider replacing this with an
# allow-list of verified client IDs.
_KNOWN_SAFE_NAME_FRAGMENTS = (
    "google",
    "chrome",
    "android",
)

# Admin audit event names that step 6 reports as [ALERT].
_CRITICAL_ADMIN_EVENTS = frozenset(
    {
        "CREATE_USER",
        "DELETE_USER",
        "SUSPEND_USER",
        "UNSUSPEND_USER",
        "GRANT_ADMIN_PRIVILEGE",
        "REVOKE_ADMIN_PRIVILEGE",
        "ADD_GROUP_MEMBER",
        "REMOVE_GROUP_MEMBER",
        "CHANGE_PASSWORD",
        "AUTHORIZE_API_CLIENT_ACCESS",
        "TOGGLE_SERVICE_ENABLED",
        "CHANGE_TWO_STEP_VERIFICATION_ENROLLMENT_PERIOD",
        "CHANGE_ALLOWED_TWO_STEP_VERIFICATION_METHODS",
        "TOGGLE_TWO_STEP_VERIFICATION",
        "CHANGE_APPLICATION_SETTING",
        "CREATE_APPLICATION_SETTING",
        "DELETE_APPLICATION_SETTING",
        "CHANGE_DOMAIN_DEFAULT_LOCALE",
        "ADD_DOMAIN_ALIAS",
        "ADD_TRUSTED_DOMAINS",
    }
)

# Order in which the value-carrying fields of an audit parameter are probed.
_PARAMETER_VALUE_FIELDS = (
    "value",
    "intValue",
    "boolValue",
    "multiValue",
    "multiIntValue",
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def print_separator(title: str) -> None:
    """Print a section banner: a blank line, a rule, the title, and a rule."""
    rule = "=" * 70
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)


def build_service(
    api_name: str,
    api_version: str,
    impersonate_email: str,
    scopes: Optional[List[str]] = None,
) -> Any:
    """Build a Google API service client with domain-wide delegation.

    Args:
        api_name: API name passed to ``build`` (e.g. "admin", "gmail").
        api_version: API version passed to ``build``.
        impersonate_email: User the service account acts as.
        scopes: OAuth scopes to request; defaults to the module ``SCOPES``.

    Returns:
        The service object returned by ``googleapiclient.discovery.build``.
    """
    if scopes is None:
        scopes = SCOPES
    credentials = service_account.Credentials.from_service_account_file(
        str(SERVICE_ACCOUNT_KEY),
        scopes=scopes,
    )
    delegated = credentials.with_subject(impersonate_email)
    return build(api_name, api_version, credentials=delegated)


def paginate(request, key: str = "users", list_next=None) -> List[Dict]:
    """Collect items from a list request, following pages when possible.

    Args:
        request: Object with an ``execute()`` method returning a response dict.
        key: Response key that holds the list of items.
        list_next: Optional callable ``(request, response) -> next request or
            None`` (for example ``resource.list_next``). Without it only the
            first page can be fetched, because a bare request object cannot
            produce its successor.

    Returns:
        All items gathered before the pages ran out or an API error occurred.
    """
    all_items: List[Dict] = []
    while request is not None:
        try:
            response = request.execute()
        except HttpError as exc:
            print(f" [WARNING] API error during pagination: {exc.status_code} - {exc.reason}")
            break

        all_items.extend(response.get(key, []))

        if list_next is None:
            # Do not truncate silently: tell the operator more data exists.
            if response.get("nextPageToken"):
                print(
                    " [WARNING] More pages are available but no list_next "
                    "callable was supplied - results are truncated"
                )
            break
        request = list_next(request, response)
    return all_items


def paginate_list(service_resource, method_name: str, key: str, **kwargs) -> List[Dict]:
    """Paginate through a list method using the list / list_next pattern.

    Args:
        service_resource: API resource object (e.g. ``service.users()``).
        method_name: Name of the list method on the resource.
        key: Response key that holds the list of items.
        **kwargs: Arguments forwarded to the list method.

    Returns:
        All items gathered; on an ``HttpError`` a warning is printed and the
        items collected so far are returned.
    """
    all_items: List[Dict] = []
    request = getattr(service_resource, method_name)(**kwargs)
    # Resolve the companion "<method>_next" once. A missing method means the
    # endpoint has no pagination support; looking it up separately avoids
    # masking an AttributeError raised inside the call itself.
    next_method = getattr(service_resource, method_name + "_next", None)

    while request is not None:
        try:
            response = request.execute()
        except HttpError as exc:
            print(f" [WARNING] API error: {exc.status_code} - {exc.reason}")
            break

        all_items.extend(response.get(key, []))

        if next_method is None:
            break
        request = next_method(request, response)
    return all_items


def safe_api_call(func, label: str, default=None):
    """Call ``func()`` and convert any failure into a printed warning.

    Args:
        func: Zero-argument callable performing the API call.
        label: Text that identifies the call in warning messages.
        default: Value returned when the call fails.

    Returns:
        The result of ``func()``, or ``default`` if it raised.
    """
    try:
        return func()
    except HttpError as exc:
        status = exc.status_code
        reason = exc.reason
        if status == 403:
            print(f" [WARNING] {label}: Access denied (403) - scope may not be authorized")
        elif status == 404:
            print(f" [WARNING] {label}: Not found (404)")
        elif status == 400:
            print(f" [WARNING] {label}: Bad request (400) - {reason}")
        else:
            print(f" [WARNING] {label}: HTTP {status} - {reason}")
        return default
    except Exception as exc:
        # Deliberate best-effort boundary: one failing user or endpoint must
        # not abort the whole investigation.
        print(f" [WARNING] {label}: {type(exc).__name__} - {exc}")
        return default


def is_external_address(address: str, domain: str) -> bool:
    """Return True when ``address`` does not belong to ``domain``.

    Only the host part after the last "@" is compared, case-insensitively.
    A plain substring test would classify
    ``x@<domain>.evil.com`` or ``<domain>@gmail.com`` as internal.
    Subdomains of ``domain`` count as internal.
    """
    host = address.rsplit("@", 1)[-1].strip().lower()
    own = domain.strip().lower()
    if not host or not own:
        return True
    return not (host == own or host.endswith("." + own))


def extract_event_parameters(event: Dict) -> Dict[str, Any]:
    """Flatten an audit event's ``parameters`` list into a name -> value dict.

    The first value-carrying field present on each parameter is used; a
    parameter carrying none of the probed fields maps to an empty string.
    """
    flattened: Dict[str, Any] = {}
    for param in event.get("parameters", []):
        name = param.get("name", "")
        flattened[name] = next(
            (param[field] for field in _PARAMETER_VALUE_FIELDS if field in param),
            "",
        )
    return flattened


def _start_time_days_ago(days: int) -> str:
    """Return the UTC timestamp ``days`` ago in the format sent as startTime."""
    moment = datetime.now(timezone.utc) - timedelta(days=days)
    return moment.strftime("%Y-%m-%dT%H:%M:%S.000Z")


def _active_user_emails(users: List[Dict]) -> List[str]:
    """Return primary emails of the directory users that are not suspended."""
    return [
        u.get("primaryEmail", "")
        for u in users
        if u.get("primaryEmail") and not u.get("suspended", False)
    ]


def _has_sensitive_scope(scopes: List[str]) -> bool:
    """Return True if any non-identity scope contains a dangerous keyword."""
    for scope in scopes:
        lowered = scope.lower()
        leaf = lowered.rstrip("/").rsplit("/", 1)[-1]
        if leaf in _IDENTITY_SCOPE_NAMES:
            continue
        if any(keyword in lowered for keyword in _DANGEROUS_SCOPE_KEYWORDS):
            return True
    return False


# ---------------------------------------------------------------------------
# Investigation Steps
# ---------------------------------------------------------------------------

def step1_list_users(admin_service, domain: str) -> List[Dict]:
    """List all users with security-relevant details.

    Prints one entry per user plus a summary, and stores ``results["users"]``
    and ``results["user_summary"]``.

    Returns:
        The raw user dicts from the Directory API (empty list if none).
    """
    print_separator("STEP 1: ALL DOMAIN USERS")

    users = paginate_list(
        admin_service.users(),
        "list",
        "users",
        domain=domain,
        projection="full",
        maxResults=500,
        orderBy="email",
    )

    results["users"] = []

    if not users:
        print(" [WARNING] No users returned. Check domain-wide delegation setup.")
        return []

    print(f" Found {len(users)} user(s):\n")

    for u in users:
        email = u.get("primaryEmail", "N/A")
        name = u.get("name", {}).get("fullName", "N/A")
        last_login = u.get("lastLoginTime", "Never")
        is_admin = u.get("isAdmin", False) or u.get("isDelegatedAdmin", False)
        is_suspended = u.get("suspended", False)
        is_2fa = u.get("isEnrolledIn2Sv", False)
        is_enforced_2fa = u.get("isEnforcedIn2Sv", False)
        creation = u.get("creationTime", "N/A")
        org_unit = u.get("orgUnitPath", "/")

        # Status markers; exactly one 2FA marker is always present.
        status_parts = []
        if is_suspended:
            status_parts.append("[SUSPENDED]")
        if is_admin:
            status_parts.append("[ADMIN]")
        if not is_2fa:
            status_parts.append("[NO-2FA]")
        elif is_enforced_2fa:
            status_parts.append("[2FA-ENFORCED]")
        else:
            status_parts.append("[2FA-ENROLLED]")
        status_str = " ".join(status_parts)

        # Line marker: suspension takes precedence over the missing-2FA flag.
        marker = " "
        if not is_2fa:
            marker = "[WARNING] "
        if is_suspended:
            marker = "[INFO] "

        print(f" {marker} {email}")
        print(f" Name: {name} | Status: {status_str}")
        print(f" Last Login: {last_login} | Created: {creation}")
        print(f" OrgUnit: {org_unit}")

        results["users"].append({
            "email": email,
            "name": name,
            "last_login": last_login,
            "is_admin": is_admin,
            "is_suspended": is_suspended,
            "is_2fa_enrolled": is_2fa,
            "is_2fa_enforced": is_enforced_2fa,
            "creation_time": creation,
            "org_unit": org_unit,
        })

    # Summary
    total = len(users)
    admins = sum(1 for u in results["users"] if u["is_admin"])
    no_2fa = sum(1 for u in results["users"] if not u["is_2fa_enrolled"])
    suspended = sum(1 for u in results["users"] if u["is_suspended"])

    print("\n --- User Summary ---")
    print(f" Total users: {total}")
    print(f" Admins: {admins}")
    print(f" Without 2FA: {no_2fa}")
    print(f" Suspended: {suspended}")

    if no_2fa > 0:
        print(f" [ALERT] {no_2fa} user(s) do NOT have 2FA enabled!")

    results["user_summary"] = {
        "total": total,
        "admins": admins,
        "no_2fa": no_2fa,
        "suspended": suspended,
    }

    return users


def step2_signin_audit(reports_service, domain: str, target_user: Optional[str]) -> None:
    """Check sign-in audit logs for the last 7 days.

    Queries the "login" application of the Reports API for all users, or for
    ``target_user`` when given. Stores ``results["signin_logs"]`` and
    ``results["signin_summary"]``. ``domain`` is accepted for signature
    symmetry with the other steps and is not used.
    """
    print_separator("STEP 2: SIGN-IN AUDIT LOGS (Last 7 Days)")

    kwargs = {
        "userKey": "all",
        "applicationName": "login",
        "startTime": _start_time_days_ago(7),
        "maxResults": 1000,
    }

    if target_user:
        kwargs["userKey"] = target_user
        print(f" Filtering for user: {target_user}")
    else:
        print(" Checking all users in domain")

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        **kwargs,
    )

    results["signin_logs"] = []

    if not activities:
        print(" [INFO] No sign-in events found in the last 7 days")
        print(" (This may be normal if Reports API needs time to propagate)")
        return

    ips_seen: Dict[str, int] = {}
    users_seen: Dict[str, int] = {}
    failed_logins: List[Dict] = []
    suspicious_events: List[Dict] = []

    for activity in activities:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")

        ips_seen[ip_address] = ips_seen.get(ip_address, 0) + 1
        users_seen[actor_email] = users_seen.get(actor_email, 0) + 1

        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            event_type = event.get("type", "N/A")
            parameters = extract_event_parameters(event)

            is_failed = event_name == "login_failure"
            is_suspicious_login = parameters.get("is_suspicious", False)
            login_type = parameters.get("login_type", "")
            login_challenge = parameters.get("login_challenge_method", "")

            record = {
                "time": event_time,
                "user": actor_email,
                "ip": ip_address,
                "event": event_name,
                "type": event_type,
                "login_type": login_type,
                "challenge_method": login_challenge,
                "is_suspicious": is_suspicious_login,
                "parameters": parameters,
            }
            results["signin_logs"].append(record)

            # Determine markers
            flags = []
            if is_failed:
                flags.append("FAILED")
                failed_logins.append(record)
            if is_suspicious_login:
                flags.append("SUSPICIOUS")
                suspicious_events.append(record)

            flag_str = f" [{' | '.join(flags)}]" if flags else ""
            if is_suspicious_login:
                marker = "[ALERT] "
            elif is_failed:
                marker = "[WARNING] "
            else:
                marker = " "

            print(
                f" {marker} {event_time} | {actor_email} | IP: {ip_address} "
                f"| Event: {event_name} | Type: {login_type}{flag_str}"
            )

    # Summary
    print("\n --- Sign-in Summary ---")
    print(f" Total events: {len(activities)}")
    print(f" Unique IPs: {len(ips_seen)}")
    print(f" Unique users: {len(users_seen)}")
    print(f" Failed logins: {len(failed_logins)}")
    print(f" Suspicious events: {len(suspicious_events)}")

    if ips_seen:
        print("\n --- IP Address Breakdown ---")
        for ip, count in sorted(ips_seen.items(), key=lambda x: x[1], reverse=True):
            print(f" {ip}: {count} event(s)")

    if failed_logins:
        print(f"\n [ALERT] {len(failed_logins)} failed login(s) detected:")
        for fl in failed_logins[:20]:
            print(f" {fl['time']} | {fl['user']} | IP: {fl['ip']}")

    if suspicious_events:
        print(f"\n [ALERT] {len(suspicious_events)} suspicious login event(s) flagged by Google:")
        for se in suspicious_events[:20]:
            print(f" {se['time']} | {se['user']} | IP: {se['ip']}")

    results["signin_summary"] = {
        "total_events": len(activities),
        "unique_ips": len(ips_seen),
        "unique_users": len(users_seen),
        "failed_logins": len(failed_logins),
        "suspicious_events": len(suspicious_events),
        "ips": ips_seen,
        "users": users_seen,
    }


def step3_email_forwarding(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Check all users (or a specific user) for email auto-forwarding rules.

    For each user, a Gmail client impersonating that user is built and both
    the auto-forwarding setting and the filter list are inspected. Findings
    are stored in ``results["forwarding_rules"]``: the auto-forwarding state
    under the user's email (a dict) and forwarding filters under
    ``"<email>_filters"`` (a list). ``admin_email`` is not used here.
    """
    print_separator("STEP 3: EMAIL FORWARDING RULES")

    if target_user:
        users_to_check = [target_user]
        print(f" Checking forwarding for: {target_user}")
    else:
        users_to_check = _active_user_emails(users)
        print(f" Checking forwarding for {len(users_to_check)} active user(s)")

    results["forwarding_rules"] = {}
    alerts_found = 0

    for user_email in users_to_check:
        # Default arguments bind the current loop values into each lambda.
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )

        if gmail_service is None:
            print(f" [WARNING] Could not access Gmail settings for {user_email}")
            continue

        # Check auto-forwarding setting
        forwarding = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .getAutoForwarding(userId=em)
            .execute(),
            f"auto-forwarding for {user_email}",
        )

        if forwarding:
            is_enabled = forwarding.get("enabled", False)
            fwd_email = forwarding.get("emailAddress", "")
            disposition = forwarding.get("disposition", "")

            if is_enabled and fwd_email:
                is_external = is_external_address(fwd_email, domain)
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> forwards to {fwd_email} "
                    f"({label}) [Disposition: {disposition}]"
                )
                alerts_found += 1
                results["forwarding_rules"][user_email] = {
                    "enabled": True,
                    "forward_to": fwd_email,
                    "is_external": is_external,
                    "disposition": disposition,
                }
            else:
                results["forwarding_rules"][user_email] = {"enabled": False}

        # Check Gmail filters for forwarding actions
        filters = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .filters()
            .list(userId=em)
            .execute(),
            f"Gmail filters for {user_email}",
        )

        if filters and "filter" in filters:
            for filt in filters["filter"]:
                forward_to = filt.get("action", {}).get("forward", "")
                if not forward_to:
                    continue

                is_external = is_external_address(forward_to, domain)
                criteria = filt.get("criteria", {})
                criteria_str = json.dumps(criteria) if criteria else "No criteria"
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> filter forwards to {forward_to} "
                    f"({label})"
                )
                print(f" Criteria: {criteria_str}")
                alerts_found += 1

                fwd_key = f"{user_email}_filters"
                results["forwarding_rules"].setdefault(fwd_key, []).append({
                    "forward_to": forward_to,
                    "is_external": is_external,
                    "criteria": criteria,
                    "filter_id": filt.get("id", "N/A"),
                })

    if alerts_found == 0:
        print(" [OK] No email forwarding rules detected")
    else:
        print(f"\n [ALERT] {alerts_found} forwarding rule(s) found - review above")


def step4_oauth_grants(admin_service, users: List[Dict], target_user: Optional[str]) -> None:
    """List third-party OAuth apps with access to user accounts.

    Stores every grant in ``results["oauth_grants"]`` (keyed by user email)
    and grants with sensitive scopes from apps not matched as known-safe in
    ``results["suspicious_oauth_apps"]``.
    """
    print_separator("STEP 4: OAUTH APP GRANTS (Third-Party Access)")

    if target_user:
        users_to_check = [target_user]
        print(f" Checking OAuth grants for: {target_user}")
    else:
        users_to_check = _active_user_emails(users)
        print(f" Checking OAuth grants for {len(users_to_check)} active user(s)")

    results["oauth_grants"] = {}
    suspicious_apps: List[Dict] = []

    for user_email in users_to_check:
        tokens = safe_api_call(
            lambda em=user_email: admin_service.tokens()
            .list(userKey=em)
            .execute(),
            f"OAuth tokens for {user_email}",
        )

        if tokens is None or "items" not in tokens:
            continue

        user_grants = []
        for token in tokens.get("items", []):
            app_name = token.get("displayText", "Unknown App")
            client_id = token.get("clientId", "N/A")
            scopes = token.get("scopes", [])
            is_native = token.get("nativeApp", False)

            has_dangerous = _has_sensitive_scope(scopes)
            # Display-name match only; see the review note on
            # _KNOWN_SAFE_NAME_FRAGMENTS.
            lowered_name = app_name.lower()
            is_known_safe = any(
                fragment in lowered_name for fragment in _KNOWN_SAFE_NAME_FRAGMENTS
            )

            user_grants.append({
                "app_name": app_name,
                "client_id": client_id,
                "scopes": scopes,
                "is_native": is_native,
            })

            if has_dangerous and not is_known_safe:
                marker = "[ALERT] "
                suspicious_apps.append({
                    "user": user_email,
                    "app": app_name,
                    "client_id": client_id,
                    "scopes": scopes,
                })
            elif not is_known_safe:
                marker = "[WARNING] "
            else:
                marker = " "

            scope_str = ", ".join(scopes[:5])
            if len(scopes) > 5:
                scope_str += f" (+{len(scopes) - 5} more)"

            print(f" {marker} {user_email} | App: {app_name}")
            print(f" ClientID: {client_id} | Scopes: {scope_str}")

        if user_grants:
            results["oauth_grants"][user_email] = user_grants

    if not results["oauth_grants"]:
        print(" [OK] No third-party OAuth grants found")
    elif suspicious_apps:
        print(f"\n [ALERT] {len(suspicious_apps)} suspicious OAuth app grant(s):")
        for sa in suspicious_apps:
            print(f" User: {sa['user']} | App: {sa['app']} | Scopes: {', '.join(sa['scopes'][:3])}")

    results["suspicious_oauth_apps"] = suspicious_apps


def step5_password_changes(reports_service, target_user: Optional[str]) -> None:
    """Check for password changes in the last 30 days.

    Combines admin-console ``CHANGE_PASSWORD`` events with user-initiated
    ``password_edit`` events. When ``target_user`` is given, only events
    whose actor or target matches it are kept. Stores
    ``results["password_changes"]``.
    """
    print_separator("STEP 5: PASSWORD CHANGES (Last 30 Days)")

    start_time = _start_time_days_ago(30)

    kwargs = {
        "userKey": "all",
        "applicationName": "admin",
        "startTime": start_time,
        "eventName": "CHANGE_PASSWORD",
        "maxResults": 500,
    }

    if target_user:
        print(f" Checking password changes involving: {target_user}")

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        **kwargs,
    )

    results["password_changes"] = []

    # Also check user-initiated password changes
    user_pw_kwargs = {
        "userKey": "all",
        "applicationName": "login",
        "startTime": start_time,
        "eventName": "password_edit",
        "maxResults": 500,
    }

    user_pw_activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        **user_pw_kwargs,
    )

    all_activities = activities + user_pw_activities

    if not all_activities:
        print(" [OK] No password changes detected in the last 30 days")
        return

    wanted = target_user.lower() if target_user else None

    for activity in all_activities:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")

        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            parameters = extract_event_parameters(event)
            target_email = parameters.get("USER_EMAIL", actor_email)

            if wanted and wanted not in (actor_email.lower(), target_email.lower()):
                continue

            # Admin reset vs self-service. Addresses are compared
            # case-insensitively so a casing difference between actor and
            # target is not mistaken for a reset by someone else.
            is_admin_reset = (
                event_name == "CHANGE_PASSWORD"
                and actor_email.lower() != target_email.lower()
            )
            marker = "[ALERT] " if is_admin_reset else "[INFO] "
            change_type = "Admin Reset" if is_admin_reset else "Self-Service"

            print(
                f" {marker} {event_time} | {change_type} | Actor: {actor_email} "
                f"| Target: {target_email} | IP: {ip_address}"
            )

            results["password_changes"].append({
                "time": event_time,
                "actor": actor_email,
                "target": target_email,
                "ip": ip_address,
                "type": change_type,
                "event_name": event_name,
            })

    print(f"\n Total password changes: {len(results['password_changes'])}")


def step6_admin_activity(reports_service, target_user: Optional[str]) -> None:
    """Check admin console activity for the last 30 days.

    Events whose name is in the critical-event set are marked [ALERT]. When
    ``target_user`` is given, only events whose actor or ``USER_EMAIL``
    parameter matches it are kept. Stores ``results["admin_activity"]``.
    """
    print_separator("STEP 6: ADMIN ACTIVITY (Last 30 Days)")

    kwargs = {
        "userKey": "all",
        "applicationName": "admin",
        "startTime": _start_time_days_ago(30),
        "maxResults": 1000,
    }

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        **kwargs,
    )

    results["admin_activity"] = []

    if not activities:
        print(" [INFO] No admin activity found in the last 30 days")
        return

    wanted = target_user.lower() if target_user else None

    for activity in activities:
        actor_email = activity.get("actor", {}).get("email", "N/A")
        ip_address = activity.get("ipAddress", "N/A")
        event_time = activity.get("id", {}).get("time", "N/A")

        for event in activity.get("events", []):
            event_name = event.get("name", "N/A")
            event_type = event.get("type", "N/A")
            parameters = extract_event_parameters(event)
            target_email = parameters.get("USER_EMAIL", "")

            # Filter for target user if specified
            if wanted and wanted not in (
                actor_email.lower(),
                target_email.lower() if target_email else "",
            ):
                continue

            is_critical = event_name in _CRITICAL_ADMIN_EVENTS
            marker = "[ALERT] " if is_critical else "[INFO] "

            param_summary = ""
            if target_email:
                param_summary = f" | Target: {target_email}"
            if parameters.get("DOMAIN_NAME"):
                param_summary += f" | Domain: {parameters['DOMAIN_NAME']}"
            if parameters.get("GROUP_EMAIL"):
                param_summary += f" | Group: {parameters['GROUP_EMAIL']}"
            if parameters.get("APP_NAME"):
                param_summary += f" | App: {parameters['APP_NAME']}"

            print(
                f" {marker} {event_time} | {event_name} | Actor: {actor_email} "
                f"| IP: {ip_address}{param_summary}"
            )

            results["admin_activity"].append({
                "time": event_time,
                "event": event_name,
                "type": event_type,
                "actor": actor_email,
                "ip": ip_address,
                "target_user": target_email,
                "parameters": parameters,
                "is_critical": is_critical,
            })

    critical_count = sum(1 for a in results["admin_activity"] if a.get("is_critical"))
    print("\n --- Admin Activity Summary ---")
    print(f" Total admin events: {len(results['admin_activity'])}")
    print(f" Critical events: {critical_count}")

    if critical_count > 0:
        print(f" [ALERT] {critical_count} critical admin action(s) detected - review above")


def step7_gmail_delegation(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Check for Gmail delegated mailbox access.

    For each user, lists the mailbox delegates through a Gmail client
    impersonating that user and stores them in
    ``results["gmail_delegation"]`` keyed by user email. ``admin_email`` is
    not used here.
    """
    print_separator("STEP 7: GMAIL DELEGATION (Delegated Mailbox Access)")

    if target_user:
        users_to_check = [target_user]
        print(f" Checking delegation for: {target_user}")
    else:
        users_to_check = _active_user_emails(users)
        print(f" Checking delegation for {len(users_to_check)} active user(s)")

    results["gmail_delegation"] = {}
    delegation_found = 0

    for user_email in users_to_check:
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )

        if gmail_service is None:
            continue

        delegates = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .delegates()
            .list(userId=em)
            .execute(),
            f"delegation for {user_email}",
        )

        if delegates and "delegates" in delegates:
            delegate_list = delegates["delegates"]
            results["gmail_delegation"][user_email] = delegate_list
            delegation_found += len(delegate_list)

            for d in delegate_list:
                delegate_email = d.get("delegateEmail", "N/A")
                status = d.get("verificationStatus", "N/A")
                is_external = is_external_address(delegate_email, domain)
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> delegated to {delegate_email} "
                    f"({label}) [Status: {status}]"
                )

    if delegation_found == 0:
        print(" [OK] No Gmail delegation found")
    else:
        print(f"\n [WARNING] {delegation_found} delegation(s) found - review above")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def summarize_findings(data: Dict[str, Any]) -> List[str]:
    """Build the key-findings lines from the collected investigation results.

    Args:
        data: The results dict filled in by the investigation steps. Missing
            keys are treated as "nothing found".

    Returns:
        One formatted ``[ALERT]``/``[WARNING]`` line per non-empty category,
        in the order the steps ran.
    """
    findings: List[str] = []

    # User findings
    user_summary = data.get("user_summary", {})
    if user_summary.get("no_2fa", 0) > 0:
        findings.append(
            f"[ALERT] {user_summary['no_2fa']} user(s) without 2FA enabled"
        )

    # Sign-in findings
    signin_summary = data.get("signin_summary", {})
    if signin_summary.get("failed_logins", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['failed_logins']} failed login attempt(s) in last 7 days"
        )
    if signin_summary.get("suspicious_events", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['suspicious_events']} suspicious login event(s) "
            f"flagged by Google"
        )

    # Forwarding findings. Auto-forwarding entries are dicts, while
    # filter-based forwards are stored as lists under "<email>_filters";
    # both shapes must be counted or external filter forwards go unreported.
    external_fwd = 0
    for rule in data.get("forwarding_rules", {}).values():
        entries = rule if isinstance(rule, list) else [rule]
        external_fwd += sum(
            1
            for entry in entries
            if isinstance(entry, dict) and entry.get("is_external", False)
        )
    if external_fwd > 0:
        findings.append(
            f"[ALERT] {external_fwd} external email forwarding rule(s) detected"
        )

    # OAuth findings
    suspicious_oauth = data.get("suspicious_oauth_apps", [])
    if suspicious_oauth:
        findings.append(
            f"[ALERT] {len(suspicious_oauth)} suspicious OAuth app grant(s) with "
            f"sensitive scopes"
        )

    # Password findings
    admin_resets = [
        p for p in data.get("password_changes", []) if p.get("type") == "Admin Reset"
    ]
    if admin_resets:
        findings.append(
            f"[WARNING] {len(admin_resets)} admin-initiated password reset(s) in last 30 days"
        )

    # Admin activity findings
    critical_admin = [a for a in data.get("admin_activity", []) if a.get("is_critical")]
    if critical_admin:
        findings.append(
            f"[ALERT] {len(critical_admin)} critical admin action(s) in last 30 days"
        )

    # Delegation findings
    delegation = data.get("gmail_delegation", {})
    total_delegates = sum(len(v) for v in delegation.values() if isinstance(v, list))
    if total_delegates > 0:
        findings.append(
            f"[WARNING] {total_delegates} Gmail delegation(s) found"
        )

    return findings


def main() -> None:
    """Parse arguments, run all investigation steps, save and summarize.

    Exits with status 1 when the service account key file is missing or the
    Admin Directory client cannot be built. When the Reports client cannot be
    built, steps 2, 5 and 6 are skipped with a warning.
    """
    parser = argparse.ArgumentParser(
        description="Google Workspace Compromise Investigation Tool"
    )
    parser.add_argument(
        "--domain",
        default=DEFAULT_DOMAIN,
        help=f"Target domain (default: {DEFAULT_DOMAIN})",
    )
    parser.add_argument(
        "--admin",
        default=DEFAULT_ADMIN,
        help=f"Admin email to impersonate (default: {DEFAULT_ADMIN})",
    )
    parser.add_argument(
        "--user",
        default=None,
        help="Investigate a specific user (optional)",
    )
    args = parser.parse_args()

    domain = args.domain
    admin_email = args.admin
    target_user = args.user

    # Validate key file exists
    if not SERVICE_ACCOUNT_KEY.exists():
        print(f"[ERROR] Service account key not found: {SERVICE_ACCOUNT_KEY}")
        print(" Ensure the key file is in the same directory as this script.")
        sys.exit(1)

    # Banner
    print("=" * 70)
    print(" GOOGLE WORKSPACE - COMPROMISE INVESTIGATION")
    print(f" Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print(f" Domain: {domain}")
    print(f" Admin: {admin_email}")
    if target_user:
        print(f" Target User: {target_user}")
    print("=" * 70)

    results["investigation_metadata"] = {
        "date": datetime.now(timezone.utc).isoformat(),
        "domain": domain,
        "admin": admin_email,
        "target_user": target_user,
    }

    # Build services
    print("\n[*] Building API service clients...")

    admin_service = safe_api_call(
        lambda: build_service("admin", "directory_v1", admin_email),
        "Admin Directory service",
    )
    if admin_service is None:
        print("[ERROR] Cannot build Admin Directory service. Check delegation setup.")
        sys.exit(1)
    print("[OK] Admin Directory service ready")

    reports_service = safe_api_call(
        lambda: build_service("admin", "reports_v1", admin_email),
        "Admin Reports service",
    )
    if reports_service is None:
        print("[WARNING] Cannot build Reports service - some checks will be skipped")
    else:
        print("[OK] Admin Reports service ready")

    # Run investigation steps
    users = step1_list_users(admin_service, domain)

    if reports_service:
        step2_signin_audit(reports_service, domain, target_user)
    else:
        print_separator("STEP 2: SIGN-IN AUDIT LOGS")
        print(" [WARNING] Skipped - Reports API not available")

    step3_email_forwarding(admin_email, domain, users, target_user)
    step4_oauth_grants(admin_service, users, target_user)

    if reports_service:
        step5_password_changes(reports_service, target_user)
        step6_admin_activity(reports_service, target_user)
    else:
        print_separator("STEP 5: PASSWORD CHANGES")
        print(" [WARNING] Skipped - Reports API not available")
        print_separator("STEP 6: ADMIN ACTIVITY")
        print(" [WARNING] Skipped - Reports API not available")

    step7_gmail_delegation(admin_email, domain, users, target_user)

    # Save results. A failed write must not hide the summary of an
    # investigation that has already run, so the error is reported and the
    # run continues.
    print_separator("SAVING RESULTS")
    try:
        with open(RESULTS_FILE, "w", encoding="utf-8") as results_fh:
            # default=str stringifies any value json cannot serialize itself.
            json.dump(results, results_fh, indent=2, default=str)
    except OSError as exc:
        print(f" [ERROR] Could not save results to {RESULTS_FILE}: {exc}")
    else:
        print(f" Results saved to: {RESULTS_FILE}")

    # Final summary
    print_separator("INVESTIGATION SUMMARY")

    findings = summarize_findings(results)
    user_summary = results.get("user_summary", {})

    print(f"""
 Domain: {domain}
 Admin: {admin_email}
 Target User: {target_user or 'All users'}
 Investigation Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}
 Total Users: {user_summary.get('total', 'N/A')}

 KEY FINDINGS:
 =============
""")

    if findings:
        for finding in findings:
            print(f" {finding}")
    else:
        print(" [OK] No critical findings detected. Review detailed output above for context.")

    print(f"""
 RECOMMENDED ACTIONS:
 ====================
 1. Disable any external email forwarding rules immediately
 2. Revoke suspicious OAuth app grants (Admin Console > Security > API Controls)
 3. Force password reset for compromised accounts
 4. Enable 2FA enforcement for all users without it
 5. Review and remove any unauthorized Gmail delegations
 6. Check Google Drive sharing for external data exposure
 7. Review mobile device access (Admin Console > Devices)
 8. Enable enhanced pre-delivery message scanning
 9. Review DLP rules and alert configurations
 10. Monitor account activity for the next 30 days

 Investigation script: {Path(__file__).resolve()}
 Raw results: {RESULTS_FILE}
""")


if __name__ == "__main__":
    main()