import html from datetime import datetime, timezone from typing import Annotated, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.responses import PlainTextResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.database import get_db from app.models.tree import Tree from app.models.session import Session from app.models.user import User from app.schemas.session import SessionCreate, SessionUpdate, SessionResponse, SessionExport, ScratchpadUpdate from app.api.deps import get_current_active_user from app.core.permissions import can_access_tree router = APIRouter(prefix="/sessions", tags=["sessions"]) @router.get("", response_model=list[SessionResponse]) async def list_sessions( db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], completed: Optional[bool] = Query(None, description="Filter by completion status"), skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100) ): """List user's troubleshooting sessions.""" query = select(Session).where(Session.user_id == current_user.id) if completed is not None: if completed: query = query.where(Session.completed_at.isnot(None)) else: query = query.where(Session.completed_at.is_(None)) query = query.order_by(Session.started_at.desc()) query = query.offset(skip).limit(limit) result = await db.execute(query) sessions = result.scalars().all() return sessions @router.get("/{session_id}", response_model=SessionResponse) async def get_session( session_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Get a specific session.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) return session @router.post("", response_model=SessionResponse, status_code=status.HTTP_201_CREATED) async def start_session( session_data: SessionCreate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Start a new troubleshooting session.""" # Get the tree result = await db.execute(select(Tree).where(Tree.id == session_data.tree_id)) tree = result.scalar_one_or_none() if not tree: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found" ) if not tree.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot start session with inactive tree" ) if not can_access_tree(current_user, tree): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this tree" ) # Create session with tree snapshot new_session = Session( tree_id=tree.id, user_id=current_user.id, tree_snapshot=tree.tree_structure, path_taken=[], decisions=[], ticket_number=session_data.ticket_number, client_name=session_data.client_name ) # Increment tree usage count tree.usage_count += 1 db.add(new_session) await db.commit() await db.refresh(new_session) return new_session @router.put("/{session_id}", response_model=SessionResponse) async def update_session( session_id: UUID, session_data: SessionUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Update session (add decisions, notes, etc.).""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) if session.completed_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot update a completed session" ) # Use mode='json' to ensure datetime fields are serialized as ISO strings for JSONB storage update_data = session_data.model_dump(exclude_unset=True, mode='json') for field, value in update_data.items(): setattr(session, field, value) await db.commit() await db.refresh(session) return session @router.post("/{session_id}/complete", response_model=SessionResponse) async def complete_session( session_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Mark session as complete.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) if session.completed_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Session already completed" ) session.completed_at = datetime.now(timezone.utc) await db.commit() await db.refresh(session) return session @router.patch("/{session_id}/scratchpad", response_model=SessionResponse) async def update_scratchpad( session_id: UUID, data: ScratchpadUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Update session scratchpad. Accepts updates on both active and completed sessions.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) session.scratchpad = data.scratchpad await db.commit() await db.refresh(session) return session @router.post("/{session_id}/export") async def export_session( session_id: UUID, export_options: SessionExport, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)] ): """Export session to formatted notes.""" result = await db.execute(select(Session).where(Session.id == session_id)) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this session" ) # Generate export based on format if export_options.format == "markdown": content = _generate_markdown_export(session, export_options) media_type = "text/markdown" elif export_options.format == "html": content = _generate_html_export(session, export_options) media_type = "text/html" else: # text content = _generate_text_export(session, export_options) media_type = "text/plain" # Mark as exported session.exported = True await db.commit() return PlainTextResponse(content=content, media_type=media_type) def _generate_markdown_export(session: Session, options: SessionExport) -> str: """Generate markdown export.""" lines = [] if options.include_tree_info: tree_name = session.tree_snapshot.get("name", "Troubleshooting Session") lines.append(f"# {tree_name}") lines.append("") if session.ticket_number: lines.append(f"**Ticket:** {session.ticket_number}") if session.client_name: lines.append(f"**Client:** {session.client_name}") if options.include_timestamps: lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}") if session.completed_at: lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}") lines.append("") lines.append("---") lines.append("") # Scratchpad / Evidence section scratchpad = getattr(session, 'scratchpad', '') or '' if scratchpad.strip(): lines.append("## Evidence / Reference") lines.append("") lines.append(scratchpad) lines.append("") lines.append("---") lines.append("") lines.append("## Troubleshooting Steps") lines.append("") for i, decision in enumerate(session.decisions, 1): question = decision.get("question") or decision.get("action_performed", "Step") answer = decision.get("answer", "") notes = decision.get("notes", "") lines.append(f"### Step {i}: {question}") if answer: lines.append(f"**Answer:** {answer}") if notes: lines.append(f"**Notes:** {notes}") if options.include_timestamps and decision.get("timestamp"): lines.append(f"*{decision['timestamp']}*") lines.append("") return "\n".join(lines) def _generate_text_export(session: Session, options: SessionExport) -> str: """Generate plain text export.""" lines = [] if options.include_tree_info: tree_name = session.tree_snapshot.get("name", "Troubleshooting Session") lines.append(tree_name) lines.append("=" * len(tree_name)) if session.ticket_number: lines.append(f"Ticket: {session.ticket_number}") if session.client_name: lines.append(f"Client: {session.client_name}") if options.include_timestamps: lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}") if session.completed_at: lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}") lines.append("") # Scratchpad / Evidence section scratchpad = getattr(session, 'scratchpad', '') or '' if scratchpad.strip(): lines.append("EVIDENCE / REFERENCE") lines.append("-" * 20) lines.append(scratchpad) lines.append("") lines.append("TROUBLESHOOTING STEPS") lines.append("-" * 20) for i, decision in enumerate(session.decisions, 1): question = decision.get("question") or decision.get("action_performed", "Step") answer = decision.get("answer", "") notes = decision.get("notes", "") lines.append(f"\n{i}. {question}") if answer: lines.append(f" Answer: {answer}") if notes: lines.append(f" Notes: {notes}") return "\n".join(lines) def _generate_html_export(session: Session, options: SessionExport) -> str: """Generate HTML export.""" tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session")) html_parts = ['', '', '
', '', f'Answer: {answer}
') if notes: html_parts.append(f'Notes: {notes}
') if options.include_timestamps and decision.get("timestamp"): html_parts.append(f'') html_parts.append('