#!/usr/bin/env python3
"""Step 2: Build dedup plan from backup contacts.

Reads the contact backup written by the previous step, groups contacts by
normalized display name, picks the richest contact of every group as the
"keeper", works out which data from the other copies should be merged into
the keeper, and writes the resulting plan (updates + ids to delete) as JSON.
"""

import json
import os
from collections import defaultdict
from datetime import datetime

BACKUP_FILE = "D:/ClaudeTools/temp/bardach_temp_backup_prededup.json"
PLAN_FILE = "D:/ClaudeTools/temp/bardach_dedup_plan.json"

# Field groups shared by scoring and merging so both always agree.
_PHONE_FIELDS = ("homePhones", "businessPhones")
_TEXT_FIELDS = ("companyName", "jobTitle", "nickName", "birthday")
_ADDRESS_FIELDS = ("homeAddress", "businessAddress", "otherAddress")
_ADDRESS_KEYS = ("street", "city", "state", "postalCode", "countryOrRegion")
_NAME_FIELDS = ("givenName", "surname")


def load_backup():
    """Return the list stored under ``"contacts"`` in ``BACKUP_FILE``."""
    with open(BACKUP_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data["contacts"]


def normalize_name(name):
    """Normalize a display name for grouping (trimmed, lowercased).

    Returns ``""`` for a missing or empty name.
    """
    if not name:
        return ""
    return name.strip().lower()


def get_emails(contact):
    """Return the contact's non-blank email addresses as a lowercase set."""
    emails = set()
    for entry in contact.get("emailAddresses") or []:
        addr = (entry.get("address") or "").strip().lower()
        if addr:
            emails.add(addr)
    return emails


def get_phones(contact, field):
    """Return the non-blank phone numbers stored under *field* as a set.

    Numbers are only stripped of surrounding whitespace; no other
    normalization is applied. ``None`` entries are treated as blank.
    """
    phones = set()
    for raw in contact.get(field) or []:
        cleaned = (raw or "").strip()
        if cleaned:
            phones.add(cleaned)
    return phones


def is_address_empty(addr):
    """Return True when *addr* is missing or all of its parts are blank."""
    if not addr:
        return True
    return not any((addr.get(key) or "").strip() for key in _ADDRESS_KEYS)


def _recency_bonus(last_modified, now=None):
    """Return 2 / 1 / 0 bonus points for a timestamp < 1 / < 2 / >= 2 years old.

    Missing, non-string, or unparseable timestamps earn no bonus.
    """
    if not last_modified or not isinstance(last_modified, str):
        return 0
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z".
        modified = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))
        # Default reference clock mirrors the timestamp's tz-awareness so the
        # subtraction never mixes naive and aware datetimes.
        reference = now if now is not None else datetime.now(modified.tzinfo)
        days_ago = (reference - modified).days
    except (ValueError, TypeError):
        # ValueError: not an ISO timestamp; TypeError: naive/aware mismatch
        # between a caller-supplied ``now`` and the parsed timestamp.
        return 0
    if days_ago < 365:
        return 2
    if days_ago < 730:
        return 1
    return 0


def score_contact(contact, now=None):
    """Score a contact by richness of data; higher means more worth keeping.

    Points: 2 per email, 2 per phone, 1 per non-blank text field
    (company, job title, nickname, birthday), 2 for notes (+1 when longer
    than 50 characters), 2 per non-empty address, 1 for having categories,
    1 each for given name and surname, plus a recency bonus of up to 2.

    Args:
        contact: Contact dict.
        now: Optional reference ``datetime`` for the recency bonus. Defaults
            to the current time. Must match the tz-awareness of the
            contact's ``lastModifiedDateTime`` or no bonus is awarded.

    Returns:
        The integer score.
    """
    score = 2 * len(get_emails(contact))

    for field in _PHONE_FIELDS:
        score += 2 * len(get_phones(contact, field))

    for field in _TEXT_FIELDS:
        if (contact.get(field) or "").strip():
            score += 1

    notes = (contact.get("personalNotes") or "").strip()
    if notes:
        score += 2
        if len(notes) > 50:
            score += 1

    for field in _ADDRESS_FIELDS:
        if not is_address_empty(contact.get(field)):
            score += 2

    if contact.get("categories"):
        score += 1

    for field in _NAME_FIELDS:
        if (contact.get(field) or "").strip():
            score += 1

    # Slight preference for more recently modified contacts.
    score += _recency_bonus(contact.get("lastModifiedDateTime"), now)
    return score


def _first_non_blank(duplicates, field):
    """Return the first non-blank stripped string for *field*, else ``""``."""
    for dup in duplicates:
        val = (dup.get(field) or "").strip()
        if val:
            return val
    return ""


