Files
claudetools/api/services/quote_service.py
Mike Swanson fa15b03180 sync: Auto-sync from ACG-M-L5090 at 2026-03-10 19:11:00
Synced files:
- Quote wizard frontend (all components, hooks, types, config)
- API updates (config, models, routers, schemas, services)
- Client work (bg-builders, gurushow)
- Scripts (BGB Lesley termination, CIPP, Datto, migration)
- Temp files (Bardach contacts, VWP investigation, misc)
- Credentials and session logs
- Email service, PHP API, session logs

Machine: ACG-M-L5090
Timestamp: 2026-03-10 19:11:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 19:59:08 -07:00

947 lines
27 KiB
Python

"""
Quote service layer for business logic and database operations.
This module handles all database operations for quotes, providing a clean
separation between the API routes and data access layer.
"""
import logging
import os
import secrets
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from api.models.quote import (
Quote,
QuoteActivity,
QuoteItem,
QuoteNotification,
QuoteStatus,
BillingFrequency,
)
from api.schemas.quote import (
QuoteCreate,
QuoteUpdate,
QuoteSubmit,
QuoteItemCreate,
QuoteAdminUpdate,
QuoteListItem,
QuoteStatsResponse,
)
from api.services.syncro_service import get_syncro_service
# Module-level logger, named after this module, shared by the service functions below.
logger = logging.getLogger(__name__)
def generate_access_token() -> str:
    """
    Produce the random token that grants access to a quote.

    Returns:
        str: URL-safe text derived from 32 random bytes (43 characters).
    """
    random_byte_count = 32
    token = secrets.token_urlsafe(random_byte_count)
    return token
def calculate_totals(items: list[QuoteItem]) -> tuple[Decimal, Decimal]:
    """
    Sum the recurring (monthly) and one-time (setup) amounts of quote items.

    Monthly items contribute their full line total, yearly items contribute
    one twelfth of it, and any other billing frequency contributes nothing to
    the monthly figure. Every item's setup price is added to the setup total.

    Args:
        items: List of QuoteItem objects

    Returns:
        tuple: (monthly_total, setup_total)
    """
    months_per_year = Decimal("12")
    recurring = Decimal("0.00")
    one_time = Decimal("0.00")

    for entry in items:
        # Extended price of the line: unit price times quantity.
        extended = entry.unit_price * entry.quantity

        frequency = entry.billing_frequency
        if frequency == BillingFrequency.MONTHLY.value:
            recurring += extended
        elif frequency == BillingFrequency.YEARLY.value:
            # Spread a yearly charge evenly over twelve months.
            recurring += extended / months_per_year
        # Any other frequency (e.g. one-time) leaves the monthly total alone.

        # Setup charges are one-time regardless of billing frequency.
        one_time += entry.setup_price

    return recurring, one_time
def log_activity(
    db: Session,
    quote_id: str,
    action: str,
    details: Optional[str] = None,
    step_name: Optional[str] = None,
    ip_address: Optional[str] = None,
) -> QuoteActivity:
    """
    Record an activity entry against a quote.

    The entry is added to the session and flushed, but not committed; the
    caller owns the transaction.

    Args:
        db: Database session
        quote_id: UUID of the quote
        action: Action being performed
        details: Free-text details; stored wrapped as JSON, or NULL when empty
        step_name: Wizard step name associated with the action
        ip_address: IP address of the actor

    Returns:
        QuoteActivity: The created activity record
    """
    import json

    # The details column has CHECK (json_valid(details)), so plain text is
    # wrapped in a JSON object before being stored.
    if details:
        payload = json.dumps({"message": details})
    else:
        payload = None

    entry = QuoteActivity(
        quote_id=quote_id,
        action=action,
        step_name=step_name,
        details=payload,
        ip_address=ip_address,
    )
    db.add(entry)
    db.flush()
    return entry
