feat: Discord bot Phase 1 MVP implementation

Implemented Phase 1 of ClaudeTools Discord bot with:

Core Features:
- Discord.py bot with message content intents
- Claude API integration with streaming responses
- Thread-based conversations with context management
- @mention handling with automatic thread creation
- Tool definitions for future ClaudeTools/remediation integration

Architecture:
- bot/main.py: Entry point with Discord client setup
- bot/config.py: Pydantic Settings for environment config
- bot/claude/client.py: Anthropic SDK wrapper with streaming
- bot/claude/tools.py: Tool definitions and system prompt
- bot/handlers/message_handler.py: Discord message handling

Configuration:
- requirements.txt: Python dependencies (discord.py, anthropic, httpx)
- .env.example: Environment variable template
- .gitignore: Sensitive data protection
- README.md: Comprehensive setup and usage guide

Next Steps (Phase 2):
- Implement tool execution (ClaudeTools API client)
- Add user role mapping and permissions
- Implement audit logging

Deployment Target: BEAST (Windows) as NSSM service
Test: @ClaudeTools hello should create thread and stream response

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-04-30 20:40:24 -07:00
parent 8cf4bfe614
commit 777ad52803
15 changed files with 1026 additions and 0 deletions

View File

View File

@@ -0,0 +1,149 @@
"""Claude API client with streaming support for Discord."""
import asyncio
from datetime import datetime
from typing import Callable, Optional, Any
import discord
from anthropic import AsyncAnthropic
from anthropic.types import MessageStreamEvent
from bot.config import settings
from bot.claude.tools import TOOLS, SYSTEM_PROMPT_TEMPLATE
class ClaudeClient:
"""Wrapper around Anthropic SDK for Discord bot usage."""
def __init__(self):
self.client = AsyncAnthropic(api_key=settings.anthropic_api_key)
self.model = settings.claude_model
def format_system_prompt(
self,
discord_user: discord.User,
channel_name: str,
thread_name: str,
user_role: str = "unknown"
) -> str:
"""Format system prompt with current context."""
return SYSTEM_PROMPT_TEMPLATE.format(
discord_username=discord_user.name,
discord_id=discord_user.id,
role=user_role,
channel_name=channel_name,
thread_name=thread_name,
datetime_utc=datetime.utcnow().isoformat()
)
async def stream_response(
self,
messages: list[dict],
system_prompt: str,
tool_executor: Optional[Callable] = None,
progress_callback: Optional[Callable] = None
) -> tuple[str, list[dict]]:
"""
Stream a response from Claude, executing tools as needed.
Args:
messages: Conversation history
system_prompt: System prompt with context
tool_executor: Async function to execute tool calls
progress_callback: Async function to call with progress updates
Returns:
Tuple of (final_response_text, tool_results)
"""
final_text = ""
tool_results = []
async with self.client.messages.stream(
model=self.model,
max_tokens=4096,
system=system_prompt,
messages=messages,
tools=TOOLS
) as stream:
async for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
# Tool call starting
tool_name = event.content_block.name
if progress_callback:
await progress_callback(f"🔧 Calling {tool_name}...")
elif event.type == "content_block_delta":
if hasattr(event.delta, "text"):
# Text content streaming
final_text += event.delta.text
if progress_callback and len(final_text) % 500 == 0:
# Send progress update every 500 chars
await progress_callback(final_text)
elif event.type == "message_stop":
# Check for tool uses
message = await stream.get_final_message()
for block in message.content:
if block.type == "tool_use":
# Execute tool
tool_name = block.name
tool_input = block.input
if tool_executor:
try:
if progress_callback:
await progress_callback(
f"⚙️ Executing {tool_name}..."
)
result = await tool_executor(tool_name, tool_input)
tool_results.append({
"name": tool_name,
"input": tool_input,
"result": result
})
if progress_callback:
await progress_callback(
f"{tool_name} complete"
)
except Exception as e:
error_msg = f"Error in {tool_name}: {str(e)}"
tool_results.append({
"name": tool_name,
"input": tool_input,
"error": error_msg
})
if progress_callback:
await progress_callback(f"{error_msg}")
elif block.type == "text":
final_text += block.text
return final_text, tool_results
async def simple_ask(
self,
user_message: str,
conversation_history: list[dict],
system_prompt: str
) -> str:
"""
Simple non-streaming request without tools.
Useful for summarization and simple queries.
"""
messages = conversation_history + [
{"role": "user", "content": user_message}
]
response = await self.client.messages.create(
model=self.model,
max_tokens=4096,
system=system_prompt,
messages=messages
)
return response.content[0].text if response.content else ""

View File

