Files
claudetools/clients/cascades-tucson/reports/user-detail-batches/_batch2_collect.py
Howard Enos 74890d51ec sync: auto-sync from ACG-TECH03L at 2026-04-18 14:28:21
Author: Howard Enos
Machine: ACG-TECH03L
Timestamp: 2026-04-18 14:28:21
2026-04-18 14:34:04 -07:00

248 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""Collect per-user M365 detail for Cascades Tucson batch 2."""
import csv
import json
import sys
import time
import urllib.parse
import urllib.request
# Directory that holds every artefact for this batch.
_BATCH_DIR = r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches"

# File the bearer token is read from at start-up.
TOKEN_FILE = _BATCH_DIR + r"\.token_batch2"
# CSV file that main() writes the collected rows to.
OUT_FILE = _BATCH_DIR + r"\batch-2.csv"

# UPNs handled by this batch; rows are produced in this order.
USERS = [
    "ann.dery@cascadestucson.com",
    "ashley.jensen@cascadestucson.com",
    "boadmin@cascadestucson.com",
    "christina.dupras@cascadestucson.com",
    "christine.nyanzunda@cascadestucson.com",
]
# Load the bearer token once, at import time.
# The encoding is explicit because the platform default would otherwise be
# used; "utf-8-sig" also drops a leading UTF-8 byte-order mark, which
# str.strip() does not remove and which would corrupt the Authorization
# header. A plain ASCII token decodes exactly as before.
with open(TOKEN_FILE, "r", encoding="utf-8-sig") as f:
    TOKEN = f.read().strip()

# Headers sent with every Graph request.
HEADERS_BASE = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/json",
}
# Graph authentication-method "@odata.type" value -> short label for the CSV.
# Every key has the form "#microsoft.graph.<stem>AuthenticationMethod", so
# only the stem is spelled out below.
METHOD_MAP = {
    f"#microsoft.graph.{stem}AuthenticationMethod": label
    for stem, label in (
        ("microsoftAuthenticator", "Authenticator"),
        ("phone", "SMS"),
        ("fido2", "FIDO2"),
        ("temporaryAccessPass", "TAP"),
        ("softwareOath", "OATH"),
        ("password", "Password"),
        ("email", "Email"),
        ("windowsHelloForBusiness", "WindowsHello"),
        ("passwordlessMicrosoftAuthenticator", "AuthenticatorPasswordless"),
    )
}
def graph_get(url, extra_headers=None, max_retries=4):
    """Issue a GET request and return ``(status, payload)``.

    HTTP errors and transport failures are reported through the return
    value rather than raised, so callers branch on the status code.

    Args:
        url: Fully formed request URL.
        extra_headers: Optional dict merged over a copy of ``HEADERS_BASE``
            for this call only.
        max_retries: Maximum number of retries. One counter is shared by
            HTTP 429, HTTP 5xx and non-HTTP failures.

    Returns:
        A ``(status, payload)`` tuple:

        * success: the response status and the decoded JSON body
          (``{}`` when the body is empty);
        * HTTP error: the error code and the decoded JSON error body, or
          ``{"raw": <text>}`` when that body is not valid JSON;
        * any other failure once retries are exhausted: ``0`` and
          ``{"error": <message>}``.
    """
    # Imported explicitly so the except clause below does not depend on
    # urllib.request having loaded urllib.error as a side effect.
    from urllib.error import HTTPError

    headers = dict(HEADERS_BASE)
    if extra_headers:
        headers.update(extra_headers)
    attempt = 0
    while True:
        req = urllib.request.Request(url, headers=headers, method="GET")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                status = resp.status
                data = resp.read().decode("utf-8")
            return status, json.loads(data) if data else {}
        except HTTPError as e:
            body = e.read().decode("utf-8", errors="replace")
            try:
                j = json.loads(body)
            except ValueError:
                # Error body was not JSON; hand the raw text back instead.
                j = {"raw": body}
            if e.code == 429 and attempt < max_retries:
                # Retry-After may be an HTTP-date, missing or malformed.
                # A value that is not a plain integer must not abort the
                # run from inside this handler, so fall back to the
                # 2-second floor.
                try:
                    retry_after = int(e.headers.get("Retry-After", "2"))
                except (AttributeError, TypeError, ValueError):
                    retry_after = 2
                time.sleep(max(retry_after, 2))
                attempt += 1
                continue
            if e.code >= 500 and attempt < max_retries:
                # Server-side error: exponential back-off (1s, 2s, 4s, ...).
                time.sleep(2 ** attempt)
                attempt += 1
                continue
            return e.code, j
        except Exception as e:
            # Anything else (connection failure, timeout, undecodable 2xx
            # body, ...) is retried with the same back-off, then reported
            # with status 0.
            if attempt < max_retries:
                time.sleep(2 ** attempt)
                attempt += 1
                continue
            return 0, {"error": str(e)}
def collect_user(upn):
    """Build one CSV row of Microsoft 365 detail for a single user.

    Calls ``graph_get`` for the user's profile, sign-in activity, licenses,
    authentication methods, default MFA method, directory roles and
    membership count. Apart from a 404 on the profile, a failed lookup is
    recorded in the row and the remaining lookups still run.

    Args:
        upn: The user's userPrincipalName. It is URL-quoted before being
            placed in request paths.

    Returns:
        A dict of strings keyed by the CSV column names.

        * ``MailboxType`` is always ``"User"``; nothing here queries it.
        * If the profile lookup returns 404, the row is returned at once
          with ``Notes == "not_found"`` and every other field at its
          default.
        * For every lookup except the profile, HTTP 401/403 puts
          ``"scope_unavailable"`` in the affected column(s).
        * Other problems are collected as ``<step>:<problem>`` markers and
          joined with ``;`` into ``Notes``.
    """
    row = {
        "UPN": upn,
        "DisplayName": "",
        "AccountEnabled": "",
        "MailboxType": "User",
        "Licenses": "",
        "MFARegistered": "false",
        "MFAMethods": "",
        "DefaultMFAMethod": "",
        "LastSignIn": "",
        "LastInteractiveSignIn": "",
        "AdminRoles": "",
        "JobTitle": "",
        "Department": "",
        "GroupCount": "",
        "Notes": "",
    }
    notes = []
    enc_upn = urllib.parse.quote(upn)

    # 1a. Core profile (without signInActivity - Graph requires GUID key for that)
    url = (f"https://graph.microsoft.com/v1.0/users/{enc_upn}"
           "?$select=id,userPrincipalName,displayName,accountEnabled,"
           "jobTitle,department")
    status, data = graph_get(url)
    if status == 404:
        row["Notes"] = "not_found"
        return row
    user_id = ""
    if status == 200:
        user_id = data.get("id", "")
        row["DisplayName"] = data.get("displayName") or ""
        # A JSON null must not be written to the CSV as the text "none".
        enabled = data.get("accountEnabled", "")
        row["AccountEnabled"] = "" if enabled is None else str(enabled).lower()
        row["JobTitle"] = data.get("jobTitle") or ""
        row["Department"] = data.get("department") or ""
    elif status in (401, 403):
        notes.append("profile:scope_unavailable")
    else:
        notes.append(f"profile:http_{status}")

    # 1b. signInActivity (requires GUID, so skipped when the profile lookup
    # did not yield an id)
    if user_id:
        url = (f"https://graph.microsoft.com/v1.0/users/{user_id}"
               "?$select=signInActivity")
        status, data = graph_get(url)
        if status == 200:
            sia = data.get("signInActivity") or {}
            # Graph documents lastSignInDateTime as the last *interactive*
            # sign-in. The interactive column therefore carries only that
            # value; the general column may fall back to the
            # non-interactive timestamp when no interactive one exists.
            interactive = sia.get("lastSignInDateTime") or ""
            non_interactive = sia.get("lastNonInteractiveSignInDateTime") or ""
            row["LastInteractiveSignIn"] = interactive
            row["LastSignIn"] = interactive or non_interactive
        elif status in (401, 403):
            row["LastSignIn"] = "scope_unavailable"
            row["LastInteractiveSignIn"] = "scope_unavailable"
        else:
            notes.append(f"signin:http_{status}")

    # 2. Licenses
    url = f"https://graph.microsoft.com/v1.0/users/{enc_upn}/licenseDetails"
    status, data = graph_get(url)
    if status == 200:
        skus = [x.get("skuPartNumber", "") for x in data.get("value", [])]
        row["Licenses"] = ";".join(s for s in skus if s)
    elif status == 404:
        notes.append("licenses:not_found")
    elif status in (401, 403):
        row["Licenses"] = "scope_unavailable"
    else:
        notes.append(f"licenses:http_{status}")

    # 3. MFA methods
    url = f"https://graph.microsoft.com/v1.0/users/{enc_upn}/authentication/methods"
    status, data = graph_get(url)
    if status == 200:
        method_types = [m.get("@odata.type", "") for m in data.get("value", [])]
        # Types missing from METHOD_MAP are shortened by stripping the
        # common prefix and suffix.
        row["MFAMethods"] = ";".join(
            METHOD_MAP.get(
                t,
                t.replace("#microsoft.graph.", "").replace("AuthenticationMethod", ""),
            )
            for t in method_types
        )
        # NOTE(review): every non-password method counts as MFA here,
        # including Email and TAP - confirm that is the intended definition.
        has_non_password = any(
            t != "#microsoft.graph.passwordAuthenticationMethod"
            for t in method_types
        )
        row["MFARegistered"] = "true" if has_non_password else "false"
    elif status in (401, 403):
        row["MFAMethods"] = "scope_unavailable"
        row["MFARegistered"] = "scope_unavailable"
    elif status == 404:
        notes.append("mfa:not_found")
    else:
        notes.append(f"mfa:http_{status}")

    # 4. Default MFA method (beta endpoint)
    url = f"https://graph.microsoft.com/beta/users/{enc_upn}/authentication/signInPreferences"
    status, data = graph_get(url)
    if status == 200:
        row["DefaultMFAMethod"] = data.get("userPreferredMethodForSecondaryAuthentication") or ""
    elif status in (401, 403):
        row["DefaultMFAMethod"] = "scope_unavailable"
    elif status == 404:
        notes.append("default_mfa:not_found")
    else:
        notes.append(f"default_mfa:http_{status}")

    # 5. Directory roles (transitive)
    url = (f"https://graph.microsoft.com/v1.0/users/{enc_upn}/transitiveMemberOf/"
           "microsoft.graph.directoryRole")
    status, data = graph_get(url)
    if status == 200:
        roles = [r.get("displayName", "") for r in data.get("value", [])]
        row["AdminRoles"] = ";".join(r for r in roles if r)
    elif status in (401, 403):
        row["AdminRoles"] = "scope_unavailable"
    elif status != 404:
        # A 404 here is deliberately ignored: the column just stays empty.
        notes.append(f"roles:http_{status}")

    # 6. Group count
    # NOTE(review): memberOf may also count directory roles and
    # administrative units, not only groups - confirm whether a
    # /microsoft.graph.group cast is wanted.
    url = f"https://graph.microsoft.com/v1.0/users/{enc_upn}/memberOf?$count=true&$top=1"
    status, data = graph_get(url, extra_headers={"ConsistencyLevel": "eventual"})
    if status == 200:
        cnt = data.get("@odata.count")
        row["GroupCount"] = str(cnt) if cnt is not None else ""
    elif status in (401, 403):
        row["GroupCount"] = "scope_unavailable"
    elif status != 404:
        # A 404 here is deliberately ignored: the column just stays empty.
        notes.append(f"groups:http_{status}")

    # Notes is still empty at this point; the not_found case returned early.
    if notes:
        row["Notes"] = ";".join(notes)
    return row
def main():
    """Collect a row per entry in USERS, write the CSV, and echo the rows.

    Progress lines and the final row count go to stderr. The rows are
    written to OUT_FILE as CSV and then printed to stdout as one JSON
    object per line.
    """
    total = len(USERS)
    collected = []
    for position, upn in enumerate(USERS, start=1):
        print(f"[{position}/{total}] {upn}", file=sys.stderr)
        collected.append(collect_user(upn))
        # Short pause between users; not needed after the final one.
        if position < total:
            time.sleep(0.25)

    # Column order of the output file.
    fieldnames = [
        "UPN", "DisplayName", "AccountEnabled", "MailboxType", "Licenses",
        "MFARegistered", "MFAMethods", "DefaultMFAMethod", "LastSignIn",
        "LastInteractiveSignIn", "AdminRoles", "JobTitle", "Department",
        "GroupCount", "Notes",
    ]
    with open(OUT_FILE, "w", newline="", encoding="utf-8") as out_handle:
        csv_out = csv.DictWriter(
            out_handle,
            fieldnames=fieldnames,
            quoting=csv.QUOTE_MINIMAL,
        )
        csv_out.writeheader()
        csv_out.writerows(collected)
    print(f"Wrote {len(collected)} rows to {OUT_FILE}", file=sys.stderr)

    # Also print summary to stdout
    for record in collected:
        print(json.dumps(record))


if __name__ == "__main__":
    main()