def create_quote(
    db: Session,
    quote_data: QuoteCreate,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None
) -> Quote:
    """
    Create a new quote draft with access token.

    The draft gets a freshly generated access token, a 30-day expiration,
    any initial items supplied in ``quote_data`` (with totals computed from
    them) and a "created" activity record. If the insert fails because of an
    access-token collision, creation is retried exactly once with a new
    token; a second failure is reported as an error instead of retrying
    indefinitely.

    Args:
        db: Database session
        quote_data: Quote creation data
        ip_address: IP address of the requester
        user_agent: Browser user agent

    Returns:
        Quote: The created quote object

    Raises:
        HTTPException: 500 if the quote could not be persisted

    Example:
        ```python
        quote_data = QuoteCreate(employee_count=25)
        quote = create_quote(db, quote_data, ip_address="192.168.1.1")
        print(f"Quote created: {quote.access_token}")
        ```
    """
    # Set once the single collision retry has been used up.
    retried = False

    while True:
        try:
            # Create quote with unique access token
            quote = Quote(
                access_token=generate_access_token(),
                status=QuoteStatus.DRAFT.value,
                employee_count=quote_data.employee_count,
                ip_address=ip_address,
                user_agent=user_agent,
                # Set expiration to 30 days from now
                expires_at=datetime.utcnow() + timedelta(days=30)
            )
            db.add(quote)
            db.flush()  # Get the quote ID

            # Add initial items if provided
            if quote_data.items:
                for item_data in quote_data.items:
                    db.add(
                        QuoteItem(
                            quote_id=quote.id,
                            category=item_data.category.value,
                            product_code=item_data.product_code,
                            product_name=item_data.product_name,
                            description=item_data.description,
                            quantity=item_data.quantity,
                            unit_price=item_data.unit_price,
                            setup_price=item_data.setup_price,
                            billing_frequency=item_data.billing_frequency.value,
                            tier=item_data.tier,
                            is_recommended=item_data.is_recommended,
                        )
                    )
                db.flush()

                # Calculate and update totals
                monthly, setup = calculate_totals(quote.items)
                quote.monthly_total = monthly
                quote.setup_total = setup

            # Log activity
            log_activity(
                db=db,
                quote_id=quote.id,
                action="created",
                details=f"Quote draft created, employee_count={quote_data.employee_count}",
                ip_address=ip_address,
            )
            db.commit()
            db.refresh(quote)
            return quote
        except IntegrityError as e:
            db.rollback()
            # Token collision (extremely rare): retry once with a new token.
            if "access_token" in str(e.orig) and not retried:
                retried = True
                logger.warning("Access token collision while creating quote; retrying once")
                continue
            logger.exception("Database integrity error while creating quote")
            # NOTE(review): the detail below exposes the raw database error to
            # the API client; kept unchanged for backward compatibility.
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Database error: {str(e)}"
            ) from e
        except Exception as e:
            db.rollback()
            logger.exception("Failed to create quote")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to create quote: {str(e)}"
            ) from e
def get_quote_by_token(db: Session, access_token: str) -> Quote:
    """
    Look up a quote by its access token (public access).

    As a side effect, a draft whose expiration time has passed is marked as
    expired and that change is committed before the quote is returned.

    Args:
        db: Database session
        access_token: The quote's access token

    Returns:
        Quote: The quote object with items loaded

    Raises:
        HTTPException: 404 if quote not found
    """
    lookup = db.query(Quote).options(joinedload(Quote.items))
    lookup = lookup.filter(Quote.access_token == access_token)
    found = lookup.first()

    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Quote not found"
        )

    # Only drafts are flipped to expired; other statuses are left untouched.
    deadline = found.expires_at
    past_deadline = bool(deadline) and deadline < datetime.utcnow()
    if past_deadline and found.status == QuoteStatus.DRAFT.value:
        found.status = QuoteStatus.EXPIRED.value
        db.commit()

    return found
def get_quote_by_id(db: Session, quote_id: UUID) -> Quote:
    """
    Look up a quote by its ID (admin access).

    Args:
        db: Database session
        quote_id: UUID of the quote

    Returns:
        Quote: The quote object with items, activities and notifications
            loaded

    Raises:
        HTTPException: 404 if quote not found
    """
    # Related collections are fetched together with the quote itself.
    eager_loads = (
        joinedload(Quote.items),
        joinedload(Quote.activities),
        joinedload(Quote.notifications),
    )
    # The ID is compared in its string form.
    record = (
        db.query(Quote)
        .options(*eager_loads)
        .filter(Quote.id == str(quote_id))
        .first()
    )

    if record:
        return record

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Quote with ID {quote_id} not found"
    )
