Files
claudetools/temp/bardach_notes_analysis.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

504 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Bardach Contacts - Notes Analysis
Pulls all contacts from main Contacts folder, analyzes personalNotes
for junk, duplication, promotable data, and cross-contact duplicates.
"""
import subprocess
import json
import re
import sys
from collections import defaultdict
from datetime import datetime
# --- Config ---
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
CLIENT_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
CLIENT_SECRET = "~QJ8Q~NyQSs4OcGqHZyPrA2CVnq9KBfKiimntbMO"
SCOPE = "https://graph.microsoft.com/.default"
USER = "barbara@bardach.net"
OUTPUT_FILE = "D:/ClaudeTools/temp/bardach_notes_analysis.json"
TOP = 100
TOKEN_REFRESH_INTERVAL = 500
# --- Helpers ---
def get_token():
result = subprocess.run([
"curl", "-s", "-X", "POST",
f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
"-H", "Content-Type: application/x-www-form-urlencoded",
"-d", f"client_id={CLIENT_ID}",
"-d", f"client_secret={CLIENT_SECRET}",
"-d", f"scope={SCOPE}",
"-d", "grant_type=client_credentials"
], capture_output=True, text=True)
data = json.loads(result.stdout)
if "access_token" not in data:
print(f"[ERROR] Token acquisition failed: {data}")
sys.exit(1)
return data["access_token"]
def api_get(url, token):
result = subprocess.run([
"curl", "-s",
"-H", f"Authorization: Bearer {token}",
url
], capture_output=True, text=True)
return json.loads(result.stdout)
def pull_all_contacts(token):
"""Pull all contacts from default Contacts folder with pagination."""
select_fields = (
"id,displayName,givenName,surname,emailAddresses,homePhones,"
"businessPhones,mobilePhone,companyName,jobTitle,personalNotes,"
"homeAddress,businessAddress,otherAddress,birthday,lastModifiedDateTime"
)
url = (
f"https://graph.microsoft.com/v1.0/users/{USER}/contacts"
f"?$select={select_fields}&$top={TOP}"
)
all_contacts = []
api_calls = 0
page = 0
while url:
page += 1
api_calls += 1
# Re-acquire token every N calls
if api_calls % TOKEN_REFRESH_INTERVAL == 0:
print(f" Re-acquiring token after {api_calls} API calls...")
token = get_token()
print(f" Fetching page {page} ({len(all_contacts)} contacts so far)...")
data = api_get(url, token)
if "value" not in data:
print(f"[ERROR] Unexpected response: {json.dumps(data)[:500]}")
break
all_contacts.extend(data["value"])
url = data.get("@odata.nextLink")
print(f" Total contacts fetched: {len(all_contacts)} in {api_calls} API calls")
return all_contacts, token
# --- Analysis Functions ---
ICLOUD_PATTERNS = [
r"this contact is read[\s-]*only",
r"edit.*in outlook",
r"tap the link",
r"this contact was created from a read[\s-]*only account",
r"read[\s-]*only contact",
r"icloud",
]
PHONE_PATTERNS = [
r'\(?\d{3}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}',
r'\+?\d[\d\s.\-]{7,14}\d',
r'\d{3}[\s.\-]\d{4}',
]
EMAIL_PATTERN = r'[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}'
def normalize_phone(p):
"""Strip phone to digits only for comparison."""
return re.sub(r'\D', '', str(p))
def extract_phones_from_text(text):
"""Extract phone numbers from free text."""
phones = set()
for pat in PHONE_PATTERNS:
for m in re.finditer(pat, text):
digits = normalize_phone(m.group())
if len(digits) >= 7:
phones.add(digits)
return phones
def extract_emails_from_text(text):
"""Extract email addresses from free text."""
return {e.lower() for e in re.findall(EMAIL_PATTERN, text)}
def get_contact_phones(c):
"""Get all phone numbers from structured fields."""
phones = set()
for p in c.get("homePhones") or []:
d = normalize_phone(p)
if d:
phones.add(d)
for p in c.get("businessPhones") or []:
d = normalize_phone(p)
if d:
phones.add(d)
mob = c.get("mobilePhone")
if mob:
d = normalize_phone(mob)
if d:
phones.add(d)
return phones
def get_contact_emails(c):
"""Get all emails from structured fields."""
emails = set()
for e in c.get("emailAddresses") or []:
addr = (e.get("address") or "").lower().strip()
if addr:
emails.add(addr)
return emails
def format_address(addr):
"""Convert address dict to string for comparison."""
if not addr:
return ""
parts = []
for k in ["street", "city", "state", "postalCode", "countryOrRegion"]:
v = (addr.get(k) or "").strip()
if v:
parts.append(v)
return " ".join(parts).lower()
def analyze_notes(contacts):
report = {}
# Separate contacts with/without notes
with_notes = []
without_notes = []
for c in contacts:
notes = (c.get("personalNotes") or "").strip()
if notes:
with_notes.append(c)
else:
without_notes.append(c)
# --- A. Junk/Boilerplate Notes ---
icloud_warnings = []
empty_whitespace = []
for c in contacts:
raw_notes = c.get("personalNotes") or ""
stripped = raw_notes.strip()
if raw_notes and not stripped:
empty_whitespace.append({
"id": c["id"],
"displayName": c.get("displayName", ""),
"note_repr": repr(raw_notes[:100])
})
continue
if stripped:
lower = stripped.lower()
for pat in ICLOUD_PATTERNS:
if re.search(pat, lower):
icloud_warnings.append({
"id": c["id"],
"displayName": c.get("displayName", ""),
"note_preview": stripped[:200]
})
break
report["A_junk_boilerplate"] = {
"icloud_warnings_count": len(icloud_warnings),
"icloud_warnings": icloud_warnings,
"empty_whitespace_count": len(empty_whitespace),
"empty_whitespace": empty_whitespace
}
print(f"\n[A] Junk/Boilerplate: {len(icloud_warnings)} iCloud warnings, {len(empty_whitespace)} empty/whitespace")
# --- B. Notes that duplicate structured fields ---
dup_phones = []
dup_emails = []
dup_company = []
dup_jobtitle = []
dup_address = []
for c in with_notes:
notes = c.get("personalNotes", "").strip()
notes_lower = notes.lower()
name = c.get("displayName", "")
# Phone duplication
note_phones = extract_phones_from_text(notes)
field_phones = get_contact_phones(c)
overlap_phones = note_phones & field_phones
if overlap_phones:
dup_phones.append({
"displayName": name,
"duplicated_phones": list(overlap_phones)
})
# Email duplication
note_emails = extract_emails_from_text(notes)
field_emails = get_contact_emails(c)
overlap_emails = note_emails & field_emails
if overlap_emails:
dup_emails.append({
"displayName": name,
"duplicated_emails": list(overlap_emails)
})
# Company duplication
company = (c.get("companyName") or "").strip().lower()
if company and len(company) > 2 and company in notes_lower:
dup_company.append({
"displayName": name,
"company": c.get("companyName")
})
# Job title duplication
title = (c.get("jobTitle") or "").strip().lower()
if title and len(title) > 2 and title in notes_lower:
dup_jobtitle.append({
"displayName": name,
"jobTitle": c.get("jobTitle")
})
# Address duplication
for addr_field in ["homeAddress", "businessAddress", "otherAddress"]:
addr_str = format_address(c.get(addr_field))
if addr_str and len(addr_str) > 5:
# Check if significant parts of address appear in notes
addr_parts = [p for p in addr_str.split() if len(p) > 3]
matches = sum(1 for p in addr_parts if p in notes_lower)
if len(addr_parts) > 0 and matches >= len(addr_parts) * 0.5:
dup_address.append({
"displayName": name,
"field": addr_field,
"address": format_address(c.get(addr_field))
})
break # one match per contact is enough
report["B_duplicates_in_notes"] = {
"phones_duplicated_count": len(dup_phones),
"phones_duplicated": dup_phones,
"emails_duplicated_count": len(dup_emails),
"emails_duplicated": dup_emails,
"company_duplicated_count": len(dup_company),
"company_duplicated": dup_company,
"jobtitle_duplicated_count": len(dup_jobtitle),
"jobtitle_duplicated": dup_jobtitle,
"address_duplicated_count": len(dup_address),
"address_duplicated": dup_address
}
print(f"[B] Duplicated in notes: {len(dup_phones)} phones, {len(dup_emails)} emails, "
f"{len(dup_company)} companies, {len(dup_jobtitle)} titles, {len(dup_address)} addresses")
# --- C. Notes with structured data that SHOULD be in fields ---
promotable_phones = []
promotable_emails = []
for c in with_notes:
notes = c.get("personalNotes", "").strip()
name = c.get("displayName", "")
# Phones in notes NOT in fields
note_phones = extract_phones_from_text(notes)
field_phones = get_contact_phones(c)
extra_phones = note_phones - field_phones
if extra_phones:
promotable_phones.append({
"displayName": name,
"phones_in_notes_only": list(extra_phones),
"note_preview": notes[:200]
})
# Emails in notes NOT in fields
note_emails = extract_emails_from_text(notes)
field_emails = get_contact_emails(c)
extra_emails = note_emails - field_emails
if extra_emails:
promotable_emails.append({
"displayName": name,
"emails_in_notes_only": list(extra_emails),
"note_preview": notes[:200]
})
report["C_promotable_data"] = {
"phones_promotable_count": len(promotable_phones),
"phones_promotable": promotable_phones,
"emails_promotable_count": len(promotable_emails),
"emails_promotable": promotable_emails
}
print(f"[C] Promotable data: {len(promotable_phones)} contacts with phones in notes only, "
f"{len(promotable_emails)} contacts with emails in notes only")
# --- D. Duplicate notes across contacts ---
notes_groups = defaultdict(list)
for c in with_notes:
notes = c.get("personalNotes", "").strip()
if notes:
notes_groups[notes].append(c.get("displayName", c["id"]))
duplicate_groups = []
for notes_text, names in sorted(notes_groups.items(), key=lambda x: -len(x[1])):
if len(names) >= 2:
duplicate_groups.append({
"note_preview": notes_text[:200],
"count": len(names),
"contacts": names
})
report["D_duplicate_notes_across_contacts"] = {
"groups_count": len(duplicate_groups),
"groups": duplicate_groups
}
print(f"[D] Duplicate notes across contacts: {len(duplicate_groups)} groups")
# --- E. General statistics ---
note_lengths = [len(c.get("personalNotes", "").strip()) for c in with_notes]
buckets = {"1-50": 0, "51-200": 0, "201-500": 0, "500+": 0}
for l in note_lengths:
if l <= 50:
buckets["1-50"] += 1
elif l <= 200:
buckets["51-200"] += 1
elif l <= 500:
buckets["201-500"] += 1
else:
buckets["500+"] += 1
avg_len = sum(note_lengths) / len(note_lengths) if note_lengths else 0
# Sample 20 notes of varying lengths
sorted_by_len = sorted(with_notes, key=lambda c: len(c.get("personalNotes", "")))
sample_indices = []
n = len(sorted_by_len)
if n <= 20:
sample_indices = list(range(n))
else:
step = n / 20
sample_indices = [int(i * step) for i in range(20)]
samples = []
for i in sample_indices:
c = sorted_by_len[i]
notes = c.get("personalNotes", "").strip()
samples.append({
"displayName": c.get("displayName", ""),
"note_length": len(notes),
"note_preview": notes[:200]
})
report["E_statistics"] = {
"total_contacts": len(contacts),
"contacts_with_notes": len(with_notes),
"contacts_without_notes": len(without_notes),
"average_note_length": round(avg_len, 1),
"length_distribution": buckets,
"sample_notes": samples
}
print(f"[E] Stats: {len(contacts)} total, {len(with_notes)} with notes, "
f"{len(without_notes)} without, avg length {avg_len:.1f}")
return report
def main():
print("=" * 60)
print("Bardach Contacts - Notes Analysis")
print("=" * 60)
print("\n[1] Acquiring token...")
token = get_token()
print(" [OK] Token acquired")
print("\n[2] Pulling all contacts...")
contacts, token = pull_all_contacts(token)
print(f"\n[3] Analyzing notes across {len(contacts)} contacts...")
report = analyze_notes(contacts)
report["_metadata"] = {
"generated": datetime.now().isoformat(),
"total_contacts_analyzed": len(contacts),
"user": USER
}
print(f"\n[4] Saving report to {OUTPUT_FILE}...")
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
print(" [OK] Report saved")
# --- Print comprehensive report ---
print("\n" + "=" * 60)
print("COMPREHENSIVE NOTES ANALYSIS REPORT")
print("=" * 60)
print(f"\nTotal contacts: {report['E_statistics']['total_contacts']}")
print(f"With notes: {report['E_statistics']['contacts_with_notes']}")
print(f"Without notes: {report['E_statistics']['contacts_without_notes']}")
print(f"Average note length: {report['E_statistics']['average_note_length']} chars")
print(f"\n--- A. Junk/Boilerplate ---")
a = report["A_junk_boilerplate"]
print(f"iCloud warnings: {a['icloud_warnings_count']}")
for item in a["icloud_warnings"]:
print(f" - {item['displayName']}: {item['note_preview'][:80]}")
print(f"Empty/whitespace notes: {a['empty_whitespace_count']}")
for item in a["empty_whitespace"]:
print(f" - {item['displayName']}")
print(f"\n--- B. Notes Duplicating Structured Fields ---")
b = report["B_duplicates_in_notes"]
print(f"Phone numbers duplicated: {b['phones_duplicated_count']}")
for item in b["phones_duplicated"]:
print(f" - {item['displayName']}: {item['duplicated_phones']}")
print(f"Emails duplicated: {b['emails_duplicated_count']}")
for item in b["emails_duplicated"]:
print(f" - {item['displayName']}: {item['duplicated_emails']}")
print(f"Company names duplicated: {b['company_duplicated_count']}")
for item in b["company_duplicated"]:
print(f" - {item['displayName']}: {item['company']}")
print(f"Job titles duplicated: {b['jobtitle_duplicated_count']}")
for item in b["jobtitle_duplicated"]:
print(f" - {item['displayName']}: {item['jobTitle']}")
print(f"Addresses duplicated: {b['address_duplicated_count']}")
for item in b["address_duplicated"]:
print(f" - {item['displayName']}: {item['field']} = {item['address']}")
print(f"\n--- C. Promotable Data (in notes but NOT in fields) ---")
c_data = report["C_promotable_data"]
print(f"Contacts with phones in notes only: {c_data['phones_promotable_count']}")
for item in c_data["phones_promotable"]:
print(f" - {item['displayName']}: {item['phones_in_notes_only']}")
print(f"Contacts with emails in notes only: {c_data['emails_promotable_count']}")
for item in c_data["emails_promotable"]:
print(f" - {item['displayName']}: {item['emails_in_notes_only']}")
print(f"\n--- D. Duplicate Notes Across Contacts ---")
d = report["D_duplicate_notes_across_contacts"]
print(f"Groups with identical notes: {d['groups_count']}")
for g in d["groups"]:
print(f" - {g['count']} contacts share: \"{g['note_preview'][:100]}\"")
for name in g["contacts"]:
print(f" {name}")
print(f"\n--- E. Note Length Distribution ---")
dist = report["E_statistics"]["length_distribution"]
for bucket, count in dist.items():
print(f" {bucket}: {count}")
print(f"\n--- E. Sample Notes (20 samples, varying lengths) ---")
for s in report["E_statistics"]["sample_notes"]:
print(f" [{s['note_length']} chars] {s['displayName']}: {s['note_preview'][:120]}")
print("\n[DONE]")
if __name__ == "__main__":
main()