Files
claudetools/temp/bardach_email_contacts_scan.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

243 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""Scan Barbara Bardach's email to find frequent correspondents missing from contacts."""
import json
import os
import subprocess
import sys
import time
import urllib.parse
from collections import defaultdict
from datetime import datetime, timedelta, timezone
# Azure AD (Entra ID) app registration used for the client-credentials flow.
TENANT_ID = "dd4a82e8-85a3-44ac-8800-07945ab4d95f"
APP_ID = "fabb3421-8b34-484b-bc17-e46de9703418"
# SECURITY: the client secret must never live in source control. It is read
# from the GRAPH_APP_SECRET environment variable instead; an empty value makes
# the token request fail. Any secret that was previously committed in this
# file should be treated as compromised and rotated.
APP_SECRET = os.environ.get("GRAPH_APP_SECRET", "")

# Mailbox being analysed and the Graph base URL for that user.
USER_EMAIL = "barbara@bardach.net"
BASE_URL = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}"

# Substrings that mark an address as automated (matched anywhere in the
# lower-cased address, local part or domain).
FILTER_KEYWORDS = [
    "noreply", "no-reply", "donotreply", "do-not-reply", "notification",
    "alert", "mailer-daemon", "postmaster", "bounce", "automated",
    "system", "daemon", "undeliverable",
]

# The mailbox owner's own addresses; excluded from the "missing" report.
BARBARA_ALIASES = {"barbara@bardach.net"}
def get_token():
    """Get OAuth2 token using client credentials.

    Posts a client-credentials grant for the Graph ``.default`` scope to the
    tenant's v2.0 token endpoint via ``curl``.

    Returns:
        The ``access_token`` string from the token response.

    Exits the process with status 1 (after printing an ``[ERROR]`` line) when
    no client secret is configured, when the response is not valid JSON, or
    when the response carries no ``access_token``.
    """
    if not APP_SECRET:
        print("[ERROR] Failed to get token: client secret (APP_SECRET) is empty")
        sys.exit(1)

    url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    # Let urlencode build the form body so that a secret containing '&', '=',
    # '+' or '%' cannot corrupt the request.
    body = urllib.parse.urlencode({
        "client_id": APP_ID,
        "scope": "https://graph.microsoft.com/.default",
        "client_secret": APP_SECRET,
        "grant_type": "client_credentials",
    })
    # "--data-binary @-" makes curl read the body from stdin, which keeps the
    # secret out of the command line (and therefore out of the process list).
    result = subprocess.run([
        "curl", "-s", "-X", "POST", url,
        "-H", "Content-Type: application/x-www-form-urlencoded",
        "--data-binary", "@-",
    ], input=body, capture_output=True, text=True)

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        # With "-s" a failed curl produces no output at all; report its exit
        # code instead of dying with a traceback.
        print(f"[ERROR] Failed to get token: curl exit code {result.returncode}, "
              f"unparseable response: {result.stdout[:200]!r}")
        sys.exit(1)

    if not isinstance(data, dict) or "access_token" not in data:
        print(f"[ERROR] Failed to get token: {data}")
        sys.exit(1)
    print("[OK] Got access token")
    return data["access_token"]
def graph_get(url, token):
    """Issue a GET against the Graph API through curl and decode the reply.

    Args:
        url: Full request URL.
        token: Bearer token placed in the Authorization header.

    Returns:
        The parsed JSON response, or None (after printing an error) when the
        response body is not valid JSON.
    """
    command = ["curl", "-s", "-X", "GET", url]
    for header in (f"Authorization: Bearer {token}", "Content-Type: application/json"):
        command.extend(["-H", header])

    completed = subprocess.run(command, capture_output=True, text=True)
    raw_body = completed.stdout
    try:
        parsed = json.loads(raw_body)
    except json.JSONDecodeError:
        print(f"[ERROR] Failed to parse response from {url[:100]}...")
        print(f" stdout: {raw_body[:200]}")
        return None
    return parsed
