Files
claudetools/temp/m365_security_scan.py
azcomputerguru a1a19f8c00 sync: Auto-sync from Mikes-MacBook-Air.local at 2026-03-09 08:14:13
Synced files:
- Session logs updated
- Latest context and credentials
- Command/directive updates

Machine: Mikes-MacBook-Air.local
Timestamp: 2026-03-09 08:14:13

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-09 08:14:13 -07:00

405 lines
15 KiB
Python

#!/usr/bin/env python3
"""
M365 Security Scan - Check all accounts for compromise indicators
Scans: Sign-in logs, inbox rules, OAuth grants, MFA methods, forwarding
"""
import json
import os
from datetime import datetime, timedelta, timezone

import requests
# Claude-MSP-Access Multi-Tenant App
# The application (client) ID is not a secret, so it keeps a default and can be
# overridden through the environment.
CLIENT_ID = os.environ.get("M365_CLIENT_ID", "fabb3421-8b34-484b-bc17-e46de9703418")
# SECURITY: the client secret must never live in source control. It is read
# from the M365_CLIENT_SECRET environment variable; when unset it is an empty
# string and token requests will be rejected by the identity platform.
# Any secret value that was ever committed to a repository must be treated as
# compromised and rotated in the app registration.
CLIENT_SECRET = os.environ.get("M365_CLIENT_SECRET", "")

# Tenants to scan, keyed by display name. Each entry carries the tenant GUID
# used for token requests and the tenant's primary mail domain.
TENANTS = {
    "Valley Wide Plastering": {
        "tenant_id": "5c53ae9f-7071-4248-b834-8685b646450f",
        "domain": "valleywideplastering.com",
    },
    "BG Builders LLC": {
        "tenant_id": "ededa4fb-f6eb-4398-851d-5eb3e11fab27",
        "domain": "bgbuildersllc.com",
    },
}

# Known suspicious patterns
# Inbox-rule display names that are treated as suspicious.
SUSPICIOUS_RULE_PATTERNS = [".", "..", "...", "spam", "junk", "filter"]
# Substrings of OAuth application names that are treated as suspicious.
SUSPICIOUS_OAUTH_APPS = ["gmail", "yahoo", "p2p", "autoforward"]
# Sign-in countryOrRegion values considered domestic (not foreign).
US_COUNTRY_CODES = ["US", "United States"]
def get_token(tenant_id, timeout=30):
    """Get a Graph API access token for a tenant (client-credentials grant).

    Args:
        tenant_id: Tenant GUID placed in the token endpoint URL.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The access token string, or None when the request cannot be sent,
        the endpoint answers with a non-200 status, or the response body is
        not JSON / carries no "access_token". Failures are printed.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "https://graph.microsoft.com/.default",
        "grant_type": "client_credentials",
    }
    try:
        # requests has no default timeout; without one a stalled connection
        # blocks the whole scan forever.
        resp = requests.post(url, data=data, timeout=timeout)
    except requests.RequestException as exc:
        print(f" [ERROR] Token request failed: {exc}")
        return None

    if resp.status_code != 200:
        print(f" [ERROR] Token failed: {resp.status_code} - {resp.text[:200]}")
        return None

    try:
        return resp.json().get("access_token")
    except ValueError:
        # A 200 with a non-JSON body (e.g. a proxy error page) is a failure,
        # not a crash.
        print(f" [ERROR] Token response was not JSON: {resp.text[:200]}")
        return None
def graph_get(token, endpoint, params=None, timeout=30):
    """Make a GET request against the Microsoft Graph v1.0 API.

    Args:
        token: Bearer access token sent in the Authorization header.
        endpoint: Path appended verbatim to the v1.0 root, e.g. "/users".
        params: Optional dict of query parameters.
        timeout: Seconds to wait for the server before requests raises.

    Returns:
        The decoded JSON body on HTTP 200, None on HTTP 404, otherwise a dict
        {"error": <status code>, "message": <first 200 chars of the body>}.

    Raises:
        requests.RequestException: on connection failures or timeouts; these
            are deliberately not swallowed so the caller can record them.
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"https://graph.microsoft.com/v1.0{endpoint}"
    # An explicit timeout keeps one stalled connection from hanging the scan.
    resp = requests.get(url, headers=headers, params=params, timeout=timeout)
    if resp.status_code == 200:
        return resp.json()
    if resp.status_code == 404:
        return None
    return {"error": resp.status_code, "message": resp.text[:200]}
