from datetime import datetime, timezone from typing import Annotated, Optional from uuid import UUID import uuid from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.responses import PlainTextResponse from pydantic import BaseModel, Field as PydanticField from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update as sa_update from app.core.database import get_db from app.models.tree import Tree from app.models.session import Session from app.models.user import User from app.schemas.session import ( SessionCreate, SessionUpdate, SessionResponse, SessionExport, ScratchpadUpdate, SaveAsTreeRequest, SaveAsTreeResponse, SessionComplete, SessionVariablesUpdate, PrepareSessionRequest, TicketLinkRequest, TicketLinkResponse, PSATicketResponse, ) from app.schemas.psa_connection import PsaPostRequest from app.api.deps import get_current_active_user, require_engineer_or_admin from app.core.permissions import can_access_tree from app.services.export_service import generate_markdown_export, generate_text_export, generate_html_export, generate_psa_export router = APIRouter(prefix="/sessions", tags=["sessions"]) @router.get("", response_model=list[SessionResponse]) async def list_sessions( db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], completed: Optional[bool] = Query(None, description="Filter by completion status"), ticket_number: Optional[str] = Query(None, description="Search by ticket number (partial match)"), client_name: Optional[str] = Query(None, description="Search by client name (partial match)"), tree_name: Optional[str] = Query(None, description="Filter by tree name from snapshot"), tree_id: Optional[UUID] = Query(None, description="Filter by tree ID"), batch_id: Optional[UUID] = Query(None, description="Filter by batch ID (maintenance batch runs)"), assigned_to_id: Optional[UUID] = Query(None, description="Filter by assigned user ID (prepared sessions)"), status: Optional[str] = Query(None, description="Filter by status: prepared, active, completed"), started_after: Optional[datetime] = Query(None, description="Filter sessions started after this datetime"), started_before: Optional[datetime] = Query(None, description="Filter sessions started before this datetime"), completed_after: Optional[datetime] = Query(None, description="Filter sessions completed after this datetime"), completed_before: Optional[datetime] = Query(None, description="Filter sessions completed before this datetime"), page: Optional[int] = Query(None, ge=1, description="1-based page number (frontend compatibility)"), size: Optional[int] = Query(None, ge=1, le=100, description="Page size (frontend compatibility)"), skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100) ): """List user's troubleshooting sessions with comprehensive filtering.""" from sqlalchemy import or_ # Show sessions owned by user OR assigned to user (prepared sessions) query = select(Session).where( or_(Session.user_id == current_user.id, Session.assigned_to_id == current_user.id) ) # Completion status filter if completed is not None: if completed: query = query.where(Session.completed_at.isnot(None)) else: query = query.where(Session.completed_at.is_(None)) # Ticket number search (case-insensitive partial match) if ticket_number: query = query.where(Session.ticket_number.ilike(f"%{ticket_number}%")) # Client name search (case-insensitive partial match) if client_name: query = query.where(Session.client_name.ilike(f"%{client_name}%")) # Tree name filter (JSONB path query) if tree_name: query = query.where(Session.tree_snapshot['name'].astext.ilike(f"%{tree_name}%")) # Tree ID filter if tree_id: query = query.where(Session.tree_id == tree_id) # Batch ID filter if batch_id: query = query.where(Session.batch_id == batch_id) # Assigned user filter (for prepared sessions) if assigned_to_id: query = query.where(Session.assigned_to_id == assigned_to_id) # Status filter: prepared (started_at IS NULL), active, completed if status == "prepared": query = query.where(Session.started_at.is_(None)) elif status == "active": query = query.where(Session.started_at.isnot(None), Session.completed_at.is_(None)) elif status == "completed": query = query.where(Session.completed_at.isnot(None)) # Date range filters if started_after: query = query.where(Session.started_at >= started_after) if started_before: query = query.where(Session.started_at <= started_before) if completed_after: query = query.where(Session.completed_at >= completed_after) if completed_before: query = query.where(Session.completed_at <= completed_before) effective_limit = size if size is not None else limit effective_skip = skip if page is not None: effective_skip = (page - 1) * effective_limit query = query.order_by(Session.started_at.desc().nullslast()) query = query.offset(effective_skip).limit(effective_limit) result = await db.execute(query) sessions = result.scalars().all() return sessions @router.get("/{session_id}", response_model=SessionResponse) async def get_session( session_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Get a specific session.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) return session @router.post("", response_model=SessionResponse, status_code=status.HTTP_201_CREATED) async def start_session( session_data: SessionCreate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Start a new troubleshooting session.""" # Get the tree result = await db.execute(select(Tree).where(Tree.id == session_data.tree_id)) tree = result.scalar_one_or_none() if not tree: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found" ) if not tree.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot start session with inactive tree" ) if not can_access_tree(current_user, tree): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this tree" ) # Deferred variables: sessions can start with empty/partial variables # Variables are filled inline during execution via PATCH /sessions/{id}/variables session_variables = session_data.session_variables or {} # Create session with tree snapshot (includes tree metadata for filtering/export) tree_snapshot = { **tree.tree_structure, "name": tree.name, "description": tree.description, "category": tree.category, "version": tree.version, "tree_type": tree.tree_type, } new_session = Session( tree_id=tree.id, user_id=current_user.id, tree_snapshot=tree_snapshot, path_taken=[], decisions=[], ticket_number=session_data.ticket_number, client_name=session_data.client_name, session_variables=session_variables, ) # Atomically increment tree usage count (SQL-level to avoid lost updates) await db.execute( sa_update(Tree).where(Tree.id == tree.id).values(usage_count=Tree.usage_count + 1) ) db.add(new_session) await db.commit() await db.refresh(new_session) return new_session @router.put("/{session_id}", response_model=SessionResponse) async def update_session( session_id: UUID, session_data: SessionUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Update session (add decisions, notes, etc.).""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) if session.completed_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot update a completed session" ) # Start a prepared session on first update (set started_at) if session.started_at is None: session.started_at = datetime.now(timezone.utc) # Transfer ownership to the executing user if they're the assignee if session.assigned_to_id == current_user.id and session.user_id != current_user.id: session.user_id = current_user.id # Use mode='json' to ensure datetime fields are serialized as ISO strings for JSONB storage update_data = session_data.model_dump(exclude_unset=True, mode='json') for field, value in update_data.items(): setattr(session, field, value) await db.commit() await db.refresh(session) return session @router.post("/{session_id}/complete", response_model=SessionResponse) async def complete_session( session_id: UUID, completion_data: SessionComplete, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Mark session as complete.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) if session.completed_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Session already completed" ) session.completed_at = datetime.now(timezone.utc) session.outcome = completion_data.outcome session.outcome_notes = completion_data.outcome_notes session.next_steps = completion_data.next_steps await db.commit() await db.refresh(session) return session @router.patch("/{session_id}/scratchpad", response_model=SessionResponse) async def update_scratchpad( session_id: UUID, data: ScratchpadUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Update session scratchpad. Accepts updates on both active and completed sessions.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) session.scratchpad = data.scratchpad await db.commit() await db.refresh(session) return session @router.patch("/{session_id}/variables", response_model=SessionResponse) async def update_session_variables( session_id: UUID, data: SessionVariablesUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Update session variables (partial dict merge). Used for deferred variable input.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) if session.completed_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot update variables on a completed session" ) # Merge new variables into existing ones existing = session.session_variables or {} merged = {**existing, **data.variables} session.session_variables = merged await db.commit() await db.refresh(session) return session @router.post("/{session_id}/export") async def export_session( session_id: UUID, export_options: SessionExport, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Export session to formatted notes.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id and session.assigned_to_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) # Generate export based on format if export_options.format == "markdown": content = generate_markdown_export(session, export_options) media_type = "text/markdown" elif export_options.format == "html": content = generate_html_export(session, export_options) media_type = "text/html" elif export_options.format == "psa": content = generate_psa_export(session, export_options) media_type = "text/plain" else: # text content = generate_text_export(session, export_options) media_type = "text/plain" # Resolve variables in export output session_vars = getattr(session, 'session_variables', None) or {} if session_vars: from app.services.variable_service import resolve_variables content = resolve_variables(content, session_vars) # Phase C: Apply redaction AFTER generation and variable resolution redaction_summary = None if export_options.redaction_mode == "mask": from app.services.redaction_service import apply_redaction_to_text, format_redaction_footer try: content, redaction_summary = apply_redaction_to_text(content) footer = format_redaction_footer(redaction_summary) if footer: content += footer except Exception: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Redaction processing failed" ) # Only mark as exported if session is completed if session.completed_at: session.exported = True await db.commit() # Build response with redaction headers import json headers = {"X-Redaction-Mode": export_options.redaction_mode} if redaction_summary is not None: headers["X-Redaction-Summary"] = json.dumps(redaction_summary.to_dict()) return PlainTextResponse(content=content, media_type=media_type, headers=headers) # --- Save Session as Tree --- @router.post("/{session_id}/save-as-tree", response_model=SaveAsTreeResponse, status_code=status.HTTP_201_CREATED) async def save_session_as_tree( session_id: UUID, request_data: SaveAsTreeRequest, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Save a session as a new tree. Converts the session's path_taken and custom_steps into a linear tree structure. The new tree is linked to the original tree via parent_tree_id (fork relationship). Args: session_id: ID of the session to save request_data: Tree name, description, and status db: Database session current_user: Current authenticated user Returns: SaveAsTreeResponse with new tree ID and name """ from app.core.session_to_tree import convert_session_to_tree, generate_tree_name_from_session from app.core.tree_validation import can_publish_tree from app.core.subscriptions import check_tree_limit # Load the session result = await db.execute( select(Session).where( Session.id == session_id, Session.user_id == current_user.id ) ) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) # Load the original tree to get metadata tree_result = await db.execute( select(Tree).where(Tree.id == session.tree_id) ) original_tree = tree_result.scalar_one_or_none() if not original_tree: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Original tree not found" ) # Convert session to tree structure tree_structure = convert_session_to_tree( session.path_taken, session.tree_snapshot, session.custom_steps, session.decisions ) # Generate tree name if request_data.tree_name: tree_name = request_data.tree_name else: tree_name = generate_tree_name_from_session( original_tree.name, session.ticket_number, session.client_name ) # Validate if status is published if request_data.status == 'published': can_publish, validation_errors = can_publish_tree( tree_structure, tree_name, request_data.description, tree_type=original_tree.tree_type, intake_form=original_tree.intake_form, ) if not can_publish: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={ "message": "Cannot save as published tree with validation errors", "errors": validation_errors } ) # Check subscription tree limit if current_user.account_id: can_create, limit, count = await check_tree_limit(current_user.account_id, db) if not can_create: raise HTTPException( status_code=status.HTTP_402_PAYMENT_REQUIRED, detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees." ) # Create the new tree as a fork of the original new_tree = Tree( name=tree_name, description=request_data.description or f"Saved from troubleshooting session on {session.started_at.strftime('%Y-%m-%d')}", tree_structure=tree_structure, author_id=current_user.id, account_id=current_user.account_id, status=request_data.status, is_public=False, is_default=False, # Fork tracking - link to original tree parent_tree_id=original_tree.id, root_tree_id=original_tree.root_tree_id if original_tree.root_tree_id else original_tree.id, fork_depth=original_tree.fork_depth + 1, fork_reason=f"Saved from session: {session.ticket_number or 'No ticket'}" if session.ticket_number else "Saved from troubleshooting session", parent_updated_at=original_tree.updated_at ) db.add(new_tree) await db.commit() await db.refresh(new_tree) return SaveAsTreeResponse( tree_id=new_tree.id, tree_name=new_tree.name, message=f"Session saved as {'published' if request_data.status == 'published' else 'draft'} tree" ) # ── Prepare Session (Flexible Intake) ───────────────────────────────────── @router.post("/prepare", response_model=SessionResponse, status_code=status.HTTP_201_CREATED) async def prepare_session( data: PrepareSessionRequest, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Create a prepared session with pre-filled variables and optional assignee. Prepared sessions have started_at = NULL. They appear in the assigned engineer's queue and can be started later with variables already filled. """ # Get the tree result = await db.execute(select(Tree).where(Tree.id == data.tree_id)) tree = result.scalar_one_or_none() if not tree: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found" ) if not tree.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot prepare session with inactive tree" ) if not can_access_tree(current_user, tree): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this tree" ) # Validate assignee exists and is on the same team (if specified) if data.assigned_to_id: assignee_result = await db.execute(select(User).where(User.id == data.assigned_to_id)) assignee = assignee_result.scalar_one_or_none() if not assignee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Assigned user not found" ) if current_user.team_id and assignee.team_id != current_user.team_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Can only assign to users on your team" ) # Create tree snapshot tree_snapshot = { **tree.tree_structure, "name": tree.name, "description": tree.description, "category": tree.category, "version": tree.version, "tree_type": tree.tree_type, } session_variables = data.session_variables or {} new_session = Session( tree_id=tree.id, user_id=data.assigned_to_id or current_user.id, tree_snapshot=tree_snapshot, path_taken=[], decisions=[], session_variables=session_variables, ticket_number=data.ticket_number, client_name=data.client_name, started_at=None, # NULL = prepared state prepared_by_id=current_user.id, assigned_to_id=data.assigned_to_id, ) db.add(new_session) await db.commit() await db.refresh(new_session) return new_session # ── Batch Launch (Maintenance Flows) ────────────────────────────────────── class _BatchTarget(BaseModel): label: str = PydanticField(..., min_length=1, max_length=255) class _BatchLaunchRequest(BaseModel): tree_id: UUID targets: list[_BatchTarget] = PydanticField(..., min_length=1, max_length=100) class _BatchLaunchResponse(BaseModel): batch_id: str count: int sessions: list[dict] @router.post("/batch", status_code=201, response_model=_BatchLaunchResponse) async def batch_launch_sessions( data: _BatchLaunchRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ): """Create one session per target for a maintenance flow batch run.""" tree_result = await db.execute(select(Tree).where(Tree.id == data.tree_id)) tree = tree_result.scalar_one_or_none() if not tree: raise HTTPException(status_code=404, detail="Tree not found") if not can_access_tree(current_user, tree): raise HTTPException(status_code=403, detail="Access denied") if not tree.is_active: raise HTTPException(status_code=400, detail="Cannot batch-launch an inactive flow") if tree.status == 'draft': raise HTTPException(status_code=400, detail="Cannot batch-launch a draft flow") if not current_user.is_super_admin and tree.team_id != current_user.team_id: raise HTTPException(status_code=403, detail="Access denied") if tree.tree_type != "maintenance": raise HTTPException(status_code=400, detail="Batch launch is only for maintenance flows") batch_id = uuid.uuid4() created_sessions = [] # Hoist snapshot computation out of the loop — same tree for all targets tree_snapshot = { **tree.tree_structure, "name": tree.name, "description": tree.description, "tree_type": tree.tree_type, } for target in data.targets: session = Session( tree_id=tree.id, user_id=current_user.id, tree_snapshot=tree_snapshot, path_taken=[], decisions=[], custom_steps=[], session_variables={}, batch_id=batch_id, target_label=target.label, ) db.add(session) created_sessions.append(session) await db.flush() session_ids = [s.id for s in created_sessions] result = await db.execute(select(Session).where(Session.id.in_(session_ids))) created_sessions = result.scalars().all() await db.commit() return _BatchLaunchResponse( batch_id=str(batch_id), count=len(created_sessions), sessions=[ { "id": str(s.id), "batch_id": str(s.batch_id), "target_label": s.target_label, "tree_id": str(s.tree_id), } for s in created_sessions ], ) # ── PSA Ticket Link ───────────────────────────────────────────────── @router.patch("/{session_id}/ticket-link", response_model=TicketLinkResponse) async def link_ticket( session_id: UUID, data: TicketLinkRequest, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """Link or unlink a PSA ticket to/from a session.""" from app.models.psa_connection import PsaConnection from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSANotFoundError, PSAError # Look up session result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found", ) # Verify ownership or admin if session.user_id != current_user.id and session.assigned_to_id != current_user.id: if not current_user.is_super_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session", ) # Unlink if data.psa_ticket_id is None: session.psa_ticket_id = None session.psa_connection_id = None await db.commit() return TicketLinkResponse( session_id=str(session.id), psa_ticket_id=None, ticket=None, ) # Link — validate ticket exists in CW if not current_user.account_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No account associated with your user", ) try: provider = await get_provider_for_account(current_user.account_id, db) except PSAError as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc), ) # Fetch the connection to store its ID conn_result = await db.execute( select(PsaConnection).where( PsaConnection.account_id == current_user.account_id, PsaConnection.is_active.is_(True), ) ) psa_connection = conn_result.scalar_one_or_none() try: ticket = await provider.get_ticket(data.psa_ticket_id) except PSANotFoundError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Ticket not found in ConnectWise", ) except PSAError as exc: raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"PSA error: {exc}", ) session.psa_ticket_id = ticket.id session.psa_connection_id = psa_connection.id if psa_connection else None await db.commit() return TicketLinkResponse( session_id=str(session.id), psa_ticket_id=ticket.id, ticket=PSATicketResponse( id=ticket.id, summary=ticket.summary, company_name=ticket.company_name, board_name=ticket.board_name, status_name=ticket.status_name, priority_name=ticket.priority_name, ), ) # ── PSA Post to Ticket ──────────────────────────────────────────── @router.get("/{session_id}/psa-post/preview") async def psa_post_preview( session_id: UUID, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """Preview the content that will be posted to the linked PSA ticket. Generates session documentation in PSA format, fetches current ticket details and available statuses, and counts previous posts. """ from app.models.psa_post_log import PsaPostLog from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError from app.schemas.psa_connection import ( PsaPreviewResponse, PSATicketSearchResult, PSATicketStatusItem, ) from sqlalchemy import func as sa_func # Load session result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.user_id != current_user.id and session.assigned_to_id != current_user.id: if not current_user.is_super_admin: raise HTTPException(status_code=403, detail="You don't have access to this session") if not session.psa_ticket_id: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Session has no linked PSA ticket. Link a ticket first.", ) if not current_user.account_id: raise HTTPException(status_code=400, detail="No account associated with your user") # Generate PSA export content export_options = SessionExport( format="psa", include_timestamps=True, include_tree_info=True, include_outcome_notes=True, include_next_steps=True, include_summary=True, ) content = generate_psa_export(session, export_options) # Resolve session variables in content session_vars = getattr(session, "session_variables", None) or {} if session_vars: from app.services.variable_service import resolve_variables content = resolve_variables(content, session_vars) # Fetch ticket details and statuses from CW try: provider = await get_provider_for_account(current_user.account_id, db) ticket = await provider.get_ticket(session.psa_ticket_id) available_statuses: list[PSATicketStatusItem] = [] if ticket.board_id: statuses = await provider.get_ticket_statuses(ticket.board_id) available_statuses = [ PSATicketStatusItem(id=s.id, name=s.name, is_closed=s.is_closed) for s in statuses ] except PSAError as e: raise HTTPException(status_code=502, detail=f"PSA error: {e}") # Count previous posts count_result = await db.execute( select(sa_func.count(PsaPostLog.id)).where( PsaPostLog.session_id == session_id ) ) previous_posts = count_result.scalar_one() return PsaPreviewResponse( content=content, ticket=PSATicketSearchResult( id=ticket.id, summary=ticket.summary, company_name=ticket.company_name, board_name=ticket.board_name, status_name=ticket.status_name, priority_name=ticket.priority_name, closed=ticket.closed, ), available_statuses=available_statuses, character_count=len(content), previous_posts=previous_posts, ) @router.post("/{session_id}/psa-post") async def psa_post_to_ticket( session_id: UUID, data: PsaPostRequest, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """Post session documentation as a note to the linked PSA ticket. Optionally updates the ticket status if update_status_id is provided. All actions are logged in psa_post_log for audit trail. """ from app.models.psa_connection import PsaConnection from app.models.psa_post_log import PsaPostLog from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError from app.schemas.psa_connection import PsaPostResponse # Load session result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.user_id != current_user.id and session.assigned_to_id != current_user.id: if not current_user.is_super_admin: raise HTTPException(status_code=403, detail="You don't have access to this session") if not session.psa_ticket_id: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Session has no linked PSA ticket. Link a ticket first.", ) if not current_user.account_id: raise HTTPException(status_code=400, detail="No account associated with your user") # Get PSA connection ID for audit conn_result = await db.execute( select(PsaConnection).where( PsaConnection.account_id == current_user.account_id, PsaConnection.is_active.is_(True), ) ) psa_connection = conn_result.scalar_one_or_none() # Look up member mapping for attribution from app.models.psa_member_mapping import PsaMemberMapping member_id = None if psa_connection: mapping_result = await db.execute( select(PsaMemberMapping).where( PsaMemberMapping.psa_connection_id == psa_connection.id, PsaMemberMapping.user_id == current_user.id, ) ) mapping = mapping_result.scalar_one_or_none() if mapping: member_id = mapping.external_member_id # Post note try: provider = await get_provider_for_account(current_user.account_id, db) note_result = await provider.post_note( ticket_id=session.psa_ticket_id, text=data.content, note_type=data.note_type, member_id=member_id, ) note_status = "success" external_note_id = note_result.id error_message = None except PSAError as e: note_status = "failed" external_note_id = None error_message = str(e) # Optionally update ticket status status_changed_from = None status_changed_to = None if data.update_status_id and note_status == "success": try: # Get current status before update current_ticket = await provider.get_ticket(session.psa_ticket_id) status_changed_from = current_ticket.status_name if current_ticket.status_id != data.update_status_id: updated_ticket = await provider.update_ticket_status( session.psa_ticket_id, data.update_status_id ) status_changed_to = updated_ticket.status_name except PSAError as e: # Log the status update failure but don't fail the whole request # since the note was already posted successfully if error_message: error_message += f"; Status update failed: {e}" else: error_message = f"Note posted successfully but status update failed: {e}" # Log to audit trail log_entry = PsaPostLog( session_id=session.id, psa_connection_id=psa_connection.id if psa_connection else None, ticket_id=session.psa_ticket_id, note_type=data.note_type, content_posted=data.content, external_note_id=external_note_id, status=note_status, error_message=error_message, status_changed_from=status_changed_from, status_changed_to=status_changed_to, posted_by=current_user.id, ) db.add(log_entry) await db.commit() await db.refresh(log_entry) if note_status == "failed": raise HTTPException( status_code=502, detail=error_message or "Failed to post note to PSA", ) return PsaPostResponse( id=str(log_entry.id), session_id=str(session.id), ticket_id=session.psa_ticket_id, note_type=data.note_type, status=note_status, external_note_id=external_note_id, error_message=error_message, status_changed_from=status_changed_from, status_changed_to=status_changed_to, posted_at=log_entry.posted_at.isoformat(), ) @router.get("/{session_id}/psa-posts") async def list_psa_posts( session_id: UUID, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """List all PSA post history for a session, ordered by most recent first.""" from app.models.psa_post_log import PsaPostLog from app.schemas.psa_connection import PsaPostLogResponse # Verify session access result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.user_id != current_user.id and session.assigned_to_id != current_user.id: if not current_user.is_super_admin: raise HTTPException(status_code=403, detail="You don't have access to this session") # Query post log log_result = await db.execute( select(PsaPostLog) .where(PsaPostLog.session_id == session_id) .order_by(PsaPostLog.posted_at.desc()) ) logs = log_result.scalars().all() return [ PsaPostLogResponse( id=str(log.id), ticket_id=log.ticket_id, note_type=log.note_type, status=log.status, error_message=log.error_message, status_changed_from=log.status_changed_from, status_changed_to=log.status_changed_to, posted_at=log.posted_at.isoformat(), content_preview=log.content_posted[:200], ) for log in logs ]