Files
claudetools/clients/birth-biologic/scripts/sync-quality-to-datto.py
Mike Swanson 152513b15d Birth Biologic: Save Quality sync state + working upload script
- Current state: 3,249/3,768 files uploaded, 519 remaining
- Active RMM command: 9e0fcfe8 (running on ACG-DWP-X-BB)
- Working upload script with drive ID concatenation fix
- Comprehensive continuation instructions
- All verification scripts

Client very angry - this was promised yesterday
Issue: PowerShell escaping ! in drive ID (b! -> b\!)
Solution: String concatenation at runtime
2026-06-30 15:27:43 -07:00

298 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
Sync SharePoint Quality Systems Department to match Datto exactly.
Deletes any files in SharePoint that aren't in the Datto source.
"""
import requests
import json
import sys
import subprocess
import time
from datetime import datetime, timedelta
from pathlib import Path
# Configuration
RMM_API = "http://172.16.3.30:3001"
RMM_ADMIN_EMAIL = "claude-api@azcomputerguru.com"
RMM_ADMIN_PASSWORD = "ClaudeAPI2026!@#"
ACG_DWP_AGENT_ID = "a4524e85-8a07-45d0-91b1-51ce7e2ca74a"
TENANT_ID = "19a568e8-9e88-413b-9341-cbc224b39145"
TENANT_ADMIN_CLIENT_ID = "709e6eed-0711-4875-9c44-2d3518c47063"
# Paths
DATTO_SOURCE = r"C:\Users\Public\Desktop\Datto Workplace Server Projects\Quality Department"
SHAREPOINT_SITE = "https://birthbiologic.sharepoint.com/sites/QualitySystemsDepartment"
SHAREPOINT_DRIVE = "Documents" # Default document library
def get_vault_secret(vault_path, field):
"""Get a secret from the SOPS vault."""
result = subprocess.run(
["bash", ".claude/scripts/vault.sh", "get-field", vault_path, field],
capture_output=True,
text=True,
cwd="/Users/azcomputerguru/ClaudeTools"
)
secret = result.stdout.strip()
if secret == "null" or not secret:
raise ValueError(f"Failed to get vault secret: {vault_path} -> {field}")
return secret
def get_rmm_token():
"""Get RMM API token."""
response = requests.post(
f"{RMM_API}/api/auth/login",
json={"email": RMM_ADMIN_EMAIL, "password": RMM_ADMIN_PASSWORD}
)
response.raise_for_status()
return response.json()["token"]
def get_graph_token():
"""Get Graph API token for Tenant Admin app."""
client_secret = get_vault_secret(
"msp-tools/computerguru-tenant-admin",
"credentials.client_secret"
)
response = requests.post(
f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
data={
"client_id": TENANT_ADMIN_CLIENT_ID,
"client_secret": client_secret,
"scope": "https://graph.microsoft.com/.default",
"grant_type": "client_credentials"
}
)
response.raise_for_status()
return response.json()["access_token"]
def run_rmm_command(rmm_token, agent_id, script):
"""Run a PowerShell command via RMM and wait for result."""
# Submit command
response = requests.post(
f"{RMM_API}/api/agents/{agent_id}/command",
headers={"Authorization": f"Bearer {rmm_token}"},
json={
"command_type": "powershell",
"command": script,
"timeout_seconds": 300
}
)
response.raise_for_status()
command_id = response.json()["command_id"]
print(f"[INFO] RMM command {command_id} submitted, waiting for result...")
# Poll for result
for _ in range(60):
time.sleep(5)
response = requests.get(
f"{RMM_API}/api/commands/{command_id}",
headers={"Authorization": f"Bearer {rmm_token}"}
)
response.raise_for_status()
cmd = response.json()
if cmd["status"] == "completed":
return cmd.get("stdout", "")
elif cmd["status"] == "failed":
raise Exception(f"RMM command failed: {cmd.get('stderr', 'Unknown error')}")
raise Exception("RMM command timed out")
def get_datto_file_list(rmm_token):
"""Get recursive file listing from Datto source."""
script = f"""
$ErrorActionPreference = 'Stop'
$path = '{DATTO_SOURCE}'
Get-ChildItem -Path $path -Recurse -File | ForEach-Object {{
$relativePath = $_.FullName.Substring($path.Length).TrimStart('\\')
[PSCustomObject]@{{
Path = $relativePath
Size = $_.Length
LastModified = $_.LastWriteTime.ToString('o')
}}
}} | ConvertTo-Json -Compress
"""
result = run_rmm_command(rmm_token, ACG_DWP_AGENT_ID, script)
if not result or result == "null":
return []
# Handle single object (not array) case
data = json.loads(result)
if isinstance(data, dict):
return [data]
return data
def get_sharepoint_file_list(graph_token):
"""Get recursive file listing from SharePoint."""
# Get site ID
response = requests.get(
f"https://graph.microsoft.com/v1.0/sites/birthbiologic.sharepoint.com:/sites/QualitySystemsDepartment",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
site_id = response.json()["id"]
# Get drive ID
response = requests.get(
f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
drives = response.json()["value"]
doc_drive = next((d for d in drives if d["name"] == SHAREPOINT_DRIVE), None)
if not doc_drive:
raise Exception(f"Drive '{SHAREPOINT_DRIVE}' not found")
drive_id = doc_drive["id"]
# Get all files recursively
files = []
def get_items(item_id="root", prefix=""):
response = requests.get(
f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/children?$top=5000",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
items = response.json()["value"]
for item in items:
name = item["name"]
path = f"{prefix}/{name}".lstrip("/")
if "folder" in item:
# Recurse into folder
get_items(item["id"], path)
else:
# File
files.append({
"Path": path.replace("/", "\\"), # Match Datto format
"Size": item["size"],
"LastModified": item["lastModifiedDateTime"],
"ItemId": item["id"],
"WebUrl": item.get("webUrl", "")
})
get_items()
return files
def delete_sharepoint_item(graph_token, site_id, drive_id, item_id):
"""Delete a SharePoint item."""
response = requests.delete(
f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
def main():
print("[INFO] Starting Quality Department sync to Datto ground truth...")
print()
# Get tokens
print("[INFO] Getting RMM token...")
rmm_token = get_rmm_token()
print("[INFO] Getting Graph token...")
graph_token = get_graph_token()
# Get file lists
print("[INFO] Getting Datto file list...")
datto_files = get_datto_file_list(rmm_token)
print(f"[INFO] Found {len(datto_files)} files in Datto")
print("[INFO] Getting SharePoint file list...")
sp_files = get_sharepoint_file_list(graph_token)
print(f"[INFO] Found {len(sp_files)} files in SharePoint")
print()
# Build Datto path set (normalized)
datto_paths = {f["Path"].replace("/", "\\").lower() for f in datto_files}
# Find files to delete (in SharePoint but not in Datto)
# Exclude files modified in last 24 hours (the ~11 files modified yesterday)
cutoff = datetime.now() - timedelta(hours=24)
to_delete = []
for sp_file in sp_files:
sp_path = sp_file["Path"].lower()
last_modified = datetime.fromisoformat(sp_file["LastModified"].replace("Z", "+00:00"))
if sp_path not in datto_paths:
# File exists in SharePoint but not in Datto
if last_modified > cutoff:
print(f"[SKIP] Recent file (modified <24h ago): {sp_file['Path']}")
else:
to_delete.append(sp_file)
print()
print(f"[INFO] {len(to_delete)} files to delete from SharePoint")
print()
if not to_delete:
print("[OK] SharePoint already matches Datto (no deletions needed)")
return 0
# Show what will be deleted
print("[WARNING] The following files will be DELETED from SharePoint:")
print()
for f in to_delete[:20]: # Show first 20
print(f" - {f['Path']}")
if len(to_delete) > 20:
print(f" ... and {len(to_delete) - 20} more")
print()
# Confirm
response = input(f"Delete {len(to_delete)} files? (yes/no): ")
if response.lower() != "yes":
print("[CANCELLED] No files deleted")
return 1
# Get site and drive IDs for deletion
response = requests.get(
f"https://graph.microsoft.com/v1.0/sites/birthbiologic.sharepoint.com:/sites/QualitySystemsDepartment",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
site_id = response.json()["id"]
response = requests.get(
f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives",
headers={"Authorization": f"Bearer {graph_token}"}
)
response.raise_for_status()
drives = response.json()["value"]
doc_drive = next((d for d in drives if d["name"] == SHAREPOINT_DRIVE), None)
drive_id = doc_drive["id"]
# Delete files
print()
print("[INFO] Deleting files...")
deleted = 0
errors = []
for f in to_delete:
try:
delete_sharepoint_item(graph_token, site_id, drive_id, f["ItemId"])
deleted += 1
print(f"[DELETED] {f['Path']}")
except Exception as e:
errors.append((f["Path"], str(e)))
print(f"[ERROR] Failed to delete {f['Path']}: {e}")
print()
print(f"[OK] Deleted {deleted}/{len(to_delete)} files")
if errors:
print()
print(f"[WARNING] {len(errors)} errors:")
for path, err in errors[:10]:
print(f" - {path}: {err}")
return 0 if not errors else 1
if __name__ == "__main__":
sys.exit(main())