#!/usr/bin/env python3
"""Build / inspect the ALIS Staff Import workbook for the `alis` skill.

ALIS has NO staff-write API. Staff (and their logins) are created/changed in
bulk by uploading an .xls on the ALIS web UI (Staff -> Import). This module
produces a workbook that exactly matches Medtelligent's template so it uploads
cleanly, and validates each row against the template's controlled values and
against the live job-role -> security-role reference (role-map.json).

Template (Sheet1 header, exact order - DO NOT reorder/rename):
    First Name | Last Name | Staff Record Number | Security Roles | Staff Status |
    Hire Date | Login Enabled | Email | Password | Date of Birth | Gender |
    Job Role | Cell Phone
Sheet2 holds the dropdown lists:
    Staff Status:  Applicant / Discharged / Hired / Rejected
    Login Enabled: Yes / No
    Gender:        Female / Male

Write uses xlwt (legacy .xls, matching the template format). Read uses xlrd
for .xls and openpyxl for .xlsx.
"""
|
|
from __future__ import annotations

import csv
import io
import json
import os
import secrets
import string
from pathlib import Path
from typing import Any, Optional
|
|
|
|
# The ALIS importer accepts TWO workbook layouts, and column order matters in
# both of them:
#   * CREATE - the blank "new staff" template. Has a Password column and NO
#     ALIS ID column; rows WITHOUT an ALIS ID are created as new staff.
#   * UPDATE - an export of current staff. Leads with ALIS ID and has NO
#     Password column; rows WITH an ALIS ID update the matching staff member.
# CREATE files (new hires) are what we build by default.
TEMPLATE_HEADERS = [
    "First Name",
    "Last Name",
    "Staff Record Number",
    "Security Roles",
    "Staff Status",
    "Hire Date",
    "Login Enabled",
    "Email",
    "Password",
    "Date of Birth",
    "Gender",
    "Job Role",
    "Cell Phone",
]

# The UPDATE layout is the CREATE layout with ALIS ID put in front and the
# Password column taken out; deriving it keeps the two lists in step.
UPDATE_TEMPLATE_HEADERS = ["ALIS ID"] + [
    header for header in TEMPLATE_HEADERS if header != "Password"
]

# Lookup used to pick the column list for a requested format name.
HEADERS_BY_FORMAT = dict(create=TEMPLATE_HEADERS, update=UPDATE_TEMPLATE_HEADERS)
|
|
|
|
# Dates in ALIS are shown month-first, MM/DD/YYYY (confirmed from the real
# staff export).
DATE_FORMAT_HINT = "MM/DD/YYYY"

# Controlled values for the Sheet2 dropdown lists (mirror the official
# template). These stay lists: they are interpolated into warning messages.
STATUS_VALUES = [
    "Applicant",
    "Discharged",
    "Hired",
    "Rejected",
]
LOGIN_VALUES = [
    "Yes",
    "No",
]
GENDER_VALUES = [
    "Female",
    "Male",
]
|
|
|
|
# Accepted input aliases -> canonical header. Lower-cased, stripped, non-alnum
|
|
# removed for matching, so "first_name", "FirstName", "First Name" all map.
|
|
_ALIASES = {
|
|
"alisid": "ALIS ID", "alis": "ALIS ID", "staffaliasid": "ALIS ID",
|
|
"firstname": "First Name", "first": "First Name", "fname": "First Name",
|
|
"lastname": "Last Name", "last": "Last Name", "lname": "Last Name",
|
|
"staffrecordnumber": "Staff Record Number", "recordnumber": "Staff Record Number",
|
|
"recordno": "Staff Record Number", "employeeid": "Staff Record Number",
|
|
"empid": "Staff Record Number", "staffid": "Staff Record Number",
|
|
"securityroles": "Security Roles", "securityrole": "Security Roles",
|
|
"role": "Security Roles", "accessrole": "Security Roles",
|
|
"staffstatus": "Staff Status", "status": "Staff Status",
|
|
"hiredate": "Hire Date", "datehired": "Hire Date", "startdate": "Hire Date",
|
|
"loginenabled": "Login Enabled", "login": "Login Enabled",
|
|
"loginaccess": "Login Enabled", "portalaccess": "Login Enabled",
|
|
"email": "Email", "emailaddress": "Email", "workemail": "Email",
|
|
"password": "Password", "pass": "Password", "pwd": "Password",
|
|
"dateofbirth": "Date of Birth", "dob": "Date of Birth", "birthdate": "Date of Birth",
|
|
"gender": "Gender", "sex": "Gender",
|
|
"jobrole": "Job Role", "jobtitle": "Job Role", "title": "Job Role",
|
|
"position": "Job Role",
|
|
"cellphone": "Cell Phone", "cell": "Cell Phone", "mobile": "Cell Phone",
|
|
"phone": "Cell Phone", "mobilephone": "Cell Phone", "cellphonenumber": "Cell Phone",
|
|
}
|
|
|
|
|
|
class ImportBuilderError(RuntimeError):
    """Error raised by this module for problems the caller can report as-is
    (missing input/workbook file, unknown format name, missing optional
    spreadsheet library)."""
