Synced files: - Quote wizard frontend (all components, hooks, types, config) - API updates (config, models, routers, schemas, services) - Client work (bg-builders, gurushow) - Scripts (BGB Lesley termination, CIPP, Datto, migration) - Temp files (Bardach contacts, VWP investigation, misc) - Credentials and session logs - Email service, PHP API, session logs Machine: ACG-M-L5090 Timestamp: 2026-03-10 19:11:00 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1069 lines
36 KiB
Python
1069 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Google Workspace Compromise Investigation Tool

Performs a comprehensive security investigation of a Google Workspace tenant
using domain-wide delegation via a service account. Checks for signs of
account compromise including suspicious sign-ins, email forwarding rules,
OAuth app grants, password changes, admin activity, and Gmail delegation.

Usage:
    python gws_investigate.py
    python gws_investigate.py --domain example.com --admin admin@example.com
    python gws_investigate.py --user compromised@example.com
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
try:
|
|
from google.oauth2 import service_account
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import HttpError
|
|
except ImportError:
|
|
print("[ERROR] Required packages not installed. Run:")
|
|
print(" pip install google-auth google-api-python-client")
|
|
sys.exit(1)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
# All file locations are resolved relative to the directory holding this script.
SCRIPT_DIR = Path(__file__).resolve().parent
SERVICE_ACCOUNT_KEY = SCRIPT_DIR.joinpath("acg-msp-access-8f72339997e5.json")
RESULTS_FILE = SCRIPT_DIR.joinpath("gws_investigation_results.json")

# Fallbacks used when --domain / --admin are not supplied on the command line.
DEFAULT_DOMAIN = "lonestarelectrical.net"
DEFAULT_ADMIN = "sysadmin@lonestarelectrical.net"

# Scopes required for investigation (must be authorized in Google Admin console)
SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.user",
    "https://www.googleapis.com/auth/admin.directory.user.security",
    "https://www.googleapis.com/auth/admin.reports.audit.readonly",
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.settings.basic",
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/admin.directory.domain.readonly",
]

# ---------------------------------------------------------------------------
# Globals
# ---------------------------------------------------------------------------
# Shared accumulator: each investigation step stores its findings here and
# main() serializes the whole mapping to RESULTS_FILE.
results: Dict[str, Any] = dict()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
def print_separator(title: str) -> None:
    """Print *title* framed by two 70-character '=' rules, after a blank line."""
    rule = "=" * 70
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
|
|
|
|
|
|
def build_service(
    api_name: str,
    api_version: str,
    impersonate_email: str,
    scopes: Optional[List[str]] = None,
) -> Any:
    """Return a Google API client that acts as *impersonate_email*.

    Credentials are loaded from SERVICE_ACCOUNT_KEY and re-issued with
    ``with_subject`` so calls run under domain-wide delegation. When
    *scopes* is None the module-level SCOPES list is requested.
    """
    requested_scopes = SCOPES if scopes is None else scopes
    base_credentials = service_account.Credentials.from_service_account_file(
        str(SERVICE_ACCOUNT_KEY),
        scopes=requested_scopes,
    )
    impersonated = base_credentials.with_subject(impersonate_email)
    return build(api_name, api_version, credentials=impersonated)
|
|
|
|
|
|
def paginate(request, key: str = "users") -> List[Dict]:
    """Execute *request* and follow ``nextPageToken`` until the last page.

    Args:
        request: An executable API request (needs ``execute()``; needs a
            ``uri`` attribute for follow-up pages to be fetched).
        key: Name of the response field that holds the page's items.

    Returns:
        Items from every page fetched. If an API error occurs the items
        collected so far are returned and a warning is printed.

    The previous implementation stopped after the first page even when a
    ``nextPageToken`` was present. Follow-up requests are built by copying
    the request and setting the ``pageToken`` query parameter.
    NOTE(review): assumes the endpoint takes the token as a ``pageToken``
    query parameter - confirm for any API this is used with.
    """
    import copy
    from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

    all_items: List[Dict] = []
    while request is not None:
        try:
            response = request.execute()
        except HttpError as exc:
            print(f" [WARNING] API error during pagination: {exc.status_code} - {exc.reason}")
            break

        all_items.extend(response.get(key, []))

        token = response.get("nextPageToken")
        if not token or not hasattr(request, "uri"):
            # Last page, or a request object we do not know how to advance.
            break

        # Swap any previous pageToken for the new one, keeping other params.
        parts = urlsplit(request.uri)
        query = [
            (name, value)
            for name, value in parse_qsl(parts.query, keep_blank_values=True)
            if name != "pageToken"
        ]
        query.append(("pageToken", token))

        next_request = copy.copy(request)
        next_request.uri = urlunsplit(parts._replace(query=urlencode(query)))
        request = next_request

    return all_items
