feat: add audit log table and integration with admin/tree endpoints

Creates AuditLog model with JSONB details column for tracking admin
actions. Integrates log_audit() helper into admin endpoints (role
change, team admin toggle, deactivate, activate) and tree delete.
IP address column reserved for future Railway proxy header support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-05 23:28:41 -05:00
parent 02d06acfb8
commit 3a5ac0f201
7 changed files with 159 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
"""add audit_logs table
Revision ID: 014
Revises: 013
Create Date: 2026-02-05
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = '014'
down_revision: Union[str, None] = '013'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
'audit_logs',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('user_id', UUID(as_uuid=True), sa.ForeignKey('users.id'), nullable=False),
sa.Column('action', sa.String(50), nullable=False),
sa.Column('resource_type', sa.String(50), nullable=False),
sa.Column('resource_id', UUID(as_uuid=True), nullable=True),
sa.Column('details', JSONB, nullable=True),
sa.Column('ip_address', sa.String(45), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index('ix_audit_logs_user_id', 'audit_logs', ['user_id'])
op.create_index('ix_audit_logs_action', 'audit_logs', ['action'])
op.create_index('ix_audit_logs_resource_type', 'audit_logs', ['resource_type'])
op.create_index('ix_audit_logs_created_at', 'audit_logs', ['created_at'])
def downgrade() -> None:
op.drop_index('ix_audit_logs_created_at', table_name='audit_logs')
op.drop_index('ix_audit_logs_resource_type', table_name='audit_logs')
op.drop_index('ix_audit_logs_action', table_name='audit_logs')
op.drop_index('ix_audit_logs_user_id', table_name='audit_logs')
op.drop_table('audit_logs')

View File

@@ -5,6 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.core.database import get_db
from app.core.audit import log_audit
from app.models.user import User
from app.schemas.user import UserResponse, RoleUpdate, TeamAdminUpdate
from app.api.deps import require_admin
@@ -81,7 +82,10 @@ async def update_user_role(
detail="Cannot change your own role"
)
old_role = user.role
user.role = role_data.role
await log_audit(db, current_user.id, "user.role_change", "user", user.id,
{"old_role": old_role, "new_role": role_data.role})
await db.commit()
await db.refresh(user)
return user
@@ -111,6 +115,8 @@ async def toggle_team_admin(
)
user.is_team_admin = data.is_team_admin
await log_audit(db, current_user.id, "user.team_admin_toggle", "user", user.id,
{"is_team_admin": data.is_team_admin})
await db.commit()
await db.refresh(user)
return user
@@ -139,6 +145,7 @@ async def deactivate_user(
)
user.is_active = False
await log_audit(db, current_user.id, "user.deactivate", "user", user.id)
await db.commit()
await db.refresh(user)
return user
@@ -161,6 +168,7 @@ async def activate_user(
)
user.is_active = True
await log_audit(db, current_user.id, "user.activate", "user", user.id)
await db.commit()
await db.refresh(user)
return user

View File

@@ -14,6 +14,7 @@ from app.models.folder import UserFolder
from app.schemas.tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse, CategoryInfo
from app.api.deps import get_current_active_user, require_engineer_or_admin, require_admin
from app.core.permissions import can_edit_tree, can_access_tree
from app.core.audit import log_audit
router = APIRouter(prefix="/trees", tags=["trees"])
@@ -518,5 +519,7 @@ async def delete_tree(
)
tree.is_active = False
await log_audit(db, current_user.id, "tree.delete", "tree", tree.id,
{"tree_name": tree.name})
await db.commit()
return None

24
backend/app/core/audit.py Normal file
View File

@@ -0,0 +1,24 @@
"""Centralized audit logging for admin and destructive actions."""
from uuid import UUID
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audit_log import AuditLog
async def log_audit(
db: AsyncSession,
user_id: UUID,
action: str,
resource_type: str,
resource_id: Optional[UUID] = None,
details: Optional[dict] = None,
) -> None:
"""Record an audit log entry. Does not commit — piggybacks on the caller's commit."""
entry = AuditLog(
user_id=user_id,
action=action,
resource_type=resource_type,
resource_id=resource_id,
details=details,
)
db.add(entry)

View File

@@ -10,6 +10,7 @@ from .folder import UserFolder, user_folder_trees
from .step_category import StepCategory
from .step_library import StepLibrary, StepRating, StepUsageLog
from .refresh_token import RefreshToken
from .audit_log import AuditLog
__all__ = [
"User",
@@ -28,4 +29,5 @@ __all__ = [
"StepRating",
"StepUsageLog",
"RefreshToken",
"AuditLog",
]

View File

@@ -0,0 +1,35 @@
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
class AuditLog(Base):
__tablename__ = "audit_logs"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id"),
nullable=False,
index=True
)
action: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
resource_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
resource_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
nullable=True
)
details: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc)
)

View File

@@ -2,6 +2,9 @@
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audit_log import AuditLog
class TestAdminEndpoints:
@@ -127,3 +130,42 @@ class TestAdminEndpoints:
)
assert response.status_code == 400
assert "own account" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_audit_log_created_on_role_change(
self, client: AsyncClient, admin_auth_headers: dict, test_user: dict, test_db: AsyncSession
):
"""Test that changing a user's role creates an audit log entry."""
user_id = test_user["user_data"]["id"]
await client.put(
f"/api/v1/admin/users/{user_id}/role",
json={"role": "viewer"},
headers=admin_auth_headers
)
result = await test_db.execute(
select(AuditLog).where(AuditLog.action == "user.role_change")
)
log = result.scalar_one_or_none()
assert log is not None
assert str(log.resource_id) == user_id
assert log.details["old_role"] == "engineer"
assert log.details["new_role"] == "viewer"
@pytest.mark.asyncio
async def test_audit_log_created_on_deactivate(
self, client: AsyncClient, admin_auth_headers: dict, test_user: dict, test_db: AsyncSession
):
"""Test that deactivating a user creates an audit log entry."""
user_id = test_user["user_data"]["id"]
await client.put(
f"/api/v1/admin/users/{user_id}/deactivate",
headers=admin_auth_headers
)
result = await test_db.execute(
select(AuditLog).where(AuditLog.action == "user.deactivate")
)
log = result.scalar_one_or_none()
assert log is not None
assert str(log.resource_id) == user_id