Files
claudetools/api/services/quote_service.py
azcomputerguru a1a19f8c00 sync: Auto-sync from Mikes-MacBook-Air.local at 2026-03-09 08:14:13
Synced files:
- Session logs updated
- Latest context and credentials
- Command/directive updates

Machine: Mikes-MacBook-Air.local
Timestamp: 2026-03-09 08:14:13

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-09 08:14:13 -07:00

986 lines
28 KiB
Python

"""
Quote service layer for business logic and database operations.
This module handles all database operations for quotes, providing a clean
separation between the API routes and data access layer.
"""
import json
import logging
import secrets
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from api.models.quote import (
Quote,
QuoteActivity,
QuoteItem,
QuoteNotification,
QuoteStatus,
BillingFrequency,
)
from api.schemas.quote import (
QuoteCreate,
QuoteUpdate,
QuoteSubmit,
QuoteItemCreate,
QuoteAdminUpdate,
QuoteListItem,
QuoteStatsResponse,
)
from api.services.syncro_service import get_syncro_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def generate_access_token() -> str:
    """
    Produce the random token used to address a quote.

    Returns:
        str: URL-safe text encoding 32 random bytes (43 characters)
    """
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token
def calculate_totals(items: list[QuoteItem]) -> tuple[Decimal, Decimal, Decimal]:
    """
    Roll quote items up into monthly, setup, and annual totals.

    Each item's line total (unit_price * quantity) is normalised to a
    per-month amount: monthly items count in full, quarterly items are
    divided by 3, and annual items by 12. Items with any other billing
    frequency (e.g. one-time) add nothing to the monthly figure. Every
    item's setup_fee is added to the setup total regardless of frequency.

    Args:
        items: List of QuoteItem objects

    Returns:
        tuple: (monthly_total, setup_total, annual_total), where
        annual_total is monthly_total * 12 + setup_total
    """
    zero = Decimal("0.00")
    recurring_per_month = zero
    one_time_setup = zero

    for entry in items:
        extended_price = entry.unit_price * entry.quantity
        frequency = entry.billing_frequency

        # Convert the line total into its monthly-equivalent share.
        if frequency == BillingFrequency.MONTHLY.value:
            monthly_share = extended_price
        elif frequency == BillingFrequency.QUARTERLY.value:
            monthly_share = extended_price / Decimal("3")
        elif frequency == BillingFrequency.ANNUAL.value:
            monthly_share = extended_price / Decimal("12")
        else:
            # Non-recurring items do not contribute to the monthly total.
            monthly_share = None

        if monthly_share is not None:
            recurring_per_month += monthly_share

        one_time_setup += entry.setup_fee

    yearly = (recurring_per_month * Decimal("12")) + one_time_setup
    return recurring_per_month, one_time_setup, yearly
def log_activity(
    db: Session,
    quote_id: str,
    action: str,
    description: Optional[str] = None,
    actor: Optional[str] = None,
    ip_address: Optional[str] = None,
    metadata: Optional[dict] = None
) -> QuoteActivity:
    """
    Record an audit-trail entry against a quote.

    The entry is added to the session and flushed, but not committed;
    committing is left to the caller.

    Args:
        db: Database session
        quote_id: UUID of the quote
        action: Action being performed
        description: Detailed description
        actor: Who performed the action
        ip_address: IP address of the actor
        metadata: Additional metadata as dict; serialised to a JSON string.
            A missing or empty dict is passed to the model as None.

    Returns:
        QuoteActivity: The created activity record
    """
    serialized_metadata = None
    if metadata:
        serialized_metadata = json.dumps(metadata)

    entry = QuoteActivity(
        quote_id=quote_id,
        action=action,
        description=description,
        actor=actor,
        ip_address=ip_address,
        metadata=serialized_metadata,
    )
    db.add(entry)
    db.flush()
    return entry
