/**
 * Bulk-upload delta to Dataforth API.
 *
 * Reads delta_to_upload.txt (pipe-delimited: SerialNumber|Path|Size|MTime),
 * batches into POST /api/v1/TestReportDataFiles/bulk, refreshes token before
 * expiry, logs result line-by-line.
 *
 * Required env vars:
 *   CF_TOKEN_URL, CF_API_BASE, CF_CLIENT_ID, CF_CLIENT_SECRET, CF_SCOPE
 *
 * Usage: node upload-delta.js [--delta path] [--batch 100] [--limit N]
 *                             [--start N] [--dry-run]
 */
const fs = require('fs');
const path = require('path');
const https = require('https');
const url = require('url');

const args = process.argv.slice(2);

/**
 * Return the value that follows `name` on the command line.
 *
 * @param {string} name - Option name, e.g. `--batch`.
 * @param {*} dflt - Value returned when the option is absent.
 * @returns {*} The following argv token (undefined if `name` is the last
 *   token), or `dflt` when `name` is not present.
 */
function arg(name, dflt) {
  const i = args.indexOf(name);
  if (i < 0) return dflt;
  return args[i + 1];
}

/** True when the bare switch `name` appears on the command line. */
const flag = (name) => args.includes(name);

/**
 * Read an integer option and exit with an error when it is unusable.
 *
 * A zero --batch would make the upload loop spin forever (the index never
 * advances) and a NaN --batch would silently upload nothing, so bad values
 * are rejected before any work starts.
 *
 * @param {string} name - Option name.
 * @param {number} dflt - Default used when the option is absent.
 * @param {number} min - Smallest accepted value.
 * @returns {number} The parsed integer.
 */
function intArg(name, dflt, min) {
  const raw = arg(name, String(dflt));
  const n = Number.parseInt(raw, 10);
  if (!Number.isInteger(n) || n < min) {
    console.error(`[FAIL] ${name} must be an integer >= ${min} (got ${raw})`);
    process.exit(1);
  }
  return n;
}

const DELTA = arg('--delta', 'C:\\Users\\sysadmin\\Documents\\dataforth-uploader\\delta_to_upload.txt');
if (typeof DELTA !== 'string' || DELTA === '') {
  // `--delta` given as the last token has no value; fail before path.dirname() throws.
  console.error('[FAIL] --delta requires a path');
  process.exit(1);
}
const BATCH = intArg('--batch', 100, 1);
const LIMIT = intArg('--limit', 0, 0); // 0 means "no limit"
const START = intArg('--start', 0, 0); // line index in the delta file to start from
const DRY = flag('--dry-run');

const CRED = {
  tokenUrl: process.env.CF_TOKEN_URL,
  apiBase: process.env.CF_API_BASE,
  clientId: process.env.CF_CLIENT_ID,
  clientSecret: process.env.CF_CLIENT_SECRET,
  scope: process.env.CF_SCOPE,
};
// Credentials are only needed when requests are actually sent.
for (const k of Object.keys(CRED)) {
  if (!CRED[k] && !DRY) {
    console.error(`[FAIL] missing env var for ${k}`);
    process.exit(1);
  }
}

// One log file per run, written next to the delta file.
const LOG_DIR = path.join(path.dirname(DELTA), 'upload-logs');
fs.mkdirSync(LOG_DIR, {recursive: true});
const LOG_PATH = path.join(LOG_DIR, `upload-${new Date().toISOString().replace(/[:.]/g,'-').slice(0,19)}.log`);
const log = fs.createWriteStream(LOG_PATH);

// Cached bearer token and the epoch-ms time at which it expires.
let tokenCache = {value: null, expiresAt: 0};

/**
 * Turn a response body into an object.
 *
 * @param {string} text - Decoded response body.
 * @returns {object} The parsed JSON when it is an object or array; otherwise
 *   `{_raw: text}` so callers can always read properties off the result.
 */
function parseBody(text) {
  try {
    const parsed = JSON.parse(text);
    // `null` and JSON primitives are wrapped below: callers read properties
    // such as `access_token` directly and would otherwise throw on `null`.
    if (parsed !== null && typeof parsed === 'object') return parsed;
  } catch (e) {
    // Not JSON (e.g. an HTML error page): fall through and keep the raw text.
  }
  return {_raw: text};
}

/**
 * POST an already-serialized body over HTTPS.
 *
 * @param {string} uri - Absolute https URL.
 * @param {string} body - Serialized request body.
 * @param {object} headers - Request headers; Content-Length is appended.
 * @param {number} timeoutMs - Socket idle timeout in milliseconds.
 * @returns {Promise<{status: number, body: object}>} Resolves for any HTTP
 *   status; rejects on network error or timeout.
 */
