Files
claudetools/clients/cascades-tucson/reports/user-detail-batches/batch-4-collect.py
Howard Enos 74890d51ec sync: auto-sync from ACG-TECH03L at 2026-04-18 14:28:21
Author: Howard Enos
Machine: ACG-TECH03L
Timestamp: 2026-04-18 14:28:21
2026-04-18 14:34:04 -07:00

217 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""Collect M365 user detail for batch 4 of Cascades Tucson."""
import csv
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
# Bearer token for graph.microsoft.com, read from a local file. The context
# manager closes the handle deterministically instead of leaving it to the
# garbage collector.
with open(r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches\.token") as _token_file:
    TOKEN = _token_file.read().strip()

# Destination CSV written by main().
OUT = r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches\batch-4.csv"

# User principal names handled by this batch, in output order.
USERS = [
    "jodi.ramstack@cascadestucson.com",
    "john.trozzi@cascadestucson.com",
    "karen.rossini@cascadestucson.com",
    "lauren.hasselman@cascadestucson.com",
    "lois.lane@cascadestucson.com",
]

# Headers sent with every request.
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Maps an authentication method's "@odata.type" to the short label written to
# the MFAMethods column. Types missing from this map are shortened by
# collect_user() instead.
MFA_TYPE_MAP = {
    "#microsoft.graph.microsoftAuthenticatorAuthenticationMethod": "Authenticator",
    "#microsoft.graph.phoneAuthenticationMethod": "SMS",
    "#microsoft.graph.fido2AuthenticationMethod": "FIDO2",
    "#microsoft.graph.temporaryAccessPassAuthenticationMethod": "TAP",
    "#microsoft.graph.softwareOathAuthenticationMethod": "OATH",
    "#microsoft.graph.passwordAuthenticationMethod": "Password",
    "#microsoft.graph.emailAuthenticationMethod": "Email",
    "#microsoft.graph.windowsHelloForBusinessAuthenticationMethod": "WindowsHello",
    "#microsoft.graph.passwordlessMicrosoftAuthenticatorAuthenticationMethod": "PasswordlessAuthenticator",
}
def _do(url, extra_headers=None):
    """GET *url* with the shared auth headers and return ``(status, body)``.

    Up to six attempts are made. HTTP 429 and 5xx responses, as well as any
    non-HTTP exception raised while fetching or decoding the response, are
    retried with exponential backoff (2s doubling up to 60s); a numeric
    ``Retry-After`` header on a 429/5xx response overrides the backoff for
    that wait. No wait is performed after the final attempt.

    Args:
        url: Fully formed request URL.
        extra_headers: Optional mapping merged over the shared ``HEADERS``.

    Returns:
        ``(http_status, parsed_json)`` on success; ``(http_status, body)`` for
        non-retryable HTTP errors, where ``body`` is the parsed error JSON or
        ``{"error": str(exc)}`` if it cannot be parsed; and
        ``(0, {"error": "max_retries_exceeded"})`` once every attempt failed.
    """
    max_attempts = 6
    hdrs = dict(HEADERS)
    if extra_headers:
        hdrs.update(extra_headers)
    backoff = 2
    for attempt in range(max_attempts):
        # Sleeping after the last attempt would only delay the failure result.
        is_last = attempt == max_attempts - 1
        req = urllib.request.Request(url, headers=hdrs)
        try:
            with urllib.request.urlopen(req, timeout=30) as r:
                return r.status, json.loads(r.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429 or e.code >= 500:
                if is_last:
                    print(f"  [retry {attempt+1}] HTTP {e.code}, giving up", file=sys.stderr)
                    break
                # Guard against an error object that carries no headers.
                retry_after = e.headers.get("Retry-After") if e.headers else None
                wait = int(retry_after) if retry_after and retry_after.isdigit() else backoff
                print(f"  [retry {attempt+1}] HTTP {e.code}, waiting {wait}s...", file=sys.stderr)
                time.sleep(wait)
                backoff = min(backoff * 2, 60)
                continue
            # Non-retryable HTTP error: hand the status and error payload back.
            try:
                body = json.loads(e.read().decode("utf-8"))
            except Exception:
                body = {"error": str(e)}
            return e.code, body
        except Exception as e:
            # Deliberately broad best-effort retry. NOTE: a successful
            # response whose body is not valid JSON also lands here.
            print(f"  [retry {attempt+1}] transport err: {e}", file=sys.stderr)
            if is_last:
                break
            time.sleep(backoff)
            backoff = min(backoff * 2, 60)
            continue
    return 0, {"error": "max_retries_exceeded"}
def gget(url):
    """Fetch *url* with only the shared headers; returns ``_do``'s ``(status, body)``."""
    result = _do(url, extra_headers=None)
    return result
