Files
resolutionflow/backend/app/api/endpoints/sessions.py
chihlasm 71ba0b95a5 fix: high-severity security hardening (Phase B permissions audit)
Phase B addresses 7 high-severity gaps from the permissions audit:

- B1: Enforce tree access check on session start via can_access_tree
- B2: Replace all inline permission helpers with centralized permissions.py
- B3: Fix require_engineer_or_admin to check is_team_admin before role
- B4: Add is_active field on User with enforcement in get_current_active_user
- B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate)
- B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG)
- B7: Implement refresh token rotation with JTI-based revocation and meaningful logout

Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md
with SaaS/MSP context for future planning sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 22:44:05 -05:00

414 lines
14 KiB
Python

import html
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import PlainTextResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.models.tree import Tree
from app.models.session import Session
from app.models.user import User
from app.schemas.session import SessionCreate, SessionUpdate, SessionResponse, SessionExport, ScratchpadUpdate
from app.api.deps import get_current_active_user
from app.core.permissions import can_access_tree
# Router for the session endpoints defined in this module; every route is
# registered under the "/sessions" prefix and carries the "sessions" tag.
router = APIRouter(prefix="/sessions", tags=["sessions"])
@router.get("", response_model=list[SessionResponse])
async def list_sessions(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    completed: Optional[bool] = Query(None, description="Filter by completion status"),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
):
    """Return one page of the caller's own sessions, newest first.

    Only sessions whose user_id matches the authenticated user are returned.
    `completed` is a tri-state filter: omitted means no filtering, true keeps
    sessions that have a completion time, false keeps those that do not.
    `skip` and `limit` page through the result, ordered by start time
    descending.
    """
    stmt = select(Session).where(Session.user_id == current_user.id)

    if completed is not None:
        # "Completed" is defined purely by completed_at being set.
        completion_clause = (
            Session.completed_at.isnot(None)
            if completed
            else Session.completed_at.is_(None)
        )
        stmt = stmt.where(completion_clause)

    stmt = stmt.order_by(Session.started_at.desc()).offset(skip).limit(limit)

    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.get("/{session_id}", response_model=SessionResponse)
async def get_session(
    session_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Fetch a single session by id.

    Responds with 404 when no session has the given id and with 403 when the
    session exists but belongs to a different user.
    """
    stmt = select(Session).where(Session.id == session_id)
    found = (await db.execute(stmt)).scalar_one_or_none()

    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )

    # Sessions are private to the user who started them.
    if found.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this session",
        )

    return found
@router.post("", response_model=SessionResponse, status_code=status.HTTP_201_CREATED)
async def start_session(
    session_data: SessionCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Start a new troubleshooting session.

    Looks up the requested tree, verifies the caller may use it, and creates
    a session that stores a snapshot of the tree structure together with the
    optional ticket number and client name. The tree's usage counter is
    incremented in the same transaction.

    Responds with 404 when the tree does not exist, 403 when the caller has
    no access to it, and 400 when the tree is inactive.
    """
    # Get the tree
    result = await db.execute(select(Tree).where(Tree.id == session_data.tree_id))
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    # Authorize BEFORE reporting anything about the tree's state; otherwise a
    # caller without access could learn whether a tree is active or inactive
    # from the 400 response below.
    if not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    if not tree.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot start session with inactive tree"
        )

    # Create session with tree snapshot
    new_session = Session(
        tree_id=tree.id,
        user_id=current_user.id,
        tree_snapshot=tree.tree_structure,
        path_taken=[],
        decisions=[],
        ticket_number=session_data.ticket_number,
        client_name=session_data.client_name
    )

    # Increment tree usage count.
    # NOTE(review): this is a read-modify-write on the loaded value, so two
    # concurrent starts on the same tree may lose an increment — confirm
    # whether the counter needs to be exact.
    tree.usage_count += 1

    db.add(new_session)
    await db.commit()
    # Reload the new row so attributes populated by the database are present
    # on the returned object.
    await db.refresh(new_session)
    return new_session
@router.put("/{session_id}", response_model=SessionResponse)
async def update_session(
    session_id: UUID,
    session_data: SessionUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Apply a partial update to an in-progress session.

    Only the fields present in the request body are written; everything else
    is left untouched. Responds with 404 when the session does not exist,
    403 when it belongs to another user, and 400 when it is already
    completed.
    """
    stmt = select(Session).where(Session.id == session_id)
    target = (await db.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    if target.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this session",
        )
    if target.completed_at is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot update a completed session",
        )

    # mode='json' turns datetimes into ISO strings so the values can be
    # stored in JSONB columns; exclude_unset keeps this a partial update.
    changes = session_data.model_dump(exclude_unset=True, mode='json')
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    await db.commit()
    await db.refresh(target)
    return target
@router.post("/{session_id}/complete", response_model=SessionResponse)
async def complete_session(
    session_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Close a session by stamping its completion time.

    The completion time is the current UTC time. Responds with 404 when the
    session does not exist, 403 when it belongs to another user, and 400
    when it has already been completed.
    """
    stmt = select(Session).where(Session.id == session_id)
    target = (await db.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    if target.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this session",
        )
    if target.completed_at is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Session already completed",
        )

    finished_at = datetime.now(timezone.utc)
    target.completed_at = finished_at

    await db.commit()
    await db.refresh(target)
    return target