def update_quote(
    db: Session,
    access_token: str,
    quote_data: QuoteUpdate,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Apply changes to a draft quote (field updates and/or item replacement).

    Only drafts can be updated. When ``quote_data.items`` is provided, the
    quote's existing items are deleted and replaced by the supplied ones.
    Totals are recalculated afterwards and an "updated" activity is logged
    when anything changed.

    Args:
        db: Database session
        access_token: The quote's access token
        quote_data: Update data
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if not found, 400 if not a draft, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot update quote with status '{quote.status}'. Only drafts can be modified."
        )

    try:
        # Scalar fields: apply only those explicitly set and actually different.
        field_updates = quote_data.model_dump(exclude_unset=True, exclude={"items"})
        change_log = []
        for name, new_value in field_updates.items():
            previous = getattr(quote, name)
            if previous != new_value:
                setattr(quote, name, new_value)
                change_log.append(f"{name}: {previous} -> {new_value}")

        # Items: a provided list (even an empty one) replaces all current items.
        if quote_data.items is not None:
            db.query(QuoteItem).filter(QuoteItem.quote_id == quote.id).delete()
            for entry in quote_data.items:
                db.add(
                    QuoteItem(
                        quote_id=quote.id,
                        category=entry.category.value,
                        product_code=entry.product_code,
                        product_name=entry.product_name,
                        description=entry.description,
                        quantity=entry.quantity,
                        unit_price=entry.unit_price,
                        setup_price=entry.setup_price,
                        billing_frequency=entry.billing_frequency.value,
                        tier=entry.tier,
                        is_recommended=entry.is_recommended,
                    )
                )
            change_log.append(f"items: replaced with {len(quote_data.items)} items")

        # Push pending changes to the database, then reload the quote so its
        # items reflect them before totals are recomputed.
        db.flush()
        db.refresh(quote)
        quote.monthly_total, quote.setup_total = calculate_totals(quote.items)

        if change_log:
            summary = ", ".join(change_log)
            log_activity(
                db=db,
                quote_id=quote.id,
                action="updated",
                details=f"Quote updated: {summary}",
                ip_address=ip_address
            )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update quote: {exc}"
        )
def submit_quote(
    db: Session,
    access_token: str,
    submit_data: QuoteSubmit,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Submit a draft quote together with the requester's contact details.

    Moves the quote from draft to submitted, stamps the submission time,
    extends the expiration to 90 days, logs a "submitted" activity and queues
    a pending admin notification record. Syncro sync is not performed here.

    Args:
        db: Database session
        access_token: The quote's access token
        submit_data: Submission data with required contact info
        ip_address: IP address of the requester

    Returns:
        Quote: The submitted quote object

    Raises:
        HTTPException: 404 if not found, 400 if not a draft or no items,
            500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    # Guard: only drafts may be submitted.
    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot submit quote with status '{quote.status}'. Only drafts can be submitted."
        )

    # Guard: an empty quote cannot be submitted.
    if not quote.items:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot submit quote without any items. Please add at least one service."
        )

    try:
        # Copy the contact fields from the submission onto the quote.
        for contact_field in ("company_name", "contact_name", "contact_email", "contact_phone"):
            setattr(quote, contact_field, getattr(submit_data, contact_field))

        # Status transition and timestamps; expiration runs 90 days from now.
        quote.status = QuoteStatus.SUBMITTED.value
        quote.submitted_at = datetime.utcnow()
        quote.expires_at = datetime.utcnow() + timedelta(days=90)

        log_activity(
            db=db,
            quote_id=quote.id,
            action="submitted",
            details=(
                f"Quote submitted by {submit_data.contact_name} ({submit_data.contact_email}), "
                f"company={submit_data.company_name}, "
                f"monthly=${quote.monthly_total}, setup=${quote.setup_total}"
            ),
            ip_address=ip_address,
        )

        # Only the notification record is created here, with status "pending";
        # the actual sending is handled elsewhere.
        admin_recipient = os.environ.get("ADMIN_NOTIFICATION_EMAIL", "mike@azcomputerguru.com")
        db.add(
            QuoteNotification(
                quote_id=quote.id,
                notification_type="email",
                recipient=admin_recipient,
                subject=f"New Quote Submission: {submit_data.company_name}",
                body=f"Quote submitted by {submit_data.contact_name}. Monthly: ${quote.monthly_total}",
                status="pending"
            )
        )

        db.commit()
        db.refresh(quote)

        # Syncro sync is handled via the admin endpoint POST /{quote_id}/sync-syncro
        # or can be triggered manually after submission. Not run inline to avoid
        # async/sync mixing and DB session lifecycle issues.
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to submit quote: {exc}"
        )
