Files
claudetools/temp/bardach_dedup_step2_plan.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

276 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""Step 2: Build dedup plan from backup contacts."""
import json
import os
from collections import defaultdict
from datetime import datetime
# Input: JSON backup opened by load_backup(), which returns its "contacts" list.
BACKUP_FILE = "D:/ClaudeTools/temp/bardach_temp_backup_prededup.json"
# Output: main() writes the generated dedup plan to this path as JSON.
PLAN_FILE = "D:/ClaudeTools/temp/bardach_dedup_plan.json"
def load_backup():
    """Read the backup JSON file and return its contacts.

    Returns:
        The value stored under the top-level "contacts" key of BACKUP_FILE.

    Raises:
        OSError: if BACKUP_FILE cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the document has no "contacts" key.
    """
    with open(BACKUP_FILE, "r", encoding="utf-8") as handle:
        return json.load(handle)["contacts"]
def normalize_name(name):
    """Return the grouping key for a display name.

    The key is the name with surrounding whitespace removed and lowercased.
    A missing or empty name yields the empty string.
    """
    return name.strip().lower() if name else ""
def get_emails(contact):
    """Collect a contact's email addresses as a set of lowercase strings.

    Reads the "address" of every entry in contact["emailAddresses"], strips
    surrounding whitespace, lowercases it, and drops blank results. A missing
    or null "emailAddresses" value yields an empty set.
    """
    entries = contact.get("emailAddresses") or []
    normalized = (
        (entry.get("address") or "").strip().lower() for entry in entries
    )
    return {address for address in normalized if address}
def get_phones(contact, field):
    """Collect the phone numbers stored under ``field`` as a set.

    Each entry of contact[field] is stripped of surrounding whitespace and
    blank results are dropped. A missing or null field yields an empty set.
    Numbers are compared as exact strings; no formatting is normalized.
    """
    stripped = (number.strip() for number in (contact.get(field) or []))
    return {number for number in stripped if number}
def is_address_empty(addr):
    """Return True when an address object carries no usable data.

    An address is empty when it is missing/falsy, or when every one of its
    street, city, state, postalCode and countryOrRegion values is missing,
    null, or whitespace-only.
    """
    if not addr:
        return True
    parts = ("street", "city", "state", "postalCode", "countryOrRegion")
    return not any((addr.get(part) or "").strip() for part in parts)
def score_contact(contact):
    """Score a contact by how much data it carries; higher means richer.

    Points awarded:
      * 2 per distinct email address
      * 2 per distinct home or business phone number
      * 1 each for non-blank companyName, jobTitle, nickName, birthday
      * 2 for non-blank personalNotes, plus 1 more when longer than 50 chars
      * 2 per non-empty homeAddress, businessAddress, otherAddress
      * 1 when the contact has any categories
      * 1 each for non-blank givenName and surname
      * recency: 2 if lastModifiedDateTime is under 365 days old, 1 if under
        730 days; a timestamp that cannot be processed earns no bonus

    The recency bonus is measured against the current time, so the same
    contact can score differently on different run dates.
    """
    # Emails and phone numbers: two points apiece.
    total = 2 * len(get_emails(contact))
    total += sum(
        2 * len(get_phones(contact, phone_key))
        for phone_key in ("homePhones", "businessPhones")
    )

    # Simple text fields: one point for each that is filled in.
    total += sum(
        1
        for text_key in ("companyName", "jobTitle", "nickName", "birthday")
        if (contact.get(text_key) or "").strip()
    )

    # Notes: two points for having any, a third for longer notes.
    note_text = (contact.get("personalNotes") or "").strip()
    if note_text:
        total += 3 if len(note_text) > 50 else 2

    # Postal addresses: two points for each one with real content.
    total += sum(
        2
        for address_key in ("homeAddress", "businessAddress", "otherAddress")
        if not is_address_empty(contact.get(address_key))
    )

    # Categories: a single point regardless of how many.
    if contact.get("categories"):
        total += 1

    # Name parts: one point each.
    total += sum(
        1
        for name_key in ("givenName", "surname")
        if (contact.get(name_key) or "").strip()
    )

    # Recency bonus: mild preference for recently modified contacts.
    stamp = contact.get("lastModifiedDateTime")
    if stamp:
        try:
            # fromisoformat() needs an explicit offset in place of a "Z" suffix.
            modified = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            age_days = (datetime.now(modified.tzinfo) - modified).days
            if age_days < 365:
                total += 2
            elif age_days < 730:
                total += 1
        except Exception:
            # Best effort: an unusable timestamp simply earns no bonus.
            pass

    return total
def _first_nonblank_text(contacts, field):
    """Return the first stripped, non-blank ``field`` value among contacts, or ""."""
    for contact in contacts:
        value = (contact.get(field) or "").strip()
        if value:
            return value
    return ""


def build_merge_updates(keeper, duplicates):
    """Determine what unique data from duplicates should be merged into keeper.

    Neither ``keeper`` nor the duplicates are modified; the result only
    describes the field values the keeper should end up with.

    Merge rules:
      * emailAddresses, homePhones, businessPhones, categories: values found
        only on duplicates are appended after the keeper's existing values.
        Additions are appended in sorted order so the generated plan is
        identical from run to run (iteration order of a set of strings varies
        between interpreter runs because of hash randomization).
      * personalNotes: every duplicate note that is not already contained in
        the accumulated text is appended after a "---" separator line.
      * companyName, jobTitle, nickName, birthday, givenName, surname and the
        home/business/other addresses: taken from the first duplicate that
        has a value, but only when the keeper's own value is blank.

    Args:
        keeper: contact dict chosen to survive.
        duplicates: the other contact dicts of the group; earlier entries win
            when several could fill the same blank field.

    Returns:
        Dict mapping field name to the complete new value for that field.
        Empty when the duplicates add nothing.
    """
    # NOTE(review): only the fields listed above are considered. Any other
    # data on a duplicate (for example a mobile number field, if the backup
    # has one) is not carried over -- confirm that is acceptable before the
    # duplicates are deleted.
    updates = {}

    # Emails: keep the keeper's entries untouched, then add unseen addresses.
    new_emails = set()
    for dup in duplicates:
        new_emails |= get_emails(dup)
    new_emails -= get_emails(keeper)
    if new_emails:
        merged_emails = list(keeper.get("emailAddresses") or [])
        merged_emails.extend(
            {"address": addr, "name": ""} for addr in sorted(new_emails)
        )
        updates["emailAddresses"] = merged_emails

    # Phones: same approach, per phone list.
    for field in ("homePhones", "businessPhones"):
        new_phones = set()
        for dup in duplicates:
            new_phones |= get_phones(dup, field)
        new_phones -= get_phones(keeper, field)
        if new_phones:
            updates[field] = list(keeper.get(field) or []) + sorted(new_phones)

    # Notes: append each duplicate's note unless its text is already present.
    original_notes = (keeper.get("personalNotes") or "").strip()
    merged_notes = original_notes
    for dup in duplicates:
        dup_notes = (dup.get("personalNotes") or "").strip()
        # The substring test also covers the "identical note" case.
        if dup_notes and dup_notes not in merged_notes:
            if merged_notes:
                merged_notes += "\n---\n" + dup_notes
            else:
                merged_notes = dup_notes
    if merged_notes != original_notes:
        updates["personalNotes"] = merged_notes

    # Blank text fields: borrow the first available value.
    for field in ("companyName", "jobTitle", "nickName", "birthday"):
        if not (keeper.get(field) or "").strip():
            value = _first_nonblank_text(duplicates, field)
            if value:
                updates[field] = value

    # Blank addresses: borrow the first non-empty address object as-is.
    for field in ("homeAddress", "businessAddress", "otherAddress"):
        if is_address_empty(keeper.get(field)):
            for dup in duplicates:
                if not is_address_empty(dup.get(field)):
                    updates[field] = dup[field]
                    break

    # Blank name parts: borrow the first available value.
    for field in ("givenName", "surname"):
        if not (keeper.get(field) or "").strip():
            value = _first_nonblank_text(duplicates, field)
            if value:
                updates[field] = value

    # Categories: preserve the keeper's own order, then add the new ones.
    keeper_cats = list(dict.fromkeys(keeper.get("categories") or []))
    new_cats = set()
    for dup in duplicates:
        new_cats |= set(dup.get("categories") or [])
    new_cats -= set(keeper_cats)
    if new_cats:
        updates["categories"] = keeper_cats + sorted(new_cats)

    return updates
def main():
    """Build the dedup plan from the backup and write it to PLAN_FILE.

    Contacts are grouped solely by normalized displayName; contacts with no
    display name are counted but never grouped. For every name shared by two
    or more contacts the highest-scoring contact becomes the keeper (ties keep
    backup order), the rest are listed for deletion, and the data to merge
    into the keeper is recorded. Progress and a summary are printed to stdout.
    """
    banner = "=" * 60
    print(banner)
    print("STEP 2: Build dedup plan")
    print(banner)

    contacts = load_backup()
    print(f"[OK] Loaded {len(contacts)} contacts from backup")

    # Bucket contacts by normalized display name.
    by_name = defaultdict(list)
    unnamed = 0
    for contact in contacts:
        key = normalize_name(contact.get("displayName"))
        if key:
            by_name[key].append(contact)
        else:
            unnamed += 1
    print(f"[INFO] Unique names: {len(by_name)}")
    print(f"[INFO] Contacts without displayName: {unnamed}")

    # Only names that occur more than once need a plan entry.
    repeated = {
        key: members for key, members in by_name.items() if len(members) > 1
    }
    print(f"[INFO] Duplicate groups (2+ contacts with same name): {len(repeated)}")
    members_total = sum(map(len, repeated.values()))
    extras_total = members_total - len(repeated)  # one survivor per group
    print(f"[INFO] Total contacts in duplicate groups: {members_total}")
    print(f"[INFO] Contacts to delete (extras): {extras_total}")

    # One plan entry per duplicated name, in alphabetical order.
    plan = []
    for key in sorted(repeated):
        members = repeated[key]
        # Best score first; the sort is stable, so ties keep backup order.
        ranked = sorted(
            ((score_contact(member), member) for member in members),
            key=lambda pair: pair[0],
            reverse=True,
        )
        best_score, keeper = ranked[0]
        losers = [member for _, member in ranked[1:]]
        merge = build_merge_updates(keeper, losers)
        plan.append({
            "display_name": key,
            "group_size": len(members),
            "keeper_id": keeper["id"],
            "keeper_score": best_score,
            "updates_to_apply": merge,
            "delete_ids": [loser["id"] for loser in losers],
            "delete_count": len(losers),
        })
    with_updates = sum(1 for item in plan if item["updates_to_apply"])

    # Persist the plan.
    with open(PLAN_FILE, "w", encoding="utf-8") as out:
        json.dump({"total_groups": len(plan), "plan": plan}, out, indent=2, ensure_ascii=False)

    # Summary.
    delete_total = sum(item["delete_count"] for item in plan)
    singles = len(by_name) - len(repeated)
    print("\n" + banner)
    print("DEDUP PLAN SUMMARY")
    print(banner)
    print(f" Duplicate groups: {len(plan)}")
    print(f" Keepers needing updates: {with_updates}")
    print(f" Contacts to delete: {delete_total}")
    print(f" Contacts to keep (dupes): {len(plan)}")
    print(f" Unique contacts (no dup): {singles}")
    print(f" Final expected count: {singles + len(plan) + unnamed}")
    print(f"[OK] Plan saved to {PLAN_FILE}")

    # Show the ten largest groups.
    largest = sorted(plan, key=lambda item: item["group_size"], reverse=True)[:10]
    print("\nTop 10 largest duplicate groups:")
    for item in largest:
        print(f" {item['display_name']}: {item['group_size']} copies (delete {item['delete_count']})")
# Build the plan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()