def gget_consistency(url):
    """Fetch *url* with an added ``ConsistencyLevel: eventual`` header.

    Returns ``_do``'s ``(status, body)`` tuple unchanged.
    """
    consistency_header = {"ConsistencyLevel": "eventual"}
    return _do(url, extra_headers=consistency_header)
def collect_user(upn):
    """Collect one CSV row of directory detail for the user *upn*.

    Looks up, in order: basic profile, sign-in activity, license SKUs,
    authentication methods, the preferred secondary authentication method,
    directory roles, and the group-membership count. For every lookup after
    the profile, a 403 is recorded as ``scope_unavailable`` in the affected
    column(s) and any other non-200 status is appended to ``Notes`` as
    ``<label>:<status>``.

    Args:
        upn: User principal name. It is percent-encoded before being placed
            in request paths.

    Returns:
        dict keyed by the CSV column names. When the profile lookup fails the
        row is returned early with ``Notes`` set to ``not_found`` (404) or
        ``profile_error:<status>`` and all other collected columns left blank.
    """
    row = {
        "UPN": upn, "DisplayName": "", "AccountEnabled": "",
        "MailboxType": "User", "Licenses": "", "MFARegistered": "",
        "MFAMethods": "", "DefaultMFAMethod": "", "LastSignIn": "",
        "LastInteractiveSignIn": "", "AdminRoles": "", "JobTitle": "",
        "Department": "", "GroupCount": "", "Notes": "",
    }
    notes = []

    # Percent-encode the UPN so reserved characters (notably "#", which would
    # otherwise start a URL fragment) survive as part of the path. "@" stays
    # literal so ordinary UPNs yield the same URLs as an unencoded UPN would.
    user_ref = urllib.parse.quote(upn, safe="@")
    v1_user = f"https://graph.microsoft.com/v1.0/users/{user_ref}"

    # 1a. Basic profile by UPN (signInActivity requires GUID addressing, fetched below)
    status, body = gget(
        f"{v1_user}"
        "?$select=id,userPrincipalName,displayName,accountEnabled,jobTitle,department"
    )
    if status == 404:
        row["Notes"] = "not_found"
        return row
    if status != 200:
        row["Notes"] = f"profile_error:{status}"
        return row
    user_id = body.get("id", "")
    row["DisplayName"] = body.get("displayName", "") or ""
    # A missing or null value is written as blank rather than the text "none".
    enabled = body.get("accountEnabled")
    row["AccountEnabled"] = "" if enabled is None else str(enabled).lower()
    row["JobTitle"] = body.get("jobTitle", "") or ""
    row["Department"] = body.get("department", "") or ""

    # 1b. signInActivity - requires GUID addressing
    if user_id:
        status, body = gget(
            f"https://graph.microsoft.com/v1.0/users/{user_id}?$select=signInActivity"
        )
        if status == 200:
            sia = body.get("signInActivity") or {}
            li = sia.get("lastSignInDateTime", "") or ""
            lni = sia.get("lastNonInteractiveSignInDateTime", "") or ""
            # lastSignInDateTime = last interactive sign-in per MS docs
            row["LastInteractiveSignIn"] = li
            # LastSignIn = most recent of either (interactive or non-interactive).
            # Timestamps are compared as strings, which assumes both share one
            # sortable format.
            row["LastSignIn"] = max(li, lni) if (li or lni) else ""
        elif status == 403:
            row["LastSignIn"] = "scope_unavailable"
            row["LastInteractiveSignIn"] = "scope_unavailable"
        else:
            notes.append(f"signInActivity:{status}")

    # 2. Licenses
    status, body = gget(f"{v1_user}/licenseDetails")
    if status == 200:
        skus = [x.get("skuPartNumber", "") for x in body.get("value", []) if x.get("skuPartNumber")]
        row["Licenses"] = ";".join(skus)
    elif status == 403:
        row["Licenses"] = "scope_unavailable"
    else:
        notes.append(f"licenses:{status}")

    # 3. MFA methods
    status, body = gget(f"{v1_user}/authentication/methods")
    methods = []
    if status == 200:
        for m in body.get("value", []):
            t = m.get("@odata.type", "")
            # Unknown types fall back to the type name with its prefix and suffix stripped.
            methods.append(MFA_TYPE_MAP.get(t, t.replace("#microsoft.graph.", "").replace("AuthenticationMethod", "")))
        row["MFAMethods"] = ";".join(methods)
        # Any method other than the password counts as MFA being registered.
        non_password = [m for m in methods if m != "Password"]
        row["MFARegistered"] = "true" if non_password else "false"
    elif status == 403:
        row["MFAMethods"] = "scope_unavailable"
        row["MFARegistered"] = "scope_unavailable"
    else:
        notes.append(f"methods:{status}")

    # 4. Default MFA method
    status, body = gget(f"https://graph.microsoft.com/beta/users/{user_ref}/authentication/signInPreferences")
    if status == 200:
        row["DefaultMFAMethod"] = body.get("userPreferredMethodForSecondaryAuthentication", "") or ""
    elif status == 403:
        row["DefaultMFAMethod"] = "scope_unavailable"
    else:
        notes.append(f"signInPrefs:{status}")

    # 5. Directory roles
    status, body = gget(
        f"{v1_user}/transitiveMemberOf/microsoft.graph.directoryRole"
    )
    if status == 200:
        roles = [r.get("displayName", "") for r in body.get("value", []) if r.get("displayName")]
        row["AdminRoles"] = ";".join(roles)
    elif status == 403:
        row["AdminRoles"] = "scope_unavailable"
    else:
        notes.append(f"roles:{status}")

    # 6. Group count
    status, body = gget_consistency(
        f"{v1_user}/memberOf?$count=true&$top=1"
    )
    if status == 200:
        row["GroupCount"] = str(body.get("@odata.count", ""))
    elif status == 403:
        row["GroupCount"] = "scope_unavailable"
    else:
        notes.append(f"groups:{status}")

    # Special note for jodi.ramstack
    if upn.lower().startswith("jodi.ramstack"):
        licensed = "yes" if row["Licenses"] and row["Licenses"] != "scope_unavailable" else "no"
        notes.append(f"flagged_for_delete_2026-04-13:enabled={row['AccountEnabled']}")
        notes.append(f"licensed={licensed}")

    if notes:
        row["Notes"] = ";".join(notes)
    return row
def main():
    """Collect a row per UPN in USERS, write them to OUT as CSV, and echo each row as JSON.

    Progress and the final summary go to stderr; the JSON lines go to stdout.
    """
    collected = []
    for address in USERS:
        print(f"Processing {address}...", file=sys.stderr)
        collected.append(collect_user(address))
        # Brief pause between users.
        time.sleep(0.5)

    fieldnames = [
        "UPN", "DisplayName", "AccountEnabled", "MailboxType", "Licenses",
        "MFARegistered", "MFAMethods", "DefaultMFAMethod", "LastSignIn",
        "LastInteractiveSignIn", "AdminRoles", "JobTitle", "Department",
        "GroupCount", "Notes",
    ]
    with open(OUT, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        writer.writerows(collected)
    print(f"Wrote {len(collected)} rows to {OUT}", file=sys.stderr)

    # Print to stdout for visibility
    for record in collected:
        print(json.dumps(record))


# Run the collection only when executed as a script.
if __name__ == "__main__":
    main()