Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
504 lines
17 KiB
Python
504 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Bardach Contacts - Notes Analysis
|
|
Pulls all contacts from main Contacts folder, analyzes personalNotes
|
|
for junk, duplication, promotable data, and cross-contact duplicates.
|
|
"""
|
|
|
|
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from datetime import datetime
|
|
|
|
# --- Config ---
# Tenant and app registration used to build the token request in get_token().
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# SECURITY: the client secret must not live in source control. It is read
# from the BARDACH_CLIENT_SECRET environment variable; when the variable is
# unset the value is empty and the token request is rejected by the server.
# NOTE(review): the secret that was previously committed here should be
# rotated, since it remains in the repository history.
CLIENT_SECRET = os.environ.get("BARDACH_CLIENT_SECRET", "")
SCOPE = "https://graph.microsoft.com/.default"
# Mailbox whose contacts are pulled.
USER = "barbara@bardach.net"
# Destination of the JSON report written by main().
OUTPUT_FILE = "D:/ClaudeTools/temp/bardach_notes_analysis.json"
# Page size sent as $top on the contacts request.
TOP = 100
# A fresh token is acquired every this-many API calls while paginating.
TOKEN_REFRESH_INTERVAL = 500
|
|
|
|
# --- Helpers ---
|
|
def get_token():
    """Acquire an access token using the OAuth2 client-credentials grant.

    POSTs CLIENT_ID, CLIENT_SECRET and SCOPE to the tenant's
    ``login.microsoftonline.com`` v2.0 token endpoint by shelling out to curl.

    Returns:
        The ``access_token`` string from the token response.

    Exits:
        Calls ``sys.exit(1)`` after printing an ``[ERROR]`` line when curl
        cannot be started, when its output is not JSON, or when the response
        carries no ``access_token``.
    """
    cmd = [
        "curl", "-sS", "-X", "POST",
        f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
        "-H", "Content-Type: application/x-www-form-urlencoded",
        # --data-urlencode (rather than -d) so reserved characters in the
        # secret or scope cannot corrupt the form body.
        "--data-urlencode", f"client_id={CLIENT_ID}",
        "--data-urlencode", f"client_secret={CLIENT_SECRET}",
        "--data-urlencode", f"scope={SCOPE}",
        "--data-urlencode", "grant_type=client_credentials",
    ]
    try:
        # Decode explicitly as UTF-8; text=True alone would use the locale
        # encoding, which is not UTF-8 on every platform.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError as err:
        print(f"[ERROR] Token acquisition failed: could not run curl: {err}")
        sys.exit(1)

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Empty or non-JSON output means curl itself failed (DNS, TLS, ...).
        detail = result.stderr.strip() or repr(result.stdout[:500])
        print(
            f"[ERROR] Token acquisition failed: curl exit "
            f"{result.returncode}: {detail}"
        )
        sys.exit(1)

    if not isinstance(data, dict) or "access_token" not in data:
        print(f"[ERROR] Token acquisition failed: {data}")
        sys.exit(1)
    return data["access_token"]
|
|
|
|
|
|
def api_get(url, token):
    """GET *url* with a bearer token via curl and return the decoded JSON body.

    Args:
        url: Fully formed request URL.
        token: Access token sent in the ``Authorization: Bearer`` header.

    Returns:
        The parsed JSON response. If curl cannot be started, or its output is
        not valid JSON (for example an empty body after a transport failure),
        a dict shaped like ``{"error": {"code": "TransportError",
        "message": ...}}`` is returned instead of raising, so callers that
        check for an expected key such as ``"value"`` see a failed response
        rather than a crash.
    """
    def transport_error(message):
        return {"error": {"code": "TransportError", "message": message}}

    try:
        # Decode explicitly as UTF-8; text=True alone would use the locale
        # encoding and mangle non-ASCII contact data on non-UTF-8 platforms.
        result = subprocess.run(
            ["curl", "-sS", "-H", f"Authorization: Bearer {token}", url],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError as err:
        return transport_error(f"could not run curl: {err}")

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as err:
        detail = result.stderr.strip() or f"response is not valid JSON ({err})"
        return transport_error(f"curl exit {result.returncode}: {detail}")
|
|
|
|
|
|
def pull_all_contacts(token):
    """Fetch every contact for USER from the contacts endpoint, page by page.

    Follows ``@odata.nextLink`` until the service stops returning one. A new
    token is acquired every TOKEN_REFRESH_INTERVAL requests. If a response
    has no ``"value"`` key the error is printed and paging stops, returning
    whatever was collected so far.

    Args:
        token: Access token used for the first requests.

    Returns:
        A ``(contacts, token)`` tuple: the list of contact dicts and the
        token in use when paging finished (it may have been refreshed).
    """
    fields = ",".join([
        "id", "displayName", "givenName", "surname", "emailAddresses",
        "homePhones", "businessPhones", "mobilePhone", "companyName",
        "jobTitle", "personalNotes", "homeAddress", "businessAddress",
        "otherAddress", "birthday", "lastModifiedDateTime",
    ])
    next_url = (
        f"https://graph.microsoft.com/v1.0/users/{USER}/contacts"
        f"?$select={fields}&$top={TOP}"
    )

    collected = []
    # One request is made per page, so a single counter serves as both the
    # page number and the API call count.
    requests_made = 0

    while next_url:
        requests_made += 1

        if requests_made % TOKEN_REFRESH_INTERVAL == 0:
            print(f" Re-acquiring token after {requests_made} API calls...")
            token = get_token()

        print(f" Fetching page {requests_made} ({len(collected)} contacts so far)...")
        payload = api_get(next_url, token)

        if "value" not in payload:
            print(f"[ERROR] Unexpected response: {json.dumps(payload)[:500]}")
            break

        collected += payload["value"]
        next_url = payload.get("@odata.nextLink")

    print(f" Total contacts fetched: {len(collected)} in {requests_made} API calls")
    return collected, token
|
|
|
|
|
|
# --- Analysis Functions ---
|
|
|
|
# Regexes searched against the lower-cased note text to spot sync boilerplate.
ICLOUD_PATTERNS = [
    r"this contact is read[\s-]*only",
    r"edit.*in outlook",
    r"tap the link",
    r"this contact was created from a read[\s-]*only account",
    r"read[\s-]*only contact",
    r"icloud",
]

# Phone-number shapes looked for in free text, tried in this order.
PHONE_PATTERNS = [
    r"\(?\d{3}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}",  # 3-3-4 digits, optional parens
    r"\+?\d[\d\s.\-]{7,14}\d",                 # optional "+", loose digit run
    r"\d{3}[\s.\-]\d{4}",                      # 3-4 digits with a separator
]

# Email address shape looked for in free text.
EMAIL_PATTERN = r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}"
|
|
|
|
|
|
def normalize_phone(p):
    """Return only the digit characters of *p* (stringified), for comparison."""
    return "".join(re.findall(r"\d", str(p)))
|
|
|
|
|
|
def extract_phones_from_text(text):
    """Extract phone numbers from free text.

    Each regex in PHONE_PATTERNS is applied in order and every match is
    reduced to its digits with normalize_phone(). Matches with fewer than
    7 digits are ignored.

    A match that lies entirely inside the span of a match already accepted
    from an earlier pattern is skipped. Without this, the 7-digit pattern
    re-matches the tail of a 10-digit number ("555-123-4567" also produced
    "1234567"), and that fragment was reported as a separate phone number.

    Args:
        text: Free text to scan.

    Returns:
        A set of digit-only strings, one per distinct number found.
    """
    phones = set()
    accepted_spans = []
    for pat in PHONE_PATTERNS:
        # Collect this pattern's spans separately so that matches from the
        # same pattern never suppress each other.
        spans_for_pattern = []
        for m in re.finditer(pat, text):
            start, end = m.span()
            if any(s <= start and end <= e for s, e in accepted_spans):
                continue
            digits = normalize_phone(m.group())
            if len(digits) >= 7:
                phones.add(digits)
                spans_for_pattern.append((start, end))
        accepted_spans.extend(spans_for_pattern)
    return phones
|
|
|
|
|
|
def extract_emails_from_text(text):
    """Return the set of lower-cased email addresses found in *text*."""
    found = set()
    for match in re.finditer(EMAIL_PATTERN, text):
        found.add(match.group(0).lower())
    return found
|
|
|
|
|
|
def get_contact_phones(c):
    """Collect the contact's structured phone numbers as digit-only strings.

    Reads ``homePhones``, ``businessPhones`` and ``mobilePhone`` from the
    contact dict; values that normalize to an empty string are dropped.
    """
    raw_numbers = list(c.get("homePhones") or [])
    raw_numbers += c.get("businessPhones") or []
    mobile = c.get("mobilePhone")
    if mobile:
        raw_numbers.append(mobile)
    return {digits for digits in map(normalize_phone, raw_numbers) if digits}
|
|
|
|
|
|
def get_contact_emails(c):
    """Collect the contact's structured email addresses, lower-cased and trimmed.

    Reads the ``address`` of every entry in ``emailAddresses``; missing or
    empty addresses are skipped.
    """
    cleaned = (
        (entry.get("address") or "").lower().strip()
        for entry in c.get("emailAddresses") or []
    )
    return {address for address in cleaned if address}
|
|
|
|
|
|
def format_address(addr):
    """Flatten an address dict into one lower-cased string for comparison.

    The non-empty ``street``, ``city``, ``state``, ``postalCode`` and
    ``countryOrRegion`` values are trimmed and joined with single spaces, in
    that order. A missing or empty *addr* yields ``""``.
    """
    if not addr:
        return ""
    ordered_keys = ("street", "city", "state", "postalCode", "countryOrRegion")
    trimmed = ((addr.get(key) or "").strip() for key in ordered_keys)
    return " ".join(value for value in trimmed if value).lower()
|
|
|
|
|
|
def _find_junk_notes(contacts):
    """Section A: find whitespace-only notes and notes matching ICLOUD_PATTERNS."""
    icloud_warnings = []
    empty_whitespace = []

    for c in contacts:
        raw_notes = c.get("personalNotes") or ""
        stripped = raw_notes.strip()

        if not stripped:
            # Non-empty but all whitespace: report it; truly empty is ignored.
            if raw_notes:
                empty_whitespace.append({
                    "id": c["id"],
                    "displayName": c.get("displayName", ""),
                    "note_repr": repr(raw_notes[:100]),
                })
            continue

        lower = stripped.lower()
        if any(re.search(pat, lower) for pat in ICLOUD_PATTERNS):
            icloud_warnings.append({
                "id": c["id"],
                "displayName": c.get("displayName", ""),
                "note_preview": stripped[:200],
            })

    return {
        "icloud_warnings_count": len(icloud_warnings),
        "icloud_warnings": icloud_warnings,
        "empty_whitespace_count": len(empty_whitespace),
        "empty_whitespace": empty_whitespace,
    }


def _compare_notes_to_fields(with_notes):
    """Sections B and C: compare note contents with the structured fields.

    Returns a ``(duplicates, promotable)`` pair of report sections. Phones
    and emails are extracted from each note once and used for both sections.
    """
    dup_phones, dup_emails = [], []
    dup_company, dup_jobtitle, dup_address = [], [], []
    promotable_phones, promotable_emails = [], []

    for c in with_notes:
        notes = (c.get("personalNotes") or "").strip()
        notes_lower = notes.lower()
        name = c.get("displayName", "")

        # Phones: in both places (duplicate) vs. only in the note (promotable).
        note_phones = extract_phones_from_text(notes)
        field_phones = get_contact_phones(c)
        overlap_phones = note_phones & field_phones
        if overlap_phones:
            dup_phones.append({
                "displayName": name,
                # sorted() keeps the report stable between runs.
                "duplicated_phones": sorted(overlap_phones),
            })
        extra_phones = note_phones - field_phones
        if extra_phones:
            promotable_phones.append({
                "displayName": name,
                "phones_in_notes_only": sorted(extra_phones),
                "note_preview": notes[:200],
            })

        # Emails: same split as phones.
        note_emails = extract_emails_from_text(notes)
        field_emails = get_contact_emails(c)
        overlap_emails = note_emails & field_emails
        if overlap_emails:
            dup_emails.append({
                "displayName": name,
                "duplicated_emails": sorted(overlap_emails),
            })
        extra_emails = note_emails - field_emails
        if extra_emails:
            promotable_emails.append({
                "displayName": name,
                "emails_in_notes_only": sorted(extra_emails),
                "note_preview": notes[:200],
            })

        # Company / job title: substring match, ignoring very short values.
        company = (c.get("companyName") or "").strip().lower()
        if len(company) > 2 and company in notes_lower:
            dup_company.append({
                "displayName": name,
                "company": c.get("companyName"),
            })

        title = (c.get("jobTitle") or "").strip().lower()
        if len(title) > 2 and title in notes_lower:
            dup_jobtitle.append({
                "displayName": name,
                "jobTitle": c.get("jobTitle"),
            })

        # Address: flagged when at least half of the address words longer
        # than 3 characters occur in the note.
        for addr_field in ("homeAddress", "businessAddress", "otherAddress"):
            addr_str = format_address(c.get(addr_field))
            if len(addr_str) <= 5:
                continue
            addr_parts = [p for p in addr_str.split() if len(p) > 3]
            if not addr_parts:
                continue
            matches = sum(1 for p in addr_parts if p in notes_lower)
            if matches >= len(addr_parts) * 0.5:
                dup_address.append({
                    "displayName": name,
                    "field": addr_field,
                    "address": addr_str,
                })
                break  # one match per contact is enough

    duplicates = {
        "phones_duplicated_count": len(dup_phones),
        "phones_duplicated": dup_phones,
        "emails_duplicated_count": len(dup_emails),
        "emails_duplicated": dup_emails,
        "company_duplicated_count": len(dup_company),
        "company_duplicated": dup_company,
        "jobtitle_duplicated_count": len(dup_jobtitle),
        "jobtitle_duplicated": dup_jobtitle,
        "address_duplicated_count": len(dup_address),
        "address_duplicated": dup_address,
    }
    promotable = {
        "phones_promotable_count": len(promotable_phones),
        "phones_promotable": promotable_phones,
        "emails_promotable_count": len(promotable_emails),
        "emails_promotable": promotable_emails,
    }
    return duplicates, promotable


def _group_identical_notes(with_notes):
    """Section D: group contacts whose stripped notes are exactly identical."""
    notes_groups = defaultdict(list)
    for c in with_notes:
        notes = (c.get("personalNotes") or "").strip()
        # Fall back to the id when displayName is missing, null or empty;
        # a .get() default alone never applies when the key is present.
        notes_groups[notes].append(c.get("displayName") or c["id"])

    ranked = sorted(notes_groups.items(), key=lambda item: -len(item[1]))
    duplicate_groups = [
        {"note_preview": text[:200], "count": len(names), "contacts": names}
        for text, names in ranked
        if len(names) >= 2
    ]
    return {
        "groups_count": len(duplicate_groups),
        "groups": duplicate_groups,
    }


def _note_statistics(contacts, with_notes, without_notes):
    """Section E: counts, length buckets and up to 20 sample notes.

    Returns ``(section, avg_len)``; the unrounded average is returned
    separately for the console summary.
    """
    sample_count = 20
    stripped_notes = [
        (c, (c.get("personalNotes") or "").strip()) for c in with_notes
    ]
    note_lengths = [len(notes) for _, notes in stripped_notes]

    buckets = {"1-50": 0, "51-200": 0, "201-500": 0, "500+": 0}
    for length in note_lengths:
        if length <= 50:
            buckets["1-50"] += 1
        elif length <= 200:
            buckets["51-200"] += 1
        elif length <= 500:
            buckets["201-500"] += 1
        else:
            buckets["500+"] += 1

    avg_len = sum(note_lengths) / len(note_lengths) if note_lengths else 0

    # Order by the stripped length, the same length that is reported, and
    # pick evenly spaced entries so the samples span short to long notes.
    by_length = sorted(stripped_notes, key=lambda pair: len(pair[1]))
    total = len(by_length)
    if total <= sample_count:
        sample_indices = range(total)
    else:
        step = total / sample_count
        sample_indices = [int(i * step) for i in range(sample_count)]

    samples = []
    for i in sample_indices:
        c, notes = by_length[i]
        samples.append({
            "displayName": c.get("displayName", ""),
            "note_length": len(notes),
            "note_preview": notes[:200],
        })

    section = {
        "total_contacts": len(contacts),
        "contacts_with_notes": len(with_notes),
        "contacts_without_notes": len(without_notes),
        "average_note_length": round(avg_len, 1),
        "length_distribution": buckets,
        "sample_notes": samples,
    }
    return section, avg_len


def analyze_notes(contacts):
    """Analyze the ``personalNotes`` of every contact and build the report.

    Prints a one-line summary per section as it goes.

    Args:
        contacts: List of contact dicts as returned by the contacts API.

    Returns:
        A dict with these sections, in this order:
          * ``A_junk_boilerplate`` - iCloud boilerplate and whitespace-only
            notes.
          * ``B_duplicates_in_notes`` - note content that repeats a
            structured field (phone, email, company, job title, address).
          * ``C_promotable_data`` - phones/emails present only in the note.
          * ``D_duplicate_notes_across_contacts`` - identical notes shared
            by two or more contacts.
          * ``E_statistics`` - counts, length distribution and sample notes.
    """
    # A contact "has notes" when the note is non-empty after stripping.
    with_notes = []
    without_notes = []
    for c in contacts:
        if (c.get("personalNotes") or "").strip():
            with_notes.append(c)
        else:
            without_notes.append(c)

    report = {}

    junk = _find_junk_notes(contacts)
    report["A_junk_boilerplate"] = junk
    print(f"\n[A] Junk/Boilerplate: {junk['icloud_warnings_count']} iCloud warnings, "
          f"{junk['empty_whitespace_count']} empty/whitespace")

    duplicates, promotable = _compare_notes_to_fields(with_notes)
    report["B_duplicates_in_notes"] = duplicates
    print(f"[B] Duplicated in notes: {duplicates['phones_duplicated_count']} phones, "
          f"{duplicates['emails_duplicated_count']} emails, "
          f"{duplicates['company_duplicated_count']} companies, "
          f"{duplicates['jobtitle_duplicated_count']} titles, "
          f"{duplicates['address_duplicated_count']} addresses")

    report["C_promotable_data"] = promotable
    print(f"[C] Promotable data: {promotable['phones_promotable_count']} contacts with phones in notes only, "
          f"{promotable['emails_promotable_count']} contacts with emails in notes only")

    shared = _group_identical_notes(with_notes)
    report["D_duplicate_notes_across_contacts"] = shared
    print(f"[D] Duplicate notes across contacts: {shared['groups_count']} groups")

    stats, avg_len = _note_statistics(contacts, with_notes, without_notes)
    report["E_statistics"] = stats
    print(f"[E] Stats: {len(contacts)} total, {len(with_notes)} with notes, "
          f"{len(without_notes)} without, avg length {avg_len:.1f}")

    return report
|
|
|
|
|
|
def main():
    """Run the whole analysis: token, download, analyze, save, print.

    Acquires a token, pulls all contacts, runs analyze_notes(), attaches a
    ``_metadata`` section, writes the report to OUTPUT_FILE as JSON and then
    prints every section of the report to the console.
    """
    rule = "=" * 60
    print(rule)
    print("Bardach Contacts - Notes Analysis")
    print(rule)

    print("\n[1] Acquiring token...")
    token = get_token()
    print(" [OK] Token acquired")

    print("\n[2] Pulling all contacts...")
    contacts, token = pull_all_contacts(token)

    print(f"\n[3] Analyzing notes across {len(contacts)} contacts...")
    report = analyze_notes(contacts)
    report["_metadata"] = {
        "generated": datetime.now().isoformat(),
        "total_contacts_analyzed": len(contacts),
        "user": USER,
    }

    print(f"\n[4] Saving report to {OUTPUT_FILE}...")
    with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
        json.dump(report, out, indent=2, ensure_ascii=False, default=str)
    print(" [OK] Report saved")

    # --- Print comprehensive report ---
    print("\n" + rule)
    print("COMPREHENSIVE NOTES ANALYSIS REPORT")
    print(rule)

    stats = report["E_statistics"]
    print(f"\nTotal contacts: {stats['total_contacts']}")
    print(f"With notes: {stats['contacts_with_notes']}")
    print(f"Without notes: {stats['contacts_without_notes']}")
    print(f"Average note length: {stats['average_note_length']} chars")

    print("\n--- A. Junk/Boilerplate ---")
    junk = report["A_junk_boilerplate"]
    print(f"iCloud warnings: {junk['icloud_warnings_count']}")
    for entry in junk["icloud_warnings"]:
        print(f" - {entry['displayName']}: {entry['note_preview'][:80]}")
    print(f"Empty/whitespace notes: {junk['empty_whitespace_count']}")
    for entry in junk["empty_whitespace"]:
        print(f" - {entry['displayName']}")

    print("\n--- B. Notes Duplicating Structured Fields ---")
    dup = report["B_duplicates_in_notes"]
    # (heading, list key, key of the value shown after the contact name)
    dup_layout = (
        ("Phone numbers duplicated", "phones_duplicated", "duplicated_phones"),
        ("Emails duplicated", "emails_duplicated", "duplicated_emails"),
        ("Company names duplicated", "company_duplicated", "company"),
        ("Job titles duplicated", "jobtitle_duplicated", "jobTitle"),
    )
    for heading, list_key, detail_key in dup_layout:
        print(f"{heading}: {dup[list_key + '_count']}")
        for entry in dup[list_key]:
            print(f" - {entry['displayName']}: {entry[detail_key]}")
    print(f"Addresses duplicated: {dup['address_duplicated_count']}")
    for entry in dup["address_duplicated"]:
        print(f" - {entry['displayName']}: {entry['field']} = {entry['address']}")

    print("\n--- C. Promotable Data (in notes but NOT in fields) ---")
    promotable = report["C_promotable_data"]
    promotable_layout = (
        ("Contacts with phones in notes only", "phones_promotable", "phones_in_notes_only"),
        ("Contacts with emails in notes only", "emails_promotable", "emails_in_notes_only"),
    )
    for heading, list_key, detail_key in promotable_layout:
        print(f"{heading}: {promotable[list_key + '_count']}")
        for entry in promotable[list_key]:
            print(f" - {entry['displayName']}: {entry[detail_key]}")

    print("\n--- D. Duplicate Notes Across Contacts ---")
    shared = report["D_duplicate_notes_across_contacts"]
    print(f"Groups with identical notes: {shared['groups_count']}")
    for group in shared["groups"]:
        print(f" - {group['count']} contacts share: \"{group['note_preview'][:100]}\"")
        for member in group["contacts"]:
            print(f" {member}")

    print("\n--- E. Note Length Distribution ---")
    for bucket, count in stats["length_distribution"].items():
        print(f" {bucket}: {count}")

    print("\n--- E. Sample Notes (20 samples, varying lengths) ---")
    for sample in stats["sample_notes"]:
        print(f" [{sample['note_length']} chars] {sample['displayName']}: {sample['note_preview'][:120]}")

    print("\n[DONE]")
|
|
|
|
|
|
# Run the analysis only when this file is executed as a script.
if __name__ == "__main__":
    main()
|