def create_quote(
    db: Session,
    quote_data: QuoteCreate,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    *,
    _retry_on_collision: bool = True
) -> Quote:
    """
    Create a new quote draft with access token.

    The draft expires 30 days after creation. Any initial items are stored
    and the quote totals computed from them, a "created" activity is logged,
    and the transaction is committed.

    Args:
        db: Database session
        quote_data: Quote creation data
        ip_address: IP address of the requester
        user_agent: Browser user agent
        _retry_on_collision: Internal flag. When True, an IntegrityError
            mentioning access_token triggers exactly one retry with a fresh
            token; the retry itself runs with this flag set to False so the
            function can never recurse more than once.

    Returns:
        Quote: The created quote object

    Raises:
        HTTPException: 500 on any database or unexpected error

    Example:
        ```python
        quote_data = QuoteCreate(employee_count=25)
        quote = create_quote(db, quote_data, ip_address="192.168.1.1")
        print(f"Quote created: {quote.access_token}")
        ```
    """
    try:
        # Create quote with unique access token
        quote = Quote(
            access_token=generate_access_token(),
            status=QuoteStatus.DRAFT.value,
            employee_count=quote_data.employee_count,
            notes=quote_data.notes,
            ip_address=ip_address,
            user_agent=user_agent,
            # Set expiration to 30 days from now
            expires_at=datetime.utcnow() + timedelta(days=30)
        )
        db.add(quote)
        db.flush()  # Get the quote ID so items can reference it

        # Add initial items if provided
        if quote_data.items:
            for idx, item_data in enumerate(quote_data.items):
                item = QuoteItem(
                    quote_id=quote.id,
                    service_name=item_data.service_name,
                    service_description=item_data.service_description,
                    category=item_data.category.value,
                    billing_frequency=item_data.billing_frequency.value,
                    unit_price=item_data.unit_price,
                    quantity=item_data.quantity,
                    setup_fee=item_data.setup_fee,
                    is_required=item_data.is_required,
                    # A falsy sort_order (None or 0) falls back to list position
                    sort_order=item_data.sort_order if item_data.sort_order else idx
                )
                db.add(item)
            db.flush()

            # Calculate and update totals
            monthly, setup, annual = calculate_totals(quote.items)
            quote.monthly_total = monthly
            quote.setup_total = setup
            quote.annual_total = annual

        # Log activity
        log_activity(
            db=db,
            quote_id=quote.id,
            action="created",
            description="Quote draft created",
            ip_address=ip_address,
            metadata={"employee_count": quote_data.employee_count}
        )
        db.commit()
        db.refresh(quote)
        return quote
    except IntegrityError as e:
        db.rollback()
        # If token collision (extremely rare), retry once -- and only once.
        # Without the guard a persistent constraint failure whose message
        # mentions access_token would recurse without bound.
        if _retry_on_collision and "access_token" in str(e.orig):
            logger.warning("Access token collision while creating quote; retrying once")
            return create_quote(
                db,
                quote_data,
                ip_address,
                user_agent,
                _retry_on_collision=False
            )
        logger.exception("Integrity error while creating quote")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        db.rollback()
        logger.exception("Failed to create quote")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create quote: {str(e)}"
        ) from e
def get_quote_by_token(db: Session, access_token: str) -> Quote:
    """
    Look up a quote by its public access token.

    Side effect: if the quote is a draft whose expires_at is in the past,
    its status is changed to expired and the change is committed before
    the quote is returned.

    Args:
        db: Database session
        access_token: The quote's access token

    Returns:
        Quote: The quote object with items loaded

    Raises:
        HTTPException: 404 if quote not found
    """
    token_query = db.query(Quote).options(joinedload(Quote.items))
    quote = token_query.filter(Quote.access_token == access_token).first()

    if not quote:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Quote not found"
        )

    # Only drafts are auto-expired; other statuses are left untouched.
    if (
        quote.expires_at
        and quote.expires_at < datetime.utcnow()
        and quote.status == QuoteStatus.DRAFT.value
    ):
        quote.status = QuoteStatus.EXPIRED.value
        db.commit()

    return quote
def get_quote_by_id(db: Session, quote_id: UUID) -> Quote:
    """
    Fetch a quote by primary key for admin use.

    Items, activities, and notifications are eagerly loaded alongside the
    quote.

    Args:
        db: Database session
        quote_id: UUID of the quote

    Returns:
        Quote: The quote object with all related data loaded

    Raises:
        HTTPException: 404 if quote not found
    """
    eager_relationships = (
        joinedload(Quote.items),
        joinedload(Quote.activities),
        joinedload(Quote.notifications),
    )
    # The ID is compared in its string form.
    quote = (
        db.query(Quote)
        .options(*eager_relationships)
        .filter(Quote.id == str(quote_id))
        .first()
    )

    if quote:
        return quote

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Quote with ID {quote_id} not found"
    )
