"""Discord message handler. Maps each thread to a persistent Claude Agent session.""" from __future__ import annotations import asyncio import json import logging import shutil from pathlib import Path import discord from bot.claude.client import ClaudeAgentManager from bot.config import settings logger = logging.getLogger(__name__) DISCORD_MAX_LEN = 2000 EDIT_THROTTLE_SECONDS = 0.75 MAX_FILE_BYTES = 25 * 1024 * 1024 MAX_TURN_BYTES = 100 * 1024 * 1024 ATTACHMENT_ROOT = settings.claudetools_root / "projects" / "discord-bot" / ".attachments" class MessageHandler: _USER_MAP: dict[str, dict] | None = None def __init__(self, bot: discord.Client, agents: ClaudeAgentManager) -> None: self.bot = bot self.agents = agents # One lock per thread serializes turns: a message that arrives while the # bot is mid-turn (or a second user chiming in) waits and is processed # next, in order — nothing collides on the single SDK session, nothing # is dropped. Different threads still run concurrently (separate locks). self._thread_locks: dict[int, asyncio.Lock] = {} @classmethod def _users(cls) -> dict[str, dict]: """discord_id -> {user, full_name} map from .claude/users.json (cached).""" if cls._USER_MAP is None: mapping: dict[str, dict] = {} try: p = settings.claudetools_root / ".claude" / "users.json" data = json.loads(p.read_text(encoding="utf-8")) for key, u in (data.get("users") or {}).items(): did = str(u.get("discord_id") or "").strip() if did: mapping[did] = {"user": key, "full_name": u.get("full_name", key)} cls._USER_MAP = mapping # cache only on a successful load except Exception as e: # noqa: BLE001 logger.warning("[WARNING] could not load users.json discord map: %s", e) return mapping # do NOT cache a failed/empty load — retry next call return cls._USER_MAP def _requester_env(self, author: discord.abc.User) -> dict[str, str]: """Per-session attribution env: marks the bot as executor and the Discord requester (mapped to a known user key when their discord_id is on file).""" mapped = self._users().get(str(author.id)) display = getattr(author, "display_name", author.name) if mapped: label = f"{mapped['full_name']} (@{author.name}, via Discord)" user_key = mapped["user"] else: label = f"@{author.name} (display: {display}, via Discord)" user_key = "" return { "CLAUDETOOLS_ACTOR": "discord-bot", "CLAUDETOOLS_REQUESTER": label, "CLAUDETOOLS_REQUESTER_USER": user_key, } async def handle_mention(self, message: discord.Message) -> None: if message.author == self.bot.user: return # Strip the bot mention to get the raw user text. user_text = message.content for mention in message.mentions: if mention == self.bot.user: user_text = user_text.replace(f"<@{mention.id}>", "").replace( f"<@!{mention.id}>", "" ).strip() if not user_text and not message.attachments: await message.reply("Hey! How can I help?") return # Resolve or create the thread first so we can include its ID in context. if isinstance(message.channel, discord.Thread): thread = message.channel else: name = self._thread_name(user_text) if user_text else "Attachment" thread = await message.create_thread( name=name, auto_archive_duration=1440, ) # Build caller-identity header so the agent always knows who is asking. # Thread ID is included so the agent can delete the thread on completion. author = message.author display = getattr(author, "display_name", author.name) guild_name = message.guild.name if message.guild else "DM" channel_name = getattr(message.channel, "name", "unknown") discord_ctx = ( "[DISCORD_CONTEXT]\n" f"User: @{author.name}" + (f" (display: {display})" if display != author.name else "") + f" | ID: {author.id}\n" f"Channel: #{channel_name} | Thread ID: {thread.id} | Guild: {guild_name}\n" "[/DISCORD_CONTEXT]\n\n" ) content = (discord_ctx + user_text).strip() attachment_paths = await self._download_attachments(message, thread.id) if attachment_paths: lines = "\n".join(f"- {p}" for p in attachment_paths) content = (content + f"\n\nUser attached files:\n{lines}").strip() if not content: content = "User uploaded file(s) without a message." # Attribution pins to the THREAD OPENER: this env is honored only when the # thread's SDK session is first created and is reused for every later turn. # If a second person posts in the same thread, the work is still credited # to whoever opened it (a thread = one person's request — see DISCORD_CLAUDE.md). req_env = self._requester_env(author) await self._run_turn(thread, content, req_env) @staticmethod async def _download_attachments( message: discord.Message, thread_id: int ) -> list[Path]: if not message.attachments: return [] target_dir = ATTACHMENT_ROOT / str(thread_id) target_dir.mkdir(parents=True, exist_ok=True) saved: list[Path] = [] running_total = 0 for att in message.attachments: if att.size > MAX_FILE_BYTES: logger.warning( "[WARNING] skipping %s: %d bytes exceeds per-file cap %d", att.filename, att.size, MAX_FILE_BYTES, ) continue if running_total + att.size > MAX_TURN_BYTES: logger.warning( "[WARNING] skipping %s: would exceed per-turn cap %d", att.filename, MAX_TURN_BYTES, ) continue safe_name = Path(att.filename).name or f"attachment_{att.id}" target = target_dir / safe_name try: await att.save(target) except discord.HTTPException as e: logger.warning("[WARNING] failed to download %s: %s", att.filename, e) continue saved.append(target) running_total += att.size logger.info("[INFO] saved attachment %s (%d bytes)", target, att.size) return saved @staticmethod def _cleanup_attachments(thread_id: int) -> None: target_dir = ATTACHMENT_ROOT / str(thread_id) if target_dir.exists(): try: shutil.rmtree(target_dir) except OSError as e: logger.warning("[WARNING] attachment cleanup failed for %d: %s", thread_id, e) @staticmethod def _thread_name(message: str) -> str: name = message[:50].replace("\n", " ").strip() if len(message) > 50: name += "..." return name or "ClaudeTools Conversation" async def _run_turn( self, thread: discord.Thread, user_message: str, env: dict[str, str] | None = None, ) -> None: # The Claude Code CLI cold-start can take a few seconds; show feedback. # (Shown immediately; queued turns sit here while an earlier turn runs.) status_msg = await thread.send("[INFO] Thinking...") buffer: list[str] = [] last_edit = 0.0 lock = asyncio.Lock() async def on_text(chunk: str) -> None: nonlocal last_edit buffer.append(chunk) now = asyncio.get_event_loop().time() if now - last_edit < EDIT_THROTTLE_SECONDS: return async with lock: last_edit = asyncio.get_event_loop().time() preview = "".join(buffer) if len(preview) > DISCORD_MAX_LEN: preview = preview[-DISCORD_MAX_LEN:] try: await status_msg.edit(content=preview or "[INFO] Thinking...") except discord.errors.NotFound: pass async def on_tool_use(tool_name: str) -> None: # Server-side log only — Discord-side notices clutter the thread # and ordering issues push them below the streaming answer. logger.info("[INFO] thread=%d tool=%s", thread.id, tool_name) try: # Serialize turns within this thread: a message that lands while a # turn is in flight (or a second user chiming in) waits here and runs # next, in order — nothing collides on the single SDK session, nothing # is dropped. Separate threads still run concurrently. turn_lock = self._thread_locks.setdefault(thread.id, asyncio.Lock()) async with turn_lock: agent = await self.agents.get_or_create(thread.id, env=env) full_text = await agent.send(user_message, on_text, on_tool_use) except Exception as e: logger.exception("[ERROR] Agent turn failed in thread %d", thread.id) await status_msg.edit(content=f"[ERROR] {e}") self._cleanup_attachments(thread.id) return await self._post_final(status_msg, thread, full_text or "[INFO] (no response)") self._cleanup_attachments(thread.id) async def _post_final( self, status_msg: discord.Message, thread: discord.Thread, text: str, ) -> None: chunks = self._split(text) # Deliver the finished answer as NEW message(s) at the BOTTOM of the # thread, like a normal human reply — do NOT edit the in-place # "Thinking..." preview into the answer. The live edit is good for showing # progress while working; the final answer belongs at the bottom where the # next person expects it. (A fresh send is also what makes any <@id> # mention actually notify the user — edits do not ping.) try: await status_msg.delete() except discord.errors.HTTPException: pass for chunk in chunks: await thread.send(chunk) @staticmethod def _split(content: str, max_len: int = DISCORD_MAX_LEN) -> list[str]: if len(content) <= max_len: return [content] chunks: list[str] = [] current = "" for line in content.split("\n"): # A single line longer than the limit must be hard-split mid-line. while len(line) > max_len: if current: chunks.append(current.rstrip()) current = "" chunks.append(line[:max_len]) line = line[max_len:] if len(current) + len(line) + 1 > max_len: chunks.append(current.rstrip()) current = line + "\n" else: current += line + "\n" if current: chunks.append(current.rstrip()) return chunks