@router.patch("/{session_id}/scratchpad", response_model=SessionResponse)
async def update_scratchpad(
    session_id: UUID,
    data: ScratchpadUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Replace the scratchpad text of a session.

    Unlike the general update endpoint, this works on completed sessions as
    well as active ones. Responds with 404 when the session does not exist
    and 403 when it belongs to another user.
    """
    stmt = select(Session).where(Session.id == session_id)
    target = (await db.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    if target.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this session",
        )

    # Deliberately no completed_at check: notes may still be edited after
    # the session has been closed.
    target.scratchpad = data.scratchpad

    await db.commit()
    await db.refresh(target)
    return target
@router.post("/{session_id}/export")
async def export_session(
    session_id: UUID,
    export_options: SessionExport,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Render a session as formatted notes and flag it as exported.

    The requested format selects the renderer and the response media type:
    "markdown" and "html" have dedicated renderers, any other value produces
    plain text. Responds with 404 when the session does not exist and 403
    when it belongs to another user.
    """
    stmt = select(Session).where(Session.id == session_id)
    target = (await db.execute(stmt)).scalar_one_or_none()

    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    if target.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this session",
        )

    # Pick the renderer and the matching media type; plain text is the
    # fallback for every format other than markdown and html.
    requested_format = export_options.format
    if requested_format == "markdown":
        render, media_type = _generate_markdown_export, "text/markdown"
    elif requested_format == "html":
        render, media_type = _generate_html_export, "text/html"
    else:
        render, media_type = _generate_text_export, "text/plain"

    content = render(target, export_options)

    # Record the export only after rendering succeeded.
    target.exported = True
    await db.commit()

    return PlainTextResponse(content=content, media_type=media_type)
def _generate_markdown_export(session: Session, options: SessionExport) -> str:
    """Render a session as Markdown notes.

    Layout: an optional "# <tree name>" heading, ticket/client/timestamp
    metadata, a horizontal rule, an optional "Evidence / Reference" section
    holding the scratchpad, and one "### Step N" entry per recorded decision.

    Args:
        session: Session to render. Reads tree_snapshot, ticket_number,
            client_name, started_at, completed_at, scratchpad and decisions
            (each decision is read as a dict with the optional keys
            question, action_performed, answer, notes and timestamp).
        options: Export flags; include_tree_info and include_timestamps
            are read.

    Returns:
        The Markdown document as one newline-joined string.
    """
    lines = []

    if options.include_tree_info:
        # A missing snapshot or a null/empty name falls back to the generic
        # title instead of raising or printing "None".
        snapshot = session.tree_snapshot or {}
        tree_name = snapshot.get("name") or "Troubleshooting Session"
        lines.append(f"# {tree_name}")
        lines.append("")

    if session.ticket_number:
        lines.append(f"**Ticket:** {session.ticket_number}")
    if session.client_name:
        lines.append(f"**Client:** {session.client_name}")

    if options.include_timestamps:
        lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
        if session.completed_at:
            lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")

    lines.append("")
    lines.append("---")
    lines.append("")

    # Scratchpad / Evidence section (skipped when empty or whitespace-only)
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("## Evidence / Reference")
        lines.append("")
        lines.append(scratchpad)
        lines.append("")
        lines.append("---")
        lines.append("")

    lines.append("## Troubleshooting Steps")
    lines.append("")

    # decisions may be null on the stored row; treat that as "no steps".
    for i, decision in enumerate(session.decisions or [], 1):
        # Chain with `or` so a key that is present but null/empty still
        # falls through to the next candidate rather than rendering "None".
        question = decision.get("question") or decision.get("action_performed") or "Step"
        answer = decision.get("answer")
        notes = decision.get("notes")

        lines.append(f"### Step {i}: {question}")
        if answer:
            lines.append(f"**Answer:** {answer}")
        if notes:
            lines.append(f"**Notes:** {notes}")
        if options.include_timestamps and decision.get("timestamp"):
            lines.append(f"*{decision['timestamp']}*")
        lines.append("")

    return "\n".join(lines)
