diff --git a/docs/superpowers/plans/2026-05-28-l1-workspace-phase-1.md b/docs/superpowers/plans/2026-05-28-l1-workspace-phase-1.md new file mode 100644 index 00000000..db5ac99e --- /dev/null +++ b/docs/superpowers/plans/2026-05-28-l1-workspace-phase-1.md @@ -0,0 +1,4092 @@ +# L1 Workspace — Phase 1 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship the L1 helpdesk workspace — new role, dedicated `/l1/*` surface, walker (tree + adhoc variants), session lifecycle, escalation, internal-ticket fallback, coverage flag, seat enforcement — working end-to-end against existing authored flows. + +**Architecture:** New `l1_tech` role between `engineer` and `viewer`. Two new tables (`l1_walk_sessions`, `internal_tickets`) and small column additions to `flow_proposals`, `users`, `accounts`, `subscriptions`, `audit_logs`. New `services/l1_session_service.py` orchestrates walking-session lifecycle. New `services/seat_enforcement.py` shared by invite/role-change paths for both L1 and engineer seats. Frontend gets a role-gated `/l1/*` page tree. AI tree-builder and KB connectors are explicitly out of Phase 1 (Phases 2 and 3). + +**Tech Stack:** Python 3.12 + FastAPI + SQLAlchemy 2.0 async + Alembic + Pydantic v2 + APScheduler (backend). React 19 + Vite + TypeScript + Tailwind v4 + Zustand + React Router v7 (frontend). PostgreSQL 16 with RLS. + +**Spec:** [docs/superpowers/specs/2026-05-28-l1-workspace-design.md](../specs/2026-05-28-l1-workspace-design.md) — read in full before starting. + +**Out of scope for Phase 1** (deferred to Phase 2/3): `match_or_build` orchestrator, AI tree-builder, `kb_documents` tables, KB connectors (IT Glue / Hudu / Microsoft Graph), `SUGGEST_THRESHOLD` near-miss UX, BuildAbortedNoKB screen with near-miss option. **User-facing flow selection** is also deferred — Phase 1 intake creates adhoc walks only; the walker's flow variant exists in code and works when `flow_id` is passed to the intake endpoint (used by tests + direct API), but the UI surface for L1s to manually pick a flow ships in Phase 2 alongside the AI matcher. FlowProposal column extensions ARE included in Phase 1 (model is ready; Phase 2 populates). + +--- + +## File Structure + +**Backend — new files:** +- `backend/app/models/l1_walk_session.py` — SQLAlchemy `L1WalkSession` +- `backend/app/models/internal_ticket.py` — SQLAlchemy `InternalTicket` +- `backend/app/schemas/l1.py` — Pydantic request/response shapes +- `backend/app/schemas/seat_enforcement.py` — `SeatCheckResult`, `SeatUsage` shapes +- `backend/app/services/seat_enforcement.py` — `check_seat_available` shared helper +- `backend/app/services/internal_ticket_service.py` — CRUD + status transitions +- `backend/app/services/l1_session_service.py` — session lifecycle (start/step/notes/resolve/escalate) +- `backend/app/services/l1_session_cleanup.py` — APScheduler hourly job, 24h abandonment +- `backend/app/api/endpoints/l1.py` — all `/l1/*` endpoints +- `backend/alembic/versions/_add_l1_tech_role.py` — role + column additions +- `backend/alembic/versions/_extend_flow_proposals.py` — FlowProposal columns +- `backend/alembic/versions/_create_internal_tickets.py` — table + RLS +- `backend/alembic/versions/_create_l1_walk_sessions.py` — table + RLS + check constraint +- `backend/tests/test_seat_enforcement.py` +- `backend/tests/test_internal_ticket_service.py` +- `backend/tests/test_l1_session_service.py` +- `backend/tests/test_l1_endpoints.py` +- `backend/tests/test_l1_rls.py` + +**Backend — modified files:** +- `backend/app/models/flow_proposal.py` — add new columns +- `backend/app/models/user.py` — add `can_cover_l1` +- `backend/app/models/account.py` — add `l1_seats_purchased` +- `backend/app/models/subscription.py` — add `l1_seat_limit` +- `backend/app/models/audit_log.py` — add `acting_as` +- `backend/app/core/permissions.py` — add `l1_tech` to role docstring + helpers +- `backend/app/api/deps.py` — add `require_l1`, `require_l1_or_coverage`, `require_l1_or_above` +- `backend/app/api/router.py` — register `l1` router + `internal-tickets` router +- `backend/app/api/endpoints/invite.py` — integrate seat enforcement +- `backend/app/api/endpoints/accounts.py` — seat-usage endpoint + coverage PATCH + role-change check +- `backend/app/main.py` — register cleanup scheduler in lifespan + +**Frontend — new files:** +- `frontend/src/pages/l1/L1Dashboard.tsx` +- `frontend/src/pages/l1/L1WalkPage.tsx` +- `frontend/src/pages/l1/L1DraftsPage.tsx` +- `frontend/src/pages/l1/L1TicketsPage.tsx` +- `frontend/src/components/l1/L1WalkTreeVariant.tsx` +- `frontend/src/components/l1/L1WalkAdhocVariant.tsx` +- `frontend/src/components/l1/L1CoverageBanner.tsx` +- `frontend/src/components/l1/EmptyStateCard.tsx` +- `frontend/src/components/l1/ResumeInProgress.tsx` +- `frontend/src/components/admin/SeatCounterWidget.tsx` +- `frontend/src/components/layout/L1RouteGuard.tsx` +- `frontend/src/api/l1.ts` +- `frontend/src/types/l1.ts` + +**Frontend — modified files:** +- `frontend/src/hooks/usePermissions.ts` — add `isL1Tech`, `canCoverL1`, role-tier check +- `frontend/src/components/layout/Sidebar.tsx` — role-based nav array +- `frontend/src/components/layout/ProtectedRoute.tsx` — L1 post-login redirect +- `frontend/src/router.tsx` — register `/l1/*` routes +- `frontend/src/types/auth.ts` (or wherever `User.account_role` lives) — add `'l1_tech'` to union +- `frontend/src/api/index.ts` — export `l1` API +- `frontend/src/types/index.ts` — export `l1` types + +--- + +## Task 1: Backend — extend role docstring + permission helpers + +**Files:** +- Modify: `backend/app/core/permissions.py` (header docstring + any role-list constants) + +- [ ] **Step 1: Open file and locate the role docstring (around lines 5–10).** + +Read [permissions.py](../../backend/app/core/permissions.py) to confirm current shape. + +- [ ] **Step 2: Add `l1_tech` to the role docstring.** + +Replace the existing role list block: + +```python +""" +Permissions module. + +Role hierarchy: +- super_admin: is_super_admin=True, full system access +- owner: account_role='owner', manage account resources +- engineer: account_role='engineer' (default), CRUD own trees/steps +- l1_tech: account_role='l1_tech', use /l1/* surface only — walk flows, resolve/escalate +- viewer: account_role='viewer', read-only (can browse, run sessions, rate steps) +""" +``` + +- [ ] **Step 3: If there's a `VALID_ROLES` constant or similar enum/list, add `'l1_tech'`.** + +Grep first: + +```bash +grep -n "engineer.*viewer\|VALID_ROLES\|ROLE_HIERARCHY" backend/app/core/permissions.py +``` + +If a list/tuple exists, insert `'l1_tech'` between `'engineer'` and `'viewer'`. If not, no change. + +- [ ] **Step 4: No tests for this docstring-only change. Move to commit.** + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/core/permissions.py +git commit -m "feat(l1): add l1_tech role to permissions docstring" +``` + +--- + +## Task 2: Backend — add `require_l1`, `require_l1_or_coverage`, `require_l1_or_above` deps + +**Files:** +- Modify: `backend/app/api/deps.py` (add new dep functions after `require_engineer_or_admin`) +- Test: `backend/tests/test_deps_l1.py` (new) + +- [ ] **Step 1: Write the failing tests.** + +Create `backend/tests/test_deps_l1.py`: + +```python +import pytest +from fastapi import HTTPException +from app.api.deps import require_l1, require_l1_or_coverage, require_l1_or_above +from tests.factories import make_user # existing test factory + + +@pytest.mark.asyncio +async def test_require_l1_passes_for_l1_tech(): + user = make_user(account_role='l1_tech') + result = await require_l1(current_user=user) + assert result is user + + +@pytest.mark.asyncio +async def test_require_l1_blocks_engineer(): + user = make_user(account_role='engineer') + with pytest.raises(HTTPException) as exc: + await require_l1(current_user=user) + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_require_l1_or_coverage_passes_engineer_with_flag(): + user = make_user(account_role='engineer', can_cover_l1=True) + result = await require_l1_or_coverage(current_user=user) + assert result is user + + +@pytest.mark.asyncio +async def test_require_l1_or_coverage_blocks_engineer_without_flag(): + user = make_user(account_role='engineer', can_cover_l1=False) + with pytest.raises(HTTPException) as exc: + await require_l1_or_coverage(current_user=user) + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_require_l1_or_coverage_passes_owner_always(): + user = make_user(account_role='owner', can_cover_l1=False) + result = await require_l1_or_coverage(current_user=user) + assert result is user + + +@pytest.mark.asyncio +async def test_require_l1_or_above_passes_engineer(): + user = make_user(account_role='engineer') + result = await require_l1_or_above(current_user=user) + assert result is user + + +@pytest.mark.asyncio +async def test_require_l1_or_above_blocks_viewer(): + user = make_user(account_role='viewer') + with pytest.raises(HTTPException) as exc: + await require_l1_or_above(current_user=user) + assert exc.value.status_code == 403 +``` + +- [ ] **Step 2: Run tests to verify they fail.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v +``` +Expected: ImportError (require_l1 etc. don't exist yet). + +- [ ] **Step 3: Add the new deps in `backend/app/api/deps.py`.** + +After the existing `require_engineer_or_admin` definition, add: + +```python +async def require_l1( + current_user: User = Depends(get_current_active_user), +) -> User: + """L1 tech only (exact match). Used by endpoints exclusive to L1 self-service.""" + if current_user.is_super_admin: + return current_user # super_admin bypass for support purposes + if current_user.account_role != "l1_tech": + raise HTTPException(status_code=403, detail="L1 tech role required") + return current_user + + +async def require_l1_or_coverage( + current_user: User = Depends(get_current_active_user), +) -> User: + """ + L1 endpoints accessible to: l1_tech, engineers with can_cover_l1, owners, super_admin. + The "coverage" tier — engineers covering a frontline shift. + """ + if current_user.is_super_admin: + return current_user + role = current_user.account_role + if role == "l1_tech": + return current_user + if role == "owner": + return current_user + if role == "engineer" and current_user.can_cover_l1: + return current_user + raise HTTPException( + status_code=403, + detail="L1 access requires l1_tech role or engineer coverage flag", + ) + + +async def require_l1_or_above( + current_user: User = Depends(get_current_active_user), +) -> User: + """ + Anyone at L1 tier or higher. Used for shared resources L1s can see + (e.g., flow library, KB connector list view). + """ + if current_user.is_super_admin: + return current_user + if current_user.account_role in ("l1_tech", "engineer", "owner"): + return current_user + raise HTTPException(status_code=403, detail="L1 or above required") +``` + +Also ensure `User` model is imported at the top of `deps.py` (likely already imported). + +- [ ] **Step 4: Run tests to verify they pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v +``` +Expected: 7 PASSED. + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/api/deps.py backend/tests/test_deps_l1.py +git commit -m "feat(l1): add require_l1, require_l1_or_coverage, require_l1_or_above deps" +``` + +--- + +## Task 3: Migration — column additions (can_cover_l1, l1_seats_purchased, audit_logs.acting_as, l1_seat_limit) + +**Files:** +- Create: `backend/alembic/versions/_add_l1_columns.py` +- Modify: `backend/app/models/user.py` (add `can_cover_l1`) +- Modify: `backend/app/models/account.py` (add `l1_seats_purchased`) +- Modify: `backend/app/models/subscription.py` (add `l1_seat_limit`) +- Modify: `backend/app/models/audit_log.py` (add `acting_as`) + +- [ ] **Step 1: Generate the manual migration (no autogenerate).** + +```bash +docker exec -w /app resolutionflow_backend alembic revision -m "add_l1_columns" +``` +This creates a file `backend/alembic/versions/_add_l1_columns.py`. Note the hash from the output. + +- [ ] **Step 2: Write the migration content.** + +Open the generated file and replace the `upgrade()` / `downgrade()` bodies: + +```python +def upgrade() -> None: + op.add_column( + 'users', + sa.Column('can_cover_l1', sa.Boolean(), nullable=False, server_default='false'), + ) + op.add_column( + 'accounts', + sa.Column('l1_seats_purchased', sa.Integer(), nullable=False, server_default='0'), + ) + op.add_column( + 'subscriptions', + sa.Column('l1_seat_limit', sa.Integer(), nullable=True), + ) + op.add_column( + 'audit_logs', + sa.Column('acting_as', sa.String(30), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column('audit_logs', 'acting_as') + op.drop_column('subscriptions', 'l1_seat_limit') + op.drop_column('accounts', 'l1_seats_purchased') + op.drop_column('users', 'can_cover_l1') +``` + +- [ ] **Step 3: Add the matching columns to the SQLAlchemy models.** + +In `backend/app/models/user.py`, add inside the `User` class column block: +```python +can_cover_l1: Mapped[bool] = mapped_column( + sa.Boolean(), nullable=False, server_default=sa.text('false') +) +``` + +In `backend/app/models/account.py`: +```python +l1_seats_purchased: Mapped[int] = mapped_column( + sa.Integer(), nullable=False, server_default=sa.text('0') +) +``` + +In `backend/app/models/subscription.py`: +```python +l1_seat_limit: Mapped[Optional[int]] = mapped_column(sa.Integer(), nullable=True) +``` + +In `backend/app/models/audit_log.py`: +```python +acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True) +``` + +- [ ] **Step 4: Run migration to verify it applies.** + +```bash +docker exec -w /app resolutionflow_backend alembic upgrade head +``` +Expected: applies successfully, no errors. + +- [ ] **Step 5: Verify schema via psql.** + +```bash +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d users" | grep can_cover_l1 +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d accounts" | grep l1_seats_purchased +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d subscriptions" | grep l1_seat_limit +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d audit_logs" | grep acting_as +``` +Expected: each grep returns the new column row. + +- [ ] **Step 6: Verify downgrade works (then upgrade again).** + +```bash +docker exec -w /app resolutionflow_backend alembic downgrade -1 +docker exec -w /app resolutionflow_backend alembic upgrade head +``` + +- [ ] **Step 7: Commit.** + +```bash +git add backend/alembic/versions/_add_l1_columns.py backend/app/models/user.py backend/app/models/account.py backend/app/models/subscription.py backend/app/models/audit_log.py +git commit -m "feat(l1): add column additions migration (can_cover_l1, l1_seats_purchased, l1_seat_limit, acting_as)" +``` + +--- + +## Task 4: Migration — extend `flow_proposals` with L1 source columns + +**Files:** +- Create: `backend/alembic/versions/_extend_flow_proposals.py` +- Modify: `backend/app/models/flow_proposal.py` + +Spec §5.1: add `source`, `linked_ticket_id`, `linked_ticket_kind`, `validated_by_outcome` to `flow_proposals`. (`walked_path_snapshot` is NOT added — it lives on `l1_walk_sessions` per the revised design.) + +- [ ] **Step 1: Generate migration.** + +```bash +docker exec -w /app resolutionflow_backend alembic revision -m "extend_flow_proposals_l1" +``` + +- [ ] **Step 2: Write migration content.** + +```python +def upgrade() -> None: + op.add_column( + 'flow_proposals', + sa.Column('source', sa.String(30), nullable=True), + ) + op.add_column( + 'flow_proposals', + sa.Column('linked_ticket_id', sa.String(64), nullable=True), + ) + op.add_column( + 'flow_proposals', + sa.Column('linked_ticket_kind', sa.String(10), nullable=True), + ) + op.add_column( + 'flow_proposals', + sa.Column('validated_by_outcome', sa.Boolean(), nullable=False, server_default='false'), + ) + + # Backfill existing rows + op.execute("UPDATE flow_proposals SET source = 'manual_draft' WHERE source IS NULL") + + # Now enforce NOT NULL on source + op.alter_column('flow_proposals', 'source', nullable=False) + + # CHECK constraint on source values + op.create_check_constraint( + 'ck_flow_proposals_source', + 'flow_proposals', + "source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')", + ) + + # CHECK constraint on linked_ticket_kind values + op.create_check_constraint( + 'ck_flow_proposals_linked_ticket_kind', + 'flow_proposals', + "linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')", + ) + + +def downgrade() -> None: + op.drop_constraint('ck_flow_proposals_linked_ticket_kind', 'flow_proposals', type_='check') + op.drop_constraint('ck_flow_proposals_source', 'flow_proposals', type_='check') + op.drop_column('flow_proposals', 'validated_by_outcome') + op.drop_column('flow_proposals', 'linked_ticket_kind') + op.drop_column('flow_proposals', 'linked_ticket_id') + op.drop_column('flow_proposals', 'source') +``` + +- [ ] **Step 3: Update `FlowProposal` model.** + +In `backend/app/models/flow_proposal.py`, add inside the class: + +```python +source: Mapped[str] = mapped_column(sa.String(30), nullable=False, server_default=sa.text("'manual_draft'")) +linked_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True) +linked_ticket_kind: Mapped[Optional[str]] = mapped_column(sa.String(10), nullable=True) +validated_by_outcome: Mapped[bool] = mapped_column( + sa.Boolean(), nullable=False, server_default=sa.text('false') +) +``` + +Also update `__table_args__` to include the new check constraints: + +```python +__table_args__ = ( + # ... existing constraints ... + CheckConstraint( + "source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')", + name="ck_flow_proposals_source", + ), + CheckConstraint( + "linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')", + name="ck_flow_proposals_linked_ticket_kind", + ), +) +``` + +- [ ] **Step 4: Apply migration.** + +```bash +docker exec -w /app resolutionflow_backend alembic upgrade head +``` + +- [ ] **Step 5: Verify with psql.** + +```bash +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d flow_proposals" +``` +Expected: shows new columns + check constraints. + +- [ ] **Step 6: Commit.** + +```bash +git add backend/alembic/versions/_extend_flow_proposals.py backend/app/models/flow_proposal.py +git commit -m "feat(l1): extend FlowProposal with source/linked_ticket/validated_by_outcome" +``` + +--- + +## Task 5: Migration + model — create `internal_tickets` table + +**Files:** +- Create: `backend/alembic/versions/_create_internal_tickets.py` +- Create: `backend/app/models/internal_ticket.py` + +- [ ] **Step 1: Generate migration.** + +```bash +docker exec -w /app resolutionflow_backend alembic revision -m "create_internal_tickets" +``` + +- [ ] **Step 2: Write migration content.** + +```python +def upgrade() -> None: + op.create_table( + 'internal_tickets', + sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('customer_name', sa.String(120), nullable=True), + sa.Column('customer_contact', sa.String(200), nullable=True), + sa.Column('problem_statement', sa.Text(), nullable=False), + sa.Column('status', sa.String(30), nullable=False, server_default='open'), + sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('ai_session_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('assigned_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('resolution_notes', sa.Text(), nullable=True), + sa.Column('psa_promoted_ticket_id', sa.String(64), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')), + sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'), + sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'), + sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['ai_session_id'], ['ai_sessions.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['assigned_user_id'], ['users.id'], ondelete='SET NULL'), + sa.CheckConstraint( + "status IN ('open', 'walking', 'resolved', 'escalated')", + name='ck_internal_tickets_status', + ), + ) + op.create_index('ix_internal_tickets_account_id', 'internal_tickets', ['account_id']) + op.create_index('ix_internal_tickets_status', 'internal_tickets', ['status']) + op.create_index('ix_internal_tickets_assigned_user_id', 'internal_tickets', ['assigned_user_id']) + + # RLS — match the project pattern + op.execute("ALTER TABLE internal_tickets ENABLE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY internal_tickets_account_isolation ON internal_tickets + USING (account_id = current_setting('app.current_account_id')::uuid) + WITH CHECK (account_id = current_setting('app.current_account_id')::uuid) + """) + + +def downgrade() -> None: + op.execute("DROP POLICY IF EXISTS internal_tickets_account_isolation ON internal_tickets") + op.execute("ALTER TABLE internal_tickets DISABLE ROW LEVEL SECURITY") + op.drop_index('ix_internal_tickets_assigned_user_id', 'internal_tickets') + op.drop_index('ix_internal_tickets_status', 'internal_tickets') + op.drop_index('ix_internal_tickets_account_id', 'internal_tickets') + op.drop_table('internal_tickets') +``` + +- [ ] **Step 3: Create the SQLAlchemy model.** + +Create `backend/app/models/internal_ticket.py`: + +```python +import uuid +from datetime import datetime +from typing import Optional + +import sqlalchemy as sa +from sqlalchemy import CheckConstraint, ForeignKey +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class InternalTicket(Base): + __tablename__ = "internal_tickets" + __table_args__ = ( + CheckConstraint( + "status IN ('open', 'walking', 'resolved', 'escalated')", + name="ck_internal_tickets_status", + ), + ) + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + account_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + created_by_user_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="RESTRICT"), + nullable=False, + ) + customer_name: Mapped[Optional[str]] = mapped_column(sa.String(120), nullable=True) + customer_contact: Mapped[Optional[str]] = mapped_column(sa.String(200), nullable=True) + problem_statement: Mapped[str] = mapped_column(sa.Text(), nullable=False) + status: Mapped[str] = mapped_column( + sa.String(30), nullable=False, server_default=sa.text("'open'"), index=True + ) + flow_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True + ) + flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True + ) + ai_session_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("ai_sessions.id", ondelete="SET NULL"), nullable=True + ) + assigned_user_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True + ) + resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True) + psa_promoted_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True) + created_at: Mapped[datetime] = mapped_column( + sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()") + ) + updated_at: Mapped[datetime] = mapped_column( + sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"), + onupdate=sa.text("now()"), + ) + resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True) +``` + +- [ ] **Step 4: Apply migration.** + +```bash +docker exec -w /app resolutionflow_backend alembic upgrade head +``` + +- [ ] **Step 5: Verify RLS policy via psql.** + +```bash +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname, polcmd, polqual FROM pg_policy WHERE polrelid = 'internal_tickets'::regclass" +``` +Expected: one policy row `internal_tickets_account_isolation`. + +- [ ] **Step 6: Commit.** + +```bash +git add backend/alembic/versions/_create_internal_tickets.py backend/app/models/internal_ticket.py +git commit -m "feat(l1): create internal_tickets table with RLS" +``` + +--- + +## Task 6: Migration + model — create `l1_walk_sessions` table + +**Files:** +- Create: `backend/alembic/versions/_create_l1_walk_sessions.py` +- Create: `backend/app/models/l1_walk_session.py` + +- [ ] **Step 1: Generate migration.** + +```bash +docker exec -w /app resolutionflow_backend alembic revision -m "create_l1_walk_sessions" +``` + +- [ ] **Step 2: Write migration content.** + +```python +def upgrade() -> None: + op.create_table( + 'l1_walk_sessions', + sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('acting_as', sa.String(30), nullable=True), + sa.Column('ticket_id', sa.String(64), nullable=False), + sa.Column('ticket_kind', sa.String(10), nullable=False), + sa.Column('session_kind', sa.String(20), nullable=False), + sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('current_node_id', sa.String(100), nullable=True), + sa.Column('walked_path', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")), + sa.Column('walk_notes', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")), + sa.Column('status', sa.String(20), nullable=False, server_default='active'), + sa.Column('resolution_notes', sa.Text(), nullable=True), + sa.Column('helpful', sa.Boolean(), nullable=True), + sa.Column('escalation_reason', sa.Text(), nullable=True), + sa.Column('escalation_reason_category', sa.String(30), nullable=True), + sa.Column('started_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')), + sa.Column('last_step_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')), + sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'), + sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'), + sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'), + sa.CheckConstraint( + "ticket_kind IN ('psa', 'internal')", + name='ck_l1_walk_sessions_ticket_kind', + ), + sa.CheckConstraint( + "session_kind IN ('flow', 'proposal', 'adhoc')", + name='ck_l1_walk_sessions_session_kind', + ), + sa.CheckConstraint( + "status IN ('active', 'resolved', 'escalated', 'abandoned')", + name='ck_l1_walk_sessions_status', + ), + sa.CheckConstraint( + "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) " + "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) " + "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)", + name='ck_l1_walk_sessions_target_consistency', + ), + ) + op.create_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions', ['account_id']) + op.create_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions', ['created_by_user_id']) + op.create_index('ix_l1_walk_sessions_status', 'l1_walk_sessions', ['status']) + op.create_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions', ['last_step_at']) + + op.execute("ALTER TABLE l1_walk_sessions ENABLE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY l1_walk_sessions_account_isolation ON l1_walk_sessions + USING (account_id = current_setting('app.current_account_id')::uuid) + WITH CHECK (account_id = current_setting('app.current_account_id')::uuid) + """) + + +def downgrade() -> None: + op.execute("DROP POLICY IF EXISTS l1_walk_sessions_account_isolation ON l1_walk_sessions") + op.execute("ALTER TABLE l1_walk_sessions DISABLE ROW LEVEL SECURITY") + op.drop_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions') + op.drop_index('ix_l1_walk_sessions_status', 'l1_walk_sessions') + op.drop_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions') + op.drop_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions') + op.drop_table('l1_walk_sessions') +``` + +- [ ] **Step 3: Create the SQLAlchemy model.** + +Create `backend/app/models/l1_walk_session.py`: + +```python +import uuid +from datetime import datetime +from typing import Any, Optional + +import sqlalchemy as sa +from sqlalchemy import CheckConstraint, ForeignKey +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class L1WalkSession(Base): + __tablename__ = "l1_walk_sessions" + __table_args__ = ( + CheckConstraint( + "ticket_kind IN ('psa', 'internal')", + name="ck_l1_walk_sessions_ticket_kind", + ), + CheckConstraint( + "session_kind IN ('flow', 'proposal', 'adhoc')", + name="ck_l1_walk_sessions_session_kind", + ), + CheckConstraint( + "status IN ('active', 'resolved', 'escalated', 'abandoned')", + name="ck_l1_walk_sessions_status", + ), + CheckConstraint( + "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) " + "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) " + "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)", + name="ck_l1_walk_sessions_target_consistency", + ), + ) + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + account_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + created_by_user_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="RESTRICT"), + nullable=False, + index=True, + ) + acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True) + ticket_id: Mapped[str] = mapped_column(sa.String(64), nullable=False) + ticket_kind: Mapped[str] = mapped_column(sa.String(10), nullable=False) + session_kind: Mapped[str] = mapped_column(sa.String(20), nullable=False) + flow_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True + ) + flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True + ) + current_node_id: Mapped[Optional[str]] = mapped_column(sa.String(100), nullable=True) + walked_path: Mapped[list[dict[str, Any]]] = mapped_column( + JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb") + ) + walk_notes: Mapped[list[dict[str, Any]]] = mapped_column( + JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb") + ) + status: Mapped[str] = mapped_column( + sa.String(20), nullable=False, server_default=sa.text("'active'"), index=True + ) + resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True) + helpful: Mapped[Optional[bool]] = mapped_column(sa.Boolean(), nullable=True) + escalation_reason: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True) + escalation_reason_category: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True) + started_at: Mapped[datetime] = mapped_column( + sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()") + ) + last_step_at: Mapped[datetime] = mapped_column( + sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"), index=True + ) + resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True) +``` + +- [ ] **Step 4: Apply migration + verify RLS.** + +```bash +docker exec -w /app resolutionflow_backend alembic upgrade head +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname FROM pg_policy WHERE polrelid = 'l1_walk_sessions'::regclass" +``` + +- [ ] **Step 5: Verify check constraint blocks invalid combos.** + +```bash +docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind) VALUES (gen_random_uuid(), '00000000-0000-0000-0000-000000000000', '00000000-0000-0000-0000-000000000000', 'x', 'psa', 'flow')" +``` +Expected: violates `ck_l1_walk_sessions_target_consistency` (flow_id NULL). + +- [ ] **Step 6: Commit.** + +```bash +git add backend/alembic/versions/_create_l1_walk_sessions.py backend/app/models/l1_walk_session.py +git commit -m "feat(l1): create l1_walk_sessions table with RLS + target-consistency check" +``` + +--- + +## Task 7: Seat enforcement service + +**Files:** +- Create: `backend/app/schemas/seat_enforcement.py` +- Create: `backend/app/services/seat_enforcement.py` +- Test: `backend/tests/test_seat_enforcement.py` + +- [ ] **Step 1: Write the failing tests.** + +Create `backend/tests/test_seat_enforcement.py`: + +```python +import pytest +from app.services.seat_enforcement import check_seat_available +from tests.factories import make_account, make_subscription, make_user + + +@pytest.mark.asyncio +async def test_engineer_seat_available_when_under_limit(db_session): + account = await make_account(db_session) + sub = await make_subscription(db_session, account_id=account.id, seat_limit=5) + for _ in range(3): + await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True) + result = await check_seat_available(account, sub, 'engineer', db_session) + assert result.available is True + assert result.current == 3 + assert result.limit == 5 + + +@pytest.mark.asyncio +async def test_engineer_seat_unavailable_when_at_limit(db_session): + account = await make_account(db_session) + sub = await make_subscription(db_session, account_id=account.id, seat_limit=2) + for _ in range(2): + await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True) + result = await check_seat_available(account, sub, 'engineer', db_session) + assert result.available is False + assert result.current == 2 + assert result.limit == 2 + + +@pytest.mark.asyncio +async def test_l1_seat_uses_separate_limit(db_session): + account = await make_account(db_session) + sub = await make_subscription(db_session, account_id=account.id, seat_limit=2, l1_seat_limit=10) + for _ in range(2): + await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True) + # Engineer limit hit but L1 seats still available + eng_result = await check_seat_available(account, sub, 'engineer', db_session) + l1_result = await check_seat_available(account, sub, 'l1_tech', db_session) + assert eng_result.available is False + assert l1_result.available is True + assert l1_result.limit == 10 + + +@pytest.mark.asyncio +async def test_unlimited_when_limit_null(db_session): + account = await make_account(db_session) + sub = await make_subscription(db_session, account_id=account.id, seat_limit=None) + for _ in range(100): + await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True) + result = await check_seat_available(account, sub, 'engineer', db_session) + assert result.available is True + assert result.limit is None +``` + +- [ ] **Step 2: Run tests, verify failure.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v +``` +Expected: ImportError. + +- [ ] **Step 3: Create schema.** + +`backend/app/schemas/seat_enforcement.py`: + +```python +from typing import Literal, Optional + +from pydantic import BaseModel + +Role = Literal['engineer', 'l1_tech'] + + +class SeatCheckResult(BaseModel): + available: bool + current: int + limit: Optional[int] # None = unlimited + role: Role + + +class SeatUsage(BaseModel): + engineer: SeatCheckResult + l1_tech: SeatCheckResult +``` + +- [ ] **Step 4: Create service.** + +`backend/app/services/seat_enforcement.py`: + +```python +from typing import Literal + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.account import Account +from app.models.subscription import Subscription +from app.models.user import User +from app.schemas.seat_enforcement import SeatCheckResult + + +Role = Literal['engineer', 'l1_tech'] + + +def _limit_for_role(subscription: Subscription, role: Role) -> int | None: + if role == 'engineer': + return subscription.seat_limit + if role == 'l1_tech': + return subscription.l1_seat_limit + raise ValueError(f"Unknown role: {role}") + + +async def check_seat_available( + account: Account, + subscription: Subscription, + role: Role, + db: AsyncSession, +) -> SeatCheckResult: + """ + Count active users with the given role in the account, compare against + the role-specific seat limit on the subscription. Returns availability. + + None limit = unlimited (returns available=True). + """ + limit = _limit_for_role(subscription, role) + + stmt = ( + select(func.count(User.id)) + .where(User.account_id == account.id) + .where(User.account_role == role) + .where(User.is_active.is_(True)) + ) + current = (await db.execute(stmt)).scalar_one() + + if limit is None: + return SeatCheckResult(available=True, current=current, limit=None, role=role) + return SeatCheckResult( + available=current < limit, + current=current, + limit=limit, + role=role, + ) + + +async def get_seat_usage( + account: Account, + subscription: Subscription, + db: AsyncSession, +) -> tuple[SeatCheckResult, SeatCheckResult]: + """Return (engineer, l1_tech) seat-usage tuple for the seat-counter widget.""" + eng = await check_seat_available(account, subscription, 'engineer', db) + l1 = await check_seat_available(account, subscription, 'l1_tech', db) + return eng, l1 +``` + +- [ ] **Step 5: Run tests, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v +``` +Expected: 4 PASSED. + +- [ ] **Step 6: Commit.** + +```bash +git add backend/app/schemas/seat_enforcement.py backend/app/services/seat_enforcement.py backend/tests/test_seat_enforcement.py +git commit -m "feat(l1): add seat_enforcement service for engineer + L1 seat limits" +``` + +--- + +## Task 8: Integrate seat enforcement into invite + accept-invite + role change + +**Files:** +- Modify: `backend/app/api/endpoints/invite.py` — block invite create when limit reached +- Modify: `backend/app/api/endpoints/accounts.py` or wherever accept-invite lives — re-check at accept time +- Modify: wherever role-change PATCH lives (likely `accounts.py` or `admin.py`) — re-check before commit +- Test: extend `backend/tests/test_seat_enforcement.py` + +- [ ] **Step 1: Write failing integration tests.** + +Add to `backend/tests/test_seat_enforcement.py`: + +```python +@pytest.mark.asyncio +async def test_invite_blocked_when_engineer_seats_full(authed_owner_client, db_session): + # Setup: account at engineer seat limit + # ... (use existing test fixtures to seed an account with N=limit engineers) + response = await authed_owner_client.post( + "/api/v1/invites", + json={"email": "new@example.com", "role": "engineer"}, + ) + assert response.status_code == 402 + body = response.json() + assert body["detail"]["code"] == "seat_limit_exceeded" + assert body["detail"]["role"] == "engineer" + assert body["detail"]["current"] == body["detail"]["limit"] + + +@pytest.mark.asyncio +async def test_invite_blocked_when_l1_seats_full(authed_owner_client, db_session): + # Same as above but for l1_tech role + response = await authed_owner_client.post( + "/api/v1/invites", + json={"email": "new@example.com", "role": "l1_tech"}, + ) + assert response.status_code == 402 + + +@pytest.mark.asyncio +async def test_invite_succeeds_when_l1_seats_available(authed_owner_client, db_session): + # Account with l1_seat_limit=10, current L1 count = 0 + response = await authed_owner_client.post( + "/api/v1/invites", + json={"email": "new@example.com", "role": "l1_tech"}, + ) + assert response.status_code == 201 + + +@pytest.mark.asyncio +async def test_role_change_blocked_when_target_seats_full(authed_owner_client, viewer_user, db_session): + # Try promoting viewer → engineer when engineer seats are full + response = await authed_owner_client.patch( + f"/api/v1/users/{viewer_user.id}/role", + json={"account_role": "engineer"}, + ) + assert response.status_code == 402 +``` + +- [ ] **Step 2: Run, verify failure.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py::test_invite_blocked_when_engineer_seats_full -v +``` + +- [ ] **Step 3: Add seat check to invite create endpoint.** + +In `backend/app/api/endpoints/invite.py`, locate the create endpoint and add at the top (before persistence): + +```python +from app.services.seat_enforcement import check_seat_available + +# ... inside the create handler ... +if payload.role in ('engineer', 'l1_tech'): + # Load the account's subscription (existing pattern in the codebase) + sub = await get_active_subscription(db, current_user.account_id) + result = await check_seat_available(account, sub, payload.role, db) + if not result.available: + raise HTTPException( + status_code=402, + detail={ + "code": "seat_limit_exceeded", + "role": result.role, + "current": result.current, + "limit": result.limit, + "upgrade_url": "/account/billing", # Frontend deep-links to the existing /account/billing page; Stripe customer portal link is generated server-side there. + }, + ) +``` + +(If the codebase has an existing helper like `get_active_subscription`, use it. If not, add a thin helper in `services/billing.py`.) + +- [ ] **Step 4: Add check to accept-invite endpoint.** + +Find the accept-invite endpoint (`grep -rn "accept.invite\|@router.post.*accept" backend/app/api/endpoints/`). Add the same `check_seat_available` call before the user is upgraded from invite to active user. Race-condition guard. + +- [ ] **Step 5: Add check to role-change endpoint.** + +Find the role-change PATCH endpoint. If promoting toward `engineer` or `l1_tech`, run the check first. Return same 402 shape on block. + +- [ ] **Step 6: Run integration tests, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v +``` + +- [ ] **Step 7: Commit.** + +```bash +git add backend/app/api/endpoints/invite.py backend/app/api/endpoints/accounts.py backend/tests/test_seat_enforcement.py +git commit -m "feat(l1): enforce seat limits on invite, accept-invite, role-change for engineer + L1" +``` + +--- + +## Task 9: `GET /api/v1/accounts/me/seats` endpoint + +**Files:** +- Modify: `backend/app/api/endpoints/accounts.py` (add endpoint) +- Test: `backend/tests/test_l1_endpoints.py` (new) + +- [ ] **Step 1: Write failing test.** + +Create `backend/tests/test_l1_endpoints.py`: + +```python +import pytest + + +@pytest.mark.asyncio +async def test_get_seats_returns_both_role_counts(authed_engineer_client, db_session): + response = await authed_engineer_client.get("/api/v1/accounts/me/seats") + assert response.status_code == 200 + body = response.json() + assert "engineer" in body + assert "l1_tech" in body + assert {"available", "current", "limit", "role"}.issubset(body["engineer"].keys()) + + +@pytest.mark.asyncio +async def test_get_seats_blocked_for_viewer(authed_viewer_client): + response = await authed_viewer_client.get("/api/v1/accounts/me/seats") + assert response.status_code == 403 +``` + +- [ ] **Step 2: Run, verify failure.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py::test_get_seats_returns_both_role_counts -v +``` + +- [ ] **Step 3: Add endpoint to `accounts.py`.** + +```python +from app.services.seat_enforcement import get_seat_usage +from app.schemas.seat_enforcement import SeatUsage + +@router.get("/me/seats", response_model=SeatUsage) +async def get_my_account_seat_usage( + current_user: User = Depends(require_engineer_or_admin), + db: AsyncSession = Depends(get_db), +): + account = await db.get(Account, current_user.account_id) + sub = await get_active_subscription(db, current_user.account_id) + engineer, l1_tech = await get_seat_usage(account, sub, db) + return SeatUsage(engineer=engineer, l1_tech=l1_tech) +``` + +- [ ] **Step 4: Run tests, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v +``` + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py +git commit -m "feat(l1): GET /accounts/me/seats endpoint" +``` + +--- + +## Task 10: `PATCH /api/v1/users/{id}/coverage` endpoint + +**Files:** +- Modify: `backend/app/api/endpoints/accounts.py` (or wherever user-management endpoints live) +- Test: extend `backend/tests/test_l1_endpoints.py` + +- [ ] **Step 1: Write failing tests.** + +```python +@pytest.mark.asyncio +async def test_owner_can_toggle_coverage_on_engineer(authed_owner_client, engineer_user): + response = await authed_owner_client.patch( + f"/api/v1/users/{engineer_user.id}/coverage", + json={"can_cover_l1": True}, + ) + assert response.status_code == 200 + assert response.json()["can_cover_l1"] is True + + +@pytest.mark.asyncio +async def test_engineer_cannot_toggle_coverage_on_self(authed_engineer_client, engineer_user): + response = await authed_engineer_client.patch( + f"/api/v1/users/{engineer_user.id}/coverage", + json={"can_cover_l1": True}, + ) + assert response.status_code == 403 + + +@pytest.mark.asyncio +async def test_coverage_only_applies_to_engineers(authed_owner_client, viewer_user): + response = await authed_owner_client.patch( + f"/api/v1/users/{viewer_user.id}/coverage", + json={"can_cover_l1": True}, + ) + assert response.status_code == 422 # validation: viewer can't have coverage +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Add endpoint.** + +```python +from pydantic import BaseModel + +class CoveragePatch(BaseModel): + can_cover_l1: bool + +@router.patch("/users/{user_id}/coverage") +async def patch_user_coverage( + user_id: UUID, + payload: CoveragePatch, + current_user: User = Depends(require_account_owner), + db: AsyncSession = Depends(get_db), +): + target = await db.get(User, user_id) + if not target or target.account_id != current_user.account_id: + raise HTTPException(status_code=404) + if target.account_role != 'engineer': + raise HTTPException( + status_code=422, + detail="can_cover_l1 only applies to engineers", + ) + target.can_cover_l1 = payload.can_cover_l1 + await db.commit() + return {"id": str(target.id), "can_cover_l1": target.can_cover_l1} +``` + +- [ ] **Step 4: Run tests, verify pass.** + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py +git commit -m "feat(l1): PATCH /users/{id}/coverage for engineer L1-coverage flag" +``` + +--- + +## Task 11: `internal_ticket_service.py` + +**Files:** +- Create: `backend/app/services/internal_ticket_service.py` +- Test: `backend/tests/test_internal_ticket_service.py` + +- [ ] **Step 1: Write failing tests.** + +```python +import pytest +import uuid +from app.services.internal_ticket_service import ( + create_ticket, update_status, get_ticket, list_tickets_for_account, +) + + +@pytest.mark.asyncio +async def test_create_ticket_sets_status_open(db_session, account, l1_user): + ticket = await create_ticket( + db_session, + account_id=account.id, + created_by_user_id=l1_user.id, + problem_statement="Outlook can't connect", + customer_name="Alice", + ) + assert ticket.status == 'open' + assert ticket.account_id == account.id + + +@pytest.mark.asyncio +async def test_update_status_to_resolved_sets_resolved_at(db_session, internal_ticket): + updated = await update_status(db_session, ticket_id=internal_ticket.id, status='resolved') + assert updated.status == 'resolved' + assert updated.resolved_at is not None + + +@pytest.mark.asyncio +async def test_list_tickets_filters_by_account(db_session, account_a, account_b, ticket_a, ticket_b): + rows = await list_tickets_for_account(db_session, account_id=account_a.id) + assert ticket_a in rows + assert ticket_b not in rows +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Implement service.** + +```python +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.internal_ticket import InternalTicket + + +async def create_ticket( + db: AsyncSession, + *, + account_id: UUID, + created_by_user_id: UUID, + problem_statement: str, + customer_name: Optional[str] = None, + customer_contact: Optional[str] = None, +) -> InternalTicket: + ticket = InternalTicket( + account_id=account_id, + created_by_user_id=created_by_user_id, + problem_statement=problem_statement, + customer_name=customer_name, + customer_contact=customer_contact, + ) + db.add(ticket) + await db.flush() + return ticket + + +async def update_status( + db: AsyncSession, + *, + ticket_id: UUID, + status: str, + resolution_notes: Optional[str] = None, + assigned_user_id: Optional[UUID] = None, +) -> InternalTicket: + ticket = await db.get(InternalTicket, ticket_id) + if not ticket: + raise ValueError(f"InternalTicket {ticket_id} not found") + ticket.status = status + if status == 'resolved': + ticket.resolved_at = datetime.now(timezone.utc) + if resolution_notes is not None: + ticket.resolution_notes = resolution_notes + if assigned_user_id is not None: + ticket.assigned_user_id = assigned_user_id + await db.flush() + return ticket + + +async def get_ticket(db: AsyncSession, ticket_id: UUID) -> Optional[InternalTicket]: + return await db.get(InternalTicket, ticket_id) + + +async def list_tickets_for_account( + db: AsyncSession, + *, + account_id: UUID, + status: Optional[str] = None, + limit: int = 100, +) -> list[InternalTicket]: + stmt = select(InternalTicket).where(InternalTicket.account_id == account_id) + if status: + stmt = stmt.where(InternalTicket.status == status) + stmt = stmt.order_by(InternalTicket.created_at.desc()).limit(limit) + result = await db.execute(stmt) + return list(result.scalars()) + + +async def promote_to_psa( + db: AsyncSession, + *, + ticket_id: UUID, + psa_ticket_id: str, +) -> InternalTicket: + ticket = await db.get(InternalTicket, ticket_id) + if not ticket: + raise ValueError(f"InternalTicket {ticket_id} not found") + ticket.psa_promoted_ticket_id = psa_ticket_id + await db.flush() + return ticket +``` + +- [ ] **Step 4: Run, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_internal_ticket_service.py -v +``` + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/services/internal_ticket_service.py backend/tests/test_internal_ticket_service.py +git commit -m "feat(l1): internal_ticket_service with CRUD + status transitions" +``` + +--- + +## Task 12: `l1_session_service.py` — start (flow / proposal / adhoc) + +**Files:** +- Create: `backend/app/services/l1_session_service.py` +- Test: `backend/tests/test_l1_session_service.py` + +This task implements the session start path only. Steps/notes/resolve/escalate are split into Tasks 13/14 to keep each task bite-sized. + +- [ ] **Step 1: Write failing tests.** + +```python +import pytest +from app.services.l1_session_service import start_flow_session, start_adhoc_session + + +@pytest.mark.asyncio +async def test_start_flow_session_creates_active_session(db_session, account, l1_user, flow, internal_ticket): + session = await start_flow_session( + db_session, + account_id=account.id, + user=l1_user, + flow_id=flow.id, + ticket_id=str(internal_ticket.id), + ticket_kind='internal', + ) + assert session.session_kind == 'flow' + assert session.flow_id == flow.id + assert session.flow_proposal_id is None + assert session.status == 'active' + assert session.walked_path == [] + + +@pytest.mark.asyncio +async def test_start_adhoc_session_no_flow(db_session, account, l1_user, internal_ticket): + session = await start_adhoc_session( + db_session, + account_id=account.id, + user=l1_user, + ticket_id=str(internal_ticket.id), + ticket_kind='internal', + ) + assert session.session_kind == 'adhoc' + assert session.flow_id is None + assert session.flow_proposal_id is None + assert session.walked_path == [] + assert session.walk_notes == [] + + +@pytest.mark.asyncio +async def test_start_session_records_acting_as_for_coverage_engineer( + db_session, account, engineer_with_coverage, flow, internal_ticket +): + session = await start_flow_session( + db_session, + account_id=account.id, + user=engineer_with_coverage, + flow_id=flow.id, + ticket_id=str(internal_ticket.id), + ticket_kind='internal', + ) + assert session.acting_as == 'l1_coverage' +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Implement start functions.** + +```python +from typing import Optional +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.l1_walk_session import L1WalkSession +from app.models.user import User + + +def _resolve_acting_as(user: User) -> Optional[str]: + """An engineer (with coverage flag) acting in L1 mode gets tagged for audit.""" + if user.account_role == 'engineer': + return 'l1_coverage' + return None + + +async def start_flow_session( + db: AsyncSession, + *, + account_id: UUID, + user: User, + flow_id: UUID, + ticket_id: str, + ticket_kind: str, # 'psa' | 'internal' +) -> L1WalkSession: + session = L1WalkSession( + account_id=account_id, + created_by_user_id=user.id, + acting_as=_resolve_acting_as(user), + ticket_id=ticket_id, + ticket_kind=ticket_kind, + session_kind='flow', + flow_id=flow_id, + walked_path=[], + walk_notes=[], + ) + db.add(session) + await db.flush() + return session + + +async def start_proposal_session( + db: AsyncSession, + *, + account_id: UUID, + user: User, + flow_proposal_id: UUID, + ticket_id: str, + ticket_kind: str, +) -> L1WalkSession: + session = L1WalkSession( + account_id=account_id, + created_by_user_id=user.id, + acting_as=_resolve_acting_as(user), + ticket_id=ticket_id, + ticket_kind=ticket_kind, + session_kind='proposal', + flow_proposal_id=flow_proposal_id, + walked_path=[], + walk_notes=[], + ) + db.add(session) + await db.flush() + return session + + +async def start_adhoc_session( + db: AsyncSession, + *, + account_id: UUID, + user: User, + ticket_id: str, + ticket_kind: str, +) -> L1WalkSession: + session = L1WalkSession( + account_id=account_id, + created_by_user_id=user.id, + acting_as=_resolve_acting_as(user), + ticket_id=ticket_id, + ticket_kind=ticket_kind, + session_kind='adhoc', + walked_path=[], + walk_notes=[], + ) + db.add(session) + await db.flush() + return session +``` + +- [ ] **Step 4: Run, verify pass.** + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): session start (flow/proposal/adhoc) with acting_as tagging" +``` + +--- + +## Task 13: `l1_session_service.py` — step + notes + +**Files:** +- Modify: `backend/app/services/l1_session_service.py` +- Modify: `backend/tests/test_l1_session_service.py` + +- [ ] **Step 1: Write failing tests (append to existing file).** + +```python +@pytest.mark.asyncio +async def test_record_step_appends_to_walked_path(db_session, active_flow_session): + updated = await record_step( + db_session, + session_id=active_flow_session.id, + node_id='n1', + question='Has the user signed back in?', + answer='yes', + note=None, + ) + assert len(updated.walked_path) == 1 + assert updated.walked_path[0] == { + 'node_id': 'n1', + 'question': 'Has the user signed back in?', + 'answer': 'yes', + 'l1_note': None, + } + assert updated.current_node_id == 'n1' + + +@pytest.mark.asyncio +async def test_record_step_blocks_on_adhoc_session(db_session, active_adhoc_session): + with pytest.raises(ValueError, match="adhoc"): + await record_step( + db_session, + session_id=active_adhoc_session.id, + node_id='n1', question='x', answer='y', note=None, + ) + + +@pytest.mark.asyncio +async def test_update_notes_replaces_walk_notes(db_session, active_adhoc_session): + new_notes = [{'timestamp': '2026-05-28T10:00:00Z', 'content': 'Customer said outlook crashed'}] + updated = await update_notes(db_session, session_id=active_adhoc_session.id, notes=new_notes) + assert updated.walk_notes == new_notes +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Implement.** + +Append to `l1_session_service.py`: + +```python +from datetime import datetime, timezone + + +async def record_step( + db: AsyncSession, + *, + session_id: UUID, + node_id: str, + question: str, + answer: str, + note: Optional[str], +) -> L1WalkSession: + session = await db.get(L1WalkSession, session_id) + if not session: + raise ValueError(f"L1WalkSession {session_id} not found") + if session.session_kind == 'adhoc': + raise ValueError("Cannot record step on adhoc session — use update_notes instead") + if session.status != 'active': + raise ValueError(f"Session {session_id} is not active (status={session.status})") + entry = { + 'node_id': node_id, + 'question': question, + 'answer': answer, + 'l1_note': note, + } + # JSONB append — assign new list because SQLAlchemy doesn't track in-place mutations + session.walked_path = [*session.walked_path, entry] + session.current_node_id = node_id + session.last_step_at = datetime.now(timezone.utc) + await db.flush() + return session + + +async def update_notes( + db: AsyncSession, + *, + session_id: UUID, + notes: list[dict], +) -> L1WalkSession: + session = await db.get(L1WalkSession, session_id) + if not session: + raise ValueError(f"L1WalkSession {session_id} not found") + # Cap at 256KB (rough check, JSON-encoded size) + import json + encoded_size = len(json.dumps(notes).encode('utf-8')) + if encoded_size > 256 * 1024: + raise ValueError("walk_notes exceeds 256KB cap — consider escalating") + session.walk_notes = notes + session.last_step_at = datetime.now(timezone.utc) + await db.flush() + return session +``` + +- [ ] **Step 4: Run, verify pass.** + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): record_step + update_notes for walk sessions" +``` + +--- + +## Task 14: `l1_session_service.py` — resolve / escalate / escalate-without-walk + +**Files:** +- Modify: `backend/app/services/l1_session_service.py` +- Modify: `backend/tests/test_l1_session_service.py` + +- [ ] **Step 1: Write failing tests.** + +```python +@pytest.mark.asyncio +async def test_resolve_helpful_proposal_flips_validated_by_outcome( + db_session, active_proposal_session, flow_proposal +): + await resolve( + db_session, + session_id=active_proposal_session.id, + helpful=True, + resolution_notes="Walked the user through restoring profile", + ) + await db_session.refresh(flow_proposal) + assert flow_proposal.validated_by_outcome is True + + +@pytest.mark.asyncio +async def test_resolve_unhelpful_does_not_flip_validation( + db_session, active_proposal_session, flow_proposal +): + await resolve( + db_session, + session_id=active_proposal_session.id, + helpful=False, + resolution_notes="Tree was wrong", + ) + await db_session.refresh(flow_proposal) + assert flow_proposal.validated_by_outcome is False + + +@pytest.mark.asyncio +async def test_resolve_adhoc_session_closes_ticket( + db_session, active_adhoc_session, internal_ticket +): + await resolve( + db_session, + session_id=active_adhoc_session.id, + helpful=True, + resolution_notes="Customer rebooted, fixed", + ) + await db_session.refresh(active_adhoc_session) + assert active_adhoc_session.status == 'resolved' + assert active_adhoc_session.resolved_at is not None + await db_session.refresh(internal_ticket) + assert internal_ticket.status == 'resolved' + + +@pytest.mark.asyncio +async def test_escalate_marks_session_and_ticket( + db_session, active_flow_session, internal_ticket +): + await escalate( + db_session, + session_id=active_flow_session.id, + reason="Customer demanding senior", + reason_category="Customer demanding senior", + ) + await db_session.refresh(active_flow_session) + assert active_flow_session.status == 'escalated' + await db_session.refresh(internal_ticket) + assert internal_ticket.status == 'escalated' + + +@pytest.mark.asyncio +async def test_escalate_without_walk_creates_escalated_session( + db_session, account, l1_user, internal_ticket +): + session = await escalate_without_walk( + db_session, + account_id=account.id, + user=l1_user, + ticket_id=str(internal_ticket.id), + ticket_kind='internal', + reason_category='No KB available', + reason='No knowledge base content yet', + ) + assert session.status == 'escalated' + assert session.session_kind == 'adhoc' # adhoc as placeholder kind + assert session.escalation_reason_category == 'No KB available' +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Implement.** + +Append to `l1_session_service.py`: + +```python +from app.models.flow_proposal import FlowProposal +from app.services import internal_ticket_service +# PSA service import — adjust to actual path +# from app.services.psa.registry import PsaProviderRegistry + + +async def resolve( + db: AsyncSession, + *, + session_id: UUID, + helpful: bool, + resolution_notes: str, +) -> L1WalkSession: + session = await db.get(L1WalkSession, session_id) + if not session: + raise ValueError(f"L1WalkSession {session_id} not found") + if session.status != 'active': + raise ValueError(f"Session not active (status={session.status})") + session.status = 'resolved' + session.helpful = helpful + session.resolution_notes = resolution_notes + session.resolved_at = datetime.now(timezone.utc) + session.last_step_at = session.resolved_at + + # Outcome validation: flip proposal flag on helpful=true + if helpful and session.session_kind == 'proposal' and session.flow_proposal_id: + proposal = await db.get(FlowProposal, session.flow_proposal_id) + if proposal: + proposal.validated_by_outcome = True + + # Close the ticket + if session.ticket_kind == 'internal': + await internal_ticket_service.update_status( + db, ticket_id=UUID(session.ticket_id), + status='resolved', resolution_notes=resolution_notes, + ) + else: + # PSA close is intentionally deferred to Phase 2 (which adds the full escalation_package_generator + # integration alongside the AI build pipeline). For Phase 1, PSA-backed sessions update only the + # local session row on resolve; engineers verify ticket state directly in their PSA UI. + # This is documented in spec §11 "Internal ticket fallback" and the Phase 1 scope section. + pass + + await db.flush() + return session + + +async def escalate( + db: AsyncSession, + *, + session_id: UUID, + reason: str, + reason_category: str, +) -> L1WalkSession: + session = await db.get(L1WalkSession, session_id) + if not session: + raise ValueError(f"L1WalkSession {session_id} not found") + if session.status != 'active': + raise ValueError(f"Session not active (status={session.status})") + session.status = 'escalated' + session.escalation_reason = reason + session.escalation_reason_category = reason_category + session.resolved_at = datetime.now(timezone.utc) + session.last_step_at = session.resolved_at + + # Mark ticket escalated + if session.ticket_kind == 'internal': + await internal_ticket_service.update_status( + db, ticket_id=UUID(session.ticket_id), status='escalated', + ) + else: + # PSA reassign — Phase 1 stub; full integration with escalation_package_generator + # follows in Phase 2 alongside the ai_session creation for engineer pickup. + pass + + await db.flush() + return session + + +async def escalate_without_walk( + db: AsyncSession, + *, + account_id: UUID, + user: User, + ticket_id: str, + ticket_kind: str, + reason_category: str, + reason: Optional[str] = None, +) -> L1WalkSession: + """ + Used from the no-KB / no-flow-picked screen — creates an immediately-escalated session + with no walked_path. Lets escalation reporting still capture the call. + """ + session = L1WalkSession( + account_id=account_id, + created_by_user_id=user.id, + acting_as=_resolve_acting_as(user), + ticket_id=ticket_id, + ticket_kind=ticket_kind, + session_kind='adhoc', + walked_path=[], + walk_notes=[], + status='escalated', + escalation_reason=reason, + escalation_reason_category=reason_category, + resolved_at=datetime.now(timezone.utc), + ) + session.last_step_at = session.resolved_at + db.add(session) + if ticket_kind == 'internal': + await internal_ticket_service.update_status( + db, ticket_id=UUID(ticket_id), status='escalated', + ) + await db.flush() + return session +``` + +- [ ] **Step 4: Run, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_session_service.py -v +``` + +- [ ] **Step 5: Commit.** + +```bash +git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): resolve/escalate/escalate-without-walk for l1 sessions" +``` + +--- + +## Task 15: L1 endpoints (`api/endpoints/l1.py`) + +**Files:** +- Create: `backend/app/schemas/l1.py` +- Create: `backend/app/api/endpoints/l1.py` +- Modify: `backend/app/api/router.py` (register `l1` router) +- Modify: `backend/tests/test_l1_endpoints.py` + +- [ ] **Step 1: Define request/response schemas.** + +`backend/app/schemas/l1.py`: + +```python +from datetime import datetime +from typing import Any, Literal, Optional +from uuid import UUID + +from pydantic import BaseModel + + +class IntakeRequest(BaseModel): + problem_statement: str + customer_name: Optional[str] = None + customer_contact: Optional[str] = None + flow_id: Optional[UUID] = None # if provided, starts flow session; else adhoc + + +class IntakeResponse(BaseModel): + session_id: UUID + session_kind: Literal['flow', 'proposal', 'adhoc'] + ticket_id: str + ticket_kind: Literal['psa', 'internal'] + + +class StepRequest(BaseModel): + node_id: str + question: str + answer: str + note: Optional[str] = None + + +class NotesRequest(BaseModel): + notes: list[dict[str, Any]] + + +class ResolveRequest(BaseModel): + helpful: bool + resolution_notes: str + + +class EscalateRequest(BaseModel): + reason: Optional[str] = None + reason_category: str + + +class EscalateWithoutWalkRequest(BaseModel): + problem_statement: str + customer_name: Optional[str] = None + customer_contact: Optional[str] = None + reason_category: str + reason: Optional[str] = None + + +class WalkSessionResponse(BaseModel): + id: UUID + session_kind: str + flow_id: Optional[UUID] + flow_proposal_id: Optional[UUID] + current_node_id: Optional[str] + walked_path: list[dict[str, Any]] + walk_notes: list[dict[str, Any]] + status: str + started_at: datetime + last_step_at: datetime + resolved_at: Optional[datetime] + + +class QueueRow(BaseModel): + ticket_id: str + ticket_kind: Literal['psa', 'internal'] + problem_statement: Optional[str] + customer_name: Optional[str] + status: str + created_at: Optional[datetime] +``` + +- [ ] **Step 2: Write the endpoints.** + +`backend/app/api/endpoints/l1.py`: + +```python +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import ( + get_current_active_user, get_db, require_l1_or_coverage, +) +from app.models.l1_walk_session import L1WalkSession +from app.models.user import User +from app.schemas.l1 import ( + EscalateRequest, EscalateWithoutWalkRequest, IntakeRequest, IntakeResponse, + NotesRequest, QueueRow, ResolveRequest, StepRequest, WalkSessionResponse, +) +from app.services import internal_ticket_service, l1_session_service + + +router = APIRouter(prefix="/l1", tags=["l1"]) + + +def _as_response(session: L1WalkSession) -> WalkSessionResponse: + return WalkSessionResponse( + id=session.id, + session_kind=session.session_kind, + flow_id=session.flow_id, + flow_proposal_id=session.flow_proposal_id, + current_node_id=session.current_node_id, + walked_path=session.walked_path, + walk_notes=session.walk_notes, + status=session.status, + started_at=session.started_at, + last_step_at=session.last_step_at, + resolved_at=session.resolved_at, + ) + + +@router.post("/intake", response_model=IntakeResponse) +async def intake( + payload: IntakeRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + """ + Phase 1 intake: + - Creates an internal ticket (PSA integration deferred to Phase 2/3 escalation polish). + - Starts an L1WalkSession: flow kind if flow_id provided, adhoc otherwise. + + Phase 2 will replace this with match_or_build + suggest/aborted_no_kb outcomes. + """ + # Phase 1: always create internal ticket. PSA support handled in escalate() / escalation_package. + ticket = await internal_ticket_service.create_ticket( + db, + account_id=user.account_id, + created_by_user_id=user.id, + problem_statement=payload.problem_statement, + customer_name=payload.customer_name, + customer_contact=payload.customer_contact, + ) + + if payload.flow_id: + session = await l1_session_service.start_flow_session( + db, + account_id=user.account_id, + user=user, + flow_id=payload.flow_id, + ticket_id=str(ticket.id), + ticket_kind='internal', + ) + else: + session = await l1_session_service.start_adhoc_session( + db, + account_id=user.account_id, + user=user, + ticket_id=str(ticket.id), + ticket_kind='internal', + ) + + await db.commit() + return IntakeResponse( + session_id=session.id, + session_kind=session.session_kind, + ticket_id=str(ticket.id), + ticket_kind='internal', + ) + + +@router.get("/queue", response_model=list[QueueRow]) +async def queue( + status_filter: Optional[str] = None, + limit: int = 50, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + """Phase 1: returns internal tickets only. PSA queue merge in Phase 2.""" + tickets = await internal_ticket_service.list_tickets_for_account( + db, account_id=user.account_id, status=status_filter, limit=limit, + ) + return [ + QueueRow( + ticket_id=str(t.id), + ticket_kind='internal', + problem_statement=t.problem_statement, + customer_name=t.customer_name, + status=t.status, + created_at=t.created_at, + ) + for t in tickets + ] + + +@router.get("/sessions/{session_id}", response_model=WalkSessionResponse) +async def get_session( + session_id: UUID, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + session = await db.get(L1WalkSession, session_id) + if not session or session.account_id != user.account_id: + raise HTTPException(status_code=404) + return _as_response(session) + + +@router.get("/sessions/active", response_model=list[WalkSessionResponse]) +async def list_active_sessions( + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + """List the caller's currently-active sessions for the dashboard 'Resume in progress' widget.""" + stmt = ( + select(L1WalkSession) + .where(L1WalkSession.created_by_user_id == user.id) + .where(L1WalkSession.status == 'active') + .order_by(L1WalkSession.last_step_at.desc()) + .limit(20) + ) + result = await db.execute(stmt) + return [_as_response(s) for s in result.scalars()] + + +@router.post("/sessions/{session_id}/step", response_model=WalkSessionResponse) +async def post_step( + session_id: UUID, + payload: StepRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + session = await db.get(L1WalkSession, session_id) + if not session or session.account_id != user.account_id: + raise HTTPException(status_code=404) + try: + updated = await l1_session_service.record_step( + db, session_id=session_id, + node_id=payload.node_id, question=payload.question, + answer=payload.answer, note=payload.note, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + await db.commit() + return _as_response(updated) + + +@router.post("/sessions/{session_id}/notes", response_model=WalkSessionResponse) +async def post_notes( + session_id: UUID, + payload: NotesRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + session = await db.get(L1WalkSession, session_id) + if not session or session.account_id != user.account_id: + raise HTTPException(status_code=404) + try: + updated = await l1_session_service.update_notes( + db, session_id=session_id, notes=payload.notes, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + await db.commit() + return _as_response(updated) + + +@router.post("/sessions/{session_id}/resolve", response_model=WalkSessionResponse) +async def post_resolve( + session_id: UUID, + payload: ResolveRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + session = await db.get(L1WalkSession, session_id) + if not session or session.account_id != user.account_id: + raise HTTPException(status_code=404) + try: + updated = await l1_session_service.resolve( + db, session_id=session_id, helpful=payload.helpful, + resolution_notes=payload.resolution_notes, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + await db.commit() + return _as_response(updated) + + +@router.post("/sessions/{session_id}/escalate", response_model=WalkSessionResponse) +async def post_escalate( + session_id: UUID, + payload: EscalateRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + session = await db.get(L1WalkSession, session_id) + if not session or session.account_id != user.account_id: + raise HTTPException(status_code=404) + try: + updated = await l1_session_service.escalate( + db, session_id=session_id, + reason=payload.reason or '', + reason_category=payload.reason_category, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + await db.commit() + return _as_response(updated) + + +@router.post("/escalate-without-walk", response_model=WalkSessionResponse) +async def post_escalate_without_walk( + payload: EscalateWithoutWalkRequest, + user: User = Depends(require_l1_or_coverage), + db: AsyncSession = Depends(get_db), +): + ticket = await internal_ticket_service.create_ticket( + db, + account_id=user.account_id, + created_by_user_id=user.id, + problem_statement=payload.problem_statement, + customer_name=payload.customer_name, + customer_contact=payload.customer_contact, + ) + session = await l1_session_service.escalate_without_walk( + db, + account_id=user.account_id, + user=user, + ticket_id=str(ticket.id), + ticket_kind='internal', + reason_category=payload.reason_category, + reason=payload.reason, + ) + await db.commit() + return _as_response(session) +``` + +- [ ] **Step 3: Register the router.** + +In `backend/app/api/router.py`, add: + +```python +from app.api.endpoints import l1 +api_router.include_router(l1.router) +``` + +- [ ] **Step 4: Add E2E endpoint tests.** + +In `backend/tests/test_l1_endpoints.py`, add: + +```python +@pytest.mark.asyncio +async def test_intake_creates_adhoc_session_when_no_flow_id(authed_l1_client): + response = await authed_l1_client.post( + "/api/v1/l1/intake", + json={"problem_statement": "outlook broken", "customer_name": "Alice"}, + ) + assert response.status_code == 200 + body = response.json() + assert body["session_kind"] == "adhoc" + assert body["ticket_kind"] == "internal" + + +@pytest.mark.asyncio +async def test_intake_creates_flow_session_when_flow_id_provided(authed_l1_client, flow): + response = await authed_l1_client.post( + "/api/v1/l1/intake", + json={"problem_statement": "outlook broken", "flow_id": str(flow.id)}, + ) + assert response.status_code == 200 + assert response.json()["session_kind"] == "flow" + + +@pytest.mark.asyncio +async def test_step_appends_to_walked_path(authed_l1_client, active_flow_session): + response = await authed_l1_client.post( + f"/api/v1/l1/sessions/{active_flow_session.id}/step", + json={"node_id": "n1", "question": "q1", "answer": "yes", "note": None}, + ) + assert response.status_code == 200 + assert len(response.json()["walked_path"]) == 1 + + +@pytest.mark.asyncio +async def test_step_blocked_on_adhoc_session(authed_l1_client, active_adhoc_session): + response = await authed_l1_client.post( + f"/api/v1/l1/sessions/{active_adhoc_session.id}/step", + json={"node_id": "n1", "question": "q1", "answer": "yes"}, + ) + assert response.status_code == 400 + + +@pytest.mark.asyncio +async def test_escalate_without_walk_creates_escalated_session(authed_l1_client): + response = await authed_l1_client.post( + "/api/v1/l1/escalate-without-walk", + json={ + "problem_statement": "no kb yet", + "reason_category": "No KB available", + }, + ) + assert response.status_code == 200 + assert response.json()["status"] == "escalated" + + +@pytest.mark.asyncio +async def test_l1_endpoints_block_viewer(authed_viewer_client): + response = await authed_viewer_client.post( + "/api/v1/l1/intake", + json={"problem_statement": "x"}, + ) + assert response.status_code == 403 +``` + +- [ ] **Step 5: Run all L1 endpoint tests.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v +``` + +- [ ] **Step 6: Commit.** + +```bash +git add backend/app/schemas/l1.py backend/app/api/endpoints/l1.py backend/app/api/router.py backend/tests/test_l1_endpoints.py +git commit -m "feat(l1): L1 endpoints (intake/queue/sessions/step/notes/resolve/escalate)" +``` + +--- + +## Task 16: APScheduler cleanup job for abandoned sessions + +**Files:** +- Create: `backend/app/services/l1_session_cleanup.py` +- Modify: `backend/app/main.py` — register the job in lifespan +- Test: `backend/tests/test_l1_session_cleanup.py` + +- [ ] **Step 1: Write failing test.** + +```python +import pytest +from datetime import datetime, timedelta, timezone +from app.services.l1_session_cleanup import flip_stale_sessions + + +@pytest.mark.asyncio +async def test_flip_stale_sessions_abandons_old_active_sessions(db_session, account, l1_user): + # Insert one stale active + one fresh active + one already resolved + stale = L1WalkSession( + account_id=account.id, + created_by_user_id=l1_user.id, + ticket_id='x', ticket_kind='internal', session_kind='adhoc', + status='active', + last_step_at=datetime.now(timezone.utc) - timedelta(hours=25), + walked_path=[], walk_notes=[], + ) + fresh = L1WalkSession( + account_id=account.id, + created_by_user_id=l1_user.id, + ticket_id='y', ticket_kind='internal', session_kind='adhoc', + status='active', + last_step_at=datetime.now(timezone.utc) - timedelta(hours=1), + walked_path=[], walk_notes=[], + ) + db_session.add_all([stale, fresh]) + await db_session.flush() + count = await flip_stale_sessions(db_session) + assert count == 1 + await db_session.refresh(stale) + await db_session.refresh(fresh) + assert stale.status == 'abandoned' + assert fresh.status == 'active' +``` + +- [ ] **Step 2: Run, verify failure.** + +- [ ] **Step 3: Implement.** + +`backend/app/services/l1_session_cleanup.py`: + +```python +import logging +from datetime import datetime, timedelta, timezone + +from sqlalchemy import update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.l1_walk_session import L1WalkSession + + +logger = logging.getLogger(__name__) + + +async def flip_stale_sessions(db: AsyncSession) -> int: + """ + Flip active sessions to 'abandoned' if their last_step_at is older than 24 hours. + Returns the number of rows flipped. + """ + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) + stmt = ( + update(L1WalkSession) + .where(L1WalkSession.status == 'active') + .where(L1WalkSession.last_step_at < cutoff) + .values(status='abandoned') + ) + result = await db.execute(stmt) + await db.commit() + return result.rowcount or 0 + + +async def run_cleanup_job(session_factory) -> None: + """Entrypoint for APScheduler — uses _admin_session_factory per Lesson on RLS at startup.""" + async with session_factory() as db: + try: + count = await flip_stale_sessions(db) + if count > 0: + logger.info("l1_session_cleanup: flipped %d sessions to abandoned", count) + except Exception: + logger.exception("l1_session_cleanup: error during run") +``` + +- [ ] **Step 4: Register the job in `main.py` lifespan.** + +In `backend/app/main.py`, find the lifespan / APScheduler setup and add: + +```python +from app.services.l1_session_cleanup import run_cleanup_job + +# Inside the lifespan startup section, alongside other scheduled jobs: +scheduler.add_job( + run_cleanup_job, + 'interval', + hours=1, + max_instances=1, # Lesson 1 + args=[_admin_session_factory], + id='l1_session_cleanup', + replace_existing=True, +) +``` + +- [ ] **Step 5: Run unit test, verify pass.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_session_cleanup.py -v +``` + +- [ ] **Step 6: Verify job registered at startup (manual smoke).** + +Restart backend, grep logs for `l1_session_cleanup` job-registered line. APScheduler logs job registration at startup. + +- [ ] **Step 7: Commit.** + +```bash +git add backend/app/services/l1_session_cleanup.py backend/app/main.py backend/tests/test_l1_session_cleanup.py +git commit -m "feat(l1): APScheduler hourly cleanup job for abandoned sessions" +``` + +--- + +## Task 17: RLS regression tests for new tables + +**Files:** +- Create: `backend/tests/test_l1_rls.py` + +- [ ] **Step 1: Write the tests.** + +```python +import pytest +from sqlalchemy import select +from app.models.internal_ticket import InternalTicket +from app.models.l1_walk_session import L1WalkSession + + +@pytest.mark.asyncio +async def test_l1_cannot_read_other_accounts_internal_tickets( + db_account_a_session, account_b_internal_ticket +): + """RLS must block cross-tenant reads on internal_tickets.""" + stmt = select(InternalTicket).where(InternalTicket.id == account_b_internal_ticket.id) + result = await db_account_a_session.execute(stmt) + assert result.scalar_one_or_none() is None + + +@pytest.mark.asyncio +async def test_l1_cannot_read_other_accounts_walk_sessions( + db_account_a_session, account_b_walk_session +): + stmt = select(L1WalkSession).where(L1WalkSession.id == account_b_walk_session.id) + result = await db_account_a_session.execute(stmt) + assert result.scalar_one_or_none() is None + + +@pytest.mark.asyncio +async def test_with_check_blocks_cross_tenant_insert(db_account_a_session, account_b): + """RLS WITH CHECK must reject inserts setting another account's account_id.""" + bad_row = InternalTicket( + account_id=account_b.id, + created_by_user_id=..., + problem_statement='cross-tenant attempt', + ) + db_account_a_session.add(bad_row) + with pytest.raises(Exception): # InsufficientPrivilegeError or similar + await db_account_a_session.flush() +``` + +(Use fixtures from the existing RLS test suite — `db_account_a_session` already exists per the project's tenant-isolation Phase 4 work. Adapt to actual fixture names.) + +- [ ] **Step 2: Run.** + +```bash +docker exec resolutionflow_backend pytest backend/tests/test_l1_rls.py -v +``` +Expected: 3 PASSED. + +- [ ] **Step 3: Commit.** + +```bash +git add backend/tests/test_l1_rls.py +git commit -m "test(l1): RLS regression tests for internal_tickets + l1_walk_sessions" +``` + +--- + +## Task 18: Frontend — `usePermissions` extensions + role type + +**Files:** +- Modify: `frontend/src/types/auth.ts` (or `frontend/src/types/user.ts` — wherever `User.account_role` is typed) +- Modify: `frontend/src/hooks/usePermissions.ts` + +- [ ] **Step 1: Locate the role union type.** + +```bash +grep -rn "account_role:" frontend/src/types/ +``` + +- [ ] **Step 2: Add `'l1_tech'` to the union.** + +In the relevant file (likely `frontend/src/types/auth.ts`), find the type: + +```typescript +export type AccountRole = 'owner' | 'engineer' | 'viewer' +``` + +Change to: + +```typescript +export type AccountRole = 'owner' | 'engineer' | 'l1_tech' | 'viewer' +``` + +- [ ] **Step 3: Update `usePermissions.ts`.** + +Open [usePermissions.ts](../../frontend/src/hooks/usePermissions.ts). Find the `getEffectiveRole` and `hasMinimumRole` functions and update: + +```typescript +type EffectiveRole = 'super_admin' | 'owner' | 'engineer' | 'l1_tech' | 'viewer' + +function getEffectiveRole(user: User | null): EffectiveRole { + if (!user) return 'viewer' + if (user.is_super_admin) return 'super_admin' + if (user.account_role === 'owner') return 'owner' + if (user.account_role === 'engineer') return 'engineer' + if (user.account_role === 'l1_tech') return 'l1_tech' + return 'viewer' +} + +const ROLE_RANK: Record = { + super_admin: 5, + owner: 4, + engineer: 3, + l1_tech: 2, + viewer: 1, +} + +function hasMinimumRole(user: User | null, minimum: EffectiveRole): boolean { + const effective = getEffectiveRole(user) + return ROLE_RANK[effective] >= ROLE_RANK[minimum] +} +``` + +And in the main return object of `usePermissions()`, add: + +```typescript +return { + // ... existing returned values ... + isL1Tech: effectiveRole === 'l1_tech', + canCoverL1: Boolean(user?.can_cover_l1) || effectiveRole === 'owner' || effectiveRole === 'super_admin', + canUseL1Surface: effectiveRole === 'l1_tech' || effectiveRole === 'owner' || effectiveRole === 'super_admin' || (user?.account_role === 'engineer' && Boolean(user?.can_cover_l1)), + canUseEngineerSurface: hasMinimumRole(user, 'engineer'), +} +``` + +- [ ] **Step 4: Update User type to include `can_cover_l1`.** + +In the User type definition: + +```typescript +export interface User { + id: string + email: string + // ... existing fields ... + account_role: AccountRole + is_super_admin: boolean + can_cover_l1: boolean // NEW + // ... rest ... +} +``` + +- [ ] **Step 5: Run frontend type check.** + +```bash +docker exec -w /app resolutionflow_frontend npx tsc -b +``` +Expected: no new type errors (existing usages of `account_role` may need updates — fix any compilation errors caused by the union widening). + +- [ ] **Step 6: Commit.** + +```bash +git add frontend/src/types/auth.ts frontend/src/hooks/usePermissions.ts +git commit -m "feat(l1): usePermissions extensions for l1_tech + coverage flag" +``` + +--- + +## Task 19: Frontend — Sidebar role-based nav + ProtectedRoute redirect + +**Files:** +- Modify: `frontend/src/components/layout/Sidebar.tsx` +- Modify: `frontend/src/components/layout/ProtectedRoute.tsx` + +- [ ] **Step 1: Update `Sidebar.tsx` to render role-based nav.** + +Find the nav array construction in [Sidebar.tsx](../../frontend/src/components/layout/Sidebar.tsx). Wrap it in a helper that picks per role: + +```typescript +import { usePermissions } from '@/hooks/usePermissions' +import { LayoutGrid, Ticket, FileText, BookOpen, Settings } from 'lucide-react' + +function getNavItems(perms: ReturnType) { + // L1 tech sees only L1-relevant entries + if (perms.isL1Tech) { + return [ + { href: '/l1', icon: LayoutGrid, label: 'Workspace', shortLabel: 'Work' }, + { href: '/l1/tickets', icon: Ticket, label: 'Tickets', shortLabel: 'Tickets' }, + { href: '/l1/drafts', icon: FileText, label: 'My Drafts', shortLabel: 'Drafts' }, + { href: '/guides', icon: BookOpen, label: 'Guides', shortLabel: 'Guides' }, + { href: '/account', icon: Settings, label: 'Account', shortLabel: 'Acct' }, + ] + } + + // Existing engineer/owner nav (kept as-is) + const items = [/* ... existing items ... */] + + // Append L1 Workspace entry for coverage engineers + owners + if (perms.canCoverL1) { + items.push({ + href: '/l1', icon: LayoutGrid, label: 'L1 Workspace', shortLabel: 'L1', + }) + } + return items +} + +// Inside the Sidebar component: +const perms = usePermissions() +const navItems = getNavItems(perms) +``` + +- [ ] **Step 2: Add L1 post-login redirect to `ProtectedRoute.tsx`.** + +In [ProtectedRoute.tsx](../../frontend/src/components/layout/ProtectedRoute.tsx), find the existing auth-state handling and add: + +```typescript +// After authentication checks pass, before rendering children: +if (user?.account_role === 'l1_tech' && location.pathname === '/') { + return +} +``` + +- [ ] **Step 3: Type-check.** + +```bash +docker exec -w /app resolutionflow_frontend npx tsc -b +``` + +- [ ] **Step 4: Commit.** + +```bash +git add frontend/src/components/layout/Sidebar.tsx frontend/src/components/layout/ProtectedRoute.tsx +git commit -m "feat(l1): role-based sidebar nav + L1 post-login redirect" +``` + +--- + +## Task 20: Frontend — router updates + `L1RouteGuard` + +**Files:** +- Create: `frontend/src/components/layout/L1RouteGuard.tsx` +- Modify: `frontend/src/router.tsx` + +- [ ] **Step 1: Create the guard component.** + +```tsx +import { Navigate } from 'react-router' +import { usePermissions } from '@/hooks/usePermissions' + +export function L1RouteGuard({ children }: { children: React.ReactNode }) { + const perms = usePermissions() + if (!perms.canUseL1Surface) { + return + } + return <>{children} +} +``` + +- [ ] **Step 2: Register routes.** + +In `frontend/src/router.tsx`, near the other `lazyWithRetry` declarations: + +```typescript +const L1Dashboard = lazyWithRetry(() => import('@/pages/l1/L1Dashboard')) +const L1WalkPage = lazyWithRetry(() => import('@/pages/l1/L1WalkPage')) +const L1DraftsPage = lazyWithRetry(() => import('@/pages/l1/L1DraftsPage')) +const L1TicketsPage = lazyWithRetry(() => import('@/pages/l1/L1TicketsPage')) +``` + +And inside the `/` ProtectedRoute children array, alongside existing routes: + +```typescript +{ path: 'l1', element: }, +{ path: 'l1/walk/:sessionId', element: }, +{ path: 'l1/drafts', element: }, +{ path: 'l1/tickets', element: }, +``` + +The `lazyWithRetry`-wrapped components are React.lazy under the hood, so they need a Suspense boundary already provided by the existing `page()` helper. Adapt to whichever pattern the existing routes use (likely `element: page(L1Dashboard)` wrapped manually, or pass through the guard). + +Adjust based on actual router pattern: + +```typescript +{ path: 'l1', element: {page(L1Dashboard)} }, +``` + +- [ ] **Step 3: Type-check.** + +```bash +docker exec -w /app resolutionflow_frontend npx tsc -b +``` +(Will fail until the actual page components exist in Task 21+. That's OK — placeholder check that imports resolve.) + +- [ ] **Step 4: Commit (pages stubbed in next task).** + +```bash +git add frontend/src/components/layout/L1RouteGuard.tsx frontend/src/router.tsx +git commit -m "feat(l1): register /l1/* routes + L1RouteGuard" +``` + +--- + +## Task 21: Frontend — `L1Dashboard` (active + empty state) + +**Files:** +- Create: `frontend/src/pages/l1/L1Dashboard.tsx` +- Create: `frontend/src/components/l1/EmptyStateCard.tsx` +- Create: `frontend/src/components/l1/ResumeInProgress.tsx` +- Create: `frontend/src/api/l1.ts` +- Create: `frontend/src/types/l1.ts` + +- [ ] **Step 1: Create types.** + +`frontend/src/types/l1.ts`: + +```typescript +export type SessionKind = 'flow' | 'proposal' | 'adhoc' +export type SessionStatus = 'active' | 'resolved' | 'escalated' | 'abandoned' +export type TicketKind = 'psa' | 'internal' + +export interface WalkSession { + id: string + session_kind: SessionKind + flow_id: string | null + flow_proposal_id: string | null + current_node_id: string | null + walked_path: WalkStep[] + walk_notes: AdhocNote[] + status: SessionStatus + started_at: string + last_step_at: string + resolved_at: string | null +} + +export interface WalkStep { + node_id: string + question: string + answer: string + l1_note: string | null +} + +export interface AdhocNote { + timestamp: string + content: string +} + +export interface QueueRow { + ticket_id: string + ticket_kind: TicketKind + problem_statement: string | null + customer_name: string | null + status: string + created_at: string | null +} + +export interface IntakeRequest { + problem_statement: string + customer_name?: string + customer_contact?: string + flow_id?: string +} + +export interface IntakeResponse { + session_id: string + session_kind: SessionKind + ticket_id: string + ticket_kind: TicketKind +} +``` + +- [ ] **Step 2: Create API client.** + +`frontend/src/api/l1.ts`: + +```typescript +import { apiClient } from './client' +import type { + IntakeRequest, IntakeResponse, QueueRow, WalkSession, + WalkStep, AdhocNote, +} from '@/types/l1' + +export const l1Api = { + intake: (body: IntakeRequest) => + apiClient.post('/api/v1/l1/intake', body).then(r => r.data), + + queue: (statusFilter?: string) => + apiClient.get('/api/v1/l1/queue', { + params: statusFilter ? { status_filter: statusFilter } : {}, + }).then(r => r.data), + + getSession: (sessionId: string) => + apiClient.get(`/api/v1/l1/sessions/${sessionId}`).then(r => r.data), + + listActiveSessions: () => + apiClient.get('/api/v1/l1/sessions/active').then(r => r.data), + + step: (sessionId: string, step: { node_id: string; question: string; answer: string; note?: string | null }) => + apiClient.post(`/api/v1/l1/sessions/${sessionId}/step`, step).then(r => r.data), + + notes: (sessionId: string, notes: AdhocNote[]) => + apiClient.post(`/api/v1/l1/sessions/${sessionId}/notes`, { notes }).then(r => r.data), + + resolve: (sessionId: string, body: { helpful: boolean; resolution_notes: string }) => + apiClient.post(`/api/v1/l1/sessions/${sessionId}/resolve`, body).then(r => r.data), + + escalate: (sessionId: string, body: { reason: string; reason_category: string }) => + apiClient.post(`/api/v1/l1/sessions/${sessionId}/escalate`, body).then(r => r.data), + + escalateWithoutWalk: (body: { + problem_statement: string; customer_name?: string; customer_contact?: string; + reason_category: string; reason?: string; + }) => + apiClient.post('/api/v1/l1/escalate-without-walk', body).then(r => r.data), +} +``` + +- [ ] **Step 3: Create EmptyStateCard.** + +`frontend/src/components/l1/EmptyStateCard.tsx`: + +```tsx +import { usePermissions } from '@/hooks/usePermissions' + +interface Props { + onUploadClick?: () => void + onConfigureConnectorClick?: () => void +} + +export function EmptyStateCard({ onUploadClick, onConfigureConnectorClick }: Props) { + const perms = usePermissions() + const isOwnerOrCoverage = perms.canCoverL1 // (owner/super_admin always have it) + + return ( +
+

