Files
claudetools/.claude/skills/b2/scripts/b2.py
Mike Swanson 96fb4110ea Add b2 skill: Backblaze B2 management CLI (storage cost, prefix purge)
B2 Native API v3 client for the ACG B2 account: status, buckets, keys,
files, bucket-size, usage/cost ($0.00695/GB), gated create/delete bucket+key,
and gated lifecycle-based delete-prefix/lifecycle-remove for prefix purges.
Read-only by default; destructive ops require --confirm.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 14:31:09 -07:00

767 lines
30 KiB
Python

#!/usr/bin/env python3
"""CLI for the b2 skill — Backblaze B2 Native API v3 (ACG production account).
Read-only subcommands run freely. Destructive subcommands (create-bucket,
create-key, delete-bucket, delete-key) refuse to run unless --confirm is passed;
without it they print what they WOULD do and exit non-zero (3).
Output: --json emits raw JSON; otherwise a readable table/summary.
Usage examples:
python b2.py status
python b2.py buckets
python b2.py buckets --json
python b2.py keys
python b2.py files ACG-Internal --prefix MBS- --limit 50
python b2.py files ACG-Internal --versions
python b2.py bucket-size ACG-Internal
python b2.py usage
python b2.py usage --bucket ACG-Dataforth
python b2.py cost --json
python b2.py create-bucket NewBucket --confirm
python b2.py create-key --name client-backup --capabilities listFiles,readFiles \\
--bucket ACG-Internal --confirm
python b2.py delete-bucket OldBucket --confirm
python b2.py delete-key 00146f69bc611630000000abc --confirm
python b2.py lifecycle ACG-Internal
python b2.py delete-prefix ACG-Internal "MBS-<guid>/CBB_<machine>/" --confirm
python b2.py lifecycle-remove ACG-Internal "MBS-<guid>/CBB_<machine>/" --confirm
python b2.py raw --method b2_list_buckets --body '{"accountId":"<acct>"}'
"""
from __future__ import annotations
import argparse
import json
import sys
from typing import Optional
from b2_client import B2Client, B2Error, RATE_PER_GB_USD, BYTES_PER_GB, BYTES_PER_GIB
def _emit(obj, as_json: bool, table_fn=None) -> None:
if as_json or table_fn is None:
print(json.dumps(obj, indent=2, default=str))
else:
table_fn(obj)
def _client_label(bucket_name: str) -> str:
"""Derive a client label: strip the 'ACG-' prefix; leave others as-is."""
if bucket_name.startswith("ACG-"):
return bucket_name[len("ACG-"):]
return bucket_name
def _fmt_usd(amount: float) -> str:
return f"${amount:,.4f}"
# --- table renderers ----------------------------------------------------------
def _print_status(info: dict) -> None:
caps = info.get("capabilities") or []
scope = "account-wide" if not info.get("bucketId") else f"bucket {info['bucketId']}"
print("[INFO] Backblaze B2 authorization")
print(f" accountId: {info.get('accountId')}")
print(f" apiUrl: {info.get('apiUrl')}")
print(f" s3ApiUrl: {info.get('s3ApiUrl')}")
print(f" downloadUrl: {info.get('downloadUrl')}")
print(f" key scope: {scope}")
if info.get("namePrefix"):
print(f" namePrefix: {info.get('namePrefix')}")
print(f" capabilities: {len(caps)} -> {', '.join(sorted(caps))}")
print(f" authorized_at: {info.get('authorized_at')}")
def _print_buckets(buckets: list) -> None:
print(f"Buckets: {len(buckets)}")
print(f" {'NAME':28} {'TYPE':12} {'LOCK':5} BUCKET-ID")
for b in sorted(buckets, key=lambda x: x.get("bucketName", "")):
lock = (b.get("fileLockConfiguration") or {}).get("isFileLockEnabled")
lock_str = "yes" if lock else "no"
print(f" {b.get('bucketName',''):28} {b.get('bucketType',''):12} "
f"{lock_str:5} {b.get('bucketId','')}")
def _print_keys(keys: list) -> None:
print(f"Application keys: {len(keys)}")
print(f" {'NAME':22} {'APP-KEY-ID':28} {'SCOPE':18} {'CAPS':4} {'PREFIX':12} EXPIRES")
for k in keys:
scope = k.get("bucketId") or "account-wide"
caps = k.get("capabilities") or []
prefix = k.get("namePrefix") or "-"
exp = k.get("expirationTimestamp")
exp_str = str(exp) if exp else "never"
print(f" {k.get('keyName',''):22} {k.get('applicationKeyId',''):28} "
f"{scope:18} {len(caps):<4} {prefix:12} {exp_str}")
def _print_files(files: list) -> None:
print(f"Files: {len(files)}")
print(f" {'ACTION':8} {'SIZE':>14} {'UPLOADED':>16} NAME")
for f in files:
size = f.get("contentLength", 0) or 0
ts = f.get("uploadTimestamp", "")
print(f" {str(f.get('action','')):8} {size:>14,} {str(ts):>16} "
f"{f.get('fileName','')}")
def _print_bucket_size(data: dict) -> None:
print(f"Bucket: {data['bucketName']}")
print(f" stored bytes: {data['bytes']:,}")
print(f" stored GB (1e9): {data['gb']:.4f}")
print(f" stored GiB: {data['gib']:.4f}")
print(f" distinct files: {data['file_count']:,}")
print(f" upload versions: {data['version_count']:,}")
print(f" versions seen: {data['total_versions_seen']:,} "
"(includes hide/start/folder markers)")
def _print_usage(report: dict) -> None:
rows = report["buckets"]
rate = report["rate"]
print(f"[INFO] Storage cost report (rate = ${rate:.5f} / GB, "
"GB = bytes / 1e9, all versions counted)")
print(f"[WARNING] Sized via b2_list_file_versions across all versions; "
"large buckets may issue many list transactions.")
print()
print(f" {'CLIENT/BUCKET':24} {'BYTES':>16} {'GB':>12} {'COST':>14}")
print(f" {'-'*24} {'-'*16} {'-'*12} {'-'*14}")
for r in rows:
print(f" {r['label']:24} {r['bytes']:>16,} {r['gb']:>12.4f} "
f"{_fmt_usd(r['cost']):>14}")
print(f" {'-'*24} {'-'*16} {'-'*12} {'-'*14}")
print(f" {'TOTAL':24} {report['total_bytes']:>16,} "
f"{report['total_gb']:>12.4f} {_fmt_usd(report['total_cost']):>14}")
def _print_lifecycle(data: dict) -> None:
rules = data.get("lifecycleRules") or []
print(f"Bucket: {data.get('bucketName')} (revision {data.get('revision')})")
print(f"Lifecycle rules: {len(rules)}")
if not rules:
print(" (none - files are kept until explicitly deleted)")
return
print(f" {'FILE-NAME-PREFIX':40} {'HIDE@days':>9} {'DELETE@days':>11}")
print(f" {'-'*40} {'-'*9} {'-'*11}")
for r in rules:
prefix = r.get("fileNamePrefix", "")
prefix_disp = "(whole bucket)" if prefix == "" else prefix
hide = r.get("daysFromUploadingToHiding")
delete = r.get("daysFromHidingToDeleting")
hide_disp = "-" if hide is None else str(hide)
delete_disp = "-" if delete is None else str(delete)
print(f" {prefix_disp:40} {hide_disp:>9} {delete_disp:>11}")
# --- command handlers ---------------------------------------------------------
def cmd_status(client, args):
_emit(client.auth_info, args.json, _print_status)
return 0
def cmd_buckets(client, args):
_emit(client.list_buckets(), args.json, _print_buckets)
return 0
def cmd_keys(client, args):
_emit(client.list_keys(), args.json, _print_keys)
return 0
def cmd_files(client, args):
bucket = client.resolve_bucket(args.bucket_name)
bucket_id = bucket["bucketId"]
if args.versions:
files = client.list_file_versions(
bucket_id, prefix=args.prefix, limit=args.limit
)
else:
files = client.list_file_names(
bucket_id, prefix=args.prefix, limit=args.limit
)
_emit(files, args.json, _print_files)
return 0
def cmd_bucket_size(client, args):
bucket = client.resolve_bucket(args.bucket_name)
if not args.json:
print(f"[INFO] Listing all versions in '{args.bucket_name}' "
"(may take a while for large buckets)...", file=sys.stderr)
data = client.bucket_size(bucket["bucketId"])
data["bucketName"] = args.bucket_name
_emit(data, args.json, _print_bucket_size)
return 0
def cmd_usage(client, args):
rate = args.rate
buckets = client.list_buckets()
if args.bucket:
buckets = [b for b in buckets if b.get("bucketName") == args.bucket]
if not buckets:
print(f"[ERROR] No bucket named '{args.bucket}'.", file=sys.stderr)
return 1
if not args.json:
print(f"[INFO] Computing storage cost across {len(buckets)} bucket(s); "
"this lists every version and may issue many list transactions...",
file=sys.stderr)
rows = []
total_bytes = 0
for b in buckets:
size = client.bucket_size(b["bucketId"])
cost = size["gb"] * rate
total_bytes += size["bytes"]
rows.append({
"bucket": b.get("bucketName", ""),
"label": _client_label(b.get("bucketName", "")),
"bytes": size["bytes"],
"gb": size["gb"],
"gib": size["gib"],
"version_count": size["version_count"],
"file_count": size["file_count"],
"cost": cost,
})
rows.sort(key=lambda r: r["cost"], reverse=True)
total_gb = total_bytes / BYTES_PER_GB
report = {
"rate": rate,
"buckets": rows,
"total_bytes": total_bytes,
"total_gb": total_gb,
"total_gib": total_bytes / BYTES_PER_GIB,
"total_cost": total_gb * rate,
}
_emit(report, args.json, _print_usage)
return 0
# --- lifecycle (prefix purge) -------------------------------------------------
# A "purge" rule hides any file > 1 day after upload, then deletes hidden files
# > 1 day later — so B2's daily lifecycle pass removes EVERY version under the
# prefix within ~24-48h. All current targets predate today by years, so they are
# eligible immediately on the next pass. There is no recycle bin: this is
# irreversible server-side deletion.
PURGE_DAYS_FROM_UPLOAD_TO_HIDE = 1
PURGE_DAYS_FROM_HIDE_TO_DELETE = 1
def _purge_rule(prefix: str) -> dict:
"""Build the canonical 1/1-day purge lifecycle rule for a prefix."""
return {
"fileNamePrefix": prefix,
"daysFromUploadingToHiding": PURGE_DAYS_FROM_UPLOAD_TO_HIDE,
"daysFromHidingToDeleting": PURGE_DAYS_FROM_HIDE_TO_DELETE,
}
def _rule_matches_prefix(rule: dict, prefix: str) -> bool:
"""A lifecycle rule targets `prefix` iff its fileNamePrefix equals it exactly."""
return rule.get("fileNamePrefix", "") == prefix
def _is_purge_rule(rule: dict, prefix: str) -> bool:
"""True if `rule` is an identical 1/1-day purge rule for `prefix`."""
return (
_rule_matches_prefix(rule, prefix)
and rule.get("daysFromUploadingToHiding") == PURGE_DAYS_FROM_UPLOAD_TO_HIDE
and rule.get("daysFromHidingToDeleting") == PURGE_DAYS_FROM_HIDE_TO_DELETE
)
def _validate_purge_prefix(prefix: str, allow_account_root: bool) -> Optional[str]:
"""Return an error string if `prefix` is too broad to purge; else None.
Hard-fail rules (apply even with --confirm):
* empty / "/" / "*" / no "/" at all -> too broad (whole-bucket / whole-account)
* exactly a bucket root "MBS-<guid>/" -> account-level, requires
--allow-account-root (off by default; we only purge CBB_ machine prefixes)
A valid machine target looks like "MBS-<guid>/CBB_<machine>/".
"""
if prefix in ("", "/", "*"):
return (f"prefix {prefix!r} is too broad — it would purge the whole "
"bucket or account. Refusing.")
if "/" not in prefix:
return (f"prefix {prefix!r} contains no '/'; a top-level prefix can match "
"an entire MBS-<guid> account tree. Refusing. A valid target looks "
"like 'MBS-<guid>/CBB_<machine>/'.")
# Account root looks like "MBS-<something>/" with exactly one trailing slash
# and no further path segment (i.e. the only '/' is the terminal one).
stripped = prefix[:-1] if prefix.endswith("/") else prefix
if "/" not in stripped:
# Single segment followed by a slash, e.g. "MBS-<guid>/": account-level.
if not allow_account_root:
return (f"prefix {prefix!r} is an account root (MBS-<guid>/); purging "
"it removes ALL machines under that account. Pass "
"--allow-account-root to override (NOT recommended — purge "
"machine-level CBB_ prefixes instead).")
return None
def _purge_prefix_warning(prefix: str) -> str:
return (
f"[WARNING] Scheduling IRREVERSIBLE server-side deletion of ALL versions "
f"under '{prefix}'. B2's daily lifecycle pass will hide files >1 day old "
f"then delete hidden files >1 day old, fully purging the prefix within "
f"~24-48h. There is NO recycle bin and NO undo."
)
def cmd_lifecycle(client, args):
"""READ-ONLY: list a bucket's current lifecycle rules."""
bucket = client.resolve_bucket(args.bucket_name)
full = client.get_bucket_with_revision(bucket["bucketId"])
data = {
"bucketName": args.bucket_name,
"bucketId": full.get("bucketId"),
"revision": full.get("revision"),
"lifecycleRules": full.get("lifecycleRules") or [],
}
_emit(data, args.json, _print_lifecycle)
return 0
def cmd_delete_prefix(client, args):
"""GATED, DESTRUCTIVE: add a 1/1-day purge lifecycle rule per prefix."""
bucket = client.resolve_bucket(args.bucket_name)
bucket_id = bucket["bucketId"]
# Hard-fail validation first — applies even with --confirm.
for prefix in args.prefixes:
err = _validate_purge_prefix(prefix, args.allow_account_root)
if err:
print(f"[ERROR] {err}", file=sys.stderr)
return 2
# Soft warning: recommend a trailing slash so the prefix can't match a
# sibling whose name merely starts with these characters.
for prefix in args.prefixes:
if not prefix.endswith("/"):
print(f"[WARNING] prefix {prefix!r} does not end with '/'; it will "
"also match any file whose name merely starts with it.",
file=sys.stderr)
if not args.confirm:
print("[WARNING] Refusing destructive action without --confirm.")
for prefix in args.prefixes:
print(_purge_prefix_warning(prefix))
print(f"[INFO] Would add purge rule: fileNamePrefix={prefix!r}, "
f"daysFromUploadingToHiding={PURGE_DAYS_FROM_UPLOAD_TO_HIDE}, "
f"daysFromHidingToDeleting={PURGE_DAYS_FROM_HIDE_TO_DELETE}")
return 3
result = _apply_lifecycle_change(
client, bucket_id, args.prefixes, add=True, json_out=args.json
)
return result
def cmd_lifecycle_remove(client, args):
"""GATED: remove lifecycle rule(s) whose fileNamePrefix matches the prefix(es)."""
bucket = client.resolve_bucket(args.bucket_name)
bucket_id = bucket["bucketId"]
if not args.confirm:
# Read so we can show exactly which existing rules would be removed.
full = client.get_bucket_with_revision(bucket_id)
existing = full.get("lifecycleRules") or []
print("[WARNING] Refusing to modify lifecycle rules without --confirm.")
any_match = False
for prefix in args.prefixes:
matches = [r for r in existing if _rule_matches_prefix(r, prefix)]
if matches:
any_match = True
for r in matches:
print(f"[INFO] Would remove rule: fileNamePrefix={prefix!r} "
f"(daysFromUploadingToHiding="
f"{r.get('daysFromUploadingToHiding')}, "
f"daysFromHidingToDeleting="
f"{r.get('daysFromHidingToDeleting')})")
else:
print(f"[INFO] No lifecycle rule matches prefix {prefix!r} "
"(nothing to remove).")
if not any_match:
print("[INFO] No matching rules; this would be a no-op.")
return 3
result = _apply_lifecycle_change(
client, bucket_id, args.prefixes, add=False, json_out=args.json
)
return result
def _apply_lifecycle_change(client, bucket_id, prefixes, *, add, json_out):
"""Read the current rules, merge (add) or filter (remove), then write back.
Read -> merge -> write the COMPLETE rules array. This account's
b2_update_bucket does not accept an `ifRevisionMatch` optimistic-lock token,
so writes are last-write-wins; merging onto the freshly-read set is what
preserves pre-existing rules. The read still returns the bucket `revision`,
but it is informational only and is not sent back. Returns a CLI exit code.
"""
full = client.get_bucket_with_revision(bucket_id)
existing = list(full.get("lifecycleRules") or [])
if add:
merged = list(existing)
added, skipped = [], []
for prefix in prefixes:
if any(_is_purge_rule(r, prefix) for r in existing):
skipped.append(prefix)
continue
# Replace any non-purge rule on the same prefix rather than
# stacking two rules for one prefix.
merged = [r for r in merged if not _rule_matches_prefix(r, prefix)]
merged.append(_purge_rule(prefix))
added.append(prefix)
if not added:
_emit({"bucketId": bucket_id, "added": [], "skipped": skipped,
"lifecycleRules": existing}, json_out,
lambda o: print("[OK] All requested purge rules already "
f"present; nothing to do. (skipped: "
f"{', '.join(skipped) or 'none'})"))
return 0
change_summary = {"added": added, "skipped": skipped}
else:
to_remove = [p for p in prefixes
if any(_rule_matches_prefix(r, p) for r in existing)]
merged = [r for r in existing
if not any(_rule_matches_prefix(r, p) for p in prefixes)]
if not to_remove:
_emit({"bucketId": bucket_id, "removed": [],
"lifecycleRules": existing}, json_out,
lambda o: print("[OK] No matching lifecycle rules to "
"remove; nothing to do."))
return 0
change_summary = {"removed": to_remove}
if len(merged) > 100:
print(f"[ERROR] Resulting rule count {len(merged)} exceeds B2's "
"limit of 100 lifecycle rules per bucket.", file=sys.stderr)
return 1
try:
updated = client.update_bucket_lifecycle(bucket_id, merged)
except B2Error as exc:
# No optimistic-lock retry path here (no ifRevisionMatch). Make a single
# defensive retry on a transient/server-side hiccup, then give up — never
# loop. A 4xx (e.g. bad_request) is a request problem, not transient, so
# re-raise it immediately.
if exc.status is not None and 500 <= exc.status < 600:
print(f"[WARNING] b2_update_bucket transient failure (HTTP "
f"{exc.status}); retrying once...", file=sys.stderr)
updated = client.update_bucket_lifecycle(bucket_id, merged)
else:
raise
out = {
"bucketId": bucket_id,
"newRevision": updated.get("revision"),
"lifecycleRules": updated.get("lifecycleRules") or merged,
}
out.update(change_summary)
def _render(_o):
if add:
for p in change_summary["added"]:
print(f"[OK] Added purge rule for {p!r} "
f"({PURGE_DAYS_FROM_UPLOAD_TO_HIDE}/"
f"{PURGE_DAYS_FROM_HIDE_TO_DELETE} days).")
for p in change_summary["skipped"]:
print(f"[INFO] Purge rule for {p!r} already present; skipped.")
print(_purge_prefix_warning(
", ".join(change_summary["added"])))
else:
for p in change_summary["removed"]:
print(f"[OK] Removed lifecycle rule(s) for {p!r}.")
print(f"[INFO] Bucket now has "
f"{len(updated.get('lifecycleRules') or merged)} rule(s); "
f"revision {updated.get('revision')}.")
_emit(out, json_out, _render)
return 0
# --- gating helper ------------------------------------------------------------
def _gated(action_desc: str, confirm: bool) -> bool:
if not confirm:
print("[WARNING] Refusing destructive action without --confirm.")
print(f"[INFO] Would: {action_desc}")
return False
return True
def cmd_create_bucket(client, args):
if not _gated(f"create {args.type} bucket '{args.name}'", args.confirm):
return 3
result = client.create_bucket(args.name, bucket_type=args.type)
_emit({"createdBucket": args.name, "result": result}, args.json,
lambda o: print(f"[OK] Created bucket '{args.name}' "
f"(id {result.get('bucketId')}, type {result.get('bucketType')})."))
return 0
def cmd_create_key(client, args):
caps = [c.strip() for c in args.capabilities.split(",") if c.strip()]
if not caps:
print("[ERROR] --capabilities must list at least one capability.",
file=sys.stderr)
return 2
bucket_id = None
if args.bucket:
bucket_id = client.resolve_bucket(args.bucket)["bucketId"]
scope = f"bucket '{args.bucket}'" if args.bucket else "account-wide"
desc = (f"create key '{args.name}' scoped {scope} with capabilities "
f"{','.join(caps)}")
if not _gated(desc, args.confirm):
return 3
result = client.create_key(
key_name=args.name,
capabilities=caps,
bucket_id=bucket_id,
name_prefix=args.prefix,
valid_duration_seconds=args.duration_seconds,
)
if args.json:
# Surface the one-time-key warning on STDERR so piping --json to a file
# still alerts the operator — the applicationKey cannot be retrieved again.
print("[WARNING] store this key in the vault now - it cannot be "
"retrieved again", file=sys.stderr)
print(json.dumps(result, indent=2, default=str))
else:
print(f"[OK] Created application key '{args.name}'.")
print(f" applicationKeyId: {result.get('applicationKeyId')}")
print(f" capabilities: {', '.join(result.get('capabilities', []))}")
print(f" scope: "
f"{result.get('bucketId') or 'account-wide'}")
if result.get("namePrefix"):
print(f" namePrefix: {result.get('namePrefix')}")
print()
print("[WARNING] The applicationKey below is shown ONCE and CANNOT be "
"retrieved later. Store it in the SOPS vault immediately:")
print(f" applicationKey: {result.get('applicationKey')}")
return 0
def cmd_delete_bucket(client, args):
bucket = client.resolve_bucket(args.name)
if not _gated(f"delete bucket '{args.name}' (id {bucket['bucketId']})",
args.confirm):
return 3
result = client.delete_bucket(bucket["bucketId"])
_emit({"deletedBucket": args.name, "result": result}, args.json,
lambda o: print(f"[OK] Deleted bucket '{args.name}'."))
return 0
def cmd_delete_key(client, args):
if not _gated(f"delete application key '{args.application_key_id}'",
args.confirm):
return 3
result = client.delete_key(args.application_key_id)
_emit({"deletedKey": args.application_key_id, "result": result}, args.json,
lambda o: print(f"[OK] Deleted application key "
f"'{args.application_key_id}'."))
return 0
# Substrings that mark a method as state-changing; `raw` gates these behind
# --confirm (mirror the bitdefender raw gating). Covers the obvious
# delete/create/update/hide/cancel verbs plus the large-file and copy mutators
# whose names don't contain those verbs (b2_copy_file, b2_copy_part,
# b2_start_large_file, b2_upload_file/part, b2_finish_large_file).
DESTRUCTIVE_RAW_PATTERNS = (
"delete",
"create",
"update",
"hide",
"cancel",
"copy",
"finish",
"upload",
"start",
)
def _is_destructive_method(method: str) -> bool:
m = method.lower()
return any(pat in m for pat in DESTRUCTIVE_RAW_PATTERNS)
def cmd_raw(client, args):
if _is_destructive_method(args.method) and not args.confirm:
print(f"[WARNING] '{args.method}' looks state-changing; refusing without "
"--confirm.", file=sys.stderr)
return 3
try:
body = json.loads(args.body) if args.body else {}
except json.JSONDecodeError as exc:
print(f"[ERROR] --body is not valid JSON: {exc}", file=sys.stderr)
return 2
if not isinstance(body, dict):
print("[ERROR] --body must be a JSON object.", file=sys.stderr)
return 2
result = client.call(args.method, body)
# Output may carry sensitive data (keys, tokens) — review before reuse.
print(json.dumps(result, indent=2, default=str))
return 0
# --- parser -------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="b2.py",
description="Backblaze B2 Native API v3 CLI (ACG production account).",
)
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--json", action="store_true", help="Emit raw JSON output.")
sub = p.add_subparsers(dest="command", required=True)
sub.add_parser("status", help="Authorize and show account/key info.",
parents=[common])
sub.add_parser("buckets", help="List buckets.", parents=[common])
sub.add_parser("keys", help="List application keys.", parents=[common])
sp = sub.add_parser("files", help="List files in a bucket.", parents=[common])
sp.add_argument("bucket_name")
sp.add_argument("--prefix", help="Restrict to files under this prefix.")
sp.add_argument("--versions", action="store_true",
help="List ALL versions (default: latest names only).")
sp.add_argument("--limit", type=int, help="Stop after N files.")
sp = sub.add_parser("bucket-size",
help="Sum stored bytes/GB for one bucket (all versions).",
parents=[common])
sp.add_argument("bucket_name")
sp = sub.add_parser("usage",
help="Storage cost across all buckets (headline report).",
parents=[common])
sp.add_argument("--bucket", help="Scope the report to a single bucket name.")
sp.add_argument("--rate", type=float, default=RATE_PER_GB_USD,
help=f"USD per GB (default {RATE_PER_GB_USD}, ACG cost basis).")
# alias: cost == usage
sp = sub.add_parser("cost", help="Alias for 'usage'.", parents=[common])
sp.add_argument("--bucket", help="Scope the report to a single bucket name.")
sp.add_argument("--rate", type=float, default=RATE_PER_GB_USD,
help=f"USD per GB (default {RATE_PER_GB_USD}, ACG cost basis).")
# gated (destructive)
sp = sub.add_parser("create-bucket", help="Create a bucket (gated).",
parents=[common])
sp.add_argument("name")
sp.add_argument("--type", default="allPrivate",
choices=["allPrivate", "allPublic"],
help="Bucket type (default allPrivate).")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("create-key", help="Create an application key (gated).",
parents=[common])
sp.add_argument("--name", required=True, help="Key name (keyName).")
sp.add_argument("--capabilities", required=True,
help="Comma-separated capability list, e.g. "
"listFiles,readFiles,writeFiles.")
sp.add_argument("--bucket", help="Scope the key to this bucket name "
"(resolves to bucketId).")
sp.add_argument("--prefix", help="Restrict the key to a name prefix.")
sp.add_argument("--duration-seconds", type=int,
help="Optional key lifetime (validDurationInSeconds).")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("delete-bucket", help="Delete a bucket (gated).",
parents=[common])
sp.add_argument("name")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("delete-key", help="Delete an application key (gated).",
parents=[common])
sp.add_argument("application_key_id")
sp.add_argument("--confirm", action="store_true")
# lifecycle (read-only)
sp = sub.add_parser("lifecycle",
help="List a bucket's current lifecycle rules (read-only).",
parents=[common])
sp.add_argument("bucket_name")
# delete-prefix (gated, destructive)
sp = sub.add_parser(
"delete-prefix",
help="Schedule a server-side purge of everything under a prefix via a "
"1/1-day lifecycle rule (gated, IRREVERSIBLE).",
parents=[common],
)
sp.add_argument("bucket_name")
sp.add_argument("prefixes", nargs="+", metavar="prefix",
help="One or more file-name prefixes "
"(e.g. 'MBS-<guid>/CBB_<machine>/').")
sp.add_argument("--allow-account-root", action="store_true",
help="Permit purging an account-level 'MBS-<guid>/' root. "
"NOT recommended; off by default.")
sp.add_argument("--confirm", action="store_true")
# lifecycle-remove (gated)
sp = sub.add_parser(
"lifecycle-remove",
help="Remove lifecycle rule(s) matching a prefix (cleanup after a purge "
"completes; gated).",
parents=[common],
)
sp.add_argument("bucket_name")
sp.add_argument("prefixes", nargs="+", metavar="prefix",
help="One or more fileNamePrefix values to remove.")
sp.add_argument("--confirm", action="store_true")
sp = sub.add_parser("raw", help="Call any B2 v3 method directly (power use).",
parents=[common])
sp.add_argument("--method", required=True, help="e.g. b2_list_buckets.")
sp.add_argument("--body", default="{}", help="JSON object request body.")
sp.add_argument("--confirm", action="store_true",
help="Required for state-changing methods "
"(create/delete/update/hide/cancel). Output may carry "
"sensitive data — review before reuse.")
return p
HANDLERS = {
"status": cmd_status,
"buckets": cmd_buckets,
"keys": cmd_keys,
"files": cmd_files,
"bucket-size": cmd_bucket_size,
"usage": cmd_usage,
"cost": cmd_usage,
"create-bucket": cmd_create_bucket,
"create-key": cmd_create_key,
"delete-bucket": cmd_delete_bucket,
"delete-key": cmd_delete_key,
"lifecycle": cmd_lifecycle,
"delete-prefix": cmd_delete_prefix,
"lifecycle-remove": cmd_lifecycle_remove,
"raw": cmd_raw,
}
def main(argv=None) -> int:
args = build_parser().parse_args(argv)
handler = HANDLERS[args.command]
try:
client = B2Client()
rc = handler(client, args)
return rc if isinstance(rc, int) else 0
except B2Error as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
return 1
except KeyboardInterrupt:
return 130
if __name__ == "__main__":
raise SystemExit(main())