84 lines
4.7 KiB
Python
84 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
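# sc-cleanup.py — safe fleet-wide ScreenConnect metadata cleanup (direct API, fast).
# Per client: Company (CP1) -> RMM client name (trimmed), unless the client's sessions already
# use a deliberate 'CODE - Name' convention, in which case that value is kept; Device Type (CP4)
# from hostname/OS; Department (CP3) filled only when currently empty, from high-confidence
# hostname tokens (servers without a token default to 'IT'). Never touches Site (CP2), never
# guesses departments beyond those rules. Hostnames that match more than one ScreenConnect
# session are reported as duplicates and never written.
# Env: RMM, TOK (GuruRMM), SC_SECRET (ScreenConnect). Args: client names (default = all real
# RMM clients minus skip-list; explicitly named clients are processed even if skip-listed).
# --dry = report only.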
|
|
import json,re,os,sys,ssl,urllib.request
|
|
from collections import Counter
|
|
# --- Runtime configuration ----------------------------------------------------
# Required environment variables; a missing one raises KeyError at start-up.
RMM = os.environ["RMM"]        # GuruRMM base URL
TOK = os.environ["TOK"]        # GuruRMM bearer token
SEC = os.environ["SC_SECRET"]  # ScreenConnect extension secret (sent as CTRLAuthHeader)

# TLS context shared by every outgoing HTTPS request.
ctx = ssl.create_default_context()

# Endpoint of the ScreenConnect extension service; the method name is appended per call.
SCBASE = "https://computerguru.screenconnect.com/App_Extensions/2d558935-686a-4bd0-9991-07539f5fe749/Service.ashx"

# "--dry" anywhere on the command line switches to report-only mode.
DRY = "--dry" in sys.argv

# Positional arguments (anything not starting with "--") are client names to process.
args = [arg for arg in sys.argv[1:] if not arg.startswith("--")]

# Clients excluded from the default (no-argument) run.
SKIP = {
    "AZ Computer Guru",
    "Unassigned",
    "Dataforth Corp",
    "Cascades of Tucson",
}
|
|
def scpost(method, body):
    """POST a JSON body to a ScreenConnect extension method and return the parsed reply.

    Args:
        method: Method name appended to SCBASE (e.g. "GetSessionsByName").
        body: JSON-serialisable request payload.

    Returns:
        The decoded JSON response, or ``{"__err": "<message>"}`` if the request
        or the response parsing fails. Failures are reported through that dict
        rather than raised, so callers must check for the ``"__err"`` key.
    """
    req = urllib.request.Request(
        f"{SCBASE}/{method}",
        data=json.dumps(body).encode(),
        method="POST",
        headers={
            "CTRLAuthHeader": SEC,
            "Origin": "https://computerguru.screenconnect.com",
            "Content-Type": "application/json",
        },
    )
    try:
        # Use the response as a context manager so the connection is closed
        # deterministically instead of leaking until garbage collection.
        with urllib.request.urlopen(req, context=ctx, timeout=25) as resp:
            raw = resp.read().decode("utf-8", "replace")
        # Drop control characters (other than tab/newline/CR) before parsing.
        cleaned = "".join(ch for ch in raw if ord(ch) >= 32 or ch in "\t\n\r")
        return json.loads(cleaned)
    except Exception as e:
        # Deliberate best-effort: one failed call must not abort a fleet-wide run.
        return {"__err": str(e)}
|
|
def getS(h):
    """Return the ScreenConnect sessions whose name matches hostname ``h``.

    Args:
        h: Session (host) name to look up.

    Returns:
        The list returned by the "GetSessionsByName" call, or ``[]`` when the
        reply is not a list. A failed lookup (``{"__err": ...}`` from scpost)
        also yields ``[]``, but is reported on stderr so a transport error is
        not mistaken for "this machine has no session".
    """
    r = scpost("GetSessionsByName", {"sessionName": h})
    if isinstance(r, list):
        return r
    if isinstance(r, dict) and "__err" in r:
        print(f"WARNING: ScreenConnect lookup failed for '{h}': {r['__err']}", file=sys.stderr)
    return []
|
|
def setP(sid, arr):
    """Send a session's custom-property list to ScreenConnect.

    Args:
        sid: ScreenConnect session ID.
        arr: List of custom property values to store (callers in this file
            pass the full, padded list).

    Returns:
        Whatever scpost returns for "UpdateSessionCustomProperties"; on
        failure that is a dict containing an ``"__err"`` key.
    """
    payload = [sid, arr]
    return scpost("UpdateSessionCustomProperties", payload)
|
|
def dtype(h, os_):
    """Classify a machine as "Server", "Laptop" or "Desktop".

    Args:
        h: Hostname; matched case-insensitively. ``None`` or empty is treated
            as an empty hostname (previously ``None`` raised AttributeError,
            unlike ``os_`` which was already None-safe).
        os_: OS description string, or ``None``.

    Returns:
        "Server" if the OS string contains "server" or the hostname carries a
        server token; otherwise "Laptop" if the hostname carries a laptop
        token; otherwise "Desktop". Server checks take precedence.
    """
    host = (h or "").upper()
    os_name = (os_ or "").lower()
    if "server" in os_name:
        return "Server"
    if re.search(r'(SERVER|-SRV|\bSVR\b|SQL|HYPERV|-DC\d?$|\bNAS\b|EXCHANGE|-VM$|PROXESS|-TS\d)', host):
        return "Server"
    if re.search(r'(LAPTOP|-LT$|YOGA|SURFACE|MACBOOK|\bLT\d)', host):
        return "Laptop"
    return "Desktop"
|
|
# Hostname token -> department, checked in order with the first match winning.
# Order matters: longer / more specific tokens must precede the shorter tokens
# they contain (e.g. "MEMRECEPT" before "RECEPT", "QCINSP"/"PROQC" before "QC").
DEPT = [
    ("ACCT", "Accounting"),
    ("NURSE", "Nursing"),
    ("MFGR", "Manufacturing"),
    ("MFG", "Manufacturing"),
    ("RCVG", "Receiving"),
    ("QCINSP", "Quality"),
    ("PROQC", "Quality"),
    ("QC", "Quality"),
    ("MAINT", "Maintenance"),
    ("MEMRECEPT", "Memory Care"),
    ("MDIRECTOR", "Memory Care"),
    ("RECEPT", "Reception"),
    ("FRONTDESK", "Front Desk"),
    ("FRONT", "Front Desk"),
    ("SHIPPING", "Shipping"),
    ("SHIP", "Shipping"),
    ("CHEF", "Dietary"),
    ("KITCHEN", "Dietary"),
    ("SALES", "Sales/Marketing"),
    ("MARKETING", "Marketing"),
    ("ESTIMAT", "Estimating"),
    ("WAREHOUSE", "Warehouse"),
    ("WHSE", "Warehouse"),
    ("DISPATCH", "Dispatch"),
    ("PAYROLL", "Payroll"),
    ("BILLING", "Billing"),
    ("ADMIN", "Administration"),
    ("CONF", "Conference"),
    ("ENGI", "Engineering"),
    ("LOBBY", "Lobby"),
]
|
|
def dept(h):
    """Return the department implied by a hostname, or "" if none matches.

    Args:
        h: Hostname; matched case-insensitively. ``None`` or empty yields ""
            (previously ``None`` raised AttributeError).

    Returns:
        The department of the first DEPT token found as a substring of the
        upper-cased hostname, or "" when no token is present.
    """
    if not h:
        return ""
    upper = h.upper()
    # First match wins, so DEPT's ordering decides between overlapping tokens.
    for token, name in DEPT:
        if token in upper:
            return name
    return ""
|
|
# --- Load every RMM agent and group the records by client name -----------------
_agents_req = urllib.request.Request(f"{RMM}/api/agents", headers={"Authorization": f"Bearer {TOK}"})
# Read inside a context manager so the HTTP response is closed deterministically.
with urllib.request.urlopen(_agents_req, context=ctx, timeout=30) as _agents_resp:
    _agents_raw = _agents_resp.read().decode("utf-8", "replace")
# Drop control characters (other than tab/newline/CR) before parsing.
agents = json.loads("".join(ch for ch in _agents_raw if ord(ch) >= 32 or ch in "\t\n\r"))

byc = {}  # client name -> agent records ("" collects agents without a client_name)
for a in agents:
    byc.setdefault(a.get("client_name") or "", []).append(a)

# Explicit client names win (and bypass SKIP); otherwise every named client not in SKIP.
targets = args or [c for c in byc if c and c not in SKIP]

tc = td = tp = 0   # fleet-wide totals: company / device type / department changes
dupes = []         # "client:hostname" entries whose name matched more than one session
flags = []         # majority company values that are about to be replaced
failures = []      # writes rejected by ScreenConnect (not included in the totals)

for c in sorted(targets):
    ms = byc.get(c, [])
    if not ms:
        # Only reachable for explicitly named clients: surface typos instead of
        # silently doing nothing. Goes to stderr so the stdout report is unchanged.
        print(f"WARNING: no RMM agents found for client '{c}'", file=sys.stderr)

    # Pass 1: look up each agent's session(s) and tally the Company values in use.
    sess = {}
    comps = Counter()
    for a in ms:
        ss = getS(a["hostname"])
        if not ss:
            continue
        sess[a["hostname"]] = (a, ss)
        if len(ss) > 1:
            dupes.append(f"{c}:{a['hostname']}")
        # NOTE(review): for ambiguous hostnames ss[0] may belong to another
        # client, yet it still votes here — confirm this is intended.
        cp = ss[0].get("CustomPropertyValues") or []
        if cp and cp[0]:
            comps[cp[0]] += 1

    # Keep the majority Company value when it follows the 'CODE - Name'
    # convention; otherwise fall back to the trimmed RMM client name.
    maj = comps.most_common(1)[0][0] if comps else ""
    canon = maj if re.match(r'^[A-Za-z]{2,5}\s*-\s', maj) else c.strip()
    if maj and maj != canon and maj.strip() != canon:
        flags.append(f"{c}: '{maj}' -> '{canon}'")

    # Pass 2: compute the desired properties and write only real differences.
    cc = dtc = dpc = 0
    for h, (a, ss) in sess.items():
        if len(ss) > 1:
            # ambiguous: name shared across clients OR a within-client dup ->
            # never auto-write (would cross-contaminate). Handle by-hand.
            continue
        dt = dtype(h, a.get("os_type", ""))
        dp = dept(h) or ("IT" if dt == "Server" else "")  # servers -> IT when no role token
        s = ss[0]  # exactly one session remains after the ambiguity check above
        cur = list(s.get("CustomPropertyValues") or [])
        while len(cur) < 8:
            cur.append("")
        new = cur[:]
        new[0] = canon
        new[3] = dt
        if dp and not new[2]:  # never overwrite an existing Department
            new[2] = dp
        if new == cur:
            continue
        if not DRY:
            res = setP(s["SessionID"], new)
            if isinstance(res, dict) and "__err" in res:
                # Do not count a write that ScreenConnect rejected.
                failures.append(f"{c}:{h}: {res['__err']}")
                continue
        if new[0] != cur[0]:
            cc += 1
        if new[3] != cur[3]:
            dtc += 1
        if new[2] != cur[2]:
            dpc += 1

    tc += cc
    td += dtc
    tp += dpc
    if cc or dtc or dpc or len(sess):
        print(f"{c}: company->'{canon}' ({cc}), devType {dtc}, dept {dpc} [{len(sess)} sess]")

print(f"\nTOTALS: company {tc}, deviceType {td}, department {tp}")
print(f"DUPLICATES ({len(dupes)}): {', '.join(dupes) or 'none'}")
print(f"COMPANY CHANGES worth a glance ({len(flags)}):")
for f in flags:
    print(" " + f)
if failures:
    print(f"WRITE FAILURES ({len(failures)}):")
    for f in failures:
        print(" " + f)
|