26 bite-sized TDD tasks covering: l1_tech role + perms, seat enforcement (L1 + engineer together), 5 migrations (role/columns, FlowProposal, internal_tickets, l1_walk_sessions), seat_enforcement/internal_ticket/ l1_session services, full L1 endpoint surface (intake/queue/step/notes/ resolve/escalate/escalate-without-walk), APScheduler cleanup for 24h abandoned sessions, frontend usePermissions/Sidebar/router updates, L1Dashboard (active + empty state + resume widget), L1WalkPage with tree and adhoc variants, coverage banner, seat counter widget, RLS regression tests, E2E Playwright suite, acceptance walkthrough. Phase 2 (AI build + KB documents) and Phase 3 (KB connectors) get their own plan files. Phase 1 ships with adhoc walks as the default intake; user-facing flow selection ships in Phase 2 alongside the AI matcher. PSA close/reassign is a Phase 1 stub (deferred to Phase 2). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
138 KiB
L1 Workspace — Phase 1 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Ship the L1 helpdesk workspace — new role, dedicated /l1/* surface, walker (tree + adhoc variants), session lifecycle, escalation, internal-ticket fallback, coverage flag, seat enforcement — working end-to-end against existing authored flows.
Architecture: New l1_tech role between engineer and viewer. Two new tables (l1_walk_sessions, internal_tickets) and small column additions to flow_proposals, users, accounts, subscriptions, audit_logs. New services/l1_session_service.py orchestrates walking-session lifecycle. New services/seat_enforcement.py shared by invite/role-change paths for both L1 and engineer seats. Frontend gets a role-gated /l1/* page tree. AI tree-builder and KB connectors are explicitly out of Phase 1 (Phases 2 and 3).
Tech Stack: Python 3.12 + FastAPI + SQLAlchemy 2.0 async + Alembic + Pydantic v2 + APScheduler (backend). React 19 + Vite + TypeScript + Tailwind v4 + Zustand + React Router v7 (frontend). PostgreSQL 16 with RLS.
Spec: docs/superpowers/specs/2026-05-28-l1-workspace-design.md — read in full before starting.
Out of scope for Phase 1 (deferred to Phase 2/3): match_or_build orchestrator, AI tree-builder, kb_documents tables, KB connectors (IT Glue / Hudu / Microsoft Graph), SUGGEST_THRESHOLD near-miss UX, BuildAbortedNoKB screen with near-miss option. User-facing flow selection is also deferred — Phase 1 intake creates adhoc walks only; the walker's flow variant exists in code and works when flow_id is passed to the intake endpoint (used by tests + direct API), but the UI surface for L1s to manually pick a flow ships in Phase 2 alongside the AI matcher. FlowProposal column extensions ARE included in Phase 1 (model is ready; Phase 2 populates).
File Structure
Backend — new files:
backend/app/models/l1_walk_session.py— SQLAlchemyL1WalkSessionbackend/app/models/internal_ticket.py— SQLAlchemyInternalTicketbackend/app/schemas/l1.py— Pydantic request/response shapesbackend/app/schemas/seat_enforcement.py—SeatCheckResult,SeatUsageshapesbackend/app/services/seat_enforcement.py—check_seat_availableshared helperbackend/app/services/internal_ticket_service.py— CRUD + status transitionsbackend/app/services/l1_session_service.py— session lifecycle (start/step/notes/resolve/escalate)backend/app/services/l1_session_cleanup.py— APScheduler hourly job, 24h abandonmentbackend/app/api/endpoints/l1.py— all/l1/*endpointsbackend/alembic/versions/<hash>_add_l1_tech_role.py— role + column additionsbackend/alembic/versions/<hash>_extend_flow_proposals.py— FlowProposal columnsbackend/alembic/versions/<hash>_create_internal_tickets.py— table + RLSbackend/alembic/versions/<hash>_create_l1_walk_sessions.py— table + RLS + check constraintbackend/tests/test_seat_enforcement.pybackend/tests/test_internal_ticket_service.pybackend/tests/test_l1_session_service.pybackend/tests/test_l1_endpoints.pybackend/tests/test_l1_rls.py
Backend — modified files:
backend/app/models/flow_proposal.py— add new columnsbackend/app/models/user.py— addcan_cover_l1backend/app/models/account.py— addl1_seats_purchasedbackend/app/models/subscription.py— addl1_seat_limitbackend/app/models/audit_log.py— addacting_asbackend/app/core/permissions.py— addl1_techto role docstring + helpersbackend/app/api/deps.py— addrequire_l1,require_l1_or_coverage,require_l1_or_abovebackend/app/api/router.py— registerl1router +internal-ticketsrouterbackend/app/api/endpoints/invite.py— integrate seat enforcementbackend/app/api/endpoints/accounts.py— seat-usage endpoint + coverage PATCH + role-change checkbackend/app/main.py— register cleanup scheduler in lifespan
Frontend — new files:
frontend/src/pages/l1/L1Dashboard.tsxfrontend/src/pages/l1/L1WalkPage.tsxfrontend/src/pages/l1/L1DraftsPage.tsxfrontend/src/pages/l1/L1TicketsPage.tsxfrontend/src/components/l1/L1WalkTreeVariant.tsxfrontend/src/components/l1/L1WalkAdhocVariant.tsxfrontend/src/components/l1/L1CoverageBanner.tsxfrontend/src/components/l1/EmptyStateCard.tsxfrontend/src/components/l1/ResumeInProgress.tsxfrontend/src/components/admin/SeatCounterWidget.tsxfrontend/src/components/layout/L1RouteGuard.tsxfrontend/src/api/l1.tsfrontend/src/types/l1.ts
Frontend — modified files:
frontend/src/hooks/usePermissions.ts— addisL1Tech,canCoverL1, role-tier checkfrontend/src/components/layout/Sidebar.tsx— role-based nav arrayfrontend/src/components/layout/ProtectedRoute.tsx— L1 post-login redirectfrontend/src/router.tsx— register/l1/*routesfrontend/src/types/auth.ts(or whereverUser.account_rolelives) — add'l1_tech'to unionfrontend/src/api/index.ts— exportl1APIfrontend/src/types/index.ts— exportl1types
Task 1: Backend — extend role docstring + permission helpers
Files:
-
Modify:
backend/app/core/permissions.py(header docstring + any role-list constants) -
Step 1: Open file and locate the role docstring (around lines 5–10).
Read permissions.py to confirm current shape.
- Step 2: Add
l1_techto the role docstring.
Replace the existing role list block:
"""
Permissions module.
Role hierarchy:
- super_admin: is_super_admin=True, full system access
- owner: account_role='owner', manage account resources
- engineer: account_role='engineer' (default), CRUD own trees/steps
- l1_tech: account_role='l1_tech', use /l1/* surface only — walk flows, resolve/escalate
- viewer: account_role='viewer', read-only (can browse, run sessions, rate steps)
"""
- Step 3: If there's a
VALID_ROLESconstant or similar enum/list, add'l1_tech'.
Grep first:
grep -n "engineer.*viewer\|VALID_ROLES\|ROLE_HIERARCHY" backend/app/core/permissions.py
If a list/tuple exists, insert 'l1_tech' between 'engineer' and 'viewer'. If not, no change.
-
Step 4: No tests for this docstring-only change. Move to commit.
-
Step 5: Commit.
git add backend/app/core/permissions.py
git commit -m "feat(l1): add l1_tech role to permissions docstring"
Task 2: Backend — add require_l1, require_l1_or_coverage, require_l1_or_above deps
Files:
-
Modify:
backend/app/api/deps.py(add new dep functions afterrequire_engineer_or_admin) -
Test:
backend/tests/test_deps_l1.py(new) -
Step 1: Write the failing tests.
Create backend/tests/test_deps_l1.py:
import pytest
from fastapi import HTTPException
from app.api.deps import require_l1, require_l1_or_coverage, require_l1_or_above
from tests.factories import make_user # existing test factory
@pytest.mark.asyncio
async def test_require_l1_passes_for_l1_tech():
user = make_user(account_role='l1_tech')
result = await require_l1(current_user=user)
assert result is user
@pytest.mark.asyncio
async def test_require_l1_blocks_engineer():
user = make_user(account_role='engineer')
with pytest.raises(HTTPException) as exc:
await require_l1(current_user=user)
assert exc.value.status_code == 403
@pytest.mark.asyncio
async def test_require_l1_or_coverage_passes_engineer_with_flag():
user = make_user(account_role='engineer', can_cover_l1=True)
result = await require_l1_or_coverage(current_user=user)
assert result is user
@pytest.mark.asyncio
async def test_require_l1_or_coverage_blocks_engineer_without_flag():
user = make_user(account_role='engineer', can_cover_l1=False)
with pytest.raises(HTTPException) as exc:
await require_l1_or_coverage(current_user=user)
assert exc.value.status_code == 403
@pytest.mark.asyncio
async def test_require_l1_or_coverage_passes_owner_always():
user = make_user(account_role='owner', can_cover_l1=False)
result = await require_l1_or_coverage(current_user=user)
assert result is user
@pytest.mark.asyncio
async def test_require_l1_or_above_passes_engineer():
user = make_user(account_role='engineer')
result = await require_l1_or_above(current_user=user)
assert result is user
@pytest.mark.asyncio
async def test_require_l1_or_above_blocks_viewer():
user = make_user(account_role='viewer')
with pytest.raises(HTTPException) as exc:
await require_l1_or_above(current_user=user)
assert exc.value.status_code == 403
- Step 2: Run tests to verify they fail.
docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v
Expected: ImportError (require_l1 etc. don't exist yet).
- Step 3: Add the new deps in
backend/app/api/deps.py.
After the existing require_engineer_or_admin definition, add:
async def require_l1(
current_user: User = Depends(get_current_active_user),
) -> User:
"""L1 tech only (exact match). Used by endpoints exclusive to L1 self-service."""
if current_user.is_super_admin:
return current_user # super_admin bypass for support purposes
if current_user.account_role != "l1_tech":
raise HTTPException(status_code=403, detail="L1 tech role required")
return current_user
async def require_l1_or_coverage(
current_user: User = Depends(get_current_active_user),
) -> User:
"""
L1 endpoints accessible to: l1_tech, engineers with can_cover_l1, owners, super_admin.
The "coverage" tier — engineers covering a frontline shift.
"""
if current_user.is_super_admin:
return current_user
role = current_user.account_role
if role == "l1_tech":
return current_user
if role == "owner":
return current_user
if role == "engineer" and current_user.can_cover_l1:
return current_user
raise HTTPException(
status_code=403,
detail="L1 access requires l1_tech role or engineer coverage flag",
)
async def require_l1_or_above(
current_user: User = Depends(get_current_active_user),
) -> User:
"""
Anyone at L1 tier or higher. Used for shared resources L1s can see
(e.g., flow library, KB connector list view).
"""
if current_user.is_super_admin:
return current_user
if current_user.account_role in ("l1_tech", "engineer", "owner"):
return current_user
raise HTTPException(status_code=403, detail="L1 or above required")
Also ensure User model is imported at the top of deps.py (likely already imported).
- Step 4: Run tests to verify they pass.
docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v
Expected: 7 PASSED.
- Step 5: Commit.
git add backend/app/api/deps.py backend/tests/test_deps_l1.py
git commit -m "feat(l1): add require_l1, require_l1_or_coverage, require_l1_or_above deps"
Task 3: Migration — column additions (can_cover_l1, l1_seats_purchased, audit_logs.acting_as, l1_seat_limit)
Files:
-
Create:
backend/alembic/versions/<hash>_add_l1_columns.py -
Modify:
backend/app/models/user.py(addcan_cover_l1) -
Modify:
backend/app/models/account.py(addl1_seats_purchased) -
Modify:
backend/app/models/subscription.py(addl1_seat_limit) -
Modify:
backend/app/models/audit_log.py(addacting_as) -
Step 1: Generate the manual migration (no autogenerate).
docker exec -w /app resolutionflow_backend alembic revision -m "add_l1_columns"
This creates a file backend/alembic/versions/<hash>_add_l1_columns.py. Note the hash from the output.
- Step 2: Write the migration content.
Open the generated file and replace the upgrade() / downgrade() bodies:
def upgrade() -> None:
op.add_column(
'users',
sa.Column('can_cover_l1', sa.Boolean(), nullable=False, server_default='false'),
)
op.add_column(
'accounts',
sa.Column('l1_seats_purchased', sa.Integer(), nullable=False, server_default='0'),
)
op.add_column(
'subscriptions',
sa.Column('l1_seat_limit', sa.Integer(), nullable=True),
)
op.add_column(
'audit_logs',
sa.Column('acting_as', sa.String(30), nullable=True),
)
def downgrade() -> None:
op.drop_column('audit_logs', 'acting_as')
op.drop_column('subscriptions', 'l1_seat_limit')
op.drop_column('accounts', 'l1_seats_purchased')
op.drop_column('users', 'can_cover_l1')
- Step 3: Add the matching columns to the SQLAlchemy models.
In backend/app/models/user.py, add inside the User class column block:
can_cover_l1: Mapped[bool] = mapped_column(
sa.Boolean(), nullable=False, server_default=sa.text('false')
)
In backend/app/models/account.py:
l1_seats_purchased: Mapped[int] = mapped_column(
sa.Integer(), nullable=False, server_default=sa.text('0')
)
In backend/app/models/subscription.py:
l1_seat_limit: Mapped[Optional[int]] = mapped_column(sa.Integer(), nullable=True)
In backend/app/models/audit_log.py:
acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
- Step 4: Run migration to verify it applies.
docker exec -w /app resolutionflow_backend alembic upgrade head
Expected: applies successfully, no errors.
- Step 5: Verify schema via psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d users" | grep can_cover_l1
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d accounts" | grep l1_seats_purchased
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d subscriptions" | grep l1_seat_limit
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d audit_logs" | grep acting_as
Expected: each grep returns the new column row.
- Step 6: Verify downgrade works (then upgrade again).
docker exec -w /app resolutionflow_backend alembic downgrade -1
docker exec -w /app resolutionflow_backend alembic upgrade head
- Step 7: Commit.
git add backend/alembic/versions/<hash>_add_l1_columns.py backend/app/models/user.py backend/app/models/account.py backend/app/models/subscription.py backend/app/models/audit_log.py
git commit -m "feat(l1): add column additions migration (can_cover_l1, l1_seats_purchased, l1_seat_limit, acting_as)"
Task 4: Migration — extend flow_proposals with L1 source columns
Files:
- Create:
backend/alembic/versions/<hash>_extend_flow_proposals.py - Modify:
backend/app/models/flow_proposal.py
Spec §5.1: add source, linked_ticket_id, linked_ticket_kind, validated_by_outcome to flow_proposals. (walked_path_snapshot is NOT added — it lives on l1_walk_sessions per the revised design.)
- Step 1: Generate migration.
docker exec -w /app resolutionflow_backend alembic revision -m "extend_flow_proposals_l1"
- Step 2: Write migration content.
def upgrade() -> None:
op.add_column(
'flow_proposals',
sa.Column('source', sa.String(30), nullable=True),
)
op.add_column(
'flow_proposals',
sa.Column('linked_ticket_id', sa.String(64), nullable=True),
)
op.add_column(
'flow_proposals',
sa.Column('linked_ticket_kind', sa.String(10), nullable=True),
)
op.add_column(
'flow_proposals',
sa.Column('validated_by_outcome', sa.Boolean(), nullable=False, server_default='false'),
)
# Backfill existing rows
op.execute("UPDATE flow_proposals SET source = 'manual_draft' WHERE source IS NULL")
# Now enforce NOT NULL on source
op.alter_column('flow_proposals', 'source', nullable=False)
# CHECK constraint on source values
op.create_check_constraint(
'ck_flow_proposals_source',
'flow_proposals',
"source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')",
)
# CHECK constraint on linked_ticket_kind values
op.create_check_constraint(
'ck_flow_proposals_linked_ticket_kind',
'flow_proposals',
"linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')",
)
def downgrade() -> None:
op.drop_constraint('ck_flow_proposals_linked_ticket_kind', 'flow_proposals', type_='check')
op.drop_constraint('ck_flow_proposals_source', 'flow_proposals', type_='check')
op.drop_column('flow_proposals', 'validated_by_outcome')
op.drop_column('flow_proposals', 'linked_ticket_kind')
op.drop_column('flow_proposals', 'linked_ticket_id')
op.drop_column('flow_proposals', 'source')
- Step 3: Update
FlowProposalmodel.
In backend/app/models/flow_proposal.py, add inside the class:
source: Mapped[str] = mapped_column(sa.String(30), nullable=False, server_default=sa.text("'manual_draft'"))
linked_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True)
linked_ticket_kind: Mapped[Optional[str]] = mapped_column(sa.String(10), nullable=True)
validated_by_outcome: Mapped[bool] = mapped_column(
sa.Boolean(), nullable=False, server_default=sa.text('false')
)
Also update __table_args__ to include the new check constraints:
__table_args__ = (
# ... existing constraints ...
CheckConstraint(
"source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')",
name="ck_flow_proposals_source",
),
CheckConstraint(
"linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')",
name="ck_flow_proposals_linked_ticket_kind",
),
)
- Step 4: Apply migration.
docker exec -w /app resolutionflow_backend alembic upgrade head
- Step 5: Verify with psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d flow_proposals"
Expected: shows new columns + check constraints.
- Step 6: Commit.
git add backend/alembic/versions/<hash>_extend_flow_proposals.py backend/app/models/flow_proposal.py
git commit -m "feat(l1): extend FlowProposal with source/linked_ticket/validated_by_outcome"
Task 5: Migration + model — create internal_tickets table
Files:
-
Create:
backend/alembic/versions/<hash>_create_internal_tickets.py -
Create:
backend/app/models/internal_ticket.py -
Step 1: Generate migration.
docker exec -w /app resolutionflow_backend alembic revision -m "create_internal_tickets"
- Step 2: Write migration content.
def upgrade() -> None:
op.create_table(
'internal_tickets',
sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('customer_name', sa.String(120), nullable=True),
sa.Column('customer_contact', sa.String(200), nullable=True),
sa.Column('problem_statement', sa.Text(), nullable=False),
sa.Column('status', sa.String(30), nullable=False, server_default='open'),
sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('ai_session_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('assigned_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('resolution_notes', sa.Text(), nullable=True),
sa.Column('psa_promoted_ticket_id', sa.String(64), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'),
sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['ai_session_id'], ['ai_sessions.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['assigned_user_id'], ['users.id'], ondelete='SET NULL'),
sa.CheckConstraint(
"status IN ('open', 'walking', 'resolved', 'escalated')",
name='ck_internal_tickets_status',
),
)
op.create_index('ix_internal_tickets_account_id', 'internal_tickets', ['account_id'])
op.create_index('ix_internal_tickets_status', 'internal_tickets', ['status'])
op.create_index('ix_internal_tickets_assigned_user_id', 'internal_tickets', ['assigned_user_id'])
# RLS — match the project pattern
op.execute("ALTER TABLE internal_tickets ENABLE ROW LEVEL SECURITY")
op.execute("""
CREATE POLICY internal_tickets_account_isolation ON internal_tickets
USING (account_id = current_setting('app.current_account_id')::uuid)
WITH CHECK (account_id = current_setting('app.current_account_id')::uuid)
""")
def downgrade() -> None:
op.execute("DROP POLICY IF EXISTS internal_tickets_account_isolation ON internal_tickets")
op.execute("ALTER TABLE internal_tickets DISABLE ROW LEVEL SECURITY")
op.drop_index('ix_internal_tickets_assigned_user_id', 'internal_tickets')
op.drop_index('ix_internal_tickets_status', 'internal_tickets')
op.drop_index('ix_internal_tickets_account_id', 'internal_tickets')
op.drop_table('internal_tickets')
- Step 3: Create the SQLAlchemy model.
Create backend/app/models/internal_ticket.py:
import uuid
from datetime import datetime
from typing import Optional
import sqlalchemy as sa
from sqlalchemy import CheckConstraint, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class InternalTicket(Base):
__tablename__ = "internal_tickets"
__table_args__ = (
CheckConstraint(
"status IN ('open', 'walking', 'resolved', 'escalated')",
name="ck_internal_tickets_status",
),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
created_by_user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="RESTRICT"),
nullable=False,
)
customer_name: Mapped[Optional[str]] = mapped_column(sa.String(120), nullable=True)
customer_contact: Mapped[Optional[str]] = mapped_column(sa.String(200), nullable=True)
problem_statement: Mapped[str] = mapped_column(sa.Text(), nullable=False)
status: Mapped[str] = mapped_column(
sa.String(30), nullable=False, server_default=sa.text("'open'"), index=True
)
flow_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True
)
flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True
)
ai_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("ai_sessions.id", ondelete="SET NULL"), nullable=True
)
assigned_user_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
)
resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
psa_promoted_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True)
created_at: Mapped[datetime] = mapped_column(
sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")
)
updated_at: Mapped[datetime] = mapped_column(
sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"),
onupdate=sa.text("now()"),
)
resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True)
- Step 4: Apply migration.
docker exec -w /app resolutionflow_backend alembic upgrade head
- Step 5: Verify RLS policy via psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname, polcmd, polqual FROM pg_policy WHERE polrelid = 'internal_tickets'::regclass"
Expected: one policy row internal_tickets_account_isolation.
- Step 6: Commit.
git add backend/alembic/versions/<hash>_create_internal_tickets.py backend/app/models/internal_ticket.py
git commit -m "feat(l1): create internal_tickets table with RLS"
Task 6: Migration + model — create l1_walk_sessions table
Files:
-
Create:
backend/alembic/versions/<hash>_create_l1_walk_sessions.py -
Create:
backend/app/models/l1_walk_session.py -
Step 1: Generate migration.
docker exec -w /app resolutionflow_backend alembic revision -m "create_l1_walk_sessions"
- Step 2: Write migration content.
def upgrade() -> None:
op.create_table(
'l1_walk_sessions',
sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('acting_as', sa.String(30), nullable=True),
sa.Column('ticket_id', sa.String(64), nullable=False),
sa.Column('ticket_kind', sa.String(10), nullable=False),
sa.Column('session_kind', sa.String(20), nullable=False),
sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('current_node_id', sa.String(100), nullable=True),
sa.Column('walked_path', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column('walk_notes', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column('status', sa.String(20), nullable=False, server_default='active'),
sa.Column('resolution_notes', sa.Text(), nullable=True),
sa.Column('helpful', sa.Boolean(), nullable=True),
sa.Column('escalation_reason', sa.Text(), nullable=True),
sa.Column('escalation_reason_category', sa.String(30), nullable=True),
sa.Column('started_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('last_step_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'),
sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'),
sa.CheckConstraint(
"ticket_kind IN ('psa', 'internal')",
name='ck_l1_walk_sessions_ticket_kind',
),
sa.CheckConstraint(
"session_kind IN ('flow', 'proposal', 'adhoc')",
name='ck_l1_walk_sessions_session_kind',
),
sa.CheckConstraint(
"status IN ('active', 'resolved', 'escalated', 'abandoned')",
name='ck_l1_walk_sessions_status',
),
sa.CheckConstraint(
"(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
"OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
"OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
name='ck_l1_walk_sessions_target_consistency',
),
)
op.create_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions', ['account_id'])
op.create_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions', ['created_by_user_id'])
op.create_index('ix_l1_walk_sessions_status', 'l1_walk_sessions', ['status'])
op.create_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions', ['last_step_at'])
op.execute("ALTER TABLE l1_walk_sessions ENABLE ROW LEVEL SECURITY")
op.execute("""
CREATE POLICY l1_walk_sessions_account_isolation ON l1_walk_sessions
USING (account_id = current_setting('app.current_account_id')::uuid)
WITH CHECK (account_id = current_setting('app.current_account_id')::uuid)
""")
def downgrade() -> None:
op.execute("DROP POLICY IF EXISTS l1_walk_sessions_account_isolation ON l1_walk_sessions")
op.execute("ALTER TABLE l1_walk_sessions DISABLE ROW LEVEL SECURITY")
op.drop_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions')
op.drop_index('ix_l1_walk_sessions_status', 'l1_walk_sessions')
op.drop_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions')
op.drop_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions')
op.drop_table('l1_walk_sessions')
- Step 3: Create the SQLAlchemy model.
Create backend/app/models/l1_walk_session.py:
import uuid
from datetime import datetime
from typing import Any, Optional
import sqlalchemy as sa
from sqlalchemy import CheckConstraint, ForeignKey
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class L1WalkSession(Base):
__tablename__ = "l1_walk_sessions"
__table_args__ = (
CheckConstraint(
"ticket_kind IN ('psa', 'internal')",
name="ck_l1_walk_sessions_ticket_kind",
),
CheckConstraint(
"session_kind IN ('flow', 'proposal', 'adhoc')",
name="ck_l1_walk_sessions_session_kind",
),
CheckConstraint(
"status IN ('active', 'resolved', 'escalated', 'abandoned')",
name="ck_l1_walk_sessions_status",
),
CheckConstraint(
"(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
"OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
"OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
name="ck_l1_walk_sessions_target_consistency",
),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
created_by_user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="RESTRICT"),
nullable=False,
index=True,
)
acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
ticket_id: Mapped[str] = mapped_column(sa.String(64), nullable=False)
ticket_kind: Mapped[str] = mapped_column(sa.String(10), nullable=False)
session_kind: Mapped[str] = mapped_column(sa.String(20), nullable=False)
flow_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True
)
flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True
)
current_node_id: Mapped[Optional[str]] = mapped_column(sa.String(100), nullable=True)
walked_path: Mapped[list[dict[str, Any]]] = mapped_column(
JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")
)
walk_notes: Mapped[list[dict[str, Any]]] = mapped_column(
JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")
)
status: Mapped[str] = mapped_column(
sa.String(20), nullable=False, server_default=sa.text("'active'"), index=True
)
resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
helpful: Mapped[Optional[bool]] = mapped_column(sa.Boolean(), nullable=True)
escalation_reason: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
escalation_reason_category: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
started_at: Mapped[datetime] = mapped_column(
sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")
)
last_step_at: Mapped[datetime] = mapped_column(
sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"), index=True
)
resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True)
- Step 4: Apply migration + verify RLS.
docker exec -w /app resolutionflow_backend alembic upgrade head
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname FROM pg_policy WHERE polrelid = 'l1_walk_sessions'::regclass"
- Step 5: Verify check constraint blocks invalid combos.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind) VALUES (gen_random_uuid(), '00000000-0000-0000-0000-000000000000', '00000000-0000-0000-0000-000000000000', 'x', 'psa', 'flow')"
Expected: violates ck_l1_walk_sessions_target_consistency (flow_id NULL).
- Step 6: Commit.
git add backend/alembic/versions/<hash>_create_l1_walk_sessions.py backend/app/models/l1_walk_session.py
git commit -m "feat(l1): create l1_walk_sessions table with RLS + target-consistency check"
Task 7: Seat enforcement service
Files:
-
Create:
backend/app/schemas/seat_enforcement.py -
Create:
backend/app/services/seat_enforcement.py -
Test:
backend/tests/test_seat_enforcement.py -
Step 1: Write the failing tests.
Create backend/tests/test_seat_enforcement.py:
import pytest
from app.services.seat_enforcement import check_seat_available
from tests.factories import make_account, make_subscription, make_user
@pytest.mark.asyncio
async def test_engineer_seat_available_when_under_limit(db_session):
account = await make_account(db_session)
sub = await make_subscription(db_session, account_id=account.id, seat_limit=5)
for _ in range(3):
await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
result = await check_seat_available(account, sub, 'engineer', db_session)
assert result.available is True
assert result.current == 3
assert result.limit == 5
@pytest.mark.asyncio
async def test_engineer_seat_unavailable_when_at_limit(db_session):
account = await make_account(db_session)
sub = await make_subscription(db_session, account_id=account.id, seat_limit=2)
for _ in range(2):
await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
result = await check_seat_available(account, sub, 'engineer', db_session)
assert result.available is False
assert result.current == 2
assert result.limit == 2
@pytest.mark.asyncio
async def test_l1_seat_uses_separate_limit(db_session):
account = await make_account(db_session)
sub = await make_subscription(db_session, account_id=account.id, seat_limit=2, l1_seat_limit=10)
for _ in range(2):
await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
# Engineer limit hit but L1 seats still available
eng_result = await check_seat_available(account, sub, 'engineer', db_session)
l1_result = await check_seat_available(account, sub, 'l1_tech', db_session)
assert eng_result.available is False
assert l1_result.available is True
assert l1_result.limit == 10
@pytest.mark.asyncio
async def test_unlimited_when_limit_null(db_session):
account = await make_account(db_session)
sub = await make_subscription(db_session, account_id=account.id, seat_limit=None)
for _ in range(100):
await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
result = await check_seat_available(account, sub, 'engineer', db_session)
assert result.available is True
assert result.limit is None
- Step 2: Run tests, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v
Expected: ImportError.
- Step 3: Create schema.
backend/app/schemas/seat_enforcement.py:
from typing import Literal, Optional
from pydantic import BaseModel
Role = Literal['engineer', 'l1_tech']
class SeatCheckResult(BaseModel):
available: bool
current: int
limit: Optional[int] # None = unlimited
role: Role
class SeatUsage(BaseModel):
engineer: SeatCheckResult
l1_tech: SeatCheckResult
- Step 4: Create service.
backend/app/services/seat_enforcement.py:
from typing import Literal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.subscription import Subscription
from app.models.user import User
from app.schemas.seat_enforcement import SeatCheckResult
Role = Literal['engineer', 'l1_tech']
def _limit_for_role(subscription: Subscription, role: Role) -> int | None:
if role == 'engineer':
return subscription.seat_limit
if role == 'l1_tech':
return subscription.l1_seat_limit
raise ValueError(f"Unknown role: {role}")
async def check_seat_available(
account: Account,
subscription: Subscription,
role: Role,
db: AsyncSession,
) -> SeatCheckResult:
"""
Count active users with the given role in the account, compare against
the role-specific seat limit on the subscription. Returns availability.
None limit = unlimited (returns available=True).
"""
limit = _limit_for_role(subscription, role)
stmt = (
select(func.count(User.id))
.where(User.account_id == account.id)
.where(User.account_role == role)
.where(User.is_active.is_(True))
)
current = (await db.execute(stmt)).scalar_one()
if limit is None:
return SeatCheckResult(available=True, current=current, limit=None, role=role)
return SeatCheckResult(
available=current < limit,
current=current,
limit=limit,
role=role,
)
async def get_seat_usage(
account: Account,
subscription: Subscription,
db: AsyncSession,
) -> tuple[SeatCheckResult, SeatCheckResult]:
"""Return (engineer, l1_tech) seat-usage tuple for the seat-counter widget."""
eng = await check_seat_available(account, subscription, 'engineer', db)
l1 = await check_seat_available(account, subscription, 'l1_tech', db)
return eng, l1
- Step 5: Run tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v
Expected: 4 PASSED.
- Step 6: Commit.
git add backend/app/schemas/seat_enforcement.py backend/app/services/seat_enforcement.py backend/tests/test_seat_enforcement.py
git commit -m "feat(l1): add seat_enforcement service for engineer + L1 seat limits"
Task 8: Integrate seat enforcement into invite + accept-invite + role change
Files:
-
Modify:
backend/app/api/endpoints/invite.py— block invite create when limit reached -
Modify:
backend/app/api/endpoints/accounts.pyor wherever accept-invite lives — re-check at accept time -
Modify: wherever role-change PATCH lives (likely
accounts.pyoradmin.py) — re-check before commit -
Test: extend
backend/tests/test_seat_enforcement.py -
Step 1: Write failing integration tests.
Add to backend/tests/test_seat_enforcement.py:
@pytest.mark.asyncio
async def test_invite_blocked_when_engineer_seats_full(authed_owner_client, db_session):
# Setup: account at engineer seat limit
# ... (use existing test fixtures to seed an account with N=limit engineers)
response = await authed_owner_client.post(
"/api/v1/invites",
json={"email": "new@example.com", "role": "engineer"},
)
assert response.status_code == 402
body = response.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
assert body["detail"]["current"] == body["detail"]["limit"]
@pytest.mark.asyncio
async def test_invite_blocked_when_l1_seats_full(authed_owner_client, db_session):
# Same as above but for l1_tech role
response = await authed_owner_client.post(
"/api/v1/invites",
json={"email": "new@example.com", "role": "l1_tech"},
)
assert response.status_code == 402
@pytest.mark.asyncio
async def test_invite_succeeds_when_l1_seats_available(authed_owner_client, db_session):
# Account with l1_seat_limit=10, current L1 count = 0
response = await authed_owner_client.post(
"/api/v1/invites",
json={"email": "new@example.com", "role": "l1_tech"},
)
assert response.status_code == 201
@pytest.mark.asyncio
async def test_role_change_blocked_when_target_seats_full(authed_owner_client, viewer_user, db_session):
# Try promoting viewer → engineer when engineer seats are full
response = await authed_owner_client.patch(
f"/api/v1/users/{viewer_user.id}/role",
json={"account_role": "engineer"},
)
assert response.status_code == 402
- Step 2: Run, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py::test_invite_blocked_when_engineer_seats_full -v
- Step 3: Add seat check to invite create endpoint.
In backend/app/api/endpoints/invite.py, locate the create endpoint and add at the top (before persistence):
from app.services.seat_enforcement import check_seat_available
# ... inside the create handler ...
if payload.role in ('engineer', 'l1_tech'):
# Load the account's subscription (existing pattern in the codebase)
sub = await get_active_subscription(db, current_user.account_id)
result = await check_seat_available(account, sub, payload.role, db)
if not result.available:
raise HTTPException(
status_code=402,
detail={
"code": "seat_limit_exceeded",
"role": result.role,
"current": result.current,
"limit": result.limit,
"upgrade_url": "/account/billing", # Frontend deep-links to the existing /account/billing page; Stripe customer portal link is generated server-side there.
},
)
(If the codebase has an existing helper like get_active_subscription, use it. If not, add a thin helper in services/billing.py.)
- Step 4: Add check to accept-invite endpoint.
Find the accept-invite endpoint (grep -rn "accept.invite\|@router.post.*accept" backend/app/api/endpoints/). Add the same check_seat_available call before the user is upgraded from invite to active user. Race-condition guard.
- Step 5: Add check to role-change endpoint.
Find the role-change PATCH endpoint. If promoting toward engineer or l1_tech, run the check first. Return same 402 shape on block.
- Step 6: Run integration tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v
- Step 7: Commit.
git add backend/app/api/endpoints/invite.py backend/app/api/endpoints/accounts.py backend/tests/test_seat_enforcement.py
git commit -m "feat(l1): enforce seat limits on invite, accept-invite, role-change for engineer + L1"
Task 9: GET /api/v1/accounts/me/seats endpoint
Files:
-
Modify:
backend/app/api/endpoints/accounts.py(add endpoint) -
Test:
backend/tests/test_l1_endpoints.py(new) -
Step 1: Write failing test.
Create backend/tests/test_l1_endpoints.py:
import pytest
@pytest.mark.asyncio
async def test_get_seats_returns_both_role_counts(authed_engineer_client, db_session):
response = await authed_engineer_client.get("/api/v1/accounts/me/seats")
assert response.status_code == 200
body = response.json()
assert "engineer" in body
assert "l1_tech" in body
assert {"available", "current", "limit", "role"}.issubset(body["engineer"].keys())
@pytest.mark.asyncio
async def test_get_seats_blocked_for_viewer(authed_viewer_client):
response = await authed_viewer_client.get("/api/v1/accounts/me/seats")
assert response.status_code == 403
- Step 2: Run, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py::test_get_seats_returns_both_role_counts -v
- Step 3: Add endpoint to
accounts.py.
from app.services.seat_enforcement import get_seat_usage
from app.schemas.seat_enforcement import SeatUsage
@router.get("/me/seats", response_model=SeatUsage)
async def get_my_account_seat_usage(
current_user: User = Depends(require_engineer_or_admin),
db: AsyncSession = Depends(get_db),
):
account = await db.get(Account, current_user.account_id)
sub = await get_active_subscription(db, current_user.account_id)
engineer, l1_tech = await get_seat_usage(account, sub, db)
return SeatUsage(engineer=engineer, l1_tech=l1_tech)
- Step 4: Run tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v
- Step 5: Commit.
git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): GET /accounts/me/seats endpoint"
Task 10: PATCH /api/v1/users/{id}/coverage endpoint
Files:
-
Modify:
backend/app/api/endpoints/accounts.py(or wherever user-management endpoints live) -
Test: extend
backend/tests/test_l1_endpoints.py -
Step 1: Write failing tests.
@pytest.mark.asyncio
async def test_owner_can_toggle_coverage_on_engineer(authed_owner_client, engineer_user):
response = await authed_owner_client.patch(
f"/api/v1/users/{engineer_user.id}/coverage",
json={"can_cover_l1": True},
)
assert response.status_code == 200
assert response.json()["can_cover_l1"] is True
@pytest.mark.asyncio
async def test_engineer_cannot_toggle_coverage_on_self(authed_engineer_client, engineer_user):
response = await authed_engineer_client.patch(
f"/api/v1/users/{engineer_user.id}/coverage",
json={"can_cover_l1": True},
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_coverage_only_applies_to_engineers(authed_owner_client, viewer_user):
response = await authed_owner_client.patch(
f"/api/v1/users/{viewer_user.id}/coverage",
json={"can_cover_l1": True},
)
assert response.status_code == 422 # validation: viewer can't have coverage
-
Step 2: Run, verify failure.
-
Step 3: Add endpoint.
from pydantic import BaseModel
class CoveragePatch(BaseModel):
can_cover_l1: bool
@router.patch("/users/{user_id}/coverage")
async def patch_user_coverage(
user_id: UUID,
payload: CoveragePatch,
current_user: User = Depends(require_account_owner),
db: AsyncSession = Depends(get_db),
):
target = await db.get(User, user_id)
if not target or target.account_id != current_user.account_id:
raise HTTPException(status_code=404)
if target.account_role != 'engineer':
raise HTTPException(
status_code=422,
detail="can_cover_l1 only applies to engineers",
)
target.can_cover_l1 = payload.can_cover_l1
await db.commit()
return {"id": str(target.id), "can_cover_l1": target.can_cover_l1}
-
Step 4: Run tests, verify pass.
-
Step 5: Commit.
git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): PATCH /users/{id}/coverage for engineer L1-coverage flag"
Task 11: internal_ticket_service.py
Files:
-
Create:
backend/app/services/internal_ticket_service.py -
Test:
backend/tests/test_internal_ticket_service.py -
Step 1: Write failing tests.
import pytest
import uuid
from app.services.internal_ticket_service import (
create_ticket, update_status, get_ticket, list_tickets_for_account,
)
@pytest.mark.asyncio
async def test_create_ticket_sets_status_open(db_session, account, l1_user):
ticket = await create_ticket(
db_session,
account_id=account.id,
created_by_user_id=l1_user.id,
problem_statement="Outlook can't connect",
customer_name="Alice",
)
assert ticket.status == 'open'
assert ticket.account_id == account.id
@pytest.mark.asyncio
async def test_update_status_to_resolved_sets_resolved_at(db_session, internal_ticket):
updated = await update_status(db_session, ticket_id=internal_ticket.id, status='resolved')
assert updated.status == 'resolved'
assert updated.resolved_at is not None
@pytest.mark.asyncio
async def test_list_tickets_filters_by_account(db_session, account_a, account_b, ticket_a, ticket_b):
rows = await list_tickets_for_account(db_session, account_id=account_a.id)
assert ticket_a in rows
assert ticket_b not in rows
-
Step 2: Run, verify failure.
-
Step 3: Implement service.
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.internal_ticket import InternalTicket
async def create_ticket(
db: AsyncSession,
*,
account_id: UUID,
created_by_user_id: UUID,
problem_statement: str,
customer_name: Optional[str] = None,
customer_contact: Optional[str] = None,
) -> InternalTicket:
ticket = InternalTicket(
account_id=account_id,
created_by_user_id=created_by_user_id,
problem_statement=problem_statement,
customer_name=customer_name,
customer_contact=customer_contact,
)
db.add(ticket)
await db.flush()
return ticket
async def update_status(
db: AsyncSession,
*,
ticket_id: UUID,
status: str,
resolution_notes: Optional[str] = None,
assigned_user_id: Optional[UUID] = None,
) -> InternalTicket:
ticket = await db.get(InternalTicket, ticket_id)
if not ticket:
raise ValueError(f"InternalTicket {ticket_id} not found")
ticket.status = status
if status == 'resolved':
ticket.resolved_at = datetime.now(timezone.utc)
if resolution_notes is not None:
ticket.resolution_notes = resolution_notes
if assigned_user_id is not None:
ticket.assigned_user_id = assigned_user_id
await db.flush()
return ticket
async def get_ticket(db: AsyncSession, ticket_id: UUID) -> Optional[InternalTicket]:
return await db.get(InternalTicket, ticket_id)
async def list_tickets_for_account(
db: AsyncSession,
*,
account_id: UUID,
status: Optional[str] = None,
limit: int = 100,
) -> list[InternalTicket]:
stmt = select(InternalTicket).where(InternalTicket.account_id == account_id)
if status:
stmt = stmt.where(InternalTicket.status == status)
stmt = stmt.order_by(InternalTicket.created_at.desc()).limit(limit)
result = await db.execute(stmt)
return list(result.scalars())
async def promote_to_psa(
db: AsyncSession,
*,
ticket_id: UUID,
psa_ticket_id: str,
) -> InternalTicket:
ticket = await db.get(InternalTicket, ticket_id)
if not ticket:
raise ValueError(f"InternalTicket {ticket_id} not found")
ticket.psa_promoted_ticket_id = psa_ticket_id
await db.flush()
return ticket
- Step 4: Run, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_internal_ticket_service.py -v
- Step 5: Commit.
git add backend/app/services/internal_ticket_service.py backend/tests/test_internal_ticket_service.py
git commit -m "feat(l1): internal_ticket_service with CRUD + status transitions"
Task 12: l1_session_service.py — start (flow / proposal / adhoc)
Files:
- Create:
backend/app/services/l1_session_service.py - Test:
backend/tests/test_l1_session_service.py
This task implements the session start path only. Steps/notes/resolve/escalate are split into Tasks 13/14 to keep each task bite-sized.
- Step 1: Write failing tests.
import pytest
from app.services.l1_session_service import start_flow_session, start_adhoc_session
@pytest.mark.asyncio
async def test_start_flow_session_creates_active_session(db_session, account, l1_user, flow, internal_ticket):
session = await start_flow_session(
db_session,
account_id=account.id,
user=l1_user,
flow_id=flow.id,
ticket_id=str(internal_ticket.id),
ticket_kind='internal',
)
assert session.session_kind == 'flow'
assert session.flow_id == flow.id
assert session.flow_proposal_id is None
assert session.status == 'active'
assert session.walked_path == []
@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow(db_session, account, l1_user, internal_ticket):
session = await start_adhoc_session(
db_session,
account_id=account.id,
user=l1_user,
ticket_id=str(internal_ticket.id),
ticket_kind='internal',
)
assert session.session_kind == 'adhoc'
assert session.flow_id is None
assert session.flow_proposal_id is None
assert session.walked_path == []
assert session.walk_notes == []
@pytest.mark.asyncio
async def test_start_session_records_acting_as_for_coverage_engineer(
db_session, account, engineer_with_coverage, flow, internal_ticket
):
session = await start_flow_session(
db_session,
account_id=account.id,
user=engineer_with_coverage,
flow_id=flow.id,
ticket_id=str(internal_ticket.id),
ticket_kind='internal',
)
assert session.acting_as == 'l1_coverage'
-
Step 2: Run, verify failure.
-
Step 3: Implement start functions.
from typing import Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
def _resolve_acting_as(user: User) -> Optional[str]:
"""An engineer (with coverage flag) acting in L1 mode gets tagged for audit."""
if user.account_role == 'engineer':
return 'l1_coverage'
return None
async def start_flow_session(
db: AsyncSession,
*,
account_id: UUID,
user: User,
flow_id: UUID,
ticket_id: str,
ticket_kind: str, # 'psa' | 'internal'
) -> L1WalkSession:
session = L1WalkSession(
account_id=account_id,
created_by_user_id=user.id,
acting_as=_resolve_acting_as(user),
ticket_id=ticket_id,
ticket_kind=ticket_kind,
session_kind='flow',
flow_id=flow_id,
walked_path=[],
walk_notes=[],
)
db.add(session)
await db.flush()
return session
async def start_proposal_session(
db: AsyncSession,
*,
account_id: UUID,
user: User,
flow_proposal_id: UUID,
ticket_id: str,
ticket_kind: str,
) -> L1WalkSession:
session = L1WalkSession(
account_id=account_id,
created_by_user_id=user.id,
acting_as=_resolve_acting_as(user),
ticket_id=ticket_id,
ticket_kind=ticket_kind,
session_kind='proposal',
flow_proposal_id=flow_proposal_id,
walked_path=[],
walk_notes=[],
)
db.add(session)
await db.flush()
return session
async def start_adhoc_session(
db: AsyncSession,
*,
account_id: UUID,
user: User,
ticket_id: str,
ticket_kind: str,
) -> L1WalkSession:
session = L1WalkSession(
account_id=account_id,
created_by_user_id=user.id,
acting_as=_resolve_acting_as(user),
ticket_id=ticket_id,
ticket_kind=ticket_kind,
session_kind='adhoc',
walked_path=[],
walk_notes=[],
)
db.add(session)
await db.flush()
return session
-
Step 4: Run, verify pass.
-
Step 5: Commit.
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): session start (flow/proposal/adhoc) with acting_as tagging"
Task 13: l1_session_service.py — step + notes
Files:
-
Modify:
backend/app/services/l1_session_service.py -
Modify:
backend/tests/test_l1_session_service.py -
Step 1: Write failing tests (append to existing file).
@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(db_session, active_flow_session):
updated = await record_step(
db_session,
session_id=active_flow_session.id,
node_id='n1',
question='Has the user signed back in?',
answer='yes',
note=None,
)
assert len(updated.walked_path) == 1
assert updated.walked_path[0] == {
'node_id': 'n1',
'question': 'Has the user signed back in?',
'answer': 'yes',
'l1_note': None,
}
assert updated.current_node_id == 'n1'
@pytest.mark.asyncio
async def test_record_step_blocks_on_adhoc_session(db_session, active_adhoc_session):
with pytest.raises(ValueError, match="adhoc"):
await record_step(
db_session,
session_id=active_adhoc_session.id,
node_id='n1', question='x', answer='y', note=None,
)
@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(db_session, active_adhoc_session):
new_notes = [{'timestamp': '2026-05-28T10:00:00Z', 'content': 'Customer said outlook crashed'}]
updated = await update_notes(db_session, session_id=active_adhoc_session.id, notes=new_notes)
assert updated.walk_notes == new_notes
-
Step 2: Run, verify failure.
-
Step 3: Implement.
Append to l1_session_service.py:
from datetime import datetime, timezone
async def record_step(
db: AsyncSession,
*,
session_id: UUID,
node_id: str,
question: str,
answer: str,
note: Optional[str],
) -> L1WalkSession:
session = await db.get(L1WalkSession, session_id)
if not session:
raise ValueError(f"L1WalkSession {session_id} not found")
if session.session_kind == 'adhoc':
raise ValueError("Cannot record step on adhoc session — use update_notes instead")
if session.status != 'active':
raise ValueError(f"Session {session_id} is not active (status={session.status})")
entry = {
'node_id': node_id,
'question': question,
'answer': answer,
'l1_note': note,
}
# JSONB append — assign new list because SQLAlchemy doesn't track in-place mutations
session.walked_path = [*session.walked_path, entry]
session.current_node_id = node_id
session.last_step_at = datetime.now(timezone.utc)
await db.flush()
return session
async def update_notes(
db: AsyncSession,
*,
session_id: UUID,
notes: list[dict],
) -> L1WalkSession:
session = await db.get(L1WalkSession, session_id)
if not session:
raise ValueError(f"L1WalkSession {session_id} not found")
# Cap at 256KB (rough check, JSON-encoded size)
import json
encoded_size = len(json.dumps(notes).encode('utf-8'))
if encoded_size > 256 * 1024:
raise ValueError("walk_notes exceeds 256KB cap — consider escalating")
session.walk_notes = notes
session.last_step_at = datetime.now(timezone.utc)
await db.flush()
return session
-
Step 4: Run, verify pass.
-
Step 5: Commit.
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): record_step + update_notes for walk sessions"
Task 14: l1_session_service.py — resolve / escalate / escalate-without-walk
Files:
-
Modify:
backend/app/services/l1_session_service.py -
Modify:
backend/tests/test_l1_session_service.py -
Step 1: Write failing tests.
@pytest.mark.asyncio
async def test_resolve_helpful_proposal_flips_validated_by_outcome(
db_session, active_proposal_session, flow_proposal
):
await resolve(
db_session,
session_id=active_proposal_session.id,
helpful=True,
resolution_notes="Walked the user through restoring profile",
)
await db_session.refresh(flow_proposal)
assert flow_proposal.validated_by_outcome is True
@pytest.mark.asyncio
async def test_resolve_unhelpful_does_not_flip_validation(
db_session, active_proposal_session, flow_proposal
):
await resolve(
db_session,
session_id=active_proposal_session.id,
helpful=False,
resolution_notes="Tree was wrong",
)
await db_session.refresh(flow_proposal)
assert flow_proposal.validated_by_outcome is False
@pytest.mark.asyncio
async def test_resolve_adhoc_session_closes_ticket(
db_session, active_adhoc_session, internal_ticket
):
await resolve(
db_session,
session_id=active_adhoc_session.id,
helpful=True,
resolution_notes="Customer rebooted, fixed",
)
await db_session.refresh(active_adhoc_session)
assert active_adhoc_session.status == 'resolved'
assert active_adhoc_session.resolved_at is not None
await db_session.refresh(internal_ticket)
assert internal_ticket.status == 'resolved'
@pytest.mark.asyncio
async def test_escalate_marks_session_and_ticket(
db_session, active_flow_session, internal_ticket
):
await escalate(
db_session,
session_id=active_flow_session.id,
reason="Customer demanding senior",
reason_category="Customer demanding senior",
)
await db_session.refresh(active_flow_session)
assert active_flow_session.status == 'escalated'
await db_session.refresh(internal_ticket)
assert internal_ticket.status == 'escalated'
@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_session(
db_session, account, l1_user, internal_ticket
):
session = await escalate_without_walk(
db_session,
account_id=account.id,
user=l1_user,
ticket_id=str(internal_ticket.id),
ticket_kind='internal',
reason_category='No KB available',
reason='No knowledge base content yet',
)
assert session.status == 'escalated'
assert session.session_kind == 'adhoc' # adhoc as placeholder kind
assert session.escalation_reason_category == 'No KB available'
-
Step 2: Run, verify failure.
-
Step 3: Implement.
Append to l1_session_service.py:
from app.models.flow_proposal import FlowProposal
from app.services import internal_ticket_service
# PSA service import — adjust to actual path
# from app.services.psa.registry import PsaProviderRegistry
async def resolve(
db: AsyncSession,
*,
session_id: UUID,
helpful: bool,
resolution_notes: str,
) -> L1WalkSession:
session = await db.get(L1WalkSession, session_id)
if not session:
raise ValueError(f"L1WalkSession {session_id} not found")
if session.status != 'active':
raise ValueError(f"Session not active (status={session.status})")
session.status = 'resolved'
session.helpful = helpful
session.resolution_notes = resolution_notes
session.resolved_at = datetime.now(timezone.utc)
session.last_step_at = session.resolved_at
# Outcome validation: flip proposal flag on helpful=true
if helpful and session.session_kind == 'proposal' and session.flow_proposal_id:
proposal = await db.get(FlowProposal, session.flow_proposal_id)
if proposal:
proposal.validated_by_outcome = True
# Close the ticket
if session.ticket_kind == 'internal':
await internal_ticket_service.update_status(
db, ticket_id=UUID(session.ticket_id),
status='resolved', resolution_notes=resolution_notes,
)
else:
# PSA close is intentionally deferred to Phase 2 (which adds the full escalation_package_generator
# integration alongside the AI build pipeline). For Phase 1, PSA-backed sessions update only the
# local session row on resolve; engineers verify ticket state directly in their PSA UI.
# This is documented in spec §11 "Internal ticket fallback" and the Phase 1 scope section.
pass
await db.flush()
return session
async def escalate(
db: AsyncSession,
*,
session_id: UUID,
reason: str,
reason_category: str,
) -> L1WalkSession:
session = await db.get(L1WalkSession, session_id)
if not session:
raise ValueError(f"L1WalkSession {session_id} not found")
if session.status != 'active':
raise ValueError(f"Session not active (status={session.status})")
session.status = 'escalated'
session.escalation_reason = reason
session.escalation_reason_category = reason_category
session.resolved_at = datetime.now(timezone.utc)
session.last_step_at = session.resolved_at
# Mark ticket escalated
if session.ticket_kind == 'internal':
await internal_ticket_service.update_status(
db, ticket_id=UUID(session.ticket_id), status='escalated',
)
else:
# PSA reassign — Phase 1 stub; full integration with escalation_package_generator
# follows in Phase 2 alongside the ai_session creation for engineer pickup.
pass
await db.flush()
return session
async def escalate_without_walk(
db: AsyncSession,
*,
account_id: UUID,
user: User,
ticket_id: str,
ticket_kind: str,
reason_category: str,
reason: Optional[str] = None,
) -> L1WalkSession:
"""
Used from the no-KB / no-flow-picked screen — creates an immediately-escalated session
with no walked_path. Lets escalation reporting still capture the call.
"""
session = L1WalkSession(
account_id=account_id,
created_by_user_id=user.id,
acting_as=_resolve_acting_as(user),
ticket_id=ticket_id,
ticket_kind=ticket_kind,
session_kind='adhoc',
walked_path=[],
walk_notes=[],
status='escalated',
escalation_reason=reason,
escalation_reason_category=reason_category,
resolved_at=datetime.now(timezone.utc),
)
session.last_step_at = session.resolved_at
db.add(session)
if ticket_kind == 'internal':
await internal_ticket_service.update_status(
db, ticket_id=UUID(ticket_id), status='escalated',
)
await db.flush()
return session
- Step 4: Run, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_session_service.py -v
- Step 5: Commit.
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): resolve/escalate/escalate-without-walk for l1 sessions"
Task 15: L1 endpoints (api/endpoints/l1.py)
Files:
-
Create:
backend/app/schemas/l1.py -
Create:
backend/app/api/endpoints/l1.py -
Modify:
backend/app/api/router.py(registerl1router) -
Modify:
backend/tests/test_l1_endpoints.py -
Step 1: Define request/response schemas.
backend/app/schemas/l1.py:
from datetime import datetime
from typing import Any, Literal, Optional
from uuid import UUID
from pydantic import BaseModel
class IntakeRequest(BaseModel):
problem_statement: str
customer_name: Optional[str] = None
customer_contact: Optional[str] = None
flow_id: Optional[UUID] = None # if provided, starts flow session; else adhoc
class IntakeResponse(BaseModel):
session_id: UUID
session_kind: Literal['flow', 'proposal', 'adhoc']
ticket_id: str
ticket_kind: Literal['psa', 'internal']
class StepRequest(BaseModel):
node_id: str
question: str
answer: str
note: Optional[str] = None
class NotesRequest(BaseModel):
notes: list[dict[str, Any]]
class ResolveRequest(BaseModel):
helpful: bool
resolution_notes: str
class EscalateRequest(BaseModel):
reason: Optional[str] = None
reason_category: str
class EscalateWithoutWalkRequest(BaseModel):
problem_statement: str
customer_name: Optional[str] = None
customer_contact: Optional[str] = None
reason_category: str
reason: Optional[str] = None
class WalkSessionResponse(BaseModel):
id: UUID
session_kind: str
flow_id: Optional[UUID]
flow_proposal_id: Optional[UUID]
current_node_id: Optional[str]
walked_path: list[dict[str, Any]]
walk_notes: list[dict[str, Any]]
status: str
started_at: datetime
last_step_at: datetime
resolved_at: Optional[datetime]
class QueueRow(BaseModel):
ticket_id: str
ticket_kind: Literal['psa', 'internal']
problem_statement: Optional[str]
customer_name: Optional[str]
status: str
created_at: Optional[datetime]
- Step 2: Write the endpoints.
backend/app/api/endpoints/l1.py:
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import (
get_current_active_user, get_db, require_l1_or_coverage,
)
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
from app.schemas.l1 import (
EscalateRequest, EscalateWithoutWalkRequest, IntakeRequest, IntakeResponse,
NotesRequest, QueueRow, ResolveRequest, StepRequest, WalkSessionResponse,
)
from app.services import internal_ticket_service, l1_session_service
router = APIRouter(prefix="/l1", tags=["l1"])
def _as_response(session: L1WalkSession) -> WalkSessionResponse:
return WalkSessionResponse(
id=session.id,
session_kind=session.session_kind,
flow_id=session.flow_id,
flow_proposal_id=session.flow_proposal_id,
current_node_id=session.current_node_id,
walked_path=session.walked_path,
walk_notes=session.walk_notes,
status=session.status,
started_at=session.started_at,
last_step_at=session.last_step_at,
resolved_at=session.resolved_at,
)
@router.post("/intake", response_model=IntakeResponse)
async def intake(
payload: IntakeRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
"""
Phase 1 intake:
- Creates an internal ticket (PSA integration deferred to Phase 2/3 escalation polish).
- Starts an L1WalkSession: flow kind if flow_id provided, adhoc otherwise.
Phase 2 will replace this with match_or_build + suggest/aborted_no_kb outcomes.
"""
# Phase 1: always create internal ticket. PSA support handled in escalate() / escalation_package.
ticket = await internal_ticket_service.create_ticket(
db,
account_id=user.account_id,
created_by_user_id=user.id,
problem_statement=payload.problem_statement,
customer_name=payload.customer_name,
customer_contact=payload.customer_contact,
)
if payload.flow_id:
session = await l1_session_service.start_flow_session(
db,
account_id=user.account_id,
user=user,
flow_id=payload.flow_id,
ticket_id=str(ticket.id),
ticket_kind='internal',
)
else:
session = await l1_session_service.start_adhoc_session(
db,
account_id=user.account_id,
user=user,
ticket_id=str(ticket.id),
ticket_kind='internal',
)
await db.commit()
return IntakeResponse(
session_id=session.id,
session_kind=session.session_kind,
ticket_id=str(ticket.id),
ticket_kind='internal',
)
@router.get("/queue", response_model=list[QueueRow])
async def queue(
status_filter: Optional[str] = None,
limit: int = 50,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
"""Phase 1: returns internal tickets only. PSA queue merge in Phase 2."""
tickets = await internal_ticket_service.list_tickets_for_account(
db, account_id=user.account_id, status=status_filter, limit=limit,
)
return [
QueueRow(
ticket_id=str(t.id),
ticket_kind='internal',
problem_statement=t.problem_statement,
customer_name=t.customer_name,
status=t.status,
created_at=t.created_at,
)
for t in tickets
]
@router.get("/sessions/{session_id}", response_model=WalkSessionResponse)
async def get_session(
session_id: UUID,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
session = await db.get(L1WalkSession, session_id)
if not session or session.account_id != user.account_id:
raise HTTPException(status_code=404)
return _as_response(session)
@router.get("/sessions/active", response_model=list[WalkSessionResponse])
async def list_active_sessions(
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
"""List the caller's currently-active sessions for the dashboard 'Resume in progress' widget."""
stmt = (
select(L1WalkSession)
.where(L1WalkSession.created_by_user_id == user.id)
.where(L1WalkSession.status == 'active')
.order_by(L1WalkSession.last_step_at.desc())
.limit(20)
)
result = await db.execute(stmt)
return [_as_response(s) for s in result.scalars()]
@router.post("/sessions/{session_id}/step", response_model=WalkSessionResponse)
async def post_step(
session_id: UUID,
payload: StepRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
session = await db.get(L1WalkSession, session_id)
if not session or session.account_id != user.account_id:
raise HTTPException(status_code=404)
try:
updated = await l1_session_service.record_step(
db, session_id=session_id,
node_id=payload.node_id, question=payload.question,
answer=payload.answer, note=payload.note,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
await db.commit()
return _as_response(updated)
@router.post("/sessions/{session_id}/notes", response_model=WalkSessionResponse)
async def post_notes(
session_id: UUID,
payload: NotesRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
session = await db.get(L1WalkSession, session_id)
if not session or session.account_id != user.account_id:
raise HTTPException(status_code=404)
try:
updated = await l1_session_service.update_notes(
db, session_id=session_id, notes=payload.notes,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
await db.commit()
return _as_response(updated)
@router.post("/sessions/{session_id}/resolve", response_model=WalkSessionResponse)
async def post_resolve(
session_id: UUID,
payload: ResolveRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
session = await db.get(L1WalkSession, session_id)
if not session or session.account_id != user.account_id:
raise HTTPException(status_code=404)
try:
updated = await l1_session_service.resolve(
db, session_id=session_id, helpful=payload.helpful,
resolution_notes=payload.resolution_notes,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
await db.commit()
return _as_response(updated)
@router.post("/sessions/{session_id}/escalate", response_model=WalkSessionResponse)
async def post_escalate(
session_id: UUID,
payload: EscalateRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
session = await db.get(L1WalkSession, session_id)
if not session or session.account_id != user.account_id:
raise HTTPException(status_code=404)
try:
updated = await l1_session_service.escalate(
db, session_id=session_id,
reason=payload.reason or '',
reason_category=payload.reason_category,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
await db.commit()
return _as_response(updated)
@router.post("/escalate-without-walk", response_model=WalkSessionResponse)
async def post_escalate_without_walk(
payload: EscalateWithoutWalkRequest,
user: User = Depends(require_l1_or_coverage),
db: AsyncSession = Depends(get_db),
):
ticket = await internal_ticket_service.create_ticket(
db,
account_id=user.account_id,
created_by_user_id=user.id,
problem_statement=payload.problem_statement,
customer_name=payload.customer_name,
customer_contact=payload.customer_contact,
)
session = await l1_session_service.escalate_without_walk(
db,
account_id=user.account_id,
user=user,
ticket_id=str(ticket.id),
ticket_kind='internal',
reason_category=payload.reason_category,
reason=payload.reason,
)
await db.commit()
return _as_response(session)
- Step 3: Register the router.
In backend/app/api/router.py, add:
from app.api.endpoints import l1
api_router.include_router(l1.router)
- Step 4: Add E2E endpoint tests.
In backend/tests/test_l1_endpoints.py, add:
@pytest.mark.asyncio
async def test_intake_creates_adhoc_session_when_no_flow_id(authed_l1_client):
response = await authed_l1_client.post(
"/api/v1/l1/intake",
json={"problem_statement": "outlook broken", "customer_name": "Alice"},
)
assert response.status_code == 200
body = response.json()
assert body["session_kind"] == "adhoc"
assert body["ticket_kind"] == "internal"
@pytest.mark.asyncio
async def test_intake_creates_flow_session_when_flow_id_provided(authed_l1_client, flow):
response = await authed_l1_client.post(
"/api/v1/l1/intake",
json={"problem_statement": "outlook broken", "flow_id": str(flow.id)},
)
assert response.status_code == 200
assert response.json()["session_kind"] == "flow"
@pytest.mark.asyncio
async def test_step_appends_to_walked_path(authed_l1_client, active_flow_session):
response = await authed_l1_client.post(
f"/api/v1/l1/sessions/{active_flow_session.id}/step",
json={"node_id": "n1", "question": "q1", "answer": "yes", "note": None},
)
assert response.status_code == 200
assert len(response.json()["walked_path"]) == 1
@pytest.mark.asyncio
async def test_step_blocked_on_adhoc_session(authed_l1_client, active_adhoc_session):
response = await authed_l1_client.post(
f"/api/v1/l1/sessions/{active_adhoc_session.id}/step",
json={"node_id": "n1", "question": "q1", "answer": "yes"},
)
assert response.status_code == 400
@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_session(authed_l1_client):
response = await authed_l1_client.post(
"/api/v1/l1/escalate-without-walk",
json={
"problem_statement": "no kb yet",
"reason_category": "No KB available",
},
)
assert response.status_code == 200
assert response.json()["status"] == "escalated"
@pytest.mark.asyncio
async def test_l1_endpoints_block_viewer(authed_viewer_client):
response = await authed_viewer_client.post(
"/api/v1/l1/intake",
json={"problem_statement": "x"},
)
assert response.status_code == 403
- Step 5: Run all L1 endpoint tests.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v
- Step 6: Commit.
git add backend/app/schemas/l1.py backend/app/api/endpoints/l1.py backend/app/api/router.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): L1 endpoints (intake/queue/sessions/step/notes/resolve/escalate)"
Task 16: APScheduler cleanup job for abandoned sessions
Files:
-
Create:
backend/app/services/l1_session_cleanup.py -
Modify:
backend/app/main.py— register the job in lifespan -
Test:
backend/tests/test_l1_session_cleanup.py -
Step 1: Write failing test.
import pytest
from datetime import datetime, timedelta, timezone
from app.services.l1_session_cleanup import flip_stale_sessions
@pytest.mark.asyncio
async def test_flip_stale_sessions_abandons_old_active_sessions(db_session, account, l1_user):
# Insert one stale active + one fresh active + one already resolved
stale = L1WalkSession(
account_id=account.id,
created_by_user_id=l1_user.id,
ticket_id='x', ticket_kind='internal', session_kind='adhoc',
status='active',
last_step_at=datetime.now(timezone.utc) - timedelta(hours=25),
walked_path=[], walk_notes=[],
)
fresh = L1WalkSession(
account_id=account.id,
created_by_user_id=l1_user.id,
ticket_id='y', ticket_kind='internal', session_kind='adhoc',
status='active',
last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
walked_path=[], walk_notes=[],
)
db_session.add_all([stale, fresh])
await db_session.flush()
count = await flip_stale_sessions(db_session)
assert count == 1
await db_session.refresh(stale)
await db_session.refresh(fresh)
assert stale.status == 'abandoned'
assert fresh.status == 'active'
-
Step 2: Run, verify failure.
-
Step 3: Implement.
backend/app/services/l1_session_cleanup.py:
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.l1_walk_session import L1WalkSession
logger = logging.getLogger(__name__)
async def flip_stale_sessions(db: AsyncSession) -> int:
"""
Flip active sessions to 'abandoned' if their last_step_at is older than 24 hours.
Returns the number of rows flipped.
"""
cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
stmt = (
update(L1WalkSession)
.where(L1WalkSession.status == 'active')
.where(L1WalkSession.last_step_at < cutoff)
.values(status='abandoned')
)
result = await db.execute(stmt)
await db.commit()
return result.rowcount or 0
async def run_cleanup_job(session_factory) -> None:
"""Entrypoint for APScheduler — uses _admin_session_factory per Lesson on RLS at startup."""
async with session_factory() as db:
try:
count = await flip_stale_sessions(db)
if count > 0:
logger.info("l1_session_cleanup: flipped %d sessions to abandoned", count)
except Exception:
logger.exception("l1_session_cleanup: error during run")
- Step 4: Register the job in
main.pylifespan.
In backend/app/main.py, find the lifespan / APScheduler setup and add:
from app.services.l1_session_cleanup import run_cleanup_job
# Inside the lifespan startup section, alongside other scheduled jobs:
scheduler.add_job(
run_cleanup_job,
'interval',
hours=1,
max_instances=1, # Lesson 1
args=[_admin_session_factory],
id='l1_session_cleanup',
replace_existing=True,
)
- Step 5: Run unit test, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_session_cleanup.py -v
- Step 6: Verify job registered at startup (manual smoke).
Restart backend, grep logs for l1_session_cleanup job-registered line. APScheduler logs job registration at startup.
- Step 7: Commit.
git add backend/app/services/l1_session_cleanup.py backend/app/main.py backend/tests/test_l1_session_cleanup.py
git commit -m "feat(l1): APScheduler hourly cleanup job for abandoned sessions"
Task 17: RLS regression tests for new tables
Files:
-
Create:
backend/tests/test_l1_rls.py -
Step 1: Write the tests.
import pytest
from sqlalchemy import select
from app.models.internal_ticket import InternalTicket
from app.models.l1_walk_session import L1WalkSession
@pytest.mark.asyncio
async def test_l1_cannot_read_other_accounts_internal_tickets(
db_account_a_session, account_b_internal_ticket
):
"""RLS must block cross-tenant reads on internal_tickets."""
stmt = select(InternalTicket).where(InternalTicket.id == account_b_internal_ticket.id)
result = await db_account_a_session.execute(stmt)
assert result.scalar_one_or_none() is None
@pytest.mark.asyncio
async def test_l1_cannot_read_other_accounts_walk_sessions(
db_account_a_session, account_b_walk_session
):
stmt = select(L1WalkSession).where(L1WalkSession.id == account_b_walk_session.id)
result = await db_account_a_session.execute(stmt)
assert result.scalar_one_or_none() is None
@pytest.mark.asyncio
async def test_with_check_blocks_cross_tenant_insert(db_account_a_session, account_b):
"""RLS WITH CHECK must reject inserts setting another account's account_id."""
bad_row = InternalTicket(
account_id=account_b.id,
created_by_user_id=...,
problem_statement='cross-tenant attempt',
)
db_account_a_session.add(bad_row)
with pytest.raises(Exception): # InsufficientPrivilegeError or similar
await db_account_a_session.flush()
(Use fixtures from the existing RLS test suite — db_account_a_session already exists per the project's tenant-isolation Phase 4 work. Adapt to actual fixture names.)
- Step 2: Run.
docker exec resolutionflow_backend pytest backend/tests/test_l1_rls.py -v
Expected: 3 PASSED.
- Step 3: Commit.
git add backend/tests/test_l1_rls.py
git commit -m "test(l1): RLS regression tests for internal_tickets + l1_walk_sessions"
Task 18: Frontend — usePermissions extensions + role type
Files:
-
Modify:
frontend/src/types/auth.ts(orfrontend/src/types/user.ts— whereverUser.account_roleis typed) -
Modify:
frontend/src/hooks/usePermissions.ts -
Step 1: Locate the role union type.
grep -rn "account_role:" frontend/src/types/
- Step 2: Add
'l1_tech'to the union.
In the relevant file (likely frontend/src/types/auth.ts), find the type:
export type AccountRole = 'owner' | 'engineer' | 'viewer'
Change to:
export type AccountRole = 'owner' | 'engineer' | 'l1_tech' | 'viewer'
- Step 3: Update
usePermissions.ts.
Open usePermissions.ts. Find the getEffectiveRole and hasMinimumRole functions and update:
type EffectiveRole = 'super_admin' | 'owner' | 'engineer' | 'l1_tech' | 'viewer'
function getEffectiveRole(user: User | null): EffectiveRole {
if (!user) return 'viewer'
if (user.is_super_admin) return 'super_admin'
if (user.account_role === 'owner') return 'owner'
if (user.account_role === 'engineer') return 'engineer'
if (user.account_role === 'l1_tech') return 'l1_tech'
return 'viewer'
}
const ROLE_RANK: Record<EffectiveRole, number> = {
super_admin: 5,
owner: 4,
engineer: 3,
l1_tech: 2,
viewer: 1,
}
function hasMinimumRole(user: User | null, minimum: EffectiveRole): boolean {
const effective = getEffectiveRole(user)
return ROLE_RANK[effective] >= ROLE_RANK[minimum]
}
And in the main return object of usePermissions(), add:
return {
// ... existing returned values ...
isL1Tech: effectiveRole === 'l1_tech',
canCoverL1: Boolean(user?.can_cover_l1) || effectiveRole === 'owner' || effectiveRole === 'super_admin',
canUseL1Surface: effectiveRole === 'l1_tech' || effectiveRole === 'owner' || effectiveRole === 'super_admin' || (user?.account_role === 'engineer' && Boolean(user?.can_cover_l1)),
canUseEngineerSurface: hasMinimumRole(user, 'engineer'),
}
- Step 4: Update User type to include
can_cover_l1.
In the User type definition:
export interface User {
id: string
email: string
// ... existing fields ...
account_role: AccountRole
is_super_admin: boolean
can_cover_l1: boolean // NEW
// ... rest ...
}
- Step 5: Run frontend type check.
docker exec -w /app resolutionflow_frontend npx tsc -b
Expected: no new type errors (existing usages of account_role may need updates — fix any compilation errors caused by the union widening).
- Step 6: Commit.
git add frontend/src/types/auth.ts frontend/src/hooks/usePermissions.ts
git commit -m "feat(l1): usePermissions extensions for l1_tech + coverage flag"
Task 19: Frontend — Sidebar role-based nav + ProtectedRoute redirect
Files:
-
Modify:
frontend/src/components/layout/Sidebar.tsx -
Modify:
frontend/src/components/layout/ProtectedRoute.tsx -
Step 1: Update
Sidebar.tsxto render role-based nav.
Find the nav array construction in Sidebar.tsx. Wrap it in a helper that picks per role:
import { usePermissions } from '@/hooks/usePermissions'
import { LayoutGrid, Ticket, FileText, BookOpen, Settings } from 'lucide-react'
function getNavItems(perms: ReturnType<typeof usePermissions>) {
// L1 tech sees only L1-relevant entries
if (perms.isL1Tech) {
return [
{ href: '/l1', icon: LayoutGrid, label: 'Workspace', shortLabel: 'Work' },
{ href: '/l1/tickets', icon: Ticket, label: 'Tickets', shortLabel: 'Tickets' },
{ href: '/l1/drafts', icon: FileText, label: 'My Drafts', shortLabel: 'Drafts' },
{ href: '/guides', icon: BookOpen, label: 'Guides', shortLabel: 'Guides' },
{ href: '/account', icon: Settings, label: 'Account', shortLabel: 'Acct' },
]
}
// Existing engineer/owner nav (kept as-is)
const items = [/* ... existing items ... */]
// Append L1 Workspace entry for coverage engineers + owners
if (perms.canCoverL1) {
items.push({
href: '/l1', icon: LayoutGrid, label: 'L1 Workspace', shortLabel: 'L1',
})
}
return items
}
// Inside the Sidebar component:
const perms = usePermissions()
const navItems = getNavItems(perms)
- Step 2: Add L1 post-login redirect to
ProtectedRoute.tsx.
In ProtectedRoute.tsx, find the existing auth-state handling and add:
// After authentication checks pass, before rendering children:
if (user?.account_role === 'l1_tech' && location.pathname === '/') {
return <Navigate to="/l1" replace />
}
- Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
- Step 4: Commit.
git add frontend/src/components/layout/Sidebar.tsx frontend/src/components/layout/ProtectedRoute.tsx
git commit -m "feat(l1): role-based sidebar nav + L1 post-login redirect"
Task 20: Frontend — router updates + L1RouteGuard
Files:
-
Create:
frontend/src/components/layout/L1RouteGuard.tsx -
Modify:
frontend/src/router.tsx -
Step 1: Create the guard component.
import { Navigate } from 'react-router'
import { usePermissions } from '@/hooks/usePermissions'
export function L1RouteGuard({ children }: { children: React.ReactNode }) {
const perms = usePermissions()
if (!perms.canUseL1Surface) {
return <Navigate to="/" replace />
}
return <>{children}</>
}
- Step 2: Register routes.
In frontend/src/router.tsx, near the other lazyWithRetry declarations:
const L1Dashboard = lazyWithRetry(() => import('@/pages/l1/L1Dashboard'))
const L1WalkPage = lazyWithRetry(() => import('@/pages/l1/L1WalkPage'))
const L1DraftsPage = lazyWithRetry(() => import('@/pages/l1/L1DraftsPage'))
const L1TicketsPage = lazyWithRetry(() => import('@/pages/l1/L1TicketsPage'))
And inside the / ProtectedRoute children array, alongside existing routes:
{ path: 'l1', element: <L1RouteGuard><L1Dashboard /></L1RouteGuard> },
{ path: 'l1/walk/:sessionId', element: <L1RouteGuard><L1WalkPage /></L1RouteGuard> },
{ path: 'l1/drafts', element: <L1RouteGuard><L1DraftsPage /></L1RouteGuard> },
{ path: 'l1/tickets', element: <L1RouteGuard><L1TicketsPage /></L1RouteGuard> },
The lazyWithRetry-wrapped components are React.lazy under the hood, so they need a Suspense boundary already provided by the existing page() helper. Adapt to whichever pattern the existing routes use (likely element: page(L1Dashboard) wrapped manually, or pass through the guard).
Adjust based on actual router pattern:
{ path: 'l1', element: <L1RouteGuard>{page(L1Dashboard)}</L1RouteGuard> },
- Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
(Will fail until the actual page components exist in Task 21+. That's OK — placeholder check that imports resolve.)
- Step 4: Commit (pages stubbed in next task).
git add frontend/src/components/layout/L1RouteGuard.tsx frontend/src/router.tsx
git commit -m "feat(l1): register /l1/* routes + L1RouteGuard"
Task 21: Frontend — L1Dashboard (active + empty state)
Files:
-
Create:
frontend/src/pages/l1/L1Dashboard.tsx -
Create:
frontend/src/components/l1/EmptyStateCard.tsx -
Create:
frontend/src/components/l1/ResumeInProgress.tsx -
Create:
frontend/src/api/l1.ts -
Create:
frontend/src/types/l1.ts -
Step 1: Create types.
frontend/src/types/l1.ts:
export type SessionKind = 'flow' | 'proposal' | 'adhoc'
export type SessionStatus = 'active' | 'resolved' | 'escalated' | 'abandoned'
export type TicketKind = 'psa' | 'internal'
export interface WalkSession {
id: string
session_kind: SessionKind
flow_id: string | null
flow_proposal_id: string | null
current_node_id: string | null
walked_path: WalkStep[]
walk_notes: AdhocNote[]
status: SessionStatus
started_at: string
last_step_at: string
resolved_at: string | null
}
export interface WalkStep {
node_id: string
question: string
answer: string
l1_note: string | null
}
export interface AdhocNote {
timestamp: string
content: string
}
export interface QueueRow {
ticket_id: string
ticket_kind: TicketKind
problem_statement: string | null
customer_name: string | null
status: string
created_at: string | null
}
export interface IntakeRequest {
problem_statement: string
customer_name?: string
customer_contact?: string
flow_id?: string
}
export interface IntakeResponse {
session_id: string
session_kind: SessionKind
ticket_id: string
ticket_kind: TicketKind
}
- Step 2: Create API client.
frontend/src/api/l1.ts:
import { apiClient } from './client'
import type {
IntakeRequest, IntakeResponse, QueueRow, WalkSession,
WalkStep, AdhocNote,
} from '@/types/l1'
export const l1Api = {
intake: (body: IntakeRequest) =>
apiClient.post<IntakeResponse>('/api/v1/l1/intake', body).then(r => r.data),
queue: (statusFilter?: string) =>
apiClient.get<QueueRow[]>('/api/v1/l1/queue', {
params: statusFilter ? { status_filter: statusFilter } : {},
}).then(r => r.data),
getSession: (sessionId: string) =>
apiClient.get<WalkSession>(`/api/v1/l1/sessions/${sessionId}`).then(r => r.data),
listActiveSessions: () =>
apiClient.get<WalkSession[]>('/api/v1/l1/sessions/active').then(r => r.data),
step: (sessionId: string, step: { node_id: string; question: string; answer: string; note?: string | null }) =>
apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/step`, step).then(r => r.data),
notes: (sessionId: string, notes: AdhocNote[]) =>
apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/notes`, { notes }).then(r => r.data),
resolve: (sessionId: string, body: { helpful: boolean; resolution_notes: string }) =>
apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/resolve`, body).then(r => r.data),
escalate: (sessionId: string, body: { reason: string; reason_category: string }) =>
apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/escalate`, body).then(r => r.data),
escalateWithoutWalk: (body: {
problem_statement: string; customer_name?: string; customer_contact?: string;
reason_category: string; reason?: string;
}) =>
apiClient.post<WalkSession>('/api/v1/l1/escalate-without-walk', body).then(r => r.data),
}
- Step 3: Create EmptyStateCard.
frontend/src/components/l1/EmptyStateCard.tsx:
import { usePermissions } from '@/hooks/usePermissions'
interface Props {
onUploadClick?: () => void
onConfigureConnectorClick?: () => void
}
export function EmptyStateCard({ onUploadClick, onConfigureConnectorClick }: Props) {
const perms = usePermissions()
const isOwnerOrCoverage = perms.canCoverL1 // (owner/super_admin always have it)
return (
<div className="rounded-lg border border-default bg-card p-6">
<h2 className="font-heading text-xl font-bold text-heading mb-2">
Your knowledge base is empty
</h2>
<p className="text-muted-foreground mb-4">
L1 Workspace works best when your account has KB content or authored flows.
Right now there's nothing to match against — calls will start as ad-hoc walks.
</p>
{isOwnerOrCoverage ? (
<div className="flex gap-3">
{onUploadClick && (
<button
type="button"
onClick={onUploadClick}
className="rounded-md bg-accent text-white px-4 py-2 text-sm font-medium hover:bg-accent/90 transition-colors"
>
Upload KB content
</button>
)}
{onConfigureConnectorClick && (
<button
type="button"
onClick={onConfigureConnectorClick}
className="rounded-md border border-default px-4 py-2 text-sm font-medium hover:bg-elevated transition-colors"
>
Configure connector
</button>
)}
</div>
) : (
<ul className="text-sm text-muted-foreground space-y-1 ml-4 list-disc">
<li>Ask your admin to upload KB documents</li>
<li>Or configure a KB connector under Account → KB</li>
<li>Or author a flow in the Flows library</li>
</ul>
)}
</div>
)
}
- Step 4: Create ResumeInProgress.
frontend/src/components/l1/ResumeInProgress.tsx:
import { useEffect, useState } from 'react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { WalkSession } from '@/types/l1'
export function ResumeInProgress() {
const [sessions, setSessions] = useState<WalkSession[] | null>(null)
useEffect(() => {
l1Api.listActiveSessions().then(setSessions).catch(() => setSessions([]))
}, [])
if (!sessions || sessions.length === 0) return null
return (
<section>
<div className="flex items-center gap-3 mb-3">
<span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
Resume in progress · {sessions.length}
</span>
<div className="flex-1 h-px bg-border" />
</div>
<div className="rounded-lg border border-default bg-card overflow-hidden">
{sessions.map((s) => (
<Link
key={s.id}
to={`/l1/walk/${s.id}`}
className="flex items-center justify-between px-4 py-3 hover:bg-elevated transition-colors border-b border-default last:border-b-0"
>
<div className="flex items-center gap-3">
<span className="font-mono text-xs text-muted-foreground">#{s.id.slice(0, 8)}</span>
<span className="text-sm">
{s.session_kind === 'adhoc'
? `Adhoc · ${s.walk_notes.length} notes`
: `Step ${s.walked_path.length}`}
</span>
</div>
<span className="text-xs text-muted-foreground">
{new Date(s.last_step_at).toLocaleTimeString()}
</span>
</Link>
))}
</div>
</section>
)
}
- Step 5: Create the Dashboard page.
frontend/src/pages/l1/L1Dashboard.tsx:
import { useEffect, useState } from 'react'
import { useNavigate } from 'react-router'
import { PageMeta } from '@/components/common/PageMeta'
import { useAuthStore } from '@/store/authStore'
import { l1Api } from '@/api/l1'
import { EmptyStateCard } from '@/components/l1/EmptyStateCard'
import { ResumeInProgress } from '@/components/l1/ResumeInProgress'
import type { QueueRow } from '@/types/l1'
// Existing helper to fetch flows + KB doc count:
import { dashboardApi } from '@/api/dashboard' // adjust import to actual path
export default function L1Dashboard() {
const user = useAuthStore((s) => s.user)
const navigate = useNavigate()
const [problem, setProblem] = useState('')
const [customerName, setCustomerName] = useState('')
const [customerContact, setCustomerContact] = useState('')
const [submitting, setSubmitting] = useState(false)
const [queue, setQueue] = useState<QueueRow[]>([])
const [isEmpty, setIsEmpty] = useState<boolean | null>(null)
useEffect(() => {
l1Api.queue('open').then(setQueue).catch(() => setQueue([]))
// Detect empty state via the dashboard stats endpoint (existing).
// Adjust to actual endpoint that returns flow + KB counts.
dashboardApi.getStats().then(stats => {
setIsEmpty((stats?.flow_count ?? 0) === 0 && (stats?.kb_doc_count ?? 0) === 0)
}).catch(() => setIsEmpty(false))
}, [])
const handleStart = async () => {
if (!problem.trim()) return
setSubmitting(true)
try {
const response = await l1Api.intake({
problem_statement: problem.trim(),
customer_name: customerName.trim() || undefined,
customer_contact: customerContact.trim() || undefined,
})
navigate(`/l1/walk/${response.session_id}`)
} finally {
setSubmitting(false)
}
}
const now = new Date()
const greeting = now.getHours() < 12 ? 'morning' : now.getHours() < 18 ? 'afternoon' : 'evening'
const firstName = user?.name?.split(' ')[0] || 'there'
return (
<div className="overflow-y-auto h-full">
<PageMeta title="L1 Workspace" />
<div className="max-w-4xl mx-auto px-6 pt-12 pb-12 space-y-8">
{/* Hero greeting */}
<div>
<p className="font-sans text-xs uppercase tracking-[0.12em] text-muted-foreground mb-1">
{now.toLocaleDateString('en-US', { weekday: 'long', month: 'long', day: 'numeric' })}
</p>
<h1 className="font-heading text-3xl sm:text-4xl font-extrabold tracking-tight text-heading leading-tight">
Good {greeting}, {firstName}.
</h1>
</div>
{/* Empty state (first-run) */}
{isEmpty && <EmptyStateCard />}
{/* Describe the problem */}
<section>
<div className="flex items-center gap-3 mb-3">
<span className="w-1 h-4 bg-accent rounded-sm" />
<span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
Describe the problem
</span>
</div>
<div className="rounded-lg border border-default bg-card p-4 space-y-3">
<textarea
value={problem}
onChange={(e) => setProblem(e.target.value)}
placeholder="What's the user calling about?"
autoFocus
rows={3}
className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
/>
<div className="grid grid-cols-2 gap-3">
<input
value={customerName}
onChange={(e) => setCustomerName(e.target.value)}
placeholder="Customer name (optional)"
className="bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
/>
<input
value={customerContact}
onChange={(e) => setCustomerContact(e.target.value)}
placeholder="Email or phone (optional)"
className="bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
/>
</div>
<div className="flex justify-end">
<button
type="button"
onClick={handleStart}
disabled={!problem.trim() || submitting}
className="rounded-md bg-accent text-white px-5 py-2 text-sm font-medium hover:bg-accent/90 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
>
{submitting ? 'Starting…' : 'Start walk →'}
</button>
</div>
</div>
</section>
{/* Open tickets */}
{queue.length > 0 && (
<section>
<div className="flex items-center gap-3 mb-3">
<span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
Open tickets · {queue.length}
</span>
<div className="flex-1 h-px bg-border" />
</div>
<div className="rounded-lg border border-default bg-card overflow-hidden">
{queue.map((row) => (
// Phase 1: rows are display-only. Each ticket already has an active session
// owned by the L1 who created it (visible in their "Resume in progress" widget).
// Cross-L1 ticket claiming + PSA-fed queue rows ship in Phase 2 — at that point
// rows become clickable and call a "start session from ticket" endpoint.
<div
key={row.ticket_id}
className="px-4 py-3 border-b border-default last:border-b-0 transition-colors"
>
<div className="flex items-center justify-between">
<span className="text-sm">
<span className="font-mono text-xs text-muted-foreground mr-2">
#{row.ticket_id.slice(0, 8)}
</span>
{row.problem_statement}
</span>
<span className="text-xs text-muted-foreground">
{row.ticket_kind === 'psa' ? 'PSA' : 'Internal'}
</span>
</div>
</div>
))}
</div>
</section>
)}
{/* Resume in progress */}
<ResumeInProgress />
</div>
</div>
)
}
- Step 6: Type-check + manual smoke.
docker exec -w /app resolutionflow_frontend npx tsc -b
Start dev server, log in as an L1, verify the page renders.
- Step 7: Commit.
git add frontend/src/pages/l1/L1Dashboard.tsx frontend/src/components/l1/EmptyStateCard.tsx frontend/src/components/l1/ResumeInProgress.tsx frontend/src/api/l1.ts frontend/src/types/l1.ts
git commit -m "feat(l1): L1 dashboard with empty state + resume widget"
Task 22: Frontend — L1WalkPage tree variant
Files:
-
Create:
frontend/src/pages/l1/L1WalkPage.tsx -
Create:
frontend/src/components/l1/L1WalkTreeVariant.tsx -
Step 1: Create the WalkPage wrapper that dispatches to variant.
frontend/src/pages/l1/L1WalkPage.tsx:
import { useEffect, useState } from 'react'
import { useParams, useNavigate } from 'react-router'
import { PageMeta } from '@/components/common/PageMeta'
import { l1Api } from '@/api/l1'
import { L1WalkTreeVariant } from '@/components/l1/L1WalkTreeVariant'
import { L1WalkAdhocVariant } from '@/components/l1/L1WalkAdhocVariant'
import type { WalkSession } from '@/types/l1'
export default function L1WalkPage() {
const { sessionId } = useParams<{ sessionId: string }>()
const navigate = useNavigate()
const [session, setSession] = useState<WalkSession | null>(null)
const [error, setError] = useState<string | null>(null)
useEffect(() => {
if (!sessionId) return
l1Api.getSession(sessionId)
.then(setSession)
.catch(err => setError(err?.message || 'Failed to load session'))
}, [sessionId])
if (error) {
return <div className="p-6 text-error-foreground">{error}</div>
}
if (!session) {
return <div className="p-6 text-muted-foreground">Loading…</div>
}
const handleResolved = () => navigate('/l1')
const handleEscalated = () => navigate('/l1')
return (
<>
<PageMeta title="L1 Walk" />
{session.session_kind === 'adhoc' ? (
<L1WalkAdhocVariant
session={session}
onSessionUpdate={setSession}
onResolved={handleResolved}
onEscalated={handleEscalated}
/>
) : (
<L1WalkTreeVariant
session={session}
onSessionUpdate={setSession}
onResolved={handleResolved}
onEscalated={handleEscalated}
/>
)}
</>
)
}
- Step 2: Create the tree variant.
frontend/src/components/l1/L1WalkTreeVariant.tsx:
import { useState } from 'react'
import { ChevronLeft } from 'lucide-react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { WalkSession } from '@/types/l1'
// Reuse existing flow node fetcher — adapt import to actual path:
// import { flowsApi } from '@/api/flows'
interface Props {
session: WalkSession
onSessionUpdate: (s: WalkSession) => void
onResolved: () => void
onEscalated: () => void
}
export function L1WalkTreeVariant({ session, onSessionUpdate, onResolved, onEscalated }: Props) {
const [showResolve, setShowResolve] = useState(false)
const [showEscalate, setShowEscalate] = useState(false)
const [currentNode, setCurrentNode] = useState<any | null>(null) // Replace `any` with Node type
const [note, setNote] = useState('')
// ... load currentNode based on session.flow_id + session.current_node_id ...
// ... pseudo-code; integrate with existing flow-loading helper ...
const handleAnswer = async (answer: string) => {
if (!currentNode) return
const updated = await l1Api.step(session.id, {
node_id: currentNode.id,
question: currentNode.question || currentNode.title,
answer,
note: note || null,
})
onSessionUpdate(updated)
setNote('')
// ... advance currentNode based on tree edge for `answer` ...
}
return (
<div className="flex flex-col h-full">
{/* Header */}
<header className="border-b border-default px-6 py-4 flex items-center justify-between bg-sidebar">
<Link to="/l1" className="flex items-center gap-2 text-muted-foreground hover:text-heading">
<ChevronLeft className="w-4 h-4" />
<span className="font-mono text-xs">#{session.id.slice(0, 8)}</span>
{session.session_kind === 'proposal' && (
<span className="ml-2 text-xs bg-accent/10 text-accent px-2 py-0.5 rounded">AI-built</span>
)}
</Link>
<div className="flex gap-2">
<button
onClick={() => setShowEscalate(true)}
className="rounded-md border border-default px-3 py-1.5 text-sm hover:bg-elevated transition-colors"
>
Escalate
</button>
<button
onClick={() => setShowResolve(true)}
className="rounded-md bg-accent text-white px-3 py-1.5 text-sm hover:bg-accent/90 transition-colors"
>
Resolve ✓
</button>
</div>
</header>
{/* Two-pane body */}
<div className="flex-1 flex min-h-0">
<main className="flex-1 p-6 overflow-y-auto">
<p className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground mb-2">
Step {session.walked_path.length + 1}
</p>
{currentNode ? (
<div className="rounded-lg border border-default bg-card p-6 max-w-2xl">
<p className="text-lg mb-6">{currentNode.question || currentNode.title}</p>
<div className="flex gap-3">
<button
onClick={() => handleAnswer('yes')}
className="flex-1 rounded-md bg-accent text-white py-3 text-base font-medium hover:bg-accent/90 min-h-[44px]"
>
Yes
</button>
<button
onClick={() => handleAnswer('no')}
className="flex-1 rounded-md border border-default py-3 text-base font-medium hover:bg-elevated min-h-[44px]"
>
No
</button>
</div>
<textarea
value={note}
onChange={(e) => setNote(e.target.value)}
placeholder="Optional note for this step…"
rows={2}
className="mt-4 w-full bg-page border border-default rounded-md px-3 py-2 text-sm"
/>
</div>
) : (
<p className="text-muted-foreground">Loading flow…</p>
)}
</main>
{/* Right pane: transcript */}
<aside className="w-80 border-l border-default bg-page p-4 overflow-y-auto">
<p className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground mb-3">
Walked so far
</p>
<ol className="space-y-2 text-sm">
{session.walked_path.map((step, i) => (
<li key={i} className="flex flex-col">
<span className="text-muted-foreground text-xs">{step.question}</span>
<span className="font-medium">→ {step.answer}</span>
{step.l1_note && <span className="text-muted-foreground text-xs italic">{step.l1_note}</span>}
</li>
))}
</ol>
</aside>
</div>
{/* Resolve modal */}
{showResolve && (
<ResolveModal
onClose={() => setShowResolve(false)}
onConfirm={async (helpful, notes) => {
await l1Api.resolve(session.id, { helpful, resolution_notes: notes })
onResolved()
}}
/>
)}
{/* Escalate modal */}
{showEscalate && (
<EscalateModal
onClose={() => setShowEscalate(false)}
onConfirm={async (category, reason) => {
await l1Api.escalate(session.id, { reason, reason_category: category })
onEscalated()
}}
/>
)}
</div>
)
}
function ResolveModal({ onClose, onConfirm }: { onClose: () => void; onConfirm: (helpful: boolean, notes: string) => Promise<void> }) {
const [helpful, setHelpful] = useState<boolean | null>(null)
const [notes, setNotes] = useState('')
const [submitting, setSubmitting] = useState(false)
return (
<div className="fixed inset-0 bg-black/60 flex items-center justify-center z-50">
<div className="bg-card border border-default rounded-lg p-6 max-w-md w-full">
<h3 className="font-heading text-lg font-bold mb-4">Did this resolve it?</h3>
<div className="flex gap-3 mb-4">
<button
onClick={() => setHelpful(true)}
className={`flex-1 py-2 rounded-md ${helpful === true ? 'bg-accent text-white' : 'border border-default hover:bg-elevated'}`}
>
Yes
</button>
<button
onClick={() => setHelpful(false)}
className={`flex-1 py-2 rounded-md ${helpful === false ? 'bg-warning text-white' : 'border border-default hover:bg-elevated'}`}
>
No
</button>
</div>
<textarea
value={notes}
onChange={(e) => setNotes(e.target.value)}
rows={3}
placeholder="Resolution notes…"
className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-4"
/>
<div className="flex justify-end gap-2">
<button onClick={onClose} className="rounded-md border border-default px-4 py-2 text-sm">Cancel</button>
<button
disabled={helpful === null || submitting}
onClick={async () => {
setSubmitting(true)
try { await onConfirm(helpful!, notes) } finally { setSubmitting(false) }
}}
className="rounded-md bg-accent text-white px-4 py-2 text-sm disabled:opacity-50"
>
{submitting ? 'Saving…' : 'Confirm'}
</button>
</div>
</div>
</div>
)
}
function EscalateModal({ onClose, onConfirm }: { onClose: () => void; onConfirm: (category: string, reason: string) => Promise<void> }) {
const [category, setCategory] = useState('Out of L1 scope')
const [reason, setReason] = useState('')
const [submitting, setSubmitting] = useState(false)
return (
<div className="fixed inset-0 bg-black/60 flex items-center justify-center z-50">
<div className="bg-card border border-default rounded-lg p-6 max-w-md w-full">
<h3 className="font-heading text-lg font-bold mb-4">Escalate to engineering</h3>
<label className="block text-xs uppercase tracking-wider text-muted-foreground mb-1">Reason</label>
<select
value={category}
onChange={(e) => setCategory(e.target.value)}
className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-3"
>
<option>Out of L1 scope</option>
<option>Customer demanding senior</option>
<option>Tree dead-ended</option>
<option>AI tree wrong</option>
<option>No KB available</option>
<option>Other</option>
</select>
<textarea
value={reason}
onChange={(e) => setReason(e.target.value)}
rows={3}
placeholder="Details (optional)…"
className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-4"
/>
<div className="flex justify-end gap-2">
<button onClick={onClose} className="rounded-md border border-default px-4 py-2 text-sm">Cancel</button>
<button
disabled={submitting}
onClick={async () => {
setSubmitting(true)
try { await onConfirm(category, reason) } finally { setSubmitting(false) }
}}
className="rounded-md bg-warning text-white px-4 py-2 text-sm disabled:opacity-50"
>
{submitting ? 'Escalating…' : 'Confirm escalate'}
</button>
</div>
</div>
</div>
)
}
(The currentNode loading from the flow tree is left as an integration with the existing flow-fetch helper — adapt to the actual codebase pattern. The structure here is the variant scaffolding; node navigation will pull from trees data.)
- Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
- Step 4: Commit.
git add frontend/src/pages/l1/L1WalkPage.tsx frontend/src/components/l1/L1WalkTreeVariant.tsx
git commit -m "feat(l1): L1WalkPage with tree variant (yes/no walker, escalate/resolve modals)"
Task 23: Frontend — adhoc walker variant
Files:
-
Create:
frontend/src/components/l1/L1WalkAdhocVariant.tsx -
Step 1: Create the adhoc variant.
import { useEffect, useRef, useState } from 'react'
import { ChevronLeft } from 'lucide-react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { AdhocNote, WalkSession } from '@/types/l1'
interface Props {
session: WalkSession
onSessionUpdate: (s: WalkSession) => void
onResolved: () => void
onEscalated: () => void
}
export function L1WalkAdhocVariant({ session, onSessionUpdate, onResolved, onEscalated }: Props) {
const [notesText, setNotesText] = useState(() =>
session.walk_notes.map(n => n.content).join('\n\n')
)
const [savedAt, setSavedAt] = useState<Date | null>(null)
const [showResolve, setShowResolve] = useState(false)
const [showEscalate, setShowEscalate] = useState(false)
const saveTimer = useRef<number | null>(null)
// Debounced autosave
useEffect(() => {
if (saveTimer.current) window.clearTimeout(saveTimer.current)
saveTimer.current = window.setTimeout(async () => {
const notes: AdhocNote[] = notesText
.split('\n\n')
.map(c => c.trim())
.filter(Boolean)
.map(content => ({ timestamp: new Date().toISOString(), content }))
try {
const updated = await l1Api.notes(session.id, notes)
onSessionUpdate(updated)
setSavedAt(new Date())
} catch {
// Surface error via toast; omitted here for brevity
}
}, 300)
return () => {
if (saveTimer.current) window.clearTimeout(saveTimer.current)
}
}, [notesText, session.id])
return (
<div className="flex flex-col h-full">
<header className="border-b border-default px-6 py-4 flex items-center justify-between bg-sidebar">
<Link to="/l1" className="flex items-center gap-2 text-muted-foreground hover:text-heading">
<ChevronLeft className="w-4 h-4" />
<span className="font-mono text-xs">#{session.id.slice(0, 8)}</span>
<span className="ml-2 text-xs bg-info/10 text-info px-2 py-0.5 rounded">Ad-hoc walk</span>
</Link>
<div className="flex gap-2">
<button
onClick={() => setShowEscalate(true)}
className="rounded-md border border-default px-3 py-1.5 text-sm hover:bg-elevated transition-colors"
>
Escalate
</button>
<button
onClick={() => setShowResolve(true)}
className="rounded-md bg-accent text-white px-3 py-1.5 text-sm hover:bg-accent/90 transition-colors"
>
Resolve ✓
</button>
</div>
</header>
<main className="flex-1 p-6 overflow-y-auto">
<div className="max-w-3xl mx-auto">
<p className="text-sm text-muted-foreground mb-3">
Take notes as you work through the call. They're auto-saved.
</p>
<textarea
value={notesText}
onChange={(e) => setNotesText(e.target.value)}
rows={20}
placeholder="What did the customer say? What did you check? What did you try?"
className="w-full bg-card border border-default rounded-md px-4 py-3 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40 leading-relaxed"
/>
{savedAt && (
<p className="text-xs text-muted-foreground mt-2">
Saved {Math.round((Date.now() - savedAt.getTime()) / 1000)}s ago
</p>
)}
</div>
</main>
{/* Reuse the ResolveModal + EscalateModal from L1WalkTreeVariant — extract to a shared module
in a small follow-up commit if duplication is undesirable. Phase 1 is fine inline. */}
{showResolve && (
<ResolveModalInline
defaultNotes={notesText}
onClose={() => setShowResolve(false)}
onConfirm={async (helpful, notes) => {
await l1Api.resolve(session.id, { helpful, resolution_notes: notes })
onResolved()
}}
/>
)}
{showEscalate && (
<EscalateModalInline
onClose={() => setShowEscalate(false)}
onConfirm={async (category, reason) => {
await l1Api.escalate(session.id, { reason, reason_category: category })
onEscalated()
}}
/>
)}
</div>
)
}
// Same as ResolveModal/EscalateModal in L1WalkTreeVariant; rename and re-export
// from a shared file in a small refactor pass after Phase 1 lands.
function ResolveModalInline(props: any) { /* identical to ResolveModal from Task 22 */ return null }
function EscalateModalInline(props: any) { /* identical to EscalateModal from Task 22 */ return null }
Note for the engineer: the modal components are duplicated for now (Task 22 has them inline). After this task lands, do a small follow-up commit extracting
ResolveModal+EscalateModalintofrontend/src/components/l1/WalkModals.tsxand import from both variants. Keep Phase 1 unblocked by inlining; refactor before merge tomain.
- Step 2: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
- Step 3: Commit.
git add frontend/src/components/l1/L1WalkAdhocVariant.tsx
git commit -m "feat(l1): adhoc walker variant with debounced note autosave"
Task 24: Frontend — drafts page, tickets page, coverage banner, seat counter widget
Files:
-
Create:
frontend/src/pages/l1/L1DraftsPage.tsx -
Create:
frontend/src/pages/l1/L1TicketsPage.tsx -
Create:
frontend/src/components/l1/L1CoverageBanner.tsx -
Create:
frontend/src/components/admin/SeatCounterWidget.tsx -
Create:
frontend/src/api/seats.ts -
Step 1: Create L1DraftsPage (read-only stub for Phase 1).
import { PageMeta } from '@/components/common/PageMeta'
export default function L1DraftsPage() {
return (
<div className="overflow-y-auto h-full">
<PageMeta title="My Drafts" />
<div className="max-w-4xl mx-auto px-6 pt-12 pb-12">
<h1 className="font-heading text-2xl font-bold mb-4">My AI drafts</h1>
<p className="text-muted-foreground">
AI-built drafts you've created will show here once AI build is enabled (Phase 2).
</p>
</div>
</div>
)
}
- Step 2: Create L1TicketsPage.
import { useEffect, useState } from 'react'
import { PageMeta } from '@/components/common/PageMeta'
import { l1Api } from '@/api/l1'
import type { QueueRow } from '@/types/l1'
export default function L1TicketsPage() {
const [rows, setRows] = useState<QueueRow[]>([])
const [statusFilter, setStatusFilter] = useState<string>('')
useEffect(() => {
l1Api.queue(statusFilter || undefined).then(setRows).catch(() => setRows([]))
}, [statusFilter])
return (
<div className="overflow-y-auto h-full">
<PageMeta title="Tickets" />
<div className="max-w-5xl mx-auto px-6 pt-12 pb-12">
<div className="flex items-center justify-between mb-6">
<h1 className="font-heading text-2xl font-bold">Tickets</h1>
<select
value={statusFilter}
onChange={(e) => setStatusFilter(e.target.value)}
className="bg-card border border-default rounded-md px-3 py-2 text-sm"
>
<option value="">All</option>
<option value="open">Open</option>
<option value="walking">Walking</option>
<option value="resolved">Resolved</option>
<option value="escalated">Escalated</option>
</select>
</div>
<div className="rounded-lg border border-default bg-card overflow-hidden">
{rows.map((r) => (
// Phase 1: rows are display-only (see comment in L1Dashboard). Phase 2 makes them clickable.
<div
key={r.ticket_id}
className="px-4 py-3 border-b border-default last:border-b-0"
>
<div className="flex items-center justify-between">
<div>
<span className="font-mono text-xs text-muted-foreground mr-2">
#{r.ticket_id.slice(0, 8)}
</span>
<span className="text-sm">{r.problem_statement}</span>
</div>
<div className="flex items-center gap-2">
<span className="text-xs px-2 py-0.5 rounded bg-elevated text-muted-foreground">
{r.status}
</span>
<span className="text-xs text-muted-foreground">
{r.ticket_kind === 'psa' ? 'PSA' : 'Internal'}
</span>
</div>
</div>
</div>
))}
{rows.length === 0 && (
<p className="px-4 py-8 text-sm text-muted-foreground text-center">No tickets.</p>
)}
</div>
</div>
</div>
)
}
- Step 3: Create L1CoverageBanner.
import { useNavigate } from 'react-router'
import { usePermissions } from '@/hooks/usePermissions'
export function L1CoverageBanner() {
const perms = usePermissions()
const navigate = useNavigate()
if (perms.isL1Tech) return null
if (!perms.canCoverL1) return null
return (
<div className="bg-info-dim text-info text-sm px-4 py-1.5 flex items-center justify-between border-b border-info/20">
<span>You're covering L1. Actions logged as coverage.</span>
<button
onClick={() => navigate('/')}
className="text-info underline-offset-2 hover:underline"
>
Switch back →
</button>
</div>
)
}
Mount it at the top of each /l1/* page (add <L1CoverageBanner /> as the first child of the page wrapper, or wrap inside L1RouteGuard).
- Step 4: Create seat-counter API + widget.
frontend/src/api/seats.ts:
import { apiClient } from './client'
export interface SeatCheck {
available: boolean
current: number
limit: number | null
role: 'engineer' | 'l1_tech'
}
export interface SeatUsage {
engineer: SeatCheck
l1_tech: SeatCheck
}
export const seatsApi = {
getUsage: () => apiClient.get<SeatUsage>('/api/v1/accounts/me/seats').then(r => r.data),
}
frontend/src/components/admin/SeatCounterWidget.tsx:
import { useEffect, useState } from 'react'
import { seatsApi, type SeatUsage } from '@/api/seats'
export function SeatCounterWidget() {
const [usage, setUsage] = useState<SeatUsage | null>(null)
useEffect(() => {
seatsApi.getUsage().then(setUsage).catch(() => setUsage(null))
}, [])
if (!usage) return null
return (
<div className="rounded-lg border border-default bg-card p-4 grid grid-cols-2 gap-4">
<SeatRow label="Engineer seats" check={usage.engineer} />
<SeatRow label="L1 seats" check={usage.l1_tech} />
</div>
)
}
function SeatRow({ label, check }: { label: string; check: SeatUsage['engineer'] }) {
const overLimit = check.limit !== null && check.current > check.limit
const limitText = check.limit === null ? '∞' : check.limit
return (
<div className={overLimit ? 'text-warning' : ''}>
<p className="text-xs uppercase tracking-wider text-muted-foreground mb-1">{label}</p>
<p className="text-lg font-mono">
{check.current} / {limitText}
</p>
{overLimit && <p className="text-xs">Over limit (grandfathered)</p>}
</div>
)
}
Drop this widget into /admin/users and /account/users pages (modify those pages to import and render it).
- Step 5: Type-check and smoke.
docker exec -w /app resolutionflow_frontend npx tsc -b
- Step 6: Commit.
git add frontend/src/pages/l1/L1DraftsPage.tsx frontend/src/pages/l1/L1TicketsPage.tsx frontend/src/components/l1/L1CoverageBanner.tsx frontend/src/components/admin/SeatCounterWidget.tsx frontend/src/api/seats.ts
git commit -m "feat(l1): drafts/tickets pages + coverage banner + seat counter widget"
Task 25: E2E Playwright tests
Files:
-
Create:
frontend/e2e/l1-workspace.spec.ts -
Step 1: Write the E2E tests.
import { test, expect } from '@playwright/test'
test.describe('L1 Workspace', () => {
test('L1 user lands on /l1, sees dashboard, can intake an adhoc walk and resolve', async ({ page }) => {
await page.goto('/login')
await page.fill('input[type=email]', 'l1@resolutionflow.example.com')
await page.fill('input[type=password]', 'TestPass123!')
await page.click('button[type=submit]')
// L1 lands on /l1
await expect(page).toHaveURL(/\/l1$/)
await expect(page.getByRole('heading', { name: /good (morning|afternoon|evening)/i })).toBeVisible()
// Intake
await page.fill('textarea[placeholder*="calling about"]', 'Customer says Outlook is broken')
await page.click('button:has-text("Start walk")')
// Lands on walker (adhoc since no flow_id and no AI build)
await expect(page).toHaveURL(/\/l1\/walk\//)
await expect(page.getByText('Ad-hoc walk')).toBeVisible()
// Take some notes
await page.fill('textarea[placeholder*="customer say"]', 'Walked customer through restarting Outlook')
// Resolve
await page.click('button:has-text("Resolve")')
await page.click('button:has-text("Yes")')
await page.fill('textarea[placeholder="Resolution notes…"]', 'Fixed via restart')
await page.click('button:has-text("Confirm")')
// Back to /l1
await expect(page).toHaveURL(/\/l1$/)
})
test('L1 cannot access /pilot, /trees/new, /escalations', async ({ page }) => {
await page.goto('/login')
// ... auth as L1 ...
await page.goto('/pilot')
await expect(page).toHaveURL(/\/l1$/) // Redirected away
await page.goto('/trees/new')
await expect(page).not.toHaveURL(/\/trees\/new/)
await page.goto('/escalations')
await expect(page).not.toHaveURL(/\/escalations/)
})
test('Engineer with coverage flag sees L1 entry and coverage banner', async ({ page }) => {
// Pre-seed: engineer user with can_cover_l1=true
await page.goto('/login')
await page.fill('input[type=email]', 'engineer-coverage@resolutionflow.example.com')
await page.fill('input[type=password]', 'TestPass123!')
await page.click('button[type=submit]')
// Sidebar shows L1 Workspace
await expect(page.getByRole('link', { name: /L1 Workspace/i })).toBeVisible()
await page.click('a:has-text("L1 Workspace")')
// Coverage banner visible
await expect(page.getByText(/Covering L1/)).toBeVisible()
})
test('Owner inviting past engineer seat limit gets 402', async ({ page, request }) => {
// Seed an account with seat_limit=2 and 2 active engineers (use API or DB fixture)
// ... auth as owner ...
const response = await request.post('/api/v1/invites', {
data: { email: 'fourth@example.com', role: 'engineer' },
headers: { Authorization: 'Bearer <owner-token>' },
})
expect(response.status()).toBe(402)
const body = await response.json()
expect(body.detail.code).toBe('seat_limit_exceeded')
})
})
(Pre-seed users for l1@resolutionflow.example.com and engineer-coverage@resolutionflow.example.com need to be added to the existing seed script in backend/scripts/seed_test_users.py — include this as part of the task.)
- Step 2: Add the seed users.
In backend/scripts/seed_test_users.py, add:
# Add to the seed list:
{
'email': 'l1@resolutionflow.example.com',
'name': 'L1 Test',
'account_role': 'l1_tech',
'is_super_admin': False,
},
{
'email': 'engineer-coverage@resolutionflow.example.com',
'name': 'Engineer Coverage',
'account_role': 'engineer',
'is_super_admin': False,
'can_cover_l1': True,
},
Re-run the seed:
docker exec -w /app resolutionflow_backend python -m scripts.seed_test_users
- Step 3: Run E2E tests.
cd frontend && npx playwright test e2e/l1-workspace.spec.ts
Expected: 4 tests pass.
- Step 4: Commit.
git add frontend/e2e/l1-workspace.spec.ts backend/scripts/seed_test_users.py
git commit -m "test(l1): E2E Playwright suite + seed L1 + coverage engineer users"
Task 26: Phase 1 acceptance validation
This is the final task — manual verification against the spec's acceptance criteria (spec §15).
- Step 1: Run the full backend test suite.
docker exec resolutionflow_backend pytest --override-ini="addopts="
Expected: all green, including the new L1/RLS/seat tests.
- Step 2: Build frontend cleanly.
docker exec -w /app resolutionflow_frontend npm run build
Expected: success, no type errors.
- Step 3: Run E2E.
cd frontend && npx playwright test
- Step 4: Acceptance walkthrough — check each spec §15 line by line.
For each criterion in spec §15 (the v1 acceptance list), manually verify:
-
L1 role assignable; L1 user sees L1 sidebar only; no engineer route reachable (
/pilot,/trees/new,/escalationsredirect or 403) -
L1 intake creates a ticket and lands in walker session (flow or adhoc)
-
Walker handles flow walks AND adhoc walks; resolve/escalate work in both
-
Concurrent sessions supported (start two intakes, both appear in "Resume in progress" widget)
-
Browser-close recovery (refresh mid-walk → state restored)
-
First-run empty-state card renders on a brand-new account
-
Escalate generates package + reassigns ticket (Phase 1: internal ticket only; PSA reassignment is a stub)
-
Resolve flips
validated_by_outcomewhen walking a proposal withhelpful=true(use seed data with a manually-created FlowProposal) -
Coverage flag end-to-end: owner toggles
can_cover_l1on engineer; engineer sees/l1nav and banner; audit log row hasacting_as='l1_coverage' -
Seat enforcement blocks engineer invite at limit; blocks L1 invite at limit; grandfathered over-seated account still functions
-
RLS: L1 in account A cannot read account B's tickets / walk sessions / drafts (covered by
test_l1_rls.py) -
L1 seat count tracked separately from engineer seats in
SeatCounterWidget -
L1 cannot access
/account/kb(route guard redirects) -
Step 5: If any acceptance criterion fails, file a small follow-up task.
Don't try to fix every issue inside Task 26 — Phase 1's job is to ship the foundation. File a follow-up commit for any non-blocking gap.
- Step 6: Final commit (if anything was tweaked during acceptance walkthrough).
git status
git add <fixed files>
git commit -m "fix(l1): acceptance-walkthrough fixes for Phase 1"
- Step 7: Push the branch.
git push -u origin design/l1-workspace
- Step 8: Open PR.
Use the project's PR pattern (gh CLI on the GitHub mirror, or Gitea web UI). PR body should reference both the spec and this plan.
Phase 1 done
Once all 26 tasks ship, Phase 1 is complete. The L1 surface works end-to-end against authored flows and adhoc walks; AI build and KB connectors are deferred to Phase 2 and Phase 3 (separate plans).
The Phase 2 plan (docs/superpowers/plans/2026-MM-DD-l1-workspace-phase-2.md) will cover:
kb_documents+kb_document_chunkstables- Manual upload + paste KB ingestion (extending KB Accelerator)
match_or_buildorchestratorai_tree_builder(Anthropic-backed)- SUGGEST_THRESHOLD near-miss UX
- Full BuildAbortedNoKB screen with three CTAs
- FlowProposal generation pipeline
The Phase 3 plan will cover the three KB connectors (IT Glue, Hudu, Microsoft Graph) — each can ship independently within the phase.