fix: race condition hardening across auth, counters, and data fetching #102

Merged
chihlasm merged 4 commits from fix/race-conditions-critical into main 2026-03-10 05:57:22 +00:00
9 changed files with 305 additions and 98 deletions

View File

@@ -5,7 +5,7 @@ from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy import select, update as sa_update
from app.core.config import settings
from app.core.settings_manager import SettingsManager
from app.core.database import get_db
@@ -78,13 +78,15 @@ async def register(
After user creation, if no account invite was used, a personal Account
and free Subscription are created automatically.
"""
# Check for account invite code FIRST — bypasses platform invite gate
# Check for account invite code FIRST — bypasses platform invite gate.
# SELECT FOR UPDATE prevents two concurrent registrations from both
# reading the same invite as unused and double-spending it.
account_invite_record = None
if user_data.account_invite_code:
result = await db.execute(
select(AccountInvite).where(
AccountInvite.code == user_data.account_invite_code
)
select(AccountInvite)
.where(AccountInvite.code == user_data.account_invite_code)
.with_for_update()
)
account_invite_record = result.scalar_one_or_none()
@@ -116,9 +118,12 @@ async def register(
)
if user_data.invite_code:
# Look up invite code (case-insensitive) — applies plan/trial regardless of REQUIRE_INVITE_CODE
# Look up invite code (case-insensitive) — applies plan/trial regardless of REQUIRE_INVITE_CODE.
# FOR UPDATE prevents double-spend by concurrent registrations.
result = await db.execute(
select(InviteCode).where(InviteCode.code == user_data.invite_code.upper())
select(InviteCode)
.where(InviteCode.code == user_data.invite_code.upper())
.with_for_update()
)
invite_code_record = result.scalar_one_or_none()
@@ -305,24 +310,29 @@ async def refresh_token(
user_id = payload.get("sub")
jti = payload.get("jti")
# Validate refresh token hasn't been revoked
# Atomically revoke the old refresh token (token rotation).
# Using a conditional UPDATE prevents the race where two concurrent
# refresh requests both read revoked_at=NULL and both succeed.
if jti:
token_hash = hash_token(jti)
result = await db.execute(
select(RefreshToken).where(RefreshToken.token_hash == token_hash)
sa_update(RefreshToken)
.where(
RefreshToken.token_hash == token_hash,
RefreshToken.revoked_at.is_(None),
)
.values(revoked_at=datetime.now(timezone.utc))
.returning(RefreshToken.id, RefreshToken.user_id)
)
stored_token = result.scalar_one_or_none()
revoked_row = result.fetchone()
if stored_token and stored_token.is_revoked:
if not revoked_row:
# Either the token doesn't exist or was already revoked/used
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token has been revoked"
)
# Revoke the old refresh token (token rotation)
if stored_token:
stored_token.revoked_at = datetime.now(timezone.utc)
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
@@ -552,9 +562,12 @@ async def reset_password(
detail="Invalid reset token"
)
# Validate token in DB (single-use)
# Validate token in DB (single-use).
# FOR UPDATE prevents two concurrent reset requests from both succeeding.
result = await db.execute(
select(PasswordResetToken).where(PasswordResetToken.token_hash == hash_token(jti))
select(PasswordResetToken)
.where(PasswordResetToken.token_hash == hash_token(jti))
.with_for_update()
)
token_record = result.scalar_one_or_none()
@@ -674,10 +687,11 @@ async def verify_email(
detail="Invalid verification token"
)
# FOR UPDATE prevents two concurrent verification requests from both succeeding.
result = await db.execute(
select(EmailVerificationToken).where(
EmailVerificationToken.token_hash == hash_token(jti)
)
select(EmailVerificationToken)
.where(EmailVerificationToken.token_hash == hash_token(jti))
.with_for_update()
)
token_record = result.scalar_one_or_none()

View File

@@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field as PydanticField
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy import select, update as sa_update
from app.core.database import get_db
from app.models.tree import Tree
@@ -189,8 +189,10 @@ async def start_session(
session_variables=session_variables,
)
# Increment tree usage count
tree.usage_count += 1
# Atomically increment tree usage count (SQL-level to avoid lost updates)
await db.execute(
sa_update(Tree).where(Tree.id == tree.id).values(usage_count=Tree.usage_count + 1)
)
db.add(new_session)
await db.commit()

View File

@@ -25,7 +25,7 @@ from app.models.user_pinned_tree import UserPinnedTree
from app.api.deps import get_current_active_user, require_engineer_or_admin, require_admin, get_service_account_id
from app.core.permissions import can_edit_tree, can_access_tree
from app.core.filters import build_tree_access_filter
from app.core.subscriptions import check_tree_limit
from app.core.subscriptions import check_tree_limit, get_account_subscription, get_plan_limits
from app.core.audit import log_audit
from app.core.config import settings
from app.core.tree_validation import can_publish_tree
@@ -487,6 +487,26 @@ async def create_tree(
db.add(new_tree)
await db.flush() # Get the ID
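# Re-check tree limit after flush to narrow the TOCTOU race window:
# two concurrent creates could both pass the pre-check, but only one
# should succeed when the limit is exactly reached.
# NOTE(review): under READ COMMITTED this count cannot see rows flushed by
# another transaction that has not committed yet, so two overlapping creates
# can still both pass this re-check — confirm whether a row lock on the
# account/subscription is needed for a hard guarantee.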
if not is_default and current_user.account_id:
post_count = await db.scalar(
select(func.count(Tree.id)).where(
Tree.account_id == current_user.account_id,
Tree.deleted_at.is_(None),
)
)
sub = await get_account_subscription(current_user.account_id, db)
if sub:
limits = await get_plan_limits(sub.plan, db)
if limits and limits.max_trees and (post_count or 0) > limits.max_trees:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail=f"Tree limit reached ({limits.max_trees}/{limits.max_trees}). Upgrade your plan to create more trees."
)
# Handle tags
if tree_data.tags:
tree_account_id = new_tree.account_id or (current_user.account_id if not current_user.is_super_admin else None)
@@ -519,7 +539,6 @@ async def create_tree(
await db.flush()
tags_to_add.append(tag)
tag.usage_count += 1
# Use direct SQL insert for the junction table to avoid lazy load issues
from app.models.tag import tree_tag_assignments
@@ -531,6 +550,10 @@ async def create_tree(
assigned_by=current_user.id
)
)
# Atomically increment (SQL-level to avoid lost updates from concurrent requests)
await db.execute(
update(TreeTag).where(TreeTag.id == tag.id).values(usage_count=TreeTag.usage_count + 1)
)
await db.commit()
@@ -673,9 +696,14 @@ async def update_tree(
if tags_data is not None:
from app.models.tag import tree_tag_assignments
# Decrement usage count for old tags (already eagerly loaded)
for tag in tree.tags:
tag.usage_count = max(0, tag.usage_count - 1)
# Atomically decrement usage count for old tags
old_tag_ids = [tag.id for tag in tree.tags]
if old_tag_ids:
await db.execute(
update(TreeTag)
.where(TreeTag.id.in_(old_tag_ids))
.values(usage_count=func.greatest(TreeTag.usage_count - 1, 0))
)
# Delete existing tag assignments using direct SQL
await db.execute(
@@ -720,7 +748,10 @@ async def update_tree(
)
)
added_tag_ids.add(tag.id)
tag.usage_count += 1
# Atomically increment (SQL-level to avoid lost updates)
await db.execute(
update(TreeTag).where(TreeTag.id == tag.id).values(usage_count=TreeTag.usage_count + 1)
)
await db.commit()

View File

@@ -0,0 +1,139 @@
# Plan: Flexible Intake — Deferred Variables + Prepared Sessions
## Context
The current intake form on procedural flows is a blocking modal that forces engineers to enter all variables before the flow starts. This creates friction because:
- Engineers don't always have all the information upfront
- Information often lives in PSA tickets, RMM tools, or was communicated verbally
- Sometimes a lead/PM has the info and wants to set up the session for an engineer to execute later
**Goal:** Replace the blocking intake modal with two complementary workflows:
1. **Deferred Variables** — start the flow immediately, fill variables inline as you encounter them
2. **Prepared Sessions** — pre-fill variables ahead of time, optionally assign to an engineer, execute later
---
## Design
### Workflow 1: Deferred Variables (Start Now, Fill Later)
- "Start Flow" launches the session immediately — no intake modal
- Variables begin empty
- When a step references `[VAR:server_name]` and it's unfilled, an **inline input prompt** renders in place — visually prominent with the field's label, help text, and styling that stands out (cyan border, slight glow)
- Once filled, the value persists and resolves everywhere in the session
- Engineers can also open a **"Session Variables" side panel** at any time to see/edit all variables
- At **session completion**, if required variables are still empty → soft warning with a prompt to fill them (for complete export documentation), but not a hard block
### Workflow 2: Prepared Sessions (Set Up Ahead, Execute Later)
- From a flow's detail page: "Prepare Session" action opens a form to fill variables + assign an engineer
- Creates a session in `prepared` state — `started_at` is null, variables populated, no steps executed
- **Assignment:** Preparer can assign to a specific engineer on their team (or leave unassigned)
- **Personal queue:** Engineers see prepared sessions assigned to them in a dedicated section (Quick Start page or Session History tab)
- Clicking a prepared session opens the flow with variables pre-populated; execution begins normally
- Unassigned prepared sessions are visible to all team members
### Data Model Changes
**Session model additions:**
- `prepared_by_id` — UUID FK to users, nullable. Who created the prepared session.
- `assigned_to_id` — UUID FK to users, nullable. Who should execute it.
- Use existing convention: `started_at IS NULL` = prepared; `started_at IS NOT NULL AND completed_at IS NULL` = active; `completed_at IS NOT NULL` = completed
**Session variables become mutable:**
- `session_variables` is currently write-once at session creation
- New endpoint: `PATCH /sessions/{id}/variables` — updates individual variables during an active session
- Only the session owner (or assigned engineer) can update variables
**Migration:** One migration adding `prepared_by_id` and `assigned_to_id` columns with FK constraints.
### Variable Resolution Changes
**Backend:**
- No changes to export pipeline — it already resolves variables from `session_variables`
- New `PATCH /sessions/{id}/variables` endpoint accepts partial variable updates
- Session creation no longer validates required intake fields (they can be filled later)
**Frontend — `resolveVariables()` in `lib/variableResolver.ts`:**
- Currently returns a plain string with `[VAR:x]` replaced
- New behavior: also identify unresolved variables so `StepDetail` can render inline prompts
**Frontend — `StepDetail.tsx`:**
- When rendering step content, unresolved `[VAR:x]` references render as inline input components
- Inline prompt design: input field with the field's label as placeholder, cyan border, subtle glow background to make them visually prominent and easy to spot
- On blur/enter: calls `PATCH /sessions/{id}/variables` → re-renders step with resolved value
- Lookup field metadata (label, field_type, help_text, options) from the intake form definition in the tree snapshot
**Frontend — Session Variables Panel:**
- Existing "View Parameters" button becomes "Session Variables" — now editable
- Shows all intake form fields with filled/unfilled status
- Unfilled required fields highlighted
- Editing a field here updates the session and re-resolves all visible steps
### API Changes
| Method | Endpoint | Description |
|--------|----------|-------------|
| `PATCH` | `/sessions/{id}/variables` | Update one or more session variables (partial dict merge) |
| `POST` | `/sessions` | Remove required-field validation for intake forms (allow empty start) |
| `GET` | `/sessions` | Add `assigned_to_id` and `status=prepared` filter params |
| `POST` | `/sessions/prepare` | New endpoint: create a prepared session with variables + optional assignee |
### UI Changes
| Location | Change |
|----------|--------|
| **Flow detail page** | "Start Flow" no longer shows intake modal. Add "Prepare Session" option (dropdown or secondary button) |
| **ProceduralNavigationPage** | Remove `IntakeFormModal` gating. Add "Session Variables" panel button. Inline prompts on steps with unfilled variables |
| **StepDetail** | Render inline input prompts for unresolved `[VAR:x]` references |
| **Quick Start page** | New "Prepared for You" section showing assigned prepared sessions |
| **Session History** | New "Prepared" tab/filter showing prepared sessions |
| **Prepare Session form** | New modal/page: select flow, fill variables, assign engineer, save |
| **Session completion** | Soft warning if required variables still empty |
### What Gets Removed
- `IntakeFormModal.tsx` — no longer used as a blocking gate (may be repurposed as the "Prepare Session" form)
- Required-field validation in `POST /sessions` for intake form fields
- The `showIntakeForm` / intake modal state in `ProceduralNavigationPage`
---
## Implementation Phases
### Phase 1: Mutable Variables + Inline Prompts
**Files:** `sessions.py`, `variableResolver.ts`, `StepDetail.tsx`, `ProceduralNavigationPage.tsx`
1. Add `PATCH /sessions/{id}/variables` endpoint
2. Remove intake form required-field blocking from `POST /sessions`
3. Update `resolveVariables()` to identify unresolved variables
4. Build inline variable prompt component for `StepDetail`
5. Make "View Parameters" panel editable
6. Remove `IntakeFormModal` gating from `ProceduralNavigationPage`
### Phase 2: Prepared Sessions
**Files:** `sessions.py`, `session.py` (schemas), migration, `PrepareSessionModal.tsx`, `QuickStartPage.tsx`, `SessionHistoryPage.tsx`
1. Migration: add `prepared_by_id`, `assigned_to_id` to sessions table
2. `POST /sessions/prepare` endpoint
3. `GET /sessions` filter support for `assigned_to_id` and prepared status
4. Prepare Session modal/form (reuse IntakeFormModal field rendering)
5. "Prepared for You" section on Quick Start
6. "Prepared" filter on Session History
### Phase 3: Polish
1. Soft completion warning for unfilled required variables
2. Prepared session staleness indicator (optional)
3. Notification when a session is prepared/assigned to you (optional, future)
---
## Verification
- Start a procedural flow without filling any variables → flow starts immediately, no modal
- Navigate to a step with `[VAR:server_name]` → see inline input prompt
- Fill the variable inline → value resolves across all steps
- Open Session Variables panel → see all fields, edit one → reflected in steps
- Prepare a session from flow detail page → assign to another engineer
- Log in as assigned engineer → see prepared session in Quick Start queue
- Click prepared session → flow opens with variables pre-filled, execute normally
- Complete a session with one unfilled required variable → see soft warning
- Export session → variables resolved in output, unfilled ones show as `[VAR:x]` or blank

View File

@@ -75,15 +75,19 @@ let refreshSubscribers: ((token: string) => void)[] = []
let refreshFailSubscribers: ((error: unknown) => void)[] = []
function onRefreshed(token: string) {
refreshSubscribers.forEach(cb => cb(token))
// Swap arrays before iterating — if a callback throws, the arrays
// are already cleared so the next refresh cycle starts clean.
const subscribers = refreshSubscribers
refreshSubscribers = []
refreshFailSubscribers = []
subscribers.forEach(cb => cb(token))
}
function onRefreshFailed(error: unknown) {
refreshFailSubscribers.forEach(cb => cb(error))
const failSubscribers = refreshFailSubscribers
refreshSubscribers = []
refreshFailSubscribers = []
failSubscribers.forEach(cb => cb(error))
}
// Response interceptor - handle token refresh

View File

@@ -176,7 +176,8 @@ export function QuickStartPage() {
return () => window.removeEventListener('focus', onFocus)
}, [loadFlows])
// Debounced search
// Debounced search with staleness guard
const searchRequestId = useRef(0)
useEffect(() => {
if (debounceRef.current) clearTimeout(debounceRef.current)
if (query.length < 2) {
@@ -188,13 +189,16 @@ export function QuickStartPage() {
setIsSearching(true)
setShowResults(true)
debounceRef.current = setTimeout(async () => {
const requestId = ++searchRequestId.current
try {
const results = await treesApi.search(query, 8)
if (requestId !== searchRequestId.current) return
setSearchResults(results)
} catch {
if (requestId !== searchRequestId.current) return
setSearchResults([])
} finally {
setIsSearching(false)
if (requestId === searchRequestId.current) setIsSearching(false)
}
}, 300)
return () => { if (debounceRef.current) clearTimeout(debounceRef.current) }

View File

@@ -60,7 +60,59 @@ export function SessionHistoryPage() {
// Load sessions when filters change
useEffect(() => {
let cancelled = false
const loadSessions = async () => {
setIsLoading(true)
try {
const params: Record<string, string | boolean> = {}
// Tab filter (all/active/completed)
if (filter !== 'all') {
params.completed = filter === 'completed'
}
// Search/filter params
if (filters.ticketNumber) {
params.ticket_number = filters.ticketNumber
}
if (filters.clientName) {
params.client_name = filters.clientName
}
if (filters.treeName) {
params.tree_name = filters.treeName
}
// Date range params
if (filters.dateRange?.from) {
const fromDate = filters.dateRange.from
const toDate = filters.dateRange.to || filters.dateRange.from
if (filters.dateType === 'started') {
params.started_after = fromDate.toISOString()
params.started_before = toDate.toISOString()
} else {
params.completed_after = fromDate.toISOString()
params.completed_before = toDate.toISOString()
}
}
const sessionsData = await sessionsApi.list({ ...params, size: 51 })
if (cancelled) return
const truncated = sessionsData.length > 50
setHasMore(truncated)
setSessions(truncated ? sessionsData.slice(0, 50) : sessionsData)
} catch (err) {
if (cancelled) return
toast.error('Failed to load sessions')
console.error(err)
} finally {
if (!cancelled) setIsLoading(false)
}
}
loadSessions()
return () => { cancelled = true }
}, [filter, filters])
// Update URL params when filters change
@@ -79,53 +131,6 @@ export function SessionHistoryPage() {
setSearchParams(params, { replace: true })
}, [filters, setSearchParams])
const loadSessions = async () => {
setIsLoading(true)
try {
const params: Record<string, string | boolean> = {}
// Tab filter (all/active/completed)
if (filter !== 'all') {
params.completed = filter === 'completed'
}
// Search/filter params
if (filters.ticketNumber) {
params.ticket_number = filters.ticketNumber
}
if (filters.clientName) {
params.client_name = filters.clientName
}
if (filters.treeName) {
params.tree_name = filters.treeName
}
// Date range params
if (filters.dateRange?.from) {
const fromDate = filters.dateRange.from
const toDate = filters.dateRange.to || filters.dateRange.from
if (filters.dateType === 'started') {
params.started_after = fromDate.toISOString()
params.started_before = toDate.toISOString()
} else {
params.completed_after = fromDate.toISOString()
params.completed_before = toDate.toISOString()
}
}
const sessionsData = await sessionsApi.list({ ...params, size: 51 })
const truncated = sessionsData.length > 50
setHasMore(truncated)
setSessions(truncated ? sessionsData.slice(0, 50) : sessionsData)
} catch (err) {
toast.error('Failed to load sessions')
console.error(err)
} finally {
setIsLoading(false)
}
}
const handleFilterChange = (newFilters: SessionFilterState) => {
setFilters(newFilters)
}

View File

@@ -330,6 +330,7 @@ export function TreeEditorPage() {
}, [updateNode, selectNode])
const handleSaveDraft = useCallback(async () => {
if (isSaving) return
setSaving(true)
try {
// In Code Mode, run fresh validation on current markdown before saving
@@ -388,9 +389,10 @@ export function TreeEditorPage() {
} finally {
setSaving(false)
}
}, [isEditMode, id, editorMode, getTreeForSave, markSaved, navigate])
}, [isSaving, isEditMode, id, editorMode, getTreeForSave, markSaved, navigate])
const handlePublish = useCallback(async () => {
if (isSaving) return
setSaving(true)
try {
// In Code Mode, run fresh validation on current markdown before publishing
@@ -467,7 +469,7 @@ export function TreeEditorPage() {
} finally {
setSaving(false)
}
}, [isEditMode, id, editorMode, validate, getTreeForSave, markSaved, navigate])
}, [isSaving, isEditMode, id, editorMode, validate, getTreeForSave, markSaved, navigate])
// Keep handleSave for backward compatibility (Ctrl+S shortcut)
const handleSave = useCallback(async () => {

View File

@@ -1,4 +1,4 @@
import { useEffect, useState, useCallback, useMemo } from 'react'
import { useEffect, useState, useCallback, useMemo, useRef } from 'react'
import { useNavigate, useSearchParams } from 'react-router-dom'
import { X, RotateCcw, Play, FileUp } from 'lucide-react'
import { PageMeta } from '@/components/common/PageMeta'
@@ -158,20 +158,11 @@ export function TreeLibraryPage() {
.catch((err) => console.error('Failed to load categories:', err))
}, [])
// Load trees when filters change
useEffect(() => {
loadTrees()
}, [selectedCategoryId, selectedTags, selectedFolderId, treeLibrarySortBy, typeFilter])
// Request ID ref to discard stale responses when filters change rapidly
const loadTreesRequestId = useRef(0)
// Load folders on mount and listen for changes
useEffect(() => {
loadFolders()
const handleFolderChange = () => loadFolders()
window.addEventListener('folder-changed', handleFolderChange)
return () => window.removeEventListener('folder-changed', handleFolderChange)
}, [loadFolders])
const loadTrees = async () => {
const loadTrees = useCallback(async () => {
const requestId = ++loadTreesRequestId.current
setIsLoading(true)
try {
const treesData = await treesApi.list({
@@ -181,14 +172,29 @@ export function TreeLibraryPage() {
folder_id: selectedFolderId || undefined,
sort_by: treeLibrarySortBy,
})
if (requestId !== loadTreesRequestId.current) return
setTrees(treesData)
} catch (err) {
if (requestId !== loadTreesRequestId.current) return
toast.error('Failed to load flows')
console.error(err)
} finally {
setIsLoading(false)
if (requestId === loadTreesRequestId.current) setIsLoading(false)
}
}
}, [selectedCategoryId, selectedTags, selectedFolderId, treeLibrarySortBy, typeFilter])
// Load trees when filters change
useEffect(() => {
loadTrees()
}, [loadTrees])
// Load folders on mount and listen for changes
useEffect(() => {
loadFolders()
const handleFolderChange = () => loadFolders()
window.addEventListener('folder-changed', handleFolderChange)
return () => window.removeEventListener('folder-changed', handleFolderChange)
}, [loadFolders])
const handleSearch = async () => {
if (!searchQuery.trim()) {