+ Your knowledge base is empty +

+

+ L1 Workspace works best when your account has KB content or authored flows. + Right now there's nothing to match against — calls will start as ad-hoc walks. +

+ {isOwnerOrCoverage ? ( +
+ {onUploadClick && ( + + )} + {onConfigureConnectorClick && ( + + )} +
+ ) : ( +
    +
  • Ask your admin to upload KB documents
  • +
  • Or configure a KB connector under Account → KB
  • +
  • Or author a flow in the Flows library
  • +
+ )} +
+ ) +} +``` + +- [ ] **Step 4: Create ResumeInProgress.** + +`frontend/src/components/l1/ResumeInProgress.tsx`: + +```tsx +import { useEffect, useState } from 'react' +import { Link } from 'react-router' +import { l1Api } from '@/api/l1' +import type { WalkSession } from '@/types/l1' + +export function ResumeInProgress() { + const [sessions, setSessions] = useState(null) + + useEffect(() => { + l1Api.listActiveSessions().then(setSessions).catch(() => setSessions([])) + }, []) + + if (!sessions || sessions.length === 0) return null + + return ( +
+
+ + Resume in progress · {sessions.length} + +
+
+
+ {sessions.map((s) => ( + +
+ #{s.id.slice(0, 8)} + + {s.session_kind === 'adhoc' + ? `Adhoc · ${s.walk_notes.length} notes` + : `Step ${s.walked_path.length}`} + +
+ + {new Date(s.last_step_at).toLocaleTimeString()} + + + ))} +
+
+ ) +} +``` + +- [ ] **Step 5: Create the Dashboard page.** + +`frontend/src/pages/l1/L1Dashboard.tsx`: + +```tsx +import { useEffect, useState } from 'react' +import { useNavigate } from 'react-router' +import { PageMeta } from '@/components/common/PageMeta' +import { useAuthStore } from '@/store/authStore' +import { l1Api } from '@/api/l1' +import { EmptyStateCard } from '@/components/l1/EmptyStateCard' +import { ResumeInProgress } from '@/components/l1/ResumeInProgress' +import type { QueueRow } from '@/types/l1' +// Existing helper to fetch flows + KB doc count: +import { dashboardApi } from '@/api/dashboard' // adjust import to actual path + +export default function L1Dashboard() { + const user = useAuthStore((s) => s.user) + const navigate = useNavigate() + const [problem, setProblem] = useState('') + const [customerName, setCustomerName] = useState('') + const [customerContact, setCustomerContact] = useState('') + const [submitting, setSubmitting] = useState(false) + const [queue, setQueue] = useState([]) + const [isEmpty, setIsEmpty] = useState(null) + + useEffect(() => { + l1Api.queue('open').then(setQueue).catch(() => setQueue([])) + // Detect empty state via the dashboard stats endpoint (existing). + // Adjust to actual endpoint that returns flow + KB counts. + dashboardApi.getStats().then(stats => { + setIsEmpty((stats?.flow_count ?? 0) === 0 && (stats?.kb_doc_count ?? 0) === 0) + }).catch(() => setIsEmpty(false)) + }, []) + + const handleStart = async () => { + if (!problem.trim()) return + setSubmitting(true) + try { + const response = await l1Api.intake({ + problem_statement: problem.trim(), + customer_name: customerName.trim() || undefined, + customer_contact: customerContact.trim() || undefined, + }) + navigate(`/l1/walk/${response.session_id}`) + } finally { + setSubmitting(false) + } + } + + const now = new Date() + const greeting = now.getHours() < 12 ? 'morning' : now.getHours() < 18 ? 'afternoon' : 'evening' + const firstName = user?.name?.split(' ')[0] || 'there' + + return ( +
+ +
+ {/* Hero greeting */} +
+

+ {now.toLocaleDateString('en-US', { weekday: 'long', month: 'long', day: 'numeric' })} +

+

+ Good {greeting}, {firstName}. +

+
+ + {/* Empty state (first-run) */} + {isEmpty && } + + {/* Describe the problem */} +
+
+ + + Describe the problem + +
+
+