#!/usr/bin/env python3
"""
Bardach Contacts - Notes Analysis

Pulls all contacts from main Contacts folder, analyzes personalNotes for junk,
duplication, promotable data, and cross-contact duplicates.
"""

import json
import os
import re
import subprocess
import sys
import urllib.parse
from collections import defaultdict
from datetime import datetime

# --- Config ---
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# NOTE(review): a client secret is committed in source. It can be overridden
# with the BARDACH_CLIENT_SECRET environment variable; the literal fallback is
# kept only so the script keeps working unchanged. Rotate this secret and drop
# the fallback.
CLIENT_SECRET = os.environ.get(
    "BARDACH_CLIENT_SECRET", "~QJ8Q~NyQSs4OcGqHZyPrA2CVnq9KBfKiimntbMO"
)
SCOPE = "https://graph.microsoft.com/.default"
USER = "barbara@bardach.net"
OUTPUT_FILE = "D:/ClaudeTools/temp/bardach_notes_analysis.json"
TOP = 100  # page size requested via $top
TOKEN_REFRESH_INTERVAL = 500  # re-acquire the token every N API calls


# --- Helpers ---
def get_token():
    """Acquire an access token via the OAuth2 client-credentials grant.

    The form body is URL-encoded and piped to curl on stdin so the client
    secret is neither mangled by special characters nor exposed in curl's
    command line.

    Returns:
        The access token string.

    Exits the process with status 1 (after printing an error) when curl
    returns no valid JSON or the response carries no ``access_token``.
    """
    body = urllib.parse.urlencode({
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": SCOPE,
        "grant_type": "client_credentials",
    })
    result = subprocess.run(
        [
            "curl", "-sS", "-X", "POST",
            f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
            "-H", "Content-Type: application/x-www-form-urlencoded",
            "--data-binary", "@-",  # read the request body from stdin
        ],
        input=body,
        capture_output=True,
        encoding="utf-8",  # do not depend on the locale code page
    )
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        print(
            f"[ERROR] Token acquisition failed: curl exit code "
            f"{result.returncode}: {result.stderr.strip()}"
        )
        sys.exit(1)
    if not isinstance(data, dict) or "access_token" not in data:
        print(f"[ERROR] Token acquisition failed: {data}")
        sys.exit(1)
    return data["access_token"]


def api_get(url, token):
    """GET *url* with a bearer *token* via curl and return the parsed JSON.

    Raises:
        ValueError: when curl produced no valid JSON (the message includes
            curl's exit code and stderr).
    """
    result = subprocess.run(
        ["curl", "-sS", "-H", f"Authorization: Bearer {token}", url],
        capture_output=True,
        encoding="utf-8",  # decode the response as UTF-8 regardless of locale
    )
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as err:
        raise ValueError(
            f"curl returned no valid JSON for {url} "
            f"(exit code {result.returncode}): {result.stderr.strip()}"
        ) from err


def pull_all_contacts(token):
    """Pull all contacts from default Contacts folder with pagination.

    Follows ``@odata.nextLink`` until it is absent and re-acquires the token
    every ``TOKEN_REFRESH_INTERVAL`` calls. A response without a ``value``
    key is printed as an error and ends the pull with whatever was fetched.

    Returns:
        A ``(contacts, token)`` tuple; *token* is the most recent token used.
    """
    select_fields = (
        "id,displayName,givenName,surname,emailAddresses,homePhones,"
        "businessPhones,mobilePhone,companyName,jobTitle,personalNotes,"
        "homeAddress,businessAddress,otherAddress,birthday,lastModifiedDateTime"
    )
    url = (
        f"https://graph.microsoft.com/v1.0/users/{USER}/contacts"
        f"?$select={select_fields}&$top={TOP}"
    )
    all_contacts = []
    api_calls = 0
    page = 0
    while url:
        page += 1
        api_calls += 1
        # Re-acquire token every N calls
        if api_calls % TOKEN_REFRESH_INTERVAL == 0:
            print(f" Re-acquiring token after {api_calls} API calls...")
            token = get_token()
        print(f" Fetching page {page} ({len(all_contacts)} contacts so far)...")
        data = api_get(url, token)
        if "value" not in data:
            print(f"[ERROR] Unexpected response: {json.dumps(data)[:500]}")
            break
        all_contacts.extend(data["value"])
        url = data.get("@odata.nextLink")
    print(f" Total contacts fetched: {len(all_contacts)} in {api_calls} API calls")
    return all_contacts, token


