fix: add ownership check and 404 responses to ai-sessions endpoints
Cross-tenant isolation audit found: - retry-psa-push had NO ownership check (CRITICAL) — any user could retry any session's PSA push - save_task_lane used db.get() without ownership filter, returned 403 revealing existence - get_session returned 403 instead of 404 for unauthorized access - stream_documentation returned 403 instead of 404 All now use query-level user_id filtering and return 404 to avoid revealing existence. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -519,11 +519,15 @@ async def save_task_lane(
|
||||
_: None = Depends(require_engineer_or_admin),
|
||||
):
|
||||
"""Save the current task lane state including user's in-progress responses."""
|
||||
session = await db.get(AISession, session_id)
|
||||
result = await db.execute(
|
||||
select(AISession).where(
|
||||
AISession.id == session_id,
|
||||
AISession.user_id == current_user.id,
|
||||
)
|
||||
)
|
||||
session = result.scalar_one_or_none()
|
||||
if not session:
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
if session.user_id != current_user.id:
|
||||
raise HTTPException(status_code=403, detail="Not your session")
|
||||
|
||||
payload = {
|
||||
"questions": [q.model_dump() for q in body.questions],
|
||||
@@ -901,7 +905,7 @@ async def get_session(
|
||||
pkg = session.escalation_package or {}
|
||||
is_handler = pkg.get("picked_up_by") == str(current_user.id)
|
||||
if session.user_id != current_user.id and session.escalated_to_id != current_user.id and not is_handler:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
|
||||
|
||||
return _build_session_detail(session)
|
||||
|
||||
@@ -942,13 +946,14 @@ async def stream_documentation(
|
||||
|
||||
# Verify session ownership
|
||||
result = await db.execute(
|
||||
select(AISession).where(AISession.id == session_id)
|
||||
select(AISession).where(
|
||||
AISession.id == session_id,
|
||||
AISession.user_id == current_user.id,
|
||||
)
|
||||
)
|
||||
session = result.scalar_one_or_none()
|
||||
if not session:
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
if session.user_id != current_user.id:
|
||||
raise HTTPException(status_code=403, detail="Not authorized")
|
||||
|
||||
async def event_generator():
|
||||
try:
|
||||
@@ -1043,6 +1048,19 @@ async def retry_psa_push_endpoint(
|
||||
"""Manually retry a failed PSA documentation push."""
|
||||
from app.models.psa_post_log import PsaPostLog
|
||||
|
||||
# Verify the session belongs to the current user
|
||||
session_result = await db.execute(
|
||||
select(AISession).where(
|
||||
AISession.id == session_id,
|
||||
AISession.user_id == current_user.id,
|
||||
)
|
||||
)
|
||||
if not session_result.scalar_one_or_none():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="Session not found",
|
||||
)
|
||||
|
||||
# Find the latest failed push log for this session
|
||||
result = await db.execute(
|
||||
select(PsaPostLog)
|
||||
|
||||
Reference in New Issue
Block a user