function postRaw(uri, body, headers, timeoutMs) {
  return new Promise((resolve, reject) => {
    const u = new url.URL(uri);
    const req = https.request({
      hostname: u.hostname,
      port: u.port || 443,
      path: u.pathname + u.search,
      method: 'POST',
      headers: Object.assign({}, headers, {'Content-Length': Buffer.byteLength(body)}),
      timeout: timeoutMs,
    }, (res) => {
      // Collect raw chunks and decode once at the end: appending Buffers to a
      // string decodes each chunk separately and corrupts any multi-byte
      // UTF-8 character that straddles a chunk boundary.
      const chunks = [];
      res.on('data', (c) => chunks.push(c));
      // A connection dropped mid-response must settle the promise too.
      res.on('error', reject);
      res.on('end', () => {
        const text = Buffer.concat(chunks).toString('utf-8');
        resolve({status: res.statusCode, body: parseBody(text)});
      });
    });
    req.on('error', reject);
    req.on('timeout', () => req.destroy(new Error('timeout')));
    req.write(body);
    req.end();
  });
}

/**
 * POST `formObj` as application/x-www-form-urlencoded (30 s timeout).
 *
 * @param {string} uri - Absolute https URL.
 * @param {object} formObj - Flat key/value pairs to encode.
 * @returns {Promise<{status: number, body: object}>}
 */