def update_quote(
    db: Session,
    access_token: str,
    quote_data: QuoteUpdate,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Apply edits to a draft quote.

    Scalar fields explicitly set on quote_data are copied onto the quote.
    When quote_data.items is not None, every existing item is deleted and
    replaced by the supplied list. Totals are recalculated afterwards and
    an "updated" activity is logged if anything changed.

    Args:
        db: Database session
        access_token: The quote's access token
        quote_data: Update data
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if not found, 400 if not a draft, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot update quote with status '{quote.status}'. Only drafts can be modified."
        )

    try:
        # Only fields the client actually sent; items are handled separately.
        scalar_updates = quote_data.model_dump(exclude_unset=True, exclude={"items"})
        change_log = []

        for field_name, new_value in scalar_updates.items():
            current_value = getattr(quote, field_name)
            if current_value != new_value:
                setattr(quote, field_name, new_value)
                change_log.append(f"{field_name}: {current_value} -> {new_value}")

        if quote_data.items is not None:
            # Full replacement: drop the old rows, then insert the new set.
            db.query(QuoteItem).filter(QuoteItem.quote_id == quote.id).delete()

            for position, spec in enumerate(quote_data.items):
                db.add(
                    QuoteItem(
                        quote_id=quote.id,
                        service_name=spec.service_name,
                        service_description=spec.service_description,
                        category=spec.category.value,
                        billing_frequency=spec.billing_frequency.value,
                        unit_price=spec.unit_price,
                        quantity=spec.quantity,
                        setup_fee=spec.setup_fee,
                        is_required=spec.is_required,
                        # Falsy sort_order (None or 0) falls back to list position
                        sort_order=spec.sort_order or position,
                    )
                )

            change_log.append(f"items: replaced with {len(quote_data.items)} items")

        # Push pending changes first so the refresh below sees them.
        db.flush()
        db.refresh(quote)

        new_monthly, new_setup, new_annual = calculate_totals(quote.items)
        quote.monthly_total = new_monthly
        quote.setup_total = new_setup
        quote.annual_total = new_annual

        if change_log:
            log_activity(
                db=db,
                quote_id=quote.id,
                action="updated",
                description=f"Quote updated: {', '.join(change_log)}",
                ip_address=ip_address
            )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update quote: {str(exc)}"
        )
def submit_quote(
    db: Session,
    access_token: str,
    submit_data: QuoteSubmit,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Finalise a draft quote with the customer's contact details.

    Moves the quote from draft to submitted, stamps submitted_at, pushes
    the expiry out to 90 days from now, logs a "submitted" activity, and
    queues a pending admin notification record.

    Args:
        db: Database session
        access_token: The quote's access token
        submit_data: Submission data with required contact info
        ip_address: IP address of the requester

    Returns:
        Quote: The submitted quote object

    Raises:
        HTTPException: 404 if not found, 400 if not a draft or no items,
            500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot submit quote with status '{quote.status}'. Only drafts can be submitted."
        )

    if not quote.items:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot submit quote without any items. Please add at least one service."
        )

    try:
        # Contact details
        quote.company_name = submit_data.company_name
        quote.contact_name = submit_data.contact_name
        quote.contact_email = submit_data.contact_email
        quote.contact_phone = submit_data.contact_phone
        # Existing notes are kept unless the submission supplies new ones.
        if submit_data.notes:
            quote.notes = submit_data.notes

        # Status transition and timestamps
        quote.status = QuoteStatus.SUBMITTED.value
        quote.submitted_at = datetime.utcnow()
        quote.expires_at = datetime.utcnow() + timedelta(days=90)

        activity_details = {
            "company_name": submit_data.company_name,
            "contact_email": submit_data.contact_email,
            "monthly_total": str(quote.monthly_total),
            "setup_total": str(quote.setup_total)
        }
        log_activity(
            db=db,
            quote_id=quote.id,
            action="submitted",
            description=f"Quote submitted by {submit_data.contact_name} ({submit_data.contact_email})",
            actor=submit_data.contact_email,
            ip_address=ip_address,
            metadata=activity_details
        )

        # Only the notification record is created here; nothing is sent.
        admin_alert = QuoteNotification(
            quote_id=quote.id,
            notification_type="admin_alert",
            recipient="admin@example.com",  # Would come from config in production
            subject=f"New Quote Submission: {submit_data.company_name}",
            content=f"Quote submitted by {submit_data.contact_name}. Monthly: ${quote.monthly_total}",
            status="pending"
        )
        db.add(admin_alert)

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to submit quote: {str(exc)}"
        )