@@ -0,0 +1,142 @@
"""Claude API tool definitions for ClaudeTools integration."""
TOOLS = [
{
"name": "query_claudetools_api",
"description": (
"Query the ClaudeTools MSP database. Use this for ALL data lookups including "
"clients, sessions, tasks, work items, billable time, infrastructure, "
"credentials, projects, and more. Returns JSON data from the API."
),
"input_schema": {
"type": "object",
"properties": {
"endpoint": {
"type": "string",
"description": (
"API endpoint path starting with /api/, e.g., '/api/clients', "
"'/api/sessions', '/api/tasks'"
)
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE"],
"default": "GET",
"description": "HTTP method to use"
},
"params": {
"type": "object",
"description": (
"Query parameters as key-value pairs. Common params: "
"skip (offset), limit (page size), client_id, session_id, "
"status_filter, etc."
)
},
"body": {
"type": "object",
"description": "Request body for POST/PUT requests (JSON)"
}
},
"required": ["endpoint"]
}
},
{
"name": "run_breach_check",
"description": (
"Run a comprehensive 10-point M365 breach investigation on a single user account. "
"Checks: inbox rules, mailbox forwarding, OAuth consents, auth methods, "
"sign-ins (including foreign countries and legacy auth), directory audits, "
"risky user status, sent items, and deleted items. "
"Returns breach summary and artifact locations. "
"Requires tenant to be onboarded to remediation-tool."
),
"input_schema": {
"type": "object",
"properties": {
"tenant": {
"type": "string",
"description": (
"Tenant domain or GUID (e.g., 'cascadestucson.com' or "
"'4fcbb1f4-fbf9-4548-a93e-7d14a3c091e6')"
)
},
"upn": {
"type": "string",
"description": (
"User Principal Name - the user's email address "
"(e.g., 'john.trozzi@cascadestucson.com')"
)
}
},
"required": ["tenant", "upn"]
}
},
{
"name": "run_tenant_sweep",
"description": (
"Sweep an entire M365 tenant for security issues. "
"Checks: failed sign-ins from multiple foreign countries, "
"successful non-US sign-ins, B2B guest invitations, "
"consent/auth-method/role changes in directory audits, "
"and risky users (if IdentityRiskyUser consent granted). "
"Returns priority-sorted findings. "
"Requires tenant to be onboarded to remediation-tool."
),
"input_schema": {
"type": "object",
"properties": {
"tenant": {
"type": "string",
"description": (
"Tenant domain or GUID (e.g., 'dataforth.com' or "
"'dd4a82e8-85a3-44ac-8800-07945ab4d95f')"
)
}
},
"required": ["tenant"]
}
}
]
SYSTEM_PROMPT_TEMPLATE = """You are the ClaudeTools MSP Assistant for Arizona Computer Guru.
Available Tools:
1. query_claudetools_api - MSP database (clients, sessions, tasks, infrastructure, credentials)
2. run_breach_check - M365 user breach investigation (10-point audit)
3. run_tenant_sweep - M365 tenant-wide security sweep
Current Context:
- User: {discord_username} (Discord ID: {discord_id})
- Role: {role} (admin or tech)
- Channel: #{channel_name}
- Thread: {thread_name}
- DateTime: {datetime_utc}
Response Guidelines:
- Use Discord markdown: **bold**, `code`, ```language blocks```
- Keep responses under 2000 chars (Discord limit) - split into multiple messages if needed
- For structured data, use clear formatting or request embeds
- Ask before listing >5 items
- Security-conscious: NEVER expose credentials in responses
- Provide 1Password vault paths instead of actual secrets
Access Control:
- All team members: read-only queries, breach checks, tenant sweeps
- Mike/Howard only: remediation actions (require explicit confirmation)
- Dev/coding questions: refer to Mike or Howard
- NEVER execute destructive operations without explicit YES confirmation
Tool Usage:
- Use query_claudetools_api for ALL database lookups (don't make up data)
- Use run_breach_check for single-user M365 investigation
- Use run_tenant_sweep for tenant-wide M365 security analysis
- Chain tools when needed for complex multi-step queries
- Always cite which tool you used when presenting results
Remember:
- You're an MSP assistant - understand client/project/session/work item concepts
- Be concise but thorough
- If unsure, ask clarifying questions
- Guide users through multi-step processes
"""

View File