# --- Analysis Functions ---
# NOTE(review): the bare "icloud" pattern also matches notes that merely
# contain an @icloud.com address - confirm that is intended.
ICLOUD_PATTERNS = [
    r"this contact is read[\s-]*only",
    r"edit.*in outlook",
    r"tap the link",
    r"this contact was created from a read[\s-]*only account",
    r"read[\s-]*only contact",
    r"icloud",
]
# Ordered from most to least specific; extract_phones_from_text relies on
# this order to discard fragments of numbers an earlier pattern matched.
PHONE_PATTERNS = [
    r'\(?\d{3}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}',
    r'\+?\d[\d\s.\-]{7,14}\d',
    r'\d{3}[\s.\-]\d{4}',
]
EMAIL_PATTERN = r'[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}'

# Compiled once; the analysis applies them to every note.
_ICLOUD_REGEXES = [re.compile(p) for p in ICLOUD_PATTERNS]
_PHONE_REGEXES = [re.compile(p) for p in PHONE_PATTERNS]
_EMAIL_REGEX = re.compile(EMAIL_PATTERN)


def normalize_phone(p):
    """Strip phone to digits only for comparison."""
    return re.sub(r'\D', '', str(p))


def extract_phones_from_text(text):
    """Extract phone numbers from free text.

    Returns a set of digit-only strings with at least 7 digits. A match that
    lies entirely inside a span already accepted from an earlier (more
    specific) pattern is skipped, so "(555) 123-4567" yields only
    "5551234567" and not the spurious 7-digit tail "1234567".
    """
    phones = set()
    accepted_spans = []
    for regex in _PHONE_REGEXES:
        for m in regex.finditer(text):
            digits = normalize_phone(m.group())
            if len(digits) < 7:
                continue
            start, end = m.span()
            if any(s <= start and end <= e for s, e in accepted_spans):
                # Same span (same digits, already recorded) or a fragment
                # of a longer number that was already extracted.
                continue
            accepted_spans.append((start, end))
            phones.add(digits)
    return phones


def extract_emails_from_text(text):
    """Extract email addresses from free text (lower-cased)."""
    return {e.lower() for e in _EMAIL_REGEX.findall(text)}


def get_contact_phones(c):
    """Get all phone numbers from structured fields as digit-only strings."""
    raw = list(c.get("homePhones") or [])
    raw.extend(c.get("businessPhones") or [])
    mob = c.get("mobilePhone")
    if mob:
        raw.append(mob)
    # Entries that contain no digits normalize to "" and are dropped.
    return {digits for digits in map(normalize_phone, raw) if digits}


def get_contact_emails(c):
    """Get all emails from structured fields (lower-cased, stripped)."""
    emails = set()
    for e in c.get("emailAddresses") or []:
        addr = (e.get("address") or "").lower().strip()
        if addr:
            emails.add(addr)
    return emails


def format_address(addr):
    """Convert address dict to a lower-cased string for comparison."""
    if not addr:
        return ""
    parts = []
    for k in ("street", "city", "state", "postalCode", "countryOrRegion"):
        v = (addr.get(k) or "").strip()
        if v:
            parts.append(v)
    return " ".join(parts).lower()


def _note_text(c):
    """Return the contact's personalNotes, stripped ('' if missing or null)."""
    return (c.get("personalNotes") or "").strip()


def _display_name(c):
    """Return the contact's displayName ('' if missing or null)."""
    return c.get("displayName") or ""


