#!/usr/bin/env python3
"""Find and analyze duplicate contacts in Barbara Bardach's Main Contacts folder.

The script authenticates against Microsoft Graph with the client-credentials
flow, downloads every contact from the user's default contacts folder, groups
the contacts by case-insensitive display name, picks the most complete contact
of each group as the one to keep, prints a report, and writes the analysis to
a JSON file.

All HTTP traffic goes through the ``curl`` executable, which must be on PATH.
"""

import json
import os
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from urllib.parse import urlencode

TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# SECURITY: a credential committed to source control must be treated as
# compromised. Set GRAPH_CLIENT_SECRET in the environment instead; the literal
# fallback is kept only so existing invocations keep working and should be
# rotated and removed.
CLIENT_SECRET = (
    os.environ.get("GRAPH_CLIENT_SECRET")
    or "~QJ8Q~NyQSs4OcGqHZyPrA2CVnq9KBfKiimntbMO"
)
USER = "barbara@bardach.net"

SELECT_FIELDS = "id,displayName,givenName,surname,emailAddresses,homePhones,businessPhones,companyName,jobTitle,personalNotes,homeAddress,businessAddress,birthday,lastModifiedDateTime"

# Where main() writes the JSON analysis unless told otherwise.
OUTPUT_PATH = r"D:\ClaudeTools\temp\bardach_main_dupes_analysis.json"

CURL_TIMEOUT_SECONDS = 60