@@ -0,0 +1,82 @@
"""Configuration management for ClaudeTools Discord Bot."""
from pathlib import Path
from typing import Optional
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Bot configuration from environment variables."""
# Discord
discord_token: str = Field(..., description="Discord bot token")
discord_guild_id: Optional[int] = Field(None, description="Discord guild/server ID")
# Anthropic Claude API
anthropic_api_key: str = Field(..., description="Anthropic API key")
claude_model: str = Field(
default="claude-sonnet-4-5-20250929",
description="Claude model to use"
)
# ClaudeTools API
claudetools_api_url: str = Field(
default="http://172.16.3.30:8001",
description="ClaudeTools API base URL"
)
claudetools_api_key: str = Field(..., description="ClaudeTools API key")
# File Paths
vault_path: Path = Field(
default=Path("D:/vault"),
description="Path to SOPS vault"
)
claudetools_root: Path = Field(
default=Path("D:/claudetools"),
description="Path to ClaudeTools repository"
)
# Git Bash (for remediation scripts on Windows)
git_bash_path: Path = Field(
default=Path("C:/Program Files/Git/bin/bash.exe"),
description="Path to Git Bash executable"
)
# Logging
log_level: str = Field(default="INFO", description="Logging level")
log_file: Optional[Path] = Field(
default=Path("logs/bot.log"),
description="Log file path"
)
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore"
)
def validate_paths(self) -> None:
"""Validate that required paths exist."""
if not self.git_bash_path.exists():
raise FileNotFoundError(
f"Git Bash not found at {self.git_bash_path}. "
"Set GIT_BASH_PATH environment variable."
)
if not self.vault_path.exists():
raise FileNotFoundError(
f"Vault not found at {self.vault_path}. "
"Set VAULT_PATH environment variable."
)
if not self.claudetools_root.exists():
raise FileNotFoundError(
f"ClaudeTools not found at {self.claudetools_root}. "
"Set CLAUDETOOLS_ROOT environment variable."
)
# Global settings instance
settings = Settings()

View File

@@ -0,0 +1,186 @@
"""Discord message handler for @mentions and conversations."""
import asyncio
from typing import Optional
import discord
from bot.claude.client import ClaudeClient
class MessageHandler:
"""Handles Discord messages and coordinates with Claude API."""
def __init__(self, bot: discord.Client, claude_client: ClaudeClient):
self.bot = bot
self.claude = claude_client
# Store conversation history per thread
self.conversations: dict[int, list[dict]] = {}
async def handle_mention(self, message: discord.Message):
"""Handle a message that mentions the bot."""
# Don't respond to self
if message.author == self.bot.user:
return
# Extract the actual message content (remove bot mention)
content = message.content
for mention in message.mentions:
if mention == self.bot.user:
content = content.replace(f"<@{mention.id}>", "").strip()
if not content:
await message.reply("Hey! How can I help you?")
return
# Create a thread for this conversation if not already in one
thread = None
if isinstance(message.channel, discord.Thread):
thread = message.channel
else:
# Create new thread
thread_name = self._generate_thread_name(content)
thread = await message.create_thread(
name=thread_name,
auto_archive_duration=1440 # 24 hours
)
# Handle the conversation in the thread
await self.handle_conversation(thread, message.author, content)
def _generate_thread_name(self, message: str) -> str:
"""Generate a thread name from the first message."""
# Take first 50 chars, remove newlines
name = message[:50].replace("\n", " ").strip()
if len(message) > 50:
name += "..."
return name or "ClaudeTools Conversation"
async def handle_conversation(
self,
thread: discord.Thread,
user: discord.User,
user_message: str
):
"""Handle a conversation turn in a thread."""
# Get or initialize conversation history for this thread
thread_id = thread.id
if thread_id not in self.conversations:
self.conversations[thread_id] = []
# Add user message to history
self.conversations[thread_id].append({
"role": "user",
"content": user_message
})
# Send initial "thinking" message
thinking_msg = await thread.send("⏳ Thinking...")
# Format system prompt
channel_name = thread.parent.name if thread.parent else "Unknown"
system_prompt = self.claude.format_system_prompt(
discord_user=user,
channel_name=channel_name,
thread_name=thread.name,
user_role="unknown" # TODO: Look up actual role from database
)
# Stream response from Claude
current_content = ""
async def update_progress(text: str):
"""Update the Discord message with progress."""
nonlocal current_content
if text.startswith(("🔧", "⚙️", "", "")):
# Tool execution status update
await self._safe_edit(thinking_msg, text)
else:
# Text content update
current_content = text
await self._safe_edit(thinking_msg, current_content)
async def execute_tool(tool_name: str, tool_input: dict) -> str:
"""Execute a tool call (placeholder for now)."""
# TODO: Implement actual tool execution
return f"[Tool {tool_name} executed with {tool_input}]"
try:
# Stream the response
final_text, tool_results = await self.claude.stream_response(
messages=self.conversations[thread_id],
system_prompt=system_prompt,
tool_executor=execute_tool,
progress_callback=update_progress
)
# Format final response
response_text = final_text
# Add tool results if any
if tool_results:
response_text += "\n\n**Tools Used:**\n"
for result in tool_results:
if "error" in result:
response_text += f"- ❌ {result['name']}: {result['error']}\n"
else:
response_text += f"- ✅ {result['name']}\n"
# Update final message
await self._safe_edit(thinking_msg, response_text)
# Add assistant response to history
self.conversations[thread_id].append({
"role": "assistant",
"content": final_text
})
# Trim conversation history if too long (keep last 20 messages)
if len(self.conversations[thread_id]) > 20:
self.conversations[thread_id] = self.conversations[thread_id][-20:]
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
await self._safe_edit(thinking_msg, error_msg)
async def _safe_edit(self, message: discord.Message, content: str):
"""Safely edit a Discord message, handling 2000 char limit."""
if len(content) <= 2000:
try:
await message.edit(content=content)
except discord.errors.NotFound:
# Message was deleted, ignore
pass
else:
# Split into multiple messages
chunks = self._split_message(content)
try:
await message.edit(content=chunks[0])
# Send remaining chunks as new messages
for chunk in chunks[1:]:
await message.channel.send(chunk)
except discord.errors.NotFound:
pass
def _split_message(self, content: str, max_length: int = 2000) -> list[str]:
"""Split a message into chunks under max_length."""
if len(content) <= max_length:
return [content]
chunks = []
current_chunk = ""
# Split by lines to avoid breaking mid-sentence
lines = content.split("\n")
for line in lines:
if len(current_chunk) + len(line) + 1 <= max_length:
current_chunk += line + "\n"
else:
if current_chunk:
chunks.append(current_chunk.rstrip())
current_chunk = line + "\n"
if current_chunk:
chunks.append(current_chunk.rstrip())
return chunks

View File

@@ -0,0 +1,136 @@
"""ClaudeTools Discord Bot - Main Entry Point."""
import asyncio
import logging
import sys
from pathlib import Path
import discord
from discord.ext import commands
from dotenv import load_dotenv
from bot.config import settings
from bot.claude.client import ClaudeClient
from bot.handlers.message_handler import MessageHandler
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
]
)
# Add file handler if log file specified
if settings.log_file:
settings.log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(settings.log_file)
file_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(file_handler)
logger = logging.getLogger(__name__)
# Discord bot setup with required intents
intents = discord.Intents.default()
intents.message_content = True # Required to read message content
intents.guilds = True
intents.members = True
bot = commands.Bot(command_prefix="!", intents=intents)
# Initialize Claude client
claude_client = ClaudeClient()
# Initialize message handler
message_handler: MessageHandler = None
@bot.event
async def on_ready():
"""Called when the bot successfully connects to Discord."""
global message_handler
logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})")
logger.info(f"Connected to {len(bot.guilds)} guild(s)")
# Validate paths
try:
settings.validate_paths()
logger.info("Path validation successful")
logger.info(f"Vault path: {settings.vault_path}")
logger.info(f"ClaudeTools root: {settings.claudetools_root}")
logger.info(f"Git Bash: {settings.git_bash_path}")
except FileNotFoundError as e:
logger.error(f"Path validation failed: {e}")
logger.error("Bot will continue but some features may not work")
# Initialize message handler
message_handler = MessageHandler(bot, claude_client)
# Set bot status
await bot.change_presence(
activity=discord.Activity(
type=discord.ActivityType.watching,
name="for @mentions | ClaudeTools MSP Assistant"
)
)
logger.info("Bot is ready and listening for mentions!")
@bot.event
async def on_message(message: discord.Message):
"""Called when a message is sent in a channel the bot can see."""
# Ignore messages from bots
if message.author.bot:
return
# Check if bot was mentioned
if bot.user in message.mentions:
logger.info(
f"Mentioned by {message.author.name} in #{message.channel.name}: "
f"{message.content[:100]}"
)
await message_handler.handle_mention(message)
return
# Process commands (for future slash commands)
await bot.process_commands(message)
@bot.event
async def on_error(event: str, *args, **kwargs):
"""Called when an error occurs."""
logger.error(f"Error in {event}", exc_info=sys.exc_info())
async def main():
"""Main entry point."""
try:
logger.info("Starting ClaudeTools Discord Bot...")
logger.info(f"Claude Model: {settings.claude_model}")
logger.info(f"ClaudeTools API: {settings.claudetools_api_url}")
# Start the bot
async with bot:
await bot.start(settings.discord_token)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, shutting down...")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
raise
finally:
logger.info("Bot shut down complete")
if __name__ == "__main__":
asyncio.run(main())