function postForm(uri, formObj) {
  const body = Object.entries(formObj)
    .map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`)
    .join('&');
  return postRaw(uri, body, {'Content-Type': 'application/x-www-form-urlencoded'}, 30000);
}

/**
 * POST `jsonObj` as JSON with a bearer token (180 s timeout).
 *
 * @param {string} uri - Absolute https URL.
 * @param {*} jsonObj - Value passed to JSON.stringify.
 * @param {string} token - Bearer token for the Authorization header.
 * @returns {Promise<{status: number, body: object}>}
 */
function postJson(uri, jsonObj, token) {
  const headers = {
    'Authorization': `Bearer ${token}`,
    'Content-Type': 'application/json',
  };
  return postRaw(uri, JSON.stringify(jsonObj), headers, 180000);
}

/**
 * Return a bearer token, fetching a new one via the client_credentials grant
 * when the cached one is missing, within 60 s of expiry, or `force` is set.
 *
 * @param {boolean} [force=false] - Skip the cache.
 * @returns {Promise<string>} The access token.
 * @throws {Error} When the token endpoint does not answer 200 with an
 *   `access_token`.
 */
async function getToken(force=false) {
  if (!force && tokenCache.value && Date.now() < tokenCache.expiresAt - 60000) {
    return tokenCache.value;
  }
  const r = await postForm(CRED.tokenUrl, {
    grant_type: 'client_credentials',
    client_id: CRED.clientId,
    client_secret: CRED.clientSecret,
    scope: CRED.scope,
  });
  if (r.status !== 200 || !r.body.access_token) {
    throw new Error(`token fetch failed: ${r.status} ${JSON.stringify(r.body)}`);
  }
  tokenCache.value = r.body.access_token;
  // Fall back to one hour when the endpoint omits expires_in.
  tokenCache.expiresAt = Date.now() + (r.body.expires_in || 3600) * 1000;
  return tokenCache.value;
}

/**
 * Upload one batch, with a single retry.
 *
 * The retry (with a freshly fetched token) happens after a 401, or 5 s after
 * a network error/timeout. A second network failure is reported as
 * `{status: 0, body: {_error}}` instead of throwing.
 *
 * @param {Array<object>} items - Entries sent as `Items` in the request body.
 * @returns {Promise<{status: number, body: object}>}
 */
async function bulkUpload(items) {
  for (let attempt = 0; attempt < 2; attempt++) {
    const token = await getToken(attempt > 0);
    try {
      const r = await postJson(`${CRED.apiBase}/api/v1/TestReportDataFiles/bulk`, {Items: items}, token);
      if (r.status === 401 && attempt === 0) continue;
      return r;
    } catch (e) {
      if (attempt === 0) {
        await new Promise(r => setTimeout(r, 5000));
        continue;
      }
      return {status: 0, body: {_error: e.message}};
    }
  }
}

/**
 * Parse the delta file into upload candidates.
 *
 * Lines before index START are skipped, lines with fewer than four
 * pipe-separated fields are ignored, and at most LIMIT items are returned
 * (LIMIT 0 means no limit).
 *
 * @returns {Array<{sn: string, path: string, size: number}>}
 */
function loadDelta() {
  const items = [];
  // Drop a leading UTF-8 BOM so it does not end up inside the first serial number.
  const text = fs.readFileSync(DELTA, 'utf-8').replace(/^\uFEFF/, '');
  const lines = text.split(/\r?\n/);
  for (let i = 0; i < lines.length; i++) {
    if (i < START) continue;
    if (LIMIT && items.length >= LIMIT) break;
    const parts = lines[i].split('|');
    if (parts.length < 4) continue;
    items.push({sn: parts[0], path: parts[1], size: parseInt(parts[2],10) || 0});
  }
  return items;
}

/**
 * Render one entry of the response's `Errors` array for the log.
 * Non-string entries are JSON-encoded; plain interpolation would log them
 * as "[object Object]".
 */
function formatItemError(e) {
  return typeof e === 'string' ? e : JSON.stringify(e);
}

(async () => {
  console.log(`[INFO] delta file: ${DELTA}`);
  console.log(`[INFO] log file: ${LOG_PATH}`);
  const items = loadDelta();
  console.log(`[INFO] ${items.length} items queued (start=${START} limit=${LIMIT||'all'} batch=${BATCH})`);
  console.log(`[INFO] dry-run: ${DRY}`);
  log.write(`# upload run ${new Date().toISOString()}\n`);
  log.write(`# delta=${items.length} batch=${BATCH} dry=${DRY}\n`);

  if (!DRY) {
    // Fetch a token up front so bad credentials fail before any file is read.
    const t = await getToken();
    console.log(`[OK] token len=${t.length}`);
  }

  const totals = {received:0, created:0, updated:0, unchanged:0, errors:0};
  const t0 = Date.now();
  const nBatches = Math.ceil(items.length / BATCH);

  for (let i = 0; i < items.length; i += BATCH) {
    const batchNo = Math.floor(i / BATCH) + 1;
    const chunk = items.slice(i, i + BATCH);

    // Read each file; unreadable files are logged and counted, not fatal.
    const bulk = [];
    for (const it of chunk) {
      try {
        const buf = fs.readFileSync(it.path);
        bulk.push({SerialNumber: it.sn, Content: buf.toString('utf-8')});
      } catch (e) {
        log.write(`READ_FAIL ${it.sn} ${it.path} ${e.message}\n`);
        totals.errors++;
      }
    }

    if (DRY) {
      console.log(` [DRY] batch ${batchNo}/${nBatches}: ${bulk.length} items`);
      continue;
    }

    const r = await bulkUpload(bulk);
    if (r.status !== 200) {
      console.log(` [FAIL] batch ${batchNo} HTTP ${r.status}: ${JSON.stringify(r.body).slice(0,300)}`);
      log.write(`BATCH_FAIL idx=${i} status=${r.status} body=${JSON.stringify(r.body).slice(0,500)}\n`);
      totals.errors += bulk.length;
      continue;
    }

    totals.received += r.body.TotalReceived || 0;
    totals.created += r.body.Created || 0;
    totals.updated += r.body.Updated || 0;
    totals.unchanged += r.body.Unchanged || 0;
    const itemErrors = r.body.Errors || [];
    for (const e of itemErrors) {
      log.write(`BATCH_ITEM_ERR ${formatItemError(e)}\n`);
      totals.errors++;
    }

    const done = i + chunk.length;
    // Elapsed time is floored at 1 s so the first batch does not report an
    // inflated rate.
    const rate = done / Math.max(1, (Date.now()-t0)/1000);
    // `done` >= 1 here, so `rate` > 0; dividing by the real rate keeps the
    // ETA honest when throughput is below one item per second.
    const etaS = Math.round((items.length - done) / rate);
    console.log(` batch ${batchNo}/${nBatches}: recv=${r.body.TotalReceived} cre=${r.body.Created} upd=${r.body.Updated} unch=${r.body.Unchanged} err=${itemErrors.length} | rate=${rate.toFixed(0)}/s eta=${etaS}s`);
    log.write(`BATCH idx=${i} rcv=${r.body.TotalReceived} cre=${r.body.Created} upd=${r.body.Updated} unch=${r.body.Unchanged} err=${itemErrors.length}\n`);
  }

  const elapsed = (Date.now() - t0) / 1000;
  console.log(`\n[DONE] elapsed ${elapsed.toFixed(1)}s`);
  for (const [k,v] of Object.entries(totals)) console.log(` ${k}: ${v}`);
  log.write(`\n# totals ${JSON.stringify(totals)}\n# elapsed ${elapsed.toFixed(1)}s\n`);
  log.end();
  console.log(`[INFO] log: ${LOG_PATH}`);
})().catch(e => {
  console.error('[FATAL]', e);
  process.exit(1);
});