Files
claudetools/.claude/skills/synology/scripts/syno_client.py
Howard Enos 974be13f4c synology: code-review hardening + trim over-budget description
Addresses 5 verified findings from /code-review high:
- SynoError carries DSM code + handled flag; call() no longer logs eagerly.
  Top-level handler logs only genuine unhandled failures, so the handled
  FileStation denial + VPN-down connect errors stop polluting errorlog.md
  (was a CLAUDE.md rule violation: don't log handled conditions).
- FileStation-denial detection is numeric (code in 400/407), not substring.
- SSH hint now also fires on the generic `call` path, not just `ls`.
- `services` falls back get->list on 103 for older DSM builds (multi-device).
- BrokenPipe flush moved inside try so small piped output can't leak a traceback.
- Trim SKILL description 755->515 chars (was the longest of 32 skills; self-check
  registry-budget WARN).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 12:23:43 -07:00

449 lines
20 KiB
Python

#!/usr/bin/env python3
"""syno_client.py — Synology DSM Web API client (DSM 7.x).
The structured control surface for a Synology NAS: auth -> discover the device's
own API map (SYNO.API.Info) -> call any API method. Read methods run freely;
mutating methods (set/create/delete/start/stop/reboot/...) require --confirm at
the CLI layer.
Credentials come from a SOPS vault entry (default
`clients/cascades-tucson/synology-cascadesds.sops.yaml`) — fields host, port,
credentials.username, credentials.password — or env overrides
SYNO_HOST / SYNO_PORT / SYNO_USER / SYNO_PASS (+ SYNO_OTP for 2FA, SYNO_HTTPS=1).
Override the vault entry with --vault <path> (e.g. another client's NAS).
The device is on a private LAN (Cascades 192.168.0.120) — reachable only with the
site VPN up. Connection failures almost always mean the VPN is down.
"""
import sys, os, json, ssl, subprocess, argparse
import urllib.request, urllib.error, urllib.parse
DEFAULT_VAULT = "clients/cascades-tucson/synology-cascadesds.sops.yaml"
# Method-name prefixes that MUTATE the device -> gated behind --confirm at the CLI.
# (Expanded per the DSM API survey: backup/run/save/wake/send_test etc. all change state.)
MUTATING = ("set", "create", "delete", "del", "add", "remove", "edit", "update",
"start", "stop", "restart", "reboot", "shutdown", "enable", "disable",
"install", "uninstall", "apply", "clear", "rename", "move", "copy",
"upload", "write", "clean", "format", "mount", "unmount", "join",
"leave", "reset", "upgrade", "downgrade", "import", "backup", "run",
"save", "wake", "send", "eject", "connect", "disconnect", "register",
"encrypt", "decrypt", "cancel", "change")
# DSM generic error codes (any API) -> friendly text. 106/119 trigger a re-login retry.
GENERIC_ERRORS = {
100: "unknown error", 101: "invalid parameter", 102: "API does not exist",
103: "method does not exist (DSM-version mismatch -- try `apis` to find the right version, "
"or the SSH fallback)", 104: "API version not supported", 105: "insufficient permission",
106: "session timeout", 107: "session interrupted by duplicate login",
119: "invalid/expired SID or synotoken",
}
def _repo_root():
return os.environ.get("CLAUDETOOLS_ROOT") or os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
def _log_skill_error(msg, context=""):
try:
h = os.path.join(_repo_root(), ".claude", "scripts", "log-skill-error.sh")
if not os.path.exists(h):
return
a = ["bash", h, "synology", msg]
if context:
a += ["--context", context]
subprocess.run(a, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=10)
except Exception:
pass
def _vault_field(vault, field):
vs = os.path.join(_repo_root(), ".claude", "scripts", "vault.sh")
if not os.path.exists(vs):
return None
try:
r = subprocess.run(["bash", vs, "get-field", vault, field],
capture_output=True, text=True, timeout=30)
v = (r.stdout or "").strip()
return v or None
except Exception:
return None
# DSM auth error codes -> human strings (the common ones).
AUTH_ERRORS = {
400: "no such account or incorrect password",
401: "account disabled",
402: "permission denied",
403: "2-factor auth required (pass --otp <code> / SYNO_OTP)",
404: "failed to authenticate 2-factor code",
406: "enforce 2FA but user has none configured",
407: "blocked by IP auto-block",
408: "password expired (cannot change)",
409: "password expired",
410: "password must be changed",
}
class SynoError(Exception):
# code = the DSM error code (when known) so callers compare numerically, not by
# substring on the message. handled = an expected/converted condition (e.g. the
# FileStation file-privilege denial) that must NOT be written to errorlog.md.
def __init__(self, msg, code=None, handled=False):
super().__init__(msg)
self.code = code
self.handled = handled
class SynoClient:
def __init__(self, vault=DEFAULT_VAULT):
self.host = os.environ.get("SYNO_HOST") or _vault_field(vault, "host")
self.port = os.environ.get("SYNO_PORT") or _vault_field(vault, "port")
self.user = os.environ.get("SYNO_USER") or _vault_field(vault, "credentials.username")
self.passwd = os.environ.get("SYNO_PASS") or _vault_field(vault, "credentials.password")
self.otp = os.environ.get("SYNO_OTP")
self.https = os.environ.get("SYNO_HTTPS", "") not in ("", "0", "false")
if not (self.host and self.user and self.passwd):
raise SynoError(
"No Synology credentials. Expected vault " + vault +
" (host / credentials.username / credentials.password) or env "
"SYNO_HOST / SYNO_USER / SYNO_PASS.")
if self.https and (not self.port or self.port == "5000"):
self.port = "5001"
self.port = self.port or ("5001" if self.https else "5000")
scheme = "https" if self.https else "http"
self.base = f"{scheme}://{self.host}:{self.port}/webapi"
self._ctx = None
if self.https:
self._ctx = ssl.create_default_context()
self._ctx.check_hostname = False
self._ctx.verify_mode = ssl.CERT_NONE
self._sid = None
self._syno_token = None
self._apimap = None # name -> {path, minVersion, maxVersion}
# ---- low-level ----
def _get(self, path, params, post=False):
url = f"{self.base}/{path}"
q = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None})
headers = {}
if self._syno_token:
headers["X-SYNO-TOKEN"] = self._syno_token
if post:
req = urllib.request.Request(url, data=q.encode(), headers=headers, method="POST")
req.add_header("Content-Type", "application/x-www-form-urlencoded")
else:
req = urllib.request.Request(url + "?" + q, headers=headers, method="GET")
try:
with urllib.request.urlopen(req, timeout=30, context=self._ctx) as r:
raw = r.read().decode("utf-8", "replace")
return json.loads(raw) if raw.strip() else {}
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", "replace")
try:
return json.loads(raw)
except Exception:
raise SynoError(f"HTTP {e.code}: {raw[:300]}")
except Exception as e:
raise SynoError(f"connect failed ({e}) -- is the site VPN up? host={self.host}:{self.port}")
# ---- API discovery ----
def apimap(self):
if self._apimap is None:
r = self._get("query.cgi", {
"api": "SYNO.API.Info", "version": "1", "method": "query", "query": "all"})
if not r.get("success"):
raise SynoError(f"SYNO.API.Info query failed: {r.get('error')}")
self._apimap = r.get("data", {})
return self._apimap
def _resolve(self, api):
"""Return (cgi_path, max_version) for an API name."""
m = self.apimap()
if api == "SYNO.API.Auth":
info = m.get(api, {"path": "auth.cgi", "maxVersion": 7})
else:
info = m.get(api)
if not info:
raise SynoError(f"API '{api}' not available on this device (see `apis`)")
return info.get("path", "entry.cgi"), info.get("maxVersion", 1)
# ---- auth ----
def login(self):
if self._sid:
return self._sid
path, ver = self._resolve("SYNO.API.Auth")
p = {"api": "SYNO.API.Auth", "version": min(ver, 7), "method": "login",
"account": self.user, "passwd": self.passwd, "session": "DSM",
"format": "sid", "enable_syno_token": "yes"}
if self.otp:
p["otp_code"] = self.otp
r = self._get(path, p, post=True)
if not r.get("success"):
code = (r.get("error") or {}).get("code")
msg = AUTH_ERRORS.get(code, f"login failed (code {code})")
_log_skill_error(f"DSM login failed: {msg}", context=f"host={self.host} code={code}")
raise SynoError(f"DSM login failed: {msg}")
data = r.get("data", {})
self._sid = data.get("sid")
self._syno_token = data.get("synotoken")
return self._sid
def logout(self):
if not self._sid:
return
try:
path, ver = self._resolve("SYNO.API.Auth")
self._get(path, {"api": "SYNO.API.Auth", "version": min(ver, 7),
"method": "logout", "session": "DSM", "_sid": self._sid})
except Exception:
pass
self._sid = None
# ---- generic call ----
def call(self, api, method, version=None, post=False, _retry=True, **params):
self.login()
path, maxver = self._resolve(api)
ver = version or maxver
def _do():
p = {"api": api, "version": ver, "method": method, "_sid": self._sid}
if self._syno_token:
p["SynoToken"] = self._syno_token
p.update(params)
return self._get(path, p, post=post)
r = _do()
if not r.get("success"):
err = r.get("error") or {}
code = err.get("code")
# session expired / token invalid -> re-login once and retry transparently
if code in (106, 119) and _retry:
self._sid = None
self._syno_token = None
self.login()
r = _do()
if not r.get("success"):
err = r.get("error") or {}
code = err.get("code")
hint = GENERIC_ERRORS.get(code, "")
# Do NOT log here -- call() can't know if the caller will handle this
# (e.g. the `ls`/FileStation denial). The top-level handler logs only
# genuinely-unhandled failures, so handled conversions don't pollute errorlog.
raise SynoError(f"{api}.{method} -> error {code}"
+ (f" ({hint})" if hint else "") + f" {json.dumps(err)[:200]}",
code=code)
return r.get("data", r)
def is_mutating(method):
m = method.lower()
return any(m == v or m.startswith(v + "_") or m.startswith(v) for v in MUTATING) \
and not m.startswith(("getinfo", "list", "get", "info", "query", "load", "enum", "status"))
# FileStation file-op denial: a SYNO.FileStation.* call returning 400 (web API) / 407
# (on-box) when the account lacks FileStation file privileges (e.g. the built-in admin
# on cascadesDS). list_share works; real file browsing must go via the SSH backend.
def _filestation_denial(api, err):
return api.startswith("SYNO.FileStation") and getattr(err, "code", None) in (400, 407)
def _filestation_hint(err):
return SynoError(
f"{err}\n [HINT] FileStation file-listing is not permitted for this account "
f"(built-in admin lacks FileStation file privileges on this device). Use the SSH "
f"backend for real file browsing:\n"
f" bash .claude/skills/synology/scripts/syno-ssh.sh run "
f"\"ls -la /volume1/<share>/<subpath>\" --confirm",
code=getattr(err, "code", None), handled=True)
# ============================ CLI ============================
def _print(obj):
print(json.dumps(obj, indent=2, ensure_ascii=False, default=str))
def _kv(pairs):
"""Parse k=v (string) and k:=json pairs into a params dict."""
out = {}
for item in pairs or []:
if ":=" in item:
k, v = item.split(":=", 1)
out[k] = json.dumps(json.loads(v)) # normalize JSON
elif "=" in item:
k, v = item.split("=", 1)
out[k] = v
else:
raise SynoError(f"bad param '{item}' (want k=v or k:=json)")
return out
def main(argv):
# --vault / --confirm live on a shared parent so they are accepted BOTH before the
# subcommand and after it (the SKILL docs put --confirm at the end, e.g.
# `call X set k=v --confirm`). SUPPRESS defaults stop the subparser copy from
# clobbering a value parsed by the main parser.
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--vault", default=argparse.SUPPRESS, help="SOPS vault entry for the NAS")
common.add_argument("--confirm", action="store_true", default=argparse.SUPPRESS,
help="authorize a mutating call")
ap = argparse.ArgumentParser(prog="syno", parents=[common],
description="Synology DSM Web API client")
sub = ap.add_subparsers(dest="cmd", required=True)
def add(name, **kw):
return sub.add_parser(name, parents=[common], **kw)
add("test", help="login and report DSM identity")
pa = add("apis", help="list the device's API map (what you can control)")
pa.add_argument("filter", nargs="?", help="case-insensitive substring filter")
add("sysinfo", help="model/serial/RAM/temp/uptime (SYNO.Core.System)")
add("util", help="live CPU/mem/disk/net (SYNO.Core.System.Utilization)")
add("storage", help="volumes/disks/RAID/usage (SYNO.Storage.CGI.Storage)")
add("shares", help="shared folders (SYNO.Core.Share)")
add("users", help="local users (SYNO.Core.User)")
add("groups", help="local groups (SYNO.Core.Group)")
add("packages", help="installed packages (SYNO.Core.Package)")
add("services", help="services (SYNO.Core.Service)")
add("connections", help="current connections (SYNO.Core.CurrentConnection)")
pl = add("ls", help="FileStation list (no path = shares)")
pl.add_argument("path", nargs="?")
pc = add("call", help="generic: call ANY API method (the power tool)")
pc.add_argument("api"); pc.add_argument("method")
pc.add_argument("--version", type=int)
pc.add_argument("--post", action="store_true")
pc.add_argument("params", nargs="*", help="k=v or k:=json")
# gated convenience writes
for name, helptxt in (("pkg-start", "start a package by id"),
("pkg-stop", "stop a package by id")):
pp = add(name, help=helptxt + " (needs --confirm)")
pp.add_argument("id")
add("reboot", help="reboot the NAS (needs --confirm)")
add("shutdown", help="shut down the NAS (needs --confirm)")
args = ap.parse_args(argv)
vault = getattr(args, "vault", DEFAULT_VAULT)
confirm = getattr(args, "confirm", False)
def guard(method):
if not confirm:
raise SynoError(f"'{method}' mutates the device -- re-run with --confirm")
try:
c = SynoClient(vault=vault)
if args.cmd == "test":
c.login()
d = c.call("SYNO.Core.System", "info")
_print({"login": "ok", "host": f"{c.host}:{c.port}",
"model": d.get("model"), "serial": d.get("serial"),
"firmware": d.get("firmware_ver") or d.get("version_string"),
"ram_mb": d.get("ram_size") or d.get("ram"),
"uptime_s": d.get("up_time")})
elif args.cmd == "apis":
m = c.apimap()
keys = sorted(m)
if args.filter:
f = args.filter.lower()
keys = [k for k in keys if f in k.lower()]
_print({k: {"path": m[k].get("path"), "maxVersion": m[k].get("maxVersion")}
for k in keys})
elif args.cmd == "sysinfo":
_print(c.call("SYNO.Core.System", "info"))
elif args.cmd == "util":
_print(c.call("SYNO.Core.System.Utilization", "get"))
elif args.cmd == "storage":
_print(c.call("SYNO.Storage.CGI.Storage", "load_info"))
elif args.cmd == "shares":
_print(c.call("SYNO.Core.Share", "list",
additional='["hidden","encryption","share_quota_used","is_aclmode",'
'"is_support_acl","unite_permission","is_force_readonly",'
'"recyclebin"]'))
elif args.cmd == "users":
_print(c.call("SYNO.Core.User", "list", additional='["email","description","expired"]'))
elif args.cmd == "groups":
_print(c.call("SYNO.Core.Group", "list"))
elif args.cmd == "packages":
_print(c.call("SYNO.Core.Package", "list", additional='["status","installed_info"]'))
elif args.cmd == "services":
# DSM 7.2.x enumerates SYNO.Core.Service via `get` (returns {"service":[...]});
# older builds expose `list`. Try get, fall back to list on 103 (method missing).
try:
_print(c.call("SYNO.Core.Service", "get"))
except SynoError as e:
if e.code == 103:
_print(c.call("SYNO.Core.Service", "list"))
else:
raise
elif args.cmd == "connections":
_print(c.call("SYNO.Core.CurrentConnection", "list"))
elif args.cmd == "ls":
try:
if args.path:
_print(c.call("SYNO.FileStation.List", "list", folder_path=args.path,
additional='["size","owner","time","perm"]'))
else:
_print(c.call("SYNO.FileStation.List", "list_share",
additional='["size","owner","perm"]'))
except SynoError as e:
raise _filestation_hint(e) if _filestation_denial("SYNO.FileStation.List", e) else e
elif args.cmd == "call":
params = _kv(args.params)
if is_mutating(args.method):
guard(f"{args.api}.{args.method}")
try:
_print(c.call(args.api, args.method, version=args.version,
post=args.post or is_mutating(args.method), **params))
except SynoError as e:
# same FileStation denial guidance on the documented power-tool path
raise _filestation_hint(e) if _filestation_denial(args.api, e) else e
elif args.cmd == "pkg-start":
guard("pkg-start")
_print(c.call("SYNO.Core.Package.Control", "start", post=True, id=args.id))
elif args.cmd == "pkg-stop":
guard("pkg-stop")
_print(c.call("SYNO.Core.Package.Control", "stop", post=True, id=args.id))
elif args.cmd == "reboot":
guard("reboot")
_print(c.call("SYNO.Core.System", "reboot", post=True))
elif args.cmd == "shutdown":
guard("shutdown")
_print(c.call("SYNO.Core.System", "shutdown", post=True))
else:
ap.print_help()
return 2
except SynoError as e:
# Log only genuine, unhandled API failures. handled=True (e.g. the FileStation
# file-privilege denial) and login errors (already logged inline, code=None) are
# skipped so routine/expected conditions never pollute errorlog.md.
if e.code is not None and not e.handled:
_log_skill_error(str(e).splitlines()[0], context=f"code={e.code}")
print(f"[ERROR] {e}", file=sys.stderr)
return 1
finally:
try:
c.logout()
except NameError:
pass # construction failed before c was bound
return 0
if __name__ == "__main__":
try:
rc = main(sys.argv[1:])
# Flush inside the try so a broken downstream pipe (e.g. `apis | head`) surfaces
# HERE and is caught -- not at interpreter shutdown, which would still print a
# 'BrokenPipeError ignored' traceback for small block-buffered output.
sys.stdout.flush()
sys.exit(rc)
except BrokenPipeError:
try:
sys.stdout.close()
except Exception:
pass
os._exit(0)