#!/usr/bin/env python3
"""
M365 Security Scan - Check all accounts for compromise indicators
Scans: Sign-in logs, inbox rules, OAuth grants, MFA methods, forwarding

Configuration (environment variables):
    M365_CLIENT_ID      App registration (client) ID; defaults to the built-in ID.
    M365_CLIENT_SECRET  Client secret for the app registration (required).
    M365_SCAN_OUTPUT    Path of the JSON report; defaults to DEFAULT_OUTPUT_FILE.
"""

import json
import os
from datetime import datetime, timedelta, timezone

try:
    import requests
except ImportError:
    # Deferred failure: the pure analysis helpers below stay importable and
    # the HTTP helpers raise a clear error when they are actually called.
    requests = None

# Claude-MSP-Access Multi-Tenant App
CLIENT_ID = os.environ.get("M365_CLIENT_ID", "fabb3421-8b34-484b-bc17-e46de9703418")
# SECURITY: the secret must never live in source control. It is read from the
# environment; any secret that was previously committed here should be rotated.
CLIENT_SECRET = os.environ.get("M365_CLIENT_SECRET", "")

TENANTS = {
    "Valley Wide Plastering": {
        "tenant_id": "5c53ae9f-7071-4248-b834-8685b646450f",
        "domain": "valleywideplastering.com",
    },
    "BG Builders LLC": {
        "tenant_id": "ededa4fb-f6eb-4398-851d-5eb3e11fab27",
        "domain": "bgbuildersllc.com",
    },
}

# Known suspicious patterns
SUSPICIOUS_RULE_PATTERNS = [".", "..", "...", "spam", "junk", "filter"]
SUSPICIOUS_OAUTH_APPS = ["gmail", "yahoo", "p2p", "autoforward"]
US_COUNTRY_CODES = ["US", "United States"]
SENSITIVE_SCOPES = ["Mail.ReadWrite", "Mail.Send", "MailboxSettings", "full_access"]

GRAPH_ROOT = "https://graph.microsoft.com"
REQUEST_TIMEOUT = 30  # seconds; without a timeout a stalled connection hangs the scan
MAX_PAGES = 50  # safety cap when following @odata.nextLink
MAX_AUTHENTICATOR_DEVICES = 2  # more registered authenticator apps than this is flagged
DEFAULT_OUTPUT_FILE = "/Users/azcomputerguru/ClaudeTools/temp/m365_security_scan_results.json"


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _require_requests():
    """Return the ``requests`` module or raise a clear error if it is missing."""
    if requests is None:
        raise RuntimeError("The 'requests' package is required: pip install requests")
    return requests


def _utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def get_token(tenant_id):
    """Get Graph API access token for tenant.

    Uses the OAuth2 client-credentials flow. Returns the access token string,
    or None (after printing the reason) when the secret is not configured or
    the token endpoint does not answer with HTTP 200.
    """
    if not CLIENT_SECRET:
        print(" [ERROR] Token failed: M365_CLIENT_SECRET environment variable is not set")
        return None

    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "https://graph.microsoft.com/.default",
        "grant_type": "client_credentials",
    }
    resp = _require_requests().post(url, data=data, timeout=REQUEST_TIMEOUT)
    if resp.status_code == 200:
        return resp.json().get("access_token")
    print(f" [ERROR] Token failed: {resp.status_code} - {resp.text[:200]}")
    return None


def _graph_request(token, url, params=None):
    """GET an absolute Graph URL.

    Returns the decoded JSON body on HTTP 200, None on HTTP 404, and
    ``{"error": <status>, "message": <first 200 chars of body>}`` otherwise.
    """
    headers = {"Authorization": f"Bearer {token}"}
    resp = _require_requests().get(
        url, headers=headers, params=params, timeout=REQUEST_TIMEOUT
    )
    if resp.status_code == 200:
        return resp.json()
    if resp.status_code == 404:
        return None
    return {"error": resp.status_code, "message": resp.text[:200]}


def graph_get(token, endpoint, params=None):
    """Make Graph API GET request (v1.0). See _graph_request for return values."""
    return _graph_request(token, f"{GRAPH_ROOT}/v1.0{endpoint}", params)


def graph_get_beta(token, endpoint, params=None):
    """Make Graph API beta GET request. See _graph_request for return values."""
    return _graph_request(token, f"{GRAPH_ROOT}/beta{endpoint}", params)


