client.py: send() falls back to ResultMessage.result when no TextBlock streams (the "(no response)" bug) and reconnects+retries once on a closed SDK session. message_handler.py: per-thread turn lock so messages arriving mid-turn or from a second user queue in order (nothing dropped); per-session requester-attribution env (discord_id -> users.json key), pinned to the thread opener; _USER_MAP caches only on a successful load; final answer posts as a fresh message at the BOTTOM (no edit-in-place); a <@id> tag goes out as a fresh send so it actually pings. main.py: allowed_mentions permits user pings, blocks @everyone/@here/roles. DISCORD_CLAUDE.md: no thread auto-delete; tiered close-out (Q&A -> one-line rolling log, substantive -> /save); @mention guidance; opener-pinned attribution note. whoami-block.sh / sync.sh: bot-context attribution (Executed by ClaudeTools Bot / Requested by <person>; git author = mapped requester, committer = bot). Strict no-op for interactive sessions. users.json: discord_id for Mike/Howard; added Winter Williams (bot-only, full trust). Reviewed by Code Review Agent + Grok + Gemini (Gemini's "malformed email" finding verified as a false positive). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
277 lines
11 KiB
Python
277 lines
11 KiB
Python
"""Discord message handler. Maps each thread to a persistent Claude Agent session."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
import discord
|
|
|
|
from bot.claude.client import ClaudeAgentManager
|
|
from bot.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DISCORD_MAX_LEN = 2000
|
|
EDIT_THROTTLE_SECONDS = 0.75
|
|
MAX_FILE_BYTES = 25 * 1024 * 1024
|
|
MAX_TURN_BYTES = 100 * 1024 * 1024
|
|
ATTACHMENT_ROOT = settings.claudetools_root / "projects" / "discord-bot" / ".attachments"
|
|
|
|
|
|
class MessageHandler:
|
|
_USER_MAP: dict[str, dict] | None = None
|
|
|
|
def __init__(self, bot: discord.Client, agents: ClaudeAgentManager) -> None:
|
|
self.bot = bot
|
|
self.agents = agents
|
|
# One lock per thread serializes turns: a message that arrives while the
|
|
# bot is mid-turn (or a second user chiming in) waits and is processed
|
|
# next, in order — nothing collides on the single SDK session, nothing
|
|
# is dropped. Different threads still run concurrently (separate locks).
|
|
self._thread_locks: dict[int, asyncio.Lock] = {}
|
|
|
|
@classmethod
|
|
def _users(cls) -> dict[str, dict]:
|
|
"""discord_id -> {user, full_name} map from .claude/users.json (cached)."""
|
|
if cls._USER_MAP is None:
|
|
mapping: dict[str, dict] = {}
|
|
try:
|
|
p = settings.claudetools_root / ".claude" / "users.json"
|
|
data = json.loads(p.read_text(encoding="utf-8"))
|
|
for key, u in (data.get("users") or {}).items():
|
|
did = str(u.get("discord_id") or "").strip()
|
|
if did:
|
|
mapping[did] = {"user": key, "full_name": u.get("full_name", key)}
|
|
cls._USER_MAP = mapping # cache only on a successful load
|
|
except Exception as e: # noqa: BLE001
|
|
logger.warning("[WARNING] could not load users.json discord map: %s", e)
|
|
return mapping # do NOT cache a failed/empty load — retry next call
|
|
return cls._USER_MAP
|
|
|
|
def _requester_env(self, author: discord.abc.User) -> dict[str, str]:
|
|
"""Per-session attribution env: marks the bot as executor and the Discord
|
|
requester (mapped to a known user key when their discord_id is on file)."""
|
|
mapped = self._users().get(str(author.id))
|
|
display = getattr(author, "display_name", author.name)
|
|
if mapped:
|
|
label = f"{mapped['full_name']} (@{author.name}, via Discord)"
|
|
user_key = mapped["user"]
|
|
else:
|
|
label = f"@{author.name} (display: {display}, via Discord)"
|
|
user_key = ""
|
|
return {
|
|
"CLAUDETOOLS_ACTOR": "discord-bot",
|
|
"CLAUDETOOLS_REQUESTER": label,
|
|
"CLAUDETOOLS_REQUESTER_USER": user_key,
|
|
}
|
|
|
|
async def handle_mention(self, message: discord.Message) -> None:
|
|
if message.author == self.bot.user:
|
|
return
|
|
|
|
# Strip the bot mention to get the raw user text.
|
|
user_text = message.content
|
|
for mention in message.mentions:
|
|
if mention == self.bot.user:
|
|
user_text = user_text.replace(f"<@{mention.id}>", "").replace(
|
|
f"<@!{mention.id}>", ""
|
|
).strip()
|
|
|
|
if not user_text and not message.attachments:
|
|
await message.reply("Hey! How can I help?")
|
|
return
|
|
|
|
# Resolve or create the thread first so we can include its ID in context.
|
|
if isinstance(message.channel, discord.Thread):
|
|
thread = message.channel
|
|
else:
|
|
name = self._thread_name(user_text) if user_text else "Attachment"
|
|
thread = await message.create_thread(
|
|
name=name,
|
|
auto_archive_duration=1440,
|
|
)
|
|
|
|
# Build caller-identity header so the agent always knows who is asking.
|
|
# Thread ID is included so the agent can delete the thread on completion.
|
|
author = message.author
|
|
display = getattr(author, "display_name", author.name)
|
|
guild_name = message.guild.name if message.guild else "DM"
|
|
channel_name = getattr(message.channel, "name", "unknown")
|
|
discord_ctx = (
|
|
"[DISCORD_CONTEXT]\n"
|
|
f"User: @{author.name}"
|
|
+ (f" (display: {display})" if display != author.name else "")
|
|
+ f" | ID: {author.id}\n"
|
|
f"Channel: #{channel_name} | Thread ID: {thread.id} | Guild: {guild_name}\n"
|
|
"[/DISCORD_CONTEXT]\n\n"
|
|
)
|
|
content = (discord_ctx + user_text).strip()
|
|
|
|
attachment_paths = await self._download_attachments(message, thread.id)
|
|
if attachment_paths:
|
|
lines = "\n".join(f"- {p}" for p in attachment_paths)
|
|
content = (content + f"\n\nUser attached files:\n{lines}").strip()
|
|
|
|
if not content:
|
|
content = "User uploaded file(s) without a message."
|
|
|
|
# Attribution pins to the THREAD OPENER: this env is honored only when the
|
|
# thread's SDK session is first created and is reused for every later turn.
|
|
# If a second person posts in the same thread, the work is still credited
|
|
# to whoever opened it (a thread = one person's request — see DISCORD_CLAUDE.md).
|
|
req_env = self._requester_env(author)
|
|
await self._run_turn(thread, content, req_env)
|
|
|
|
@staticmethod
|
|
async def _download_attachments(
|
|
message: discord.Message, thread_id: int
|
|
) -> list[Path]:
|
|
if not message.attachments:
|
|
return []
|
|
|
|
target_dir = ATTACHMENT_ROOT / str(thread_id)
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
saved: list[Path] = []
|
|
running_total = 0
|
|
for att in message.attachments:
|
|
if att.size > MAX_FILE_BYTES:
|
|
logger.warning(
|
|
"[WARNING] skipping %s: %d bytes exceeds per-file cap %d",
|
|
att.filename, att.size, MAX_FILE_BYTES,
|
|
)
|
|
continue
|
|
if running_total + att.size > MAX_TURN_BYTES:
|
|
logger.warning(
|
|
"[WARNING] skipping %s: would exceed per-turn cap %d",
|
|
att.filename, MAX_TURN_BYTES,
|
|
)
|
|
continue
|
|
safe_name = Path(att.filename).name or f"attachment_{att.id}"
|
|
target = target_dir / safe_name
|
|
try:
|
|
await att.save(target)
|
|
except discord.HTTPException as e:
|
|
logger.warning("[WARNING] failed to download %s: %s", att.filename, e)
|
|
continue
|
|
saved.append(target)
|
|
running_total += att.size
|
|
logger.info("[INFO] saved attachment %s (%d bytes)", target, att.size)
|
|
return saved
|
|
|
|
@staticmethod
|
|
def _cleanup_attachments(thread_id: int) -> None:
|
|
target_dir = ATTACHMENT_ROOT / str(thread_id)
|
|
if target_dir.exists():
|
|
try:
|
|
shutil.rmtree(target_dir)
|
|
except OSError as e:
|
|
logger.warning("[WARNING] attachment cleanup failed for %d: %s", thread_id, e)
|
|
|
|
@staticmethod
|
|
def _thread_name(message: str) -> str:
|
|
name = message[:50].replace("\n", " ").strip()
|
|
if len(message) > 50:
|
|
name += "..."
|
|
return name or "ClaudeTools Conversation"
|
|
|
|
async def _run_turn(
|
|
self,
|
|
thread: discord.Thread,
|
|
user_message: str,
|
|
env: dict[str, str] | None = None,
|
|
) -> None:
|
|
# The Claude Code CLI cold-start can take a few seconds; show feedback.
|
|
# (Shown immediately; queued turns sit here while an earlier turn runs.)
|
|
status_msg = await thread.send("[INFO] Thinking...")
|
|
|
|
buffer: list[str] = []
|
|
last_edit = 0.0
|
|
lock = asyncio.Lock()
|
|
|
|
async def on_text(chunk: str) -> None:
|
|
nonlocal last_edit
|
|
buffer.append(chunk)
|
|
now = asyncio.get_event_loop().time()
|
|
if now - last_edit < EDIT_THROTTLE_SECONDS:
|
|
return
|
|
async with lock:
|
|
last_edit = asyncio.get_event_loop().time()
|
|
preview = "".join(buffer)
|
|
if len(preview) > DISCORD_MAX_LEN:
|
|
preview = preview[-DISCORD_MAX_LEN:]
|
|
try:
|
|
await status_msg.edit(content=preview or "[INFO] Thinking...")
|
|
except discord.errors.NotFound:
|
|
pass
|
|
|
|
async def on_tool_use(tool_name: str) -> None:
|
|
# Server-side log only — Discord-side notices clutter the thread
|
|
# and ordering issues push them below the streaming answer.
|
|
logger.info("[INFO] thread=%d tool=%s", thread.id, tool_name)
|
|
|
|
try:
|
|
# Serialize turns within this thread: a message that lands while a
|
|
# turn is in flight (or a second user chiming in) waits here and runs
|
|
# next, in order — nothing collides on the single SDK session, nothing
|
|
# is dropped. Separate threads still run concurrently.
|
|
turn_lock = self._thread_locks.setdefault(thread.id, asyncio.Lock())
|
|
async with turn_lock:
|
|
agent = await self.agents.get_or_create(thread.id, env=env)
|
|
full_text = await agent.send(user_message, on_text, on_tool_use)
|
|
except Exception as e:
|
|
logger.exception("[ERROR] Agent turn failed in thread %d", thread.id)
|
|
await status_msg.edit(content=f"[ERROR] {e}")
|
|
self._cleanup_attachments(thread.id)
|
|
return
|
|
|
|
await self._post_final(status_msg, thread, full_text or "[INFO] (no response)")
|
|
self._cleanup_attachments(thread.id)
|
|
|
|
async def _post_final(
|
|
self,
|
|
status_msg: discord.Message,
|
|
thread: discord.Thread,
|
|
text: str,
|
|
) -> None:
|
|
chunks = self._split(text)
|
|
# Deliver the finished answer as NEW message(s) at the BOTTOM of the
|
|
# thread, like a normal human reply — do NOT edit the in-place
|
|
# "Thinking..." preview into the answer. The live edit is good for showing
|
|
# progress while working; the final answer belongs at the bottom where the
|
|
# next person expects it. (A fresh send is also what makes any <@id>
|
|
# mention actually notify the user — edits do not ping.)
|
|
try:
|
|
await status_msg.delete()
|
|
except discord.errors.HTTPException:
|
|
pass
|
|
for chunk in chunks:
|
|
await thread.send(chunk)
|
|
|
|
@staticmethod
|
|
def _split(content: str, max_len: int = DISCORD_MAX_LEN) -> list[str]:
|
|
if len(content) <= max_len:
|
|
return [content]
|
|
|
|
chunks: list[str] = []
|
|
current = ""
|
|
for line in content.split("\n"):
|
|
# A single line longer than the limit must be hard-split mid-line.
|
|
while len(line) > max_len:
|
|
if current:
|
|
chunks.append(current.rstrip())
|
|
current = ""
|
|
chunks.append(line[:max_len])
|
|
line = line[max_len:]
|
|
if len(current) + len(line) + 1 > max_len:
|
|
chunks.append(current.rstrip())
|
|
current = line + "\n"
|
|
else:
|
|
current += line + "\n"
|
|
if current:
|
|
chunks.append(current.rstrip())
|
|
return chunks
|