fix: critical security hardening (Phase A permissions audit)

- Remove role field from UserCreate schema, hardcode 'engineer' at registration
- Escape all user content in HTML export with html.escape() (XSS fix)
- Add field_validator to reject default SECRET_KEY when DEBUG=False
- Add CHECK constraint on users.role ('engineer'|'viewer') + migration 011
- Fix test_admin fixture to properly grant is_super_admin via ORM
- Fix circular FK (users↔invite_codes) in test DB setup with DROP SCHEMA CASCADE
- Add 5 new security tests (role validation + XSS prevention)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-05 22:04:37 -05:00
parent fd8fab97bd
commit 3e0fb92012
10 changed files with 236 additions and 48 deletions

View File

@@ -75,7 +75,7 @@ async def register(
email=user_data.email,
password_hash=get_password_hash(user_data.password),
name=user_data.name,
role=user_data.role,
role="engineer",
invite_code_id=invite_code_record.id if invite_code_record else None
)
db.add(new_user)

View File

@@ -1,3 +1,4 @@
import html
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
@@ -348,9 +349,9 @@ def _generate_text_export(session: Session, options: SessionExport) -> str:
def _generate_html_export(session: Session, options: SessionExport) -> str:
"""Generate HTML export."""
tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session"))
html = ['<!DOCTYPE html>', '<html>', '<head>',
html_parts = ['<!DOCTYPE html>', '<html>', '<head>',
'<meta charset="UTF-8">',
f'<title>{tree_name}</title>',
'<style>',
@@ -366,40 +367,40 @@ def _generate_html_export(session: Session, options: SessionExport) -> str:
'</head>', '<body>']
if options.include_tree_info:
html.append(f'<h1>{tree_name}</h1>')
html.append('<div class="meta">')
html_parts.append(f'<h1>{tree_name}</h1>')
html_parts.append('<div class="meta">')
if session.ticket_number:
html.append(f'<p><strong>Ticket:</strong> {session.ticket_number}</p>')
html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
if session.client_name:
html.append(f'<p><strong>Client:</strong> {session.client_name}</p>')
html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
if options.include_timestamps:
html.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
if session.completed_at:
html.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
html.append('</div>')
html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
html_parts.append('</div>')
# Scratchpad / Evidence section
scratchpad = getattr(session, 'scratchpad', '') or ''
if scratchpad.strip():
html.append('<h2>Evidence / Reference</h2>')
html.append(f'<div class="scratchpad" style="white-space: pre-wrap; background: #f9f9f9; padding: 12px; border-radius: 4px; margin-bottom: 20px;">{scratchpad}</div>')
html_parts.append('<h2>Evidence / Reference</h2>')
html_parts.append(f'<div class="scratchpad" style="white-space: pre-wrap; background: #f9f9f9; padding: 12px; border-radius: 4px; margin-bottom: 20px;">{html.escape(scratchpad)}</div>')
html.append('<h2>Troubleshooting Steps</h2>')
html_parts.append('<h2>Troubleshooting Steps</h2>')
for i, decision in enumerate(session.decisions, 1):
question = decision.get("question") or decision.get("action_performed", "Step")
answer = decision.get("answer", "")
notes = decision.get("notes", "")
question = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
answer = html.escape(decision.get("answer", ""))
notes = html.escape(decision.get("notes", ""))
html.append('<div class="step">')
html.append(f'<h3>Step {i}: {question}</h3>')
html_parts.append('<div class="step">')
html_parts.append(f'<h3>Step {i}: {question}</h3>')
if answer:
html.append(f'<p class="answer">Answer: {answer}</p>')
html_parts.append(f'<p class="answer">Answer: {answer}</p>')
if notes:
html.append(f'<p class="notes">Notes: {notes}</p>')
html_parts.append(f'<p class="notes">Notes: {notes}</p>')
if options.include_timestamps and decision.get("timestamp"):
html.append(f'<p class="timestamp">{decision["timestamp"]}</p>')
html.append('</div>')
html_parts.append(f'<p class="timestamp">{html.escape(str(decision["timestamp"]))}</p>')
html_parts.append('</div>')
html.extend(['</body>', '</html>'])
return "\n".join(html)
html_parts.extend(['</body>', '</html>'])
return "\n".join(html_parts)

View File

@@ -3,6 +3,9 @@ from pydantic import field_validator
from typing import Optional
_DEFAULT_SECRET_KEY = "your-secret-key-change-in-production-use-openssl-rand-hex-32"
class Settings(BaseSettings):
# Application
APP_NAME: str = "ResolutionFlow"
@@ -26,7 +29,19 @@ class Settings(BaseSettings):
return self.DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://", 1)
# JWT Settings
SECRET_KEY: str = "your-secret-key-change-in-production-use-openssl-rand-hex-32"
SECRET_KEY: str = _DEFAULT_SECRET_KEY
@field_validator("SECRET_KEY", mode="after")
@classmethod
def reject_default_secret_in_production(cls, v: str, info) -> str:
"""Fail loudly if the default secret key is used outside of DEBUG mode."""
debug = info.data.get("DEBUG", False)
if v == _DEFAULT_SECRET_KEY and not debug:
raise ValueError(
"SECRET_KEY must be set to a secure value in production. "
"Generate one with: openssl rand -hex 32"
)
return v
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 7

View File

@@ -1,7 +1,7 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, DateTime, ForeignKey, Boolean
from sqlalchemy import String, DateTime, ForeignKey, Boolean, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
@@ -15,6 +15,12 @@ if TYPE_CHECKING:
class User(Base):
__tablename__ = "users"
__table_args__ = (
CheckConstraint(
"role IN ('engineer', 'viewer')",
name='ck_users_role_enum'
),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),

View File

@@ -11,7 +11,6 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
password: str = Field(..., min_length=10, description="Password must be at least 10 characters")
role: str = Field(default="engineer", description="User role: engineer or viewer")
invite_code: Optional[str] = Field(None, description="Invite code for registration (required when invite system is enabled)")