async def sync_quote_to_syncro(db: Session, quote: Quote) -> dict:
    """
    Sync a submitted quote to SyncroRMM.

    Checks for an existing customer and creates a lead in Syncro. Updates the
    quote with sync status and existing customer flag, and logs an activity
    for each outcome.

    This function is designed to be called after submit_quote() completes,
    typically as a background task or in the API endpoint. Errors raised
    during the sync are caught and reported in the returned dict rather than
    propagated, to avoid blocking the quote submission.

    Args:
        db: Database session
        quote: The submitted quote object (must have contact_email)

    Returns:
        dict: Sync result with keys:
            - synced: bool - Whether lead was created successfully
            - is_existing_customer: bool - Whether customer already exists
            - syncro_lead_id: str|None - Lead ID if created
            - error: str|None - Error message if sync failed

    Example:
        ```python
        quote = submit_quote(db, access_token, submit_data, ip_address)
        sync_result = await sync_quote_to_syncro(db, quote)
        if sync_result["synced"]:
            print(f"Lead created: {sync_result['syncro_lead_id']}")
        ```
    """
    result = {
        "synced": False,
        "is_existing_customer": False,
        "syncro_lead_id": None,
        "error": None
    }

    if not quote.contact_email:
        result["error"] = "Quote has no contact email"
        return result

    # Capture the ID once. After a commit or rollback the ORM instance may be
    # expired, and reading quote.id inside an error handler could then trigger
    # a database load that raises out of the handler.
    quote_id = quote.id

    try:
        syncro = get_syncro_service()

        # Check for existing customer
        customer_check = await syncro.check_existing_customer(
            email=quote.contact_email,
            business_name=quote.company_name
        )
        if customer_check.exists:
            quote.is_existing_customer = True
            result["is_existing_customer"] = True
            logger.info(
                f"Quote {quote_id} is from existing customer: "
                f"{customer_check.customer_name} (ID: {customer_check.customer_id}, "
                f"match: {customer_check.match_type})"
            )
            # Log activity for existing customer
            log_activity(
                db=db,
                quote_id=quote_id,
                action="syncro_customer_found",
                description=f"Existing Syncro customer found: {customer_check.customer_name}",
                metadata={
                    "syncro_customer_id": customer_check.customer_id,
                    "match_type": customer_check.match_type
                }
            )

        # Create lead in Syncro (done whether or not the customer already exists)
        lead_result = await syncro.create_lead(quote)
        if lead_result.success:
            quote.syncro_lead_id = lead_result.lead_id
            quote.syncro_synced_at = datetime.utcnow()
            result["synced"] = True
            result["syncro_lead_id"] = lead_result.lead_id
            # Log activity for successful sync
            log_activity(
                db=db,
                quote_id=quote_id,
                action="syncro_lead_created",
                description=f"Lead created in Syncro: {lead_result.lead_id}",
                metadata={
                    "syncro_lead_id": lead_result.lead_id,
                    "is_existing_customer": customer_check.exists
                }
            )
        else:
            result["error"] = lead_result.error
            logger.warning(
                f"Failed to create Syncro lead for quote {quote_id}: {lead_result.error}"
            )
            # Log activity for failed sync
            log_activity(
                db=db,
                quote_id=quote_id,
                action="syncro_sync_failed",
                description=f"Failed to sync to Syncro: {lead_result.error}",
                metadata={"error": lead_result.error}
            )

        # Commit the updates to quote
        db.commit()
        db.refresh(quote)
    except Exception as e:
        # Log error but don't fail the overall operation
        error_msg = str(e)
        result["error"] = error_msg
        logger.error(
            f"Unexpected error syncing quote {quote_id} to Syncro: {error_msg}",
            exc_info=True
        )
        try:
            log_activity(
                db=db,
                quote_id=quote_id,
                action="syncro_sync_error",
                description=f"Syncro sync error: {error_msg}",
                metadata={"error": error_msg}
            )
            db.commit()
        except Exception:
            # Best effort only: recording the failure must not raise, but the
            # secondary failure is logged instead of being dropped silently.
            db.rollback()
            logger.exception(
                f"Could not record Syncro sync error activity for quote {quote_id}"
            )

    return result
