Built the missing piece between the test datasheet pipeline and Dataforth's new product API. End-to-end: - Pulled DFWDS (Dataforth Web Datasheet System) VB6 source from AD1\Engineering\ENGR\ATE\Test Datasheets\DFWDS to local for analysis - Decoded its filename validation: A-J prefix decodes (A=10..J=19), all- numeric WO# valid (no leading 0), anything else bad - Ported the validation + move logic to Node (dfwds-process.js) - Built bulk uploader (upload-delta.js) for Hoffman's Swagger API (POST /api/v1/TestReportDataFiles/bulk with OAuth client_credentials) Sanitized 3 prior reference scripts (fetch-server-inventory, test-scenarios, test-upload-two) to read CF_* env vars instead of hardcoded creds. Live drain results: - 897 files moved Test_Datasheets -> For_Web (all valid, no renames, no bad), DFWDS port summary in 1.1s - Pushed entire For_Web (7,061 files) to Hoffman API in 49.7s @ 142/s: Created=803 Updated=114 Unchanged=6,144 Errors=0 - Server count: 489,579 -> 490,382 (+803 net new) Also: - Added clients/dataforth/.gitignore to exclude plaintext Oauth.txt note - Added clients/instrumental-music-center/docs/2026-04-13-ticket-notes.md (ticket write-up of 2026-04-11/12/13 IMC1 RDS removal/SQL migration work) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
163 lines
6.8 KiB
Python
163 lines
6.8 KiB
Python
"""Bulk-upload the delta to Hoffman's API.
|
|
|
|
Reads delta_to_upload.txt (produced by compute-delta.py), POSTs to
/api/v1/TestReportDataFiles/bulk in batches, retries on transient errors,
refreshes the token before expiry, and logs results.
|
|
|
|
Usage:
|
|
python upload-delta.py [--batch 100] [--dry-run] [--limit N] [--start N]
|
|
"""
|
|
import argparse, json, os, subprocess, sys, time, urllib.request, urllib.error, urllib.parse
|
|
import yaml
|
|
|
|
# Pipe-delimited work list, one "sn|path|size|mtime" row per file to upload.
DELTA = r'C:\Users\guru\AppData\Local\Temp\delta_to_upload.txt'

# Each run writes its own timestamped log file into this directory, which is
# created on import if it does not exist yet.
LOG_DIR = r'D:\claudetools\projects\dataforth-dos\datasheet-pipeline\upload-logs'
os.makedirs(LOG_DIR, exist_ok=True)
LOG_PATH = os.path.join(
    LOG_DIR,
    f"upload-{time.strftime('%Y%m%d-%H%M%S')}.log",
)

# Encrypted YAML file that load_creds() decrypts with `sops -d`.
VAULT = 'D:/vault/clients/dataforth/api-oauth.sops.yaml'
|
|
|
|
def load_creds():
    """Decrypt the vault file with ``sops`` and return the API settings.

    Returns:
        A dict with the keys ``token_url``, ``api_base``, ``client_id``,
        ``client_secret`` and ``scope``.

    Raises:
        subprocess.CalledProcessError: ``sops -d`` exited non-zero.
        subprocess.TimeoutExpired: ``sops`` did not finish within 30 seconds.
        KeyError: the decrypted YAML lacks one of the expected entries.
    """
    decrypted = subprocess.run(
        ['sops', '-d', VAULT],
        capture_output=True,
        text=True,
        timeout=30,
        check=True,
    )
    vault = yaml.safe_load(decrypted.stdout)

    # Map the vault's dash-separated keys onto the underscore names used by
    # the rest of this script.
    endpoints = vault['endpoints']
    settings = {
        'token_url': endpoints['token-url'],
        'api_base': endpoints['api-base'],
    }
    credentials = vault['credentials']
    settings['client_id'] = credentials['client-id']
    settings['client_secret'] = credentials['client-secret']
    settings['scope'] = credentials['scope']
    return settings
|
|
|
|
# Decrypted once at import time; get_token() and api_post() read from it.
CREDS = load_creds()

