Files
claudetools/projects/gps-rmm-audit/tools/sc-cleanup.py
Howard Enos fad05480a5 sync: auto-sync from HOWARD-HOME at 2026-07-03 23:31:28
Author: Howard Enos
Machine: HOWARD-HOME
Timestamp: 2026-07-03 23:31:28
2026-07-03 23:31:53 -07:00

83 lines
4.7 KiB
Python

#!/usr/bin/env python3
# sc-cleanup.py — safe fleet-wide ScreenConnect metadata cleanup (direct API, fast).
# Per client: Company (CP1) -> RMM client name (trimmed), keeping a deliberate 'CODE - Name'
# convention; Device Type (CP4) from hostname/OS; Department (CP3) only on high-confidence
# hostname tokens. Never touches Site (CP2), never guesses departments.
# Env: RMM, TOK (GuruRMM), SC_SECRET (ScreenConnect). Args: client names (default = all real
# RMM clients minus skip-list). --dry = report only.
import json,re,os,sys,ssl,urllib.request
from collections import Counter
RMM=os.environ["RMM"]; TOK=os.environ["TOK"]; SEC=os.environ["SC_SECRET"]
ctx=ssl.create_default_context()
SCBASE="https://computerguru.screenconnect.com/App_Extensions/2d558935-686a-4bd0-9991-07539f5fe749/Service.ashx"
DRY="--dry" in sys.argv
args=[a for a in sys.argv[1:] if not a.startswith("--")]
SKIP={"AZ Computer Guru","Unassigned","Dataforth Corp","Cascades of Tucson"}
def scpost(method,body):
req=urllib.request.Request(f"{SCBASE}/{method}",data=json.dumps(body).encode(),method="POST",
headers={"CTRLAuthHeader":SEC,"Origin":"https://computerguru.screenconnect.com","Content-Type":"application/json"})
try:
raw=urllib.request.urlopen(req,context=ctx,timeout=25).read().decode("utf-8","replace")
return json.loads("".join(c for c in raw if ord(c)>=32 or c in "\t\n\r"))
except Exception as e:
return {"__err":str(e)}
def getS(h):
r=scpost("GetSessionsByName",{"sessionName":h}); return r if isinstance(r,list) else []
def setP(sid,arr): return scpost("UpdateSessionCustomProperties",[sid,arr])
def dtype(h,os_):
u=h.upper();o=(os_ or"").lower()
if "server" in o:return "Server"
if re.search(r'(SERVER|-SRV|\bSVR\b|SQL|HYPERV|-DC\d?$|\bNAS\b|EXCHANGE|-VM$|PROXESS|-TS\d)',u):return "Server"
if re.search(r'(LAPTOP|-LT$|YOGA|SURFACE|MACBOOK|\bLT\d)',u):return "Laptop"
return "Desktop"
DEPT=[("ACCT","Accounting"),("NURSE","Nursing"),("MFGR","Manufacturing"),("MFG","Manufacturing"),
("RCVG","Receiving"),("QCINSP","Quality"),("PROQC","Quality"),("QC","Quality"),("MAINT","Maintenance"),
("MEMRECEPT","Memory Care"),("MDIRECTOR","Memory Care"),("RECEPT","Reception"),("FRONTDESK","Front Desk"),
("FRONT","Front Desk"),("SHIPPING","Shipping"),("SHIP","Shipping"),("CHEF","Dietary"),("KITCHEN","Dietary"),
("SALES","Sales/Marketing"),("MARKETING","Marketing"),("ESTIMAT","Estimating"),("WAREHOUSE","Warehouse"),
("WHSE","Warehouse"),("DISPATCH","Dispatch"),("PAYROLL","Payroll"),("BILLING","Billing"),
("ADMIN","Administration"),("CONF","Conference"),("ENGI","Engineering"),("LOBBY","Lobby")]
def dept(h):
u=h.upper()
for pat,d in DEPT:
if pat in u: return d
return ""
agents=json.loads("".join(c for c in urllib.request.urlopen(urllib.request.Request(f"{RMM}/api/agents",headers={"Authorization":f"Bearer {TOK}"}),context=ctx,timeout=30).read().decode("utf-8","replace") if ord(c)>=32 or c in "\t\n\r"))
byc={}
for a in agents: byc.setdefault(a.get("client_name") or "",[]).append(a)
targets=args or [c for c in byc if c and c not in SKIP]
tc=td=tp=0; dupes=[]; flags=[]
for c in sorted(targets):
ms=byc.get(c,[])
sess={}; comps=Counter()
for a in ms:
ss=getS(a["hostname"])
if not ss: continue
sess[a["hostname"]]=(a,ss)
if len(ss)>1: dupes.append(f"{c}:{a['hostname']}")
cp=ss[0].get("CustomPropertyValues") or []
if cp and cp[0]: comps[cp[0]]+=1
maj=comps.most_common(1)[0][0] if comps else ""
canon = maj if re.match(r'^[A-Za-z]{2,5}\s*-\s',maj) else c.strip()
if maj and maj!=canon and not maj.strip()==canon: flags.append(f"{c}: '{maj}' -> '{canon}'")
cc=dtc=dpc=0
for h,(a,ss) in sess.items():
if len(ss)>1: continue # ambiguous: name shared across clients OR a within-client dup -> never auto-write (would cross-contaminate). Handle by-hand.
dt=dtype(h,a.get("os_type","")); dp=dept(h)
for s in ss:
cur=list(s.get("CustomPropertyValues") or [])
while len(cur)<8: cur.append("")
new=cur[:]; new[0]=canon; new[3]=dt
if dp and not new[2]: new[2]=dp
if new!=cur:
if new[0]!=cur[0]: cc+=1
if new[3]!=cur[3]: dtc+=1
if new[2]!=cur[2]: dpc+=1
if not DRY: setP(s["SessionID"],new)
tc+=cc; td+=dtc; tp+=dpc
if cc or dtc or dpc or len(sess): print(f"{c}: company->'{canon}' ({cc}), devType {dtc}, dept {dpc} [{len(sess)} sess]")
print(f"\nTOTALS: company {tc}, deviceType {td}, department {tp}")
print(f"DUPLICATES ({len(dupes)}): {', '.join(dupes) or 'none'}")
print(f"COMPANY CHANGES worth a glance ({len(flags)}):")
for f in flags: print(" "+f)