|
|
|
|
|
|
def paginate_list(service_resource, method_name: str, key: str, **kwargs) -> List[Dict]:
    """Collect every item from a ``<method>`` / ``<method>_next`` style listing.

    Calls ``service_resource.<method_name>(**kwargs)``, executes it, gathers
    ``response[key]`` and asks ``<method_name>_next`` for the following
    request until that returns None. An HttpError ends the walk early with
    a printed warning; whatever was gathered so far is returned.
    """
    collected: List[Dict] = []
    pending = getattr(service_resource, method_name)(**kwargs)

    while pending is not None:
        try:
            page = pending.execute()
        except HttpError as exc:
            print(f" [WARNING] API error: {exc.status_code} - {exc.reason}")
            break

        collected += page.get(key, [])

        try:
            pending = getattr(service_resource, f"{method_name}_next")(pending, page)
        except AttributeError:
            # No _next method means no pagination support for this endpoint
            break

    return collected
|
|
|
|
|
|
def safe_api_call(func, label: str, default=None):
    """Run ``func()`` and return its result, or *default* if it raises.

    HttpError is reported with a status-specific warning (403, 404, 400,
    or a generic HTTP line); any other exception is reported with its type
    name and message. *label* prefixes the warning so the failing call can
    be identified in the output.
    """
    try:
        outcome = func()
    except HttpError as exc:
        status = exc.status_code
        reason = exc.reason
        status_messages = {
            403: "Access denied (403) - scope may not be authorized",
            404: "Not found (404)",
            400: f"Bad request (400) - {reason}",
        }
        detail = status_messages.get(status, f"HTTP {status} - {reason}")
        print(f" [WARNING] {label}: {detail}")
        return default
    except Exception as exc:
        # Deliberate best-effort: one failing check must not abort the run.
        print(f" [WARNING] {label}: {type(exc).__name__} - {exc}")
        return default
    else:
        return outcome
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Investigation Steps
|
|
# ---------------------------------------------------------------------------
|
|
def step1_list_users(admin_service, domain: str) -> List[Dict]:
    """Print every directory user of *domain* with its security posture.

    Stores one summary record per user in ``results["users"]`` and the
    aggregate counts in ``results["user_summary"]``. Returns the raw user
    dicts from the Directory API (an empty list when none came back).
    """
    print_separator("STEP 1: ALL DOMAIN USERS")

    directory_users = paginate_list(
        admin_service.users(),
        "list",
        "users",
        domain=domain,
        projection="full",
        maxResults=500,
        orderBy="email",
    )

    records: List[Dict[str, Any]] = []
    results["users"] = records

    if not directory_users:
        print(" [WARNING] No users returned. Check domain-wide delegation setup.")
        return []

    print(f" Found {len(directory_users)} user(s):\n")

    admin_count = 0
    no_2fa_count = 0
    suspended_count = 0

    for entry in directory_users:
        record = {
            "email": entry.get("primaryEmail", "N/A"),
            "name": entry.get("name", {}).get("fullName", "N/A"),
            "last_login": entry.get("lastLoginTime", "Never"),
            "is_admin": entry.get("isAdmin", False) or entry.get("isDelegatedAdmin", False),
            "is_suspended": entry.get("suspended", False),
            "is_2fa_enrolled": entry.get("isEnrolledIn2Sv", False),
            "is_2fa_enforced": entry.get("isEnforcedIn2Sv", False),
            "creation_time": entry.get("creationTime", "N/A"),
            "org_unit": entry.get("orgUnitPath", "/"),
        }

        # Status tags shown next to the user's name.
        tags = []
        if record["is_suspended"]:
            tags.append("[SUSPENDED]")
            suspended_count += 1
        if record["is_admin"]:
            tags.append("[ADMIN]")
            admin_count += 1
        if not record["is_2fa_enrolled"]:
            tags.append("[NO-2FA]")
            no_2fa_count += 1
        elif record["is_2fa_enforced"]:
            tags.append("[2FA-ENFORCED]")
        else:
            tags.append("[2FA-ENROLLED]")
        status_text = " ".join(tags) or "[OK]"

        # Suspended takes precedence over the missing-2FA warning.
        if record["is_suspended"]:
            prefix = "[INFO] "
        elif not record["is_2fa_enrolled"]:
            prefix = "[WARNING] "
        else:
            prefix = " "

        print(f" {prefix} {record['email']}")
        print(f" Name: {record['name']} | Status: {status_text}")
        print(f" Last Login: {record['last_login']} | Created: {record['creation_time']}")
        print(f" OrgUnit: {record['org_unit']}")

        records.append(record)

    # Summary
    user_total = len(directory_users)
    print("\n --- User Summary ---")
    print(f" Total users: {user_total}")
    print(f" Admins: {admin_count}")
    print(f" Without 2FA: {no_2fa_count}")
    print(f" Suspended: {suspended_count}")

    if no_2fa_count > 0:
        print(f" [ALERT] {no_2fa_count} user(s) do NOT have 2FA enabled!")

    results["user_summary"] = {
        "total": user_total,
        "admins": admin_count,
        "no_2fa": no_2fa_count,
        "suspended": suspended_count,
    }

    return directory_users