def graph_get_all(token, endpoint, params=None, beta=False, max_pages=MAX_PAGES):
    """GET a Graph collection, following ``@odata.nextLink`` across pages.

    Returns ``{"value": [all items]}`` on success. If any page fails, that
    page's result (None or an error dict) is returned unchanged so the caller
    reports the failure instead of silently working on a partial list.
    At most ``max_pages`` pages are fetched.
    """
    version = "beta" if beta else "v1.0"
    page = _graph_request(token, f"{GRAPH_ROOT}/{version}{endpoint}", params)
    items = []
    fetched = 1
    while True:
        if not page or "value" not in page:
            return page
        items.extend(page["value"])
        next_url = page.get("@odata.nextLink")
        if not next_url or fetched >= max_pages:
            return {"value": items}
        # nextLink is an absolute URL that already carries the query string.
        page = _graph_request(token, next_url)
        fetched += 1


def _is_error(result):
    """True when *result* is the error dict produced by _graph_request."""
    return isinstance(result, dict) and "error" in result


def _error_issue(issue_type, result):
    """Build a WARNING issue that records a failed Graph call."""
    return {"type": issue_type, "severity": "WARNING", "details": result}


def _collection(result, error_type):
    """Split a Graph list response into ``(items, issues)``.

    An error dict yields no items and one WARNING issue; a missing (404/None)
    response yields nothing at all.
    """
    if _is_error(result):
        return [], [_error_issue(error_type, result)]
    if not result:
        return [], []
    return result.get("value") or [], []


# ---------------------------------------------------------------------------
# Pure analysis (no network access)
# ---------------------------------------------------------------------------

def analyze_signins(signins):
    """Turn raw sign-in records into issues about non-US sign-ins.

    Records whose country is missing, null or empty are treated as "Unknown"
    and ignored rather than being reported as foreign.
    """
    foreign_logins = []
    failed_foreign = []
    for signin in signins:
        location = signin.get("location") or {}
        country = location.get("countryOrRegion") or "Unknown"
        if country == "Unknown" or country in US_COUNTRY_CODES:
            continue
        status = signin.get("status") or {}
        error_code = status.get("errorCode", 0)
        entry = {
            "ip": signin.get("ipAddress", "Unknown"),
            "country": country,
            "city": location.get("city") or "Unknown",
            "time": signin.get("createdDateTime"),
            "success": error_code == 0,
            "error": error_code,
        }
        (foreign_logins if error_code == 0 else failed_foreign).append(entry)

    issues = []
    if foreign_logins:
        issues.append({
            "type": "FOREIGN_SUCCESS_LOGIN",
            "severity": "CRITICAL",
            "count": len(foreign_logins),
            "details": foreign_logins[:5],  # Top 5
        })
    if failed_foreign:
        issues.append({
            "type": "FOREIGN_FAILED_ATTEMPTS",
            "severity": "INFO",
            "count": len(failed_foreign),
            # Sorted so the report is deterministic between runs.
            "countries": sorted({f["country"] for f in failed_foreign}),
        })
    return issues


def inbox_rule_reasons(rule):
    """Return the list of reasons an inbox rule looks malicious (empty if none)."""
    name = rule.get("displayName") or ""
    actions = rule.get("actions") or {}
    reasons = []

    # Short/dot names; compared case-insensitively and ignoring padding.
    normalized = name.strip().lower()
    if normalized in SUSPICIOUS_RULE_PATTERNS or len(normalized) <= 2:
        reasons.append(f"Suspicious name: '{name}'")

    # Any action that takes mail out of the inbox view.
    hides_mail = bool(
        actions.get("delete")
        or actions.get("permanentDelete")
        or actions.get("moveToFolder")
    )
    if actions.get("markAsRead") and hides_mail:
        reasons.append("Marks read + moves/deletes")
    if actions.get("stopProcessingRules") and hides_mail:
        reasons.append("Stops processing + hides mail")

    # Forwarding rules; each key may be absent or explicitly null.
    forward_targets = (
        (actions.get("forwardTo") or [])
        + (actions.get("forwardAsAttachmentTo") or [])
        + (actions.get("redirectTo") or [])
    )
    if forward_targets:
        reasons.append(f"Forwards to external: {forward_targets}")
    return reasons


def classify_oauth_grant(app_name, scope):
    """Return "HIGH", "MEDIUM" or None for an OAuth grant.

    HIGH: the app name contains a known-suspicious pattern.
    MEDIUM: the grant carries a sensitive mail scope and the app name does not
    contain "Microsoft".
    NOTE(review): trusting the display name is weak (anyone can name an app
    "Microsoft ..."); consider checking the publisher/owner tenant instead.
    """
    app_name = app_name or "Unknown"
    lowered_name = app_name.lower()
    if any(pattern.lower() in lowered_name for pattern in SUSPICIOUS_OAUTH_APPS):
        return "HIGH"
    lowered_scope = (scope or "").lower()
    has_sensitive = any(s.lower() in lowered_scope for s in SENSITIVE_SCOPES)
    if has_sensitive and "Microsoft" not in app_name:
        return "MEDIUM"
    return None


