Files
claudetools/clients/birth-biologic/scripts/bb-recover.py
Mike Swanson 801ff788a6 sync: auto-sync from GURU-5070 at 2026-06-30 06:05:04
Author: Mike Swanson
Machine: GURU-5070
Timestamp: 2026-06-30 06:05:04
2026-06-30 06:07:35 -07:00

186 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""Detect + recover BirthBio SharePoint files corrupted by byte-array->decimal-text upload.
A broken file's content is ASCII: space-separated decimal byte values (0-255).
We reconstruct the original bytes, validate a known binary magic, and (in apply mode)
PUT the bytes back to the SAME driveItem id so existing share links keep working.
Usage: bb-recover.py <site_path> [--apply]
site_path e.g. birthbiologic.sharepoint.com:/sites/QualitySystemsDepartment
without --apply => detect + reconstruct + validate only (no writes)
with --apply => also re-upload recovered bytes in place
Auth: set BBSEC to the Tenant Admin app client secret, e.g.
export BBSEC=$(bash <vault>/scripts/vault.sh get-field \
msp-tools/computerguru-tenant-admin.sops.yaml credentials.client_secret)
python bb-recover.py birthbiologic.sharepoint.com:/sites/QualitySystemsDepartment # dry run
python bb-recover.py birthbiologic.sharepoint.com:/sites/QualitySystemsDepartment --apply # recover
PROVENANCE: built 2026-06-29 (session 1290fc6c) for the BirthBio QMS corruption (coord todo
28e3e7ab). Graduated from session scratchpad into the repo so any session/machine can run it.
Method proven on the Surgenex contact-list xlsx. Recovery is in-place (preserves item IDs/links)
and only replaces a file when the reconstructed bytes start with a known binary magic.
Re-scan live before relying on any saved corrupt-file list — the QSD library was reconciled
2026-06-29 (dedupe/backfill/old-site deletion) so counts shift from the original 47-of-4,314 scan.
"""
import os, sys, json, time, urllib.request, urllib.parse, re
# --- configuration ---
# Tenant id and app (client) id used by token() for the client-credentials grant.
TENANT = "19a568e8-9e88-413b-9341-cbc224b39145"
CLIENT = "709e6eed-0711-4875-9c44-2d3518c47063"
GRAPH = "https://graph.microsoft.com/v1.0"

# The client secret is only ever read from the environment. Exit with guidance
# instead of an opaque KeyError traceback when it has not been exported.
SEC = os.environ.get("BBSEC")
if not SEC:
    sys.exit("error: BBSEC is not set (export the Tenant Admin app client secret first)")

# "--apply" anywhere on the command line enables writes; without it the run is a dry run.
APPLY = "--apply" in sys.argv

# First non-option argument is the site path; exit with usage when it is missing
# instead of an IndexError.
args = [a for a in sys.argv[1:] if not a.startswith("--")]
if not args:
    sys.exit("usage: bb-recover.py <site_path> [--apply]")
SITE_PATH = args[0]
# Leading byte signatures of the binary formats we trust, mapped to the label
# shown in the report. A reconstructed file is only replaced automatically when
# its first bytes match one of these keys.
MAGICS = dict([
    (b"PK\x03\x04", "zip/ooxml (xlsx/docx/pptx)"),
    (b"%PDF", "pdf"),
    (b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1", "ole (legacy doc/xls/ppt)"),
    (b"\x89PNG", "png"),
    (b"\xff\xd8\xff", "jpeg"),
    (b"GIF8", "gif"),
    (b"{\\rtf", "rtf"),
    (b"\x1f\x8b", "gzip"),
    (b"BM", "bmp"),
])
def _req(method, url, data=None, headers=None, raw=False):
h = dict(headers or {})
last = None
for attempt in range(6):
# always use the freshest auth header if this is an authed Graph call
if "Authorization" in h and "graph.microsoft.com" in url:
h["Authorization"] = f"Bearer {TOK}"
req = urllib.request.Request(url, data=data, headers=h, method=method)
try:
r = urllib.request.urlopen(req, timeout=120)
body = r.read()
return r.status, (body if raw else json.loads(body) if body else {})
except urllib.error.HTTPError as e:
errbody = e.read()
if e.code == 401 and "Authorization" in h:
refresh_token() # token expired mid-run -> re-mint and retry
time.sleep(1)
continue
if e.code in (429, 503, 504):
time.sleep(2 ** attempt)
continue
return e.code, (errbody if raw else _safe(errbody))
except Exception as e:
time.sleep(2 ** attempt)
last = str(e)
return 0, {"error": last}
def _safe(b):
try: return json.loads(b)
except: return {"raw": b[:300].decode("utf-8","replace")}
def token():
    """Request an app-only Graph access token via the client-credentials grant.

    Posts CLIENT / SEC to the TENANT's v2.0 token endpoint through ``_req``.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        RuntimeError: the response carried no ``access_token`` (bad secret,
            network failure, ...). The message includes the HTTP status and the
            endpoint's error text rather than surfacing a bare KeyError.
    """
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": CLIENT,
        "client_secret": SEC,
        "scope": "https://graph.microsoft.com/.default",
    }).encode()
    st, j = _req(
        "POST",
        f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    access_token = j.get("access_token") if isinstance(j, dict) else None
    if not access_token:
        # On failure _req hands back an error dict; show the most specific text in it.
        if isinstance(j, dict):
            detail = j.get("error_description") or j.get("error") or j
        else:
            detail = j
        raise RuntimeError(f"token request failed (HTTP {st}): {str(detail)[:300]}")
    return access_token
def refresh_token():
    """Mint a new token and rebind the module-level TOK and H to use it."""
    global TOK, H
    fresh = token()
    TOK, H = fresh, {"Authorization": f"Bearer {fresh}"}
    print("[token] refreshed")
# Initial token and the default auth header handed to every Graph call below.
TOK = token()
H = dict(Authorization=f"Bearer {TOK}")
def g(url):
    """GET *url* with the shared auth header H and return only the decoded payload.

    The HTTP status is dropped, so on failure the caller receives whatever
    error dict ``_req`` produced instead of the expected resource.
    """
    return _req("GET", url, headers=H)[1]
# --- resolve site + drives ---
site = g(f"{GRAPH}/sites/{SITE_PATH}")
site_id = site.get("id") if isinstance(site, dict) else None
if not site_id:
    # g() returns the error payload on failure; stop here with it rather than
    # crashing on a missing "id" key.
    sys.exit(f"error: could not resolve site {SITE_PATH!r}: {str(site)[:300]}")
print(f"[site] {site.get('displayName')} id={site_id}")

drives_resp = g(f"{GRAPH}/sites/{site_id}/drives")
if not isinstance(drives_resp, dict) or "value" not in drives_resp:
    # A failed listing must not be mistaken for "this site has no libraries".
    sys.exit(f"error: could not list drives for site {site_id}: {str(drives_resp)[:300]}")
drives = drives_resp["value"]
print(f"[drives] {len(drives)}: " + ", ".join(d['name'] for d in drives))
# --- walk all files in all drives (depth-first: pending folders sit on a LIFO stack) ---
files = []
walk_errors = 0
for d in drives:
    did = d["id"]
    stack = [f"{GRAPH}/drives/{did}/root/children"]
    while stack:
        url = stack.pop()
        # Inner loop follows @odata.nextLink paging for one folder listing.
        while url:
            j = g(url)
            if not isinstance(j, dict) or "value" not in j:
                # g() returned an error payload: say so loudly, otherwise the
                # folder looks empty and its files are never checked.
                walk_errors += 1
                print(f"[walk] WARNING: listing failed, contents NOT scanned: {url} -> {str(j)[:200]}")
                break
            for it in j["value"]:
                if "folder" in it:
                    stack.append(f"{GRAPH}/drives/{did}/items/{it['id']}/children")
                elif "file" in it:
                    files.append((did, it["id"], it["name"], it.get("size", 0)))
            url = j.get("@odata.nextLink")
print(f"[files] total files across libraries: {len(files)}")
if walk_errors:
    print(f"[files] WARNING: {walk_errors} folder listing(s) failed - the scan is incomplete, re-run")
# --- detect: range-read first 48 bytes, test decimal-text signature ---
sig_re = re.compile(rb"^\s*\d{1,3}( \d{1,3}){6,}")  # several leading "NN NN NN ..." tokens
broken = []
unreadable = []  # files whose header could not be fetched, i.e. were never checked
for did, iid, name, size in files:
    st, head = _req("GET", f"{GRAPH}/drives/{did}/items/{iid}/content",
                    headers={**H, "Range": "bytes=0-47"}, raw=True)
    if isinstance(head, (bytes, bytearray)) and 200 <= st < 300:
        if sig_re.match(head):
            broken.append((did, iid, name, size))
    elif st == 416:
        # Range not satisfiable for "bytes=0-47" means there is no byte 0:
        # an empty file, which cannot carry the decimal-text signature.
        continue
    else:
        # Error body or exhausted retries: the file was NOT verified as clean.
        unreadable.append(name)
        print(f"[detect] WARNING: could not read header of {name} (status {st}) - not checked")
print(f"\n[detect] suspected corrupt (decimal-text) files: {len(broken)}")
for did, iid, name, size in broken:
    print(f" - {name} ({size} B)")
if unreadable:
    print(f"[detect] WARNING: {len(unreadable)} file(s) could not be read and were not checked - re-run")
# --- reconstruct + validate (+ apply) ---
def _decode_decimal_text(body):
    """Rebuild the original bytes from b"80 75 3 4 ..." decimal text.

    Raises ValueError unless every whitespace-separated token is made of plain
    ASCII digits and lies in 0-255. ``int()`` alone would also accept forms such
    as b"+5" or b"1_0", which are not what the corrupting upload produces.
    """
    toks = body.split()
    for t in toks:
        if not t.isdigit():
            raise ValueError(f"non-decimal token {bytes(t[:16])!r}")
    nums = [int(t) for t in toks]
    if any(n > 255 for n in nums):
        raise ValueError("token out of 0-255 range")
    return bytes(nums)


print(f"\n[recover] mode = {'APPLY (re-upload)' if APPLY else 'DRY-RUN (no writes)'}")
ok, skipped, failed = [], [], []
for did, iid, name, size in broken:
    st, body = _req("GET", f"{GRAPH}/drives/{did}/items/{iid}/content", headers=H, raw=True)
    # A non-2xx status or a non-bytes payload means we do not hold the file content.
    if not isinstance(body, (bytes, bytearray)) or not 200 <= st < 300:
        failed.append((name, "download failed")); print(f" [FAIL] {name}: download"); continue
    # guard: if the body is actually an API error JSON (e.g. transient auth), skip rather than mangle
    if body[:1] == b"{" and b'"error"' in body[:200]:
        skipped.append((name, "got API error body, not file content - retry")); print(f" [SKIP] {name}: API error body"); continue
    try:
        recovered = _decode_decimal_text(body)
    except Exception as e:
        skipped.append((name, f"not clean decimal-text: {e}"))
        print(f" [SKIP] {name}: {e}")
        continue
    # Only overwrite when the result starts like a format we recognise.
    magic = next((desc for sig, desc in MAGICS.items() if recovered.startswith(sig)), None)
    if not magic:
        skipped.append((name, f"reconstructed but unknown magic {recovered[:8]!r} - needs review"))
        print(f" [SKIP] {name}: unknown magic {recovered[:8]!r}")
        continue
    info = f"{size} B text -> {len(recovered)} B {magic}"
    if not APPLY:
        ok.append((name, info)); print(f" [WOULD-FIX] {name}: {info}")
        continue
    # upload recovered bytes in place (simple PUT to the same item id; reconstructed files are small)
    st, resp = _req("PUT", f"{GRAPH}/drives/{did}/items/{iid}/content",
                    data=recovered, headers={**H, "Content-Type": "application/octet-stream"}, raw=False)
    if st in (200, 201):
        ok.append((name, info)); print(f" [FIXED] {name}: {info}")
    else:
        failed.append((name, f"upload {st}: {str(resp)[:160]}"))
        print(f" [FAIL] {name}: upload {st} {str(resp)[:160]}")
# --- final report: counts first, then the per-file reasons for anything not handled ---
print("\n==================== SUMMARY ====================")
print(f"files scanned : {len(files)}")
print(f"detected broken: {len(broken)}")
outcome_label = "fixed" if APPLY else "recoverable"
print(f"{outcome_label}: {len(ok)}")
for heading, entries in (
    (f"skipped (review): {len(skipped)}", skipped),
    (f"failed: {len(failed)}", failed),
):
    print(heading)
    for fname, reason in entries:
        print(f" - {fname}: {reason}")