#!/usr/bin/env python3
"""
Compare Bardach Temp contacts folder against main Contacts folder in Microsoft 365.
Uses subprocess + curl for all HTTP requests.
"""

import json
import os
import subprocess
import sys
import time
from collections import defaultdict

# --- Configuration ---
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# SECURITY(review): a client secret is committed in source. Prefer supplying it
# through the BARDACH_CLIENT_SECRET environment variable and rotate the literal
# below; it is kept only as a fallback so existing invocations keep working.
CLIENT_SECRET = os.environ.get(
    "BARDACH_CLIENT_SECRET", "~QJ8Q~NyQSs4OcGqHZyPrA2CVnq9KBfKiimntbMO"
)
SCOPE = "https://graph.microsoft.com/.default"
USER = "barbara@bardach.net"
GRAPH_BASE = f"https://graph.microsoft.com/v1.0/users/{USER}"
SELECT_FIELDS = "id,displayName,givenName,surname,emailAddresses,homePhones,businessPhones,companyName,jobTitle,personalNotes,homeAddress,businessAddress,otherAddress,birthday,nickName,categories,lastModifiedDateTime"
OUTPUT_FILE = "D:/ClaudeTools/temp/bardach_temp_vs_main.json"

# Throttling policy for api_get: fixed wait between attempts, bounded retries.
THROTTLE_WAIT_SECONDS = 30
MAX_THROTTLE_RETRIES = 5

api_call_count = 0
access_token = None


def _run_curl(cmd):
    """Run a curl command and return its CompletedProcess.

    Output is decoded as UTF-8 explicitly: with ``text=True`` alone Python
    would use the locale encoding, which is not UTF-8 on many Windows setups
    and would mangle non-ASCII contact data.
    """
    return subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8")


def get_token():
    """Acquire an OAuth2 token via the client-credentials grant.

    Stores the token in the module-level ``access_token``. Exits the process
    with status 1 if the response is not JSON or carries no ``access_token``.
    """
    global access_token
    print("[INFO] Acquiring access token...")
    cmd = [
        "curl", "-s", "-X", "POST",
        f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
        "-H", "Content-Type: application/x-www-form-urlencoded",
        # --data-urlencode escapes each value, so characters such as '&', '+'
        # or '=' inside the secret cannot corrupt the form body.
        "--data-urlencode", f"client_id={CLIENT_ID}",
        "--data-urlencode", f"scope={SCOPE}",
        "--data-urlencode", f"client_secret={CLIENT_SECRET}",
        "--data-urlencode", "grant_type=client_credentials",
    ]
    result = _run_curl(cmd)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Empty or non-JSON output (e.g. curl could not connect).
        print("[ERROR] Failed to get token: non-JSON response from token endpoint")
        print((result.stdout or result.stderr or "")[:500])
        sys.exit(1)
    if not isinstance(data, dict) or "access_token" not in data:
        print(f"[ERROR] Failed to get token: {data}")
        sys.exit(1)
    access_token = data["access_token"]
    print("[OK] Token acquired.")