def build_merge_updates(keeper, duplicates):
    """Determine what unique data from duplicates should be merged into keeper.

    Args:
        keeper: The contact dict that will be kept.
        duplicates: The other contact dicts of the same group, in priority
            order (the first duplicate wins when filling a blank field).

    Returns:
        A dict mapping field name to its new full value; empty when the
        keeper already holds everything. New emails, phones and categories
        are appended after the keeper's existing values in sorted order so
        the plan is identical from run to run.
    """
    updates = {}

    # Emails: keeper's existing entries first, then the new addresses.
    new_emails = set()
    for dup in duplicates:
        new_emails |= get_emails(dup)
    new_emails -= get_emails(keeper)
    if new_emails:
        merged = list(keeper.get("emailAddresses") or [])
        merged.extend({"address": addr, "name": ""} for addr in sorted(new_emails))
        updates["emailAddresses"] = merged

    # Phones, per phone field.
    for field in _PHONE_FIELDS:
        new_phones = set()
        for dup in duplicates:
            new_phones |= get_phones(dup, field)
        new_phones -= get_phones(keeper, field)
        if new_phones:
            updates[field] = list(keeper.get(field) or []) + sorted(new_phones)

    # Notes: append every note not already contained in the merged text.
    original_notes = (keeper.get("personalNotes") or "").strip()
    merged_notes = original_notes
    for dup in duplicates:
        dup_notes = (dup.get("personalNotes") or "").strip()
        if dup_notes and dup_notes not in merged_notes:
            merged_notes = f"{merged_notes}\n---\n{dup_notes}" if merged_notes else dup_notes
    if merged_notes != original_notes:
        updates["personalNotes"] = merged_notes

    # Blank text fields are filled from the first duplicate that has a value.
    for field in _TEXT_FIELDS:
        if not (keeper.get(field) or "").strip():
            val = _first_non_blank(duplicates, field)
            if val:
                updates[field] = val

    # Blank addresses are copied whole from the first duplicate that has one.
    for field in _ADDRESS_FIELDS:
        if is_address_empty(keeper.get(field)):
            for dup in duplicates:
                if not is_address_empty(dup.get(field)):
                    updates[field] = dup[field]
                    break

    # Given name / surname if blank.
    for field in _NAME_FIELDS:
        if not (keeper.get(field) or "").strip():
            val = _first_non_blank(duplicates, field)
            if val:
                updates[field] = val

    # Categories: keep the keeper's own order, then add the new ones.
    keeper_cats = list(keeper.get("categories") or [])
    new_cats = set()
    for dup in duplicates:
        new_cats |= set(dup.get("categories") or [])
    new_cats -= set(keeper_cats)
    if new_cats:
        updates["categories"] = keeper_cats + sorted(new_cats)

    return updates


def _group_by_name(contacts):
    """Group contacts by normalized displayName.

    Returns ``(groups, no_name_count)``; nameless contacts are not grouped.
    """
    groups = defaultdict(list)
    no_name_count = 0
    for contact in contacts:
        name = normalize_name(contact.get("displayName"))
        if not name:
            no_name_count += 1
            continue
        groups[name].append(contact)
    return groups, no_name_count


def _build_plan(dup_groups):
    """Build one plan entry per duplicate group, ordered by group name."""
    plan = []
    for name, clist in sorted(dup_groups.items()):
        # Stable sort: among equal scores the earliest contact stays first.
        scored = sorted(
            ((score_contact(c), c) for c in clist),
            key=lambda pair: pair[0],
            reverse=True,
        )
        keeper_score, keeper = scored[0]
        duplicates = [contact for _, contact in scored[1:]]
        plan.append({
            "display_name": name,
            "group_size": len(clist),
            "keeper_id": keeper["id"],
            "keeper_score": keeper_score,
            "updates_to_apply": build_merge_updates(keeper, duplicates),
            "delete_ids": [d["id"] for d in duplicates],
            "delete_count": len(duplicates),
        })
    return plan


def main():
    """Load the backup, build the dedup plan, save it and print a summary."""
    print("=" * 60)
    print("STEP 2: Build dedup plan")
    print("=" * 60)

    contacts = load_backup()
    print(f"[OK] Loaded {len(contacts)} contacts from backup")

    groups, no_name_count = _group_by_name(contacts)
    print(f"[INFO] Unique names: {len(groups)}")
    print(f"[INFO] Contacts without displayName: {no_name_count}")

    # Duplicate groups are names shared by two or more contacts.
    dup_groups = {name: clist for name, clist in groups.items() if len(clist) >= 2}
    print(f"[INFO] Duplicate groups (2+ contacts with same name): {len(dup_groups)}")

    total_dupes = sum(len(v) for v in dup_groups.values())
    total_to_delete = total_dupes - len(dup_groups)  # keep one per group
    print(f"[INFO] Total contacts in duplicate groups: {total_dupes}")
    print(f"[INFO] Contacts to delete (extras): {total_to_delete}")

    plan = _build_plan(dup_groups)
    keepers_needing_updates = sum(1 for e in plan if e["updates_to_apply"])

    with open(PLAN_FILE, "w", encoding="utf-8") as f:
        json.dump({"total_groups": len(plan), "plan": plan}, f, indent=2, ensure_ascii=False)

    total_deletes = sum(e["delete_count"] for e in plan)
    unique_count = len(groups) - len(dup_groups)
    print(f"\n{'=' * 60}")
    print("DEDUP PLAN SUMMARY")
    print(f"{'=' * 60}")
    print(f"  Duplicate groups:        {len(plan)}")
    print(f"  Keepers needing updates: {keepers_needing_updates}")
    print(f"  Contacts to delete:      {total_deletes}")
    print(f"  Contacts to keep (dupes): {len(plan)}")
    print(f"  Unique contacts (no dup): {unique_count}")
    print(f"  Final expected count:    {unique_count + len(plan) + no_name_count}")
    print(f"[OK] Plan saved to {PLAN_FILE}")

    by_size = sorted(plan, key=lambda x: x["group_size"], reverse=True)[:10]
    print("\nTop 10 largest duplicate groups:")
    for e in by_size:
        print(f"  {e['display_name']}: {e['group_size']} copies (delete {e['delete_count']})")


if __name__ == "__main__":
    main()