# Cached bearer token plus its expiry as an epoch timestamp (see get_token()).
_token = dict(value=None, expires_at=0)
|
|
|
|
def get_token(force=False):
    """Return an OAuth bearer token, requesting a new one when required.

    The cached token is reused while it is still more than 60 seconds away
    from its recorded expiry. Otherwise a ``client_credentials`` grant is
    POSTed to the token URL and the cache is updated.

    Args:
        force: When true, ignore the cache and always request a new token.

    Returns:
        The access token string.

    Raises:
        urllib.error.URLError: the token request failed (``HTTPError`` for a
            non-2xx response).
        KeyError: the token response has no ``access_token`` field.
    """
    cached = _token['value']
    still_fresh = time.time() < _token['expires_at'] - 60
    if cached and still_fresh and not force:
        return cached

    form = {
        'grant_type': 'client_credentials',
        'client_id': CREDS['client_id'],
        'client_secret': CREDS['client_secret'],
        'scope': CREDS['scope'],
    }
    request = urllib.request.Request(
        CREDS['token_url'],
        data=urllib.parse.urlencode(form).encode(),
        method='POST',
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = json.loads(response.read())

    _token['value'] = payload['access_token']
    # Fall back to one hour when the server omits expires_in.
    _token['expires_at'] = time.time() + payload.get('expires_in', 3600)
    return _token['value']
|
|
|
|
def _read_error_payload(err):
    """Convert an ``HTTPError`` into the ``(status, payload)`` pair of api_post.

    The response body is a stream and can only be consumed once, so it is
    read a single time and then either parsed as JSON or, failing that,
    returned as the first 500 characters of text under the ``_raw`` key.
    """
    raw = err.read()
    try:
        return err.code, json.loads(raw)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both derive from it
        return err.code, {'_raw': raw.decode(errors='replace')[:500]}


def api_post(path, body):
    """POST ``body`` as JSON to ``path`` below the API base URL.

    Makes at most two attempts. A 401 on the first attempt triggers one retry
    with a freshly requested token. A network-level failure (``URLError`` or
    timeout) on the first attempt triggers one retry after a 5 second pause,
    reusing the cached token.

    Args:
        path: URL path appended to ``CREDS['api_base']``.
        body: JSON-serialisable request payload.

    Returns:
        ``(status, payload)``. On success ``payload`` is the decoded JSON
        response. On an HTTP error it is the decoded JSON error body, or
        ``{'_raw': text}`` when that body is not JSON. After two network
        failures the result is ``(0, {'_error': message})``.
    """
    url = f"{CREDS['api_base']}{path}"
    data = json.dumps(body).encode()
    # Only a 401 justifies discarding the cached token. Forcing a refresh
    # after a network error would add a second request that can itself fail
    # outside the try block below and abort the whole run.
    refresh_token = False
    for attempt in range(2):
        token = get_token(force=refresh_token)
        req = urllib.request.Request(
            url,
            data=data,
            method='POST',
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/json',
            },
        )
        try:
            with urllib.request.urlopen(req, timeout=120) as r:
                return r.status, json.loads(r.read())
        except urllib.error.HTTPError as e:
            if e.code == 401 and attempt == 0:
                refresh_token = True
                continue
            return _read_error_payload(e)
        except (urllib.error.URLError, TimeoutError) as e:
            if attempt == 0:
                time.sleep(5)
                continue
            return 0, {'_error': str(e)}
|
|
|
|
def load_delta(start=0, limit=0, path=None):
    """Parse the delta file into a list of upload work items.

    Each row has the form ``sn|path|size|mtime``. Rows with fewer than four
    fields are skipped silently. Rows whose size is not an integer are
    skipped with a warning on stderr instead of aborting the run.

    Args:
        start: Number of leading lines of the file to skip. Malformed lines
            count towards this number.
        limit: Maximum number of items to return; 0 means no cap.
        path: File to read. Defaults to the module-level ``DELTA`` path.

    Returns:
        A list of ``{'sn': str, 'path': str, 'size': int}`` dicts in file
        order.
    """
    if path is None:
        path = DELTA
    items = []
    with open(path, encoding='utf-8') as f:
        for lineno, line in enumerate(f):
            if lineno < start:
                continue
            if limit and len(items) >= limit:
                break
            parts = line.rstrip('\n').split('|')
            if len(parts) < 4:
                continue
            # The fourth field (mtime) is present in the file but not needed here.
            sn, file_path, size = parts[:3]
            try:
                size = int(size)
            except ValueError:
                print(f'[WARN] delta row {lineno}: non-numeric size {size!r}, row skipped',
                      file=sys.stderr)
                continue
            items.append({'sn': sn, 'path': file_path, 'size': size})
    return items
|
|
|
|
def read_content(path):
    """Return the contents of the file at ``path`` as text.

    The file is read as bytes and decoded as UTF-8. Byte sequences that are
    not valid UTF-8 are replaced with U+FFFD rather than raising.
    """
    with open(path, 'rb') as handle:
        raw = handle.read()
    return str(raw, 'utf-8', 'replace')
