Files
claudetools/clients/cascades-tucson/reports/user-detail-batches/pull-single-user.py
Howard Enos 74890d51ec sync: auto-sync from ACG-TECH03L at 2026-04-18 14:28:21
Author: Howard Enos
Machine: ACG-TECH03L
Timestamp: 2026-04-18 14:28:21
2026-04-18 14:34:04 -07:00

242 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""Pull Graph detail for one user. Writes single-row CSV.
Usage: pull-single-user.py --id <objectId> --upn <upn> --mailbox-type <type> --out <path>
"""
import argparse
import csv
import json
import sys
import time
import urllib.parse
import urllib.request
import urllib.error
# Tenant GUID. No code visible in this file reads it; presumably kept for
# reference (the tenant this report targets) -- TODO confirm before removing.
TENANT_ID = "207fa277-e9d8-4eb7-ada1-1064d2221498"
# File that holds the Graph bearer token; get_token() reads it and strips
# surrounding whitespace.
TOKEN_FILE = r"C:\claudetools\clients\cascades-tucson\reports\user-detail-batches\.token"
# Maps an authentication method's "@odata.type" value to the short label that
# ends up in the MFAMethods CSV column. Types that are not listed here are
# shortened generically by short_method().
METHOD_SHORT = {
    "#microsoft.graph.microsoftAuthenticatorAuthenticationMethod": "Authenticator",
    "#microsoft.graph.phoneAuthenticationMethod": "Phone",
    "#microsoft.graph.passwordAuthenticationMethod": "Password",
    "#microsoft.graph.fido2AuthenticationMethod": "FIDO2",
    "#microsoft.graph.windowsHelloForBusinessAuthenticationMethod": "WindowsHello",
    "#microsoft.graph.temporaryAccessPassAuthenticationMethod": "TAP",
    "#microsoft.graph.softwareOathAuthenticationMethod": "OATH",
    "#microsoft.graph.emailAuthenticationMethod": "Email",
    "#microsoft.graph.passwordlessMicrosoftAuthenticatorAuthenticationMethod": "PasswordlessAuthenticator",
    "#microsoft.graph.platformCredentialAuthenticationMethod": "PlatformCredential",
    "#microsoft.graph.hardwareOathAuthenticationMethod": "HardwareOATH",
    "#microsoft.graph.smsAuthenticationMethod": "SMS",
}
def short_method(odata_type):
    """Return the short display label for a Graph authentication-method type.

    Values present in ``METHOD_SHORT`` are returned from that table. Any other
    value has the ``#microsoft.graph.`` and ``AuthenticationMethod`` substrings
    removed and its first character upper-cased. If nothing is left after the
    removal, the input is returned unchanged.
    """
    known_label = METHOD_SHORT.get(odata_type)
    if known_label is not None:
        return known_label

    trimmed = odata_type.replace("#microsoft.graph.", "")
    trimmed = trimmed.replace("AuthenticationMethod", "")
    if not trimmed:
        return odata_type
    return trimmed[:1].upper() + trimmed[1:]
def get_token():
    """Read the bearer token from ``TOKEN_FILE``.

    Returns:
        The file's content with leading and trailing whitespace removed.
    """
    with open(TOKEN_FILE, "r") as token_file:
        raw_token = token_file.read()
    return raw_token.strip()
def graph_get(token, path, extra_headers=None, max_retries=5):
    """Issue an authenticated GET against Microsoft Graph and decode the JSON reply.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        path: Either an absolute URL (anything starting with ``http`` is used
            as-is) or a path such as ``/v1.0/users/...`` that is appended to
            ``https://graph.microsoft.com``.
        extra_headers: Optional mapping of additional request headers.
        max_retries: Maximum number of attempts.

    Returns:
        A ``(status, body)`` tuple:

        * success: the response status and the parsed JSON body;
        * HTTP error: the error status and the parsed error body, or
          ``{"error_raw": ...}`` when that body cannot be read or parsed.
          HTTP 429 and 5xx responses are retried first; when the attempts run
          out, the status and body of the *last* failure are returned so the
          caller sees the real error code;
        * any other failure (network error, timeout, undecodable success
          body): ``(0, {"error_exc": ...})`` once the attempts run out;
        * ``(429, {"error": "max_retries_exceeded"})`` only when
          ``max_retries`` permits no attempt at all.
    """
    url = path if path.startswith("http") else f"https://graph.microsoft.com{path}"

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1

        req = urllib.request.Request(url)
        req.add_header("Authorization", f"Bearer {token}")
        if extra_headers:
            for k, v in extra_headers.items():
                req.add_header(k, v)

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status, json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # Best effort: the error body is informational only, so any failure
            # while reading/parsing it falls back to the exception text.
            try:
                body = json.loads(e.read().decode("utf-8"))
            except Exception:
                body = {"error_raw": str(e)}

            retryable = e.code == 429 or (500 <= e.code < 600)
            # Give up immediately on the final attempt: sleeping again would
            # only delay the caller, and the real status/body is more useful
            # than a synthetic 429.
            if not retryable or is_last_attempt:
                return e.code, body

            # Retry-After may be missing or not an integer (e.g. an HTTP date);
            # in that case rely on the exponential backoff alone.
            try:
                retry_after = int(e.headers.get("Retry-After", "0"))
            except (AttributeError, TypeError, ValueError):
                retry_after = 0
            delay = max(retry_after, 2 ** attempt * 5)
            print(f" [retry {attempt+1}/{max_retries}] HTTP {e.code}, sleeping {delay}s", file=sys.stderr)
            time.sleep(delay)
        except Exception as e:
            # Transport-level problems and undecodable success bodies.
            if is_last_attempt:
                return 0, {"error_exc": str(e)}
            time.sleep(5)

    # Only reachable when max_retries <= 0, i.e. no request was ever sent.
    return 429, {"error": "max_retries_exceeded"}
def process_user(token, user_id, upn_display, mailbox_type):
    """Collect Graph details for one user and return them as a CSV row dict.

    Args:
        token: Bearer token passed through to ``graph_get``.
        user_id: Graph objectId of the user (works for members and guests).
        upn_display: Value written verbatim to the ``UPN`` column.
        mailbox_type: Value written verbatim to the ``MailboxType`` column;
            ``"Shared"`` additionally enables the
            ``shared_mailbox_still_licensed`` note.

    Returns:
        A dict with the keys ``UPN``, ``DisplayName``, ``AccountEnabled``,
        ``MailboxType``, ``Licenses``, ``MFARegistered``, ``MFAMethods``,
        ``DefaultMFAMethod``, ``LastSignIn``, ``LastInteractiveSignIn``,
        ``AdminRoles``, ``JobTitle``, ``Department``, ``GroupCount`` and
        ``Notes``. All values are strings; multi-valued fields are joined
        with ``;``.

        * A 404 on the profile lookup returns immediately with
          ``Notes == "not_found"`` and every other lookup skipped.
        * A 403 on sign-in activity, MFA methods, default MFA method or
          directory roles writes ``"scope_unavailable"`` into the affected
          column(s).
        * Any other non-200 status leaves the column empty and appends a
          ``<section>_err_<status>`` entry to ``Notes``.
    """
    row = {
        "UPN": upn_display,
        "DisplayName": "",
        "AccountEnabled": "",
        "MailboxType": mailbox_type,
        "Licenses": "",
        "MFARegistered": "",
        "MFAMethods": "",
        "DefaultMFAMethod": "",
        "LastSignIn": "",
        "LastInteractiveSignIn": "",
        "AdminRoles": "",
        "JobTitle": "",
        "Department": "",
        "GroupCount": "",
        "Notes": "",
    }
    notes = []
    # safe="" so a "/" inside the identifier cannot add an extra path segment.
    uid = urllib.parse.quote(user_id, safe="")

    # 1a. Core profile
    status, data = graph_get(
        token,
        f"/v1.0/users/{uid}?$select=id,userPrincipalName,displayName,accountEnabled,jobTitle,department,userType,mail",
    )
    if status == 404:
        row["Notes"] = "not_found"
        return row
    if status != 200:
        notes.append(f"profile_err_{status}")
    else:
        row["DisplayName"] = data.get("displayName") or ""
        # A missing or null accountEnabled stays blank instead of becoming "none".
        enabled = data.get("accountEnabled")
        row["AccountEnabled"] = "" if enabled is None else str(enabled).lower()
        row["JobTitle"] = data.get("jobTitle") or ""
        row["Department"] = data.get("department") or ""
        utype = data.get("userType") or ""
        if utype == "Guest":
            # "mail" can be present but null; never emit the text "None".
            notes.append(f"guest_user:{data.get('mail') or ''}")

    # 1b. signInActivity -- requested separately from the profile so that a
    # 403 here does not also lose the profile fields.
    status, data = graph_get(token, f"/v1.0/users/{uid}?$select=signInActivity")
    if status == 200:
        sia = data.get("signInActivity") or {}
        interactive = sia.get("lastSignInDateTime") or ""
        non_interactive = sia.get("lastNonInteractiveSignInDateTime") or ""
        # Graph documents lastSignInDateTime as the last *interactive* sign-in,
        # so only that value may feed the interactive column.
        row["LastInteractiveSignIn"] = interactive
        # Most recent sign-in of either kind. Assumes both values are UTC
        # ISO-8601 strings in the same format, which order chronologically
        # when compared as text; an empty string sorts before any timestamp.
        row["LastSignIn"] = max(interactive, non_interactive)
    elif status == 403:
        row["LastSignIn"] = "scope_unavailable"
        row["LastInteractiveSignIn"] = "scope_unavailable"
    else:
        notes.append(f"signin_err_{status}")

    # 2. Licenses
    status, data = graph_get(token, f"/v1.0/users/{uid}/licenseDetails")
    if status == 200:
        skus = [v.get("skuPartNumber", "") for v in data.get("value", [])]
        row["Licenses"] = ";".join(s for s in skus if s)
    else:
        notes.append(f"lic_err_{status}")

    # 3. MFA methods
    status, data = graph_get(token, f"/v1.0/users/{uid}/authentication/methods")
    if status == 200:
        # "or ''" guards against a null @odata.type, which short_method()
        # cannot handle.
        method_types = [m.get("@odata.type") or "" for m in data.get("value", [])]
        row["MFAMethods"] = ";".join(short_method(t) for t in method_types)
        # Anything other than the password method counts as a registered factor.
        has_non_password = any(
            t != "#microsoft.graph.passwordAuthenticationMethod" for t in method_types
        )
        row["MFARegistered"] = "true" if has_non_password else "false"
    elif status == 403:
        row["MFAMethods"] = "scope_unavailable"
        row["MFARegistered"] = "scope_unavailable"
    else:
        notes.append(f"mfa_err_{status}")

    # 4. Default MFA method (beta)
    status, data = graph_get(token, f"/beta/users/{uid}/authentication/signInPreferences")
    if status == 200:
        row["DefaultMFAMethod"] = data.get("userPreferredMethodForSecondaryAuthentication") or ""
    elif status == 403:
        row["DefaultMFAMethod"] = "scope_unavailable"
    else:
        notes.append(f"defmfa_err_{status}")

    # 5. Directory roles
    status, data = graph_get(
        token, f"/v1.0/users/{uid}/transitiveMemberOf/microsoft.graph.directoryRole"
    )
    if status == 200:
        roles = [r.get("displayName", "") for r in data.get("value", [])]
        row["AdminRoles"] = ";".join(r for r in roles if r)
    elif status == 403:
        row["AdminRoles"] = "scope_unavailable"
    else:
        notes.append(f"roles_err_{status}")

    # 6. Group count -- $top=1 keeps the payload small; only @odata.count is read.
    # NOTE(review): memberOf without a type cast may also count non-group
    # directory objects (e.g. directory roles) -- confirm whether
    # /memberOf/microsoft.graph.group is what this column should use.
    status, data = graph_get(
        token,
        f"/v1.0/users/{uid}/memberOf?$count=true&$top=1",
        extra_headers={"ConsistencyLevel": "eventual"},
    )
    if status == 200:
        count = data.get("@odata.count")
        row["GroupCount"] = str(count) if count is not None else ""
    else:
        notes.append(f"groups_err_{status}")

    # Auto-notes derived from the collected columns.
    auto_notes = []
    if mailbox_type == "Shared" and row["Licenses"]:
        auto_notes.append("shared_mailbox_still_licensed")
    if row["AccountEnabled"] == "false" and row["Licenses"]:
        auto_notes.append("disabled_account_still_licensed")
    if row["AccountEnabled"] == "false":
        auto_notes.append("account_disabled")
    if row["MFARegistered"] == "false":
        auto_notes.append("no_mfa_registered")
    if row["AdminRoles"] and row["AdminRoles"] != "scope_unavailable":
        auto_notes.append("has_admin_role")
    notes.extend(auto_notes)

    row["Notes"] = ";".join(notes)
    return row
def main():
    """Command-line entry point: fetch one user's details and write a one-row CSV.

    Reads ``--id``, ``--upn``, ``--mailbox-type`` (default ``User``) and
    ``--out`` from the command line, builds the row with ``process_user`` using
    the token from ``get_token``, writes a header plus that row to ``--out``
    and prints a confirmation line.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", required=True, help="Graph objectId")
    parser.add_argument("--upn", required=True, help="UPN for display in CSV")
    parser.add_argument("--mailbox-type", default="User")
    parser.add_argument("--out", required=True, help="Output CSV path (single row)")
    opts = parser.parse_args()

    bearer = get_token()
    record = process_user(bearer, opts.id, opts.upn, opts.mailbox_type)

    # Column order of the output file.
    columns = [
        "UPN",
        "DisplayName",
        "AccountEnabled",
        "MailboxType",
        "Licenses",
        "MFARegistered",
        "MFAMethods",
        "DefaultMFAMethod",
        "LastSignIn",
        "LastInteractiveSignIn",
        "AdminRoles",
        "JobTitle",
        "Department",
        "GroupCount",
        "Notes",
    ]
    with open(opts.out, "w", newline="", encoding="utf-8") as out_file:
        writer = csv.DictWriter(
            out_file,
            fieldnames=columns,
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerow(record)

    print(f"OK {opts.upn} -> {opts.out}")


if __name__ == "__main__":
    main()