The DSCA33/DSCA45 main spec files lost in the cryptolocker wipe are recoverable: the original software published correct certs to the Hoffman product API before the wipe and our null-skipping renderer never overwrote them. Mine per-model Final-Test templates (names + specs + verbatim accuracy headers) straight from those originals instead of requesting spec files from Dataforth/John. - dsca33-45-templates.json: 56 models (DSCA33 34/35, DSCA45 22/23); only DSCA33-1948 + DSCA45-1746 (24 units) lack an original. - mine-hoffman-dsca.py: the re-runnable miner. - DSCA33-45-HOFFMAN-RECOVERY handoff for the AD2 session (incl. the gate: validate each render vs its Hoffman original before enabling live rendering). - memories: Hoffman recovery (supersedes the spec-gap "need John" note) and the AD2 SSH MTU-blackhole root cause/fix; errorlog entries (syncro jq, ssh correction). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
124 lines
5.3 KiB
Python
124 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Mine per-model DSCA33/DSCA45 Final-Test templates from the ORIGINAL certs stored
|
|
on Dataforth's Hoffman API (the spec files lost in the cryptolocker event are
|
|
recoverable here because the original software published these before the wipe).
|
|
|
|
Input : argv[1], a JSON array [{"m": model, "s": serial}, ...] of UPLOADED serials.
|
|
Output: dsca33-45-templates.json (schema-compatible with dsca-templates.json:
|
|
{ model: { "accOut": "...", "rows": [ {"name","spec"}, ... ] } })
|
|
+ a human report on stdout.
|
|
|
|
Same extraction as the STAGE-1 extractor: the '===' rule under the Final-Test
|
|
"Parameter ... Measured" header gives exact column spans; name = Parameter col,
|
|
spec = Specification col. Keeps the richest sheet (most rows) per model.
|
|
"""
|
|
import json, re, sys, time, urllib.request, urllib.parse, os
|
|
|
|
# OAuth token endpoint and base URL of the API the certs are fetched from.
TOKEN_URL = "https://login.dataforth.com/connect/token"
API_BASE = "https://www.dataforth.com"

# Client identity and scope sent with the client_credentials token request.
CID, SCOPE = "dataforth.onprem.sync", "dataforth.web"

# NOTE(review): a client secret is committed in source here. It can now be
# supplied via the DATAFORTH_CLIENT_SECRET environment variable; the literal
# is kept only as a fallback so existing invocations keep working. Rotate the
# credential and drop the fallback once callers set the variable.
CSEC = os.environ.get("DATAFORTH_CLIENT_SECRET", "Trxvwee2234-Awer8723-2")
|
|
|
|
def get_token():
    """Request an access token with the ``client_credentials`` grant.

    POSTs the module-level CID / CSEC / SCOPE as a form-encoded body to
    TOKEN_URL and returns the ``access_token`` field of the JSON reply.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        urllib.error.URLError: on transport or HTTP failure (HTTPError is a
            subclass), or a timeout after 30 seconds.
        KeyError: if the response JSON carries no ``access_token`` field.
    """
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": CID,
        "client_secret": CSEC,
        "scope": SCOPE,
    }).encode()
    req = urllib.request.Request(
        TOKEN_URL, body,
        {"Content-Type": "application/x-www-form-urlencoded"})
    # Use a context manager so the HTTP response is closed deterministically
    # instead of being left for garbage collection.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())["access_token"]
|
|
|
|
def get_cert(serial, tok):
    """Fetch the TestReportDataFiles record for one serial.

    Args:
        serial: Serial number used as the final URL path segment. It is
            converted with ``str()`` and fully percent-encoded.
        tok: Bearer token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON response body, or ``None`` when the API answers
        HTTP 404 for this serial.

    Raises:
        urllib.error.HTTPError: for any HTTP error status other than 404.
        urllib.error.URLError: on transport failure.
    """
    # urllib.request already pulls in urllib.error as a side effect; import it
    # explicitly so the except clause does not rely on that.
    import urllib.error

    # safe="" also escapes "/", so the serial can never spill into extra path
    # segments; str() tolerates serials that were stored as JSON numbers.
    segment = urllib.parse.quote(str(serial), safe="")
    url = f"{API_BASE}/api/v1/TestReportDataFiles/{segment}"
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {tok}"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return json.loads(r.read())
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise
|
|
|
|
def col_spans(sep):
    """Return the (start, end) offsets of every run of '=' in *sep*.

    Runs are reported left to right; ``end`` is exclusive.
    """
    spans = []
    for run in re.finditer(r"=+", sep):
        spans.append(run.span())
    return spans
|
|
|
|
def extract(t):
    """Pull the accuracy header and Final-Test rows out of one cert text.

    Args:
        t: Full text of a cert; CRLF line endings are normalised to LF.

    Returns:
        ``None`` when the text has no "FINAL TEST RESULTS" line, no
        "Parameter ... Measured" header after it, or no '=' rule with at
        least four columns directly under that header. Otherwise a dict:

        * ``accOut``    - the "Output (...)" / "Vout (...)" token found in
          the accuracy column line, or "?" if none.
        * ``rows``      - list of {"name", "spec"} dicts, one per table line.
        * ``accHdr``    - the accuracy column line, stripped.
        * ``accHeader`` - [line above, column line], right-stripped; empty
          when the column line is absent or is the very first line.
    """
    text_lines = t.replace("\r\n", "\n").split("\n")

    # Accuracy column line: first line mentioning both "Error (%)" and "Status".
    acc_idx = -1
    for idx, text in enumerate(text_lines):
        if "Error (%)" in text and "Status" in text:
            acc_idx = idx
            break

    if acc_idx >= 0:
        acc_hdr = text_lines[acc_idx]
    else:
        acc_hdr = ""

    # Verbatim two-line header (super-header + column line); needs a line
    # above the column line, hence the strict > 0.
    if acc_idx > 0:
        acc_header = [text_lines[acc_idx - 1].rstrip(),
                      text_lines[acc_idx].rstrip()]
    else:
        acc_header = []

    out_match = re.search(r"Output \([^)]*\)|Vout \([^)]*\)", acc_hdr)
    acc_out = "?" if out_match is None else out_match.group(0)

    # Locate the Final-Test section banner.
    final_idx = -1
    for idx, text in enumerate(text_lines):
        if "FINAL TEST RESULTS" in text:
            final_idx = idx
            break
    if final_idx < 0:
        return None

    # Table header is searched only after the banner.
    header_idx = -1
    for idx in range(final_idx + 1, len(text_lines)):
        if re.search(r"Parameter\s+Measured", text_lines[idx]):
            header_idx = idx
            break
    if header_idx < 0:
        return None

    # The '=' rule right under the header fixes where each column starts.
    if header_idx + 1 < len(text_lines):
        rule = text_lines[header_idx + 1]
    else:
        rule = ""
    if "=" not in rule:
        return None
    starts = [run.start() for run in re.finditer(r"=+", rule)]
    if len(starts) < 4:
        return None

    # Column 0 is sliced as the name and column 2 as the spec; each slice
    # runs up to the start of the following column.
    name_lo, name_hi = starts[0], starts[1]
    spec_lo, spec_hi = starts[2], starts[3]

    rows = []
    for text in text_lines[header_idx + 2:]:
        # A "Check List" line or a run of 5+ underscores ends the table.
        if re.search(r"Check List|^\s*_{5,}", text):
            break
        if not text.strip():
            continue
        name = text[name_lo:name_hi].strip()
        spec = text[spec_lo:spec_hi].strip()
        if name or spec:
            rows.append({"name": name, "spec": spec})

    return {
        "accOut": acc_out,
        "rows": rows,
        "accHdr": acc_hdr.strip(),
        "accHeader": acc_header,
    }
|
|
|
|
def main():
    """Mine one template per model and write them to a JSON file.

    Command line:
        argv[1]: path to a JSON array of {"m": model, "s": serial} objects.
        argv[2]: path of the JSON file to write.

    For every listed serial the cert is fetched and parsed. Per model, the
    parse with the most rows is kept; on a tie the first one seen wins. The
    kept templates are written to argv[2] and a report is printed to stdout.
    """
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <serial-map.json> <out-templates.json>")

    # Close the input file promptly rather than leaking the handle.
    with open(sys.argv[1]) as f:
        mp = json.load(f)
    outpath = sys.argv[2]
    tok = get_token()

    by_model = {}   # model -> best {accOut, rows, accHdr, accHeader, serial}
    meta = {}       # model -> diagnostics ({"noextract": [serial, ...]})
    missing = []    # (model, serial) with no cert or an empty Content field
    for row in mp:
        model, serial = row["m"], row["s"]
        cert = get_cert(serial, tok)
        if not cert or not cert.get("Content"):
            missing.append((model, serial))
            continue
        tpl = extract(cert["Content"])
        if not tpl:
            meta.setdefault(model, {}).setdefault("noextract", []).append(serial)
            continue
        cur = by_model.get(model)
        # Strict ">" keeps the first sheet seen when row counts tie.
        if not cur or len(tpl["rows"]) > len(cur["rows"]):
            tpl["serial"] = serial
            by_model[model] = tpl

    # build schema-compatible output
    out = {}
    for model in sorted(by_model):
        t = by_model[model]
        out[model] = {"accOut": t["accOut"], "accHeader": t["accHeader"],
                      "rows": t["rows"], "_srcSerial": t["serial"]}
    with open(outpath, "w") as f:
        json.dump(out, f, indent=0)

    # report
    fams = {}
    print(f"=== Mined {len(out)} models from Hoffman -> {outpath} ===\n")
    print(f"{'MODEL':<14} {'rows':>4} {'accOut':<16} src-serial accuracy-header")
    for model in sorted(out):
        t = by_model[model]
        fam = model.split("-")[0]
        fams[fam] = fams.get(fam, 0) + 1
        flag = "  <-- LOW" if len(t["rows"]) < 3 else ""
        print(f"{model:<14} {len(t['rows']):>4} {t['accOut']:<16} {t['serial']:<11} {t['accHdr'][:60]}{flag}")
    print("\nper-family models mined:", dict(fams))
    distinct_accout = sorted(set(o["accOut"] for o in out.values()))
    print("distinct accOut tokens:", distinct_accout)

    if missing:
        # This list also holds serials whose cert came back with an empty
        # Content field, not only the 404s.
        print(f"\n[WARN] {len(missing)} serials returned 404 (not on Hoffman):",
              missing[:10], "..." if len(missing) > 10 else "")

    # Previously collected but never shown: certs that were fetched yet had
    # no parseable Final-Test table.
    noextract = {m: d["noextract"] for m, d in meta.items() if d.get("noextract")}
    if noextract:
        total = sum(len(s) for s in noextract.values())
        print(f"\n[WARN] {total} certs had no extractable Final-Test table:")
        for model in sorted(noextract):
            serials = noextract[model]
            print(f"  {model}:", serials[:10], "..." if len(serials) > 10 else "")

    # Sorted so the report is stable from run to run.
    no_tpl = sorted(m for m in {r["m"] for r in mp} if m not in out)
    if no_tpl:
        print(f"\n[WARN] models with NO usable template ({len(no_tpl)}):", no_tpl)
|
|
|
|
# Run the miner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|