def _generate_text_export(session: Session, options: SessionExport) -> str:
    """Render a session as plain-text notes.

    Layout: an optional tree name underlined with "=", ticket/client/
    timestamp metadata, an optional "EVIDENCE / REFERENCE" section holding
    the scratchpad, and a numbered list of the recorded decisions.

    Args:
        session: Session to render. Reads tree_snapshot, ticket_number,
            client_name, started_at, completed_at, scratchpad and decisions
            (each decision is read as a dict with the optional keys
            question, action_performed, answer and notes).
        options: Export flags; include_tree_info and include_timestamps
            are read.

    Returns:
        The text document as one newline-joined string.
    """
    lines = []

    if options.include_tree_info:
        # Fall back to the generic title for a missing snapshot or a
        # null/empty name, and coerce to str so len() below cannot fail.
        snapshot = session.tree_snapshot or {}
        tree_name = str(snapshot.get("name") or "Troubleshooting Session")
        lines.append(tree_name)
        lines.append("=" * len(tree_name))

    if session.ticket_number:
        lines.append(f"Ticket: {session.ticket_number}")
    if session.client_name:
        lines.append(f"Client: {session.client_name}")

    if options.include_timestamps:
        lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
        if session.completed_at:
            lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")

    lines.append("")

    # Scratchpad / Evidence section (skipped when empty or whitespace-only)
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("EVIDENCE / REFERENCE")
        lines.append("-" * 20)
        lines.append(scratchpad)
        lines.append("")

    lines.append("TROUBLESHOOTING STEPS")
    lines.append("-" * 20)

    # NOTE(review): per-step timestamps are not emitted in this format even
    # when include_timestamps is set — confirm that this is intended.
    # decisions may be null on the stored row; treat that as "no steps".
    for i, decision in enumerate(session.decisions or [], 1):
        # Chain with `or` so a key that is present but null/empty still
        # falls through to the next candidate rather than rendering "None".
        question = decision.get("question") or decision.get("action_performed") or "Step"
        answer = decision.get("answer")
        notes = decision.get("notes")

        lines.append(f"\n{i}. {question}")
        if answer:
            lines.append(f" Answer: {answer}")
        if notes:
            lines.append(f" Notes: {notes}")

    return "\n".join(lines)
def _generate_html_export(session: Session, options: SessionExport) -> str:
    """Render a session as a standalone HTML document.

    Every value taken from the session (tree name, ticket, client,
    scratchpad, decision fields) is HTML-escaped before being embedded.

    Args:
        session: Session to render. Reads tree_snapshot, ticket_number,
            client_name, started_at, completed_at, scratchpad and decisions
            (each decision is read as a dict with the optional keys
            question, action_performed, answer, notes and timestamp).
        options: Export flags; include_tree_info and include_timestamps
            are read.

    Returns:
        The HTML document as one newline-joined string.
    """

    def esc(value: object) -> str:
        # html.escape() only accepts str; stored JSON values may be null or
        # non-string, which previously raised and failed the whole export.
        return html.escape("" if value is None else str(value))

    # The tree name is always needed for <title>, even when the visible
    # heading is switched off via include_tree_info.
    snapshot = session.tree_snapshot or {}
    tree_name = esc(snapshot.get("name") or "Troubleshooting Session")

    html_parts = ['<!DOCTYPE html>', '<html>', '<head>',
                  '<meta charset="UTF-8">',
                  f'<title>{tree_name}</title>',
                  '<style>',
                  'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
                  'h1 { color: #333; }',
                  '.meta { color: #666; margin-bottom: 20px; }',
                  '.step { margin-bottom: 15px; padding: 10px; background: #f5f5f5; border-radius: 5px; }',
                  '.step h3 { margin: 0 0 10px 0; color: #444; }',
                  '.answer { font-weight: bold; }',
                  '.notes { font-style: italic; color: #555; }',
                  '.timestamp { font-size: 0.85em; color: #888; }',
                  '</style>',
                  '</head>', '<body>']

    if options.include_tree_info:
        html_parts.append(f'<h1>{tree_name}</h1>')

    html_parts.append('<div class="meta">')
    if session.ticket_number:
        html_parts.append(f'<p><strong>Ticket:</strong> {esc(session.ticket_number)}</p>')
    if session.client_name:
        html_parts.append(f'<p><strong>Client:</strong> {esc(session.client_name)}</p>')
    if options.include_timestamps:
        html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
        if session.completed_at:
            html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
    html_parts.append('</div>')

    # Scratchpad / Evidence section (skipped when empty or whitespace-only)
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        html_parts.append('<h2>Evidence / Reference</h2>')
        html_parts.append(f'<div class="scratchpad" style="white-space: pre-wrap; background: #f9f9f9; padding: 12px; border-radius: 4px; margin-bottom: 20px;">{esc(scratchpad)}</div>')

    html_parts.append('<h2>Troubleshooting Steps</h2>')

    # decisions may be null on the stored row; treat that as "no steps".
    for i, decision in enumerate(session.decisions or [], 1):
        # Chain with `or` so a key that is present but null/empty still
        # falls through to the next candidate.
        question = esc(decision.get("question") or decision.get("action_performed") or "Step")
        answer = esc(decision.get("answer"))
        notes = esc(decision.get("notes"))

        html_parts.append('<div class="step">')
        html_parts.append(f'<h3>Step {i}: {question}</h3>')
        if answer:
            html_parts.append(f'<p class="answer">Answer: {answer}</p>')
        if notes:
            html_parts.append(f'<p class="notes">Notes: {notes}</p>')
        if options.include_timestamps and decision.get("timestamp"):
            html_parts.append(f'<p class="timestamp">{esc(decision["timestamp"])}</p>')
        html_parts.append('</div>')

    html_parts.extend(['</body>', '</html>'])
    return "\n".join(html_parts)