|
|
|
|
|
|
def _norm(s: str) -> str:
|
|
return "".join(ch for ch in str(s).lower() if ch.isalnum())
|
|
|
|
|
|
def _canon_header(raw: str) -> Optional[str]:
    """Map a raw input column name to its canonical header.

    The normalised name is first compared against the normalised
    TEMPLATE_HEADERS entries, then looked up in _ALIASES. Returns None when
    neither matches."""
    key = _norm(str(raw).strip())
    exact = next((header for header in TEMPLATE_HEADERS if _norm(header) == key), None)
    if exact is not None:
        return exact
    return _ALIASES.get(key)
|
|
|
|
|
|
def load_role_map(path: Optional[Path] = None) -> dict:
    """Load the cached job-role -> security-role reference (role-map.json).

    path: location of the JSON file (Path or str). Defaults to
        ../references/role-map.json relative to this module's directory.

    Returns the parsed JSON object, or {} when the file is missing, cannot be
    read or decoded, is not valid JSON, or its root is not a JSON object.
    The reference is optional, so this never raises for those cases."""
    if path is None:
        path = Path(__file__).resolve().parent.parent / "references" / "role-map.json"
    path = Path(path)  # tolerate callers passing a plain string
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (bad encoding).
        return {}
    # Callers use .get() on the result, so anything but an object is unusable.
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
# --- input parsing ------------------------------------------------------------
|
|
def read_input_rows(path: str) -> list[dict]:
    """Read a CSV/TSV or JSON file into a list of {canonical-header: value} dicts.

    path: input file. ".json" is parsed as JSON (either a list of row objects,
        or an object holding the list under "staff" or "rows"); ".tsv"/".tab"
        is read tab-delimited; anything else is read as comma-delimited CSV.

    Recognised columns are renamed to their canonical header and their values
    converted to stripped strings (None becomes ""). When several input
    columns map to the same header, a later empty value does not overwrite an
    earlier non-empty one. Unrecognised columns are kept, untouched, under the
    '_extra' key of the row.

    Raises ImportBuilderError when the file does not exist or when the JSON
    payload is not a list of objects. Malformed JSON still raises
    json.JSONDecodeError."""
    p = Path(path)
    if not p.exists():
        raise ImportBuilderError(f"input file not found: {path}")
    # utf-8-sig drops the BOM that Excel adds to exported CSV files.
    text = p.read_text(encoding="utf-8-sig")
    suffix = p.suffix.lower()

    if suffix == ".json":
        data = json.loads(text)
        if isinstance(data, dict):
            data = data.get("staff") or data.get("rows") or []
        if not isinstance(data, list):
            raise ImportBuilderError(
                f"unexpected JSON structure in {path}: expected a list of rows"
            )
        raw_rows = data
    else:  # CSV/TSV
        delim = "\t" if suffix in (".tsv", ".tab") else ","
        # Feed the csv module a file-like object rather than splitlines():
        # splitting first loses newlines inside quoted fields and also breaks
        # on other Unicode line separators that are legal inside a cell.
        raw_rows = list(csv.DictReader(io.StringIO(text, newline=""), delimiter=delim))

    rows: list[dict] = []
    for index, raw in enumerate(raw_rows, start=1):
        if not isinstance(raw, dict):
            raise ImportBuilderError(
                f"row {index} in {path} is not an object of column -> value"
            )
        row: dict = {}
        extra: dict = {}
        for k, v in raw.items():
            if k is None:
                # DictReader files surplus cells (more cells than headers)
                # under the None key; there is no header to map them to.
                continue
            canon = _canon_header(k)
            if not canon:
                extra[k] = v
                continue
            value = "" if v is None else str(v).strip()
            # Two aliases of one header (e.g. "phone" and "cell"): keep the
            # populated value instead of letting a blank one clobber it.
            if value or canon not in row:
                row[canon] = value
        if extra:
            row["_extra"] = extra
        rows.append(row)
    return rows