|
|
|
|
|
|
def step2_signin_audit(reports_service, domain: str, target_user: Optional[str]) -> None:
    """Review login audit events from the last 7 days.

    Queries the Reports API "login" application for all users, or only for
    *target_user* when given. Every event is printed and appended to
    ``results["signin_logs"]``; totals per IP and per user plus the
    failed / suspicious counts go to ``results["signin_summary"]``.
    *domain* is accepted for signature symmetry and not used here.
    """
    print_separator("STEP 2: SIGN-IN AUDIT LOGS (Last 7 Days)")

    window_start = datetime.now(timezone.utc) - timedelta(days=7)
    query = {
        "userKey": "all",
        "applicationName": "login",
        "startTime": window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "maxResults": 1000,
    }

    if target_user:
        query["userKey"] = target_user
        print(f" Filtering for user: {target_user}")
    else:
        print(" Checking all users in domain")

    activities = paginate_list(reports_service.activities(), "list", "items", **query)

    signin_log: List[Dict] = []
    results["signin_logs"] = signin_log

    if not activities:
        print(" [INFO] No sign-in events found in the last 7 days")
        print(" (This may be normal if Reports API needs time to propagate)")
        return

    ip_counts: Dict[str, int] = {}
    user_counts: Dict[str, int] = {}
    failures: List[Dict] = []
    flagged: List[Dict] = []

    for activity in activities:
        who = activity.get("actor", {}).get("email", "N/A")
        source_ip = activity.get("ipAddress", "N/A")
        when = activity.get("id", {}).get("time", "N/A")

        ip_counts[source_ip] = ip_counts.get(source_ip, 0) + 1
        user_counts[who] = user_counts.get(who, 0) + 1

        for event in activity.get("events", []):
            name = event.get("name", "N/A")

            # Flatten the parameter list into name -> value, whichever of the
            # value / boolValue / multiValue fields is present.
            params = {}
            for p in event.get("parameters", []):
                params[p.get("name", "")] = p.get(
                    "value", p.get("boolValue", p.get("multiValue", ""))
                )

            failed = name == "login_failure"
            suspicious = params.get("is_suspicious", False)
            login_type = params.get("login_type", "")

            entry = {
                "time": when,
                "user": who,
                "ip": source_ip,
                "event": name,
                "type": event.get("type", "N/A"),
                "login_type": login_type,
                "challenge_method": params.get("login_challenge_method", ""),
                "is_suspicious": suspicious,
                "parameters": params,
            }
            signin_log.append(entry)

            labels = []
            if failed:
                labels.append("FAILED")
                failures.append(entry)
            if suspicious:
                labels.append("SUSPICIOUS")
                flagged.append(entry)

            joined_labels = " | ".join(labels)
            suffix = f" [{joined_labels}]" if labels else ""

            if suspicious:
                prefix = "[ALERT] "
            elif failed:
                prefix = "[WARNING] "
            else:
                prefix = " "

            print(
                f" {prefix} {when} | {who} | IP: {source_ip} | Event: {name} | Type: {login_type}{suffix}"
            )

    # Summary
    print("\n --- Sign-in Summary ---")
    print(f" Total events: {len(activities)}")
    print(f" Unique IPs: {len(ip_counts)}")
    print(f" Unique users: {len(user_counts)}")
    print(f" Failed logins: {len(failures)}")
    print(f" Suspicious events: {len(flagged)}")

    if ip_counts:
        print("\n --- IP Address Breakdown ---")
        busiest_first = sorted(ip_counts.items(), key=lambda pair: pair[1], reverse=True)
        for address, hits in busiest_first:
            print(f" {address}: {hits} event(s)")

    # Show at most the first 20 rows of each alert list.
    for heading, rows in (
        (f"\n [ALERT] {len(failures)} failed login(s) detected:", failures),
        (f"\n [ALERT] {len(flagged)} suspicious login event(s) flagged by Google:", flagged),
    ):
        if rows:
            print(heading)
            for row in rows[:20]:
                print(f" {row['time']} | {row['user']} | IP: {row['ip']}")

    results["signin_summary"] = {
        "total_events": len(activities),
        "unique_ips": len(ip_counts),
        "unique_users": len(user_counts),
        "failed_logins": len(failures),
        "suspicious_events": len(flagged),
        "ips": ip_counts,
        "users": user_counts,
    }
