Files
claudetools/clients/cascades-tucson/reports/user-detail-batches/_batch7_collect.py
Howard Enos 74890d51ec sync: auto-sync from ACG-TECH03L at 2026-04-18 14:28:21
Author: Howard Enos
Machine: ACG-TECH03L
Timestamp: 2026-04-18 14:28:21
2026-04-18 14:34:04 -07:00

248 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""Collect M365 user detail for batch 7 (combined 7+8) of Cascades Tucson.
Covers the remaining 8 users: veronica.feller, fax (shared), five Exchange
Online Essentials (suspended SKU) accounts, and sysadmin@ (MSP admin).
"""
import csv
import json
import sys
import time
import urllib.parse
import urllib.request
import urllib.error
TOKEN = open(r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches\.token").read().strip()
OUT = r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches\batch-7.csv"
# (UPN, MailboxType) — MailboxType is Shared for fax@, User for everyone else.
USERS = [
("veronica.feller@cascadestucson.com", "User"),
("fax@cascadestucson.com", "Shared"),
("medtech@cascadestucson.com", "User"),
("nurse@cascadestucson.com", "User"),
("transportation@cascadestucson.com", "User"),
("Britney.Thompson@cascadestucson.com", "User"),
("Shelby.Trozzi@cascadestucson.com", "User"),
("sysadmin@cascadestucson.com", "User"),
]
# Users whose primary license is Exchange Online Essentials (the suspended SKU).
ESSENTIALS_USERS = {
"medtech@cascadestucson.com",
"nurse@cascadestucson.com",
"transportation@cascadestucson.com",
"britney.thompson@cascadestucson.com",
"shelby.trozzi@cascadestucson.com",
}
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
MFA_TYPE_MAP = {
"#microsoft.graph.microsoftAuthenticatorAuthenticationMethod": "Authenticator",
"#microsoft.graph.phoneAuthenticationMethod": "SMS",
"#microsoft.graph.fido2AuthenticationMethod": "FIDO2",
"#microsoft.graph.temporaryAccessPassAuthenticationMethod": "TAP",
"#microsoft.graph.softwareOathAuthenticationMethod": "OATH",
"#microsoft.graph.passwordAuthenticationMethod": "Password",
"#microsoft.graph.emailAuthenticationMethod": "Email",
"#microsoft.graph.windowsHelloForBusinessAuthenticationMethod": "WindowsHello",
"#microsoft.graph.passwordlessMicrosoftAuthenticatorAuthenticationMethod": "PasswordlessAuthenticator",
}
def _do(url, extra_headers=None):
hdrs = dict(HEADERS)
if extra_headers:
hdrs.update(extra_headers)
backoff = 2
for attempt in range(6):
req = urllib.request.Request(url, headers=hdrs)
try:
with urllib.request.urlopen(req, timeout=30) as r:
return r.status, json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
if e.code == 429 or e.code >= 500:
retry_after = e.headers.get("Retry-After")
wait = int(retry_after) if retry_after and retry_after.isdigit() else backoff
print(f" [retry {attempt+1}] HTTP {e.code}, waiting {wait}s...", file=sys.stderr)
time.sleep(wait)
backoff = min(backoff * 2, 60)
continue
try:
body = json.loads(e.read().decode("utf-8"))
except Exception:
body = {"error": str(e)}
return e.code, body
except Exception as e:
print(f" [retry {attempt+1}] transport err: {e}", file=sys.stderr)
time.sleep(backoff)
backoff = min(backoff * 2, 60)
continue
return 0, {"error": "max_retries_exceeded"}
def gget(url):
return _do(url)
def gget_consistency(url):
return _do(url, {"ConsistencyLevel": "eventual"})
def _enc(upn):
# Percent-encode just in case (most UPNs are safe, but be defensive).
return urllib.parse.quote(upn, safe="@._-")
def collect_user(upn, mailbox_type):
row = {
"UPN": upn, "DisplayName": "", "AccountEnabled": "",
"MailboxType": mailbox_type, "Licenses": "", "MFARegistered": "",
"MFAMethods": "", "DefaultMFAMethod": "", "LastSignIn": "",
"LastInteractiveSignIn": "", "AdminRoles": "", "JobTitle": "",
"Department": "", "GroupCount": "", "Notes": "",
}
notes = []
upn_enc = _enc(upn)
# 1a. Basic profile by UPN (no signInActivity here — it requires GUID addressing).
status, body = gget(
f"https://graph.microsoft.com/v1.0/users/{upn_enc}"
"?$select=id,userPrincipalName,displayName,accountEnabled,jobTitle,department"
)
if status == 404:
row["Notes"] = "not_found"
return row
if status != 200:
row["Notes"] = f"profile_error:{status}"
return row
user_id = body.get("id", "")
row["DisplayName"] = body.get("displayName", "") or ""
row["AccountEnabled"] = str(body.get("accountEnabled", "")).lower()
row["JobTitle"] = body.get("jobTitle", "") or ""
row["Department"] = body.get("department", "") or ""
# 1b. signInActivity via GUID.
if user_id:
status, body = gget(
f"https://graph.microsoft.com/v1.0/users/{user_id}?$select=signInActivity"
)
if status == 200:
sia = body.get("signInActivity") or {}
row["LastInteractiveSignIn"] = sia.get("lastSignInDateTime", "") or ""
li = sia.get("lastSignInDateTime", "") or ""
lni = sia.get("lastNonInteractiveSignInDateTime", "") or ""
row["LastSignIn"] = max(li, lni) if (li or lni) else ""
elif status == 403:
row["LastSignIn"] = "scope_unavailable"
row["LastInteractiveSignIn"] = "scope_unavailable"
else:
notes.append(f"signInActivity:{status}")
# 2. Licenses
status, body = gget(f"https://graph.microsoft.com/v1.0/users/{upn_enc}/licenseDetails")
if status == 200:
skus = [x.get("skuPartNumber", "") for x in body.get("value", []) if x.get("skuPartNumber")]
row["Licenses"] = ";".join(skus)
elif status == 403:
row["Licenses"] = "scope_unavailable"
else:
notes.append(f"licenses:{status}")
# 3. MFA methods
status, body = gget(f"https://graph.microsoft.com/v1.0/users/{upn_enc}/authentication/methods")
methods = []
if status == 200:
for m in body.get("value", []):
t = m.get("@odata.type", "")
methods.append(MFA_TYPE_MAP.get(t, t.replace("#microsoft.graph.", "").replace("AuthenticationMethod", "")))
row["MFAMethods"] = ";".join(methods)
non_password = [m for m in methods if m != "Password"]
row["MFARegistered"] = "true" if non_password else "false"
elif status == 403:
row["MFAMethods"] = "scope_unavailable"
row["MFARegistered"] = "scope_unavailable"
else:
notes.append(f"methods:{status}")
# 4. Default MFA method (beta)
status, body = gget(f"https://graph.microsoft.com/beta/users/{upn_enc}/authentication/signInPreferences")
if status == 200:
row["DefaultMFAMethod"] = body.get("userPreferredMethodForSecondaryAuthentication", "") or ""
elif status == 403:
row["DefaultMFAMethod"] = "scope_unavailable"
else:
notes.append(f"signInPrefs:{status}")
# 5. Directory roles (admin membership)
status, body = gget(
f"https://graph.microsoft.com/v1.0/users/{upn_enc}/transitiveMemberOf/microsoft.graph.directoryRole"
)
if status == 200:
roles = [r.get("displayName", "") for r in body.get("value", []) if r.get("displayName")]
row["AdminRoles"] = ";".join(roles)
elif status == 403:
row["AdminRoles"] = "scope_unavailable"
else:
notes.append(f"roles:{status}")
# 6. Group count
status, body = gget_consistency(
f"https://graph.microsoft.com/v1.0/users/{upn_enc}/memberOf?$count=true&$top=1"
)
if status == 200:
row["GroupCount"] = str(body.get("@odata.count", ""))
elif status == 403:
row["GroupCount"] = "scope_unavailable"
else:
notes.append(f"groups:{status}")
# Callout: Essentials SKU (suspended) — flag if present.
if upn.lower() in ESSENTIALS_USERS:
lic_upper = (row["Licenses"] or "").upper()
has_essentials = "EXCHANGE" in lic_upper and ("ESSENTIALS" in lic_upper or "EXCHANGESTANDARD" in lic_upper or "S_ESSENTIALS" in lic_upper)
# Be generous — any variant containing "ESSENTIALS" counts.
has_essentials = "ESSENTIALS" in lic_upper
notes.append(
"essentials_sku_suspended:" + ("licensed" if has_essentials else "no_essentials_sku_visible")
)
if upn.lower() == "fax@cascadestucson.com":
notes.append("shared_mailbox:fax_to_email_routing")
if upn.lower() == "sysadmin@cascadestucson.com":
notes.append("msp_admin_account")
if notes:
row["Notes"] = ";".join(notes)
return row
def main():
rows = []
for upn, mbxtype in USERS:
print(f"Processing {upn}...", file=sys.stderr)
row = collect_user(upn, mbxtype)
rows.append(row)
time.sleep(0.5)
cols = ["UPN", "DisplayName", "AccountEnabled", "MailboxType", "Licenses",
"MFARegistered", "MFAMethods", "DefaultMFAMethod", "LastSignIn",
"LastInteractiveSignIn", "AdminRoles", "JobTitle", "Department",
"GroupCount", "Notes"]
with open(OUT, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_MINIMAL)
w.writeheader()
for r in rows:
w.writerow(r)
print(f"Wrote {len(rows)} rows to {OUT}", file=sys.stderr)
# Print each row as JSON for visibility.
for r in rows:
print(json.dumps(r))
if __name__ == "__main__":
main()