|
|
|
|
|
|
# --- validation + enrichment --------------------------------------------------
|
|
def _suggest_security_role(job_role: str, role_map: dict) -> Optional[str]:
|
|
"""Return the typical FULL Security Roles string (comma-separated, as ALIS
|
|
stores it) for a job role, learned from current staff. Prefers the modal
|
|
exact combo (jobRoleToSecurityRolesCombo); falls back to the older
|
|
single-role map for compatibility."""
|
|
if not job_role:
|
|
return None
|
|
combo = role_map.get("jobRoleToSecurityRolesCombo", {})
|
|
for k, v in combo.items():
|
|
if k.lower() == job_role.lower() and v:
|
|
return v
|
|
# legacy fallback: list of roles -> join
|
|
mapping = role_map.get("jobRoleToSecurityRoles", {})
|
|
for k, v in mapping.items():
|
|
if k.lower() == job_role.lower() and v:
|
|
return ", ".join(v) if isinstance(v, list) else str(v)
|
|
return None
|
|
|
|
|
|
def _match_choice(value: str, choices: list) -> Optional[str]:
    """Return the member of *choices* equal to *value* ignoring case and
    surrounding whitespace, or None when there is no such member."""
    wanted = value.strip().lower()
    return next((choice for choice in choices if choice.lower() == wanted), None)


def enrich_and_validate(rows: list[dict], role_map: dict,
                        default_status: str = "Hired",
                        suggest_roles: bool = True) -> tuple[list[dict], list[dict]]:
    """Apply defaults, infer Security Roles from Job Role, validate enums.

    rows: input rows keyed by canonical header; missing headers become "".
    role_map: reference data; "securityRoleVocabulary" is used to check
        explicit Security Roles, the job-role tables to infer missing ones.
    default_status: Staff Status used when a row leaves it blank.
    suggest_roles: when True, a blank Security Roles is inferred from Job Role.

    Staff Status, Gender and Login Enabled are matched against their dropdown
    lists case-insensitively and rewritten to the template's spelling
    ("hired" -> "Hired", "YES" -> "Yes"); values that still do not match are
    left as typed and reported as a warning.

    Returns (processed_rows, report). Each processed row holds "ALIS ID" plus
    every TEMPLATE_HEADERS column. Each report entry is
    {"row": 1-based index, "name": ..., "notes": [...], "warnings": [...]}."""
    sec_vocab = set(role_map.get("securityRoleVocabulary") or [])
    # Keep every known header (union of both layouts) so ALIS ID survives for
    # update rows; write_workbook selects the columns per chosen format.
    all_headers = ["ALIS ID"] + TEMPLATE_HEADERS
    report: list[dict] = []
    out: list[dict] = []
    for i, r in enumerate(rows):
        notes: list[str] = []
        warns: list[str] = []
        row = {h: str(r.get(h, "") or "").strip() for h in all_headers}

        if not row["First Name"] or not row["Last Name"]:
            warns.append("missing First/Last Name (required)")

        # Status
        if not row["Staff Status"]:
            row["Staff Status"] = default_status
            notes.append(f"Staff Status defaulted to {default_status}")
        else:
            status = _match_choice(row["Staff Status"], STATUS_VALUES)
            if status:
                row["Staff Status"] = status
            else:
                warns.append(f"Staff Status '{row['Staff Status']}' not in {STATUS_VALUES}")

        # Gender
        if row["Gender"]:
            gender = _match_choice(row["Gender"], GENDER_VALUES)
            if gender:
                row["Gender"] = gender
            else:
                warns.append(f"Gender '{row['Gender']}' not in {GENDER_VALUES}")

        # Security Roles - infer from Job Role if blank, else check vocabulary
        if not row["Security Roles"] and suggest_roles:
            sug = _suggest_security_role(row["Job Role"], role_map)
            if sug:
                row["Security Roles"] = sug
                notes.append(f"Security Roles inferred '{sug}' from Job Role "
                             f"'{row['Job Role']}' (review)")
            elif row["Job Role"]:
                warns.append(f"no reference Security Role for Job Role "
                             f"'{row['Job Role']}' - set manually")
        elif row["Security Roles"] and sec_vocab:
            unknown = [p.strip() for p in row["Security Roles"].split(",")
                       if p.strip() and p.strip() not in sec_vocab]
            if unknown:
                warns.append(f"Security Role(s) {unknown} not seen in current staff "
                             "- confirm they're real ALIS roles")

        # Login Enabled - default from presence of email+password.
        # NOTE(review): rows meant for the update layout usually carry no
        # Password, so a blank Login Enabled defaults to "No" for them -
        # confirm that is the value you want uploaded for existing staff.
        if not row["Login Enabled"]:
            if row["Email"] and row["Password"]:
                row["Login Enabled"] = "Yes"
                notes.append("Login Enabled defaulted to Yes (email+password present)")
            else:
                row["Login Enabled"] = "No"
                notes.append("Login Enabled defaulted to No (no email/password)")
        else:
            login = _match_choice(row["Login Enabled"], LOGIN_VALUES)
            if login:
                row["Login Enabled"] = login
            else:
                warns.append(f"Login Enabled '{row['Login Enabled']}' not in {LOGIN_VALUES}")

        if row["Login Enabled"] == "Yes" and not row["Email"]:
            warns.append("Login Enabled=Yes but no Email (login needs an email)")

        out.append(row)
        report.append({
            "row": i + 1,
            "name": f"{row['First Name']} {row['Last Name']}".strip(),
            "notes": notes, "warnings": warns,
        })
    return out, report