def _find_junk_notes(contacts):
    """Section A: whitespace-only notes and iCloud boilerplate notes."""
    icloud_warnings = []
    empty_whitespace = []
    for c in contacts:
        raw_notes = c.get("personalNotes") or ""
        stripped = raw_notes.strip()
        if not stripped:
            # Non-empty but whitespace-only notes are reported; truly empty
            # notes are simply skipped.
            if raw_notes:
                empty_whitespace.append({
                    "id": c.get("id"),
                    "displayName": _display_name(c),
                    "note_repr": repr(raw_notes[:100]),
                })
            continue
        lower = stripped.lower()
        if any(regex.search(lower) for regex in _ICLOUD_REGEXES):
            icloud_warnings.append({
                "id": c.get("id"),
                "displayName": _display_name(c),
                "note_preview": stripped[:200],
            })
    print(f"\n[A] Junk/Boilerplate: {len(icloud_warnings)} iCloud warnings, {len(empty_whitespace)} empty/whitespace")
    return {
        "icloud_warnings_count": len(icloud_warnings),
        "icloud_warnings": icloud_warnings,
        "empty_whitespace_count": len(empty_whitespace),
        "empty_whitespace": empty_whitespace,
    }


def _find_field_duplicates(with_notes):
    """Section B: notes that repeat data already held in structured fields."""
    dup_phones = []
    dup_emails = []
    dup_company = []
    dup_jobtitle = []
    dup_address = []
    for c in with_notes:
        notes = _note_text(c)
        notes_lower = notes.lower()
        name = _display_name(c)

        # Phone duplication
        overlap_phones = extract_phones_from_text(notes) & get_contact_phones(c)
        if overlap_phones:
            dup_phones.append({
                "displayName": name,
                "duplicated_phones": sorted(overlap_phones),
            })

        # Email duplication
        overlap_emails = extract_emails_from_text(notes) & get_contact_emails(c)
        if overlap_emails:
            dup_emails.append({
                "displayName": name,
                "duplicated_emails": sorted(overlap_emails),
            })

        # Company duplication
        company = (c.get("companyName") or "").strip().lower()
        if len(company) > 2 and company in notes_lower:
            dup_company.append({
                "displayName": name,
                "company": c.get("companyName"),
            })

        # Job title duplication
        title = (c.get("jobTitle") or "").strip().lower()
        if len(title) > 2 and title in notes_lower:
            dup_jobtitle.append({
                "displayName": name,
                "jobTitle": c.get("jobTitle"),
            })

        # Address duplication
        for addr_field in ("homeAddress", "businessAddress", "otherAddress"):
            addr_str = format_address(c.get(addr_field))
            if len(addr_str) <= 5:
                continue
            # Check if significant parts of address appear in notes
            addr_parts = [p for p in addr_str.split() if len(p) > 3]
            if not addr_parts:
                continue
            matches = sum(1 for p in addr_parts if p in notes_lower)
            if matches >= len(addr_parts) * 0.5:
                dup_address.append({
                    "displayName": name,
                    "field": addr_field,
                    "address": addr_str,
                })
                break  # one match per contact is enough
    print(f"[B] Duplicated in notes: {len(dup_phones)} phones, {len(dup_emails)} emails, "
          f"{len(dup_company)} companies, {len(dup_jobtitle)} titles, {len(dup_address)} addresses")
    return {
        "phones_duplicated_count": len(dup_phones),
        "phones_duplicated": dup_phones,
        "emails_duplicated_count": len(dup_emails),
        "emails_duplicated": dup_emails,
        "company_duplicated_count": len(dup_company),
        "company_duplicated": dup_company,
        "jobtitle_duplicated_count": len(dup_jobtitle),
        "jobtitle_duplicated": dup_jobtitle,
        "address_duplicated_count": len(dup_address),
        "address_duplicated": dup_address,
    }


def _find_promotable_data(with_notes):
    """Section C: phones/emails present in notes but absent from fields."""
    promotable_phones = []
    promotable_emails = []
    for c in with_notes:
        notes = _note_text(c)
        name = _display_name(c)

        # Phones in notes NOT in fields
        extra_phones = extract_phones_from_text(notes) - get_contact_phones(c)
        if extra_phones:
            promotable_phones.append({
                "displayName": name,
                "phones_in_notes_only": sorted(extra_phones),
                "note_preview": notes[:200],
            })

        # Emails in notes NOT in fields
        extra_emails = extract_emails_from_text(notes) - get_contact_emails(c)
        if extra_emails:
            promotable_emails.append({
                "displayName": name,
                "emails_in_notes_only": sorted(extra_emails),
                "note_preview": notes[:200],
            })
    print(f"[C] Promotable data: {len(promotable_phones)} contacts with phones in notes only, "
          f"{len(promotable_emails)} contacts with emails in notes only")
    return {
        "phones_promotable_count": len(promotable_phones),
        "phones_promotable": promotable_phones,
        "emails_promotable_count": len(promotable_emails),
        "emails_promotable": promotable_emails,
    }