|
|
|
|
|
|
def step3_email_forwarding(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Check all users (or one specific user) for email forwarding.

    For each mailbox this inspects the account-level auto-forwarding
    setting and every Gmail filter with a ``forward`` action. Findings are
    printed and stored in ``results["forwarding_rules"]``: the auto-forward
    state under the user's address, filter forwards as a list under
    ``"<address>_filters"``.

    Args:
        admin_email: Unused here; kept so the step signatures stay parallel.
        domain: The tenant domain; destinations outside it are EXTERNAL.
        users: Raw directory user dicts; suspended users are skipped.
        target_user: When set, only this mailbox is checked.
    """
    print_separator("STEP 3: EMAIL FORWARDING RULES")

    home_domain = domain.strip().lower()

    def is_external_address(address: str) -> bool:
        # Compare the host part after the last "@" instead of doing a
        # substring search: "x@<domain>.evil.com" and "<domain>@evil.com"
        # both contain the tenant domain but are not internal.
        host = address.rpartition("@")[2].strip(" <>.").lower()
        return not (host == home_domain or host.endswith("." + home_domain))

    if target_user:
        users_to_check = [target_user]
        print(f" Checking forwarding for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking forwarding for {len(users_to_check)} active user(s)")

    results["forwarding_rules"] = {}
    alerts_found = 0

    for user_email in users_to_check:
        # Each mailbox needs its own client impersonating that user.
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )

        if gmail_service is None:
            print(f" [WARNING] Could not access Gmail settings for {user_email}")
            continue

        # Check auto-forwarding setting
        forwarding = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .getAutoForwarding(userId=em)
            .execute(),
            f"auto-forwarding for {user_email}",
        )

        if forwarding:
            is_enabled = forwarding.get("enabled", False)
            fwd_email = forwarding.get("emailAddress", "")
            disposition = forwarding.get("disposition", "")

            if is_enabled and fwd_email:
                is_external = is_external_address(fwd_email)
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> forwards to {fwd_email} "
                    f"({label}) [Disposition: {disposition}]"
                )
                alerts_found += 1
                results["forwarding_rules"][user_email] = {
                    "enabled": True,
                    "forward_to": fwd_email,
                    "is_external": is_external,
                    "disposition": disposition,
                }
            else:
                results["forwarding_rules"][user_email] = {"enabled": False}

        # Check Gmail filters for forwarding actions
        filters = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .filters()
            .list(userId=em)
            .execute(),
            f"Gmail filters for {user_email}",
        )

        if filters and "filter" in filters:
            for filt in filters["filter"]:
                forward_to = filt.get("action", {}).get("forward", "")
                if not forward_to:
                    continue

                is_external = is_external_address(forward_to)
                criteria = filt.get("criteria", {})
                criteria_str = json.dumps(criteria) if criteria else "No criteria"
                marker = "[ALERT] " if is_external else "[WARNING] "
                label = "EXTERNAL" if is_external else "INTERNAL"
                print(
                    f" {marker} {user_email} -> filter forwards to {forward_to} "
                    f"({label})"
                )
                print(f" Criteria: {criteria_str}")
                alerts_found += 1

                fwd_key = f"{user_email}_filters"
                results["forwarding_rules"].setdefault(fwd_key, []).append({
                    "forward_to": forward_to,
                    "is_external": is_external,
                    "criteria": criteria,
                    "filter_id": filt.get("id", "N/A"),
                })

    if alerts_found == 0:
        print(" [OK] No email forwarding rules detected")
    else:
        print(f"\n [ALERT] {alerts_found} forwarding rule(s) found - review above")
|
|
|
|
|
|
def step4_oauth_grants(admin_service, users: List[Dict], target_user: Optional[str]) -> None:
    """List third-party OAuth apps with access to user accounts.

    Reads each user's issued tokens from the Directory API. A grant whose
    scopes contain a sensitive keyword and whose display name does not look
    like a Google app is reported as [ALERT] and collected in
    ``results["suspicious_oauth_apps"]``; all grants are stored per user in
    ``results["oauth_grants"]``.

    Args:
        admin_service: Directory API client used for ``tokens().list``.
        users: Raw directory user dicts; suspended users are skipped.
        target_user: When set, only this account is checked.
    """
    print_separator("STEP 4: OAUTH APP GRANTS (Third-Party Access)")

    if target_user:
        users_to_check = [target_user]
        print(f" Checking OAuth grants for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking OAuth grants for {len(users_to_check)} active user(s)")

    results["oauth_grants"] = {}
    suspicious_apps: List[Dict] = []

    # Display-name fragments treated as Google's own apps.
    # NOTE(review): this matches the app's display name, which the app
    # publisher chooses; an app merely named like a Google product is
    # treated as safe. Consider an allowlist of client IDs instead.
    known_safe_prefixes = ("google", "chrome", "android")

    # Scope keywords that indicate access to sensitive data. Built once
    # here rather than for every token.
    dangerous_scopes = ("mail", "gmail", "drive", "contacts", "calendar", "admin")

    for user_email in users_to_check:
        tokens = safe_api_call(
            lambda em=user_email: admin_service.tokens()
            .list(userKey=em)
            .execute(),
            f"OAuth tokens for {user_email}",
        )

        if tokens is None or "items" not in tokens:
            continue

        user_grants = []
        for token in tokens.get("items", []):
            app_name = token.get("displayText", "Unknown App")
            client_id = token.get("clientId", "N/A")
            scopes = token.get("scopes", [])
            is_native = token.get("nativeApp", False)

            has_dangerous = any(
                ds in scope.lower() for scope in scopes for ds in dangerous_scopes
            )
            lowered_name = app_name.lower()
            is_known_safe = any(prefix in lowered_name for prefix in known_safe_prefixes)

            user_grants.append({
                "app_name": app_name,
                "client_id": client_id,
                "scopes": scopes,
                "is_native": is_native,
            })

            if has_dangerous and not is_known_safe:
                marker = "[ALERT] "
                suspicious_apps.append({
                    "user": user_email,
                    "app": app_name,
                    "client_id": client_id,
                    "scopes": scopes,
                })
            elif not is_known_safe:
                marker = "[WARNING] "
            else:
                marker = " "

            # Keep the console line short: show at most five scopes.
            scope_str = ", ".join(scopes[:5])
            if len(scopes) > 5:
                scope_str += f" (+{len(scopes) - 5} more)"

            print(f" {marker} {user_email} | App: {app_name}")
            print(f" ClientID: {client_id} | Scopes: {scope_str}")

        if user_grants:
            results["oauth_grants"][user_email] = user_grants

    if not results["oauth_grants"]:
        print(" [OK] No third-party OAuth grants found")
    elif suspicious_apps:
        print(f"\n [ALERT] {len(suspicious_apps)} suspicious OAuth app grant(s):")
        for sa in suspicious_apps:
            print(f" User: {sa['user']} | App: {sa['app']} | Scopes: {', '.join(sa['scopes'][:3])}")

    results["suspicious_oauth_apps"] = suspicious_apps
|
|
|
|
|
|
def step5_password_changes(reports_service, target_user: Optional[str]) -> None:
    """Report password changes from the last 30 days.

    Combines admin-console CHANGE_PASSWORD events with user-initiated
    "password_edit" login events. A CHANGE_PASSWORD event whose actor
    differs from the affected account is labelled an admin reset. When
    *target_user* is given, only events where that address is the actor or
    the target are kept. Kept events go to ``results["password_changes"]``.
    """
    print_separator("STEP 5: PASSWORD CHANGES (Last 30 Days)")

    since = (datetime.now(timezone.utc) - timedelta(days=30)).strftime(
        "%Y-%m-%dT%H:%M:%S.000Z"
    )

    if target_user:
        print(f" Checking password changes involving: {target_user}")

    # Admin-console password changes.
    admin_side = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        userKey="all",
        applicationName="admin",
        startTime=since,
        eventName="CHANGE_PASSWORD",
        maxResults=500,
    )

    changes: List[Dict] = []
    results["password_changes"] = changes

    # Also check user-initiated password changes
    user_side = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        userKey="all",
        applicationName="login",
        startTime=since,
        eventName="password_edit",
        maxResults=500,
    )

    combined = admin_side + user_side
    if not combined:
        print(" [OK] No password changes detected in the last 30 days")
        return

    for activity in combined:
        actor = activity.get("actor", {}).get("email", "N/A")
        source_ip = activity.get("ipAddress", "N/A")
        when = activity.get("id", {}).get("time", "N/A")

        for event in activity.get("events", []):
            name = event.get("name", "N/A")
            params = {}
            for p in event.get("parameters", []):
                params[p.get("name", "")] = p.get("value", p.get("multiValue", ""))

            # Events without a USER_EMAIL parameter are taken to affect the actor.
            affected = params.get("USER_EMAIL", actor)

            if target_user:
                wanted = target_user.lower()
                if wanted not in (actor.lower(), affected.lower()):
                    continue

            # Check if admin reset vs self-service
            if name == "CHANGE_PASSWORD" and actor != affected:
                prefix, change_kind = "[ALERT] ", "Admin Reset"
            else:
                prefix, change_kind = "[INFO] ", "Self-Service"

            print(
                f" {prefix} {when} | {change_kind} | Actor: {actor} | Target: {affected} | IP: {source_ip}"
            )

            changes.append({
                "time": when,
                "actor": actor,
                "target": affected,
                "ip": source_ip,
                "type": change_kind,
                "event_name": name,
            })

    print(f"\n Total password changes: {len(changes)}")
|
|
|
|
|
|
def step6_admin_activity(reports_service, target_user: Optional[str]) -> None:
    """Report admin-console activity from the last 30 days.

    Every event of the Reports API "admin" application is printed and
    stored in ``results["admin_activity"]``; events whose name is on the
    watch list below are marked critical. When *target_user* is given,
    only events where that address is the actor or the USER_EMAIL target
    are kept.
    """
    print_separator("STEP 6: ADMIN ACTIVITY (Last 30 Days)")

    since = (datetime.now(timezone.utc) - timedelta(days=30)).strftime(
        "%Y-%m-%dT%H:%M:%S.000Z"
    )

    # Critical admin events to watch for
    watch_list = (
        "CREATE_USER",
        "DELETE_USER",
        "SUSPEND_USER",
        "UNSUSPEND_USER",
        "GRANT_ADMIN_PRIVILEGE",
        "REVOKE_ADMIN_PRIVILEGE",
        "ADD_GROUP_MEMBER",
        "REMOVE_GROUP_MEMBER",
        "CHANGE_PASSWORD",
        "AUTHORIZE_API_CLIENT_ACCESS",
        "TOGGLE_SERVICE_ENABLED",
        "CHANGE_TWO_STEP_VERIFICATION_ENROLLMENT_PERIOD",
        "CHANGE_ALLOWED_TWO_STEP_VERIFICATION_METHODS",
        "TOGGLE_TWO_STEP_VERIFICATION",
        "CHANGE_APPLICATION_SETTING",
        "CREATE_APPLICATION_SETTING",
        "DELETE_APPLICATION_SETTING",
        "CHANGE_DOMAIN_DEFAULT_LOCALE",
        "ADD_DOMAIN_ALIAS",
        "ADD_TRUSTED_DOMAINS",
    )

    # Optional parameters appended to the console line, in display order.
    extra_fields = (
        ("DOMAIN_NAME", "Domain"),
        ("GROUP_EMAIL", "Group"),
        ("APP_NAME", "App"),
    )

    activities = paginate_list(
        reports_service.activities(),
        "list",
        "items",
        userKey="all",
        applicationName="admin",
        startTime=since,
        maxResults=1000,
    )

    log: List[Dict] = []
    results["admin_activity"] = log

    if not activities:
        print(" [INFO] No admin activity found in the last 30 days")
        return

    for activity in activities:
        actor = activity.get("actor", {}).get("email", "N/A")
        source_ip = activity.get("ipAddress", "N/A")
        when = activity.get("id", {}).get("time", "N/A")

        for event in activity.get("events", []):
            name = event.get("name", "N/A")
            params = {}
            for p in event.get("parameters", []):
                params[p.get("name", "")] = p.get("value", p.get("multiValue", ""))

            affected = params.get("USER_EMAIL", "")

            # Filter for target user if specified
            if target_user:
                involved = (actor.lower(), affected.lower() if affected else "")
                if target_user.lower() not in involved:
                    continue

            critical = name in watch_list
            prefix = "[ALERT] " if critical else "[INFO] "

            details = f" | Target: {affected}" if affected else ""
            for field, caption in extra_fields:
                if params.get(field):
                    details += f" | {caption}: {params[field]}"

            print(
                f" {prefix} {when} | {name} | Actor: {actor} | IP: {source_ip}{details}"
            )

            log.append({
                "time": when,
                "event": name,
                "type": event.get("type", "N/A"),
                "actor": actor,
                "ip": source_ip,
                "target_user": affected,
                "parameters": params,
                "is_critical": critical,
            })

    critical_total = len([item for item in log if item.get("is_critical")])
    print("\n --- Admin Activity Summary ---")
    print(f" Total admin events: {len(log)}")
    print(f" Critical events: {critical_total}")

    if critical_total > 0:
        print(f" [ALERT] {critical_total} critical admin action(s) detected - review above")
|
|
|
|
|
|
def step7_gmail_delegation(
    admin_email: str, domain: str, users: List[Dict], target_user: Optional[str]
) -> None:
    """Check for Gmail delegated mailbox access.

    Lists the delegates configured on each mailbox, prints them and stores
    the raw delegate entries per user in ``results["gmail_delegation"]``.
    Delegates outside *domain* are reported as [ALERT], others as [WARNING].

    Args:
        admin_email: Unused here; kept so the step signatures stay parallel.
        domain: The tenant domain; delegates outside it are EXTERNAL.
        users: Raw directory user dicts; suspended users are skipped.
        target_user: When set, only this mailbox is checked.
    """
    print_separator("STEP 7: GMAIL DELEGATION (Delegated Mailbox Access)")

    home_domain = domain.strip().lower()

    def is_external_address(address: str) -> bool:
        # Compare the host part after the last "@" instead of doing a
        # substring search: "x@<domain>.evil.com" and "<domain>@evil.com"
        # both contain the tenant domain but are not internal.
        host = address.rpartition("@")[2].strip(" <>.").lower()
        return not (host == home_domain or host.endswith("." + home_domain))

    if target_user:
        users_to_check = [target_user]
        print(f" Checking delegation for: {target_user}")
    else:
        users_to_check = [
            u.get("primaryEmail", "")
            for u in users
            if u.get("primaryEmail") and not u.get("suspended", False)
        ]
        print(f" Checking delegation for {len(users_to_check)} active user(s)")

    results["gmail_delegation"] = {}
    delegation_found = 0

    for user_email in users_to_check:
        # Each mailbox needs its own client impersonating that user.
        gmail_service = safe_api_call(
            lambda email=user_email: build_service(
                "gmail", "v1", email,
                scopes=["https://www.googleapis.com/auth/gmail.settings.basic"],
            ),
            f"Gmail service for {user_email}",
        )

        if gmail_service is None:
            continue

        delegates = safe_api_call(
            lambda svc=gmail_service, em=user_email: svc.users()
            .settings()
            .delegates()
            .list(userId=em)
            .execute(),
            f"delegation for {user_email}",
        )

        if not (delegates and "delegates" in delegates):
            continue

        delegate_list = delegates["delegates"]
        results["gmail_delegation"][user_email] = delegate_list
        delegation_found += len(delegate_list)

        for d in delegate_list:
            delegate_email = d.get("delegateEmail", "N/A")
            status = d.get("verificationStatus", "N/A")
            is_external = is_external_address(delegate_email)

            marker = "[ALERT] " if is_external else "[WARNING] "
            label = "EXTERNAL" if is_external else "INTERNAL"

            print(
                f" {marker} {user_email} -> delegated to {delegate_email} "
                f"({label}) [Status: {status}]"
            )

    if delegation_found == 0:
        print(" [OK] No Gmail delegation found")
    else:
        print(f"\n [WARNING] {delegation_found} delegation(s) found - review above")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
def main() -> None:
    """Parse the command line, run all investigation steps and summarize.

    Exits with status 1 when the service-account key file is missing or
    the Admin Directory client cannot be built. Steps that need the
    Reports API are skipped with a warning when that client is unavailable.
    The collected ``results`` are written to RESULTS_FILE as JSON before
    the key findings and recommended actions are printed.
    """
    parser = argparse.ArgumentParser(
        description="Google Workspace Compromise Investigation Tool"
    )
    parser.add_argument(
        "--domain",
        default=DEFAULT_DOMAIN,
        help=f"Target domain (default: {DEFAULT_DOMAIN})",
    )
    parser.add_argument(
        "--admin",
        default=DEFAULT_ADMIN,
        help=f"Admin email to impersonate (default: {DEFAULT_ADMIN})",
    )
    parser.add_argument(
        "--user",
        default=None,
        help="Investigate a specific user (optional)",
    )
    args = parser.parse_args()

    domain = args.domain
    admin_email = args.admin
    target_user = args.user

    # Validate key file exists
    if not SERVICE_ACCOUNT_KEY.exists():
        print(f"[ERROR] Service account key not found: {SERVICE_ACCOUNT_KEY}")
        print(" Ensure the key file is in the same directory as this script.")
        sys.exit(1)

    # Banner
    print("=" * 70)
    print(" GOOGLE WORKSPACE - COMPROMISE INVESTIGATION")
    print(f" Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print(f" Domain: {domain}")
    print(f" Admin: {admin_email}")
    if target_user:
        print(f" Target User: {target_user}")
    print("=" * 70)

    results["investigation_metadata"] = {
        "date": datetime.now(timezone.utc).isoformat(),
        "domain": domain,
        "admin": admin_email,
        "target_user": target_user,
    }

    # Build services
    print("\n[*] Building API service clients...")

    admin_service = safe_api_call(
        lambda: build_service("admin", "directory_v1", admin_email),
        "Admin Directory service",
    )
    if admin_service is None:
        print("[ERROR] Cannot build Admin Directory service. Check delegation setup.")
        sys.exit(1)
    print("[OK] Admin Directory service ready")

    reports_service = safe_api_call(
        lambda: build_service("admin", "reports_v1", admin_email),
        "Admin Reports service",
    )
    if reports_service is None:
        print("[WARNING] Cannot build Reports service - some checks will be skipped")
    else:
        print("[OK] Admin Reports service ready")

    # Run investigation steps
    users = step1_list_users(admin_service, domain)

    if reports_service:
        step2_signin_audit(reports_service, domain, target_user)
    else:
        print_separator("STEP 2: SIGN-IN AUDIT LOGS")
        print(" [WARNING] Skipped - Reports API not available")

    step3_email_forwarding(admin_email, domain, users, target_user)
    step4_oauth_grants(admin_service, users, target_user)

    if reports_service:
        step5_password_changes(reports_service, target_user)
        step6_admin_activity(reports_service, target_user)
    else:
        print_separator("STEP 5: PASSWORD CHANGES")
        print(" [WARNING] Skipped - Reports API not available")
        print_separator("STEP 6: ADMIN ACTIVITY")
        print(" [WARNING] Skipped - Reports API not available")

    step7_gmail_delegation(admin_email, domain, users, target_user)

    # Save results
    print_separator("SAVING RESULTS")
    with open(RESULTS_FILE, "w", encoding="utf-8") as results_fh:
        json.dump(results, results_fh, indent=2, default=str)
    print(f" Results saved to: {RESULTS_FILE}")

    # Final summary
    print_separator("INVESTIGATION SUMMARY")

    findings: List[str] = []

    # User findings
    user_summary = results.get("user_summary", {})
    if user_summary.get("no_2fa", 0) > 0:
        findings.append(
            f"[ALERT] {user_summary['no_2fa']} user(s) without 2FA enabled"
        )

    # Sign-in findings
    signin_summary = results.get("signin_summary", {})
    if signin_summary.get("failed_logins", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['failed_logins']} failed login attempt(s) in last 7 days"
        )
    if signin_summary.get("suspicious_events", 0) > 0:
        findings.append(
            f"[ALERT] {signin_summary['suspicious_events']} suspicious login event(s) "
            f"flagged by Google"
        )

    # Forwarding findings. The forwarding step stores the auto-forward state
    # as a dict per user and filter-based forwards as a list under
    # "<user>_filters"; both shapes must be counted, otherwise external
    # forwards done through filters never reach the summary.
    fwd_rules = results.get("forwarding_rules", {})
    external_fwd = 0
    for entry in fwd_rules.values():
        if isinstance(entry, dict):
            if entry.get("is_external", False):
                external_fwd += 1
        elif isinstance(entry, list):
            external_fwd += sum(
                1
                for rule in entry
                if isinstance(rule, dict) and rule.get("is_external", False)
            )
    if external_fwd > 0:
        findings.append(
            f"[ALERT] {external_fwd} external email forwarding rule(s) detected"
        )

    # OAuth findings
    suspicious_oauth = results.get("suspicious_oauth_apps", [])
    if suspicious_oauth:
        findings.append(
            f"[ALERT] {len(suspicious_oauth)} suspicious OAuth app grant(s) with "
            f"sensitive scopes"
        )

    # Password findings
    pw_changes = results.get("password_changes", [])
    admin_resets = [p for p in pw_changes if p.get("type") == "Admin Reset"]
    if admin_resets:
        findings.append(
            f"[WARNING] {len(admin_resets)} admin-initiated password reset(s) in last 30 days"
        )

    # Admin activity findings
    admin_acts = results.get("admin_activity", [])
    critical_admin = [a for a in admin_acts if a.get("is_critical")]
    if critical_admin:
        findings.append(
            f"[ALERT] {len(critical_admin)} critical admin action(s) in last 30 days"
        )

    # Delegation findings
    delegation = results.get("gmail_delegation", {})
    total_delegates = sum(len(v) for v in delegation.values() if isinstance(v, list))
    if total_delegates > 0:
        findings.append(
            f"[WARNING] {total_delegates} Gmail delegation(s) found"
        )

    # The leading and trailing "" entries reproduce the blank lines that
    # surround the report header.
    print("\n".join([
        "",
        f"Domain: {domain}",
        f"Admin: {admin_email}",
        f"Target User: {target_user or 'All users'}",
        f"Investigation Date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
        f"Total Users: {user_summary.get('total', 'N/A')}",
        "",
        "KEY FINDINGS:",
        "=============",
        "",
    ]))

    if findings:
        for finding in findings:
            print(f" {finding}")
    else:
        print(" [OK] No critical findings detected. Review detailed output above for context.")

    print("\n".join([
        "",
        "RECOMMENDED ACTIONS:",
        "====================",
        "1. Disable any external email forwarding rules immediately",
        "2. Revoke suspicious OAuth app grants (Admin Console > Security > API Controls)",
        "3. Force password reset for compromised accounts",
        "4. Enable 2FA enforcement for all users without it",
        "5. Review and remove any unauthorized Gmail delegations",
        "6. Check Google Drive sharing for external data exposure",
        "7. Review mobile device access (Admin Console > Devices)",
        "8. Enable enhanced pre-delivery message scanning",
        "9. Review DLP rules and alert configurations",
        "10. Monitor account activity for the next 30 days",
        "",
        f"Investigation script: {Path(__file__).resolve()}",
        f"Raw results: {RESULTS_FILE}",
        "",
    ]))
|
|
|
|
|
|
# Run the investigation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|