"""Bulk-upload the delta to Hoffman's API. Reads delta_to_upload.txt (produced by compute-delta.py), POSTs to /api/v1/TestReportDataFiles/bulk in batches, retries on transient errors, refreshes token before expiry, logs results. Usage: python upload-delta.py [--batch 100] [--dry-run] [--limit N] [--start N] """ import argparse, json, os, subprocess, sys, time, urllib.request, urllib.error, urllib.parse import yaml DELTA = r'C:\Users\guru\AppData\Local\Temp\delta_to_upload.txt' LOG_DIR = r'D:\claudetools\projects\dataforth-dos\datasheet-pipeline\upload-logs' os.makedirs(LOG_DIR, exist_ok=True) LOG_PATH = os.path.join(LOG_DIR, time.strftime('upload-%Y%m%d-%H%M%S.log')) VAULT = 'D:/vault/clients/dataforth/api-oauth.sops.yaml' def load_creds(): r = subprocess.run(['sops','-d',VAULT], capture_output=True, text=True, timeout=30, check=True) y = yaml.safe_load(r.stdout) return { 'token_url': y['endpoints']['token-url'], 'api_base': y['endpoints']['api-base'], 'client_id': y['credentials']['client-id'], 'client_secret': y['credentials']['client-secret'], 'scope': y['credentials']['scope'], } CREDS = load_creds() _token = {'value': None, 'expires_at': 0} def get_token(force=False): if not force and _token['value'] and time.time() < _token['expires_at'] - 60: return _token['value'] body = urllib.parse.urlencode({ 'grant_type':'client_credentials', 'client_id':CREDS['client_id'], 'client_secret':CREDS['client_secret'], 'scope':CREDS['scope'], }).encode() req = urllib.request.Request(CREDS['token_url'], data=body, method='POST', headers={'Content-Type':'application/x-www-form-urlencoded'}) with urllib.request.urlopen(req, timeout=30) as r: t = json.loads(r.read()) _token['value'] = t['access_token'] _token['expires_at'] = time.time() + t.get('expires_in', 3600) return _token['value'] def api_post(path, body): """POST with one retry on 401/transient.""" url = f"{CREDS['api_base']}{path}" data = json.dumps(body).encode() for attempt in range(2): token = get_token(force=(attempt > 0)) req = urllib.request.Request(url, data=data, method='POST', headers={'Authorization':f'Bearer {token}','Content-Type':'application/json'}) try: with urllib.request.urlopen(req, timeout=120) as r: return r.status, json.loads(r.read()) except urllib.error.HTTPError as e: if e.code == 401 and attempt == 0: continue try: return e.code, json.loads(e.read()) except Exception: return e.code, {'_raw': e.read().decode(errors='replace')[:500]} except (urllib.error.URLError, TimeoutError) as e: if attempt == 0: time.sleep(5); continue return 0, {'_error': str(e)} def load_delta(start=0, limit=0): items = [] with open(DELTA, encoding='utf-8') as f: for i, line in enumerate(f): if i < start: continue if limit and len(items) >= limit: break parts = line.rstrip('\n').split('|') if len(parts) < 4: continue sn, path, size, mtime = parts[0], parts[1], parts[2], parts[3] items.append({'sn': sn, 'path': path, 'size': int(size)}) return items def read_content(path): with open(path, 'rb') as f: return f.read().decode('utf-8', errors='replace') def main(): ap = argparse.ArgumentParser() ap.add_argument('--batch', type=int, default=100, help='items per /bulk POST') ap.add_argument('--dry-run', action='store_true') ap.add_argument('--limit', type=int, default=0, help='cap total processed (0 = all)') ap.add_argument('--start', type=int, default=0, help='skip first N rows in delta file') args = ap.parse_args() print(f'[INFO] delta file: {DELTA}') print(f'[INFO] log file: {LOG_PATH}') items = load_delta(start=args.start, limit=args.limit) print(f'[INFO] {len(items)} items queued (start={args.start} limit={args.limit or "all"}, batch={args.batch})') print(f'[INFO] dry-run: {args.dry_run}') if not args.dry_run: token = get_token() print(f'[OK] token len={len(token)}') log = open(LOG_PATH, 'w', encoding='utf-8') log.write(f'# upload run {time.strftime("%Y-%m-%d %H:%M:%S")}\n') log.write(f'# delta size: {len(items)}, batch: {args.batch}, dry_run: {args.dry_run}\n') totals = {'received':0,'created':0,'updated':0,'unchanged':0,'errors':0} t0 = time.time() n = len(items) for i in range(0, n, args.batch): chunk = items[i:i+args.batch] # Read file content for each bulk = [] for it in chunk: try: bulk.append({'SerialNumber': it['sn'], 'Content': read_content(it['path'])}) except Exception as e: log.write(f'READ_FAIL {it["sn"]} {it["path"]} {e}\n') totals['errors'] += 1 if args.dry_run: print(f' [DRY] batch {i//args.batch + 1}: {len(bulk)} items (first sn={bulk[0]["SerialNumber"] if bulk else "-"})') continue status, resp = api_post('/api/v1/TestReportDataFiles/bulk', {'Items': bulk}) if status != 200: print(f' [FAIL] batch {i//args.batch+1} HTTP {status}: {resp}') log.write(f'BATCH_FAIL idx={i} status={status} resp={resp}\n') totals['errors'] += len(bulk) continue totals['received'] += resp.get('TotalReceived', 0) totals['created'] += resp.get('Created', 0) totals['updated'] += resp.get('Updated', 0) totals['unchanged']+= resp.get('Unchanged', 0) for err in (resp.get('Errors') or []): log.write(f'BATCH_ITEM_ERR {err}\n') totals['errors'] += 1 rate = (i + len(chunk)) / max(1, time.time() - t0) eta_s = int((n - (i + len(chunk))) / max(1, rate)) print(f' batch {i//args.batch+1:>4}/{(n+args.batch-1)//args.batch}: ' f'recv={resp.get("TotalReceived",0)} ' f'cre={resp.get("Created",0)} upd={resp.get("Updated",0)} unch={resp.get("Unchanged",0)} ' f'err={len(resp.get("Errors") or [])} | rate={rate:.0f}/s eta={eta_s}s') log.write(f'BATCH idx={i} rcv={resp.get("TotalReceived")} cre={resp.get("Created")} upd={resp.get("Updated")} unch={resp.get("Unchanged")} err={len(resp.get("Errors") or [])}\n') elapsed = time.time() - t0 print(f'\n[DONE] elapsed {elapsed:.1f}s') print(f' received: {totals["received"]}') print(f' created: {totals["created"]}') print(f' updated: {totals["updated"]}') print(f' unchanged: {totals["unchanged"]}') print(f' errors: {totals["errors"]}') log.write(f'\n# totals: {totals}\n# elapsed: {elapsed:.1f}s\n') log.close() print(f'[INFO] log: {LOG_PATH}') if __name__ == '__main__': main()