|
|
|
|
|
|
def generate_password(length: int = 14) -> str:
    """Strong random password (letters+digits+symbols), avoids ambiguous chars.

    length: number of characters; must be at least 4 because the result always
        contains a lower-case letter, an upper-case letter, a digit and a
        symbol.

    Returns a password drawn with the `secrets` module from an alphabet that
    leaves out the look-alike characters O, I, l, 0 and 1.

    Raises ValueError when length < 4 (no such password exists, and retrying
    would never terminate)."""
    if length < 4:
        raise ValueError("password length must be at least 4")
    symbols = "!@#$%*?"
    alphabet = (string.ascii_uppercase.replace("O", "").replace("I", "")
                + string.ascii_lowercase.replace("l", "")
                + string.digits.replace("0", "").replace("1", "")
                + symbols)
    # Rejection sampling: redraw until every character class is present.
    while True:
        pw = "".join(secrets.choice(alphabet) for _ in range(length))
        if (any(c.islower() for c in pw) and any(c.isupper() for c in pw)
                and any(c.isdigit() for c in pw) and any(c in symbols for c in pw)):
            return pw
|
|
|
|
|
|
def fill_passwords(rows: list[dict]) -> list[dict]:
    """For Login Enabled=Yes rows missing a Password, generate one in place.

    rows: row dicts; matching rows get their "Password" key set (mutated).

    Returns the list of {"name", "email", "password"} entries that were
    generated, in row order. Rows lacking the name or email keys are handled
    (the missing part is treated as empty) rather than raising KeyError."""
    generated = []
    for row in rows:
        if row.get("Login Enabled") != "Yes" or row.get("Password"):
            continue
        pw = generate_password()
        row["Password"] = pw
        full_name = f"{row.get('First Name', '')} {row.get('Last Name', '')}".strip()
        generated.append({
            "name": full_name,
            "email": row.get("Email", ""),
            "password": pw,
        })
    return generated