def api_get(url):
    """Make a GET request to Graph API, re-acquiring token every 500 calls.

    Returns the parsed JSON object on success, or ``None`` when the response
    is not JSON, the API reports an error, or throttling persists after
    ``MAX_THROTTLE_RETRIES`` retries. Every attempt (including retries)
    increments the module-level ``api_call_count``.
    """
    global api_call_count
    for attempt in range(MAX_THROTTLE_RETRIES + 1):
        api_call_count += 1
        if api_call_count % 500 == 0:
            print(f"[INFO] Re-acquiring token after {api_call_count} API calls...")
            get_token()
        cmd = [
            "curl", "-s", "-X", "GET", url,
            "-H", f"Authorization: Bearer {access_token}",
            "-H", "Content-Type: application/json",
        ]
        result = _run_curl(cmd)
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError:
            print(f"[ERROR] Non-JSON response from: {url}")
            print(result.stdout[:500])
            return None
        if not isinstance(data, dict):
            print(f"[ERROR] Unexpected JSON payload from: {url}")
            return None
        if "error" not in data:
            return data

        err = data["error"]
        # Handle throttling with a bounded number of retries so a persistent
        # 429 cannot loop (or recurse) forever.
        if err.get("code") in ("TooManyRequests", "429"):
            if attempt < MAX_THROTTLE_RETRIES:
                retry_after = THROTTLE_WAIT_SECONDS
                print(f"[WARNING] Throttled. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue
            print(f"[ERROR] Still throttled after {MAX_THROTTLE_RETRIES} retries: {url}")
            return None
        print(f"[ERROR] API error: {err.get('code')}: {err.get('message')}")
        return None
    return None


def get_contact_folders():
    """Find the Temp folder ID among the user's contact folders.

    Lists the contact folders (following ``@odata.nextLink`` pages), prints
    each one, and returns the id of the folder whose display name is "temp"
    (case-insensitive). Exits with status 1 if the listing fails or no such
    folder exists. The main contacts folder needs no id: it is read through
    the default ``/contacts`` endpoint.
    """
    print("[INFO] Fetching contact folders...")
    url = f"{GRAPH_BASE}/contactFolders?$top=100"
    temp_folder_id = None
    while url:
        data = api_get(url)
        if not data:
            print("[ERROR] Could not fetch contact folders.")
            sys.exit(1)
        for folder in data.get("value", []):
            name = folder.get("displayName") or ""
            fid = folder.get("id") or ""
            print(f"  Folder: {name} (id: {fid[:20]}...)")
            if name.lower() == "temp":
                temp_folder_id = fid
        url = data.get("@odata.nextLink")

    print(f"[INFO] Temp folder ID: {temp_folder_id[:20] if temp_folder_id else 'NOT FOUND'}...")
    if not temp_folder_id:
        print("[ERROR] Temp folder not found!")
        sys.exit(1)
    return temp_folder_id


def fetch_all_contacts(url_base, label):
    """Fetch all contacts from a folder with pagination.

    ``url_base`` is the contacts collection URL; ``label`` is used only in
    progress messages. Returns the list of contact dicts gathered so far; if
    a page request fails the list is partial and a warning is printed.
    """
    contacts = []
    url = f"{url_base}?$top=100&$select={SELECT_FIELDS}"
    page = 1
    while url:
        print(f"  Fetching {label} page {page}...")
        data = api_get(url)
        if not data:
            # Do not fail silently: a truncated list skews the comparison.
            print(f"[WARNING] Page {page} of {label} failed; results for {label} are incomplete.")
            break
        contacts.extend(data.get("value", []))
        url = data.get("@odata.nextLink")
        page += 1
    print(f"[OK] Fetched {len(contacts)} contacts from {label}.")
    return contacts


def normalize(s):
    """Lowercase and strip whitespace; falsy input becomes ""."""
    if not s:
        return ""
    return s.strip().lower()


def get_emails(contact):
    """Extract the set of lowercased, stripped email addresses from a contact."""
    emails = set()
    for entry in contact.get("emailAddresses") or []:
        addr = (entry.get("address") or "").strip().lower()
        if addr:
            emails.add(addr)
    return emails


def is_blank(contact):
    """Check if a contact is essentially empty.

    A contact is blank when it has no display name, given name, surname,
    company name, or email address.
    """
    if get_emails(contact):
        return False
    return not any(
        normalize(contact.get(field, ""))
        for field in ("displayName", "givenName", "surname", "companyName")
    )


def has_address(addr):
    """Check if an address dict has any non-blank component."""
    if not addr:
        return False
    return any(
        (addr.get(key) or "").strip()
        for key in ("street", "city", "state", "postalCode", "countryOrRegion")
    )


def _clean_phones(contact, field):
    """Return the set of stripped, non-empty phone strings in ``field``."""
    # ``p or ""`` tolerates null entries in the phone list.
    return {(p or "").strip() for p in (contact.get(field) or [])} - {""}


def find_extras(temp_contact, main_contact):
    """Find fields that Temp has but Main is missing.

    Returns a dict keyed by field name. List-valued results (emails, phones,
    categories) are sorted so the output is deterministic. ``personalNotes``
    is truncated to 200 characters followed by "...".
    """
    extras = {}

    # Emails present in Temp but not in Main.
    extra_emails = get_emails(temp_contact) - get_emails(main_contact)
    if extra_emails:
        extras["emailAddresses"] = sorted(extra_emails)

    # Phones (exact string comparison after stripping).
    for phone_field in ("homePhones", "businessPhones"):
        extra_phones = _clean_phones(temp_contact, phone_field) - _clean_phones(
            main_contact, phone_field
        )
        if extra_phones:
            extras[phone_field] = sorted(extra_phones)

    # Simple string fields: only reported when Main's value is empty.
    for field in ("companyName", "jobTitle", "nickName", "birthday"):
        temp_val = (temp_contact.get(field) or "").strip()
        main_val = (main_contact.get(field) or "").strip()
        if temp_val and not main_val:
            extras[field] = temp_val

    # personalNotes - temp has content, main doesn't.
    temp_notes = (temp_contact.get("personalNotes") or "").strip()
    main_notes = (main_contact.get("personalNotes") or "").strip()
    if temp_notes and not main_notes:
        extras["personalNotes"] = temp_notes[:200] + ("..." if len(temp_notes) > 200 else "")

    # Addresses: reported only when Main has no content for that address.
    for addr_field in ("homeAddress", "businessAddress", "otherAddress"):
        if has_address(temp_contact.get(addr_field)) and not has_address(
            main_contact.get(addr_field)
        ):
            extras[addr_field] = temp_contact.get(addr_field)

    # Categories (case-sensitive set difference).
    extra_cats = set(temp_contact.get("categories") or []) - set(
        main_contact.get("categories") or []
    )
    if extra_cats:
        extras["categories"] = sorted(extra_cats)

    return extras


def _build_main_indexes(main_contacts):
    """Index Main contacts by display name, by email, and by "given|surname"."""
    by_displayname = defaultdict(list)
    by_email = defaultdict(list)
    by_name_combo = defaultdict(list)
    for mc in main_contacts:
        dn = normalize(mc.get("displayName", ""))
        if dn:
            by_displayname[dn].append(mc)
        for email in get_emails(mc):
            by_email[email].append(mc)
        gn = normalize(mc.get("givenName", ""))
        sn = normalize(mc.get("surname", ""))
        if gn and sn:
            by_name_combo[f"{gn}|{sn}"].append(mc)
    return by_displayname, by_email, by_name_combo


def _find_match(tc, by_displayname, by_email, by_name_combo):
    """Return the first Main contact matching ``tc``, or None.

    Match priority: display name, then any email, then givenName+surname.
    """
    dn = normalize(tc.get("displayName", ""))
    if dn and dn in by_displayname:
        return by_displayname[dn][0]

    # Sorted so the chosen match does not depend on set iteration order.
    for email in sorted(get_emails(tc)):
        if email in by_email:
            return by_email[email][0]

    gn = normalize(tc.get("givenName", ""))
    sn = normalize(tc.get("surname", ""))
    if gn and sn:
        combo = f"{gn}|{sn}"
        if combo in by_name_combo:
            return by_name_combo[combo][0]
    return None


def _classify_contacts(temp_contacts, indexes):
    """Sort Temp contacts into exact / with-extras / unique / blank buckets."""
    exact_matches = []
    matches_with_extras = []
    unique_to_temp = []
    blank_contacts = []

    for tc in temp_contacts:
        # Check blank first
        if is_blank(tc):
            blank_contacts.append({"temp_id": tc["id"]})
            continue

        matched_main = _find_match(tc, *indexes)
        if matched_main is None:
            # ``or ""`` + filter: a null address must not reach str.join later.
            emails_list = [
                e.get("address") for e in (tc.get("emailAddresses") or []) if e.get("address")
            ]
            unique_to_temp.append({
                "temp_id": tc["id"],
                "displayName": tc.get("displayName", ""),
                "emails": emails_list,
                "company": tc.get("companyName", ""),
            })
            continue

        entry = {
            "temp_id": tc["id"],
            "main_id": matched_main["id"],
            "displayName": tc.get("displayName", ""),
        }
        extras = find_extras(tc, matched_main)
        if extras:
            entry["extra_fields"] = extras
            matches_with_extras.append(entry)
        else:
            exact_matches.append(entry)

    return exact_matches, matches_with_extras, unique_to_temp, blank_contacts


def _find_internal_dupes(by_displayname):
    """List normalized display names that occur more than once in Main."""
    return [
        {"name": name, "count": len(group), "ids": [mc["id"] for mc in group]}
        for name, group in by_displayname.items()
        if len(group) > 1
    ]


def _print_report(temp_total, main_total, exact_matches, matches_with_extras,
                  unique_to_temp, blank_contacts, main_internal_dupes):
    """Print the human-readable comparison report to stdout."""
    print("\n" + "=" * 70)
    print("BARDACH TEMP vs MAIN CONTACTS - COMPARISON REPORT")
    print("=" * 70)
    print(f"\nTotal Temp contacts: {temp_total}")
    print(f"Total Main contacts: {main_total}")
    print()
    print(f"EXACT MATCH (no extra data): {len(exact_matches)}")
    print(f"MATCH WITH EXTRAS: {len(matches_with_extras)}")
    print(f"UNIQUE TO TEMP: {len(unique_to_temp)}")
    print(f"BLANK/EMPTY: {len(blank_contacts)}")

    # Extras breakdown
    if matches_with_extras:
        print("\n--- MATCH WITH EXTRAS Breakdown ---")
        field_counts = defaultdict(int)
        for m in matches_with_extras:
            for field in m["extra_fields"]:
                field_counts[field] += 1
        for field, count in sorted(field_counts.items(), key=lambda x: -x[1]):
            print(f" {count:>5} contacts have '{field}' that Main lacks")

    # Unique to Temp - first 50
    if unique_to_temp:
        print(f"\n--- UNIQUE TO TEMP (first 50 of {len(unique_to_temp)}) ---")
        for i, u in enumerate(unique_to_temp[:50]):
            emails_str = ", ".join(u["emails"][:2]) if u["emails"] else "(no email)"
            company_str = u.get("company") or ""
            dn = u.get("displayName") or "(no name)"
            print(f" {i+1:>3}. {dn:<35} {emails_str:<40} {company_str}")

    # Main internal dupes
    print("\n--- MAIN FOLDER INTERNAL DUPLICATES ---")
    print(f" {len(main_internal_dupes)} names appear more than once in Main contacts")
    if main_internal_dupes:
        dupes_sorted = sorted(main_internal_dupes, key=lambda x: -x["count"])
        for d in dupes_sorted[:30]:
            print(f" {d['name']:<40} appears {d['count']}x")


def _save_report(output):
    """Write the analysis dict to OUTPUT_FILE as JSON, creating its folder."""
    out_dir = os.path.dirname(OUTPUT_FILE)
    if out_dir:
        # Avoid losing the whole run to a missing output directory.
        os.makedirs(out_dir, exist_ok=True)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False, default=str)