def curl_json(args, stdin_data=None):
    """Run curl with *args* and return the parsed JSON response.

    Args:
        args: Extra curl command-line arguments (``-s -S`` are always added).
        stdin_data: Optional text piped to curl's stdin. Combine with
            ``--data @-`` to keep sensitive request bodies off the command
            line, where they would be visible in process listings.

    Returns:
        The decoded JSON document.

    Exits the process with status 1 (after printing to stderr) when curl is
    missing, times out, returns a non-zero status, or emits invalid JSON.
    """
    try:
        result = subprocess.run(
            ["curl", "-s", "-S", *args],
            input=stdin_data,
            capture_output=True,
            text=True,
            timeout=CURL_TIMEOUT_SECONDS,
        )
    except FileNotFoundError:
        print("[ERROR] curl executable not found on PATH", file=sys.stderr)
        sys.exit(1)
    except subprocess.TimeoutExpired:
        print(
            f"[ERROR] curl timed out after {CURL_TIMEOUT_SECONDS} seconds",
            file=sys.stderr,
        )
        sys.exit(1)

    if result.returncode != 0:
        print(f"[ERROR] curl failed: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        print(f"[ERROR] Invalid JSON response: {result.stdout[:500]}", file=sys.stderr)
        sys.exit(1)


def get_token():
    """Get an access token using the client credentials flow.

    Returns:
        The ``access_token`` string from the token endpoint response.

    Exits the process with status 1 when the response has no access token.
    """
    url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    # urlencode escapes characters such as '&', '+' and '=' that would
    # otherwise corrupt the form body if they appear in the secret.
    data = urlencode({
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "https://graph.microsoft.com/.default",
    })
    # "--data @-" makes curl read the body from stdin so the secret never
    # appears in the process argument list.
    resp = curl_json(
        [
            "-X", "POST", url,
            "-H", "Content-Type: application/x-www-form-urlencoded",
            "--data", "@-",
        ],
        stdin_data=data,
    )
    if "access_token" not in resp:
        print(f"[ERROR] Token request failed: {json.dumps(resp, indent=2)}", file=sys.stderr)
        sys.exit(1)
    print("[OK] Got access token")
    return resp["access_token"]


def get_all_contacts(token):
    """Pull all contacts from the default contacts folder with pagination.

    Args:
        token: Bearer token for Microsoft Graph.

    Returns:
        A list of contact dicts accumulated across every page.

    Exits the process with status 1 when a page contains an ``error`` object.
    """
    contacts = []
    url = (
        f"https://graph.microsoft.com/v1.0/users/{USER}/contacts"
        f"?$select={SELECT_FIELDS}&$top=250"
    )
    page = 1
    # Each response carries "@odata.nextLink" until the last page is reached.
    while url:
        print(f" Fetching page {page}...")
        resp = curl_json([
            "-H", f"Authorization: Bearer {token}",
            "-H", "Content-Type: application/json",
            url,
        ])
        if "error" in resp:
            print(f"[ERROR] Graph API error: {json.dumps(resp['error'], indent=2)}", file=sys.stderr)
            sys.exit(1)
        batch = resp.get("value", [])
        contacts.extend(batch)
        print(f" Got {len(batch)} contacts (total: {len(contacts)})")
        url = resp.get("@odata.nextLink")
        page += 1
    return contacts


def count_filled_fields(contact):
    """Score a contact by how much meaningful data it carries.

    Scoring: one point per non-empty scalar field, two points for non-blank
    notes, one point per email/phone entry, and one point per address that has
    at least one of street/city/state/postalCode.

    Args:
        contact: A contact dict as returned by Graph.

    Returns:
        The integer completeness score (higher means more data).
    """
    score = 0
    for key in ("givenName", "surname", "companyName", "jobTitle", "birthday"):
        if contact.get(key):
            score += 1

    notes = contact.get("personalNotes")
    if notes and notes.strip():
        score += 2  # notes are valuable

    for key in ("emailAddresses", "homePhones", "businessPhones"):
        entries = contact.get(key)
        if entries:
            score += len(entries)

    for key in ("homeAddress", "businessAddress"):
        addr = contact.get(key)
        if addr and any(addr.get(part) for part in ("street", "city", "state", "postalCode")):
            score += 1
    return score


def summarize_differences(contacts):
    """Summarize what differs between duplicate contacts.

    Args:
        contacts: The contact dicts of one duplicate group.

    Returns:
        A "; "-joined description of the differing fields, or a fixed message
        when no compared field differs.
    """
    diffs = []
    fields_to_compare = [
        "givenName", "surname", "companyName", "jobTitle", "birthday", "personalNotes"
    ]
    list_fields = ["emailAddresses", "homePhones", "businessPhones"]
    addr_fields = ["homeAddress", "businessAddress"]

    for field in fields_to_compare:
        # Only contacts that actually have the field contribute a value.
        values = {str(c[field]).strip() for c in contacts if c.get(field)}
        if len(values) > 1:
            # Render in set notation but in sorted order so the report and the
            # saved JSON are identical from run to run.
            rendered = "{" + ", ".join(repr(v) for v in sorted(values)) + "}"
            diffs.append(f"{field}: {rendered}")

    for field in list_fields:
        all_vals = []
        for c in contacts:
            entries = c.get(field, []) or []
            if field == "emailAddresses":
                items = sorted(e.get("address", "") for e in entries if e.get("address"))
            else:
                items = sorted(entries)
            all_vals.append(tuple(items))
        if len(set(all_vals)) > 1:
            diffs.append(f"{field} differ: {[list(x) for x in all_vals]}")

    for field in addr_fields:
        addrs = set()
        for c in contacts:
            a = c.get(field) or {}
            parts = [a.get("street", ""), a.get("city", ""), a.get("state", ""), a.get("postalCode", "")]
            addrs.add(tuple(p.strip() if p else "" for p in parts))
        if len(addrs) > 1:
            diffs.append(f"{field} differ")

    # Check lastModifiedDateTime
    dates = [c.get("lastModifiedDateTime", "unknown") for c in contacts]
    if len(set(dates)) > 1:
        diffs.append(f"lastModified: {dates}")

    return "; ".join(diffs) if diffs else "No differences found (exact duplicates)"


def analyze_duplicates(contacts):
    """Group contacts by display name and describe each duplicate group.

    Names are compared after stripping whitespace and lower-casing; contacts
    without a display name are ignored.

    Args:
        contacts: All contact dicts to analyze.

    Returns:
        A list of dicts, one per group with two or more contacts, sorted by
        normalized name. Each dict has ``name``, ``count``, ``contacts``,
        ``keeper_id``, ``delete_ids``, ``differences`` and the internal
        ``_scores`` list of ``(score, contact_id)`` tuples.
    """
    groups = defaultdict(list)
    for c in contacts:
        name = (c.get("displayName") or "").strip().lower()
        if name:
            groups[name].append(c)

    duplicate_groups = []
    for name, group in sorted(groups.items()):
        if len(group) < 2:
            continue

        # "or ''" guards against a null lastModifiedDateTime, which would make
        # the tie-break comparison raise TypeError (None vs str).
        scored = [
            (count_filled_fields(c), c.get("lastModifiedDateTime") or "", c)
            for c in group
        ]
        # Sort by score desc, then by lastModified desc
        scored.sort(key=lambda item: (item[0], item[1]), reverse=True)

        keeper = scored[0][2]
        deletable = [item[2] for item in scored[1:]]

        duplicate_groups.append({
            "name": group[0].get("displayName", name),
            "count": len(group),
            "contacts": group,
            "keeper_id": keeper["id"],
            "delete_ids": [c["id"] for c in deletable],
            "differences": summarize_differences(group),
            # Keyed by the full id: a truncated prefix is not guaranteed to be
            # unique and made every contact report the keeper's score.
            "_scores": [(item[0], item[2]["id"]) for item in scored],
        })

    return duplicate_groups


def print_report(contacts, dup_groups):
    """Print a detailed report of every duplicate group to stdout.

    Args:
        contacts: All contacts that were analyzed (used for the total count).
        dup_groups: The groups returned by ``analyze_duplicates``.

    Returns:
        The total number of contacts marked for deletion across all groups.
    """
    total_removable = sum(len(g["delete_ids"]) for g in dup_groups)

    print("\n" + "=" * 80)
    print("DUPLICATE CONTACTS ANALYSIS - Barbara Bardach")
    print("=" * 80)
    print(f"Total contacts in Main Contacts: {len(contacts)}")
    print(f"Duplicate groups found: {len(dup_groups)}")
    print(f"Total removable contacts: {total_removable}")
    print("=" * 80)

    for i, g in enumerate(dup_groups, 1):
        print(f"\n--- Group {i}: {g['name']} ({g['count']} contacts) ---")

        # Map full contact id -> score; empty when _scores is absent.
        score_by_id = {cid: score for score, cid in g.get("_scores") or []}

        for c in g["contacts"]:
            is_keeper = c["id"] == g["keeper_id"]
            marker = "[KEEP]" if is_keeper else "[DELETE]"
            score = score_by_id.get(c["id"], "?")

            print(f" {marker} (score={score}) id={c['id'][:12]}...")
            print(f" displayName: {c.get('displayName')}")
            print(f" givenName: {c.get('givenName')} surname: {c.get('surname')}")

            emails = c.get("emailAddresses") or []
            if emails:
                print(f" emails: {[e.get('address') for e in emails]}")

            hphones = c.get("homePhones") or []
            if hphones:
                print(f" homePhones: {hphones}")

            bphones = c.get("businessPhones") or []
            if bphones:
                print(f" businessPhones: {bphones}")

            if c.get("companyName"):
                print(f" company: {c['companyName']}")
            if c.get("jobTitle"):
                print(f" jobTitle: {c['jobTitle']}")
            if c.get("birthday"):
                print(f" birthday: {c['birthday']}")

            for addr_field in ["homeAddress", "businessAddress"]:
                addr = c.get(addr_field) or {}
                parts = [addr.get(part, "") for part in ["street", "city", "state", "postalCode"]]
                if any(parts):
                    print(f" {addr_field}: {', '.join(p for p in parts if p)}")

            notes = c.get("personalNotes", "")
            if notes and notes.strip():
                preview = notes.strip()[:80].replace("\n", " ")
                print(f" notes: {preview}{'...' if len(notes.strip()) > 80 else ''}")

            print(f" lastModified: {c.get('lastModifiedDateTime')}")

        print(f" Differences: {g['differences']}")

    return total_removable


def main(output_path=OUTPUT_PATH):
    """Run the full analysis: authenticate, fetch, analyze, report, save.

    Args:
        output_path: Destination of the JSON analysis file. Its parent
            directory is created when missing.
    """
    print("[INFO] Starting duplicate contact analysis for Barbara Bardach")

    # Step 1: Get token
    token = get_token()

    # Step 2+3: Get all contacts from default contacts folder
    print("[INFO] Fetching all contacts from Main Contacts folder...")
    contacts = get_all_contacts(token)
    print(f"[OK] Retrieved {len(contacts)} total contacts")

    if not contacts:
        print("[WARNING] No contacts found!")
        sys.exit(0)

    # Step 4+5: Find duplicates
    print("[INFO] Analyzing duplicates...")
    dup_groups = analyze_duplicates(contacts)

    # Step 6+7: Print report
    total_removable = print_report(contacts, dup_groups)

    # Step 8: Save analysis JSON
    # Remove internal _scores from output
    output_groups = [
        {key: value for key, value in g.items() if key != "_scores"}
        for g in dup_groups
    ]

    analysis = {
        "total_contacts": len(contacts),
        "duplicate_groups": len(dup_groups),
        "total_removable": total_removable,
        "groups": output_groups,
    }

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(analysis, f, indent=2, default=str)
    print(f"\n[OK] Analysis saved to {output_path}")


if __name__ == "__main__":
    main()