refactor: remove dead assistant_chat system, consolidate image helpers

The old /assistant/chats/* CRUD endpoints and assistant_chat_service
chat functions were unused — the frontend exclusively uses
/ai-sessions/{id}/chat (unified_chat_service) for all chat operations.

Removed:
- Chat CRUD endpoints (create, list, get, send, delete, conclude)
- assistant_chat_service: create_chat, send_message,
  generate_conclusion_summary, CONCLUSION_SYSTEM_PROMPT
- Frontend: assistantChatApi chat methods, dead types
  (AssistantChat, AssistantChatMessage, ConcludeChatRequest, etc.)

Kept:
- /assistant/retention endpoints (used by ChatRetentionSettingsPage)
- Shared AI infrastructure (_call_ai, _call_anthropic_cached,
  ASSISTANT_SYSTEM_PROMPT, _auto_title) — imported by unified_chat_service

Moved:
- fetch_upload_images + resize_image_for_vision → storage_service.py
  (shared location, not tied to dead endpoint)

Also added "Image Analysis" section to system prompt so Claude knows
to describe attached screenshots.

Net change: -650 lines (141 additions, 791 deletions).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-03-24 05:28:06 +00:00
parent 36ca830481
commit 8e7f13d2f8
8 changed files with 141 additions and 791 deletions

View File

@@ -1,7 +1,7 @@
"""Standalone AI assistant chat service with RAG context.
"""Shared AI chat infrastructure — system prompt, prompt caching, and AI calling.
Provides persistent conversation history for general IT questions
with semantic search over the team's flow library.
Used by unified_chat_service (the active chat backend). The assistant_chat
CRUD endpoints were removed — only retention settings remain on that router.
Uses Anthropic prompt caching to reduce cost on multi-turn conversations:
- The static system prompt is cached (ephemeral, 5-min TTL)
@@ -13,14 +13,8 @@ for real-time documentation lookups (controlled by ENABLE_MCP_MICROSOFT_LEARN).
"""
import logging
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.assistant_chat import AssistantChat
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
logger = logging.getLogger(__name__)
@@ -74,6 +68,11 @@ You have access to Microsoft's official documentation via Microsoft Learn. Use i
- No team flow covers the topic and vendor-specific detail would help
Do NOT use Microsoft Learn for every question — only when official docs add real value.
## Image Analysis
When an image is attached, analyze it carefully. Screenshots of error messages, \
config panels, event viewer logs, and network diagrams are common in MSP work. \
Describe what you see and use the visual information to inform your troubleshooting advice.
## Boundaries
- Stay focused on IT infrastructure, systems administration, and MSP operations.
- If a question is clearly outside your domain, say so briefly and redirect.
@@ -273,199 +272,3 @@ def _auto_title(message: str) -> str:
if len(message) > 100:
title = title.rsplit(" ", 1)[0] + "..."
return title
# System prompt template for turning a chat transcript into ticket notes.
# The {outcome_label}, {notes_section} and {resume_section} markers are filled
# with plain str.replace() calls in generate_conclusion_summary (not
# str.format), so any other braces in this text are left untouched.
CONCLUSION_SYSTEM_PROMPT = """\
You are a ticket documentation specialist for MSP (Managed Service Provider) teams. \
Your job is to transform an AI troubleshooting conversation into clean, professional \
ticket notes that can be pasted directly into a PSA/ticketing system (ConnectWise, \
Autotask, HaloPSA, etc.).
## Output Format
Generate a structured summary using this exact format:
**Subject:** [One-line summary of the issue]
**Outcome:** {outcome_label}
**Problem Description:**
[2-3 sentence summary of the original problem]
**Steps Taken:**
1. [Step] — [Result/finding]
2. [Step] — [Result/finding]
(list all troubleshooting steps from the conversation)
**Current Status:**
[Where things stand now — what was resolved, what remains]
{notes_section}
**Key Findings:**
- [Important discovery or configuration detail]
- [Any relevant error codes, settings, or values identified]
{resume_section}
## Rules
- Be concise but thorough — these notes will be read by another engineer
- Include specific technical details (commands run, error messages, config values)
- Use plain text formatting (no HTML) — bold with ** is fine
- Do NOT include conversational filler, greetings, or meta-commentary
- Extract ALL actionable steps from the conversation, in chronological order
- If the conversation identified root cause, state it clearly
"""
async def generate_conclusion_summary(
    chat: "AssistantChat",
    outcome: str,
    notes: str | None = None,
) -> str:
    """Generate a ticket-ready summary from a concluded chat conversation.

    Args:
        chat: Chat whose ``messages`` (dicts with ``role`` and ``content``
            keys) form the transcript.
        outcome: ``"resolved"``, ``"escalated"`` or ``"paused"``. Any other
            value is passed through verbatim as the outcome label.
        notes: Optional free-text engineer notes to embed in the summary.

    Returns:
        The summary text produced by the AI provider.
    """
    outcome_labels = {
        "resolved": "Resolved",
        "escalated": "Escalated",
        "paused": "Paused — To Be Continued",
    }
    outcome_label = outcome_labels.get(outcome, outcome)

    notes_section = f"\n**Engineer Notes:**\n{notes}\n" if notes else ""

    # Only paused/escalated outcomes get an extra trailing section.
    resume_section = ""
    if outcome == "paused":
        resume_section = (
            "\n**Next Steps (for resumption):**\n"
            "- [What needs to happen next]\n"
            "- [Any pending actions or follow-ups]\n"
        )
    elif outcome == "escalated":
        resume_section = (
            "\n**Escalation Details:**\n"
            "- [Reason for escalation]\n"
            "- [Recommended next steps for receiving team/tier]\n"
        )

    # Build the conversation transcript for the AI. Every non-user message
    # is labelled as coming from the assistant.
    transcript_lines = []
    for msg in chat.messages:
        role_label = "ENGINEER" if msg["role"] == "user" else "AI ASSISTANT"
        transcript_lines.append(f"[{role_label}]: {msg['content']}")
    transcript = "\n\n".join(transcript_lines)

    prompt = (
        f"Outcome: {outcome_label}\n\n"
        f"{'Engineer Notes: ' + notes if notes else '(No additional notes)'}\n\n"
        f"--- CONVERSATION TRANSCRIPT ---\n\n{transcript}\n\n"
        f"--- END TRANSCRIPT ---\n\n"
        f"Generate the ticket notes now. Replace all placeholder brackets with actual content from the conversation. "
        f"The notes_section placeholder should be: {notes_section or '(omit this section)'}\n"
        f"The resume_section placeholder should be filled based on the conversation context."
    )

    # Substitute the static sections first and the free-text notes last.
    # With the notes inserted earlier, a later .replace() would also rewrite
    # any literal "{resume_section}" the engineer happened to type.
    system_with_vars = (
        CONCLUSION_SYSTEM_PROMPT
        .replace("{resume_section}", resume_section)
        .replace("{outcome_label}", outcome_label)
        .replace("{notes_section}", notes_section)
    )

    # Single-shot call: no RAG context and no history, the transcript is
    # carried entirely inside the prompt.
    content, _, _ = await _call_ai(
        system_base=system_with_vars,
        rag_context="",
        history=[],
        new_message=prompt,
        max_tokens=2048,
    )
    return content
async def create_chat(
    user_id: UUID,
    account_id: UUID,
    db: AsyncSession,
) -> AssistantChat:
    """Add a new chat with no messages to the session and flush it.

    The chat is attached to ``db`` and flushed but not committed here.

    Args:
        user_id: Value stored as the chat's ``user_id``.
        account_id: Value stored as the chat's ``account_id``.
        db: Session the new row is added to.

    Returns:
        The newly created ``AssistantChat`` instance.
    """
    new_chat = AssistantChat(
        messages=[],
        account_id=account_id,
        user_id=user_id,
    )
    db.add(new_chat)
    await db.flush()
    return new_chat
async def send_message(
    chat_id: UUID,
    user_id: UUID,
    account_id: UUID,
    message: str,
    db: AsyncSession,
    images: list[dict[str, Any]] | None = None,
) -> tuple[str, list[dict[str, Any]], AssistantChat]:
    """Append a user message to a chat and record the AI's reply.

    Args:
        chat_id: ID of the chat to continue.
        user_id: The chat is only found when it has this ``user_id``.
        account_id: Account passed to the RAG search.
        message: The user's new message text.
        db: Session used for the chat lookup and the RAG search.
        images: Optional list of {"media_type": str, "data": str (base64)}
            for vision content attached to this message.

    Returns:
        ``(ai_content, suggested_flows, chat)``.

    Raises:
        ValueError: If no chat matches ``chat_id`` and ``user_id``.
    """
    lookup = select(AssistantChat).where(
        AssistantChat.id == chat_id,
        AssistantChat.user_id == user_id,
    )
    chat = (await db.execute(lookup)).scalar_one_or_none()
    if not chat:
        raise ValueError("Chat not found")

    # The first message of a chat doubles as the source of its title.
    if chat.message_count == 0:
        chat.title = _auto_title(message)

    # Retrieve related content for this question and render it as context.
    rag_results = await rag_search(
        query=message,
        account_id=account_id,
        db=db,
        limit=8,
    )
    rag_context = build_rag_context(rag_results)

    # Only user/assistant turns are replayed to the model.
    history: list[dict[str, Any]] = [
        {"role": entry["role"], "content": entry["content"]}
        for entry in chat.messages
        if entry["role"] in ("user", "assistant")
    ]

    reply, tokens_in, tokens_out = await _call_ai(
        system_base=ASSISTANT_SYSTEM_PROMPT,
        rag_context=rag_context,
        history=history,
        new_message=message,
        images=images,
    )

    # Assign a brand-new list rather than appending in place
    # (presumably so the ORM notices the change - confirm against the model).
    chat.messages = [
        *chat.messages,
        {"role": "user", "content": message},
        {"role": "assistant", "content": reply},
    ]
    chat.message_count += 2
    chat.total_input_tokens += tokens_in
    chat.total_output_tokens += tokens_out

    return reply, extract_suggested_flows(rag_results), chat

View File

@@ -1,7 +1,10 @@
"""S3-compatible object storage service for file uploads."""
import base64
import logging
import uuid
from io import BytesIO
from typing import Any
from uuid import UUID
import boto3
from botocore.config import Config as BotoConfig
@@ -92,3 +95,107 @@ async def delete_file(storage_key: str) -> None:
client.delete_object(Bucket=settings.STORAGE_BUCKET_NAME, Key=storage_key)
except ClientError:
logger.warning(f"Failed to delete S3 object: {storage_key}")
# ── Vision helpers (resize + fetch for AI) ─────────────────────
# Claude vision costs: (width × height) / 750 tokens per image.
# Claude auto-resizes images >1568px on the longest edge.
# We resize server-side to avoid sending multi-MB base64 payloads over the wire.
# Longest-edge limit in pixels; resize_image_for_vision downsizes anything larger.
MAX_IMAGE_DIMENSION = 1568 # Claude's max efficient resolution
# fetch_upload_images only looks at the first N upload IDs of a message.
MAX_IMAGES_PER_MESSAGE = 3 # Cap to control token budget
def resize_image_for_vision(file_data: bytes, content_type: str) -> tuple[bytes, str]:
    """Shrink an image so it fits within Claude's efficient vision bounds.

    Images whose longest edge exceeds ``MAX_IMAGE_DIMENSION`` are scaled down
    proportionally. PNGs in RGBA or palette mode are flattened onto a white
    background and re-encoded as JPEG. The re-encoded bytes are used only
    when they are smaller than the input.

    This is best-effort: if Pillow is missing or processing fails for any
    reason, the original bytes and content type are returned unchanged.

    Args:
        file_data: Raw encoded image bytes.
        content_type: MIME type of ``file_data`` (e.g. ``"image/png"``).

    Returns:
        ``(image_bytes, media_type)`` where ``media_type`` describes the
        bytes actually returned.
    """
    try:
        from PIL import Image
    except ImportError:
        # Pillow not installed — send original (Claude auto-resizes)
        logger.debug("Pillow not available, sending original image to Claude")
        return file_data, content_type

    try:
        img = Image.open(BytesIO(file_data))
        # resize()/convert() return new Image objects whose .format is None,
        # so remember the source format now. Otherwise a resized GIF/WebP
        # would be written as PNG while still labelled with content_type.
        source_format = img.format
        w, h = img.size

        # Only resize if larger than Claude's max efficient dimension
        longest_edge = max(w, h)
        if longest_edge > MAX_IMAGE_DIMENSION:
            ratio = MAX_IMAGE_DIMENSION / longest_edge
            # Clamp to 1px so extreme aspect ratios cannot round down to 0.
            new_size = (max(1, int(w * ratio)), max(1, int(h * ratio)))
            img = img.resize(new_size, Image.LANCZOS)

        out_type = content_type
        if content_type == "image/png" and img.mode in ("RGBA", "P"):
            # JPEG has no alpha channel. Composite onto white instead of
            # simply dropping alpha, which can turn transparent regions
            # black and make dark text unreadable.
            rgba = img.convert("RGBA")
            flattened = Image.new("RGB", rgba.size, (255, 255, 255))
            flattened.paste(rgba, mask=rgba.getchannel("A"))
            img = flattened
            out_type = "image/jpeg"

        buf = BytesIO()
        if out_type == "image/jpeg":
            img.save(buf, format="JPEG", quality=85, optimize=True)
        else:
            img.save(buf, format=source_format or "PNG", optimize=True)
        result = buf.getvalue()

        # Only use resized version if it's actually smaller
        if len(result) < len(file_data):
            return result, out_type
        return file_data, content_type
    except Exception:
        # Deliberate best-effort: never let a bad image break the chat
        # request, but keep the traceback so failures can be diagnosed.
        logger.warning("Image resize failed, sending original", exc_info=True)
        return file_data, content_type
async def fetch_upload_images(
    upload_ids: list[UUID],
    account_id: UUID,
    db: Any,
) -> list[dict[str, Any]]:
    """Fetch uploaded images from S3 as base64-encoded dicts for Claude vision.

    Resizes images server-side to reduce network payload and applies a
    per-message cap to control token budget (~1,600 tokens per full-res image).

    Args:
        upload_ids: Upload IDs attached to the message. Only the first
            ``MAX_IMAGES_PER_MESSAGE`` are considered.
        account_id: Uploads with a different ``account_id`` are ignored.
        db: Database session; it must support ``await db.execute(...)``.

    Returns:
        A list of ``{"media_type": str, "data": str}`` dicts (``data`` is
        base64 text), in the order the IDs were supplied. The list is empty
        when there are no IDs or ``settings.STORAGE_ENDPOINT`` is unset.
        Uploads that fail to download or encode are skipped.
    """
    if not upload_ids or not settings.STORAGE_ENDPOINT:
        return []

    # Imported lazily inside the function (presumably to avoid an import
    # cycle with the models package - confirm before hoisting).
    from sqlalchemy import select

    from app.models.file_upload import FileUpload

    # Cap the number of images to limit token cost
    capped_ids = upload_ids[:MAX_IMAGES_PER_MESSAGE]
    if len(upload_ids) > MAX_IMAGES_PER_MESSAGE:
        logger.info(
            "Capped images from %d to %d for token budget",
            len(upload_ids), MAX_IMAGES_PER_MESSAGE,
        )

    result = await db.execute(
        select(FileUpload).where(
            FileUpload.id.in_(capped_ids),
            FileUpload.account_id == account_id,
            FileUpload.content_type.in_(ALLOWED_IMAGE_TYPES),
        )
    )

    # An IN (...) query returns rows in arbitrary order. Restore the order
    # the caller attached them in so "the first screenshot" stays first.
    # Rows whose id is not found in the mapping sort last.
    request_order = {upload_id: idx for idx, upload_id in enumerate(capped_ids)}
    uploads = sorted(
        result.scalars().all(),
        key=lambda upload: request_order.get(upload.id, len(request_order)),
    )

    images: list[dict[str, Any]] = []
    for upload in uploads:
        try:
            # NOTE(review): download_file is called without await - confirm
            # it is synchronous; if so, it blocks the event loop meanwhile.
            file_data = download_file(upload.storage_key)
            resized_data, media_type = resize_image_for_vision(
                file_data, upload.content_type
            )
            images.append({
                "media_type": media_type,
                "data": base64.b64encode(resized_data).decode("ascii"),
            })
        except Exception:
            # Best-effort per image: skip this one, but keep the traceback.
            logger.warning(
                "Failed to fetch upload %s from S3", upload.id, exc_info=True
            )
    return images