"""Run idempotency, update, and bulk tests against the Dataforth API.""" import json import urllib.request import urllib.parse import hashlib import os, sys TOKEN_URL = os.environ.get("CF_TOKEN_URL", "https://login.dataforth.com/connect/token") API_BASE = os.environ.get("CF_API_BASE", "https://www.dataforth.com") + "/api/v1" CLIENT_ID = os.environ.get("CF_CLIENT_ID", "") CLIENT_SECRET = os.environ.get("CF_CLIENT_SECRET", "") SCOPE = os.environ.get("CF_SCOPE", "dataforth.web") if not CLIENT_ID or not CLIENT_SECRET: sys.exit("set CF_CLIENT_ID + CF_CLIENT_SECRET (vault: clients/dataforth/api-oauth.sops.yaml)") SAMPLE_DIR = r"D:\claudetools\projects\dataforth-dos\datasheet-pipeline\scmvas-hvas-research\samples\backfill-verify" def get_token(): data = urllib.parse.urlencode({ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": SCOPE, }).encode() with urllib.request.urlopen(urllib.request.Request(TOKEN_URL, data=data)) as r: return json.loads(r.read())["access_token"] def api(method, path, token, body=None): headers = {"Authorization": f"Bearer {token}"} data = None if body is not None: data = json.dumps(body).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(API_BASE + path, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req) as r: return r.status, r.read().decode() except urllib.error.HTTPError as e: return e.code, e.read().decode() def read_sample(sn): with open(f"{SAMPLE_DIR}\\{sn}-source.txt", "rb") as f: return f.read().decode("utf-8", errors="replace") def sha16(s): return hashlib.sha256(s.encode()).hexdigest()[:16] token = get_token() # ---------- TEST A: Idempotency ---------- print("=" * 60) print("TEST A: Idempotency (re-POST 179377-5 with same content)") print("=" * 60) content = read_sample("179377-5") print(f"Content hash[16]: {sha16(content)}") # Baseline stats status, body = api("GET", "/TestReportDataFiles/stats", token) stats_before = json.loads(body) print(f"Stats before: TotalCount={stats_before['TotalCount']}") status, body = api("POST", "/TestReportDataFiles", token, {"SerialNumber": "179377-5", "Content": content}) print(f"POST same content -> HTTP {status} body: {body}") status, body = api("GET", "/TestReportDataFiles/stats", token) stats_after = json.loads(body) print(f"Stats after: TotalCount={stats_after['TotalCount']} (delta {stats_after['TotalCount']-stats_before['TotalCount']})") status, body = api("GET", "/TestReportDataFiles/179377-5", token) obj = json.loads(body) print(f"CreatedAtUtc: {obj.get('CreatedAtUtc')}") print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')}") print() # ---------- TEST B: Update ---------- print("=" * 60) print("TEST B: Update (re-POST 179377-5 with MODIFIED content)") print("=" * 60) modified = content + "\n[TEST B MODIFICATION MARKER - WILL BE REVERTED]\n" print(f"Modified hash[16]: {sha16(modified)} (bytes {len(modified.encode())})") status, body = api("POST", "/TestReportDataFiles", token, {"SerialNumber": "179377-5", "Content": modified}) print(f"POST modified content -> HTTP {status} body: {body}") status, body = api("GET", "/TestReportDataFiles/179377-5", token) obj = json.loads(body) fetched = obj.get("Content", "") print(f"Server bytes after update: {len(fetched.encode())} hash[16]: {sha16(fetched)}") print(f"CreatedAtUtc: {obj.get('CreatedAtUtc')}") print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')} <-- should be non-null now") print(f"Last line of fetched content: {fetched.rstrip().splitlines()[-1]!r}") print() # ---------- TEST B cleanup: restore ---------- print("=" * 60) print("Restoring 179377-5 to original content") print("=" * 60) status, body = api("POST", "/TestReportDataFiles", token, {"SerialNumber": "179377-5", "Content": content}) print(f"POST original content -> HTTP {status} body: {body}") status, body = api("GET", "/TestReportDataFiles/179377-5", token) obj = json.loads(body) restored_hash = sha16(obj.get("Content", "")) print(f"Restored hash[16]: {restored_hash} (expect {sha16(content)})") print(f"Match: {restored_hash == sha16(content)}") print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')}") print() # ---------- TEST C: Bulk ---------- print("=" * 60) print("TEST C: Bulk upload (179377-7, -8, -9)") print("=" * 60) items = [] for sn in ["179377-7", "179377-8", "179377-9"]: c = read_sample(sn) items.append({"SerialNumber": sn, "Content": c}) print(f" Staged {sn}: {len(c.encode())} bytes, hash[16]={sha16(c)}") status, body = api("POST", "/TestReportDataFiles/bulk", token, {"Items": items}) print(f"\nPOST /bulk -> HTTP {status}") print(f"Response: {body}") print() for sn in ["179377-7", "179377-8", "179377-9"]: status, body = api("GET", f"/TestReportDataFiles/{sn}", token) if status == 200: o = json.loads(body) local = read_sample(sn) match = "MATCH" if sha16(local) == sha16(o.get("Content", "")) else "DIFF" print(f" {sn}: GET {status} content {match}, Created={o.get('CreatedAtUtc')}") else: print(f" {sn}: GET {status} body={body}") # Final stats status, body = api("GET", "/TestReportDataFiles/stats", token) stats_final = json.loads(body) print(f"\nFinal TotalCount: {stats_final['TotalCount']} (started at {stats_before['TotalCount']})")