Files
claudetools/temp/bardach_compare_temp_main.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

397 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Compare Bardach Temp contacts folder against main Contacts folder in Microsoft 365.
Uses subprocess + curl for all HTTP requests.
"""
import json
import os
import subprocess
import sys
import time
from collections import defaultdict
# --- Configuration ---
# App registration used for the OAuth2 client-credentials flow.
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# SECURITY: the client secret used to be hard-coded on this line and was
# committed to version control. Treat that value as compromised (rotate it)
# and supply the replacement via the BARDACH_CLIENT_SECRET environment
# variable. An unset variable yields "" so the token request fails loudly.
CLIENT_SECRET = os.environ.get("BARDACH_CLIENT_SECRET", "")
SCOPE = "https://graph.microsoft.com/.default"
# Mailbox whose contact folders are compared.
USER = "barbara@bardach.net"
GRAPH_BASE = f"https://graph.microsoft.com/v1.0/users/{USER}"
# Contact properties requested through $select on every contacts query.
SELECT_FIELDS = ",".join((
    "id",
    "displayName",
    "givenName",
    "surname",
    "emailAddresses",
    "homePhones",
    "businessPhones",
    "companyName",
    "jobTitle",
    "personalNotes",
    "homeAddress",
    "businessAddress",
    "otherAddress",
    "birthday",
    "nickName",
    "categories",
    "lastModifiedDateTime",
))
# Destination of the full JSON analysis written by main().
OUTPUT_FILE = "D:/ClaudeTools/temp/bardach_temp_vs_main.json"
# Number of Graph GET requests issued so far; api_get() refreshes the token
# every 500 calls based on this counter.
api_call_count = 0
# Bearer token cached by get_token(); None until the first acquisition.
access_token = None
def get_token():
    """Acquire an OAuth2 access token via the client-credentials grant.

    Posts the app credentials to the Microsoft identity platform token
    endpoint with curl and stores the returned bearer token in the
    module-level ``access_token``. Exits the process with status 1 when the
    secret is missing, the response is not JSON, or no token is returned.
    """
    global access_token
    # Function-scope import so this block does not depend on file-level imports.
    from urllib.parse import urlencode

    print("[INFO] Acquiring access token...")
    if not CLIENT_SECRET:
        print("[ERROR] Failed to get token: client secret is not configured.")
        sys.exit(1)

    # URL-encode every field: secrets and scopes may contain characters such
    # as '&', '+' or '=' that would otherwise corrupt the form body.
    form_body = urlencode({
        "client_id": CLIENT_ID,
        "scope": SCOPE,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    })
    cmd = [
        "curl", "-s", "-X", "POST",
        f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
        "-H", "Content-Type: application/x-www-form-urlencoded",
        # "@-" makes curl read the body from stdin, keeping the secret out of
        # the process argument list.
        "-d", "@-",
    ]
    result = subprocess.run(cmd, input=form_body, capture_output=True, text=True)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Empty output (curl failure) or an HTML error page ends up here.
        print(f"[ERROR] Failed to get token: non-JSON response (curl exit code {result.returncode})")
        print(result.stdout[:500])
        sys.exit(1)
    if not isinstance(data, dict) or "access_token" not in data:
        print(f"[ERROR] Failed to get token: {data}")
        sys.exit(1)
    access_token = data["access_token"]
    print("[OK] Token acquired.")
def api_get(url, max_retries=5):
    """Make a GET request to Graph API, re-acquiring the token every 500 calls.

    Args:
        url: Full request URL.
        max_retries: How many times a throttled request is retried before
            giving up. The original implementation retried without limit via
            recursion; a bounded loop avoids hanging (or exhausting the
            recursion limit) when throttling persists.

    Returns:
        The decoded JSON response, or None when the response is not JSON,
        the API reports an error, or throttling outlasts ``max_retries``.
    """
    global api_call_count
    for attempt in range(max_retries + 1):
        # Every HTTP attempt counts, including throttling retries.
        api_call_count += 1
        if api_call_count % 500 == 0:
            print(f"[INFO] Re-acquiring token after {api_call_count} API calls...")
            get_token()
        cmd = [
            "curl", "-s", "-X", "GET", url,
            "-H", f"Authorization: Bearer {access_token}",
            "-H", "Content-Type: application/json"
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError:
            print(f"[ERROR] Non-JSON response from: {url}")
            print(result.stdout[:500])
            return None
        if not isinstance(data, dict) or "error" not in data:
            return data

        err = data["error"]
        # Guard against an "error" value that is not an object.
        code = err.get("code") if isinstance(err, dict) else None
        message = err.get("message") if isinstance(err, dict) else err
        # Handle throttling
        if code in ("TooManyRequests", "429") and attempt < max_retries:
            retry_after = 30
            print(f"[WARNING] Throttled. Waiting {retry_after}s...")
            time.sleep(retry_after)
            continue
        print(f"[ERROR] API error: {code}: {message}")
        return None
    return None
def get_contact_folders():
    """Return the ID of the contact folder named "Temp" (case-insensitive).

    Requests up to 100 contact folders for the configured user, prints each
    one, and exits the process with status 1 when the listing fails or no
    folder named "temp" exists. If several folders match, the last one
    listed wins.
    """
    print("[INFO] Fetching contact folders...")
    url = f"{GRAPH_BASE}/contactFolders?$top=100"
    data = api_get(url)
    if not data:
        print("[ERROR] Could not fetch contact folders.")
        sys.exit(1)

    temp_folder_id = None
    for folder in data.get("value", []):
        # `or ""` also covers keys that are present but null.
        name = folder.get("displayName") or ""
        fid = folder.get("id") or ""
        print(f" Folder: {name} (id: {fid[:20]}...)")
        if name.lower() == "temp":
            temp_folder_id = fid

    # The default Contacts folder is not resolved here: main() reads it
    # through the /contacts endpoint instead of by folder ID.
    print(f"[INFO] Temp folder ID: {temp_folder_id[:20] if temp_folder_id else 'NOT FOUND'}...")
    if not temp_folder_id:
        print("[ERROR] Temp folder not found!")
        sys.exit(1)
    return temp_folder_id
def fetch_all_contacts(url_base, label):
    """Fetch all contacts from a folder, following pagination links.

    Args:
        url_base: Contacts collection URL without query string.
        label: Human-readable folder name used in progress messages.

    Returns:
        List of contact dicts. If a page request fails, the contacts
        gathered so far are returned and a warning is printed, because a
        partial list skews the later comparison.
    """
    contacts = []
    url = f"{url_base}?$top=100&$select={SELECT_FIELDS}"
    page = 1
    while url:
        print(f" Fetching {label} page {page}...")
        data = api_get(url)
        if not data:
            # Previously this stopped silently and the partial result was
            # reported as a success.
            print(f"[WARNING] No data returned for {label} page {page}; results may be incomplete.")
            break
        contacts.extend(data.get("value") or [])
        # Absent nextLink means this was the last page.
        url = data.get("@odata.nextLink")
        page += 1
    print(f"[OK] Fetched {len(contacts)} contacts from {label}.")
    return contacts
def normalize(s):
    """Return ``s`` trimmed and lowercased; any falsy input becomes ""."""
    return s.strip().lower() if s else ""
def get_emails(contact):
    """Return the set of non-empty, trimmed, lowercased addresses of a contact."""
    entries = contact.get("emailAddresses") or []
    cleaned = ((entry.get("address") or "").strip().lower() for entry in entries)
    return {address for address in cleaned if address}
def is_blank(contact):
    """Return True when the contact has no display name, emails, names or company."""
    display = normalize(contact.get("displayName", ""))
    addresses = get_emails(contact)
    # Evaluated eagerly, like the original, so every field is normalized.
    remaining = [
        normalize(contact.get(key, ""))
        for key in ("givenName", "surname", "companyName")
    ]
    return not (display or addresses or any(remaining))
def has_address(addr):
    """Return True when any address component holds non-whitespace text."""
    if not addr:
        return False
    components = ("street", "city", "state", "postalCode", "countryOrRegion")
    return any((addr.get(part) or "").strip() for part in components)
def find_extras(temp_contact, main_contact):
    """Find fields that Temp has but Main is missing.

    Args:
        temp_contact: Contact dict from the Temp folder.
        main_contact: Matched contact dict from the Main folder.

    Returns:
        Dict mapping field name to the Temp-only data. Multi-valued fields
        (emails, phones, categories) hold sorted lists so the output is
        reproducible between runs; personalNotes is truncated to 200
        characters plus "...". Empty when Temp adds nothing.
    """
    extras = {}

    # Emails present in Temp but not in Main.
    extra_emails = get_emails(temp_contact) - get_emails(main_contact)
    if extra_emails:
        extras["emailAddresses"] = sorted(extra_emails)

    # Phones: compared as trimmed strings; null entries are skipped.
    for phone_field in ("homePhones", "businessPhones"):
        temp_phones = {
            p.strip() for p in (temp_contact.get(phone_field) or []) if p and p.strip()
        }
        main_phones = {
            p.strip() for p in (main_contact.get(phone_field) or []) if p and p.strip()
        }
        extra_phones = temp_phones - main_phones
        if extra_phones:
            extras[phone_field] = sorted(extra_phones)

    # Simple string fields: reported only when Main has no value at all.
    for field in ("companyName", "jobTitle", "nickName", "birthday"):
        temp_val = (temp_contact.get(field) or "").strip()
        main_val = (main_contact.get(field) or "").strip()
        if temp_val and not main_val:
            extras[field] = temp_val

    # personalNotes - temp has content, main doesn't.
    temp_notes = (temp_contact.get("personalNotes") or "").strip()
    main_notes = (main_contact.get("personalNotes") or "").strip()
    if temp_notes and not main_notes:
        extras["personalNotes"] = temp_notes[:200] + ("..." if len(temp_notes) > 200 else "")

    # Addresses: reported when Temp has any component and Main has none.
    for addr_field in ("homeAddress", "businessAddress", "otherAddress"):
        if has_address(temp_contact.get(addr_field)) and not has_address(main_contact.get(addr_field)):
            extras[addr_field] = temp_contact.get(addr_field)

    # Categories present in Temp but not in Main.
    extra_cats = set(temp_contact.get("categories") or []) - set(main_contact.get("categories") or [])
    if extra_cats:
        extras["categories"] = sorted(extra_cats)
    return extras
def _index_main_contacts(main_contacts):
    """Build Main lookup tables keyed by display name, email and "given|surname"."""
    by_displayname = defaultdict(list)
    by_email = defaultdict(list)
    by_name_combo = defaultdict(list)
    for mc in main_contacts:
        dn = normalize(mc.get("displayName", ""))
        if dn:
            by_displayname[dn].append(mc)
        for email in get_emails(mc):
            by_email[email].append(mc)
        gn = normalize(mc.get("givenName", ""))
        sn = normalize(mc.get("surname", ""))
        if gn and sn:
            by_name_combo[f"{gn}|{sn}"].append(mc)
    return by_displayname, by_email, by_name_combo


def _find_main_match(tc, by_displayname, by_email, by_name_combo):
    """Return the first Main contact matching ``tc`` by name, email, then given+surname."""
    dn = normalize(tc.get("displayName", ""))
    if dn and dn in by_displayname:
        return by_displayname[dn][0]
    # Sorted so the chosen match does not depend on set iteration order.
    for email in sorted(get_emails(tc)):
        if email in by_email:
            return by_email[email][0]
    gn = normalize(tc.get("givenName", ""))
    sn = normalize(tc.get("surname", ""))
    if gn and sn:
        candidates = by_name_combo.get(f"{gn}|{sn}")
        if candidates:
            return candidates[0]
    return None


def _classify_temp_contacts(temp_contacts, indexes):
    """Sort Temp contacts into exact / with-extras / unique / blank buckets."""
    buckets = {
        "exact_matches": [],
        "matches_with_extras": [],
        "unique_to_temp": [],
        "blank": [],
    }
    for tc in temp_contacts:
        # Check blank first
        if is_blank(tc):
            buckets["blank"].append({"temp_id": tc["id"]})
            continue

        matched_main = _find_main_match(tc, *indexes)
        if matched_main is None:
            # Null/empty addresses are dropped: a None entry used to crash
            # the ", ".join(...) in the printed report.
            raw_addresses = (e.get("address") for e in (tc.get("emailAddresses") or []))
            buckets["unique_to_temp"].append({
                "temp_id": tc["id"],
                "displayName": tc.get("displayName", ""),
                "emails": [addr for addr in raw_addresses if addr],
                "company": tc.get("companyName", "")
            })
            continue

        record = {
            "temp_id": tc["id"],
            "main_id": matched_main["id"],
            "displayName": tc.get("displayName", "")
        }
        extras = find_extras(tc, matched_main)
        if extras:
            record["extra_fields"] = extras
            buckets["matches_with_extras"].append(record)
        else:
            buckets["exact_matches"].append(record)
    return buckets


def _find_internal_dupes(main_contacts):
    """List display names that occur more than once within Main contacts."""
    ids_by_name = defaultdict(list)
    for mc in main_contacts:
        dn = normalize(mc.get("displayName", ""))
        if dn:
            ids_by_name[dn].append(mc["id"])
    return [
        {"name": name, "count": len(ids), "ids": ids}
        for name, ids in ids_by_name.items()
        if len(ids) > 1
    ]


def _print_report(temp_contacts, main_contacts, buckets, main_internal_dupes):
    """Print the human-readable comparison summary to stdout."""
    matches_with_extras = buckets["matches_with_extras"]
    unique_to_temp = buckets["unique_to_temp"]

    print("\n" + "=" * 70)
    print("BARDACH TEMP vs MAIN CONTACTS - COMPARISON REPORT")
    print("=" * 70)
    print(f"\nTotal Temp contacts: {len(temp_contacts)}")
    print(f"Total Main contacts: {len(main_contacts)}")
    print()
    print(f"EXACT MATCH (no extra data): {len(buckets['exact_matches'])}")
    print(f"MATCH WITH EXTRAS: {len(matches_with_extras)}")
    print(f"UNIQUE TO TEMP: {len(unique_to_temp)}")
    print(f"BLANK/EMPTY: {len(buckets['blank'])}")

    # Extras breakdown: how many matched contacts carry each extra field.
    if matches_with_extras:
        print("\n--- MATCH WITH EXTRAS Breakdown ---")
        field_counts = defaultdict(int)
        for m in matches_with_extras:
            for field in m["extra_fields"]:
                field_counts[field] += 1
        for field, count in sorted(field_counts.items(), key=lambda x: -x[1]):
            print(f" {count:>5} contacts have '{field}' that Main lacks")

    # Unique to Temp - first 50
    if unique_to_temp:
        print(f"\n--- UNIQUE TO TEMP (first 50 of {len(unique_to_temp)}) ---")
        for i, u in enumerate(unique_to_temp[:50]):
            emails_str = ", ".join(u["emails"][:2]) if u["emails"] else "(no email)"
            company_str = u.get("company") or ""
            dn = u.get("displayName") or "(no name)"
            print(f" {i+1:>3}. {dn:<35} {emails_str:<40} {company_str}")

    # Main internal dupes - 30 most frequent names
    print("\n--- MAIN FOLDER INTERNAL DUPLICATES ---")
    print(f" {len(main_internal_dupes)} names appear more than once in Main contacts")
    if main_internal_dupes:
        dupes_sorted = sorted(main_internal_dupes, key=lambda x: -x["count"])
        for d in dupes_sorted[:30]:
            print(f" {d['name']:<40} appears {d['count']}x")


def _save_report(temp_contacts, main_contacts, buckets, main_internal_dupes):
    """Write the full analysis as JSON to OUTPUT_FILE."""
    print(f"\n[INFO] Saving full analysis to {OUTPUT_FILE}...")
    output = {
        "summary": {
            "total_temp": len(temp_contacts),
            "total_main": len(main_contacts),
            "exact_matches": len(buckets["exact_matches"]),
            "matches_with_extras": len(buckets["matches_with_extras"]),
            "unique_to_temp": len(buckets["unique_to_temp"]),
            "blank": len(buckets["blank"]),
            "main_internal_dupes": len(main_internal_dupes)
        },
        "exact_matches": buckets["exact_matches"],
        "matches_with_extras": buckets["matches_with_extras"],
        "unique_to_temp": buckets["unique_to_temp"],
        "blank": buckets["blank"],
        "main_internal_dupes": main_internal_dupes
    }
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False, default=str)
    print(f"[OK] Saved to {OUTPUT_FILE}")


def main():
    """Compare the Temp contact folder against the default contacts and report.

    Acquires a token, downloads both contact sets, matches each Temp contact
    to a Main contact (display name, then email, then given+surname), prints
    a summary and writes the full analysis to OUTPUT_FILE.
    """
    get_token()

    # Step 1: Find folder IDs
    temp_folder_id = get_contact_folders()

    # Step 2: Fetch all contacts from both folders
    print("\n[INFO] Fetching Temp folder contacts...")
    temp_contacts = fetch_all_contacts(
        f"{GRAPH_BASE}/contactFolders/{temp_folder_id}/contacts",
        "Temp"
    )
    print("\n[INFO] Fetching Main (default) contacts...")
    main_contacts = fetch_all_contacts(
        f"{GRAPH_BASE}/contacts",
        "Main/Default"
    )

    # Step 3: Build indexes for main contacts
    print("\n[INFO] Building main contact indexes...")
    indexes = _index_main_contacts(main_contacts)

    # Step 4: Compare each Temp contact
    print("[INFO] Comparing contacts...")
    buckets = _classify_temp_contacts(temp_contacts, indexes)

    # Step 5: Check for duplicates within Main contacts
    print("[INFO] Checking for duplicates within Main contacts...")
    main_internal_dupes = _find_internal_dupes(main_contacts)

    # Step 6: Print report
    _print_report(temp_contacts, main_contacts, buckets, main_internal_dupes)

    # Step 7: Save JSON
    _save_report(temp_contacts, main_contacts, buckets, main_internal_dupes)

    print(f"\n[INFO] Total API calls made: {api_call_count}")
    print("[SUCCESS] Comparison complete.")
# Run the comparison only when this file is executed as a script.
if __name__ == "__main__":
    main()