async def sync_quote_to_syncro(db: Session, quote: Quote) -> dict:
    """
    Sync a submitted quote to SyncroRMM.

    Checks Syncro for an existing customer matching the quote's contact
    email / company name, then creates a lead. The quote is updated with the
    existing-customer flag and, on success, the lead ID and sync timestamp.
    Each outcome is recorded as a quote activity.

    This function never raises: any exception is caught, logged and reported
    through the ``error`` key of the result. If recording that error as an
    activity fails as well, the session is rolled back and the secondary
    failure is logged rather than silently dropped.

    Args:
        db: Database session
        quote: The submitted quote object (must have contact_email)

    Returns:
        dict: Sync result with keys:
            - synced: bool - Whether lead was created successfully
            - is_existing_customer: bool - Whether customer already exists
            - syncro_lead_id: str|None - Lead ID if created
            - error: str|None - Error message if sync failed

    Example:
        ```python
        quote = submit_quote(db, access_token, submit_data, ip_address)
        sync_result = await sync_quote_to_syncro(db, quote)
        if sync_result["synced"]:
            print(f"Lead created: {sync_result['syncro_lead_id']}")
        ```
    """
    result = {
        "synced": False,
        "is_existing_customer": False,
        "syncro_lead_id": None,
        "error": None
    }

    # Nothing to sync without a contact email.
    if not quote.contact_email:
        result["error"] = "Quote has no contact email"
        return result

    try:
        syncro = get_syncro_service()

        # Check for existing customer
        customer_check = await syncro.check_existing_customer(
            email=quote.contact_email,
            business_name=quote.company_name
        )
        if customer_check.exists:
            quote.is_existing_customer = True
            result["is_existing_customer"] = True
            logger.info(
                "Quote %s is from existing customer: %s (ID: %s, match: %s)",
                quote.id,
                customer_check.customer_name,
                customer_check.customer_id,
                customer_check.match_type,
            )
            # Log activity for existing customer
            log_activity(
                db=db,
                quote_id=quote.id,
                action="syncro_customer_found",
                details=f"Existing Syncro customer found: {customer_check.customer_name} (ID: {customer_check.customer_id}, match: {customer_check.match_type})",
            )

        # Create lead in Syncro
        lead_result = await syncro.create_lead(quote)
        if lead_result.success:
            quote.syncro_lead_id = lead_result.lead_id
            quote.syncro_synced_at = datetime.utcnow()
            result["synced"] = True
            result["syncro_lead_id"] = lead_result.lead_id
            # Log activity for successful sync
            log_activity(
                db=db,
                quote_id=quote.id,
                action="syncro_lead_created",
                details=f"Lead created in Syncro: {lead_result.lead_id}, is_existing_customer={customer_check.exists}",
            )
        else:
            result["error"] = lead_result.error
            logger.warning(
                "Failed to create Syncro lead for quote %s: %s",
                quote.id,
                lead_result.error,
            )
            # Log activity for failed sync
            log_activity(
                db=db,
                quote_id=quote.id,
                action="syncro_sync_failed",
                details=f"Failed to sync to Syncro: {lead_result.error}",
            )

        # Commit the updates to quote
        db.commit()
        db.refresh(quote)
    except Exception as e:
        # Log error but don't fail the overall operation
        error_msg = str(e)
        result["error"] = error_msg
        # Read the ID once, before any rollback below, so later logging does
        # not need to touch the quote object again.
        quote_id = quote.id
        logger.error(
            "Unexpected error syncing quote %s to Syncro: %s",
            quote_id,
            error_msg,
            exc_info=True
        )
        try:
            log_activity(
                db=db,
                quote_id=quote_id,
                action="syncro_sync_error",
                details=f"Syncro sync error: {error_msg}",
            )
            db.commit()
        except Exception:
            # Best effort only: roll back and report, but never raise.
            db.rollback()
            logger.warning(
                "Could not record Syncro sync error activity for quote %s",
                quote_id,
                exc_info=True
            )

    return result