def graph_get_beta(token, endpoint, params=None, timeout=30):
    """Make a GET request against the Microsoft Graph beta API.

    Args:
        token: Bearer access token sent in the Authorization header.
        endpoint: Path appended verbatim to the beta root,
            e.g. "/auditLogs/signIns".
        params: Optional dict of query parameters.
        timeout: Seconds to wait for the server before requests raises.

    Returns:
        The decoded JSON body on HTTP 200, None on HTTP 404, otherwise a dict
        {"error": <status code>, "message": <first 200 chars of the body>}.

    Raises:
        requests.RequestException: on connection failures or timeouts; these
            are deliberately not swallowed so the caller can record them.
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"https://graph.microsoft.com/beta{endpoint}"
    # An explicit timeout keeps one stalled connection from hanging the scan.
    resp = requests.get(url, headers=headers, params=params, timeout=timeout)
    if resp.status_code == 200:
        return resp.json()
    if resp.status_code == 404:
        return None
    return {"error": resp.status_code, "message": resp.text[:200]}
def check_signin_logs(token, user_id, user_email, days=30):
    """Check a user's recent sign-in logs for logins from outside the US.

    Only the newest 100 sign-ins within the window are requested ($top=100),
    so very active accounts are examined partially.

    Args:
        token: Graph API bearer token.
        user_id: Object ID used in the sign-in log filter.
        user_email: Accepted for signature parity with the other checks;
            not used by this function.
        days: Size of the look-back window in days.

    Returns:
        A list of issue dicts. Possible types: FOREIGN_SUCCESS_LOGIN
        (CRITICAL, with up to five sample entries), FOREIGN_FAILED_ATTEMPTS
        (INFO, with the sorted list of countries) and SIGNIN_LOG_ERROR
        (WARNING, when Graph answers with an error).
    """
    issues = []
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted string is identical.
    window_start = datetime.now(timezone.utc) - timedelta(days=days)
    cutoff = window_start.strftime("%Y-%m-%dT%H:%M:%SZ")

    # Get sign-in logs
    params = {
        "$filter": f"userId eq '{user_id}' and createdDateTime ge {cutoff}",
        "$top": 100,
        "$orderby": "createdDateTime desc",
    }
    result = graph_get_beta(token, "/auditLogs/signIns", params)
    if not result:
        return issues
    if "value" not in result:
        if "error" in result and result["error"] != 404:
            issues.append({"type": "SIGNIN_LOG_ERROR", "severity": "WARNING", "details": result})
        return issues

    foreign_logins = []
    failed_foreign = []
    for signin in result["value"]:
        # `or {}` / `or "Unknown"` cover keys that are present but null or
        # empty. An empty countryOrRegion means "location not resolved" and
        # must not be reported as a foreign login.
        location = signin.get("location") or {}
        status = signin.get("status") or {}
        country = location.get("countryOrRegion") or "Unknown"
        if country == "Unknown" or country in US_COUNTRY_CODES:
            continue

        error_code = status.get("errorCode") or 0
        entry = {
            "ip": signin.get("ipAddress") or "Unknown",
            "country": country,
            "city": location.get("city") or "Unknown",
            "time": signin.get("createdDateTime"),
            "success": error_code == 0,
            "error": error_code,
        }
        # errorCode 0 is treated as a successful sign-in.
        if error_code == 0:
            foreign_logins.append(entry)
        else:
            failed_foreign.append(entry)

    if foreign_logins:
        issues.append({
            "type": "FOREIGN_SUCCESS_LOGIN",
            "severity": "CRITICAL",
            "count": len(foreign_logins),
            "details": foreign_logins[:5],  # Top 5
        })
    if failed_foreign:
        # Group by country; sorted so repeated scans produce stable output.
        countries = sorted({attempt["country"] for attempt in failed_foreign})
        issues.append({
            "type": "FOREIGN_FAILED_ATTEMPTS",
            "severity": "INFO",
            "count": len(failed_foreign),
            "countries": countries,
        })
    return issues
def check_inbox_rules(token, user_id, user_email):
    """Check a user's inbox rules for patterns typical of mailbox compromise.

    A rule is reported only when it is enabled and at least one of these
    holds: its name is very short or matches SUSPICIOUS_RULE_PATTERNS
    (case-insensitive, surrounding whitespace ignored); it marks mail as read
    while moving/deleting it; it stops rule processing while moving/deleting
    mail; or it forwards/redirects mail.

    Args:
        token: Graph API bearer token.
        user_id: Object ID of the mailbox owner.
        user_email: Accepted for signature parity with the other checks;
            not used by this function.

    Returns:
        A list of issue dicts: SUSPICIOUS_INBOX_RULE (CRITICAL) per matching
        rule, or a single INBOX_RULE_ERROR (WARNING) when Graph answers with
        an error.
    """
    issues = []
    result = graph_get(token, f"/users/{user_id}/mailFolders/inbox/messageRules")
    if not result:
        return issues
    if "value" not in result:
        if "error" in result and result["error"] != 404:
            issues.append({"type": "INBOX_RULE_ERROR", "severity": "WARNING", "details": result})
        return issues

    known_bad_names = {pattern.lower() for pattern in SUSPICIOUS_RULE_PATTERNS}
    for rule in result["value"]:
        # `or` fallbacks cover keys that are present but null.
        name = rule.get("displayName") or ""
        actions = rule.get("actions") or {}
        reasons = []

        # Short/dot names. Compared case-insensitively and without
        # surrounding whitespace so "Spam" or " .. " cannot slip past.
        normalized = name.strip().lower()
        if normalized in known_bad_names or len(normalized) <= 2:
            reasons.append(f"Suspicious name: '{name}'")

        # permanentDelete hides mail just as effectively as delete.
        hides_mail = bool(
            actions.get("delete")
            or actions.get("permanentDelete")
            or actions.get("moveToFolder")
        )
        # Rules that delete/move and mark read
        if actions.get("markAsRead") and hides_mail:
            reasons.append("Marks read + moves/deletes")
        # Stop processing
        if actions.get("stopProcessingRules") and hides_mail:
            reasons.append("Stops processing + hides mail")

        # Forwarding rules
        # NOTE(review): targets are not compared against the tenant domain,
        # so "external" in the message below is an assumption.
        forward_targets = []
        for key in ("forwardTo", "forwardAsAttachmentTo", "redirectTo"):
            forward_targets.extend(actions.get(key) or [])
        if forward_targets:
            reasons.append(f"Forwards to external: {forward_targets}")

        if reasons and rule.get("isEnabled", False):
            issues.append({
                "type": "SUSPICIOUS_INBOX_RULE",
                "severity": "CRITICAL",
                "rule_name": name,
                "rule_id": rule.get("id"),
                "reasons": reasons,
            })
    return issues
def check_oauth_grants(token, user_id, user_email):
    """Check a user's delegated OAuth permission grants for risky apps.

    A grant is reported when the granted application's display name contains
    one of SUSPICIOUS_OAUTH_APPS (HIGH), or when it holds a mail-related
    scope and its name does not contain "Microsoft" (MEDIUM).

    Args:
        token: Graph API bearer token.
        user_id: Object ID of the user whose grants are listed.
        user_email: Accepted for signature parity with the other checks;
            not used by this function.

    Returns:
        A list of issue dicts: SUSPICIOUS_OAUTH_APP per risky grant, or a
        single OAUTH_GRANT_ERROR (WARNING) when Graph answers with an error,
        matching how the sign-in and inbox-rule checks report errors.
    """
    issues = []
    result = graph_get(token, f"/users/{user_id}/oauth2PermissionGrants")
    if not result:
        return issues
    if "value" not in result:
        # Previously an error response was dropped silently, which made a
        # failed check indistinguishable from "no grants".
        if "error" in result and result["error"] != 404:
            issues.append({"type": "OAUTH_GRANT_ERROR", "severity": "WARNING", "details": result})
        return issues

    sensitive_scopes = ("Mail.ReadWrite", "Mail.Send", "MailboxSettings", "full_access")
    # Several grants can point at the same service principal; look each up once.
    app_names = {}
    for grant in result["value"]:
        # `or ""` covers keys that are present but null.
        client_id = grant.get("clientId") or ""
        scope = grant.get("scope") or ""

        # Get app details
        if client_id not in app_names:
            app_result = graph_get(token, f"/servicePrincipals/{client_id}")
            # A missing principal, an error dict or a null name all map to "Unknown".
            app_names[client_id] = (app_result or {}).get("displayName") or "Unknown"
        app_name = app_names[client_id]

        # Check for suspicious apps
        lowered_name = app_name.lower()
        suspicious = any(pattern.lower() in lowered_name for pattern in SUSPICIOUS_OAUTH_APPS)

        # Check for sensitive scopes
        lowered_scope = scope.lower()
        has_sensitive = any(s.lower() in lowered_scope for s in sensitive_scopes)

        # NOTE(review): the display name is chosen by the app publisher, so
        # the "Microsoft" exemption can be spoofed - consider checking the
        # publisher/owner tenant instead.
        if suspicious or (has_sensitive and "Microsoft" not in app_name):
            issues.append({
                "type": "SUSPICIOUS_OAUTH_APP",
                "severity": "HIGH" if suspicious else "MEDIUM",
                "app_name": app_name,
                "client_id": client_id,
                "scope": scope,
            })
    return issues
def check_mfa_methods(token, user_id, user_email):
    """Inspect a user's registered authentication methods.

    Phone, Microsoft Authenticator and FIDO2 registrations are collected; an
    issue is raised only when more than two Authenticator registrations
    exist, since an extra device may belong to an attacker.

    Returns a list holding at most one MULTIPLE_AUTH_DEVICES (MEDIUM) issue.
    user_email is accepted but not used.
    """
    response = graph_get(token, f"/users/{user_id}/authentication/methods")
    if not response or "value" not in response:
        return []

    def summarize(entry):
        # Map one Graph method object to a small summary dict, or None for
        # method types this check does not track. Branch order matters: the
        # phone test runs first, and the Authenticator test is case-sensitive.
        odata_type = entry.get("@odata.type", "")
        lowered = odata_type.lower()
        if "phone" in lowered:
            return {"type": "phone", "value": entry.get("phoneNumber", "Unknown")}
        if "microsoftAuthenticator" in odata_type:
            label = entry.get("displayName", entry.get("deviceTag", "Unknown"))
            return {"type": "authenticator", "device": label}
        if "fido2" in lowered:
            return {"type": "fido2", "model": entry.get("model", "Unknown")}
        return None

    registered = []
    for entry in response["value"]:
        summary = summarize(entry)
        if summary is not None:
            registered.append(summary)

    # Flag if multiple authenticator devices (potential attacker device)
    authenticators = [item for item in registered if item.get("type") == "authenticator"]
    if len(authenticators) <= 2:
        return []
    return [{
        "type": "MULTIPLE_AUTH_DEVICES",
        "severity": "MEDIUM",
        "count": len(authenticators),
        "devices": authenticators,
    }]
def check_mailbox_settings(token, user_id, user_email):
    """Inspect mailbox settings for a permanently enabled automatic reply.

    Returns a list holding at most one AUTO_REPLY_ALWAYS_ON (LOW) issue whose
    "message" is the first 100 characters of the internal reply text.
    user_email is accepted but not used.
    """
    settings = graph_get(token, f"/users/{user_id}/mailboxSettings")
    # Nothing to inspect when the mailbox is missing or Graph returned an error.
    if not settings or "error" in settings:
        return []

    # Check auto-forwarding
    # Note: Graph API doesn't expose SMTP forwarding directly, need Exchange

    # Check automatic replies
    replies = settings.get("automaticRepliesSetting", {})
    if replies.get("status") != "alwaysEnabled":
        return []

    preview = replies.get("internalReplyMessage", "")[:100]
    return [{
        "type": "AUTO_REPLY_ALWAYS_ON",
        "severity": "LOW",
        "message": preview,
    }]
def scan_tenant(tenant_name, tenant_info):
    """Scan every enabled user in a tenant and collect compromise indicators.

    Args:
        tenant_name: Display name used in output and in the result.
        tenant_info: Dict with at least "tenant_id".

    Returns:
        {"error": <message>} when no token or no user list could be obtained.
        Otherwise a dict with "tenant", "scan_time" (ISO 8601, UTC with
        offset), "total_users", "clean_users", "flagged_users",
        "disabled_users" and "errors". A user is flagged when any issue is
        CRITICAL or HIGH; other users are listed as clean, and any
        lower-severity findings are kept under an "issues" key on their entry.
    """
    print(f"\n{'='*60}")
    print(f"Scanning: {tenant_name}")
    print(f"Tenant ID: {tenant_info['tenant_id']}")
    print(f"{'='*60}")

    token = get_token(tenant_info["tenant_id"])
    if not token:
        return {"error": "Failed to get token - admin consent may be needed"}

    # Get all users
    users_result = graph_get(token, "/users", {"$select": "id,displayName,mail,userPrincipalName,accountEnabled"})
    if not users_result or "value" not in users_result:
        return {"error": f"Failed to get users: {users_result}"}
    users = list(users_result["value"])

    # Graph returns the user list in pages; without following
    # "@odata.nextLink" only the first page would ever be scanned.
    listing_errors = []
    graph_root = "https://graph.microsoft.com/v1.0"
    next_link = users_result.get("@odata.nextLink")
    while next_link:
        if not next_link.startswith(graph_root):
            listing_errors.append({"user": None, "check": "list_users", "error": f"Unexpected nextLink: {next_link}"})
            break
        # nextLink is an absolute URL that already carries the query string;
        # graph_get expects the part after the v1.0 root.
        page = graph_get(token, next_link[len(graph_root):])
        if not page or "value" not in page:
            listing_errors.append({"user": None, "check": "list_users", "error": f"Failed to get next page of users: {page}"})
            break
        users.extend(page["value"])
        next_link = page.get("@odata.nextLink")

    print(f"Found {len(users)} users")
    results = {
        "tenant": tenant_name,
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "scan_time": datetime.now(timezone.utc).isoformat(),
        "total_users": len(users),
        "clean_users": [],
        "flagged_users": [],
        "disabled_users": [],
        "errors": listing_errors,
    }

    # (label recorded on failure, check function), run in this order.
    checks = (
        ("signin_logs", check_signin_logs),
        ("inbox_rules", check_inbox_rules),
        ("oauth_grants", check_oauth_grants),
        ("mfa_methods", check_mfa_methods),
        ("mailbox_settings", check_mailbox_settings),
    )

    for user in users:
        user_id = user.get("id")
        email = user.get("mail") or user.get("userPrincipalName", "Unknown")
        name = user.get("displayName", "Unknown")
        enabled = user.get("accountEnabled", True)
        if not enabled:
            results["disabled_users"].append({"name": name, "email": email})
            print(f" [SKIP] {name} - disabled")
            continue

        print(f" Checking: {name} ({email})...", end=" ")
        all_issues = []
        # Run all checks. A failure in one check is recorded and must not
        # stop the remaining checks or the remaining users.
        for check_name, check in checks:
            try:
                all_issues.extend(check(token, user_id, email))
            except Exception as e:
                results["errors"].append({"user": email, "check": check_name, "error": str(e)})

        # Categorize by severity
        critical = [i for i in all_issues if i.get("severity") == "CRITICAL"]
        high = [i for i in all_issues if i.get("severity") == "HIGH"]
        if critical or high:
            results["flagged_users"].append({
                "name": name,
                "email": email,
                "user_id": user_id,
                "issues": all_issues,
            })
            print(f"[FLAGGED] {len(critical)} critical, {len(high)} high")
            continue

        clean_entry = {"name": name, "email": email}
        if all_issues:
            # Keep MEDIUM/LOW/INFO/WARNING findings instead of discarding them.
            clean_entry["issues"] = all_issues
        results["clean_users"].append(clean_entry)
        info_issues = [i for i in all_issues if i.get("severity") == "INFO"]
        if info_issues:
            print(f"[OK] ({len(info_issues)} info)")
        else:
            print("[OK]")
    return results
def main():
    """Scan every configured tenant, save the results as JSON, print a summary.

    The output path defaults to the original absolute location and can be
    overridden with the M365_SCAN_OUTPUT environment variable. A failure to
    write the file is reported but does not prevent the summary from being
    printed, so a finished scan is never lost to a bad path.
    """
    print("M365 Security Scan")
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    print(f"Started: {datetime.now(timezone.utc).isoformat()}")

    all_results = {}
    for tenant_name, tenant_info in TENANTS.items():
        try:
            all_results[tenant_name] = scan_tenant(tenant_name, tenant_info)
        except Exception as e:
            # One failing tenant must not abort the remaining tenants.
            all_results[tenant_name] = {"error": str(e)}
            print(f" [ERROR] Tenant scan failed: {e}")

    # Save results
    output_file = os.environ.get(
        "M365_SCAN_OUTPUT",
        "/Users/azcomputerguru/ClaudeTools/temp/m365_security_scan_results.json",
    )
    saved = False
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(all_results, f, indent=2)
        saved = True
    except OSError as e:
        print(f" [ERROR] Could not write results to {output_file}: {e}")

    # Print summary
    print("\n" + "="*60)
    print("SCAN SUMMARY")
    print("="*60)
    for tenant_name, results in all_results.items():
        print(f"\n{tenant_name}:")
        if "error" in results:
            print(f" [ERROR] {results['error']}")
            continue
        print(f" Total users: {results['total_users']}")
        print(f" Clean: {len(results['clean_users'])}")
        print(f" Flagged: {len(results['flagged_users'])}")
        print(f" Disabled: {len(results['disabled_users'])}")
        if results["flagged_users"]:
            print("\n FLAGGED ACCOUNTS:")
            for user in results["flagged_users"]:
                print(f" - {user['name']} ({user['email']})")
                for issue in user["issues"]:
                    print(f" [{issue['severity']}] {issue['type']}")

    if saved:
        print(f"\nResults saved to: {output_file}")


if __name__ == "__main__":
    main()