def _find_duplicate_notes(with_notes):
    """Section D: groups of contacts whose stripped notes are identical."""
    notes_groups = defaultdict(list)
    for c in with_notes:
        notes = _note_text(c)
        if notes:
            # Fall back to the id when the contact has no usable name.
            notes_groups[notes].append(c.get("displayName") or c.get("id"))
    duplicate_groups = [
        {
            "note_preview": notes_text[:200],
            "count": len(names),
            "contacts": names,
        }
        # Largest groups first; sorted() is stable so ties keep input order.
        for notes_text, names in sorted(
            notes_groups.items(), key=lambda item: -len(item[1])
        )
        if len(names) >= 2
    ]
    print(f"[D] Duplicate notes across contacts: {len(duplicate_groups)} groups")
    return {
        "groups_count": len(duplicate_groups),
        "groups": duplicate_groups,
    }


def _compute_statistics(contacts, with_notes, without_notes):
    """Section E: counts, note-length distribution and up to 20 samples."""
    note_lengths = [len(_note_text(c)) for c in with_notes]
    buckets = {"1-50": 0, "51-200": 0, "201-500": 0, "500+": 0}
    for length in note_lengths:
        if length <= 50:
            buckets["1-50"] += 1
        elif length <= 200:
            buckets["51-200"] += 1
        elif length <= 500:
            buckets["201-500"] += 1
        else:
            buckets["500+"] += 1
    avg_len = sum(note_lengths) / len(note_lengths) if note_lengths else 0

    # Sample 20 notes of varying lengths (sorted by the same stripped length
    # that is reported as note_length).
    sorted_by_len = sorted(with_notes, key=lambda c: len(_note_text(c)))
    n = len(sorted_by_len)
    if n <= 20:
        sample_indices = list(range(n))
    else:
        step = n / 20
        sample_indices = [int(i * step) for i in range(20)]
    samples = []
    for i in sample_indices:
        c = sorted_by_len[i]
        notes = _note_text(c)
        samples.append({
            "displayName": _display_name(c),
            "note_length": len(notes),
            "note_preview": notes[:200],
        })
    print(f"[E] Stats: {len(contacts)} total, {len(with_notes)} with notes, "
          f"{len(without_notes)} without, avg length {avg_len:.1f}")
    return {
        "total_contacts": len(contacts),
        "contacts_with_notes": len(with_notes),
        "contacts_without_notes": len(without_notes),
        "average_note_length": round(avg_len, 1),
        "length_distribution": buckets,
        "sample_notes": samples,
    }


def analyze_notes(contacts):
    """Analyze the personalNotes of *contacts* and return the report dict.

    The report has one entry per section: ``A_junk_boilerplate``,
    ``B_duplicates_in_notes``, ``C_promotable_data``,
    ``D_duplicate_notes_across_contacts`` and ``E_statistics``. A one-line
    summary of each section is printed as it is computed.
    """
    # Separate contacts with/without notes
    with_notes = []
    without_notes = []
    for c in contacts:
        (with_notes if _note_text(c) else without_notes).append(c)

    return {
        "A_junk_boilerplate": _find_junk_notes(contacts),
        "B_duplicates_in_notes": _find_field_duplicates(with_notes),
        "C_promotable_data": _find_promotable_data(with_notes),
        "D_duplicate_notes_across_contacts": _find_duplicate_notes(with_notes),
        "E_statistics": _compute_statistics(contacts, with_notes, without_notes),
    }