def paginate_all(initial_url, token, label="items", max_pages=500):
    """Paginate through all results, refreshing token every 50 pages.

    Follows ``@odata.nextLink`` from ``initial_url`` until there is no further
    link, ``max_pages`` pages have been fetched, or a request fails.

    Args:
        initial_url: URL of the first page.
        token: Bearer token to start with.
        label: Name used in progress output.
        max_pages: Upper bound on the number of pages fetched.

    Returns:
        A ``(items, token)`` tuple: every entry collected from the pages'
        ``value`` arrays, and the token in use at the end (it may have been
        refreshed, so callers should keep using the returned one).
    """
    all_items = []
    url = initial_url
    page = 0
    current_token = token
    while url and page < max_pages:
        if page > 0 and page % 50 == 0:
            print(f" Refreshing token at page {page}...")
            current_token = get_token()
        data = graph_get(url, current_token)
        if data is None:
            print(f" [WARNING] Null response at page {page}, stopping.")
            break
        if "error" in data:
            error = data["error"]
            # Tolerate an error payload that is not the usual object.
            message = error.get("message", "") if isinstance(error, dict) else error
            print(f" [ERROR] API error at page {page}: {message}")
            break
        all_items.extend(data.get("value", []))
        page += 1
        if page % 10 == 0:
            print(f" [{label}] Page {page}: {len(all_items)} total so far...")
        url = data.get("@odata.nextLink")

    # A break never increments `page`, so this is only true when the page cap
    # was hit while the server still offered more data: say so instead of
    # silently reporting incomplete totals.
    if url and page >= max_pages:
        print(f" [WARNING] [{label}] Stopped at max_pages={max_pages} with more "
              f"results remaining; totals are incomplete.")
    print(f" [{label}] Done: {len(all_items)} items across {page} pages")
    return all_items, current_token
def is_automated(email, keywords=None):
    """Check if an email address looks automated.

    Args:
        email: Address to test; matching is case-insensitive and looks at the
            whole address (local part and domain).
        keywords: Optional iterable of substrings that mark an address as
            automated. Defaults to the module-level ``FILTER_KEYWORDS``.

    Returns:
        True when any keyword occurs in the address, otherwise False. An empty
        or missing address is never considered automated.
    """
    if not email:
        return False
    if keywords is None:
        keywords = FILTER_KEYWORDS
    lower = email.lower()
    return any(kw.lower() in lower for kw in keywords)
def _address_and_name(email_address):
    """Return the normalized ``(address, display_name)`` of an emailAddress dict.

    ``dict.get(key, "")`` still returns None when the key is present with a
    null value, which would crash ``.strip()``; ``or ""`` covers that case as
    well as a missing or null ``email_address`` itself.
    """
    entry = email_address or {}
    address = (entry.get("address") or "").strip().lower()
    name = (entry.get("name") or "").strip()
    return address, name


def _collect_contact_emails(contacts):
    """Return the set of lower-cased addresses listed on the given contacts."""
    emails = set()
    for contact in contacts:
        for entry in contact.get("emailAddresses") or []:
            address, _ = _address_and_name(entry)
            if address:
                emails.add(address)
    return emails


def _tally_correspondents(sent_messages, inbox_messages):
    """Count, per address, messages sent to it and messages received from it."""
    email_data = defaultdict(lambda: {"sent_count": 0, "received_count": 0, "display_name": ""})

    def record(email_address, counter):
        address, name = _address_and_name(email_address)
        if not address:
            return
        stats = email_data[address]
        stats[counter] += 1
        # Keep the first non-empty display name seen for the address.
        if name and not stats["display_name"]:
            stats["display_name"] = name

    # Sent mail: count every To and Cc recipient.
    for msg in sent_messages:
        for field in ("toRecipients", "ccRecipients"):
            for recip in msg.get(field) or []:
                record((recip or {}).get("emailAddress"), "sent_count")

    # Inbox: count the sender.
    for msg in inbox_messages:
        record((msg.get("from") or {}).get("emailAddress"), "received_count")

    return email_data


def _find_missing(email_data, contact_emails):
    """Return ``(missing, already_in_contacts, filtered_out)`` for the tally.

    ``missing`` is sorted by total message count, highest first. Addresses
    already in contacts are counted first; the owner's aliases and automated
    senders are counted as filtered.
    """
    already_in_contacts = 0
    filtered_out = 0
    missing = []
    for email, data in email_data.items():
        if email in contact_emails:
            already_in_contacts += 1
            continue
        if email in BARBARA_ALIASES or is_automated(email):
            filtered_out += 1
            continue
        missing.append({
            "email": email,
            "display_name": data["display_name"],
            "sent_count": data["sent_count"],
            "received_count": data["received_count"],
            "total": data["sent_count"] + data["received_count"],
        })
    missing.sort(key=lambda x: x["total"], reverse=True)
    return missing, already_in_contacts, filtered_out


