#!/usr/bin/env python3
"""
Mine per-model DSCA33/DSCA45 Final-Test templates from the ORIGINAL certs stored
on Dataforth's Hoffman API (the spec files lost in the cryptolocker event are
recoverable here because the original software published these before the wipe).

Usage : <this script> <serial-map.json> <output.json>

Input : a JSON map [{"m": model, "s": serial}, ...] of UPLOADED serials.
Output: dsca33-45-templates.json (schema-compatible with dsca-templates.json:
        { model: { "accOut": "...", "rows": [ {"name","spec"}, ... ] } },
        plus the extra keys "accHeader" and "_srcSerial" per model)
        + a human report on stdout.

Same extraction as the STAGE-1 extractor: the '===' rule under the Final-Test
"Parameter ... Measured" header gives exact column spans; name = Parameter col,
spec = Specification col. Keeps the richest sheet (most rows) per model.
"""
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional, Sequence, Tuple

TOKEN_URL = "https://login.dataforth.com/connect/token"
API_BASE = "https://www.dataforth.com"
# NOTE(review): OAuth client credentials are committed in source. Consider
# loading them from the environment / a secret store and rotating this secret.
CID, CSEC, SCOPE = "dataforth.onprem.sync", "Trxvwee2234-Awer8723-2", "dataforth.web"

# Seconds to wait on each HTTP call before giving up.
HTTP_TIMEOUT = 30

# Patterns are compiled once here because extract() applies them per line.
_ACC_OUT_RE = re.compile(r"Output \([^)]*\)|Vout \([^)]*\)")
_FINAL_HDR_RE = re.compile(r"Parameter\s+Measured")
_ROWS_END_RE = re.compile(r"Check List|^\s*_{5,}")
_RULE_RE = re.compile(r"=+")


def get_token() -> str:
    """Request a client-credentials token from TOKEN_URL and return it.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        urllib.error.URLError: on any transport/HTTP failure (not caught here).
        KeyError: if the response JSON has no ``access_token`` key.
    """
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": CID,
        "client_secret": CSEC,
        "scope": SCOPE,
    }).encode()
    req = urllib.request.Request(
        TOKEN_URL, form, {"Content-Type": "application/x-www-form-urlencoded"})
    # Context manager so the response socket is closed deterministically.
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read())["access_token"]


def get_cert(serial, tok) -> Optional[Any]:
    """Fetch the stored test-report record for one serial.

    Args:
        serial: Serial number; coerced to ``str`` before URL-quoting so a
            numeric serial from the JSON map does not raise ``TypeError``.
        tok: Bearer token as returned by :func:`get_token`.

    Returns:
        The decoded JSON body, or ``None`` when the API answers 404.

    Raises:
        urllib.error.HTTPError: for any HTTP error status other than 404.
    """
    url = f"{API_BASE}/api/v1/TestReportDataFiles/{urllib.parse.quote(str(serial))}"
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {tok}"})
    try:
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return None
        raise


def col_spans(sep) -> List[Tuple[int, int]]:
    """Return ``(start, end)`` offsets of every run of ``=`` in *sep*."""
    return [(m.start(), m.end()) for m in _RULE_RE.finditer(sep)]


def extract(t) -> Optional[Dict[str, Any]]:
    """Parse one cert's text into a template dict.

    Args:
        t: Full text of the cert (``\\r\\n`` or ``\\n`` line endings).

    Returns:
        ``None`` when no usable Final-Test table is found (no
        "FINAL TEST RESULTS" line, no "Parameter ... Measured" header after it,
        or a '===' rule with fewer than four columns). Otherwise a dict with:

        * ``accOut``    - first ``Output (...)`` / ``Vout (...)`` token in the
          accuracy column header, or ``"?"`` if absent;
        * ``rows``      - list of ``{"name", "spec"}`` dicts;
        * ``accHdr``    - the accuracy column header line, stripped;
        * ``accHeader`` - ``[line above, header line]`` right-stripped, or
          ``[]`` when the header is missing or is the very first line.
    """
    lines = t.replace("\r\n", "\n").split("\n")

    # Accuracy table: the column line is the first one carrying both markers.
    acc_idx = next(
        (i for i, text in enumerate(lines) if "Error (%)" in text and "Status" in text),
        -1,
    )
    acc_hdr = lines[acc_idx] if acc_idx >= 0 else ""
    # capture the verbatim 2-line accuracy header (super-header + column line) so
    # AD2 can reproduce the model-specific input label + VDC/mADC/Hz headers exactly
    acc_header = (
        [lines[acc_idx - 1].rstrip(), lines[acc_idx].rstrip()] if acc_idx > 0 else []
    )
    found = _ACC_OUT_RE.search(acc_hdr)
    acc_out = found.group(0) if found else "?"

    # Final-Test table: section title -> header line -> '===' rule -> data rows.
    final_idx = next(
        (i for i, text in enumerate(lines) if "FINAL TEST RESULTS" in text), -1)
    if final_idx < 0:
        return None
    hdr_idx = next(
        (i for i in range(final_idx + 1, len(lines)) if _FINAL_HDR_RE.search(lines[i])),
        -1,
    )
    if hdr_idx < 0:
        return None
    rule = lines[hdr_idx + 1] if hdr_idx + 1 < len(lines) else ""
    if "=" not in rule:
        return None
    cols = col_spans(rule)
    if len(cols) < 4:
        return None
    # Column order: Parameter, Measured, Specification, Status. Each field is
    # sliced from its own start up to the start of the next column.
    param_col, measured_col, spec_col, status_col = cols[:4]

    rows = []
    for text in lines[hdr_idx + 2:]:
        if _ROWS_END_RE.search(text):
            break  # "Check List" section or a signature underline ends the table
        if not text.strip():
            continue
        name = text[param_col[0]:measured_col[0]].strip()
        spec = text[spec_col[0]:status_col[0]].strip()
        if name or spec:
            rows.append({"name": name, "spec": spec})

    return {
        "accOut": acc_out,
        "rows": rows,
        "accHdr": acc_hdr.strip(),
        "accHeader": acc_header,
    }