def _print_report(report):
    """Print the comprehensive, human-readable version of *report*."""
    stats = report["E_statistics"]
    print("\n" + "=" * 60)
    print("COMPREHENSIVE NOTES ANALYSIS REPORT")
    print("=" * 60)
    print(f"\nTotal contacts: {stats['total_contacts']}")
    print(f"With notes: {stats['contacts_with_notes']}")
    print(f"Without notes: {stats['contacts_without_notes']}")
    print(f"Average note length: {stats['average_note_length']} chars")

    print("\n--- A. Junk/Boilerplate ---")
    a = report["A_junk_boilerplate"]
    print(f"iCloud warnings: {a['icloud_warnings_count']}")
    for item in a["icloud_warnings"]:
        print(f" - {item['displayName']}: {item['note_preview'][:80]}")
    print(f"Empty/whitespace notes: {a['empty_whitespace_count']}")
    for item in a["empty_whitespace"]:
        print(f" - {item['displayName']}")

    print("\n--- B. Notes Duplicating Structured Fields ---")
    b = report["B_duplicates_in_notes"]
    print(f"Phone numbers duplicated: {b['phones_duplicated_count']}")
    for item in b["phones_duplicated"]:
        print(f" - {item['displayName']}: {item['duplicated_phones']}")
    print(f"Emails duplicated: {b['emails_duplicated_count']}")
    for item in b["emails_duplicated"]:
        print(f" - {item['displayName']}: {item['duplicated_emails']}")
    print(f"Company names duplicated: {b['company_duplicated_count']}")
    for item in b["company_duplicated"]:
        print(f" - {item['displayName']}: {item['company']}")
    print(f"Job titles duplicated: {b['jobtitle_duplicated_count']}")
    for item in b["jobtitle_duplicated"]:
        print(f" - {item['displayName']}: {item['jobTitle']}")
    print(f"Addresses duplicated: {b['address_duplicated_count']}")
    for item in b["address_duplicated"]:
        print(f" - {item['displayName']}: {item['field']} = {item['address']}")

    print("\n--- C. Promotable Data (in notes but NOT in fields) ---")
    c_data = report["C_promotable_data"]
    print(f"Contacts with phones in notes only: {c_data['phones_promotable_count']}")
    for item in c_data["phones_promotable"]:
        print(f" - {item['displayName']}: {item['phones_in_notes_only']}")
    print(f"Contacts with emails in notes only: {c_data['emails_promotable_count']}")
    for item in c_data["emails_promotable"]:
        print(f" - {item['displayName']}: {item['emails_in_notes_only']}")

    print("\n--- D. Duplicate Notes Across Contacts ---")
    d = report["D_duplicate_notes_across_contacts"]
    print(f"Groups with identical notes: {d['groups_count']}")
    for g in d["groups"]:
        print(f" - {g['count']} contacts share: \"{g['note_preview'][:100]}\"")
        for name in g["contacts"]:
            print(f" {name}")

    print("\n--- E. Note Length Distribution ---")
    for bucket, count in stats["length_distribution"].items():
        print(f" {bucket}: {count}")

    print("\n--- E. Sample Notes (20 samples, varying lengths) ---")
    for s in stats["sample_notes"]:
        print(f" [{s['note_length']} chars] {s['displayName']}: {s['note_preview'][:120]}")


def main():
    """Fetch all contacts, analyze their notes, save and print the report."""
    print("=" * 60)
    print("Bardach Contacts - Notes Analysis")
    print("=" * 60)

    print("\n[1] Acquiring token...")
    token = get_token()
    print(" [OK] Token acquired")

    print("\n[2] Pulling all contacts...")
    contacts, token = pull_all_contacts(token)

    print(f"\n[3] Analyzing notes across {len(contacts)} contacts...")
    report = analyze_notes(contacts)
    report["_metadata"] = {
        "generated": datetime.now().isoformat(),
        "total_contacts_analyzed": len(contacts),
        "user": USER,
    }

    print(f"\n[4] Saving report to {OUTPUT_FILE}...")
    # Create the target directory so a fresh machine does not lose the
    # results of the pull at the final write.
    out_dir = os.path.dirname(OUTPUT_FILE)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False, default=str)
    print(" [OK] Report saved")

    # --- Print comprehensive report ---
    _print_report(report)
    print("\n[DONE]")


if __name__ == "__main__":
    main()