def mfa_issues(raw_methods):
    """Flag accounts with more than MAX_AUTHENTICATOR_DEVICES authenticator apps."""
    methods = []
    for method in raw_methods:
        method_type = method.get("@odata.type") or ""
        lowered = method_type.lower()
        if "phone" in lowered:
            methods.append({"type": "phone", "value": method.get("phoneNumber", "Unknown")})
        elif "microsoftAuthenticator" in method_type:
            device = method.get("displayName", method.get("deviceTag", "Unknown"))
            methods.append({"type": "authenticator", "device": device})
        elif "fido2" in lowered:
            methods.append({"type": "fido2", "model": method.get("model", "Unknown")})

    # Flag if multiple authenticator devices (potential attacker device)
    auth_devices = [m for m in methods if m.get("type") == "authenticator"]
    if len(auth_devices) > MAX_AUTHENTICATOR_DEVICES:
        return [{
            "type": "MULTIPLE_AUTH_DEVICES",
            "severity": "MEDIUM",
            "count": len(auth_devices),
            "devices": auth_devices,
        }]
    return []


def mailbox_issues(settings):
    """Flag a mailbox whose automatic replies are permanently enabled."""
    # Note: Graph API doesn't expose SMTP forwarding directly, need Exchange
    auto_reply = settings.get("automaticRepliesSetting") or {}
    if auto_reply.get("status") != "alwaysEnabled":
        return []
    return [{
        "type": "AUTO_REPLY_ALWAYS_ON",
        "severity": "LOW",
        "message": (auto_reply.get("internalReplyMessage") or "")[:100],
    }]


# ---------------------------------------------------------------------------
# Per-user checks (network + analysis)
# ---------------------------------------------------------------------------

