Extends the Test Datasheet Pipeline on AD2:C:\Shares\testdatadb to
generate web-published datasheets for the SCMVAS-Mxxx (obsolete) and
SCMHVAS-Mxxxx (replacement) High Voltage Input Module product lines.
Both are tested either with the existing TESTHV3 software (production
VASLOG .DAT logs) or in Engineering with plain .txt output.
Key changes on AD2 (all deployed 2026-04-12 with dated backups):
- parsers/spec-reader.js: getSpecs() returns a `{_family:'SCMVAS',
_noSpecs:true}` sentinel for SCMVAS/SCMHVAS/VAS-M/HVAS-M model prefixes
so the export pipeline does not silently skip them for missing specs.
- templates/datasheet-exact.js: new Accuracy-only template branch
(generateSCMVASDatasheet + helpers) that mirrors the existing shipped
format byte-for-byte. Extraction regex covers both QuickBASIC STR$()
output formats: scientific-with-trailing-status-digit (98.4% of
records) and plain-decimal (1.6% of records above QB's threshold).
- parsers/vaslog-engtxt.js (new): parses the Engineering-Tested .txt
files in TS-3R\LOGS\VASLOG\VASLOG - Engineering Tested\. Filename SN
regex strips optional trailing 14-digit timestamp; in-file "SN:"
header is the authoritative source when the filename is malformed.
- database/import.js: LOG_TYPES grows a VASLOG_ENG entry with
subfolder + recursive flags. Pre-existing 7 log types keep their
implicit recursive=true behaviour (config.recursive !== false).
importFiles() routes VASLOG_ENG paths before the generic loop so a
VASLOG - Engineering Tested/*.txt path does not mis-dispatch to the
multiline parser.
- database/export-datasheets.js: VASLOG_ENG records are written
verbatim via fs.copyFileSync(source_file, For_Web/<SN>.TXT) for true
byte-level pass-through, with a graceful raw_data fallback when the
source file is no longer on disk.
Deploy outcome:
- 27,503 SCMVAS/SCMHVAS datasheets rendered (27,065 from scientific +
438 from plain-decimal PASS lines, post-patch rerun)
- 434 Engineering-Tested .txt files pass-through-copied to For_Web
- 0 errors across both batches
Repo layout added here:
- scmvas-hvas-research/: discovery artifacts (source .BAS, hvin.dat,
sample .DAT + .txt, binary-format notes, IMPLEMENTATION_PLAN.md)
- implementation/: staged final code + deploy helpers + local test
harness + per-step verification scripts
- backups/pre-deploy-20260412/: independent local snapshot of the 4
AD2 files replaced, pulled byte-for-byte before deploy
All helper scripts fetch the AD2 password at runtime from the SOPS
vault (clients/dataforth/ad2.sops.yaml). None of the committed files
contain the plaintext credential. Known vault-entry hygiene issue
(stale shell-escape backslash before the `!`) is documented in the
fetcher comments and stripped at read-time; flagged separately for
cleanup.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
225 lines
7.3 KiB
Python
225 lines
7.3 KiB
Python
"""
|
|
Deploy staged pipeline changes to AD2:C:\\Shares\\testdatadb\\.
|
|
|
|
Backs up each existing target to <name>.bak-YYYYMMDD before overwriting.
|
|
Fails if a target file does not exist on AD2 (excluding brand-new files
|
|
declared in NEW_FILES below).
|
|
|
|
Usage:
|
|
python deploy-to-ad2.py --dry-run
|
|
python deploy-to-ad2.py
|
|
|
|
Credentials: fetched at runtime from the SOPS vault
|
|
(clients/dataforth/ad2.sops.yaml -> credentials.password). No hardcoded
|
|
password; no env-var / prompt fallback. Fails loud if the vault read fails.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
import paramiko
|
|
|
|
HOST = '192.168.0.6'
|
|
USER = 'sysadmin'
|
|
|
|
VAULT_SH = 'D:/vault/scripts/vault.sh'
|
|
VAULT_ENTRY = 'clients/dataforth/ad2.sops.yaml'
|
|
VAULT_FIELD = 'credentials.password'
|
|
|
|
|
|
def get_ad2_password() -> str:
|
|
"""Fetch the AD2 sysadmin password from the SOPS vault.
|
|
|
|
Fails loud (raises) on any error: missing vault, decryption failure,
|
|
empty value. Do NOT fall back to env vars or prompts -- per CLAUDE.md
|
|
deploy scripts must not hold credentials.
|
|
"""
|
|
try:
|
|
result = subprocess.run(
|
|
['bash', VAULT_SH, 'get-field', VAULT_ENTRY, VAULT_FIELD],
|
|
capture_output=True, text=True, timeout=30, check=False,
|
|
)
|
|
except FileNotFoundError as e:
|
|
raise RuntimeError(
|
|
f'[FAIL] vault helper not runnable: {VAULT_SH} ({e})'
|
|
) from e
|
|
except subprocess.TimeoutExpired as e:
|
|
raise RuntimeError(
|
|
f'[FAIL] vault read timed out after 30s for {VAULT_ENTRY}'
|
|
) from e
|
|
|
|
if result.returncode != 0:
|
|
stderr = (result.stderr or '').strip()
|
|
raise RuntimeError(
|
|
f'[FAIL] vault read failed (rc={result.returncode}) for '
|
|
f'{VAULT_ENTRY}:{VAULT_FIELD}: {stderr}'
|
|
)
|
|
|
|
pwd = (result.stdout or '').strip()
|
|
if not pwd:
|
|
raise RuntimeError(
|
|
f'[FAIL] vault returned empty value for {VAULT_ENTRY}:{VAULT_FIELD}'
|
|
)
|
|
return pwd
|
|
|
|
REMOTE_ROOT = 'C:/Shares/testdatadb'
|
|
LOCAL_ROOT = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deployment file lists. Each list has different semantics:
|
|
#
|
|
# UPDATE_FILES -- file MUST already exist on AD2. Backup-then-overwrite.
|
|
# Fails loud if the remote file is missing (that's a drift
|
|
# signal -- something changed on the box we didn't expect).
|
|
#
|
|
# NEW_FILES -- file must NOT already exist on AD2. Creates it.
|
|
# Fails loud if the remote file is already present (we would
|
|
# otherwise silently clobber something we didn't back up).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Files that already exist on AD2 and will be backed up + overwritten.
|
|
UPDATE_FILES = [
|
|
('parsers/spec-reader.js', 'parsers/spec-reader.js'),
|
|
('templates/datasheet-exact.js', 'templates/datasheet-exact.js'),
|
|
('database/import.js', 'database/import.js'),
|
|
('database/export-datasheets.js', 'database/export-datasheets.js'),
|
|
]
|
|
|
|
# Files that do NOT yet exist on AD2 and must be created fresh.
|
|
NEW_FILES = [
|
|
('parsers/vaslog-engtxt.js', 'parsers/vaslog-engtxt.js'),
|
|
]
|
|
|
|
|
|
def connect() -> paramiko.SSHClient:
|
|
pwd = get_ad2_password()
|
|
c = paramiko.SSHClient()
|
|
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
c.connect(
|
|
HOST, username=USER, password=pwd,
|
|
timeout=15, look_for_keys=False, allow_agent=False, banner_timeout=30,
|
|
)
|
|
return c
|
|
|
|
|
|
def remote_exists(sftp: paramiko.SFTPClient, path: str) -> bool:
|
|
try:
|
|
sftp.stat(path)
|
|
return True
|
|
except IOError:
|
|
return False
|
|
|
|
|
|
def to_remote(rel: str) -> str:
|
|
return f'{REMOTE_ROOT}/{rel}'
|
|
|
|
|
|
def backup_and_copy(sftp: paramiko.SFTPClient, ssh: paramiko.SSHClient,
|
|
local_rel: str, remote_rel: str, dry_run: bool, stamp: str) -> None:
|
|
local_path = os.path.join(LOCAL_ROOT, local_rel.replace('/', os.sep))
|
|
remote_path = to_remote(remote_rel)
|
|
backup_path = f'{remote_path}.bak-{stamp}'
|
|
|
|
if not os.path.isfile(local_path):
|
|
raise FileNotFoundError(f'[FAIL] local file missing: {local_path}')
|
|
|
|
if not remote_exists(sftp, remote_path):
|
|
raise FileNotFoundError(f'[FAIL] remote file missing on AD2: {remote_path}')
|
|
|
|
print(f'[INFO] {remote_rel}')
|
|
if dry_run:
|
|
print(f' would back up to: {backup_path}')
|
|
print(f' would upload: {local_path} -> {remote_path}')
|
|
return
|
|
|
|
# Backup via SFTP copy (read + re-upload). Paramiko has no server-side copy.
|
|
with sftp.open(remote_path, 'rb') as src:
|
|
data = src.read()
|
|
with sftp.open(backup_path, 'wb') as dst:
|
|
dst.write(data)
|
|
print(f' backup: {backup_path} ({len(data)} bytes)')
|
|
|
|
sftp.put(local_path, remote_path)
|
|
size = os.path.getsize(local_path)
|
|
print(f' uploaded: {local_path} -> {remote_path} ({size} bytes)')
|
|
|
|
|
|
def create_new(sftp: paramiko.SFTPClient, local_rel: str, remote_rel: str,
|
|
dry_run: bool) -> None:
|
|
"""Create a file that is expected to be NEW on AD2.
|
|
|
|
Fails loud if the remote file already exists -- NEW_FILES declares this
|
|
is a brand-new file, so pre-existence is a drift signal. If a previous
|
|
deploy partially ran, clean up manually or move the entry to
|
|
UPDATE_FILES.
|
|
"""
|
|
local_path = os.path.join(LOCAL_ROOT, local_rel.replace('/', os.sep))
|
|
remote_path = to_remote(remote_rel)
|
|
|
|
if not os.path.isfile(local_path):
|
|
raise FileNotFoundError(f'[FAIL] local file missing: {local_path}')
|
|
|
|
print(f'[INFO] {remote_rel} (NEW)')
|
|
|
|
if remote_exists(sftp, remote_path):
|
|
raise FileExistsError(
|
|
f'[FAIL] remote target already exists but is declared NEW: {remote_path} '
|
|
f'-- move to UPDATE_FILES or remove remote manually'
|
|
)
|
|
|
|
if dry_run:
|
|
print(f' would create: {local_path} -> {remote_path}')
|
|
return
|
|
|
|
sftp.put(local_path, remote_path)
|
|
size = os.path.getsize(local_path)
|
|
print(f' created: {remote_path} ({size} bytes)')
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser(description=__doc__)
|
|
ap.add_argument('--dry-run', action='store_true', help='print actions without writing')
|
|
args = ap.parse_args()
|
|
|
|
stamp = datetime.date.today().strftime('%Y%m%d')
|
|
|
|
print('=' * 72)
|
|
print('Deploy staged pipeline changes to AD2')
|
|
print('=' * 72)
|
|
print(f'Host: {HOST}')
|
|
print(f'Remote root: {REMOTE_ROOT}')
|
|
print(f'Local root: {LOCAL_ROOT}')
|
|
print(f'Dry run: {args.dry_run}')
|
|
print(f'Backup tag: .bak-{stamp}')
|
|
print('')
|
|
|
|
ssh = connect()
|
|
try:
|
|
sftp = ssh.open_sftp()
|
|
try:
|
|
for local_rel, remote_rel in UPDATE_FILES:
|
|
backup_and_copy(sftp, ssh, local_rel, remote_rel, args.dry_run, stamp)
|
|
|
|
for local_rel, remote_rel in NEW_FILES:
|
|
create_new(sftp, local_rel, remote_rel, args.dry_run)
|
|
finally:
|
|
sftp.close()
|
|
finally:
|
|
ssh.close()
|
|
|
|
print('')
|
|
print('[OK] done' if not args.dry_run else '[OK] dry-run complete (no changes made)')
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
sys.exit(main())
|
|
except Exception as e:
|
|
print(f'[FAIL] {e}', file=sys.stderr)
|
|
sys.exit(1)
|