|
|
|
|
def main():
    """Upload every item of the delta file to the bulk endpoint in batches.

    Command-line options:
        --batch N   items per /bulk POST (default 100, must be at least 1)
        --dry-run   read the files and report batches without calling the API
        --limit N   cap the number of items processed (0 = all)
        --start N   skip the first N rows of the delta file

    Per-batch results and failures are written to the run's log file, and a
    summary of the totals is printed at the end. Files that cannot be read
    are logged as READ_FAIL and counted as errors. A batch that gets a
    non-200 response is logged as BATCH_FAIL and all of its items are counted
    as errors.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--batch', type=int, default=100, help='items per /bulk POST')
    ap.add_argument('--dry-run', action='store_true')
    ap.add_argument('--limit', type=int, default=0, help='cap total processed (0 = all)')
    ap.add_argument('--start', type=int, default=0, help='skip first N rows in delta file')
    args = ap.parse_args()
    # range() below needs a positive step: 0 would raise and a negative value
    # would silently process nothing.
    if args.batch < 1:
        ap.error('--batch must be at least 1')

    print(f'[INFO] delta file: {DELTA}')
    print(f'[INFO] log file: {LOG_PATH}')
    items = load_delta(start=args.start, limit=args.limit)
    print(f'[INFO] {len(items)} items queued (start={args.start} limit={args.limit or "all"}, batch={args.batch})')
    print(f'[INFO] dry-run: {args.dry_run}')

    if not args.dry_run:
        # Fail fast on bad credentials before any file is read.
        token = get_token()
        print(f'[OK] token len={len(token)}')

    totals = {'received': 0, 'created': 0, 'updated': 0, 'unchanged': 0, 'errors': 0}
    n = len(items)
    n_batches = (n + args.batch - 1) // args.batch

    # The context manager guarantees the log is flushed and closed even when
    # the run is interrupted or an unexpected exception escapes the loop.
    with open(LOG_PATH, 'w', encoding='utf-8') as log:
        log.write(f'# upload run {time.strftime("%Y-%m-%d %H:%M:%S")}\n')
        log.write(f'# delta size: {len(items)}, batch: {args.batch}, dry_run: {args.dry_run}\n')

        t0 = time.time()
        for i in range(0, n, args.batch):
            batch_no = i // args.batch + 1
            chunk = items[i:i + args.batch]

            # Read file content for each item; unreadable files are logged
            # and left out of the request.
            bulk = []
            for it in chunk:
                try:
                    bulk.append({'SerialNumber': it['sn'], 'Content': read_content(it['path'])})
                except Exception as e:
                    log.write(f'READ_FAIL {it["sn"]} {it["path"]} {e}\n')
                    totals['errors'] += 1

            if args.dry_run:
                print(f' [DRY] batch {batch_no}: {len(bulk)} items (first sn={bulk[0]["SerialNumber"] if bulk else "-"})')
                continue

            if not bulk:
                # Every file in this batch failed to read (already counted
                # above); there is nothing to send.
                print(f' [SKIP] batch {batch_no}: no readable files')
                log.write(f'BATCH_SKIP idx={i} no readable files\n')
                continue

            status, resp = api_post('/api/v1/TestReportDataFiles/bulk', {'Items': bulk})
            if status != 200:
                print(f' [FAIL] batch {batch_no} HTTP {status}: {resp}')
                log.write(f'BATCH_FAIL idx={i} status={status} resp={resp}\n')
                totals['errors'] += len(bulk)
                continue

            item_errors = resp.get('Errors') or []
            totals['received'] += resp.get('TotalReceived', 0)
            totals['created'] += resp.get('Created', 0)
            totals['updated'] += resp.get('Updated', 0)
            totals['unchanged'] += resp.get('Unchanged', 0)
            for err in item_errors:
                log.write(f'BATCH_ITEM_ERR {err}\n')
                totals['errors'] += 1

            done = i + len(chunk)
            # Clamp both divisors to 1 so a very fast first batch cannot
            # divide by (almost) zero.
            rate = done / max(1, time.time() - t0)
            eta_s = int((n - done) / max(1, rate))
            print(f' batch {batch_no:>4}/{n_batches}: '
                  f'recv={resp.get("TotalReceived",0)} '
                  f'cre={resp.get("Created",0)} upd={resp.get("Updated",0)} unch={resp.get("Unchanged",0)} '
                  f'err={len(item_errors)} | rate={rate:.0f}/s eta={eta_s}s')
            log.write(f'BATCH idx={i} rcv={resp.get("TotalReceived")} cre={resp.get("Created")} upd={resp.get("Updated")} unch={resp.get("Unchanged")} err={len(item_errors)}\n')

        elapsed = time.time() - t0
        print(f'\n[DONE] elapsed {elapsed:.1f}s')
        print(f' received: {totals["received"]}')
        print(f' created: {totals["created"]}')
        print(f' updated: {totals["updated"]}')
        print(f' unchanged: {totals["unchanged"]}')
        print(f' errors: {totals["errors"]}')
        log.write(f'\n# totals: {totals}\n# elapsed: {elapsed:.1f}s\n')

    print(f'[INFO] log: {LOG_PATH}')
|
|
|
|
# Start the upload only when this file is executed as a script.
if __name__ == "__main__":
    main()
|