def check_signin_logs(token, user_id, user_email, days=30):
    """Check sign-in logs for foreign/suspicious IPs.

    Only the 100 most recent sign-ins within the last *days* days are examined.
    """
    cutoff = (_utc_now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
    params = {
        "$filter": f"userId eq '{user_id}' and createdDateTime ge {cutoff}",
        "$top": 100,
        "$orderby": "createdDateTime desc",
    }
    result = graph_get_beta(token, "/auditLogs/signIns", params)
    signins, issues = _collection(result, "SIGNIN_LOG_ERROR")
    return issues + analyze_signins(signins)


def check_inbox_rules(token, user_id, user_email):
    """Check for malicious inbox rules (only enabled rules are reported)."""
    result = graph_get(token, f"/users/{user_id}/mailFolders/inbox/messageRules")
    rules, issues = _collection(result, "INBOX_RULE_ERROR")
    for rule in rules:
        reasons = inbox_rule_reasons(rule)
        if reasons and rule.get("isEnabled", False):
            issues.append({
                "type": "SUSPICIOUS_INBOX_RULE",
                "severity": "CRITICAL",
                "rule_name": rule.get("displayName") or "",
                "rule_id": rule.get("id"),
                "reasons": reasons,
            })
    return issues


def check_oauth_grants(token, user_id, user_email):
    """Check for suspicious OAuth app grants."""
    result = graph_get(token, f"/users/{user_id}/oauth2PermissionGrants")
    grants, issues = _collection(result, "OAUTH_GRANT_ERROR")
    app_names = {}  # clientId -> display name, so each app is looked up once
    for grant in grants:
        client_id = grant.get("clientId", "")
        scope = grant.get("scope") or ""
        if client_id not in app_names:
            app_result = graph_get(token, f"/servicePrincipals/{client_id}")
            app_names[client_id] = (app_result or {}).get("displayName") or "Unknown"
        app_name = app_names[client_id]

        severity = classify_oauth_grant(app_name, scope)
        if severity:
            issues.append({
                "type": "SUSPICIOUS_OAUTH_APP",
                "severity": severity,
                "app_name": app_name,
                "client_id": client_id,
                "scope": scope,
            })
    return issues


def check_mfa_methods(token, user_id, user_email):
    """Check MFA methods for suspicious devices."""
    result = graph_get(token, f"/users/{user_id}/authentication/methods")
    methods, issues = _collection(result, "MFA_METHOD_ERROR")
    return issues + mfa_issues(methods)


def check_mailbox_settings(token, user_id, user_email):
    """Check mailbox for forwarding/auto-replies."""
    result = graph_get(token, f"/users/{user_id}/mailboxSettings")
    if _is_error(result):
        return [_error_issue("MAILBOX_SETTINGS_ERROR", result)]
    if not result:
        return []
    return mailbox_issues(result)


# (label used in the "errors" list, check function) - run in this order per user.
USER_CHECKS = (
    ("signin_logs", check_signin_logs),
    ("inbox_rules", check_inbox_rules),
    ("oauth_grants", check_oauth_grants),
    ("mfa_methods", check_mfa_methods),
    ("mailbox_settings", check_mailbox_settings),
)


# ---------------------------------------------------------------------------
# Tenant scan and reporting
# ---------------------------------------------------------------------------

def scan_tenant(tenant_name, tenant_info):
    """Scan all users in a tenant.

    Returns a results dict (tenant, scan_time, total_users, clean_users,
    flagged_users, disabled_users, errors), or ``{"error": ...}`` when the
    token or the user list cannot be obtained. A user is flagged when any
    CRITICAL or HIGH issue is found; lower-severity findings are kept on the
    user's "clean" entry under "issues" so they are not lost.
    """
    print(f"\n{'='*60}")
    print(f"Scanning: {tenant_name}")
    print(f"Tenant ID: {tenant_info['tenant_id']}")
    print(f"{'='*60}")

    token = get_token(tenant_info["tenant_id"])
    if not token:
        return {"error": "Failed to get token - admin consent may be needed"}

    # Get all users (paged: a single request returns only the first page)
    users_result = graph_get_all(
        token,
        "/users",
        {"$select": "id,displayName,mail,userPrincipalName,accountEnabled"},
    )
    if not users_result or "value" not in users_result:
        return {"error": f"Failed to get users: {users_result}"}

    users = users_result["value"]
    print(f"Found {len(users)} users")

    results = {
        "tenant": tenant_name,
        "scan_time": _utc_now().isoformat(),
        "total_users": len(users),
        "clean_users": [],
        "flagged_users": [],
        "disabled_users": [],
        "errors": [],
    }

    for user in users:
        user_id = user.get("id")
        email = user.get("mail") or user.get("userPrincipalName", "Unknown")
        name = user.get("displayName", "Unknown")

        if not user.get("accountEnabled", True):
            results["disabled_users"].append({"name": name, "email": email})
            print(f" [SKIP] {name} - disabled")
            continue

        print(f" Checking: {name} ({email})...", end=" ")

        # Run all checks; one failing check must not abort the others.
        all_issues = []
        for label, check in USER_CHECKS:
            try:
                all_issues.extend(check(token, user_id, email))
            except Exception as e:  # best-effort boundary: record and continue
                results["errors"].append({"user": email, "check": label, "error": str(e)})

        # Categorize by severity
        critical = [i for i in all_issues if i.get("severity") == "CRITICAL"]
        high = [i for i in all_issues if i.get("severity") == "HIGH"]

        if critical or high:
            results["flagged_users"].append({
                "name": name,
                "email": email,
                "user_id": user_id,
                "issues": all_issues,
            })
            print(f"[FLAGGED] {len(critical)} critical, {len(high)} high")
            continue

        clean_entry = {"name": name, "email": email}
        if all_issues:
            clean_entry["issues"] = all_issues
        results["clean_users"].append(clean_entry)

        info_issues = [i for i in all_issues if i.get("severity") == "INFO"]
        if info_issues:
            print(f"[OK] ({len(info_issues)} info)")
        else:
            print("[OK]")

    return results


def main():
    """Scan every configured tenant, write the JSON report and print a summary."""
    print("M365 Security Scan")
    print(f"Started: {_utc_now().isoformat()}")

    all_results = {}
    for tenant_name, tenant_info in TENANTS.items():
        try:
            all_results[tenant_name] = scan_tenant(tenant_name, tenant_info)
        except Exception as e:  # keep scanning the remaining tenants
            all_results[tenant_name] = {"error": str(e)}
            print(f" [ERROR] Tenant scan failed: {e}")

    # Save results
    output_file = os.environ.get("M365_SCAN_OUTPUT", DEFAULT_OUTPUT_FILE)
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)

    # Print summary
    print("\n" + "=" * 60)
    print("SCAN SUMMARY")
    print("=" * 60)

    for tenant_name, results in all_results.items():
        print(f"\n{tenant_name}:")
        if "error" in results:
            print(f" [ERROR] {results['error']}")
            continue
        print(f" Total users: {results['total_users']}")
        print(f" Clean: {len(results['clean_users'])}")
        print(f" Flagged: {len(results['flagged_users'])}")
        print(f" Disabled: {len(results['disabled_users'])}")

        if results["flagged_users"]:
            print("\n FLAGGED ACCOUNTS:")
            for user in results["flagged_users"]:
                print(f" - {user['name']} ({user['email']})")
                for issue in user["issues"]:
                    print(f" [{issue['severity']}] {issue['type']}")

    print(f"\nResults saved to: {output_file}")


if __name__ == "__main__":
    main()