"""Claude API client with streaming support for Discord.""" import asyncio from datetime import datetime from typing import Callable, Optional, Any import discord from anthropic import AsyncAnthropic from anthropic.types import MessageStreamEvent from bot.config import settings from bot.claude.tools import TOOLS, SYSTEM_PROMPT_TEMPLATE class ClaudeClient: """Wrapper around Anthropic SDK for Discord bot usage.""" def __init__(self): self.client = AsyncAnthropic(api_key=settings.anthropic_api_key) self.model = settings.claude_model def format_system_prompt( self, discord_user: discord.User, channel_name: str, thread_name: str, user_role: str = "unknown" ) -> str: """Format system prompt with current context.""" return SYSTEM_PROMPT_TEMPLATE.format( discord_username=discord_user.name, discord_id=discord_user.id, role=user_role, channel_name=channel_name, thread_name=thread_name, datetime_utc=datetime.utcnow().isoformat() ) async def stream_response( self, messages: list[dict], system_prompt: str, tool_executor: Optional[Callable] = None, progress_callback: Optional[Callable] = None ) -> tuple[str, list[dict]]: """ Stream a response from Claude, executing tools as needed. Args: messages: Conversation history system_prompt: System prompt with context tool_executor: Async function to execute tool calls progress_callback: Async function to call with progress updates Returns: Tuple of (final_response_text, tool_results) """ final_text = "" tool_results = [] async with self.client.messages.stream( model=self.model, max_tokens=4096, system=system_prompt, messages=messages, tools=TOOLS ) as stream: async for event in stream: if event.type == "content_block_start": if event.content_block.type == "tool_use": # Tool call starting tool_name = event.content_block.name if progress_callback: await progress_callback(f"🔧 Calling {tool_name}...") elif event.type == "content_block_delta": if hasattr(event.delta, "text"): # Text content streaming final_text += event.delta.text if progress_callback and len(final_text) % 500 == 0: # Send progress update every 500 chars await progress_callback(final_text) elif event.type == "message_stop": # Check for tool uses message = await stream.get_final_message() for block in message.content: if block.type == "tool_use": # Execute tool tool_name = block.name tool_input = block.input if tool_executor: try: if progress_callback: await progress_callback( f"⚙️ Executing {tool_name}..." ) result = await tool_executor(tool_name, tool_input) tool_results.append({ "name": tool_name, "input": tool_input, "result": result }) if progress_callback: await progress_callback( f"✅ {tool_name} complete" ) except Exception as e: error_msg = f"Error in {tool_name}: {str(e)}" tool_results.append({ "name": tool_name, "input": tool_input, "error": error_msg }) if progress_callback: await progress_callback(f"❌ {error_msg}") elif block.type == "text": final_text += block.text return final_text, tool_results async def simple_ask( self, user_message: str, conversation_history: list[dict], system_prompt: str ) -> str: """ Simple non-streaming request without tools. Useful for summarization and simple queries. """ messages = conversation_history + [ {"role": "user", "content": user_message} ] response = await self.client.messages.create( model=self.model, max_tokens=4096, system=system_prompt, messages=messages ) return response.content[0].text if response.content else ""