def main():
    """Report frequent correspondents that are missing from the contact list.

    Pulls the mailbox's contacts plus the last 12 months of Sent Items and
    Inbox messages from Microsoft Graph, counts how often each address
    appears, prints the top 50 addresses that are not in contacts, and writes
    the full list to a JSON file.
    """
    start_time = time.time()
    print("=" * 70)
    print("Barbara Bardach - Email Contact Gap Analysis")
    print("=" * 70)

    # Step 1: Get token
    token = get_token()

    # Step 2: Pull all contacts
    print("\n[INFO] Pulling contacts...")
    # Note: contacts URL doesn't have filter so $ signs are fine in query params
    contacts_url = f"{BASE_URL}/contacts?$top=999&$select=emailAddresses"
    all_contacts, token = paginate_all(contacts_url, token, label="Contacts", max_pages=100)
    contact_emails = _collect_contact_emails(all_contacts)
    print(f"[OK] Found {len(all_contacts)} contacts with {len(contact_emails)} unique email addresses")

    # "Last 12 months" is computed at run time (midnight UTC, 365 days back)
    # so the window does not go stale the way a hard-coded date does.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=365)).strftime("%Y-%m-%dT00:00:00Z")

    # Step 3: Pull SENT mail - last 12 months
    print("\n[INFO] Pulling sent mail (last 12 months)...")
    sent_params = urllib.parse.urlencode({
        "$filter": f"sentDateTime ge {cutoff}",
        "$select": "toRecipients,ccRecipients,subject,sentDateTime",
        "$top": "250",
    })
    sent_url = f"{BASE_URL}/mailFolders/sentitems/messages?{sent_params}"
    sent_messages, token = paginate_all(sent_url, token, label="Sent", max_pages=500)

    # Step 4: Pull INBOX - last 12 months
    print("\n[INFO] Pulling inbox (last 12 months)...")
    inbox_params = urllib.parse.urlencode({
        "$filter": f"receivedDateTime ge {cutoff}",
        "$select": "from,subject,receivedDateTime",
        "$top": "250",
    })
    inbox_url = f"{BASE_URL}/mailFolders/inbox/messages?{inbox_params}"
    inbox_messages, token = paginate_all(inbox_url, token, label="Inbox", max_pages=500)

    # Step 5: Count frequencies (email -> sent_count, received_count, display_name)
    print("\n[INFO] Counting frequencies...")
    email_data = _tally_correspondents(sent_messages, inbox_messages)
    total_unique = len(email_data)
    print(f"[OK] Found {total_unique} unique email addresses in mail")

    # Step 6: Filter out known contacts, the owner's aliases and automated senders
    missing, already_in_contacts, filtered_out = _find_missing(email_data, contact_emails)

    # Step 7: Report
    print("\n" + "=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f"Total unique email addresses in mail: {total_unique}")
    print(f"Already in contacts: {already_in_contacts}")
    print(f"Filtered (Barbara/automated): {filtered_out}")
    print(f"Missing from contacts: {len(missing)}")
    print(f"Sent messages scanned: {len(sent_messages)}")
    print(f"Inbox messages scanned: {len(inbox_messages)}")
    print("\nTop 50 most frequent correspondents NOT in contacts:")
    print("-" * 90)
    print(f"{'#':>3} {'Email':<40} {'Name':<25} {'Sent':>5} {'Recv':>5} {'Total':>5}")
    print("-" * 90)
    for i, entry in enumerate(missing[:50], 1):
        # Truncate so long values do not break the column layout.
        email_disp = entry["email"][:39]
        name_disp = entry["display_name"][:24]
        print(f"{i:>3} {email_disp:<40} {name_disp:<25} {entry['sent_count']:>5} {entry['received_count']:>5} {entry['total']:>5}")

    # Step 8: Save JSON
    output = {
        "generated": datetime.now().isoformat(),
        "total_mail_addresses": total_unique,
        "already_in_contacts": already_in_contacts,
        "missing_from_contacts": len(missing),
        "sent_messages_scanned": len(sent_messages),
        "inbox_messages_scanned": len(inbox_messages),
        "missing": missing,
    }
    output_path = r"D:\ClaudeTools\temp\bardach_missing_contacts.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    elapsed = time.time() - start_time
    print(f"\n[OK] Full list saved to {output_path}")
    print(f"[OK] Completed in {elapsed:.1f} seconds")
# Run the scan only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()