|
|
|
|
|
|
# --- workbook writing ---------------------------------------------------------
|
|
def write_workbook(rows: list[dict], out_path: str, fmt: str = "create") -> str:
    """Save *rows* as an .xls laid out like the ALIS template.

    Sheet1 gets the header row for the chosen layout followed by one line per
    row (keys missing from a row are written as ""); Sheet2 gets the dropdown
    lists. fmt is 'create' (new staff, has Password) or 'update' (existing
    staff, leads with ALIS ID). Missing parent directories of out_path are
    created.

    Returns the output path as a string. Raises ImportBuilderError for an
    unknown fmt or when xlwt is not installed."""
    columns = HEADERS_BY_FORMAT.get(fmt)
    if columns is None:
        raise ImportBuilderError(f"unknown format '{fmt}' (use create|update)")
    try:
        import xlwt  # type: ignore
    except ImportError as exc:
        raise ImportBuilderError(
            "xlwt is required to write .xls. Install with: pip install xlwt"
        ) from exc

    book = xlwt.Workbook(encoding="utf-8")
    bold_wrapped = xlwt.easyxf("font: bold on; align: wrap on")

    # Sheet1: header line, then the staff rows in the layout's column order.
    data_sheet = book.add_sheet("Sheet1")
    for col_idx, title in enumerate(columns):
        data_sheet.write(0, col_idx, title, bold_wrapped)
    for row_idx, record in enumerate(rows, start=1):
        for col_idx, title in enumerate(columns):
            data_sheet.write(row_idx, col_idx, record.get(title, ""))

    # Sheet2 = validation lists, laid out exactly like the official template:
    # col0 Status, col1 Login Enabled, col2 Gender.
    lists_sheet = book.add_sheet("Sheet2")
    dropdowns = (STATUS_VALUES, LOGIN_VALUES, GENDER_VALUES)
    for col_idx, options in enumerate(dropdowns):
        for row_idx, option in enumerate(options):
            lists_sheet.write(row_idx, col_idx, option)

    target = Path(out_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    saved_as = str(target)
    book.save(saved_as)
    return saved_as
|
|
|
|
|
|
def write_password_sidecar(generated: list[dict], xls_path: str) -> Optional[str]:
    """Write generated logins to a sidecar CSV next to the workbook.

    generated: entries with "name", "email" and "password" keys.
    xls_path: workbook path; the sidecar is "<workbook stem>_passwords.csv"
        in the same directory.

    The file contains PLAINTEXT passwords - the caller must warn the user and
    tell them to distribute it, then delete or vault it. Because of that it is
    created readable/writable by the owner only (mode 0600) where the platform
    supports file modes.

    Returns the sidecar path, or None if nothing was generated."""
    if not generated:
        return None
    xls = Path(xls_path)
    side = xls.with_name(xls.stem + "_passwords.csv")
    # Create with owner-only permissions instead of the umask default.
    fd = os.open(str(side), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", newline="", encoding="utf-8") as f:
        try:
            # The creation mode is ignored when the file already existed, so
            # tighten it explicitly as well.
            os.chmod(str(side), 0o600)
        except OSError:
            # Best effort: some filesystems do not support changing modes.
            pass
        w = csv.writer(f)
        w.writerow(["Name", "Email", "Password"])
        w.writerows([g["name"], g["email"], g["password"]] for g in generated)
    return str(side)
|
|
|
|
|
|
# --- workbook reading (inspect) -----------------------------------------------
|
|
def read_workbook(path: str, max_rows: int = 50) -> dict:
    """Read an existing .xls (or .xlsx) staff-import workbook for inspection.

    path: workbook file; a ".xlsx" suffix is read with openpyxl, anything
        else with xlrd.
    max_rows: maximum number of rows returned per sheet.

    Returns {"format": "xlsx" | "xls", "sheets": {sheet name: [row, ...]}},
    each row being a list of cell values. Empty .xlsx cells are returned
    as "".

    Raises ImportBuilderError when the file does not exist or the library
    needed for its format is not installed."""
    p = Path(path)
    if not p.exists():
        raise ImportBuilderError(f"workbook not found: {path}")
    sheets: dict = {}

    if p.suffix.lower() == ".xlsx":
        try:
            import openpyxl  # type: ignore
        except ImportError as exc:
            raise ImportBuilderError("openpyxl required for .xlsx") from exc
        wb = openpyxl.load_workbook(str(p), read_only=True, data_only=True)
        try:
            for sh in wb.worksheets:
                rows = []
                for ri, row in enumerate(sh.iter_rows(values_only=True)):
                    if ri >= max_rows:
                        break
                    rows.append(["" if c is None else c for c in row])
                sheets[sh.title] = rows
        finally:
            # Read-only workbooks keep the file open until closed explicitly.
            wb.close()
        return {"format": "xlsx", "sheets": sheets}

    try:
        import xlrd  # type: ignore
    except ImportError as exc:
        raise ImportBuilderError("xlrd required for .xls") from exc
    wb = xlrd.open_workbook(str(p))
    for sh in wb.sheets():
        sheets[sh.name] = [
            [sh.cell_value(r, c) for c in range(sh.ncols)]
            for r in range(min(sh.nrows, max_rows))
        ]
    return {"format": "xls", "sheets": sheets}
|