def list_quotes(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    status_filter: Optional[str] = None,
    search: Optional[str] = None
) -> tuple[list[Quote], int]:
    """
    List quotes with pagination and optional filters (admin).

    Results are ordered newest first by creation time. The search text is
    matched case-insensitively as a literal substring: the LIKE wildcards
    ``%`` and ``_`` in the text are escaped so they match themselves.

    Args:
        db: Database session
        skip: Number of records to skip
        limit: Maximum number of records to return
        status_filter: Filter by status
        search: Search in company_name, contact_name, contact_email

    Returns:
        tuple: (list of quotes, total count)
    """
    query = db.query(Quote).options(joinedload(Quote.items))

    # Apply filters
    if status_filter:
        query = query.filter(Quote.status == status_filter)

    if search:
        # Escape the escape character first, then the LIKE wildcards, so the
        # search text cannot act as a pattern.
        escape_char = "/"
        literal = (
            search.replace(escape_char, escape_char * 2)
            .replace("%", escape_char + "%")
            .replace("_", escape_char + "_")
        )
        search_term = f"%{literal}%"
        query = query.filter(
            (Quote.company_name.ilike(search_term, escape=escape_char)) |
            (Quote.contact_name.ilike(search_term, escape=escape_char)) |
            (Quote.contact_email.ilike(search_term, escape=escape_char))
        )

    # Get total count before pagination
    total = query.count()

    # Apply pagination and ordering
    quotes = (
        query
        .order_by(Quote.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return quotes, total
def update_quote_status(
    db: Session,
    quote_id: UUID,
    update_data: QuoteAdminUpdate,
    admin_user: str
) -> Quote:
    """
    Change a quote's status and/or expiration on behalf of an admin.

    A status change is applied only when it differs from the current status;
    a provided expiration is always applied. Any applied change is recorded
    as an "admin_update" activity naming the admin.

    Args:
        db: Database session
        quote_id: UUID of the quote
        update_data: Admin update data
        admin_user: Username of the admin making the change

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote does not exist, 500 on failure
    """
    quote = get_quote_by_id(db, quote_id)

    try:
        applied = []

        requested_status = update_data.status
        if requested_status is not None and requested_status.value != quote.status:
            previous_status = quote.status
            quote.status = requested_status.value
            applied.append(f"status: {previous_status} -> {requested_status.value}")

        requested_expiry = update_data.expires_at
        if requested_expiry is not None:
            quote.expires_at = requested_expiry
            applied.append(f"expires_at: {requested_expiry}")

        # Only write an activity entry when something was actually changed.
        if applied:
            summary = ", ".join(applied)
            log_activity(
                db=db,
                quote_id=quote.id,
                action="admin_update",
                details=f"Admin update by {admin_user}: {summary}",
            )

        db.commit()
        db.refresh(quote)
        return quote
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update quote status: {exc}"
        )
def get_quote_stats(db: Session) -> QuoteStatsResponse:
    """
    Compute the dashboard statistics for quotes (admin).

    "Submitted" figures cover quotes whose status is submitted, viewed,
    followed-up or converted. The conversion rate is the share of those
    quotes among drafts plus submitted quotes, as a percentage.

    Args:
        db: Database session

    Returns:
        QuoteStatsResponse: Statistics about quotes
    """
    zero = Decimal("0.00")

    # Overall count and per-status breakdown.
    overall_count = db.query(Quote).count()
    grouped_rows = (
        db.query(Quote.status, func.count(Quote.id))
        .group_by(Quote.status)
        .all()
    )
    per_status = {state: tally for state, tally in grouped_rows}

    # Statuses that count as "submitted" for value and conversion figures.
    pipeline_statuses = [
        QuoteStatus.SUBMITTED.value,
        QuoteStatus.VIEWED.value,
        QuoteStatus.FOLLOWED_UP.value,
        QuoteStatus.CONVERTED.value,
    ]
    sums_row = (
        db.query(
            func.sum(Quote.monthly_total),
            func.sum(Quote.setup_total)
        )
        .filter(Quote.status.in_(pipeline_statuses))
        .first()
    )
    # An empty sum (no matching rows) falls back to zero.
    monthly_value = sums_row[0] or zero
    setup_value = sums_row[1] or zero

    # Activity since the first instant of the current month.
    month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    created_this_month = (
        db.query(Quote)
        .filter(Quote.created_at >= month_start)
        .count()
    )
    submitted_this_month = (
        db.query(Quote)
        .filter(
            Quote.submitted_at >= month_start,
            Quote.submitted_at.isnot(None)
        )
        .count()
    )

    # Average value per submitted quote.
    pipeline_count = sum(per_status.get(state, 0) for state in pipeline_statuses)
    if pipeline_count > 0:
        mean_monthly = monthly_value / pipeline_count
    else:
        mean_monthly = zero

    # Conversion: submitted quotes as a percentage of drafts plus submitted.
    started_count = per_status.get(QuoteStatus.DRAFT.value, 0) + pipeline_count
    if started_count > 0:
        conversion = Decimal(pipeline_count) / Decimal(started_count) * Decimal("100")
    else:
        conversion = zero

    return QuoteStatsResponse(
        total_quotes=overall_count,
        quotes_by_status=per_status,
        total_monthly_value=monthly_value,
        total_setup_value=setup_value,
        quotes_this_month=created_this_month,
        quotes_submitted_this_month=submitted_this_month,
        average_monthly_value=round(mean_monthly, 2),
        conversion_rate=round(conversion, 2)
    )
def add_item_to_quote(
    db: Session,
    access_token: str,
    item_data: QuoteItemCreate,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Append one item to a draft quote and refresh its totals.

    Args:
        db: Database session
        access_token: The quote's access token
        item_data: Item data to add
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote is not found, 400 if it is not a
            draft, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    # Items may only be added while the quote is still a draft.
    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot add items to a non-draft quote"
        )

    try:
        item_fields = {
            "quote_id": quote.id,
            "category": item_data.category.value,
            "product_code": item_data.product_code,
            "product_name": item_data.product_name,
            "description": item_data.description,
            "quantity": item_data.quantity,
            "unit_price": item_data.unit_price,
            "setup_price": item_data.setup_price,
            "billing_frequency": item_data.billing_frequency.value,
            "tier": item_data.tier,
            "is_recommended": item_data.is_recommended,
        }
        db.add(QuoteItem(**item_fields))
        db.flush()

        # Reload the quote so its items include the new one, then re-total.
        db.refresh(quote)
        quote.monthly_total, quote.setup_total = calculate_totals(quote.items)

        log_activity(
            db=db,
            quote_id=quote.id,
            action="item_added",
            details=f"Added item: {item_data.product_name}",
            ip_address=ip_address
        )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add item: {exc}"
        )
