Session log + DFWDS Node port + Hoffman API uploader pipeline
Built the missing piece between the test datasheet pipeline and Dataforth's new product API. End-to-end: - Pulled DFWDS (Dataforth Web Datasheet System) VB6 source from AD1\Engineering\ENGR\ATE\Test Datasheets\DFWDS to local for analysis - Decoded its filename validation: A-J prefix decodes (A=10..J=19), all- numeric WO# valid (no leading 0), anything else bad - Ported the validation + move logic to Node (dfwds-process.js) - Built bulk uploader (upload-delta.js) for Hoffman's Swagger API (POST /api/v1/TestReportDataFiles/bulk with OAuth client_credentials) Sanitized 3 prior reference scripts (fetch-server-inventory, test-scenarios, test-upload-two) to read CF_* env vars instead of hardcoded creds. Live drain results: - 897 files moved Test_Datasheets -> For_Web (all valid, no renames, no bad), DFWDS port summary in 1.1s - Pushed entire For_Web (7,061 files) to Hoffman API in 49.7s @ 142/s: Created=803 Updated=114 Unchanged=6,144 Errors=0 - Server count: 489,579 -> 490,382 (+803 net new) Also: - Added clients/dataforth/.gitignore to exclude plaintext Oauth.txt note - Added clients/instrumental-music-center/docs/2026-04-13-ticket-notes.md (ticket write-up of 2026-04-11/12/13 IMC1 RDS removal/SQL migration work) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
74
projects/dataforth-dos/datasheet-pipeline/compute-delta.py
Normal file
74
projects/dataforth-dos/datasheet-pipeline/compute-delta.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Compare local For_Web serials vs server-side serials; emit delta list."""
|
||||
import os
|
||||
|
||||
LOCAL = r"C:\Users\guru\AppData\Local\Temp\for_web_inventory.txt"
|
||||
SERVER = r"C:\Users\guru\AppData\Local\Temp\server_inventory.txt"
|
||||
DELTA_OUT = r"C:\Users\guru\AppData\Local\Temp\delta_to_upload.txt"
|
||||
|
||||
# Load server set (case-sensitive exact; mirror what we'd POST as SerialNumber)
|
||||
server = set()
|
||||
with open(SERVER) as f:
|
||||
for line in f:
|
||||
s = line.strip()
|
||||
if s:
|
||||
server.add(s)
|
||||
|
||||
# Also load a case-insensitive lookup just to see if anything only differs by case
|
||||
server_ci = {s.lower() for s in server}
|
||||
|
||||
# Walk local inventory
|
||||
local_total = 0
|
||||
local_sns = set()
|
||||
path_by_sn = {}
|
||||
with open(LOCAL) as f:
|
||||
for line in f:
|
||||
parts = line.rstrip("\n").split("|")
|
||||
if len(parts) < 4:
|
||||
continue
|
||||
full, sn, size, mtime = parts[0], parts[1], parts[2], parts[3]
|
||||
local_total += 1
|
||||
local_sns.add(sn)
|
||||
path_by_sn[sn] = (full, int(size), mtime)
|
||||
|
||||
# Diff
|
||||
missing_on_server = sorted(sn for sn in local_sns if sn not in server)
|
||||
missing_ci_only = sorted(sn for sn in local_sns if sn not in server and sn.lower() in server_ci)
|
||||
already_present = sorted(sn for sn in local_sns if sn in server)
|
||||
|
||||
print(f"Local total: {local_total}")
|
||||
print(f"Local unique serials: {len(local_sns)}")
|
||||
print(f"Server total: {len(server)}")
|
||||
print(f"")
|
||||
print(f"Already on server (case-sensitive exact match): {len(already_present)}")
|
||||
print(f"Missing on server (case-sensitive): {len(missing_on_server)}")
|
||||
print(f" ...of which only differ by case: {len(missing_ci_only)}")
|
||||
print(f"")
|
||||
|
||||
# Write delta list (paths) to upload
|
||||
with open(DELTA_OUT, "w") as f:
|
||||
for sn in missing_on_server:
|
||||
full, size, mtime = path_by_sn[sn]
|
||||
f.write(f"{sn}|{full}|{size}|{mtime}\n")
|
||||
|
||||
print(f"Wrote delta list ({len(missing_on_server)} entries) -> {DELTA_OUT}")
|
||||
|
||||
# Show sample of delta
|
||||
if missing_on_server:
|
||||
print("\nFirst 10 missing SNs:")
|
||||
for sn in missing_on_server[:10]:
|
||||
full, size, mtime = path_by_sn[sn]
|
||||
print(f" {sn} ({size} bytes, {mtime[:19]})")
|
||||
print("\nLast 10 missing SNs:")
|
||||
for sn in missing_on_server[-10:]:
|
||||
full, size, mtime = path_by_sn[sn]
|
||||
print(f" {sn} ({size} bytes, {mtime[:19]})")
|
||||
|
||||
# Show counts by year-month for the delta
|
||||
from collections import Counter
|
||||
by_month = Counter()
|
||||
for sn in missing_on_server:
|
||||
_, _, mtime = path_by_sn[sn]
|
||||
by_month[mtime[:7]] += 1
|
||||
print("\nDelta by year-month:")
|
||||
for k in sorted(by_month):
|
||||
print(f" {k}: {by_month[k]}")
|
||||
136
projects/dataforth-dos/datasheet-pipeline/dfwds-process.js
Normal file
136
projects/dataforth-dos/datasheet-pipeline/dfwds-process.js
Normal file
@@ -0,0 +1,136 @@
|
||||
/**
|
||||
* DFWDS-equivalent in Node — moves test datasheets from staging to For_Web,
|
||||
* decodes any DOS A-J prefix on filenames, quarantines invalid files.
|
||||
*
|
||||
* Mirrors the validation logic from the original VB6 DFWDS.bas:
|
||||
* - filename must be .txt
|
||||
* - must contain a dash
|
||||
* - work-order portion (left of dash) must be all-numeric and not start with 0
|
||||
* - OR the first char is A-J (DOS-encoded), which decodes to 10-19 and renames
|
||||
*
|
||||
* Defaults match the original DFWDS_NAMES.txt:
|
||||
* --in \\ad2\webshare\Test_Datasheets
|
||||
* --bad \\ad2\webshare\Bad_Datasheets
|
||||
* --out \\ad2\webshare\For_Web
|
||||
* --log \\ad2\webshare\Datasheets_Log
|
||||
*
|
||||
* Use --dry-run to see what would happen without moving anything.
|
||||
*/
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
|
||||
const args = process.argv.slice(2);
|
||||
const arg = (n, d) => { const i = args.indexOf(n); return i >= 0 ? args[i+1] : d; };
|
||||
const flag = (n) => args.includes(n);
|
||||
|
||||
const IN_DIR = arg('--in', String.raw`C:\Shares\webshare\Test_Datasheets`);
|
||||
const BAD_DIR = arg('--bad', String.raw`C:\Shares\webshare\Bad_Datasheets`);
|
||||
const OUT_DIR = arg('--out', String.raw`C:\Shares\webshare\For_Web`);
|
||||
const LOG_DIR = arg('--log', String.raw`C:\Shares\webshare\Datasheets_Log`);
|
||||
const DRY = flag('--dry-run');
|
||||
const LIMIT = parseInt(arg('--limit', '0'), 10);
|
||||
|
||||
// Per VB funcDecodeWOchar: A-J map to 10..19 (one char -> two digits)
|
||||
const PREFIX_DECODE = {
|
||||
A: '10', B: '11', C: '12', D: '13', E: '14',
|
||||
F: '15', G: '16', H: '17', I: '18', J: '19',
|
||||
};
|
||||
|
||||
function classify(filename) {
|
||||
// Returns: {action: 'valid'|'rename'|'bad', newName?: string, reason?: string}
|
||||
if (!filename.toLowerCase().endsWith('.txt')) {
|
||||
return {action:'bad', reason:'not .txt'};
|
||||
}
|
||||
const stem = filename.slice(0, -4); // strip .txt
|
||||
const dash = stem.indexOf('-');
|
||||
if (dash <= 0) return {action:'bad', reason:'no dash or dash at start'};
|
||||
|
||||
const wo = stem.slice(0, dash).toUpperCase();
|
||||
const tail = stem.slice(dash); // includes leading '-'
|
||||
|
||||
if (/^\d+$/.test(wo)) {
|
||||
if (wo.startsWith('0')) return {action:'bad', reason:'WO starts with 0'};
|
||||
return {action:'valid'};
|
||||
}
|
||||
|
||||
// Check DOS-encoded: first char A-J, rest all numeric
|
||||
const first = wo[0];
|
||||
if (PREFIX_DECODE[first] && /^\d+$/.test(wo.slice(1))) {
|
||||
const newWo = PREFIX_DECODE[first] + wo.slice(1);
|
||||
return {action:'rename', newName: newWo + tail + '.txt'};
|
||||
}
|
||||
|
||||
return {action:'bad', reason:`WO "${wo}" not numeric and not A-J prefixed`};
|
||||
}
|
||||
|
||||
function ensureDir(d) { fs.mkdirSync(d, {recursive: true}); }
|
||||
|
||||
function moveFile(src, dst) {
|
||||
ensureDir(path.dirname(dst));
|
||||
if (fs.existsSync(dst)) {
|
||||
// overwrite by removing first; rename in same dir is fine but cross-dir on Win sometimes errors if exists
|
||||
fs.unlinkSync(dst);
|
||||
}
|
||||
fs.renameSync(src, dst);
|
||||
}
|
||||
|
||||
function main() {
|
||||
if (!fs.existsSync(IN_DIR)) {
|
||||
console.error(`[FAIL] input dir not found: ${IN_DIR}`);
|
||||
process.exit(1);
|
||||
}
|
||||
ensureDir(BAD_DIR); ensureDir(OUT_DIR); ensureDir(LOG_DIR);
|
||||
|
||||
// Log filename matches DFWDS pattern: DFWDS_YYYY_MM_DD.log
|
||||
const now = new Date();
|
||||
const ymd = `${now.getFullYear()}_${String(now.getMonth()+1).padStart(2,'0')}_${String(now.getDate()).padStart(2,'0')}`;
|
||||
const logPath = path.join(LOG_DIR, `DFWDS_${ymd}.log`);
|
||||
const logFh = fs.openSync(logPath, 'a');
|
||||
function log(msg) {
|
||||
const line = `[${new Date().toISOString()}] ${msg}\n`;
|
||||
process.stdout.write(line);
|
||||
fs.writeSync(logFh, line);
|
||||
}
|
||||
|
||||
log(`=== DFWDS-process start (Node port) ===`);
|
||||
log(` in: ${IN_DIR}`);
|
||||
log(` out: ${OUT_DIR}`);
|
||||
log(` bad: ${BAD_DIR}`);
|
||||
log(` dry: ${DRY}, limit: ${LIMIT||'all'}`);
|
||||
|
||||
const all = fs.readdirSync(IN_DIR).filter(n => fs.statSync(path.join(IN_DIR, n)).isFile());
|
||||
const queue = LIMIT ? all.slice(0, LIMIT) : all;
|
||||
log(` queued: ${queue.length} files (of ${all.length} in dir)`);
|
||||
|
||||
const stats = {valid:0, renamed:0, bad:0, errors:0};
|
||||
for (const name of queue) {
|
||||
const src = path.join(IN_DIR, name);
|
||||
const cls = classify(name);
|
||||
try {
|
||||
if (cls.action === 'valid') {
|
||||
const dst = path.join(OUT_DIR, name);
|
||||
if (DRY) log(` [DRY VALID] ${name} -> ${dst}`);
|
||||
else { moveFile(src, dst); log(` VALID ${name}`); }
|
||||
stats.valid++;
|
||||
} else if (cls.action === 'rename') {
|
||||
const dst = path.join(OUT_DIR, cls.newName);
|
||||
if (DRY) log(` [DRY RENAME] ${name} -> ${cls.newName} -> ${dst}`);
|
||||
else { moveFile(src, dst); log(` RENAMED ${name} -> ${cls.newName}`); }
|
||||
stats.renamed++;
|
||||
} else {
|
||||
const dst = path.join(BAD_DIR, name);
|
||||
if (DRY) log(` [DRY BAD] ${name} (${cls.reason}) -> ${dst}`);
|
||||
else { moveFile(src, dst); log(` BAD ${name} (${cls.reason})`); }
|
||||
stats.bad++;
|
||||
}
|
||||
} catch (e) {
|
||||
log(` ERR ${name}: ${e.message}`);
|
||||
stats.errors++;
|
||||
}
|
||||
}
|
||||
log(`=== summary: valid=${stats.valid} renamed=${stats.renamed} bad=${stats.bad} errors=${stats.errors}`);
|
||||
fs.closeSync(logFh);
|
||||
console.log(`\n[INFO] log: ${logPath}`);
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,80 @@
|
||||
"""Paginate API to fetch every server-side SerialNumber, write to file."""
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
import os, sys
|
||||
TOKEN_URL = os.environ.get("CF_TOKEN_URL", "https://login.dataforth.com/connect/token")
|
||||
API_BASE = os.environ.get("CF_API_BASE", "https://www.dataforth.com") + "/api/v1"
|
||||
CLIENT_ID = os.environ.get("CF_CLIENT_ID", "")
|
||||
CLIENT_SECRET = os.environ.get("CF_CLIENT_SECRET", "")
|
||||
SCOPE = os.environ.get("CF_SCOPE", "dataforth.web")
|
||||
if not CLIENT_ID or not CLIENT_SECRET:
|
||||
sys.exit("set CF_CLIENT_ID + CF_CLIENT_SECRET (vault: clients/dataforth/api-oauth.sops.yaml)")
|
||||
|
||||
OUTFILE = r"C:\Users\guru\AppData\Local\Temp\server_inventory.txt"
|
||||
PAGE_SIZE = 1000 # try 1000 first; smaller if server complains
|
||||
|
||||
|
||||
def get_token():
|
||||
data = urllib.parse.urlencode({
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": CLIENT_ID,
|
||||
"client_secret": CLIENT_SECRET,
|
||||
"scope": SCOPE,
|
||||
}).encode()
|
||||
with urllib.request.urlopen(urllib.request.Request(TOKEN_URL, data=data)) as r:
|
||||
return json.loads(r.read())["access_token"]
|
||||
|
||||
|
||||
def main():
|
||||
token = get_token()
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
total = 0
|
||||
page = 1
|
||||
cursor = None
|
||||
t_start = time.time()
|
||||
|
||||
with open(OUTFILE, "w") as f:
|
||||
while True:
|
||||
params = {"page": page, "pageSize": PAGE_SIZE}
|
||||
if cursor:
|
||||
params["afterSerialNumber"] = cursor
|
||||
url = f"{API_BASE}/TestReportDataFiles?" + urllib.parse.urlencode(params)
|
||||
req = urllib.request.Request(url, headers=headers)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as r:
|
||||
obj = json.loads(r.read())
|
||||
except Exception as e:
|
||||
print(f" ERROR at page {page}: {e}")
|
||||
# Refresh token and retry once
|
||||
token = get_token()
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
with urllib.request.urlopen(urllib.request.Request(url, headers=headers), timeout=30) as r:
|
||||
obj = json.loads(r.read())
|
||||
|
||||
items = obj.get("Items", [])
|
||||
if not items:
|
||||
break
|
||||
|
||||
for it in items:
|
||||
f.write(it["SerialNumber"] + "\n")
|
||||
total += len(items)
|
||||
cursor = obj.get("NextCursor")
|
||||
|
||||
if page % 50 == 0 or page == 1:
|
||||
rate = total / max(1, time.time() - t_start)
|
||||
print(f" page {page}: total={total} rate={rate:.0f}/s cursor={cursor!r}")
|
||||
|
||||
if not cursor:
|
||||
break
|
||||
page += 1
|
||||
|
||||
elapsed = time.time() - t_start
|
||||
print(f"\nDONE: {total} serials written to {OUTFILE} in {elapsed:.1f}s")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,14 @@
|
||||
"""Check AD2 for Python/Node availability."""
|
||||
import paramiko, subprocess, yaml
|
||||
pwd_raw = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/ad2.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)['credentials']['password']
|
||||
PWD = pwd_raw.replace('\\', '')
|
||||
c = paramiko.SSHClient(); c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
c.connect('192.168.0.6', username='sysadmin', password=PWD,
|
||||
timeout=30, banner_timeout=45, look_for_keys=False, allow_agent=False)
|
||||
for cmd in ['where python', 'where python3', 'where node', 'python --version 2>&1']:
|
||||
print(f'$ {cmd}')
|
||||
_, o, _ = c.exec_command(cmd, timeout=15)
|
||||
print(o.read().decode().rstrip())
|
||||
print()
|
||||
c.close()
|
||||
121
projects/dataforth-dos/datasheet-pipeline/run_full_drain.py
Normal file
121
projects/dataforth-dos/datasheet-pipeline/run_full_drain.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Drain Test_Datasheets through DFWDS-Node, refresh inventory+delta, push to API.
|
||||
|
||||
Steps:
|
||||
1. SFTP dfwds-process.js + fetch-server-inventory.js + compute-delta logic
|
||||
(we'll use the existing Python ones for inventory; keep upload-delta.js as-is)
|
||||
2. Run DFWDS dry-run on AD2 to see what 897 would do
|
||||
3. Run DFWDS for real
|
||||
4. Re-build for_web inventory on AD2 (PowerShell one-liner)
|
||||
5. SFTP for_web inventory back, fetch server inventory locally, compute delta locally
|
||||
6. SFTP delta to AD2, run upload-delta.js
|
||||
"""
|
||||
import base64, paramiko, subprocess, sys, time, yaml, os, threading
|
||||
|
||||
LIMIT = 0 # 0 = all 897
|
||||
DRY = False
|
||||
for i, a in enumerate(sys.argv[1:]):
|
||||
if a == '--limit': LIMIT = int(sys.argv[i+2])
|
||||
if a == '--dry-run': DRY = True
|
||||
|
||||
ad2_pwd = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/ad2.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)['credentials']['password'].replace('\\','')
|
||||
api = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/api-oauth.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)
|
||||
|
||||
REMOTE_DIR = 'C:/Users/sysadmin/Documents/dataforth-uploader'
|
||||
LOCAL = r'D:\claudetools\projects\dataforth-dos\datasheet-pipeline'
|
||||
TEMP_DIR = r'C:\Users\guru\AppData\Local\Temp'
|
||||
|
||||
c = paramiko.SSHClient(); c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
c.connect('192.168.0.6', username='sysadmin', password=ad2_pwd,
|
||||
timeout=30, banner_timeout=45, look_for_keys=False, allow_agent=False)
|
||||
|
||||
def run(cmd, to=300):
|
||||
enc = base64.b64encode(cmd.encode('utf-16-le')).decode()
|
||||
_, o, _ = c.exec_command(f'powershell -NoProfile -EncodedCommand {enc}', timeout=to)
|
||||
return o.read().decode('utf-8','replace')
|
||||
|
||||
def stream(cmd, to=7200):
|
||||
enc = base64.b64encode(cmd.encode('utf-16-le')).decode()
|
||||
stdin, stdout, stderr = c.exec_command(f'powershell -NoProfile -EncodedCommand {enc}', timeout=to)
|
||||
def reader(s):
|
||||
try:
|
||||
for line in iter(lambda: s.readline(), ''):
|
||||
if not line: break
|
||||
print(line.rstrip(), flush=True)
|
||||
except Exception: pass
|
||||
t = threading.Thread(target=reader, args=(stdout,), daemon=True); t.start()
|
||||
t2 = threading.Thread(target=reader, args=(stderr,), daemon=True); t2.start()
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < to:
|
||||
if stdout.channel.exit_status_ready(): break
|
||||
time.sleep(1)
|
||||
t.join(timeout=5); t2.join(timeout=5)
|
||||
return stdout.channel.recv_exit_status() if stdout.channel.exit_status_ready() else -1
|
||||
|
||||
print('[1] sftp dfwds-process.js to AD2')
|
||||
sftp = c.open_sftp()
|
||||
sftp.put(os.path.join(LOCAL, 'dfwds-process.js'), f'{REMOTE_DIR}/dfwds-process.js')
|
||||
sftp.close()
|
||||
|
||||
print(f'\n[2] dry-run DFWDS on Test_Datasheets (limit={LIMIT or "all"})')
|
||||
flags = '--dry-run'
|
||||
if LIMIT: flags += f' --limit {LIMIT}'
|
||||
rc = stream(f'cd "{REMOTE_DIR}"; & node dfwds-process.js {flags} 2>&1', to=300)
|
||||
print(f'[dry-run rc={rc}]')
|
||||
|
||||
if DRY:
|
||||
print('\n--dry-run flag set on outer script -- stopping here')
|
||||
c.close(); sys.exit(0)
|
||||
|
||||
print(f'\n[3] LIVE DFWDS run')
|
||||
flags = ''
|
||||
if LIMIT: flags = f'--limit {LIMIT}'
|
||||
rc = stream(f'cd "{REMOTE_DIR}"; & node dfwds-process.js {flags} 2>&1', to=600)
|
||||
print(f'[live rc={rc}]')
|
||||
|
||||
print('\n[4] regenerate for_web inventory on AD2')
|
||||
ps_inv = (
|
||||
r'$out = "C:\Users\sysadmin\Documents\dataforth-uploader\for_web_inventory.txt"; '
|
||||
r'Get-ChildItem "C:\Shares\webshare\For_Web" -File -Filter *.TXT | '
|
||||
r'ForEach-Object { "$($_.FullName)|$([System.IO.Path]::GetFileNameWithoutExtension($_.Name))|$($_.Length)|$($_.LastWriteTime.ToString("o"))" } | '
|
||||
r'Set-Content -Path $out -Encoding ASCII; '
|
||||
r'(Get-Content $out).Count'
|
||||
)
|
||||
out = run(ps_inv, to=120)
|
||||
print(f' for_web entries: {out.strip()}')
|
||||
|
||||
# Pull inventory back to workstation
|
||||
sftp = c.open_sftp()
|
||||
sftp.get(f'{REMOTE_DIR}/for_web_inventory.txt', os.path.join(TEMP_DIR, 'for_web_inventory.txt'))
|
||||
sftp.close()
|
||||
print(f' pulled to {TEMP_DIR}\\for_web_inventory.txt')
|
||||
|
||||
print('\n[5] fetch fresh server inventory + compute delta locally')
|
||||
rc = subprocess.run([sys.executable, '-u', os.path.join(LOCAL, 'fetch-server-inventory.py')],
|
||||
timeout=600).returncode
|
||||
print(f' fetch-server-inventory rc={rc}')
|
||||
rc = subprocess.run([sys.executable, '-u', os.path.join(LOCAL, 'compute-delta.py')],
|
||||
timeout=120).returncode
|
||||
print(f' compute-delta rc={rc}')
|
||||
|
||||
# Push fresh delta to AD2
|
||||
sftp = c.open_sftp()
|
||||
sftp.put(os.path.join(TEMP_DIR, 'delta_to_upload.txt'), f'{REMOTE_DIR}/delta_to_upload.txt')
|
||||
sftp.close()
|
||||
|
||||
print('\n[6] run upload-delta.js with fresh delta')
|
||||
ps_upload = (
|
||||
f'$env:CF_TOKEN_URL = "{api["endpoints"]["token-url"]}"; '
|
||||
f'$env:CF_API_BASE = "{api["endpoints"]["api-base"]}"; '
|
||||
f'$env:CF_CLIENT_ID = "{api["credentials"]["client-id"]}"; '
|
||||
f'$env:CF_CLIENT_SECRET = "{api["credentials"]["client-secret"]}"; '
|
||||
f'$env:CF_SCOPE = "{api["credentials"]["scope"]}"; '
|
||||
f'cd "{REMOTE_DIR}"; '
|
||||
f'& node upload-delta.js --batch 100 2>&1'
|
||||
)
|
||||
rc = stream(ps_upload, to=3600)
|
||||
print(f'\n[upload rc={rc}]')
|
||||
|
||||
c.close()
|
||||
print('\n[OK] full drain complete')
|
||||
@@ -0,0 +1,83 @@
|
||||
"""SFTP upload-delta.js + delta_to_upload.txt to AD2, then run via SSH with creds in env vars."""
|
||||
import paramiko, subprocess, sys, time, yaml, os
|
||||
|
||||
LIMIT = 0 # 0 = all
|
||||
BATCH = 100
|
||||
DRY = False
|
||||
START = 0
|
||||
# Allow CLI overrides
|
||||
for i, a in enumerate(sys.argv[1:]):
|
||||
if a == '--limit': LIMIT = int(sys.argv[i+2])
|
||||
if a == '--batch': BATCH = int(sys.argv[i+2])
|
||||
if a == '--start': START = int(sys.argv[i+2])
|
||||
if a == '--dry-run': DRY = True
|
||||
|
||||
ad2_pwd = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/ad2.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)['credentials']['password'].replace('\\','')
|
||||
api = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/api-oauth.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)
|
||||
|
||||
REMOTE_DIR = 'C:/Users/sysadmin/Documents/dataforth-uploader'
|
||||
LOCAL_DELTA = r'C:\Users\guru\AppData\Local\Temp\delta_to_upload.txt'
|
||||
LOCAL_JS = r'D:\claudetools\projects\dataforth-dos\datasheet-pipeline\upload-delta.js'
|
||||
|
||||
c = paramiko.SSHClient(); c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
c.connect('192.168.0.6', username='sysadmin', password=ad2_pwd,
|
||||
timeout=30, banner_timeout=45, look_for_keys=False, allow_agent=False)
|
||||
|
||||
def run(cmd, to=120):
|
||||
_, o, e = c.exec_command(cmd, timeout=to)
|
||||
return o.read().decode('utf-8','replace'), e.read().decode('utf-8','replace'), o.channel.recv_exit_status()
|
||||
|
||||
print('[1] mkdir + sftp upload')
|
||||
run(f'powershell -Command "New-Item -ItemType Directory -Force -Path \\"{REMOTE_DIR}\\" | Out-Null"')
|
||||
sftp = c.open_sftp()
|
||||
sftp.put(LOCAL_JS, f'{REMOTE_DIR}/upload-delta.js')
|
||||
sftp.put(LOCAL_DELTA, f'{REMOTE_DIR}/delta_to_upload.txt')
|
||||
sftp.close()
|
||||
out, _, _ = run(f'powershell -Command "Get-ChildItem \\"{REMOTE_DIR}\\" | Select Name,Length | Format-Table -AutoSize | Out-String"')
|
||||
print(out.rstrip())
|
||||
|
||||
print('\n[2] run uploader on AD2 (env-var creds)')
|
||||
flags = []
|
||||
if LIMIT: flags += ['--limit', str(LIMIT)]
|
||||
if BATCH: flags += ['--batch', str(BATCH)]
|
||||
if START: flags += ['--start', str(START)]
|
||||
if DRY: flags += ['--dry-run']
|
||||
flag_str = ' '.join(flags)
|
||||
|
||||
# Build powershell command that sets env vars then runs node
|
||||
ps_cmd = (
|
||||
f'$env:CF_TOKEN_URL = "{api["endpoints"]["token-url"]}"; '
|
||||
f'$env:CF_API_BASE = "{api["endpoints"]["api-base"]}"; '
|
||||
f'$env:CF_CLIENT_ID = "{api["credentials"]["client-id"]}"; '
|
||||
f'$env:CF_CLIENT_SECRET = "{api["credentials"]["client-secret"]}"; '
|
||||
f'$env:CF_SCOPE = "{api["credentials"]["scope"]}"; '
|
||||
f'cd "{REMOTE_DIR}"; '
|
||||
f'& node upload-delta.js {flag_str} 2>&1'
|
||||
)
|
||||
import base64
|
||||
enc = base64.b64encode(ps_cmd.encode('utf-16-le')).decode()
|
||||
|
||||
stdin, stdout, stderr = c.exec_command(f'powershell -NoProfile -EncodedCommand {enc}', timeout=7200, get_pty=False)
|
||||
import threading
|
||||
def reader(stream, label):
|
||||
try:
|
||||
for line in iter(lambda: stream.readline(), ''):
|
||||
if not line: break
|
||||
print(line.rstrip(), flush=True)
|
||||
except Exception as e:
|
||||
print(f'[{label} reader err] {e}')
|
||||
t = threading.Thread(target=reader, args=(stdout,'out'), daemon=True); t.start()
|
||||
err_t = threading.Thread(target=reader, args=(stderr,'err'), daemon=True); err_t.start()
|
||||
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < 7200:
|
||||
if stdout.channel.exit_status_ready(): break
|
||||
time.sleep(2)
|
||||
|
||||
t.join(timeout=5)
|
||||
err_t.join(timeout=5)
|
||||
rc = stdout.channel.recv_exit_status() if stdout.channel.exit_status_ready() else -1
|
||||
print(f'\n[exit {rc}]')
|
||||
c.close()
|
||||
143
projects/dataforth-dos/datasheet-pipeline/test-scenarios.py
Normal file
143
projects/dataforth-dos/datasheet-pipeline/test-scenarios.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Run idempotency, update, and bulk tests against the Dataforth API."""
|
||||
import json
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
|
||||
import os, sys
|
||||
TOKEN_URL = os.environ.get("CF_TOKEN_URL", "https://login.dataforth.com/connect/token")
|
||||
API_BASE = os.environ.get("CF_API_BASE", "https://www.dataforth.com") + "/api/v1"
|
||||
CLIENT_ID = os.environ.get("CF_CLIENT_ID", "")
|
||||
CLIENT_SECRET = os.environ.get("CF_CLIENT_SECRET", "")
|
||||
SCOPE = os.environ.get("CF_SCOPE", "dataforth.web")
|
||||
if not CLIENT_ID or not CLIENT_SECRET:
|
||||
sys.exit("set CF_CLIENT_ID + CF_CLIENT_SECRET (vault: clients/dataforth/api-oauth.sops.yaml)")
|
||||
SAMPLE_DIR = r"D:\claudetools\projects\dataforth-dos\datasheet-pipeline\scmvas-hvas-research\samples\backfill-verify"
|
||||
|
||||
|
||||
def get_token():
|
||||
data = urllib.parse.urlencode({
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": CLIENT_ID,
|
||||
"client_secret": CLIENT_SECRET,
|
||||
"scope": SCOPE,
|
||||
}).encode()
|
||||
with urllib.request.urlopen(urllib.request.Request(TOKEN_URL, data=data)) as r:
|
||||
return json.loads(r.read())["access_token"]
|
||||
|
||||
|
||||
def api(method, path, token, body=None):
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
data = None
|
||||
if body is not None:
|
||||
data = json.dumps(body).encode()
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(API_BASE + path, data=data, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as r:
|
||||
return r.status, r.read().decode()
|
||||
except urllib.error.HTTPError as e:
|
||||
return e.code, e.read().decode()
|
||||
|
||||
|
||||
def read_sample(sn):
|
||||
with open(f"{SAMPLE_DIR}\\{sn}-source.txt", "rb") as f:
|
||||
return f.read().decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def sha16(s):
|
||||
return hashlib.sha256(s.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
token = get_token()
|
||||
|
||||
# ---------- TEST A: Idempotency ----------
|
||||
print("=" * 60)
|
||||
print("TEST A: Idempotency (re-POST 179377-5 with same content)")
|
||||
print("=" * 60)
|
||||
content = read_sample("179377-5")
|
||||
print(f"Content hash[16]: {sha16(content)}")
|
||||
|
||||
# Baseline stats
|
||||
status, body = api("GET", "/TestReportDataFiles/stats", token)
|
||||
stats_before = json.loads(body)
|
||||
print(f"Stats before: TotalCount={stats_before['TotalCount']}")
|
||||
|
||||
status, body = api("POST", "/TestReportDataFiles", token,
|
||||
{"SerialNumber": "179377-5", "Content": content})
|
||||
print(f"POST same content -> HTTP {status} body: {body}")
|
||||
|
||||
status, body = api("GET", "/TestReportDataFiles/stats", token)
|
||||
stats_after = json.loads(body)
|
||||
print(f"Stats after: TotalCount={stats_after['TotalCount']} (delta {stats_after['TotalCount']-stats_before['TotalCount']})")
|
||||
|
||||
status, body = api("GET", "/TestReportDataFiles/179377-5", token)
|
||||
obj = json.loads(body)
|
||||
print(f"CreatedAtUtc: {obj.get('CreatedAtUtc')}")
|
||||
print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')}")
|
||||
print()
|
||||
|
||||
# ---------- TEST B: Update ----------
|
||||
print("=" * 60)
|
||||
print("TEST B: Update (re-POST 179377-5 with MODIFIED content)")
|
||||
print("=" * 60)
|
||||
modified = content + "\n[TEST B MODIFICATION MARKER - WILL BE REVERTED]\n"
|
||||
print(f"Modified hash[16]: {sha16(modified)} (bytes {len(modified.encode())})")
|
||||
|
||||
status, body = api("POST", "/TestReportDataFiles", token,
|
||||
{"SerialNumber": "179377-5", "Content": modified})
|
||||
print(f"POST modified content -> HTTP {status} body: {body}")
|
||||
|
||||
status, body = api("GET", "/TestReportDataFiles/179377-5", token)
|
||||
obj = json.loads(body)
|
||||
fetched = obj.get("Content", "")
|
||||
print(f"Server bytes after update: {len(fetched.encode())} hash[16]: {sha16(fetched)}")
|
||||
print(f"CreatedAtUtc: {obj.get('CreatedAtUtc')}")
|
||||
print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')} <-- should be non-null now")
|
||||
print(f"Last line of fetched content: {fetched.rstrip().splitlines()[-1]!r}")
|
||||
print()
|
||||
|
||||
# ---------- TEST B cleanup: restore ----------
|
||||
print("=" * 60)
|
||||
print("Restoring 179377-5 to original content")
|
||||
print("=" * 60)
|
||||
status, body = api("POST", "/TestReportDataFiles", token,
|
||||
{"SerialNumber": "179377-5", "Content": content})
|
||||
print(f"POST original content -> HTTP {status} body: {body}")
|
||||
status, body = api("GET", "/TestReportDataFiles/179377-5", token)
|
||||
obj = json.loads(body)
|
||||
restored_hash = sha16(obj.get("Content", ""))
|
||||
print(f"Restored hash[16]: {restored_hash} (expect {sha16(content)})")
|
||||
print(f"Match: {restored_hash == sha16(content)}")
|
||||
print(f"UpdatedAtUtc: {obj.get('UpdatedAtUtc')}")
|
||||
print()
|
||||
|
||||
# ---------- TEST C: Bulk ----------
|
||||
print("=" * 60)
|
||||
print("TEST C: Bulk upload (179377-7, -8, -9)")
|
||||
print("=" * 60)
|
||||
items = []
|
||||
for sn in ["179377-7", "179377-8", "179377-9"]:
|
||||
c = read_sample(sn)
|
||||
items.append({"SerialNumber": sn, "Content": c})
|
||||
print(f" Staged {sn}: {len(c.encode())} bytes, hash[16]={sha16(c)}")
|
||||
|
||||
status, body = api("POST", "/TestReportDataFiles/bulk", token, {"Items": items})
|
||||
print(f"\nPOST /bulk -> HTTP {status}")
|
||||
print(f"Response: {body}")
|
||||
print()
|
||||
|
||||
for sn in ["179377-7", "179377-8", "179377-9"]:
|
||||
status, body = api("GET", f"/TestReportDataFiles/{sn}", token)
|
||||
if status == 200:
|
||||
o = json.loads(body)
|
||||
local = read_sample(sn)
|
||||
match = "MATCH" if sha16(local) == sha16(o.get("Content", "")) else "DIFF"
|
||||
print(f" {sn}: GET {status} content {match}, Created={o.get('CreatedAtUtc')}")
|
||||
else:
|
||||
print(f" {sn}: GET {status} body={body}")
|
||||
|
||||
# Final stats
|
||||
status, body = api("GET", "/TestReportDataFiles/stats", token)
|
||||
stats_final = json.loads(body)
|
||||
print(f"\nFinal TotalCount: {stats_final['TotalCount']} (started at {stats_before['TotalCount']})")
|
||||
94
projects/dataforth-dos/datasheet-pipeline/test-upload-two.py
Normal file
94
projects/dataforth-dos/datasheet-pipeline/test-upload-two.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Upload two real datasheets, fetch them back, diff byte-for-byte."""
|
||||
import json
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
|
||||
import os, sys
|
||||
TOKEN_URL = os.environ.get("CF_TOKEN_URL", "https://login.dataforth.com/connect/token")
|
||||
API_BASE = os.environ.get("CF_API_BASE", "https://www.dataforth.com") + "/api/v1"
|
||||
CLIENT_ID = os.environ.get("CF_CLIENT_ID", "")
|
||||
CLIENT_SECRET = os.environ.get("CF_CLIENT_SECRET", "")
|
||||
SCOPE = os.environ.get("CF_SCOPE", "dataforth.web")
|
||||
if not CLIENT_ID or not CLIENT_SECRET:
|
||||
sys.exit("set CF_CLIENT_ID + CF_CLIENT_SECRET (vault: clients/dataforth/api-oauth.sops.yaml)")
|
||||
|
||||
SAMPLES = [
|
||||
("179377-5", r"D:\claudetools\projects\dataforth-dos\datasheet-pipeline\scmvas-hvas-research\samples\backfill-verify\179377-5-source.txt"),
|
||||
("179377-6", r"D:\claudetools\projects\dataforth-dos\datasheet-pipeline\scmvas-hvas-research\samples\backfill-verify\179377-6-source.txt"),
|
||||
]
|
||||
|
||||
|
||||
def get_token():
|
||||
data = urllib.parse.urlencode({
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": CLIENT_ID,
|
||||
"client_secret": CLIENT_SECRET,
|
||||
"scope": SCOPE,
|
||||
}).encode()
|
||||
req = urllib.request.Request(TOKEN_URL, data=data)
|
||||
with urllib.request.urlopen(req) as r:
|
||||
return json.loads(r.read())["access_token"]
|
||||
|
||||
|
||||
def api(method, path, token, body=None):
|
||||
url = API_BASE + path
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
if body is not None:
|
||||
body = json.dumps(body).encode()
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as r:
|
||||
return r.status, r.read().decode()
|
||||
except urllib.error.HTTPError as e:
|
||||
return e.code, e.read().decode()
|
||||
|
||||
|
||||
def main():
|
||||
token = get_token()
|
||||
print(f"[OK] Got access token (len={len(token)})\n")
|
||||
|
||||
for sn, path in SAMPLES:
|
||||
with open(path, "rb") as f:
|
||||
content_bytes = f.read()
|
||||
content = content_bytes.decode("utf-8", errors="replace")
|
||||
local_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
|
||||
print(f"=== {sn} ===")
|
||||
print(f" Local file: {path}")
|
||||
print(f" Local bytes: {len(content_bytes)} sha256[16]: {local_hash}")
|
||||
|
||||
status, body = api("POST", "/TestReportDataFiles", token,
|
||||
{"SerialNumber": sn, "Content": content})
|
||||
print(f" POST -> HTTP {status}")
|
||||
print(f" Server response: {body}")
|
||||
|
||||
status, body = api("GET", f"/TestReportDataFiles/{sn}", token)
|
||||
print(f" GET -> HTTP {status}")
|
||||
if status != 200:
|
||||
print(f" !! Fetch failed: {body}")
|
||||
continue
|
||||
obj = json.loads(body)
|
||||
fetched = obj.get("Content", "")
|
||||
fetched_hash = hashlib.sha256(fetched.encode()).hexdigest()[:16]
|
||||
print(f" Server bytes: {len(fetched.encode('utf-8'))} sha256[16]: {fetched_hash}")
|
||||
match = "MATCH" if content == fetched else "DIFF"
|
||||
print(f" Content match: {match}")
|
||||
print(f" CreatedAtUtc: {obj.get('CreatedAtUtc')}")
|
||||
print(f" UpdatedAtUtc: {obj.get('UpdatedAtUtc')}")
|
||||
|
||||
if content != fetched:
|
||||
# Show first diff
|
||||
for i, (a, b) in enumerate(zip(content, fetched)):
|
||||
if a != b:
|
||||
print(f" First diff at char {i}: local={a!r} server={b!r}")
|
||||
print(f" context: ...{content[max(0,i-20):i+20]!r}")
|
||||
break
|
||||
else:
|
||||
print(f" Length diff: local={len(content)} server={len(fetched)}")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
204
projects/dataforth-dos/datasheet-pipeline/upload-delta.js
Normal file
204
projects/dataforth-dos/datasheet-pipeline/upload-delta.js
Normal file
@@ -0,0 +1,204 @@
|
||||
/**
|
||||
* Bulk-upload delta to Dataforth API.
|
||||
*
|
||||
* Reads delta_to_upload.txt (pipe-delimited: SerialNumber|Path|Size|MTime),
|
||||
* batches into POST /api/v1/TestReportDataFiles/bulk, refreshes token before
|
||||
* expiry, logs result line-by-line.
|
||||
*
|
||||
* Required env vars:
|
||||
* CF_TOKEN_URL, CF_API_BASE, CF_CLIENT_ID, CF_CLIENT_SECRET, CF_SCOPE
|
||||
*
|
||||
* Usage: node upload-delta.js [--delta path] [--batch 100] [--limit N]
|
||||
* [--start N] [--dry-run]
|
||||
*/
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const https = require('https');
|
||||
const url = require('url');
|
||||
|
||||
const args = process.argv.slice(2);
|
||||
function arg(name, dflt) {
|
||||
const i = args.indexOf(name);
|
||||
if (i < 0) return dflt;
|
||||
return args[i+1];
|
||||
}
|
||||
const flag = (name) => args.includes(name);
|
||||
|
||||
const DELTA = arg('--delta', 'C:\\Users\\sysadmin\\Documents\\dataforth-uploader\\delta_to_upload.txt');
|
||||
const BATCH = parseInt(arg('--batch', '100'), 10);
|
||||
const LIMIT = parseInt(arg('--limit', '0'), 10);
|
||||
const START = parseInt(arg('--start', '0'), 10);
|
||||
const DRY = flag('--dry-run');
|
||||
|
||||
const CRED = {
|
||||
tokenUrl: process.env.CF_TOKEN_URL,
|
||||
apiBase: process.env.CF_API_BASE,
|
||||
clientId: process.env.CF_CLIENT_ID,
|
||||
clientSecret: process.env.CF_CLIENT_SECRET,
|
||||
scope: process.env.CF_SCOPE,
|
||||
};
|
||||
for (const k of Object.keys(CRED)) {
|
||||
if (!CRED[k] && !DRY) { console.error(`[FAIL] missing env var for ${k}`); process.exit(1); }
|
||||
}
|
||||
|
||||
const LOG_DIR = path.join(path.dirname(DELTA), 'upload-logs');
|
||||
fs.mkdirSync(LOG_DIR, {recursive: true});
|
||||
const LOG_PATH = path.join(LOG_DIR, `upload-${new Date().toISOString().replace(/[:.]/g,'-').slice(0,19)}.log`);
|
||||
const log = fs.createWriteStream(LOG_PATH);
|
||||
|
||||
let tokenCache = {value: null, expiresAt: 0};
|
||||
|
||||
function postForm(uri, formObj) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const u = new url.URL(uri);
|
||||
const body = Object.entries(formObj).map(([k,v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`).join('&');
|
||||
const req = https.request({
|
||||
hostname: u.hostname, port: u.port||443, path: u.pathname + u.search,
|
||||
method:'POST',
|
||||
headers:{'Content-Type':'application/x-www-form-urlencoded','Content-Length':Buffer.byteLength(body)},
|
||||
timeout: 30000,
|
||||
}, res => {
|
||||
let data=''; res.on('data', c => data+=c);
|
||||
res.on('end', () => {
|
||||
try { resolve({status:res.statusCode, body:JSON.parse(data)}); }
|
||||
catch (e) { resolve({status:res.statusCode, body:{_raw:data}}); }
|
||||
});
|
||||
});
|
||||
req.on('error', reject); req.on('timeout', () => req.destroy(new Error('timeout')));
|
||||
req.write(body); req.end();
|
||||
});
|
||||
}
|
||||
|
||||
function postJson(uri, jsonObj, token) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const u = new url.URL(uri);
|
||||
const body = JSON.stringify(jsonObj);
|
||||
const req = https.request({
|
||||
hostname: u.hostname, port: u.port||443, path: u.pathname + u.search,
|
||||
method:'POST',
|
||||
headers:{
|
||||
'Authorization':`Bearer ${token}`,
|
||||
'Content-Type':'application/json',
|
||||
'Content-Length':Buffer.byteLength(body),
|
||||
},
|
||||
timeout: 180000,
|
||||
}, res => {
|
||||
let data=''; res.on('data', c => data+=c);
|
||||
res.on('end', () => {
|
||||
try { resolve({status:res.statusCode, body:JSON.parse(data)}); }
|
||||
catch (e) { resolve({status:res.statusCode, body:{_raw:data}}); }
|
||||
});
|
||||
});
|
||||
req.on('error', reject); req.on('timeout', () => req.destroy(new Error('timeout')));
|
||||
req.write(body); req.end();
|
||||
});
|
||||
}
|
||||
|
||||
async function getToken(force=false) {
|
||||
if (!force && tokenCache.value && Date.now() < tokenCache.expiresAt - 60000) {
|
||||
return tokenCache.value;
|
||||
}
|
||||
const r = await postForm(CRED.tokenUrl, {
|
||||
grant_type: 'client_credentials',
|
||||
client_id: CRED.clientId,
|
||||
client_secret: CRED.clientSecret,
|
||||
scope: CRED.scope,
|
||||
});
|
||||
if (r.status !== 200 || !r.body.access_token) {
|
||||
throw new Error(`token fetch failed: ${r.status} ${JSON.stringify(r.body)}`);
|
||||
}
|
||||
tokenCache.value = r.body.access_token;
|
||||
tokenCache.expiresAt = Date.now() + (r.body.expires_in || 3600) * 1000;
|
||||
return tokenCache.value;
|
||||
}
|
||||
|
||||
async function bulkUpload(items) {
|
||||
for (let attempt = 0; attempt < 2; attempt++) {
|
||||
const token = await getToken(attempt > 0);
|
||||
try {
|
||||
const r = await postJson(`${CRED.apiBase}/api/v1/TestReportDataFiles/bulk`, {Items: items}, token);
|
||||
if (r.status === 401 && attempt === 0) continue;
|
||||
return r;
|
||||
} catch (e) {
|
||||
if (attempt === 0) { await new Promise(r => setTimeout(r, 5000)); continue; }
|
||||
return {status: 0, body: {_error: e.message}};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function loadDelta() {
|
||||
const items = [];
|
||||
const lines = fs.readFileSync(DELTA, 'utf-8').split(/\r?\n/);
|
||||
for (let i = 0; i < lines.length; i++) {
|
||||
if (i < START) continue;
|
||||
if (LIMIT && items.length >= LIMIT) break;
|
||||
const parts = lines[i].split('|');
|
||||
if (parts.length < 4) continue;
|
||||
items.push({sn: parts[0], path: parts[1], size: parseInt(parts[2],10) || 0});
|
||||
}
|
||||
return items;
|
||||
}
|
||||
|
||||
(async () => {
|
||||
console.log(`[INFO] delta file: ${DELTA}`);
|
||||
console.log(`[INFO] log file: ${LOG_PATH}`);
|
||||
const items = loadDelta();
|
||||
console.log(`[INFO] ${items.length} items queued (start=${START} limit=${LIMIT||'all'} batch=${BATCH})`);
|
||||
console.log(`[INFO] dry-run: ${DRY}`);
|
||||
log.write(`# upload run ${new Date().toISOString()}\n`);
|
||||
log.write(`# delta=${items.length} batch=${BATCH} dry=${DRY}\n`);
|
||||
|
||||
if (!DRY) {
|
||||
const t = await getToken();
|
||||
console.log(`[OK] token len=${t.length}`);
|
||||
}
|
||||
|
||||
const totals = {received:0, created:0, updated:0, unchanged:0, errors:0};
|
||||
const t0 = Date.now();
|
||||
const nBatches = Math.ceil(items.length / BATCH);
|
||||
|
||||
for (let i = 0; i < items.length; i += BATCH) {
|
||||
const chunk = items.slice(i, i + BATCH);
|
||||
const bulk = [];
|
||||
for (const it of chunk) {
|
||||
try {
|
||||
const buf = fs.readFileSync(it.path);
|
||||
bulk.push({SerialNumber: it.sn, Content: buf.toString('utf-8')});
|
||||
} catch (e) {
|
||||
log.write(`READ_FAIL ${it.sn} ${it.path} ${e.message}\n`);
|
||||
totals.errors++;
|
||||
}
|
||||
}
|
||||
if (DRY) {
|
||||
console.log(` [DRY] batch ${(i/BATCH)+1}/${nBatches}: ${bulk.length} items`);
|
||||
continue;
|
||||
}
|
||||
const r = await bulkUpload(bulk);
|
||||
if (r.status !== 200) {
|
||||
console.log(` [FAIL] batch ${(i/BATCH)+1} HTTP ${r.status}: ${JSON.stringify(r.body).slice(0,300)}`);
|
||||
log.write(`BATCH_FAIL idx=${i} status=${r.status} body=${JSON.stringify(r.body).slice(0,500)}\n`);
|
||||
totals.errors += bulk.length;
|
||||
continue;
|
||||
}
|
||||
totals.received += r.body.TotalReceived || 0;
|
||||
totals.created += r.body.Created || 0;
|
||||
totals.updated += r.body.Updated || 0;
|
||||
totals.unchanged += r.body.Unchanged || 0;
|
||||
for (const e of (r.body.Errors || [])) {
|
||||
log.write(`BATCH_ITEM_ERR ${e}\n`);
|
||||
totals.errors++;
|
||||
}
|
||||
const done = i + chunk.length;
|
||||
const rate = done / Math.max(1, (Date.now()-t0)/1000);
|
||||
const etaS = Math.round((items.length - done) / Math.max(1, rate));
|
||||
console.log(` batch ${(i/BATCH)+1}/${nBatches}: recv=${r.body.TotalReceived} cre=${r.body.Created} upd=${r.body.Updated} unch=${r.body.Unchanged} err=${(r.body.Errors||[]).length} | rate=${rate.toFixed(0)}/s eta=${etaS}s`);
|
||||
log.write(`BATCH idx=${i} rcv=${r.body.TotalReceived} cre=${r.body.Created} upd=${r.body.Updated} unch=${r.body.Unchanged} err=${(r.body.Errors||[]).length}\n`);
|
||||
}
|
||||
|
||||
const elapsed = (Date.now() - t0) / 1000;
|
||||
console.log(`\n[DONE] elapsed ${elapsed.toFixed(1)}s`);
|
||||
for (const [k,v] of Object.entries(totals)) console.log(` ${k}: ${v}`);
|
||||
log.write(`\n# totals ${JSON.stringify(totals)}\n# elapsed ${elapsed.toFixed(1)}s\n`);
|
||||
log.end();
|
||||
console.log(`[INFO] log: ${LOG_PATH}`);
|
||||
})().catch(e => { console.error('[FATAL]', e); process.exit(1); });
|
||||
162
projects/dataforth-dos/datasheet-pipeline/upload-delta.py
Normal file
162
projects/dataforth-dos/datasheet-pipeline/upload-delta.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Bulk-upload the delta to Hoffman's API.
|
||||
|
||||
Reads delta_to_upload.txt (produced by compute-delta.py), POSTs to
|
||||
/api/v1/TestReportDataFiles/bulk in batches, retries on transient errors,
|
||||
refreshes token before expiry, logs results.
|
||||
|
||||
Usage:
|
||||
python upload-delta.py [--batch 100] [--dry-run] [--limit N] [--start N]
|
||||
"""
|
||||
import argparse, json, os, subprocess, sys, time, urllib.request, urllib.error, urllib.parse
|
||||
import yaml
|
||||
|
||||
DELTA = r'C:\Users\guru\AppData\Local\Temp\delta_to_upload.txt'
|
||||
LOG_DIR = r'D:\claudetools\projects\dataforth-dos\datasheet-pipeline\upload-logs'
|
||||
os.makedirs(LOG_DIR, exist_ok=True)
|
||||
LOG_PATH = os.path.join(LOG_DIR, time.strftime('upload-%Y%m%d-%H%M%S.log'))
|
||||
|
||||
VAULT = 'D:/vault/clients/dataforth/api-oauth.sops.yaml'
|
||||
|
||||
def load_creds():
|
||||
r = subprocess.run(['sops','-d',VAULT], capture_output=True, text=True, timeout=30, check=True)
|
||||
y = yaml.safe_load(r.stdout)
|
||||
return {
|
||||
'token_url': y['endpoints']['token-url'],
|
||||
'api_base': y['endpoints']['api-base'],
|
||||
'client_id': y['credentials']['client-id'],
|
||||
'client_secret': y['credentials']['client-secret'],
|
||||
'scope': y['credentials']['scope'],
|
||||
}
|
||||
|
||||
CREDS = load_creds()
|
||||
_token = {'value': None, 'expires_at': 0}
|
||||
|
||||
def get_token(force=False):
|
||||
if not force and _token['value'] and time.time() < _token['expires_at'] - 60:
|
||||
return _token['value']
|
||||
body = urllib.parse.urlencode({
|
||||
'grant_type':'client_credentials',
|
||||
'client_id':CREDS['client_id'],
|
||||
'client_secret':CREDS['client_secret'],
|
||||
'scope':CREDS['scope'],
|
||||
}).encode()
|
||||
req = urllib.request.Request(CREDS['token_url'], data=body, method='POST',
|
||||
headers={'Content-Type':'application/x-www-form-urlencoded'})
|
||||
with urllib.request.urlopen(req, timeout=30) as r:
|
||||
t = json.loads(r.read())
|
||||
_token['value'] = t['access_token']
|
||||
_token['expires_at'] = time.time() + t.get('expires_in', 3600)
|
||||
return _token['value']
|
||||
|
||||
def api_post(path, body):
|
||||
"""POST with one retry on 401/transient."""
|
||||
url = f"{CREDS['api_base']}{path}"
|
||||
data = json.dumps(body).encode()
|
||||
for attempt in range(2):
|
||||
token = get_token(force=(attempt > 0))
|
||||
req = urllib.request.Request(url, data=data, method='POST',
|
||||
headers={'Authorization':f'Bearer {token}','Content-Type':'application/json'})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=120) as r:
|
||||
return r.status, json.loads(r.read())
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 401 and attempt == 0:
|
||||
continue
|
||||
try: return e.code, json.loads(e.read())
|
||||
except Exception: return e.code, {'_raw': e.read().decode(errors='replace')[:500]}
|
||||
except (urllib.error.URLError, TimeoutError) as e:
|
||||
if attempt == 0:
|
||||
time.sleep(5); continue
|
||||
return 0, {'_error': str(e)}
|
||||
|
||||
def load_delta(start=0, limit=0):
|
||||
items = []
|
||||
with open(DELTA, encoding='utf-8') as f:
|
||||
for i, line in enumerate(f):
|
||||
if i < start: continue
|
||||
if limit and len(items) >= limit: break
|
||||
parts = line.rstrip('\n').split('|')
|
||||
if len(parts) < 4: continue
|
||||
sn, path, size, mtime = parts[0], parts[1], parts[2], parts[3]
|
||||
items.append({'sn': sn, 'path': path, 'size': int(size)})
|
||||
return items
|
||||
|
||||
def read_content(path):
|
||||
with open(path, 'rb') as f:
|
||||
return f.read().decode('utf-8', errors='replace')
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument('--batch', type=int, default=100, help='items per /bulk POST')
|
||||
ap.add_argument('--dry-run', action='store_true')
|
||||
ap.add_argument('--limit', type=int, default=0, help='cap total processed (0 = all)')
|
||||
ap.add_argument('--start', type=int, default=0, help='skip first N rows in delta file')
|
||||
args = ap.parse_args()
|
||||
|
||||
print(f'[INFO] delta file: {DELTA}')
|
||||
print(f'[INFO] log file: {LOG_PATH}')
|
||||
items = load_delta(start=args.start, limit=args.limit)
|
||||
print(f'[INFO] {len(items)} items queued (start={args.start} limit={args.limit or "all"}, batch={args.batch})')
|
||||
print(f'[INFO] dry-run: {args.dry_run}')
|
||||
|
||||
if not args.dry_run:
|
||||
token = get_token()
|
||||
print(f'[OK] token len={len(token)}')
|
||||
|
||||
log = open(LOG_PATH, 'w', encoding='utf-8')
|
||||
log.write(f'# upload run {time.strftime("%Y-%m-%d %H:%M:%S")}\n')
|
||||
log.write(f'# delta size: {len(items)}, batch: {args.batch}, dry_run: {args.dry_run}\n')
|
||||
|
||||
totals = {'received':0,'created':0,'updated':0,'unchanged':0,'errors':0}
|
||||
t0 = time.time()
|
||||
n = len(items)
|
||||
for i in range(0, n, args.batch):
|
||||
chunk = items[i:i+args.batch]
|
||||
# Read file content for each
|
||||
bulk = []
|
||||
for it in chunk:
|
||||
try:
|
||||
bulk.append({'SerialNumber': it['sn'], 'Content': read_content(it['path'])})
|
||||
except Exception as e:
|
||||
log.write(f'READ_FAIL {it["sn"]} {it["path"]} {e}\n')
|
||||
totals['errors'] += 1
|
||||
|
||||
if args.dry_run:
|
||||
print(f' [DRY] batch {i//args.batch + 1}: {len(bulk)} items (first sn={bulk[0]["SerialNumber"] if bulk else "-"})')
|
||||
continue
|
||||
|
||||
status, resp = api_post('/api/v1/TestReportDataFiles/bulk', {'Items': bulk})
|
||||
if status != 200:
|
||||
print(f' [FAIL] batch {i//args.batch+1} HTTP {status}: {resp}')
|
||||
log.write(f'BATCH_FAIL idx={i} status={status} resp={resp}\n')
|
||||
totals['errors'] += len(bulk)
|
||||
continue
|
||||
totals['received'] += resp.get('TotalReceived', 0)
|
||||
totals['created'] += resp.get('Created', 0)
|
||||
totals['updated'] += resp.get('Updated', 0)
|
||||
totals['unchanged']+= resp.get('Unchanged', 0)
|
||||
for err in (resp.get('Errors') or []):
|
||||
log.write(f'BATCH_ITEM_ERR {err}\n')
|
||||
totals['errors'] += 1
|
||||
|
||||
rate = (i + len(chunk)) / max(1, time.time() - t0)
|
||||
eta_s = int((n - (i + len(chunk))) / max(1, rate))
|
||||
print(f' batch {i//args.batch+1:>4}/{(n+args.batch-1)//args.batch}: '
|
||||
f'recv={resp.get("TotalReceived",0)} '
|
||||
f'cre={resp.get("Created",0)} upd={resp.get("Updated",0)} unch={resp.get("Unchanged",0)} '
|
||||
f'err={len(resp.get("Errors") or [])} | rate={rate:.0f}/s eta={eta_s}s')
|
||||
log.write(f'BATCH idx={i} rcv={resp.get("TotalReceived")} cre={resp.get("Created")} upd={resp.get("Updated")} unch={resp.get("Unchanged")} err={len(resp.get("Errors") or [])}\n')
|
||||
|
||||
elapsed = time.time() - t0
|
||||
print(f'\n[DONE] elapsed {elapsed:.1f}s')
|
||||
print(f' received: {totals["received"]}')
|
||||
print(f' created: {totals["created"]}')
|
||||
print(f' updated: {totals["updated"]}')
|
||||
print(f' unchanged: {totals["unchanged"]}')
|
||||
print(f' errors: {totals["errors"]}')
|
||||
log.write(f'\n# totals: {totals}\n# elapsed: {elapsed:.1f}s\n')
|
||||
log.close()
|
||||
print(f'[INFO] log: {LOG_PATH}')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,79 @@
|
||||
"""Build delta = entire For_Web folder, upload via upload-delta.js.
|
||||
|
||||
Server is idempotent; already-present items return Unchanged, new ones Created.
|
||||
Avoids the slow full server-inventory pull when we just want to drain new content.
|
||||
"""
|
||||
import base64, paramiko, subprocess, time, threading, yaml
|
||||
|
||||
ad2_pwd = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/ad2.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)['credentials']['password'].replace('\\','')
|
||||
api = yaml.safe_load(subprocess.run(['sops','-d','D:/vault/clients/dataforth/api-oauth.sops.yaml'],
|
||||
capture_output=True, text=True, timeout=30, check=True).stdout)
|
||||
|
||||
REMOTE_DIR = 'C:/Users/sysadmin/Documents/dataforth-uploader'
|
||||
DELTA_NAME = 'delta_for_web_all.txt'
|
||||
|
||||
c = paramiko.SSHClient(); c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
c.connect('192.168.0.6', username='sysadmin', password=ad2_pwd,
|
||||
timeout=30, banner_timeout=45, look_for_keys=False, allow_agent=False)
|
||||
|
||||
def ps_b64(cmd, to=300):
|
||||
enc = base64.b64encode(cmd.encode('utf-16-le')).decode()
|
||||
_, o, e = c.exec_command(f'powershell -NoProfile -EncodedCommand {enc}', timeout=to)
|
||||
return o.read().decode('utf-8','replace'), e.read().decode('utf-8','replace')
|
||||
|
||||
print('[1] enumerate For_Web on AD2')
|
||||
ps = (
|
||||
f'$out = "{REMOTE_DIR}/{DELTA_NAME}"; '
|
||||
r'Get-ChildItem "C:\Shares\webshare\For_Web" -File -Filter *.TXT | '
|
||||
r'ForEach-Object { '
|
||||
r' $sn = [System.IO.Path]::GetFileNameWithoutExtension($_.Name); '
|
||||
r' "$sn|$($_.FullName)|$($_.Length)|$($_.LastWriteTime.ToString("o"))" '
|
||||
r'} | Set-Content -Path $out -Encoding ASCII; '
|
||||
r'(Get-Content $out).Count'
|
||||
)
|
||||
out, err = ps_b64(ps, to=180)
|
||||
print(f' delta entries: {out.strip()}')
|
||||
if err.strip() and 'CLIXML' not in err: print('[stderr]', err[:300])
|
||||
|
||||
print('\n[2] upload all via upload-delta.js (idempotent)')
|
||||
ps_upload = (
|
||||
f'$env:CF_TOKEN_URL = "{api["endpoints"]["token-url"]}"; '
|
||||
f'$env:CF_API_BASE = "{api["endpoints"]["api-base"]}"; '
|
||||
f'$env:CF_CLIENT_ID = "{api["credentials"]["client-id"]}"; '
|
||||
f'$env:CF_CLIENT_SECRET = "{api["credentials"]["client-secret"]}"; '
|
||||
f'$env:CF_SCOPE = "{api["credentials"]["scope"]}"; '
|
||||
f'cd "{REMOTE_DIR}"; '
|
||||
f'& node upload-delta.js --delta "{REMOTE_DIR}/{DELTA_NAME}" --batch 100 2>&1'
|
||||
)
|
||||
enc = base64.b64encode(ps_upload.encode('utf-16-le')).decode()
|
||||
stdin, stdout, stderr = c.exec_command(f'powershell -NoProfile -EncodedCommand {enc}', timeout=3600)
|
||||
|
||||
def reader(s):
|
||||
for line in iter(lambda: s.readline(), ''):
|
||||
if not line: break
|
||||
print(line.rstrip(), flush=True)
|
||||
t = threading.Thread(target=reader, args=(stdout,), daemon=True); t.start()
|
||||
t2 = threading.Thread(target=reader, args=(stderr,), daemon=True); t2.start()
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < 3600:
|
||||
if stdout.channel.exit_status_ready(): break
|
||||
time.sleep(1)
|
||||
t.join(timeout=5); t2.join(timeout=5)
|
||||
rc = stdout.channel.recv_exit_status() if stdout.channel.exit_status_ready() else -1
|
||||
print(f'\n[upload rc={rc}]')
|
||||
|
||||
print('\n[3] post-upload server stats')
|
||||
import json, urllib.request, urllib.parse
|
||||
body = urllib.parse.urlencode({'grant_type':api['credentials']['grant-type'],
|
||||
'client_id':api['credentials']['client-id'],
|
||||
'client_secret':api['credentials']['client-secret'],
|
||||
'scope':api['credentials']['scope']}).encode()
|
||||
req = urllib.request.Request(api['endpoints']['token-url'], data=body, method='POST',
|
||||
headers={'Content-Type':'application/x-www-form-urlencoded'})
|
||||
tok = json.loads(urllib.request.urlopen(req,timeout=30).read())['access_token']
|
||||
req = urllib.request.Request(f'{api["endpoints"]["api-base"]}/api/v1/TestReportDataFiles/stats',
|
||||
headers={'Authorization':f'Bearer {tok}'})
|
||||
print(json.dumps(json.loads(urllib.request.urlopen(req,timeout=30).read()), indent=2))
|
||||
|
||||
c.close()
|
||||
Reference in New Issue
Block a user