def list_quotes(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    status_filter: Optional[str] = None,
    search: Optional[str] = None
) -> tuple[list[Quote], int]:
    """
    Return one page of quotes for the admin listing, newest first.

    Args:
        db: Database session
        skip: Number of records to skip
        limit: Maximum number of records to return
        status_filter: Filter by status
        search: Case-insensitive substring matched against company_name,
            contact_name, and contact_email

    Returns:
        tuple: (list of quotes, total count), where the count reflects the
        filters but not the pagination
    """
    base_query = db.query(Quote).options(joinedload(Quote.items))

    if status_filter:
        base_query = base_query.filter(Quote.status == status_filter)

    if search:
        pattern = f"%{search}%"
        matches_company = Quote.company_name.ilike(pattern)
        matches_contact = Quote.contact_name.ilike(pattern)
        matches_email = Quote.contact_email.ilike(pattern)
        base_query = base_query.filter(matches_company | matches_contact | matches_email)

    # Count is taken before offset/limit so it covers the full filtered set.
    total = base_query.count()

    newest_first = base_query.order_by(Quote.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return page, total
def update_quote_status(
    db: Session,
    quote_id: UUID,
    update_data: QuoteAdminUpdate,
    admin_user: str
) -> Quote:
    """
    Apply an admin edit (status, admin notes, expiry) to a quote.

    Only fields that are not None on update_data are applied. A status
    equal to the current one is ignored. If anything changed, an
    "admin_update" activity attributed to admin_user is logged.

    Args:
        db: Database session
        quote_id: UUID of the quote
        update_data: Admin update data
        admin_user: Username of the admin making the change

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote does not exist, 500 on failure
    """
    quote = get_quote_by_id(db, quote_id)

    try:
        audit_entries = []

        requested_status = update_data.status
        if requested_status is not None and requested_status.value != quote.status:
            previous_status = quote.status
            quote.status = requested_status.value
            audit_entries.append(f"status: {previous_status} -> {requested_status.value}")

        # Notes are overwritten (and logged) whenever supplied, even if unchanged.
        if update_data.admin_notes is not None:
            quote.admin_notes = update_data.admin_notes
            audit_entries.append("admin_notes updated")

        if update_data.expires_at is not None:
            quote.expires_at = update_data.expires_at
            audit_entries.append(f"expires_at: {update_data.expires_at}")

        if audit_entries:
            log_activity(
                db=db,
                quote_id=quote.id,
                action="admin_update",
                description=f"Admin update: {', '.join(audit_entries)}",
                actor=admin_user
            )

        db.commit()
        db.refresh(quote)
        return quote
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update quote status: {str(exc)}"
        )