def main():
    """Fetch both folders, compare Temp against Main, print and save a report."""
    get_token()

    # Step 1: Find folder IDs
    temp_folder_id = get_contact_folders()

    # Step 2: Fetch all contacts from both folders
    print("\n[INFO] Fetching Temp folder contacts...")
    temp_contacts = fetch_all_contacts(
        f"{GRAPH_BASE}/contactFolders/{temp_folder_id}/contacts", "Temp"
    )
    print("\n[INFO] Fetching Main (default) contacts...")
    main_contacts = fetch_all_contacts(f"{GRAPH_BASE}/contacts", "Main/Default")

    # Step 3: Build indexes for main contacts
    print("\n[INFO] Building main contact indexes...")
    indexes = _build_main_indexes(main_contacts)

    # Step 4: Compare each Temp contact
    print("[INFO] Comparing contacts...")
    exact_matches, matches_with_extras, unique_to_temp, blank_contacts = _classify_contacts(
        temp_contacts, indexes
    )

    # Step 5: Check for duplicates within Main contacts
    print("[INFO] Checking for duplicates within Main contacts...")
    main_internal_dupes = _find_internal_dupes(indexes[0])

    # Step 6: Print report
    _print_report(
        len(temp_contacts), len(main_contacts), exact_matches, matches_with_extras,
        unique_to_temp, blank_contacts, main_internal_dupes,
    )

    # Step 7: Save JSON
    print(f"\n[INFO] Saving full analysis to {OUTPUT_FILE}...")
    output = {
        "summary": {
            "total_temp": len(temp_contacts),
            "total_main": len(main_contacts),
            "exact_matches": len(exact_matches),
            "matches_with_extras": len(matches_with_extras),
            "unique_to_temp": len(unique_to_temp),
            "blank": len(blank_contacts),
            "main_internal_dupes": len(main_internal_dupes),
        },
        "exact_matches": exact_matches,
        "matches_with_extras": matches_with_extras,
        "unique_to_temp": unique_to_temp,
        "blank": blank_contacts,
        "main_internal_dupes": main_internal_dupes,
    }
    _save_report(output)
    print(f"[OK] Saved to {OUTPUT_FILE}")
    print(f"\n[INFO] Total API calls made: {api_call_count}")
    print("[SUCCESS] Comparison complete.")


if __name__ == "__main__":
    main()