def _mine(serial_map, tok):
    """Fetch and parse every mapped serial; keep the richest template per model.

    Returns ``(by_model, missing, unparsed)`` where ``by_model`` maps model ->
    best template (with its source ``serial`` added), ``missing`` lists
    ``(model, serial)`` pairs with no cert/content, and ``unparsed`` maps
    model -> serials whose cert text could not be parsed.
    """
    by_model = {}  # model -> best {accOut, rows, accHdr, accHeader, serial}
    missing = []
    unparsed = defaultdict(list)
    for entry in serial_map:
        model, serial = entry["m"], entry["s"]
        cert = get_cert(serial, tok)
        if not cert or not cert.get("Content"):
            missing.append((model, serial))
            continue
        tpl = extract(cert["Content"])
        if not tpl:
            unparsed[model].append(serial)
            continue
        best = by_model.get(model)
        # Strictly more rows wins, so the first-seen sheet is kept on ties.
        if not best or len(tpl["rows"]) > len(best["rows"]):
            tpl["serial"] = serial
            by_model[model] = tpl
    return by_model, missing, unparsed


def _build_output(by_model):
    """Project the mined templates onto the on-disk schema, sorted by model."""
    return {
        model: {
            "accOut": tpl["accOut"],
            "accHeader": tpl["accHeader"],
            "rows": tpl["rows"],
            "_srcSerial": tpl["serial"],
        }
        for model, tpl in sorted(by_model.items())
    }


def _print_report(out, by_model, outpath, serial_map, missing, unparsed):
    """Print the human-readable summary of the mining run to stdout."""
    print(f"=== Mined {len(out)} models from Hoffman -> {outpath} ===\n")
    print(f"{'MODEL':<14} {'rows':>4} {'accOut':<16} src-serial accuracy-header")
    fams = Counter()
    for model in sorted(out):
        tpl = by_model[model]
        fams[model.split("-")[0]] += 1
        flag = " <-- LOW" if len(tpl["rows"]) < 3 else ""
        print(f"{model:<14} {len(tpl['rows']):>4} {tpl['accOut']:<16} "
              f"{tpl['serial']:<11} {tpl['accHdr'][:60]}{flag}")
    print("\nper-family models mined:", dict(fams))
    distinct_accout = sorted({o["accOut"] for o in out.values()})
    print("distinct accOut tokens:", distinct_accout)
    if missing:
        # This list also includes serials whose record came back without
        # "Content", not only 404s; the message text is kept as-is.
        print(f"\n[WARN] {len(missing)} serials returned 404 (not on Hoffman):",
              missing[:10], "..." if len(missing) > 10 else "")
    if unparsed:
        n_unparsed = sum(len(serials) for serials in unparsed.values())
        print(f"\n[WARN] certs fetched but not parseable ({n_unparsed} serials):",
              dict(unparsed))
    # Sorted so the warning is stable between runs (set order is arbitrary).
    no_tpl = sorted({e["m"] for e in serial_map if e["m"] not in out}, key=str)
    if no_tpl:
        print(f"\n[WARN] models with NO usable template ({len(no_tpl)}):", no_tpl)


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Script entry point: mine templates, write the JSON file, print a report.

    Args:
        argv: ``[serial_map_path, output_path]``; defaults to ``sys.argv[1:]``.

    Raises:
        SystemExit: with a usage message when fewer than two paths are given.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) < 2:
        prog = os.path.basename(sys.argv[0]) if sys.argv and sys.argv[0] else "script"
        sys.exit(f"usage: {prog} <serial-map.json> <output.json>")
    map_path, outpath = args[0], args[1]

    with open(map_path, encoding="utf-8") as fh:
        serial_map = json.load(fh)

    tok = get_token()
    by_model, missing, unparsed = _mine(serial_map, tok)

    # build schema-compatible output
    out = _build_output(by_model)
    with open(outpath, "w", encoding="utf-8") as fh:
        json.dump(out, fh, indent=0)

    _print_report(out, by_model, outpath, serial_map, missing, unparsed)


if __name__ == "__main__":
    main()