def get_quote_stats(db: Session) -> QuoteStatsResponse:
    """
    Assemble the admin dashboard statistics for quotes.

    "Submitted" figures cover quotes whose status is submitted, reviewing,
    or approved. The conversion rate is the submitted count as a
    percentage of drafts plus submitted quotes.

    Args:
        db: Database session

    Returns:
        QuoteStatsResponse: Statistics about quotes
    """
    total_quotes = db.query(Quote).count()

    # Per-status counts
    status_rows = (
        db.query(Quote.status, func.count(Quote.id))
        .group_by(Quote.status)
        .all()
    )
    quotes_by_status = {state: tally for state, tally in status_rows}

    # Monetary sums over the submitted pipeline
    submitted_statuses = [
        QuoteStatus.SUBMITTED.value,
        QuoteStatus.REVIEWING.value,
        QuoteStatus.APPROVED.value
    ]
    sums_row = (
        db.query(
            func.sum(Quote.monthly_total),
            func.sum(Quote.setup_total)
        )
        .filter(Quote.status.in_(submitted_statuses))
        .first()
    )
    # SUM over zero rows is NULL; fall back to zero.
    total_monthly_value = sums_row[0] or Decimal("0.00")
    total_setup_value = sums_row[1] or Decimal("0.00")

    # Activity since the first instant of the current UTC month
    month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    quotes_this_month = (
        db.query(Quote)
        .filter(Quote.created_at >= month_start)
        .count()
    )
    quotes_submitted_this_month = (
        db.query(Quote)
        .filter(
            Quote.submitted_at >= month_start,
            Quote.submitted_at.isnot(None)
        )
        .count()
    )

    # Derived figures
    submitted_count = sum(quotes_by_status.get(name, 0) for name in submitted_statuses)

    if submitted_count > 0:
        average_monthly_value = total_monthly_value / submitted_count
    else:
        average_monthly_value = Decimal("0.00")

    draft_count = quotes_by_status.get(QuoteStatus.DRAFT.value, 0)
    total_started = draft_count + submitted_count

    if total_started > 0:
        conversion_rate = Decimal(submitted_count) / Decimal(total_started) * Decimal("100")
    else:
        conversion_rate = Decimal("0.00")

    return QuoteStatsResponse(
        total_quotes=total_quotes,
        quotes_by_status=quotes_by_status,
        total_monthly_value=total_monthly_value,
        total_setup_value=total_setup_value,
        quotes_this_month=quotes_this_month,
        quotes_submitted_this_month=quotes_submitted_this_month,
        average_monthly_value=round(average_monthly_value, 2),
        conversion_rate=round(conversion_rate, 2)
    )
def add_item_to_quote(
    db: Session,
    access_token: str,
    item_data: QuoteItemCreate,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Append one item to a draft quote and refresh its totals.

    When item_data carries no (or a zero) sort_order, the item is placed
    after the current highest sort_order on the quote.

    Args:
        db: Database session
        access_token: The quote's access token
        item_data: Item data to add
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote is not found, 400 if it is not a
            draft, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot add items to a non-draft quote"
        )

    try:
        # MAX() is NULL when the quote has no items yet; treat that as 0.
        highest_order = (
            db.query(func.max(QuoteItem.sort_order))
            .filter(QuoteItem.quote_id == quote.id)
            .scalar()
        ) or 0

        new_item = QuoteItem(
            quote_id=quote.id,
            service_name=item_data.service_name,
            service_description=item_data.service_description,
            category=item_data.category.value,
            billing_frequency=item_data.billing_frequency.value,
            unit_price=item_data.unit_price,
            quantity=item_data.quantity,
            setup_fee=item_data.setup_fee,
            is_required=item_data.is_required,
            sort_order=item_data.sort_order or highest_order + 1,
        )
        db.add(new_item)
        db.flush()

        # Reload so quote.items includes the row just flushed.
        db.refresh(quote)
        new_monthly, new_setup, new_annual = calculate_totals(quote.items)
        quote.monthly_total = new_monthly
        quote.setup_total = new_setup
        quote.annual_total = new_annual

        log_activity(
            db=db,
            quote_id=quote.id,
            action="item_added",
            description=f"Added item: {item_data.service_name}",
            ip_address=ip_address
        )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add item: {str(exc)}"
        )
def remove_item_from_quote(
    db: Session,
    access_token: str,
    item_id: UUID,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Delete one non-required item from a draft quote and refresh its totals.

    Args:
        db: Database session
        access_token: The quote's access token
        item_id: UUID of the item to remove
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote or item is not found, 400 if the
            quote is not a draft or the item is required, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot remove items from a non-draft quote"
        )

    # The item must belong to this quote, not merely exist.
    target = (
        db.query(QuoteItem)
        .filter(QuoteItem.id == str(item_id), QuoteItem.quote_id == quote.id)
        .first()
    )

    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item with ID {item_id} not found in this quote"
        )

    if target.is_required:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot remove required items from the quote"
        )

    try:
        # Keep the name for the activity log before the row is deleted.
        removed_name = target.service_name
        db.delete(target)
        db.flush()

        # Reload so quote.items no longer contains the deleted row.
        db.refresh(quote)
        new_monthly, new_setup, new_annual = calculate_totals(quote.items)
        quote.monthly_total = new_monthly
        quote.setup_total = new_setup
        quote.annual_total = new_annual

        log_activity(
            db=db,
            quote_id=quote.id,
            action="item_removed",
            description=f"Removed item: {removed_name}",
            ip_address=ip_address
        )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to remove item: {str(exc)}"
        )