def remove_item_from_quote(
    db: Session,
    access_token: str,
    item_id: UUID,
    ip_address: Optional[str] = None
) -> Quote:
    """
    Delete one item from a draft quote and refresh its totals.

    Args:
        db: Database session
        access_token: The quote's access token
        item_id: UUID of the item to remove
        ip_address: IP address of the requester

    Returns:
        Quote: The updated quote object

    Raises:
        HTTPException: 404 if the quote or the item is not found, 400 if the
            quote is not a draft, 500 on failure
    """
    quote = get_quote_by_token(db, access_token)

    # Items may only be removed while the quote is still a draft.
    if quote.status != QuoteStatus.DRAFT.value:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot remove items from a non-draft quote"
        )

    # The item must belong to this quote; the ID is compared in string form.
    item_lookup = db.query(QuoteItem).filter(
        QuoteItem.id == str(item_id),
        QuoteItem.quote_id == quote.id,
    )
    target = item_lookup.first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item with ID {item_id} not found in this quote"
        )

    try:
        # Keep the name for the activity log before the row is deleted.
        removed_name = target.product_name
        db.delete(target)
        db.flush()

        # Reload the quote so its items no longer include the deleted one.
        db.refresh(quote)
        quote.monthly_total, quote.setup_total = calculate_totals(quote.items)

        log_activity(
            db=db,
            quote_id=quote.id,
            action="item_removed",
            details=f"Removed item: {removed_name}",
            ip_address=ip_address
        )

        db.commit()
        db.refresh(quote)
        return quote
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to remove item: {exc}"
        )