Files
resolutionflow/docs/superpowers/plans/2026-05-28-l1-workspace-phase-1.md
Michael Chihlas d40cb834b1 docs(plan): L1 workspace Phase 1 implementation plan
26 bite-sized TDD tasks covering: l1_tech role + perms, seat enforcement
(L1 + engineer together), 5 migrations (role/columns, FlowProposal,
internal_tickets, l1_walk_sessions), seat_enforcement/internal_ticket/
l1_session services, full L1 endpoint surface (intake/queue/step/notes/
resolve/escalate/escalate-without-walk), APScheduler cleanup for 24h
abandoned sessions, frontend usePermissions/Sidebar/router updates,
L1Dashboard (active + empty state + resume widget), L1WalkPage with tree
and adhoc variants, coverage banner, seat counter widget, RLS regression
tests, E2E Playwright suite, acceptance walkthrough.

Phase 2 (AI build + KB documents) and Phase 3 (KB connectors) get
their own plan files. Phase 1 ships with adhoc walks as the default
intake; user-facing flow selection ships in Phase 2 alongside the AI
matcher. PSA close/reassign is a Phase 1 stub (deferred to Phase 2).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 11:58:41 -04:00

138 KiB
Raw Blame History

L1 Workspace — Phase 1 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ship the L1 helpdesk workspace — new role, dedicated /l1/* surface, walker (tree + adhoc variants), session lifecycle, escalation, internal-ticket fallback, coverage flag, seat enforcement — working end-to-end against existing authored flows.

Architecture: New l1_tech role between engineer and viewer. Two new tables (l1_walk_sessions, internal_tickets) and small column additions to flow_proposals, users, accounts, subscriptions, audit_logs. New services/l1_session_service.py orchestrates walking-session lifecycle. New services/seat_enforcement.py shared by invite/role-change paths for both L1 and engineer seats. Frontend gets a role-gated /l1/* page tree. AI tree-builder and KB connectors are explicitly out of Phase 1 (Phases 2 and 3).

Tech Stack: Python 3.12 + FastAPI + SQLAlchemy 2.0 async + Alembic + Pydantic v2 + APScheduler (backend). React 19 + Vite + TypeScript + Tailwind v4 + Zustand + React Router v7 (frontend). PostgreSQL 16 with RLS.

Spec: docs/superpowers/specs/2026-05-28-l1-workspace-design.md — read in full before starting.

Out of scope for Phase 1 (deferred to Phase 2/3): match_or_build orchestrator, AI tree-builder, kb_documents tables, KB connectors (IT Glue / Hudu / Microsoft Graph), SUGGEST_THRESHOLD near-miss UX, BuildAbortedNoKB screen with near-miss option. User-facing flow selection is also deferred — Phase 1 intake creates adhoc walks only; the walker's flow variant exists in code and works when flow_id is passed to the intake endpoint (used by tests + direct API), but the UI surface for L1s to manually pick a flow ships in Phase 2 alongside the AI matcher. FlowProposal column extensions ARE included in Phase 1 (model is ready; Phase 2 populates).


File Structure

Backend — new files:

  • backend/app/models/l1_walk_session.py — SQLAlchemy L1WalkSession
  • backend/app/models/internal_ticket.py — SQLAlchemy InternalTicket
  • backend/app/schemas/l1.py — Pydantic request/response shapes
  • backend/app/schemas/seat_enforcement.pySeatCheckResult, SeatUsage shapes
  • backend/app/services/seat_enforcement.pycheck_seat_available shared helper
  • backend/app/services/internal_ticket_service.py — CRUD + status transitions
  • backend/app/services/l1_session_service.py — session lifecycle (start/step/notes/resolve/escalate)
  • backend/app/services/l1_session_cleanup.py — APScheduler hourly job, 24h abandonment
  • backend/app/api/endpoints/l1.py — all /l1/* endpoints
  • backend/alembic/versions/<hash>_add_l1_tech_role.py — role + column additions
  • backend/alembic/versions/<hash>_extend_flow_proposals.py — FlowProposal columns
  • backend/alembic/versions/<hash>_create_internal_tickets.py — table + RLS
  • backend/alembic/versions/<hash>_create_l1_walk_sessions.py — table + RLS + check constraint
  • backend/tests/test_seat_enforcement.py
  • backend/tests/test_internal_ticket_service.py
  • backend/tests/test_l1_session_service.py
  • backend/tests/test_l1_endpoints.py
  • backend/tests/test_l1_rls.py

Backend — modified files:

  • backend/app/models/flow_proposal.py — add new columns
  • backend/app/models/user.py — add can_cover_l1
  • backend/app/models/account.py — add l1_seats_purchased
  • backend/app/models/subscription.py — add l1_seat_limit
  • backend/app/models/audit_log.py — add acting_as
  • backend/app/core/permissions.py — add l1_tech to role docstring + helpers
  • backend/app/api/deps.py — add require_l1, require_l1_or_coverage, require_l1_or_above
  • backend/app/api/router.py — register l1 router + internal-tickets router
  • backend/app/api/endpoints/invite.py — integrate seat enforcement
  • backend/app/api/endpoints/accounts.py — seat-usage endpoint + coverage PATCH + role-change check
  • backend/app/main.py — register cleanup scheduler in lifespan

Frontend — new files:

  • frontend/src/pages/l1/L1Dashboard.tsx
  • frontend/src/pages/l1/L1WalkPage.tsx
  • frontend/src/pages/l1/L1DraftsPage.tsx
  • frontend/src/pages/l1/L1TicketsPage.tsx
  • frontend/src/components/l1/L1WalkTreeVariant.tsx
  • frontend/src/components/l1/L1WalkAdhocVariant.tsx
  • frontend/src/components/l1/L1CoverageBanner.tsx
  • frontend/src/components/l1/EmptyStateCard.tsx
  • frontend/src/components/l1/ResumeInProgress.tsx
  • frontend/src/components/admin/SeatCounterWidget.tsx
  • frontend/src/components/layout/L1RouteGuard.tsx
  • frontend/src/api/l1.ts
  • frontend/src/types/l1.ts

Frontend — modified files:

  • frontend/src/hooks/usePermissions.ts — add isL1Tech, canCoverL1, role-tier check
  • frontend/src/components/layout/Sidebar.tsx — role-based nav array
  • frontend/src/components/layout/ProtectedRoute.tsx — L1 post-login redirect
  • frontend/src/router.tsx — register /l1/* routes
  • frontend/src/types/auth.ts (or wherever User.account_role lives) — add 'l1_tech' to union
  • frontend/src/api/index.ts — export l1 API
  • frontend/src/types/index.ts — export l1 types

Task 1: Backend — extend role docstring + permission helpers

Files:

  • Modify: backend/app/core/permissions.py (header docstring + any role-list constants)

  • Step 1: Open file and locate the role docstring (around lines 510).

Read permissions.py to confirm current shape.

  • Step 2: Add l1_tech to the role docstring.

Replace the existing role list block:

"""
Permissions module.

Role hierarchy:
- super_admin: is_super_admin=True, full system access
- owner:       account_role='owner', manage account resources
- engineer:    account_role='engineer' (default), CRUD own trees/steps
- l1_tech:     account_role='l1_tech', use /l1/* surface only — walk flows, resolve/escalate
- viewer:      account_role='viewer', read-only (can browse, run sessions, rate steps)
"""
  • Step 3: If there's a VALID_ROLES constant or similar enum/list, add 'l1_tech'.

Grep first:

grep -n "engineer.*viewer\|VALID_ROLES\|ROLE_HIERARCHY" backend/app/core/permissions.py

If a list/tuple exists, insert 'l1_tech' between 'engineer' and 'viewer'. If not, no change.

  • Step 4: No tests for this docstring-only change. Move to commit.

  • Step 5: Commit.

git add backend/app/core/permissions.py
git commit -m "feat(l1): add l1_tech role to permissions docstring"

Task 2: Backend — add require_l1, require_l1_or_coverage, require_l1_or_above deps

Files:

  • Modify: backend/app/api/deps.py (add new dep functions after require_engineer_or_admin)

  • Test: backend/tests/test_deps_l1.py (new)

  • Step 1: Write the failing tests.

Create backend/tests/test_deps_l1.py:

import pytest
from fastapi import HTTPException
from app.api.deps import require_l1, require_l1_or_coverage, require_l1_or_above
from tests.factories import make_user  # existing test factory


@pytest.mark.asyncio
async def test_require_l1_passes_for_l1_tech():
    user = make_user(account_role='l1_tech')
    result = await require_l1(current_user=user)
    assert result is user


@pytest.mark.asyncio
async def test_require_l1_blocks_engineer():
    user = make_user(account_role='engineer')
    with pytest.raises(HTTPException) as exc:
        await require_l1(current_user=user)
    assert exc.value.status_code == 403


@pytest.mark.asyncio
async def test_require_l1_or_coverage_passes_engineer_with_flag():
    user = make_user(account_role='engineer', can_cover_l1=True)
    result = await require_l1_or_coverage(current_user=user)
    assert result is user


@pytest.mark.asyncio
async def test_require_l1_or_coverage_blocks_engineer_without_flag():
    user = make_user(account_role='engineer', can_cover_l1=False)
    with pytest.raises(HTTPException) as exc:
        await require_l1_or_coverage(current_user=user)
    assert exc.value.status_code == 403


@pytest.mark.asyncio
async def test_require_l1_or_coverage_passes_owner_always():
    user = make_user(account_role='owner', can_cover_l1=False)
    result = await require_l1_or_coverage(current_user=user)
    assert result is user


@pytest.mark.asyncio
async def test_require_l1_or_above_passes_engineer():
    user = make_user(account_role='engineer')
    result = await require_l1_or_above(current_user=user)
    assert result is user


@pytest.mark.asyncio
async def test_require_l1_or_above_blocks_viewer():
    user = make_user(account_role='viewer')
    with pytest.raises(HTTPException) as exc:
        await require_l1_or_above(current_user=user)
    assert exc.value.status_code == 403
  • Step 2: Run tests to verify they fail.
docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v

Expected: ImportError (require_l1 etc. don't exist yet).

  • Step 3: Add the new deps in backend/app/api/deps.py.

After the existing require_engineer_or_admin definition, add:

async def require_l1(
    current_user: User = Depends(get_current_active_user),
) -> User:
    """L1 tech only (exact match). Used by endpoints exclusive to L1 self-service."""
    if current_user.is_super_admin:
        return current_user  # super_admin bypass for support purposes
    if current_user.account_role != "l1_tech":
        raise HTTPException(status_code=403, detail="L1 tech role required")
    return current_user


async def require_l1_or_coverage(
    current_user: User = Depends(get_current_active_user),
) -> User:
    """
    L1 endpoints accessible to: l1_tech, engineers with can_cover_l1, owners, super_admin.
    The "coverage" tier — engineers covering a frontline shift.
    """
    if current_user.is_super_admin:
        return current_user
    role = current_user.account_role
    if role == "l1_tech":
        return current_user
    if role == "owner":
        return current_user
    if role == "engineer" and current_user.can_cover_l1:
        return current_user
    raise HTTPException(
        status_code=403,
        detail="L1 access requires l1_tech role or engineer coverage flag",
    )


async def require_l1_or_above(
    current_user: User = Depends(get_current_active_user),
) -> User:
    """
    Anyone at L1 tier or higher. Used for shared resources L1s can see
    (e.g., flow library, KB connector list view).
    """
    if current_user.is_super_admin:
        return current_user
    if current_user.account_role in ("l1_tech", "engineer", "owner"):
        return current_user
    raise HTTPException(status_code=403, detail="L1 or above required")

Also ensure User model is imported at the top of deps.py (likely already imported).

  • Step 4: Run tests to verify they pass.
docker exec resolutionflow_backend pytest backend/tests/test_deps_l1.py -v

Expected: 7 PASSED.

  • Step 5: Commit.
git add backend/app/api/deps.py backend/tests/test_deps_l1.py
git commit -m "feat(l1): add require_l1, require_l1_or_coverage, require_l1_or_above deps"

Task 3: Migration — column additions (can_cover_l1, l1_seats_purchased, audit_logs.acting_as, l1_seat_limit)

Files:

  • Create: backend/alembic/versions/<hash>_add_l1_columns.py

  • Modify: backend/app/models/user.py (add can_cover_l1)

  • Modify: backend/app/models/account.py (add l1_seats_purchased)

  • Modify: backend/app/models/subscription.py (add l1_seat_limit)

  • Modify: backend/app/models/audit_log.py (add acting_as)

  • Step 1: Generate the manual migration (no autogenerate).

docker exec -w /app resolutionflow_backend alembic revision -m "add_l1_columns"

This creates a file backend/alembic/versions/<hash>_add_l1_columns.py. Note the hash from the output.

  • Step 2: Write the migration content.

Open the generated file and replace the upgrade() / downgrade() bodies:

def upgrade() -> None:
    op.add_column(
        'users',
        sa.Column('can_cover_l1', sa.Boolean(), nullable=False, server_default='false'),
    )
    op.add_column(
        'accounts',
        sa.Column('l1_seats_purchased', sa.Integer(), nullable=False, server_default='0'),
    )
    op.add_column(
        'subscriptions',
        sa.Column('l1_seat_limit', sa.Integer(), nullable=True),
    )
    op.add_column(
        'audit_logs',
        sa.Column('acting_as', sa.String(30), nullable=True),
    )


def downgrade() -> None:
    op.drop_column('audit_logs', 'acting_as')
    op.drop_column('subscriptions', 'l1_seat_limit')
    op.drop_column('accounts', 'l1_seats_purchased')
    op.drop_column('users', 'can_cover_l1')
  • Step 3: Add the matching columns to the SQLAlchemy models.

In backend/app/models/user.py, add inside the User class column block:

can_cover_l1: Mapped[bool] = mapped_column(
    sa.Boolean(), nullable=False, server_default=sa.text('false')
)

In backend/app/models/account.py:

l1_seats_purchased: Mapped[int] = mapped_column(
    sa.Integer(), nullable=False, server_default=sa.text('0')
)

In backend/app/models/subscription.py:

l1_seat_limit: Mapped[Optional[int]] = mapped_column(sa.Integer(), nullable=True)

In backend/app/models/audit_log.py:

acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
  • Step 4: Run migration to verify it applies.
docker exec -w /app resolutionflow_backend alembic upgrade head

Expected: applies successfully, no errors.

  • Step 5: Verify schema via psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d users" | grep can_cover_l1
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d accounts" | grep l1_seats_purchased
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d subscriptions" | grep l1_seat_limit
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d audit_logs" | grep acting_as

Expected: each grep returns the new column row.

  • Step 6: Verify downgrade works (then upgrade again).
docker exec -w /app resolutionflow_backend alembic downgrade -1
docker exec -w /app resolutionflow_backend alembic upgrade head
  • Step 7: Commit.
git add backend/alembic/versions/<hash>_add_l1_columns.py backend/app/models/user.py backend/app/models/account.py backend/app/models/subscription.py backend/app/models/audit_log.py
git commit -m "feat(l1): add column additions migration (can_cover_l1, l1_seats_purchased, l1_seat_limit, acting_as)"

Task 4: Migration — extend flow_proposals with L1 source columns

Files:

  • Create: backend/alembic/versions/<hash>_extend_flow_proposals.py
  • Modify: backend/app/models/flow_proposal.py

Spec §5.1: add source, linked_ticket_id, linked_ticket_kind, validated_by_outcome to flow_proposals. (walked_path_snapshot is NOT added — it lives on l1_walk_sessions per the revised design.)

  • Step 1: Generate migration.
docker exec -w /app resolutionflow_backend alembic revision -m "extend_flow_proposals_l1"
  • Step 2: Write migration content.
def upgrade() -> None:
    op.add_column(
        'flow_proposals',
        sa.Column('source', sa.String(30), nullable=True),
    )
    op.add_column(
        'flow_proposals',
        sa.Column('linked_ticket_id', sa.String(64), nullable=True),
    )
    op.add_column(
        'flow_proposals',
        sa.Column('linked_ticket_kind', sa.String(10), nullable=True),
    )
    op.add_column(
        'flow_proposals',
        sa.Column('validated_by_outcome', sa.Boolean(), nullable=False, server_default='false'),
    )

    # Backfill existing rows
    op.execute("UPDATE flow_proposals SET source = 'manual_draft' WHERE source IS NULL")

    # Now enforce NOT NULL on source
    op.alter_column('flow_proposals', 'source', nullable=False)

    # CHECK constraint on source values
    op.create_check_constraint(
        'ck_flow_proposals_source',
        'flow_proposals',
        "source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')",
    )

    # CHECK constraint on linked_ticket_kind values
    op.create_check_constraint(
        'ck_flow_proposals_linked_ticket_kind',
        'flow_proposals',
        "linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')",
    )


def downgrade() -> None:
    op.drop_constraint('ck_flow_proposals_linked_ticket_kind', 'flow_proposals', type_='check')
    op.drop_constraint('ck_flow_proposals_source', 'flow_proposals', type_='check')
    op.drop_column('flow_proposals', 'validated_by_outcome')
    op.drop_column('flow_proposals', 'linked_ticket_kind')
    op.drop_column('flow_proposals', 'linked_ticket_id')
    op.drop_column('flow_proposals', 'source')
  • Step 3: Update FlowProposal model.

In backend/app/models/flow_proposal.py, add inside the class:

source: Mapped[str] = mapped_column(sa.String(30), nullable=False, server_default=sa.text("'manual_draft'"))
linked_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True)
linked_ticket_kind: Mapped[Optional[str]] = mapped_column(sa.String(10), nullable=True)
validated_by_outcome: Mapped[bool] = mapped_column(
    sa.Boolean(), nullable=False, server_default=sa.text('false')
)

Also update __table_args__ to include the new check constraints:

__table_args__ = (
    # ... existing constraints ...
    CheckConstraint(
        "source IN ('ai_realtime_l1', 'kb_accelerator', 'manual_draft', 'ai_promoted')",
        name="ck_flow_proposals_source",
    ),
    CheckConstraint(
        "linked_ticket_kind IS NULL OR linked_ticket_kind IN ('psa', 'internal')",
        name="ck_flow_proposals_linked_ticket_kind",
    ),
)
  • Step 4: Apply migration.
docker exec -w /app resolutionflow_backend alembic upgrade head
  • Step 5: Verify with psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d flow_proposals"

Expected: shows new columns + check constraints.

  • Step 6: Commit.
git add backend/alembic/versions/<hash>_extend_flow_proposals.py backend/app/models/flow_proposal.py
git commit -m "feat(l1): extend FlowProposal with source/linked_ticket/validated_by_outcome"

Task 5: Migration + model — create internal_tickets table

Files:

  • Create: backend/alembic/versions/<hash>_create_internal_tickets.py

  • Create: backend/app/models/internal_ticket.py

  • Step 1: Generate migration.

docker exec -w /app resolutionflow_backend alembic revision -m "create_internal_tickets"
  • Step 2: Write migration content.
def upgrade() -> None:
    op.create_table(
        'internal_tickets',
        sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('customer_name', sa.String(120), nullable=True),
        sa.Column('customer_contact', sa.String(200), nullable=True),
        sa.Column('problem_statement', sa.Text(), nullable=False),
        sa.Column('status', sa.String(30), nullable=False, server_default='open'),
        sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('ai_session_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('assigned_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('resolution_notes', sa.Text(), nullable=True),
        sa.Column('psa_promoted_ticket_id', sa.String(64), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
        sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'),
        sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['ai_session_id'], ['ai_sessions.id'], ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['assigned_user_id'], ['users.id'], ondelete='SET NULL'),
        sa.CheckConstraint(
            "status IN ('open', 'walking', 'resolved', 'escalated')",
            name='ck_internal_tickets_status',
        ),
    )
    op.create_index('ix_internal_tickets_account_id', 'internal_tickets', ['account_id'])
    op.create_index('ix_internal_tickets_status', 'internal_tickets', ['status'])
    op.create_index('ix_internal_tickets_assigned_user_id', 'internal_tickets', ['assigned_user_id'])

    # RLS — match the project pattern
    op.execute("ALTER TABLE internal_tickets ENABLE ROW LEVEL SECURITY")
    op.execute("""
        CREATE POLICY internal_tickets_account_isolation ON internal_tickets
        USING (account_id = current_setting('app.current_account_id')::uuid)
        WITH CHECK (account_id = current_setting('app.current_account_id')::uuid)
    """)


def downgrade() -> None:
    op.execute("DROP POLICY IF EXISTS internal_tickets_account_isolation ON internal_tickets")
    op.execute("ALTER TABLE internal_tickets DISABLE ROW LEVEL SECURITY")
    op.drop_index('ix_internal_tickets_assigned_user_id', 'internal_tickets')
    op.drop_index('ix_internal_tickets_status', 'internal_tickets')
    op.drop_index('ix_internal_tickets_account_id', 'internal_tickets')
    op.drop_table('internal_tickets')
  • Step 3: Create the SQLAlchemy model.

Create backend/app/models/internal_ticket.py:

import uuid
from datetime import datetime
from typing import Optional

import sqlalchemy as sa
from sqlalchemy import CheckConstraint, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column

from app.db.base import Base


class InternalTicket(Base):
    __tablename__ = "internal_tickets"
    __table_args__ = (
        CheckConstraint(
            "status IN ('open', 'walking', 'resolved', 'escalated')",
            name="ck_internal_tickets_status",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    created_by_user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="RESTRICT"),
        nullable=False,
    )
    customer_name: Mapped[Optional[str]] = mapped_column(sa.String(120), nullable=True)
    customer_contact: Mapped[Optional[str]] = mapped_column(sa.String(200), nullable=True)
    problem_statement: Mapped[str] = mapped_column(sa.Text(), nullable=False)
    status: Mapped[str] = mapped_column(
        sa.String(30), nullable=False, server_default=sa.text("'open'"), index=True
    )
    flow_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True
    )
    flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True
    )
    ai_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("ai_sessions.id", ondelete="SET NULL"), nullable=True
    )
    assigned_user_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
    )
    resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
    psa_promoted_ticket_id: Mapped[Optional[str]] = mapped_column(sa.String(64), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")
    )
    updated_at: Mapped[datetime] = mapped_column(
        sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"),
        onupdate=sa.text("now()"),
    )
    resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True)
  • Step 4: Apply migration.
docker exec -w /app resolutionflow_backend alembic upgrade head
  • Step 5: Verify RLS policy via psql.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname, polcmd, polqual FROM pg_policy WHERE polrelid = 'internal_tickets'::regclass"

Expected: one policy row internal_tickets_account_isolation.

  • Step 6: Commit.
git add backend/alembic/versions/<hash>_create_internal_tickets.py backend/app/models/internal_ticket.py
git commit -m "feat(l1): create internal_tickets table with RLS"

Task 6: Migration + model — create l1_walk_sessions table

Files:

  • Create: backend/alembic/versions/<hash>_create_l1_walk_sessions.py

  • Create: backend/app/models/l1_walk_session.py

  • Step 1: Generate migration.

docker exec -w /app resolutionflow_backend alembic revision -m "create_l1_walk_sessions"
  • Step 2: Write migration content.
def upgrade() -> None:
    op.create_table(
        'l1_walk_sessions',
        sa.Column('id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('account_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('created_by_user_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('acting_as', sa.String(30), nullable=True),
        sa.Column('ticket_id', sa.String(64), nullable=False),
        sa.Column('ticket_kind', sa.String(10), nullable=False),
        sa.Column('session_kind', sa.String(20), nullable=False),
        sa.Column('flow_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('flow_proposal_id', sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('current_node_id', sa.String(100), nullable=True),
        sa.Column('walked_path', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")),
        sa.Column('walk_notes', sa.dialects.postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")),
        sa.Column('status', sa.String(20), nullable=False, server_default='active'),
        sa.Column('resolution_notes', sa.Text(), nullable=True),
        sa.Column('helpful', sa.Boolean(), nullable=True),
        sa.Column('escalation_reason', sa.Text(), nullable=True),
        sa.Column('escalation_reason_category', sa.String(30), nullable=True),
        sa.Column('started_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
        sa.Column('last_step_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
        sa.Column('resolved_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.ForeignKeyConstraint(['account_id'], ['accounts.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['created_by_user_id'], ['users.id'], ondelete='RESTRICT'),
        sa.ForeignKeyConstraint(['flow_id'], ['trees.id'], ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['flow_proposal_id'], ['flow_proposals.id'], ondelete='SET NULL'),
        sa.CheckConstraint(
            "ticket_kind IN ('psa', 'internal')",
            name='ck_l1_walk_sessions_ticket_kind',
        ),
        sa.CheckConstraint(
            "session_kind IN ('flow', 'proposal', 'adhoc')",
            name='ck_l1_walk_sessions_session_kind',
        ),
        sa.CheckConstraint(
            "status IN ('active', 'resolved', 'escalated', 'abandoned')",
            name='ck_l1_walk_sessions_status',
        ),
        sa.CheckConstraint(
            "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
            "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
            "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
            name='ck_l1_walk_sessions_target_consistency',
        ),
    )
    op.create_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions', ['account_id'])
    op.create_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions', ['created_by_user_id'])
    op.create_index('ix_l1_walk_sessions_status', 'l1_walk_sessions', ['status'])
    op.create_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions', ['last_step_at'])

    op.execute("ALTER TABLE l1_walk_sessions ENABLE ROW LEVEL SECURITY")
    op.execute("""
        CREATE POLICY l1_walk_sessions_account_isolation ON l1_walk_sessions
        USING (account_id = current_setting('app.current_account_id')::uuid)
        WITH CHECK (account_id = current_setting('app.current_account_id')::uuid)
    """)


def downgrade() -> None:
    op.execute("DROP POLICY IF EXISTS l1_walk_sessions_account_isolation ON l1_walk_sessions")
    op.execute("ALTER TABLE l1_walk_sessions DISABLE ROW LEVEL SECURITY")
    op.drop_index('ix_l1_walk_sessions_last_step_at', 'l1_walk_sessions')
    op.drop_index('ix_l1_walk_sessions_status', 'l1_walk_sessions')
    op.drop_index('ix_l1_walk_sessions_created_by_user_id', 'l1_walk_sessions')
    op.drop_index('ix_l1_walk_sessions_account_id', 'l1_walk_sessions')
    op.drop_table('l1_walk_sessions')
  • Step 3: Create the SQLAlchemy model.

Create backend/app/models/l1_walk_session.py:

import uuid
from datetime import datetime
from typing import Any, Optional

import sqlalchemy as sa
from sqlalchemy import CheckConstraint, ForeignKey
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column

from app.db.base import Base


class L1WalkSession(Base):
    __tablename__ = "l1_walk_sessions"
    __table_args__ = (
        CheckConstraint(
            "ticket_kind IN ('psa', 'internal')",
            name="ck_l1_walk_sessions_ticket_kind",
        ),
        CheckConstraint(
            "session_kind IN ('flow', 'proposal', 'adhoc')",
            name="ck_l1_walk_sessions_session_kind",
        ),
        CheckConstraint(
            "status IN ('active', 'resolved', 'escalated', 'abandoned')",
            name="ck_l1_walk_sessions_status",
        ),
        CheckConstraint(
            "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
            "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
            "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
            name="ck_l1_walk_sessions_target_consistency",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    created_by_user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="RESTRICT"),
        nullable=False,
        index=True,
    )
    acting_as: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
    ticket_id: Mapped[str] = mapped_column(sa.String(64), nullable=False)
    ticket_kind: Mapped[str] = mapped_column(sa.String(10), nullable=False)
    session_kind: Mapped[str] = mapped_column(sa.String(20), nullable=False)
    flow_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("trees.id", ondelete="SET NULL"), nullable=True
    )
    flow_proposal_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), ForeignKey("flow_proposals.id", ondelete="SET NULL"), nullable=True
    )
    current_node_id: Mapped[Optional[str]] = mapped_column(sa.String(100), nullable=True)
    walked_path: Mapped[list[dict[str, Any]]] = mapped_column(
        JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")
    )
    walk_notes: Mapped[list[dict[str, Any]]] = mapped_column(
        JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb")
    )
    status: Mapped[str] = mapped_column(
        sa.String(20), nullable=False, server_default=sa.text("'active'"), index=True
    )
    resolution_notes: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
    helpful: Mapped[Optional[bool]] = mapped_column(sa.Boolean(), nullable=True)
    escalation_reason: Mapped[Optional[str]] = mapped_column(sa.Text(), nullable=True)
    escalation_reason_category: Mapped[Optional[str]] = mapped_column(sa.String(30), nullable=True)
    started_at: Mapped[datetime] = mapped_column(
        sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()")
    )
    last_step_at: Mapped[datetime] = mapped_column(
        sa.DateTime(timezone=True), nullable=False, server_default=sa.text("now()"), index=True
    )
    resolved_at: Mapped[Optional[datetime]] = mapped_column(sa.DateTime(timezone=True), nullable=True)
  • Step 4: Apply migration + verify RLS.
docker exec -w /app resolutionflow_backend alembic upgrade head
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "SELECT polname FROM pg_policy WHERE polrelid = 'l1_walk_sessions'::regclass"
  • Step 5: Verify check constraint blocks invalid combos.
docker exec -it resolutionflow_postgres psql -U postgres -d resolutionflow -c "INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind) VALUES (gen_random_uuid(), '00000000-0000-0000-0000-000000000000', '00000000-0000-0000-0000-000000000000', 'x', 'psa', 'flow')"

Expected: violates ck_l1_walk_sessions_target_consistency (flow_id NULL).

  • Step 6: Commit.
git add backend/alembic/versions/<hash>_create_l1_walk_sessions.py backend/app/models/l1_walk_session.py
git commit -m "feat(l1): create l1_walk_sessions table with RLS + target-consistency check"

Task 7: Seat enforcement service

Files:

  • Create: backend/app/schemas/seat_enforcement.py

  • Create: backend/app/services/seat_enforcement.py

  • Test: backend/tests/test_seat_enforcement.py

  • Step 1: Write the failing tests.

Create backend/tests/test_seat_enforcement.py:

import pytest
from app.services.seat_enforcement import check_seat_available
from tests.factories import make_account, make_subscription, make_user


@pytest.mark.asyncio
async def test_engineer_seat_available_when_under_limit(db_session):
    account = await make_account(db_session)
    sub = await make_subscription(db_session, account_id=account.id, seat_limit=5)
    for _ in range(3):
        await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
    result = await check_seat_available(account, sub, 'engineer', db_session)
    assert result.available is True
    assert result.current == 3
    assert result.limit == 5


@pytest.mark.asyncio
async def test_engineer_seat_unavailable_when_at_limit(db_session):
    account = await make_account(db_session)
    sub = await make_subscription(db_session, account_id=account.id, seat_limit=2)
    for _ in range(2):
        await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
    result = await check_seat_available(account, sub, 'engineer', db_session)
    assert result.available is False
    assert result.current == 2
    assert result.limit == 2


@pytest.mark.asyncio
async def test_l1_seat_uses_separate_limit(db_session):
    account = await make_account(db_session)
    sub = await make_subscription(db_session, account_id=account.id, seat_limit=2, l1_seat_limit=10)
    for _ in range(2):
        await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
    # Engineer limit hit but L1 seats still available
    eng_result = await check_seat_available(account, sub, 'engineer', db_session)
    l1_result = await check_seat_available(account, sub, 'l1_tech', db_session)
    assert eng_result.available is False
    assert l1_result.available is True
    assert l1_result.limit == 10


@pytest.mark.asyncio
async def test_unlimited_when_limit_null(db_session):
    account = await make_account(db_session)
    sub = await make_subscription(db_session, account_id=account.id, seat_limit=None)
    for _ in range(100):
        await make_user(db_session, account_id=account.id, account_role='engineer', is_active=True)
    result = await check_seat_available(account, sub, 'engineer', db_session)
    assert result.available is True
    assert result.limit is None
  • Step 2: Run tests, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v

Expected: ImportError.

  • Step 3: Create schema.

backend/app/schemas/seat_enforcement.py:

from typing import Literal, Optional

from pydantic import BaseModel

Role = Literal['engineer', 'l1_tech']


class SeatCheckResult(BaseModel):
    available: bool
    current: int
    limit: Optional[int]   # None = unlimited
    role: Role


class SeatUsage(BaseModel):
    engineer: SeatCheckResult
    l1_tech: SeatCheckResult
  • Step 4: Create service.

backend/app/services/seat_enforcement.py:

from typing import Literal

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.account import Account
from app.models.subscription import Subscription
from app.models.user import User
from app.schemas.seat_enforcement import SeatCheckResult


Role = Literal['engineer', 'l1_tech']


def _limit_for_role(subscription: Subscription, role: Role) -> int | None:
    if role == 'engineer':
        return subscription.seat_limit
    if role == 'l1_tech':
        return subscription.l1_seat_limit
    raise ValueError(f"Unknown role: {role}")


async def check_seat_available(
    account: Account,
    subscription: Subscription,
    role: Role,
    db: AsyncSession,
) -> SeatCheckResult:
    """
    Count active users with the given role in the account, compare against
    the role-specific seat limit on the subscription. Returns availability.

    None limit = unlimited (returns available=True).
    """
    limit = _limit_for_role(subscription, role)

    stmt = (
        select(func.count(User.id))
        .where(User.account_id == account.id)
        .where(User.account_role == role)
        .where(User.is_active.is_(True))
    )
    current = (await db.execute(stmt)).scalar_one()

    if limit is None:
        return SeatCheckResult(available=True, current=current, limit=None, role=role)
    return SeatCheckResult(
        available=current < limit,
        current=current,
        limit=limit,
        role=role,
    )


async def get_seat_usage(
    account: Account,
    subscription: Subscription,
    db: AsyncSession,
) -> tuple[SeatCheckResult, SeatCheckResult]:
    """Return (engineer, l1_tech) seat-usage tuple for the seat-counter widget."""
    eng = await check_seat_available(account, subscription, 'engineer', db)
    l1 = await check_seat_available(account, subscription, 'l1_tech', db)
    return eng, l1
  • Step 5: Run tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v

Expected: 4 PASSED.

  • Step 6: Commit.
git add backend/app/schemas/seat_enforcement.py backend/app/services/seat_enforcement.py backend/tests/test_seat_enforcement.py
git commit -m "feat(l1): add seat_enforcement service for engineer + L1 seat limits"

Task 8: Integrate seat enforcement into invite + accept-invite + role change

Files:

  • Modify: backend/app/api/endpoints/invite.py — block invite create when limit reached

  • Modify: backend/app/api/endpoints/accounts.py or wherever accept-invite lives — re-check at accept time

  • Modify: wherever role-change PATCH lives (likely accounts.py or admin.py) — re-check before commit

  • Test: extend backend/tests/test_seat_enforcement.py

  • Step 1: Write failing integration tests.

Add to backend/tests/test_seat_enforcement.py:

@pytest.mark.asyncio
async def test_invite_blocked_when_engineer_seats_full(authed_owner_client, db_session):
    # Setup: account at engineer seat limit
    # ... (use existing test fixtures to seed an account with N=limit engineers)
    response = await authed_owner_client.post(
        "/api/v1/invites",
        json={"email": "new@example.com", "role": "engineer"},
    )
    assert response.status_code == 402
    body = response.json()
    assert body["detail"]["code"] == "seat_limit_exceeded"
    assert body["detail"]["role"] == "engineer"
    assert body["detail"]["current"] == body["detail"]["limit"]


@pytest.mark.asyncio
async def test_invite_blocked_when_l1_seats_full(authed_owner_client, db_session):
    # Same as above but for l1_tech role
    response = await authed_owner_client.post(
        "/api/v1/invites",
        json={"email": "new@example.com", "role": "l1_tech"},
    )
    assert response.status_code == 402


@pytest.mark.asyncio
async def test_invite_succeeds_when_l1_seats_available(authed_owner_client, db_session):
    # Account with l1_seat_limit=10, current L1 count = 0
    response = await authed_owner_client.post(
        "/api/v1/invites",
        json={"email": "new@example.com", "role": "l1_tech"},
    )
    assert response.status_code == 201


@pytest.mark.asyncio
async def test_role_change_blocked_when_target_seats_full(authed_owner_client, viewer_user, db_session):
    # Try promoting viewer → engineer when engineer seats are full
    response = await authed_owner_client.patch(
        f"/api/v1/users/{viewer_user.id}/role",
        json={"account_role": "engineer"},
    )
    assert response.status_code == 402
  • Step 2: Run, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py::test_invite_blocked_when_engineer_seats_full -v
  • Step 3: Add seat check to invite create endpoint.

In backend/app/api/endpoints/invite.py, locate the create endpoint and add at the top (before persistence):

from app.services.seat_enforcement import check_seat_available

# ... inside the create handler ...
if payload.role in ('engineer', 'l1_tech'):
    # Load the account's subscription (existing pattern in the codebase)
    sub = await get_active_subscription(db, current_user.account_id)
    result = await check_seat_available(account, sub, payload.role, db)
    if not result.available:
        raise HTTPException(
            status_code=402,
            detail={
                "code": "seat_limit_exceeded",
                "role": result.role,
                "current": result.current,
                "limit": result.limit,
                "upgrade_url": "/account/billing",   # Frontend deep-links to the existing /account/billing page; Stripe customer portal link is generated server-side there.
            },
        )

(If the codebase has an existing helper like get_active_subscription, use it. If not, add a thin helper in services/billing.py.)

  • Step 4: Add check to accept-invite endpoint.

Find the accept-invite endpoint (grep -rn "accept.invite\|@router.post.*accept" backend/app/api/endpoints/). Add the same check_seat_available call before the user is upgraded from invite to active user. Race-condition guard.

  • Step 5: Add check to role-change endpoint.

Find the role-change PATCH endpoint. If promoting toward engineer or l1_tech, run the check first. Return same 402 shape on block.

  • Step 6: Run integration tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_seat_enforcement.py -v
  • Step 7: Commit.
git add backend/app/api/endpoints/invite.py backend/app/api/endpoints/accounts.py backend/tests/test_seat_enforcement.py
git commit -m "feat(l1): enforce seat limits on invite, accept-invite, role-change for engineer + L1"

Task 9: GET /api/v1/accounts/me/seats endpoint

Files:

  • Modify: backend/app/api/endpoints/accounts.py (add endpoint)

  • Test: backend/tests/test_l1_endpoints.py (new)

  • Step 1: Write failing test.

Create backend/tests/test_l1_endpoints.py:

import pytest


@pytest.mark.asyncio
async def test_get_seats_returns_both_role_counts(authed_engineer_client, db_session):
    response = await authed_engineer_client.get("/api/v1/accounts/me/seats")
    assert response.status_code == 200
    body = response.json()
    assert "engineer" in body
    assert "l1_tech" in body
    assert {"available", "current", "limit", "role"}.issubset(body["engineer"].keys())


@pytest.mark.asyncio
async def test_get_seats_blocked_for_viewer(authed_viewer_client):
    response = await authed_viewer_client.get("/api/v1/accounts/me/seats")
    assert response.status_code == 403
  • Step 2: Run, verify failure.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py::test_get_seats_returns_both_role_counts -v
  • Step 3: Add endpoint to accounts.py.
from app.services.seat_enforcement import get_seat_usage
from app.schemas.seat_enforcement import SeatUsage

@router.get("/me/seats", response_model=SeatUsage)
async def get_my_account_seat_usage(
    current_user: User = Depends(require_engineer_or_admin),
    db: AsyncSession = Depends(get_db),
):
    account = await db.get(Account, current_user.account_id)
    sub = await get_active_subscription(db, current_user.account_id)
    engineer, l1_tech = await get_seat_usage(account, sub, db)
    return SeatUsage(engineer=engineer, l1_tech=l1_tech)
  • Step 4: Run tests, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v
  • Step 5: Commit.
git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): GET /accounts/me/seats endpoint"

Task 10: PATCH /api/v1/users/{id}/coverage endpoint

Files:

  • Modify: backend/app/api/endpoints/accounts.py (or wherever user-management endpoints live)

  • Test: extend backend/tests/test_l1_endpoints.py

  • Step 1: Write failing tests.

@pytest.mark.asyncio
async def test_owner_can_toggle_coverage_on_engineer(authed_owner_client, engineer_user):
    response = await authed_owner_client.patch(
        f"/api/v1/users/{engineer_user.id}/coverage",
        json={"can_cover_l1": True},
    )
    assert response.status_code == 200
    assert response.json()["can_cover_l1"] is True


@pytest.mark.asyncio
async def test_engineer_cannot_toggle_coverage_on_self(authed_engineer_client, engineer_user):
    response = await authed_engineer_client.patch(
        f"/api/v1/users/{engineer_user.id}/coverage",
        json={"can_cover_l1": True},
    )
    assert response.status_code == 403


@pytest.mark.asyncio
async def test_coverage_only_applies_to_engineers(authed_owner_client, viewer_user):
    response = await authed_owner_client.patch(
        f"/api/v1/users/{viewer_user.id}/coverage",
        json={"can_cover_l1": True},
    )
    assert response.status_code == 422  # validation: viewer can't have coverage
  • Step 2: Run, verify failure.

  • Step 3: Add endpoint.

from pydantic import BaseModel

class CoveragePatch(BaseModel):
    can_cover_l1: bool

@router.patch("/users/{user_id}/coverage")
async def patch_user_coverage(
    user_id: UUID,
    payload: CoveragePatch,
    current_user: User = Depends(require_account_owner),
    db: AsyncSession = Depends(get_db),
):
    target = await db.get(User, user_id)
    if not target or target.account_id != current_user.account_id:
        raise HTTPException(status_code=404)
    if target.account_role != 'engineer':
        raise HTTPException(
            status_code=422,
            detail="can_cover_l1 only applies to engineers",
        )
    target.can_cover_l1 = payload.can_cover_l1
    await db.commit()
    return {"id": str(target.id), "can_cover_l1": target.can_cover_l1}
  • Step 4: Run tests, verify pass.

  • Step 5: Commit.

git add backend/app/api/endpoints/accounts.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): PATCH /users/{id}/coverage for engineer L1-coverage flag"

Task 11: internal_ticket_service.py

Files:

  • Create: backend/app/services/internal_ticket_service.py

  • Test: backend/tests/test_internal_ticket_service.py

  • Step 1: Write failing tests.

import pytest
import uuid
from app.services.internal_ticket_service import (
    create_ticket, update_status, get_ticket, list_tickets_for_account,
)


@pytest.mark.asyncio
async def test_create_ticket_sets_status_open(db_session, account, l1_user):
    ticket = await create_ticket(
        db_session,
        account_id=account.id,
        created_by_user_id=l1_user.id,
        problem_statement="Outlook can't connect",
        customer_name="Alice",
    )
    assert ticket.status == 'open'
    assert ticket.account_id == account.id


@pytest.mark.asyncio
async def test_update_status_to_resolved_sets_resolved_at(db_session, internal_ticket):
    updated = await update_status(db_session, ticket_id=internal_ticket.id, status='resolved')
    assert updated.status == 'resolved'
    assert updated.resolved_at is not None


@pytest.mark.asyncio
async def test_list_tickets_filters_by_account(db_session, account_a, account_b, ticket_a, ticket_b):
    rows = await list_tickets_for_account(db_session, account_id=account_a.id)
    assert ticket_a in rows
    assert ticket_b not in rows
  • Step 2: Run, verify failure.

  • Step 3: Implement service.

from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.internal_ticket import InternalTicket


async def create_ticket(
    db: AsyncSession,
    *,
    account_id: UUID,
    created_by_user_id: UUID,
    problem_statement: str,
    customer_name: Optional[str] = None,
    customer_contact: Optional[str] = None,
) -> InternalTicket:
    ticket = InternalTicket(
        account_id=account_id,
        created_by_user_id=created_by_user_id,
        problem_statement=problem_statement,
        customer_name=customer_name,
        customer_contact=customer_contact,
    )
    db.add(ticket)
    await db.flush()
    return ticket


async def update_status(
    db: AsyncSession,
    *,
    ticket_id: UUID,
    status: str,
    resolution_notes: Optional[str] = None,
    assigned_user_id: Optional[UUID] = None,
) -> InternalTicket:
    ticket = await db.get(InternalTicket, ticket_id)
    if not ticket:
        raise ValueError(f"InternalTicket {ticket_id} not found")
    ticket.status = status
    if status == 'resolved':
        ticket.resolved_at = datetime.now(timezone.utc)
        if resolution_notes is not None:
            ticket.resolution_notes = resolution_notes
    if assigned_user_id is not None:
        ticket.assigned_user_id = assigned_user_id
    await db.flush()
    return ticket


async def get_ticket(db: AsyncSession, ticket_id: UUID) -> Optional[InternalTicket]:
    return await db.get(InternalTicket, ticket_id)


async def list_tickets_for_account(
    db: AsyncSession,
    *,
    account_id: UUID,
    status: Optional[str] = None,
    limit: int = 100,
) -> list[InternalTicket]:
    stmt = select(InternalTicket).where(InternalTicket.account_id == account_id)
    if status:
        stmt = stmt.where(InternalTicket.status == status)
    stmt = stmt.order_by(InternalTicket.created_at.desc()).limit(limit)
    result = await db.execute(stmt)
    return list(result.scalars())


async def promote_to_psa(
    db: AsyncSession,
    *,
    ticket_id: UUID,
    psa_ticket_id: str,
) -> InternalTicket:
    ticket = await db.get(InternalTicket, ticket_id)
    if not ticket:
        raise ValueError(f"InternalTicket {ticket_id} not found")
    ticket.psa_promoted_ticket_id = psa_ticket_id
    await db.flush()
    return ticket
  • Step 4: Run, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_internal_ticket_service.py -v
  • Step 5: Commit.
git add backend/app/services/internal_ticket_service.py backend/tests/test_internal_ticket_service.py
git commit -m "feat(l1): internal_ticket_service with CRUD + status transitions"

Task 12: l1_session_service.py — start (flow / proposal / adhoc)

Files:

  • Create: backend/app/services/l1_session_service.py
  • Test: backend/tests/test_l1_session_service.py

This task implements the session start path only. Steps/notes/resolve/escalate are split into Tasks 13/14 to keep each task bite-sized.

  • Step 1: Write failing tests.
import pytest
from app.services.l1_session_service import start_flow_session, start_adhoc_session


@pytest.mark.asyncio
async def test_start_flow_session_creates_active_session(db_session, account, l1_user, flow, internal_ticket):
    session = await start_flow_session(
        db_session,
        account_id=account.id,
        user=l1_user,
        flow_id=flow.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind='internal',
    )
    assert session.session_kind == 'flow'
    assert session.flow_id == flow.id
    assert session.flow_proposal_id is None
    assert session.status == 'active'
    assert session.walked_path == []


@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow(db_session, account, l1_user, internal_ticket):
    session = await start_adhoc_session(
        db_session,
        account_id=account.id,
        user=l1_user,
        ticket_id=str(internal_ticket.id),
        ticket_kind='internal',
    )
    assert session.session_kind == 'adhoc'
    assert session.flow_id is None
    assert session.flow_proposal_id is None
    assert session.walked_path == []
    assert session.walk_notes == []


@pytest.mark.asyncio
async def test_start_session_records_acting_as_for_coverage_engineer(
    db_session, account, engineer_with_coverage, flow, internal_ticket
):
    session = await start_flow_session(
        db_session,
        account_id=account.id,
        user=engineer_with_coverage,
        flow_id=flow.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind='internal',
    )
    assert session.acting_as == 'l1_coverage'
  • Step 2: Run, verify failure.

  • Step 3: Implement start functions.

from typing import Optional
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.l1_walk_session import L1WalkSession
from app.models.user import User


def _resolve_acting_as(user: User) -> Optional[str]:
    """An engineer (with coverage flag) acting in L1 mode gets tagged for audit."""
    if user.account_role == 'engineer':
        return 'l1_coverage'
    return None


async def start_flow_session(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    flow_id: UUID,
    ticket_id: str,
    ticket_kind: str,  # 'psa' | 'internal'
) -> L1WalkSession:
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind='flow',
        flow_id=flow_id,
        walked_path=[],
        walk_notes=[],
    )
    db.add(session)
    await db.flush()
    return session


async def start_proposal_session(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    flow_proposal_id: UUID,
    ticket_id: str,
    ticket_kind: str,
) -> L1WalkSession:
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind='proposal',
        flow_proposal_id=flow_proposal_id,
        walked_path=[],
        walk_notes=[],
    )
    db.add(session)
    await db.flush()
    return session


async def start_adhoc_session(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    ticket_id: str,
    ticket_kind: str,
) -> L1WalkSession:
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind='adhoc',
        walked_path=[],
        walk_notes=[],
    )
    db.add(session)
    await db.flush()
    return session
  • Step 4: Run, verify pass.

  • Step 5: Commit.

git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): session start (flow/proposal/adhoc) with acting_as tagging"

Task 13: l1_session_service.py — step + notes

Files:

  • Modify: backend/app/services/l1_session_service.py

  • Modify: backend/tests/test_l1_session_service.py

  • Step 1: Write failing tests (append to existing file).

@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(db_session, active_flow_session):
    updated = await record_step(
        db_session,
        session_id=active_flow_session.id,
        node_id='n1',
        question='Has the user signed back in?',
        answer='yes',
        note=None,
    )
    assert len(updated.walked_path) == 1
    assert updated.walked_path[0] == {
        'node_id': 'n1',
        'question': 'Has the user signed back in?',
        'answer': 'yes',
        'l1_note': None,
    }
    assert updated.current_node_id == 'n1'


@pytest.mark.asyncio
async def test_record_step_blocks_on_adhoc_session(db_session, active_adhoc_session):
    with pytest.raises(ValueError, match="adhoc"):
        await record_step(
            db_session,
            session_id=active_adhoc_session.id,
            node_id='n1', question='x', answer='y', note=None,
        )


@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(db_session, active_adhoc_session):
    new_notes = [{'timestamp': '2026-05-28T10:00:00Z', 'content': 'Customer said outlook crashed'}]
    updated = await update_notes(db_session, session_id=active_adhoc_session.id, notes=new_notes)
    assert updated.walk_notes == new_notes
  • Step 2: Run, verify failure.

  • Step 3: Implement.

Append to l1_session_service.py:

from datetime import datetime, timezone


async def record_step(
    db: AsyncSession,
    *,
    session_id: UUID,
    node_id: str,
    question: str,
    answer: str,
    note: Optional[str],
) -> L1WalkSession:
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    if session.session_kind == 'adhoc':
        raise ValueError("Cannot record step on adhoc session — use update_notes instead")
    if session.status != 'active':
        raise ValueError(f"Session {session_id} is not active (status={session.status})")
    entry = {
        'node_id': node_id,
        'question': question,
        'answer': answer,
        'l1_note': note,
    }
    # JSONB append — assign new list because SQLAlchemy doesn't track in-place mutations
    session.walked_path = [*session.walked_path, entry]
    session.current_node_id = node_id
    session.last_step_at = datetime.now(timezone.utc)
    await db.flush()
    return session


async def update_notes(
    db: AsyncSession,
    *,
    session_id: UUID,
    notes: list[dict],
) -> L1WalkSession:
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    # Cap at 256KB (rough check, JSON-encoded size)
    import json
    encoded_size = len(json.dumps(notes).encode('utf-8'))
    if encoded_size > 256 * 1024:
        raise ValueError("walk_notes exceeds 256KB cap — consider escalating")
    session.walk_notes = notes
    session.last_step_at = datetime.now(timezone.utc)
    await db.flush()
    return session
  • Step 4: Run, verify pass.

  • Step 5: Commit.

git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): record_step + update_notes for walk sessions"

Task 14: l1_session_service.py — resolve / escalate / escalate-without-walk

Files:

  • Modify: backend/app/services/l1_session_service.py

  • Modify: backend/tests/test_l1_session_service.py

  • Step 1: Write failing tests.

@pytest.mark.asyncio
async def test_resolve_helpful_proposal_flips_validated_by_outcome(
    db_session, active_proposal_session, flow_proposal
):
    await resolve(
        db_session,
        session_id=active_proposal_session.id,
        helpful=True,
        resolution_notes="Walked the user through restoring profile",
    )
    await db_session.refresh(flow_proposal)
    assert flow_proposal.validated_by_outcome is True


@pytest.mark.asyncio
async def test_resolve_unhelpful_does_not_flip_validation(
    db_session, active_proposal_session, flow_proposal
):
    await resolve(
        db_session,
        session_id=active_proposal_session.id,
        helpful=False,
        resolution_notes="Tree was wrong",
    )
    await db_session.refresh(flow_proposal)
    assert flow_proposal.validated_by_outcome is False


@pytest.mark.asyncio
async def test_resolve_adhoc_session_closes_ticket(
    db_session, active_adhoc_session, internal_ticket
):
    await resolve(
        db_session,
        session_id=active_adhoc_session.id,
        helpful=True,
        resolution_notes="Customer rebooted, fixed",
    )
    await db_session.refresh(active_adhoc_session)
    assert active_adhoc_session.status == 'resolved'
    assert active_adhoc_session.resolved_at is not None
    await db_session.refresh(internal_ticket)
    assert internal_ticket.status == 'resolved'


@pytest.mark.asyncio
async def test_escalate_marks_session_and_ticket(
    db_session, active_flow_session, internal_ticket
):
    await escalate(
        db_session,
        session_id=active_flow_session.id,
        reason="Customer demanding senior",
        reason_category="Customer demanding senior",
    )
    await db_session.refresh(active_flow_session)
    assert active_flow_session.status == 'escalated'
    await db_session.refresh(internal_ticket)
    assert internal_ticket.status == 'escalated'


@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_session(
    db_session, account, l1_user, internal_ticket
):
    session = await escalate_without_walk(
        db_session,
        account_id=account.id,
        user=l1_user,
        ticket_id=str(internal_ticket.id),
        ticket_kind='internal',
        reason_category='No KB available',
        reason='No knowledge base content yet',
    )
    assert session.status == 'escalated'
    assert session.session_kind == 'adhoc'  # adhoc as placeholder kind
    assert session.escalation_reason_category == 'No KB available'
  • Step 2: Run, verify failure.

  • Step 3: Implement.

Append to l1_session_service.py:

from app.models.flow_proposal import FlowProposal
from app.services import internal_ticket_service
# PSA service import — adjust to actual path
# from app.services.psa.registry import PsaProviderRegistry


async def resolve(
    db: AsyncSession,
    *,
    session_id: UUID,
    helpful: bool,
    resolution_notes: str,
) -> L1WalkSession:
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    if session.status != 'active':
        raise ValueError(f"Session not active (status={session.status})")
    session.status = 'resolved'
    session.helpful = helpful
    session.resolution_notes = resolution_notes
    session.resolved_at = datetime.now(timezone.utc)
    session.last_step_at = session.resolved_at

    # Outcome validation: flip proposal flag on helpful=true
    if helpful and session.session_kind == 'proposal' and session.flow_proposal_id:
        proposal = await db.get(FlowProposal, session.flow_proposal_id)
        if proposal:
            proposal.validated_by_outcome = True

    # Close the ticket
    if session.ticket_kind == 'internal':
        await internal_ticket_service.update_status(
            db, ticket_id=UUID(session.ticket_id),
            status='resolved', resolution_notes=resolution_notes,
        )
    else:
        # PSA close is intentionally deferred to Phase 2 (which adds the full escalation_package_generator
        # integration alongside the AI build pipeline). For Phase 1, PSA-backed sessions update only the
        # local session row on resolve; engineers verify ticket state directly in their PSA UI.
        # This is documented in spec §11 "Internal ticket fallback" and the Phase 1 scope section.
        pass

    await db.flush()
    return session


async def escalate(
    db: AsyncSession,
    *,
    session_id: UUID,
    reason: str,
    reason_category: str,
) -> L1WalkSession:
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    if session.status != 'active':
        raise ValueError(f"Session not active (status={session.status})")
    session.status = 'escalated'
    session.escalation_reason = reason
    session.escalation_reason_category = reason_category
    session.resolved_at = datetime.now(timezone.utc)
    session.last_step_at = session.resolved_at

    # Mark ticket escalated
    if session.ticket_kind == 'internal':
        await internal_ticket_service.update_status(
            db, ticket_id=UUID(session.ticket_id), status='escalated',
        )
    else:
        # PSA reassign — Phase 1 stub; full integration with escalation_package_generator
        # follows in Phase 2 alongside the ai_session creation for engineer pickup.
        pass

    await db.flush()
    return session


async def escalate_without_walk(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    ticket_id: str,
    ticket_kind: str,
    reason_category: str,
    reason: Optional[str] = None,
) -> L1WalkSession:
    """
    Used from the no-KB / no-flow-picked screen — creates an immediately-escalated session
    with no walked_path. Lets escalation reporting still capture the call.
    """
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind='adhoc',
        walked_path=[],
        walk_notes=[],
        status='escalated',
        escalation_reason=reason,
        escalation_reason_category=reason_category,
        resolved_at=datetime.now(timezone.utc),
    )
    session.last_step_at = session.resolved_at
    db.add(session)
    if ticket_kind == 'internal':
        await internal_ticket_service.update_status(
            db, ticket_id=UUID(ticket_id), status='escalated',
        )
    await db.flush()
    return session
  • Step 4: Run, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_session_service.py -v
  • Step 5: Commit.
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): resolve/escalate/escalate-without-walk for l1 sessions"

Task 15: L1 endpoints (api/endpoints/l1.py)

Files:

  • Create: backend/app/schemas/l1.py

  • Create: backend/app/api/endpoints/l1.py

  • Modify: backend/app/api/router.py (register l1 router)

  • Modify: backend/tests/test_l1_endpoints.py

  • Step 1: Define request/response schemas.

backend/app/schemas/l1.py:

from datetime import datetime
from typing import Any, Literal, Optional
from uuid import UUID

from pydantic import BaseModel


class IntakeRequest(BaseModel):
    problem_statement: str
    customer_name: Optional[str] = None
    customer_contact: Optional[str] = None
    flow_id: Optional[UUID] = None    # if provided, starts flow session; else adhoc


class IntakeResponse(BaseModel):
    session_id: UUID
    session_kind: Literal['flow', 'proposal', 'adhoc']
    ticket_id: str
    ticket_kind: Literal['psa', 'internal']


class StepRequest(BaseModel):
    node_id: str
    question: str
    answer: str
    note: Optional[str] = None


class NotesRequest(BaseModel):
    notes: list[dict[str, Any]]


class ResolveRequest(BaseModel):
    helpful: bool
    resolution_notes: str


class EscalateRequest(BaseModel):
    reason: Optional[str] = None
    reason_category: str


class EscalateWithoutWalkRequest(BaseModel):
    problem_statement: str
    customer_name: Optional[str] = None
    customer_contact: Optional[str] = None
    reason_category: str
    reason: Optional[str] = None


class WalkSessionResponse(BaseModel):
    id: UUID
    session_kind: str
    flow_id: Optional[UUID]
    flow_proposal_id: Optional[UUID]
    current_node_id: Optional[str]
    walked_path: list[dict[str, Any]]
    walk_notes: list[dict[str, Any]]
    status: str
    started_at: datetime
    last_step_at: datetime
    resolved_at: Optional[datetime]


class QueueRow(BaseModel):
    ticket_id: str
    ticket_kind: Literal['psa', 'internal']
    problem_statement: Optional[str]
    customer_name: Optional[str]
    status: str
    created_at: Optional[datetime]
  • Step 2: Write the endpoints.

backend/app/api/endpoints/l1.py:

from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import (
    get_current_active_user, get_db, require_l1_or_coverage,
)
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
from app.schemas.l1 import (
    EscalateRequest, EscalateWithoutWalkRequest, IntakeRequest, IntakeResponse,
    NotesRequest, QueueRow, ResolveRequest, StepRequest, WalkSessionResponse,
)
from app.services import internal_ticket_service, l1_session_service


router = APIRouter(prefix="/l1", tags=["l1"])


def _as_response(session: L1WalkSession) -> WalkSessionResponse:
    return WalkSessionResponse(
        id=session.id,
        session_kind=session.session_kind,
        flow_id=session.flow_id,
        flow_proposal_id=session.flow_proposal_id,
        current_node_id=session.current_node_id,
        walked_path=session.walked_path,
        walk_notes=session.walk_notes,
        status=session.status,
        started_at=session.started_at,
        last_step_at=session.last_step_at,
        resolved_at=session.resolved_at,
    )


@router.post("/intake", response_model=IntakeResponse)
async def intake(
    payload: IntakeRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    """
    Phase 1 intake:
    - Creates an internal ticket (PSA integration deferred to Phase 2/3 escalation polish).
    - Starts an L1WalkSession: flow kind if flow_id provided, adhoc otherwise.

    Phase 2 will replace this with match_or_build + suggest/aborted_no_kb outcomes.
    """
    # Phase 1: always create internal ticket. PSA support handled in escalate() / escalation_package.
    ticket = await internal_ticket_service.create_ticket(
        db,
        account_id=user.account_id,
        created_by_user_id=user.id,
        problem_statement=payload.problem_statement,
        customer_name=payload.customer_name,
        customer_contact=payload.customer_contact,
    )

    if payload.flow_id:
        session = await l1_session_service.start_flow_session(
            db,
            account_id=user.account_id,
            user=user,
            flow_id=payload.flow_id,
            ticket_id=str(ticket.id),
            ticket_kind='internal',
        )
    else:
        session = await l1_session_service.start_adhoc_session(
            db,
            account_id=user.account_id,
            user=user,
            ticket_id=str(ticket.id),
            ticket_kind='internal',
        )

    await db.commit()
    return IntakeResponse(
        session_id=session.id,
        session_kind=session.session_kind,
        ticket_id=str(ticket.id),
        ticket_kind='internal',
    )


@router.get("/queue", response_model=list[QueueRow])
async def queue(
    status_filter: Optional[str] = None,
    limit: int = 50,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    """Phase 1: returns internal tickets only. PSA queue merge in Phase 2."""
    tickets = await internal_ticket_service.list_tickets_for_account(
        db, account_id=user.account_id, status=status_filter, limit=limit,
    )
    return [
        QueueRow(
            ticket_id=str(t.id),
            ticket_kind='internal',
            problem_statement=t.problem_statement,
            customer_name=t.customer_name,
            status=t.status,
            created_at=t.created_at,
        )
        for t in tickets
    ]


@router.get("/sessions/{session_id}", response_model=WalkSessionResponse)
async def get_session(
    session_id: UUID,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    session = await db.get(L1WalkSession, session_id)
    if not session or session.account_id != user.account_id:
        raise HTTPException(status_code=404)
    return _as_response(session)


@router.get("/sessions/active", response_model=list[WalkSessionResponse])
async def list_active_sessions(
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    """List the caller's currently-active sessions for the dashboard 'Resume in progress' widget."""
    stmt = (
        select(L1WalkSession)
        .where(L1WalkSession.created_by_user_id == user.id)
        .where(L1WalkSession.status == 'active')
        .order_by(L1WalkSession.last_step_at.desc())
        .limit(20)
    )
    result = await db.execute(stmt)
    return [_as_response(s) for s in result.scalars()]


@router.post("/sessions/{session_id}/step", response_model=WalkSessionResponse)
async def post_step(
    session_id: UUID,
    payload: StepRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    session = await db.get(L1WalkSession, session_id)
    if not session or session.account_id != user.account_id:
        raise HTTPException(status_code=404)
    try:
        updated = await l1_session_service.record_step(
            db, session_id=session_id,
            node_id=payload.node_id, question=payload.question,
            answer=payload.answer, note=payload.note,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    await db.commit()
    return _as_response(updated)


@router.post("/sessions/{session_id}/notes", response_model=WalkSessionResponse)
async def post_notes(
    session_id: UUID,
    payload: NotesRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    session = await db.get(L1WalkSession, session_id)
    if not session or session.account_id != user.account_id:
        raise HTTPException(status_code=404)
    try:
        updated = await l1_session_service.update_notes(
            db, session_id=session_id, notes=payload.notes,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    await db.commit()
    return _as_response(updated)


@router.post("/sessions/{session_id}/resolve", response_model=WalkSessionResponse)
async def post_resolve(
    session_id: UUID,
    payload: ResolveRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    session = await db.get(L1WalkSession, session_id)
    if not session or session.account_id != user.account_id:
        raise HTTPException(status_code=404)
    try:
        updated = await l1_session_service.resolve(
            db, session_id=session_id, helpful=payload.helpful,
            resolution_notes=payload.resolution_notes,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    await db.commit()
    return _as_response(updated)


@router.post("/sessions/{session_id}/escalate", response_model=WalkSessionResponse)
async def post_escalate(
    session_id: UUID,
    payload: EscalateRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    session = await db.get(L1WalkSession, session_id)
    if not session or session.account_id != user.account_id:
        raise HTTPException(status_code=404)
    try:
        updated = await l1_session_service.escalate(
            db, session_id=session_id,
            reason=payload.reason or '',
            reason_category=payload.reason_category,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    await db.commit()
    return _as_response(updated)


@router.post("/escalate-without-walk", response_model=WalkSessionResponse)
async def post_escalate_without_walk(
    payload: EscalateWithoutWalkRequest,
    user: User = Depends(require_l1_or_coverage),
    db: AsyncSession = Depends(get_db),
):
    ticket = await internal_ticket_service.create_ticket(
        db,
        account_id=user.account_id,
        created_by_user_id=user.id,
        problem_statement=payload.problem_statement,
        customer_name=payload.customer_name,
        customer_contact=payload.customer_contact,
    )
    session = await l1_session_service.escalate_without_walk(
        db,
        account_id=user.account_id,
        user=user,
        ticket_id=str(ticket.id),
        ticket_kind='internal',
        reason_category=payload.reason_category,
        reason=payload.reason,
    )
    await db.commit()
    return _as_response(session)
  • Step 3: Register the router.

In backend/app/api/router.py, add:

from app.api.endpoints import l1
api_router.include_router(l1.router)
  • Step 4: Add E2E endpoint tests.

In backend/tests/test_l1_endpoints.py, add:

@pytest.mark.asyncio
async def test_intake_creates_adhoc_session_when_no_flow_id(authed_l1_client):
    response = await authed_l1_client.post(
        "/api/v1/l1/intake",
        json={"problem_statement": "outlook broken", "customer_name": "Alice"},
    )
    assert response.status_code == 200
    body = response.json()
    assert body["session_kind"] == "adhoc"
    assert body["ticket_kind"] == "internal"


@pytest.mark.asyncio
async def test_intake_creates_flow_session_when_flow_id_provided(authed_l1_client, flow):
    response = await authed_l1_client.post(
        "/api/v1/l1/intake",
        json={"problem_statement": "outlook broken", "flow_id": str(flow.id)},
    )
    assert response.status_code == 200
    assert response.json()["session_kind"] == "flow"


@pytest.mark.asyncio
async def test_step_appends_to_walked_path(authed_l1_client, active_flow_session):
    response = await authed_l1_client.post(
        f"/api/v1/l1/sessions/{active_flow_session.id}/step",
        json={"node_id": "n1", "question": "q1", "answer": "yes", "note": None},
    )
    assert response.status_code == 200
    assert len(response.json()["walked_path"]) == 1


@pytest.mark.asyncio
async def test_step_blocked_on_adhoc_session(authed_l1_client, active_adhoc_session):
    response = await authed_l1_client.post(
        f"/api/v1/l1/sessions/{active_adhoc_session.id}/step",
        json={"node_id": "n1", "question": "q1", "answer": "yes"},
    )
    assert response.status_code == 400


@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_session(authed_l1_client):
    response = await authed_l1_client.post(
        "/api/v1/l1/escalate-without-walk",
        json={
            "problem_statement": "no kb yet",
            "reason_category": "No KB available",
        },
    )
    assert response.status_code == 200
    assert response.json()["status"] == "escalated"


@pytest.mark.asyncio
async def test_l1_endpoints_block_viewer(authed_viewer_client):
    response = await authed_viewer_client.post(
        "/api/v1/l1/intake",
        json={"problem_statement": "x"},
    )
    assert response.status_code == 403
  • Step 5: Run all L1 endpoint tests.
docker exec resolutionflow_backend pytest backend/tests/test_l1_endpoints.py -v
  • Step 6: Commit.
git add backend/app/schemas/l1.py backend/app/api/endpoints/l1.py backend/app/api/router.py backend/tests/test_l1_endpoints.py
git commit -m "feat(l1): L1 endpoints (intake/queue/sessions/step/notes/resolve/escalate)"

Task 16: APScheduler cleanup job for abandoned sessions

Files:

  • Create: backend/app/services/l1_session_cleanup.py

  • Modify: backend/app/main.py — register the job in lifespan

  • Test: backend/tests/test_l1_session_cleanup.py

  • Step 1: Write failing test.

import pytest
from datetime import datetime, timedelta, timezone
from app.services.l1_session_cleanup import flip_stale_sessions


@pytest.mark.asyncio
async def test_flip_stale_sessions_abandons_old_active_sessions(db_session, account, l1_user):
    # Insert one stale active + one fresh active + one already resolved
    stale = L1WalkSession(
        account_id=account.id,
        created_by_user_id=l1_user.id,
        ticket_id='x', ticket_kind='internal', session_kind='adhoc',
        status='active',
        last_step_at=datetime.now(timezone.utc) - timedelta(hours=25),
        walked_path=[], walk_notes=[],
    )
    fresh = L1WalkSession(
        account_id=account.id,
        created_by_user_id=l1_user.id,
        ticket_id='y', ticket_kind='internal', session_kind='adhoc',
        status='active',
        last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
        walked_path=[], walk_notes=[],
    )
    db_session.add_all([stale, fresh])
    await db_session.flush()
    count = await flip_stale_sessions(db_session)
    assert count == 1
    await db_session.refresh(stale)
    await db_session.refresh(fresh)
    assert stale.status == 'abandoned'
    assert fresh.status == 'active'
  • Step 2: Run, verify failure.

  • Step 3: Implement.

backend/app/services/l1_session_cleanup.py:

import logging
from datetime import datetime, timedelta, timezone

from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.l1_walk_session import L1WalkSession


logger = logging.getLogger(__name__)


async def flip_stale_sessions(db: AsyncSession) -> int:
    """
    Flip active sessions to 'abandoned' if their last_step_at is older than 24 hours.
    Returns the number of rows flipped.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    stmt = (
        update(L1WalkSession)
        .where(L1WalkSession.status == 'active')
        .where(L1WalkSession.last_step_at < cutoff)
        .values(status='abandoned')
    )
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount or 0


async def run_cleanup_job(session_factory) -> None:
    """Entrypoint for APScheduler — uses _admin_session_factory per Lesson on RLS at startup."""
    async with session_factory() as db:
        try:
            count = await flip_stale_sessions(db)
            if count > 0:
                logger.info("l1_session_cleanup: flipped %d sessions to abandoned", count)
        except Exception:
            logger.exception("l1_session_cleanup: error during run")
  • Step 4: Register the job in main.py lifespan.

In backend/app/main.py, find the lifespan / APScheduler setup and add:

from app.services.l1_session_cleanup import run_cleanup_job

# Inside the lifespan startup section, alongside other scheduled jobs:
scheduler.add_job(
    run_cleanup_job,
    'interval',
    hours=1,
    max_instances=1,  # Lesson 1
    args=[_admin_session_factory],
    id='l1_session_cleanup',
    replace_existing=True,
)
  • Step 5: Run unit test, verify pass.
docker exec resolutionflow_backend pytest backend/tests/test_l1_session_cleanup.py -v
  • Step 6: Verify job registered at startup (manual smoke).

Restart backend, grep logs for l1_session_cleanup job-registered line. APScheduler logs job registration at startup.

  • Step 7: Commit.
git add backend/app/services/l1_session_cleanup.py backend/app/main.py backend/tests/test_l1_session_cleanup.py
git commit -m "feat(l1): APScheduler hourly cleanup job for abandoned sessions"

Task 17: RLS regression tests for new tables

Files:

  • Create: backend/tests/test_l1_rls.py

  • Step 1: Write the tests.

import pytest
from sqlalchemy import select
from app.models.internal_ticket import InternalTicket
from app.models.l1_walk_session import L1WalkSession


@pytest.mark.asyncio
async def test_l1_cannot_read_other_accounts_internal_tickets(
    db_account_a_session, account_b_internal_ticket
):
    """RLS must block cross-tenant reads on internal_tickets."""
    stmt = select(InternalTicket).where(InternalTicket.id == account_b_internal_ticket.id)
    result = await db_account_a_session.execute(stmt)
    assert result.scalar_one_or_none() is None


@pytest.mark.asyncio
async def test_l1_cannot_read_other_accounts_walk_sessions(
    db_account_a_session, account_b_walk_session
):
    stmt = select(L1WalkSession).where(L1WalkSession.id == account_b_walk_session.id)
    result = await db_account_a_session.execute(stmt)
    assert result.scalar_one_or_none() is None


@pytest.mark.asyncio
async def test_with_check_blocks_cross_tenant_insert(db_account_a_session, account_b):
    """RLS WITH CHECK must reject inserts setting another account's account_id."""
    bad_row = InternalTicket(
        account_id=account_b.id,
        created_by_user_id=...,
        problem_statement='cross-tenant attempt',
    )
    db_account_a_session.add(bad_row)
    with pytest.raises(Exception):  # InsufficientPrivilegeError or similar
        await db_account_a_session.flush()

(Use fixtures from the existing RLS test suite — db_account_a_session already exists per the project's tenant-isolation Phase 4 work. Adapt to actual fixture names.)

  • Step 2: Run.
docker exec resolutionflow_backend pytest backend/tests/test_l1_rls.py -v

Expected: 3 PASSED.

  • Step 3: Commit.
git add backend/tests/test_l1_rls.py
git commit -m "test(l1): RLS regression tests for internal_tickets + l1_walk_sessions"

Task 18: Frontend — usePermissions extensions + role type

Files:

  • Modify: frontend/src/types/auth.ts (or frontend/src/types/user.ts — wherever User.account_role is typed)

  • Modify: frontend/src/hooks/usePermissions.ts

  • Step 1: Locate the role union type.

grep -rn "account_role:" frontend/src/types/
  • Step 2: Add 'l1_tech' to the union.

In the relevant file (likely frontend/src/types/auth.ts), find the type:

export type AccountRole = 'owner' | 'engineer' | 'viewer'

Change to:

export type AccountRole = 'owner' | 'engineer' | 'l1_tech' | 'viewer'
  • Step 3: Update usePermissions.ts.

Open usePermissions.ts. Find the getEffectiveRole and hasMinimumRole functions and update:

type EffectiveRole = 'super_admin' | 'owner' | 'engineer' | 'l1_tech' | 'viewer'

function getEffectiveRole(user: User | null): EffectiveRole {
  if (!user) return 'viewer'
  if (user.is_super_admin) return 'super_admin'
  if (user.account_role === 'owner') return 'owner'
  if (user.account_role === 'engineer') return 'engineer'
  if (user.account_role === 'l1_tech') return 'l1_tech'
  return 'viewer'
}

const ROLE_RANK: Record<EffectiveRole, number> = {
  super_admin: 5,
  owner: 4,
  engineer: 3,
  l1_tech: 2,
  viewer: 1,
}

function hasMinimumRole(user: User | null, minimum: EffectiveRole): boolean {
  const effective = getEffectiveRole(user)
  return ROLE_RANK[effective] >= ROLE_RANK[minimum]
}

And in the main return object of usePermissions(), add:

return {
  // ... existing returned values ...
  isL1Tech: effectiveRole === 'l1_tech',
  canCoverL1: Boolean(user?.can_cover_l1) || effectiveRole === 'owner' || effectiveRole === 'super_admin',
  canUseL1Surface: effectiveRole === 'l1_tech' || effectiveRole === 'owner' || effectiveRole === 'super_admin' || (user?.account_role === 'engineer' && Boolean(user?.can_cover_l1)),
  canUseEngineerSurface: hasMinimumRole(user, 'engineer'),
}
  • Step 4: Update User type to include can_cover_l1.

In the User type definition:

export interface User {
  id: string
  email: string
  // ... existing fields ...
  account_role: AccountRole
  is_super_admin: boolean
  can_cover_l1: boolean   // NEW
  // ... rest ...
}
  • Step 5: Run frontend type check.
docker exec -w /app resolutionflow_frontend npx tsc -b

Expected: no new type errors (existing usages of account_role may need updates — fix any compilation errors caused by the union widening).

  • Step 6: Commit.
git add frontend/src/types/auth.ts frontend/src/hooks/usePermissions.ts
git commit -m "feat(l1): usePermissions extensions for l1_tech + coverage flag"

Task 19: Frontend — Sidebar role-based nav + ProtectedRoute redirect

Files:

  • Modify: frontend/src/components/layout/Sidebar.tsx

  • Modify: frontend/src/components/layout/ProtectedRoute.tsx

  • Step 1: Update Sidebar.tsx to render role-based nav.

Find the nav array construction in Sidebar.tsx. Wrap it in a helper that picks per role:

import { usePermissions } from '@/hooks/usePermissions'
import { LayoutGrid, Ticket, FileText, BookOpen, Settings } from 'lucide-react'

function getNavItems(perms: ReturnType<typeof usePermissions>) {
  // L1 tech sees only L1-relevant entries
  if (perms.isL1Tech) {
    return [
      { href: '/l1', icon: LayoutGrid, label: 'Workspace', shortLabel: 'Work' },
      { href: '/l1/tickets', icon: Ticket, label: 'Tickets', shortLabel: 'Tickets' },
      { href: '/l1/drafts', icon: FileText, label: 'My Drafts', shortLabel: 'Drafts' },
      { href: '/guides', icon: BookOpen, label: 'Guides', shortLabel: 'Guides' },
      { href: '/account', icon: Settings, label: 'Account', shortLabel: 'Acct' },
    ]
  }

  // Existing engineer/owner nav (kept as-is)
  const items = [/* ... existing items ... */]

  // Append L1 Workspace entry for coverage engineers + owners
  if (perms.canCoverL1) {
    items.push({
      href: '/l1', icon: LayoutGrid, label: 'L1 Workspace', shortLabel: 'L1',
    })
  }
  return items
}

// Inside the Sidebar component:
const perms = usePermissions()
const navItems = getNavItems(perms)
  • Step 2: Add L1 post-login redirect to ProtectedRoute.tsx.

In ProtectedRoute.tsx, find the existing auth-state handling and add:

// After authentication checks pass, before rendering children:
if (user?.account_role === 'l1_tech' && location.pathname === '/') {
  return <Navigate to="/l1" replace />
}
  • Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
  • Step 4: Commit.
git add frontend/src/components/layout/Sidebar.tsx frontend/src/components/layout/ProtectedRoute.tsx
git commit -m "feat(l1): role-based sidebar nav + L1 post-login redirect"

Task 20: Frontend — router updates + L1RouteGuard

Files:

  • Create: frontend/src/components/layout/L1RouteGuard.tsx

  • Modify: frontend/src/router.tsx

  • Step 1: Create the guard component.

import { Navigate } from 'react-router'
import { usePermissions } from '@/hooks/usePermissions'

export function L1RouteGuard({ children }: { children: React.ReactNode }) {
  const perms = usePermissions()
  if (!perms.canUseL1Surface) {
    return <Navigate to="/" replace />
  }
  return <>{children}</>
}
  • Step 2: Register routes.

In frontend/src/router.tsx, near the other lazyWithRetry declarations:

const L1Dashboard = lazyWithRetry(() => import('@/pages/l1/L1Dashboard'))
const L1WalkPage = lazyWithRetry(() => import('@/pages/l1/L1WalkPage'))
const L1DraftsPage = lazyWithRetry(() => import('@/pages/l1/L1DraftsPage'))
const L1TicketsPage = lazyWithRetry(() => import('@/pages/l1/L1TicketsPage'))

And inside the / ProtectedRoute children array, alongside existing routes:

{ path: 'l1', element: <L1RouteGuard><L1Dashboard /></L1RouteGuard> },
{ path: 'l1/walk/:sessionId', element: <L1RouteGuard><L1WalkPage /></L1RouteGuard> },
{ path: 'l1/drafts', element: <L1RouteGuard><L1DraftsPage /></L1RouteGuard> },
{ path: 'l1/tickets', element: <L1RouteGuard><L1TicketsPage /></L1RouteGuard> },

The lazyWithRetry-wrapped components are React.lazy under the hood, so they need a Suspense boundary already provided by the existing page() helper. Adapt to whichever pattern the existing routes use (likely element: page(L1Dashboard) wrapped manually, or pass through the guard).

Adjust based on actual router pattern:

{ path: 'l1', element: <L1RouteGuard>{page(L1Dashboard)}</L1RouteGuard> },
  • Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b

(Will fail until the actual page components exist in Task 21+. That's OK — placeholder check that imports resolve.)

  • Step 4: Commit (pages stubbed in next task).
git add frontend/src/components/layout/L1RouteGuard.tsx frontend/src/router.tsx
git commit -m "feat(l1): register /l1/* routes + L1RouteGuard"

Task 21: Frontend — L1Dashboard (active + empty state)

Files:

  • Create: frontend/src/pages/l1/L1Dashboard.tsx

  • Create: frontend/src/components/l1/EmptyStateCard.tsx

  • Create: frontend/src/components/l1/ResumeInProgress.tsx

  • Create: frontend/src/api/l1.ts

  • Create: frontend/src/types/l1.ts

  • Step 1: Create types.

frontend/src/types/l1.ts:

export type SessionKind = 'flow' | 'proposal' | 'adhoc'
export type SessionStatus = 'active' | 'resolved' | 'escalated' | 'abandoned'
export type TicketKind = 'psa' | 'internal'

export interface WalkSession {
  id: string
  session_kind: SessionKind
  flow_id: string | null
  flow_proposal_id: string | null
  current_node_id: string | null
  walked_path: WalkStep[]
  walk_notes: AdhocNote[]
  status: SessionStatus
  started_at: string
  last_step_at: string
  resolved_at: string | null
}

export interface WalkStep {
  node_id: string
  question: string
  answer: string
  l1_note: string | null
}

export interface AdhocNote {
  timestamp: string
  content: string
}

export interface QueueRow {
  ticket_id: string
  ticket_kind: TicketKind
  problem_statement: string | null
  customer_name: string | null
  status: string
  created_at: string | null
}

export interface IntakeRequest {
  problem_statement: string
  customer_name?: string
  customer_contact?: string
  flow_id?: string
}

export interface IntakeResponse {
  session_id: string
  session_kind: SessionKind
  ticket_id: string
  ticket_kind: TicketKind
}
  • Step 2: Create API client.

frontend/src/api/l1.ts:

import { apiClient } from './client'
import type {
  IntakeRequest, IntakeResponse, QueueRow, WalkSession,
  WalkStep, AdhocNote,
} from '@/types/l1'

export const l1Api = {
  intake: (body: IntakeRequest) =>
    apiClient.post<IntakeResponse>('/api/v1/l1/intake', body).then(r => r.data),

  queue: (statusFilter?: string) =>
    apiClient.get<QueueRow[]>('/api/v1/l1/queue', {
      params: statusFilter ? { status_filter: statusFilter } : {},
    }).then(r => r.data),

  getSession: (sessionId: string) =>
    apiClient.get<WalkSession>(`/api/v1/l1/sessions/${sessionId}`).then(r => r.data),

  listActiveSessions: () =>
    apiClient.get<WalkSession[]>('/api/v1/l1/sessions/active').then(r => r.data),

  step: (sessionId: string, step: { node_id: string; question: string; answer: string; note?: string | null }) =>
    apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/step`, step).then(r => r.data),

  notes: (sessionId: string, notes: AdhocNote[]) =>
    apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/notes`, { notes }).then(r => r.data),

  resolve: (sessionId: string, body: { helpful: boolean; resolution_notes: string }) =>
    apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/resolve`, body).then(r => r.data),

  escalate: (sessionId: string, body: { reason: string; reason_category: string }) =>
    apiClient.post<WalkSession>(`/api/v1/l1/sessions/${sessionId}/escalate`, body).then(r => r.data),

  escalateWithoutWalk: (body: {
    problem_statement: string; customer_name?: string; customer_contact?: string;
    reason_category: string; reason?: string;
  }) =>
    apiClient.post<WalkSession>('/api/v1/l1/escalate-without-walk', body).then(r => r.data),
}
  • Step 3: Create EmptyStateCard.

frontend/src/components/l1/EmptyStateCard.tsx:

import { usePermissions } from '@/hooks/usePermissions'

interface Props {
  onUploadClick?: () => void
  onConfigureConnectorClick?: () => void
}

export function EmptyStateCard({ onUploadClick, onConfigureConnectorClick }: Props) {
  const perms = usePermissions()
  const isOwnerOrCoverage = perms.canCoverL1   // (owner/super_admin always have it)

  return (
    <div className="rounded-lg border border-default bg-card p-6">
      <h2 className="font-heading text-xl font-bold text-heading mb-2">
        Your knowledge base is empty
      </h2>
      <p className="text-muted-foreground mb-4">
        L1 Workspace works best when your account has KB content or authored flows.
        Right now there's nothing to match against  calls will start as ad-hoc walks.
      </p>
      {isOwnerOrCoverage ? (
        <div className="flex gap-3">
          {onUploadClick && (
            <button
              type="button"
              onClick={onUploadClick}
              className="rounded-md bg-accent text-white px-4 py-2 text-sm font-medium hover:bg-accent/90 transition-colors"
            >
              Upload KB content
            </button>
          )}
          {onConfigureConnectorClick && (
            <button
              type="button"
              onClick={onConfigureConnectorClick}
              className="rounded-md border border-default px-4 py-2 text-sm font-medium hover:bg-elevated transition-colors"
            >
              Configure connector
            </button>
          )}
        </div>
      ) : (
        <ul className="text-sm text-muted-foreground space-y-1 ml-4 list-disc">
          <li>Ask your admin to upload KB documents</li>
          <li>Or configure a KB connector under Account  KB</li>
          <li>Or author a flow in the Flows library</li>
        </ul>
      )}
    </div>
  )
}
  • Step 4: Create ResumeInProgress.

frontend/src/components/l1/ResumeInProgress.tsx:

import { useEffect, useState } from 'react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { WalkSession } from '@/types/l1'

export function ResumeInProgress() {
  const [sessions, setSessions] = useState<WalkSession[] | null>(null)

  useEffect(() => {
    l1Api.listActiveSessions().then(setSessions).catch(() => setSessions([]))
  }, [])

  if (!sessions || sessions.length === 0) return null

  return (
    <section>
      <div className="flex items-center gap-3 mb-3">
        <span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
          Resume in progress · {sessions.length}
        </span>
        <div className="flex-1 h-px bg-border" />
      </div>
      <div className="rounded-lg border border-default bg-card overflow-hidden">
        {sessions.map((s) => (
          <Link
            key={s.id}
            to={`/l1/walk/${s.id}`}
            className="flex items-center justify-between px-4 py-3 hover:bg-elevated transition-colors border-b border-default last:border-b-0"
          >
            <div className="flex items-center gap-3">
              <span className="font-mono text-xs text-muted-foreground">#{s.id.slice(0, 8)}</span>
              <span className="text-sm">
                {s.session_kind === 'adhoc'
                  ? `Adhoc · ${s.walk_notes.length} notes`
                  : `Step ${s.walked_path.length}`}
              </span>
            </div>
            <span className="text-xs text-muted-foreground">
              {new Date(s.last_step_at).toLocaleTimeString()}
            </span>
          </Link>
        ))}
      </div>
    </section>
  )
}
  • Step 5: Create the Dashboard page.

frontend/src/pages/l1/L1Dashboard.tsx:

import { useEffect, useState } from 'react'
import { useNavigate } from 'react-router'
import { PageMeta } from '@/components/common/PageMeta'
import { useAuthStore } from '@/store/authStore'
import { l1Api } from '@/api/l1'
import { EmptyStateCard } from '@/components/l1/EmptyStateCard'
import { ResumeInProgress } from '@/components/l1/ResumeInProgress'
import type { QueueRow } from '@/types/l1'
// Existing helper to fetch flows + KB doc count:
import { dashboardApi } from '@/api/dashboard'   // adjust import to actual path

export default function L1Dashboard() {
  const user = useAuthStore((s) => s.user)
  const navigate = useNavigate()
  const [problem, setProblem] = useState('')
  const [customerName, setCustomerName] = useState('')
  const [customerContact, setCustomerContact] = useState('')
  const [submitting, setSubmitting] = useState(false)
  const [queue, setQueue] = useState<QueueRow[]>([])
  const [isEmpty, setIsEmpty] = useState<boolean | null>(null)

  useEffect(() => {
    l1Api.queue('open').then(setQueue).catch(() => setQueue([]))
    // Detect empty state via the dashboard stats endpoint (existing).
    // Adjust to actual endpoint that returns flow + KB counts.
    dashboardApi.getStats().then(stats => {
      setIsEmpty((stats?.flow_count ?? 0) === 0 && (stats?.kb_doc_count ?? 0) === 0)
    }).catch(() => setIsEmpty(false))
  }, [])

  const handleStart = async () => {
    if (!problem.trim()) return
    setSubmitting(true)
    try {
      const response = await l1Api.intake({
        problem_statement: problem.trim(),
        customer_name: customerName.trim() || undefined,
        customer_contact: customerContact.trim() || undefined,
      })
      navigate(`/l1/walk/${response.session_id}`)
    } finally {
      setSubmitting(false)
    }
  }

  const now = new Date()
  const greeting = now.getHours() < 12 ? 'morning' : now.getHours() < 18 ? 'afternoon' : 'evening'
  const firstName = user?.name?.split(' ')[0] || 'there'

  return (
    <div className="overflow-y-auto h-full">
      <PageMeta title="L1 Workspace" />
      <div className="max-w-4xl mx-auto px-6 pt-12 pb-12 space-y-8">
        {/* Hero greeting */}
        <div>
          <p className="font-sans text-xs uppercase tracking-[0.12em] text-muted-foreground mb-1">
            {now.toLocaleDateString('en-US', { weekday: 'long', month: 'long', day: 'numeric' })}
          </p>
          <h1 className="font-heading text-3xl sm:text-4xl font-extrabold tracking-tight text-heading leading-tight">
            Good {greeting}, {firstName}.
          </h1>
        </div>

        {/* Empty state (first-run) */}
        {isEmpty && <EmptyStateCard />}

        {/* Describe the problem */}
        <section>
          <div className="flex items-center gap-3 mb-3">
            <span className="w-1 h-4 bg-accent rounded-sm" />
            <span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
              Describe the problem
            </span>
          </div>
          <div className="rounded-lg border border-default bg-card p-4 space-y-3">
            <textarea
              value={problem}
              onChange={(e) => setProblem(e.target.value)}
              placeholder="What's the user calling about?"
              autoFocus
              rows={3}
              className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
            />
            <div className="grid grid-cols-2 gap-3">
              <input
                value={customerName}
                onChange={(e) => setCustomerName(e.target.value)}
                placeholder="Customer name (optional)"
                className="bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
              />
              <input
                value={customerContact}
                onChange={(e) => setCustomerContact(e.target.value)}
                placeholder="Email or phone (optional)"
                className="bg-page border border-default rounded-md px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40"
              />
            </div>
            <div className="flex justify-end">
              <button
                type="button"
                onClick={handleStart}
                disabled={!problem.trim() || submitting}
                className="rounded-md bg-accent text-white px-5 py-2 text-sm font-medium hover:bg-accent/90 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
              >
                {submitting ? 'Starting…' : 'Start walk →'}
              </button>
            </div>
          </div>
        </section>

        {/* Open tickets */}
        {queue.length > 0 && (
          <section>
            <div className="flex items-center gap-3 mb-3">
              <span className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground">
                Open tickets · {queue.length}
              </span>
              <div className="flex-1 h-px bg-border" />
            </div>
            <div className="rounded-lg border border-default bg-card overflow-hidden">
              {queue.map((row) => (
                // Phase 1: rows are display-only. Each ticket already has an active session
                // owned by the L1 who created it (visible in their "Resume in progress" widget).
                // Cross-L1 ticket claiming + PSA-fed queue rows ship in Phase 2 — at that point
                // rows become clickable and call a "start session from ticket" endpoint.
                <div
                  key={row.ticket_id}
                  className="px-4 py-3 border-b border-default last:border-b-0 transition-colors"
                >
                  <div className="flex items-center justify-between">
                    <span className="text-sm">
                      <span className="font-mono text-xs text-muted-foreground mr-2">
                        #{row.ticket_id.slice(0, 8)}
                      </span>
                      {row.problem_statement}
                    </span>
                    <span className="text-xs text-muted-foreground">
                      {row.ticket_kind === 'psa' ? 'PSA' : 'Internal'}
                    </span>
                  </div>
                </div>
              ))}
            </div>
          </section>
        )}

        {/* Resume in progress */}
        <ResumeInProgress />
      </div>
    </div>
  )
}
  • Step 6: Type-check + manual smoke.
docker exec -w /app resolutionflow_frontend npx tsc -b

Start dev server, log in as an L1, verify the page renders.

  • Step 7: Commit.
git add frontend/src/pages/l1/L1Dashboard.tsx frontend/src/components/l1/EmptyStateCard.tsx frontend/src/components/l1/ResumeInProgress.tsx frontend/src/api/l1.ts frontend/src/types/l1.ts
git commit -m "feat(l1): L1 dashboard with empty state + resume widget"

Task 22: Frontend — L1WalkPage tree variant

Files:

  • Create: frontend/src/pages/l1/L1WalkPage.tsx

  • Create: frontend/src/components/l1/L1WalkTreeVariant.tsx

  • Step 1: Create the WalkPage wrapper that dispatches to variant.

frontend/src/pages/l1/L1WalkPage.tsx:

import { useEffect, useState } from 'react'
import { useParams, useNavigate } from 'react-router'
import { PageMeta } from '@/components/common/PageMeta'
import { l1Api } from '@/api/l1'
import { L1WalkTreeVariant } from '@/components/l1/L1WalkTreeVariant'
import { L1WalkAdhocVariant } from '@/components/l1/L1WalkAdhocVariant'
import type { WalkSession } from '@/types/l1'

export default function L1WalkPage() {
  const { sessionId } = useParams<{ sessionId: string }>()
  const navigate = useNavigate()
  const [session, setSession] = useState<WalkSession | null>(null)
  const [error, setError] = useState<string | null>(null)

  useEffect(() => {
    if (!sessionId) return
    l1Api.getSession(sessionId)
      .then(setSession)
      .catch(err => setError(err?.message || 'Failed to load session'))
  }, [sessionId])

  if (error) {
    return <div className="p-6 text-error-foreground">{error}</div>
  }
  if (!session) {
    return <div className="p-6 text-muted-foreground">Loading</div>
  }

  const handleResolved = () => navigate('/l1')
  const handleEscalated = () => navigate('/l1')

  return (
    <>
      <PageMeta title="L1 Walk" />
      {session.session_kind === 'adhoc' ? (
        <L1WalkAdhocVariant
          session={session}
          onSessionUpdate={setSession}
          onResolved={handleResolved}
          onEscalated={handleEscalated}
        />
      ) : (
        <L1WalkTreeVariant
          session={session}
          onSessionUpdate={setSession}
          onResolved={handleResolved}
          onEscalated={handleEscalated}
        />
      )}
    </>
  )
}
  • Step 2: Create the tree variant.

frontend/src/components/l1/L1WalkTreeVariant.tsx:

import { useState } from 'react'
import { ChevronLeft } from 'lucide-react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { WalkSession } from '@/types/l1'
// Reuse existing flow node fetcher — adapt import to actual path:
// import { flowsApi } from '@/api/flows'

interface Props {
  session: WalkSession
  onSessionUpdate: (s: WalkSession) => void
  onResolved: () => void
  onEscalated: () => void
}

export function L1WalkTreeVariant({ session, onSessionUpdate, onResolved, onEscalated }: Props) {
  const [showResolve, setShowResolve] = useState(false)
  const [showEscalate, setShowEscalate] = useState(false)
  const [currentNode, setCurrentNode] = useState<any | null>(null)  // Replace `any` with Node type
  const [note, setNote] = useState('')

  // ... load currentNode based on session.flow_id + session.current_node_id ...
  // ... pseudo-code; integrate with existing flow-loading helper ...

  const handleAnswer = async (answer: string) => {
    if (!currentNode) return
    const updated = await l1Api.step(session.id, {
      node_id: currentNode.id,
      question: currentNode.question || currentNode.title,
      answer,
      note: note || null,
    })
    onSessionUpdate(updated)
    setNote('')
    // ... advance currentNode based on tree edge for `answer` ...
  }

  return (
    <div className="flex flex-col h-full">
      {/* Header */}
      <header className="border-b border-default px-6 py-4 flex items-center justify-between bg-sidebar">
        <Link to="/l1" className="flex items-center gap-2 text-muted-foreground hover:text-heading">
          <ChevronLeft className="w-4 h-4" />
          <span className="font-mono text-xs">#{session.id.slice(0, 8)}</span>
          {session.session_kind === 'proposal' && (
            <span className="ml-2 text-xs bg-accent/10 text-accent px-2 py-0.5 rounded">AI-built</span>
          )}
        </Link>
        <div className="flex gap-2">
          <button
            onClick={() => setShowEscalate(true)}
            className="rounded-md border border-default px-3 py-1.5 text-sm hover:bg-elevated transition-colors"
          >
            Escalate
          </button>
          <button
            onClick={() => setShowResolve(true)}
            className="rounded-md bg-accent text-white px-3 py-1.5 text-sm hover:bg-accent/90 transition-colors"
          >
            Resolve 
          </button>
        </div>
      </header>

      {/* Two-pane body */}
      <div className="flex-1 flex min-h-0">
        <main className="flex-1 p-6 overflow-y-auto">
          <p className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground mb-2">
            Step {session.walked_path.length + 1}
          </p>
          {currentNode ? (
            <div className="rounded-lg border border-default bg-card p-6 max-w-2xl">
              <p className="text-lg mb-6">{currentNode.question || currentNode.title}</p>
              <div className="flex gap-3">
                <button
                  onClick={() => handleAnswer('yes')}
                  className="flex-1 rounded-md bg-accent text-white py-3 text-base font-medium hover:bg-accent/90 min-h-[44px]"
                >
                  Yes
                </button>
                <button
                  onClick={() => handleAnswer('no')}
                  className="flex-1 rounded-md border border-default py-3 text-base font-medium hover:bg-elevated min-h-[44px]"
                >
                  No
                </button>
              </div>
              <textarea
                value={note}
                onChange={(e) => setNote(e.target.value)}
                placeholder="Optional note for this step…"
                rows={2}
                className="mt-4 w-full bg-page border border-default rounded-md px-3 py-2 text-sm"
              />
            </div>
          ) : (
            <p className="text-muted-foreground">Loading flow</p>
          )}
        </main>

        {/* Right pane: transcript */}
        <aside className="w-80 border-l border-default bg-page p-4 overflow-y-auto">
          <p className="font-sans text-[0.625rem] uppercase tracking-[0.12em] font-semibold text-muted-foreground mb-3">
            Walked so far
          </p>
          <ol className="space-y-2 text-sm">
            {session.walked_path.map((step, i) => (
              <li key={i} className="flex flex-col">
                <span className="text-muted-foreground text-xs">{step.question}</span>
                <span className="font-medium"> {step.answer}</span>
                {step.l1_note && <span className="text-muted-foreground text-xs italic">{step.l1_note}</span>}
              </li>
            ))}
          </ol>
        </aside>
      </div>

      {/* Resolve modal */}
      {showResolve && (
        <ResolveModal
          onClose={() => setShowResolve(false)}
          onConfirm={async (helpful, notes) => {
            await l1Api.resolve(session.id, { helpful, resolution_notes: notes })
            onResolved()
          }}
        />
      )}
      {/* Escalate modal */}
      {showEscalate && (
        <EscalateModal
          onClose={() => setShowEscalate(false)}
          onConfirm={async (category, reason) => {
            await l1Api.escalate(session.id, { reason, reason_category: category })
            onEscalated()
          }}
        />
      )}
    </div>
  )
}

function ResolveModal({ onClose, onConfirm }: { onClose: () => void; onConfirm: (helpful: boolean, notes: string) => Promise<void> }) {
  const [helpful, setHelpful] = useState<boolean | null>(null)
  const [notes, setNotes] = useState('')
  const [submitting, setSubmitting] = useState(false)

  return (
    <div className="fixed inset-0 bg-black/60 flex items-center justify-center z-50">
      <div className="bg-card border border-default rounded-lg p-6 max-w-md w-full">
        <h3 className="font-heading text-lg font-bold mb-4">Did this resolve it?</h3>
        <div className="flex gap-3 mb-4">
          <button
            onClick={() => setHelpful(true)}
            className={`flex-1 py-2 rounded-md ${helpful === true ? 'bg-accent text-white' : 'border border-default hover:bg-elevated'}`}
          >
            Yes
          </button>
          <button
            onClick={() => setHelpful(false)}
            className={`flex-1 py-2 rounded-md ${helpful === false ? 'bg-warning text-white' : 'border border-default hover:bg-elevated'}`}
          >
            No
          </button>
        </div>
        <textarea
          value={notes}
          onChange={(e) => setNotes(e.target.value)}
          rows={3}
          placeholder="Resolution notes…"
          className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-4"
        />
        <div className="flex justify-end gap-2">
          <button onClick={onClose} className="rounded-md border border-default px-4 py-2 text-sm">Cancel</button>
          <button
            disabled={helpful === null || submitting}
            onClick={async () => {
              setSubmitting(true)
              try { await onConfirm(helpful!, notes) } finally { setSubmitting(false) }
            }}
            className="rounded-md bg-accent text-white px-4 py-2 text-sm disabled:opacity-50"
          >
            {submitting ? 'Saving…' : 'Confirm'}
          </button>
        </div>
      </div>
    </div>
  )
}

function EscalateModal({ onClose, onConfirm }: { onClose: () => void; onConfirm: (category: string, reason: string) => Promise<void> }) {
  const [category, setCategory] = useState('Out of L1 scope')
  const [reason, setReason] = useState('')
  const [submitting, setSubmitting] = useState(false)

  return (
    <div className="fixed inset-0 bg-black/60 flex items-center justify-center z-50">
      <div className="bg-card border border-default rounded-lg p-6 max-w-md w-full">
        <h3 className="font-heading text-lg font-bold mb-4">Escalate to engineering</h3>
        <label className="block text-xs uppercase tracking-wider text-muted-foreground mb-1">Reason</label>
        <select
          value={category}
          onChange={(e) => setCategory(e.target.value)}
          className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-3"
        >
          <option>Out of L1 scope</option>
          <option>Customer demanding senior</option>
          <option>Tree dead-ended</option>
          <option>AI tree wrong</option>
          <option>No KB available</option>
          <option>Other</option>
        </select>
        <textarea
          value={reason}
          onChange={(e) => setReason(e.target.value)}
          rows={3}
          placeholder="Details (optional)…"
          className="w-full bg-page border border-default rounded-md px-3 py-2 text-sm mb-4"
        />
        <div className="flex justify-end gap-2">
          <button onClick={onClose} className="rounded-md border border-default px-4 py-2 text-sm">Cancel</button>
          <button
            disabled={submitting}
            onClick={async () => {
              setSubmitting(true)
              try { await onConfirm(category, reason) } finally { setSubmitting(false) }
            }}
            className="rounded-md bg-warning text-white px-4 py-2 text-sm disabled:opacity-50"
          >
            {submitting ? 'Escalating…' : 'Confirm escalate'}
          </button>
        </div>
      </div>
    </div>
  )
}

(The currentNode loading from the flow tree is left as an integration with the existing flow-fetch helper — adapt to the actual codebase pattern. The structure here is the variant scaffolding; node navigation will pull from trees data.)

  • Step 3: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
  • Step 4: Commit.
git add frontend/src/pages/l1/L1WalkPage.tsx frontend/src/components/l1/L1WalkTreeVariant.tsx
git commit -m "feat(l1): L1WalkPage with tree variant (yes/no walker, escalate/resolve modals)"

Task 23: Frontend — adhoc walker variant

Files:

  • Create: frontend/src/components/l1/L1WalkAdhocVariant.tsx

  • Step 1: Create the adhoc variant.

import { useEffect, useRef, useState } from 'react'
import { ChevronLeft } from 'lucide-react'
import { Link } from 'react-router'
import { l1Api } from '@/api/l1'
import type { AdhocNote, WalkSession } from '@/types/l1'

interface Props {
  session: WalkSession
  onSessionUpdate: (s: WalkSession) => void
  onResolved: () => void
  onEscalated: () => void
}

export function L1WalkAdhocVariant({ session, onSessionUpdate, onResolved, onEscalated }: Props) {
  const [notesText, setNotesText] = useState(() =>
    session.walk_notes.map(n => n.content).join('\n\n')
  )
  const [savedAt, setSavedAt] = useState<Date | null>(null)
  const [showResolve, setShowResolve] = useState(false)
  const [showEscalate, setShowEscalate] = useState(false)
  const saveTimer = useRef<number | null>(null)

  // Debounced autosave
  useEffect(() => {
    if (saveTimer.current) window.clearTimeout(saveTimer.current)
    saveTimer.current = window.setTimeout(async () => {
      const notes: AdhocNote[] = notesText
        .split('\n\n')
        .map(c => c.trim())
        .filter(Boolean)
        .map(content => ({ timestamp: new Date().toISOString(), content }))
      try {
        const updated = await l1Api.notes(session.id, notes)
        onSessionUpdate(updated)
        setSavedAt(new Date())
      } catch {
        // Surface error via toast; omitted here for brevity
      }
    }, 300)
    return () => {
      if (saveTimer.current) window.clearTimeout(saveTimer.current)
    }
  }, [notesText, session.id])

  return (
    <div className="flex flex-col h-full">
      <header className="border-b border-default px-6 py-4 flex items-center justify-between bg-sidebar">
        <Link to="/l1" className="flex items-center gap-2 text-muted-foreground hover:text-heading">
          <ChevronLeft className="w-4 h-4" />
          <span className="font-mono text-xs">#{session.id.slice(0, 8)}</span>
          <span className="ml-2 text-xs bg-info/10 text-info px-2 py-0.5 rounded">Ad-hoc walk</span>
        </Link>
        <div className="flex gap-2">
          <button
            onClick={() => setShowEscalate(true)}
            className="rounded-md border border-default px-3 py-1.5 text-sm hover:bg-elevated transition-colors"
          >
            Escalate
          </button>
          <button
            onClick={() => setShowResolve(true)}
            className="rounded-md bg-accent text-white px-3 py-1.5 text-sm hover:bg-accent/90 transition-colors"
          >
            Resolve 
          </button>
        </div>
      </header>

      <main className="flex-1 p-6 overflow-y-auto">
        <div className="max-w-3xl mx-auto">
          <p className="text-sm text-muted-foreground mb-3">
            Take notes as you work through the call. They're auto-saved.
          </p>
          <textarea
            value={notesText}
            onChange={(e) => setNotesText(e.target.value)}
            rows={20}
            placeholder="What did the customer say? What did you check? What did you try?"
            className="w-full bg-card border border-default rounded-md px-4 py-3 text-sm focus:outline-none focus:ring-2 focus:ring-accent/40 leading-relaxed"
          />
          {savedAt && (
            <p className="text-xs text-muted-foreground mt-2">
              Saved {Math.round((Date.now() - savedAt.getTime()) / 1000)}s ago
            </p>
          )}
        </div>
      </main>

      {/* Reuse the ResolveModal + EscalateModal from L1WalkTreeVariant — extract to a shared module
          in a small follow-up commit if duplication is undesirable. Phase 1 is fine inline. */}
      {showResolve && (
        <ResolveModalInline
          defaultNotes={notesText}
          onClose={() => setShowResolve(false)}
          onConfirm={async (helpful, notes) => {
            await l1Api.resolve(session.id, { helpful, resolution_notes: notes })
            onResolved()
          }}
        />
      )}
      {showEscalate && (
        <EscalateModalInline
          onClose={() => setShowEscalate(false)}
          onConfirm={async (category, reason) => {
            await l1Api.escalate(session.id, { reason, reason_category: category })
            onEscalated()
          }}
        />
      )}
    </div>
  )
}

// Same as ResolveModal/EscalateModal in L1WalkTreeVariant; rename and re-export
// from a shared file in a small refactor pass after Phase 1 lands.
function ResolveModalInline(props: any) { /* identical to ResolveModal from Task 22 */ return null }
function EscalateModalInline(props: any) { /* identical to EscalateModal from Task 22 */ return null }

Note for the engineer: the modal components are duplicated for now (Task 22 has them inline). After this task lands, do a small follow-up commit extracting ResolveModal + EscalateModal into frontend/src/components/l1/WalkModals.tsx and import from both variants. Keep Phase 1 unblocked by inlining; refactor before merge to main.

  • Step 2: Type-check.
docker exec -w /app resolutionflow_frontend npx tsc -b
  • Step 3: Commit.
git add frontend/src/components/l1/L1WalkAdhocVariant.tsx
git commit -m "feat(l1): adhoc walker variant with debounced note autosave"

Task 24: Frontend — drafts page, tickets page, coverage banner, seat counter widget

Files:

  • Create: frontend/src/pages/l1/L1DraftsPage.tsx

  • Create: frontend/src/pages/l1/L1TicketsPage.tsx

  • Create: frontend/src/components/l1/L1CoverageBanner.tsx

  • Create: frontend/src/components/admin/SeatCounterWidget.tsx

  • Create: frontend/src/api/seats.ts

  • Step 1: Create L1DraftsPage (read-only stub for Phase 1).

import { PageMeta } from '@/components/common/PageMeta'

export default function L1DraftsPage() {
  return (
    <div className="overflow-y-auto h-full">
      <PageMeta title="My Drafts" />
      <div className="max-w-4xl mx-auto px-6 pt-12 pb-12">
        <h1 className="font-heading text-2xl font-bold mb-4">My AI drafts</h1>
        <p className="text-muted-foreground">
          AI-built drafts you've created will show here once AI build is enabled (Phase 2).
        </p>
      </div>
    </div>
  )
}
  • Step 2: Create L1TicketsPage.
import { useEffect, useState } from 'react'
import { PageMeta } from '@/components/common/PageMeta'
import { l1Api } from '@/api/l1'
import type { QueueRow } from '@/types/l1'

export default function L1TicketsPage() {
  const [rows, setRows] = useState<QueueRow[]>([])
  const [statusFilter, setStatusFilter] = useState<string>('')

  useEffect(() => {
    l1Api.queue(statusFilter || undefined).then(setRows).catch(() => setRows([]))
  }, [statusFilter])

  return (
    <div className="overflow-y-auto h-full">
      <PageMeta title="Tickets" />
      <div className="max-w-5xl mx-auto px-6 pt-12 pb-12">
        <div className="flex items-center justify-between mb-6">
          <h1 className="font-heading text-2xl font-bold">Tickets</h1>
          <select
            value={statusFilter}
            onChange={(e) => setStatusFilter(e.target.value)}
            className="bg-card border border-default rounded-md px-3 py-2 text-sm"
          >
            <option value="">All</option>
            <option value="open">Open</option>
            <option value="walking">Walking</option>
            <option value="resolved">Resolved</option>
            <option value="escalated">Escalated</option>
          </select>
        </div>
        <div className="rounded-lg border border-default bg-card overflow-hidden">
          {rows.map((r) => (
            // Phase 1: rows are display-only (see comment in L1Dashboard). Phase 2 makes them clickable.
            <div
              key={r.ticket_id}
              className="px-4 py-3 border-b border-default last:border-b-0"
            >
              <div className="flex items-center justify-between">
                <div>
                  <span className="font-mono text-xs text-muted-foreground mr-2">
                    #{r.ticket_id.slice(0, 8)}
                  </span>
                  <span className="text-sm">{r.problem_statement}</span>
                </div>
                <div className="flex items-center gap-2">
                  <span className="text-xs px-2 py-0.5 rounded bg-elevated text-muted-foreground">
                    {r.status}
                  </span>
                  <span className="text-xs text-muted-foreground">
                    {r.ticket_kind === 'psa' ? 'PSA' : 'Internal'}
                  </span>
                </div>
              </div>
            </div>
          ))}
          {rows.length === 0 && (
            <p className="px-4 py-8 text-sm text-muted-foreground text-center">No tickets.</p>
          )}
        </div>
      </div>
    </div>
  )
}
  • Step 3: Create L1CoverageBanner.
import { useNavigate } from 'react-router'
import { usePermissions } from '@/hooks/usePermissions'

export function L1CoverageBanner() {
  const perms = usePermissions()
  const navigate = useNavigate()
  if (perms.isL1Tech) return null
  if (!perms.canCoverL1) return null

  return (
    <div className="bg-info-dim text-info text-sm px-4 py-1.5 flex items-center justify-between border-b border-info/20">
      <span>You're covering L1. Actions logged as coverage.</span>
      <button
        onClick={() => navigate('/')}
        className="text-info underline-offset-2 hover:underline"
      >
        Switch back 
      </button>
    </div>
  )
}

Mount it at the top of each /l1/* page (add <L1CoverageBanner /> as the first child of the page wrapper, or wrap inside L1RouteGuard).

  • Step 4: Create seat-counter API + widget.

frontend/src/api/seats.ts:

import { apiClient } from './client'

export interface SeatCheck {
  available: boolean
  current: number
  limit: number | null
  role: 'engineer' | 'l1_tech'
}

export interface SeatUsage {
  engineer: SeatCheck
  l1_tech: SeatCheck
}

export const seatsApi = {
  getUsage: () => apiClient.get<SeatUsage>('/api/v1/accounts/me/seats').then(r => r.data),
}

frontend/src/components/admin/SeatCounterWidget.tsx:

import { useEffect, useState } from 'react'
import { seatsApi, type SeatUsage } from '@/api/seats'

export function SeatCounterWidget() {
  const [usage, setUsage] = useState<SeatUsage | null>(null)

  useEffect(() => {
    seatsApi.getUsage().then(setUsage).catch(() => setUsage(null))
  }, [])

  if (!usage) return null

  return (
    <div className="rounded-lg border border-default bg-card p-4 grid grid-cols-2 gap-4">
      <SeatRow label="Engineer seats" check={usage.engineer} />
      <SeatRow label="L1 seats" check={usage.l1_tech} />
    </div>
  )
}

function SeatRow({ label, check }: { label: string; check: SeatUsage['engineer'] }) {
  const overLimit = check.limit !== null && check.current > check.limit
  const limitText = check.limit === null ? '∞' : check.limit
  return (
    <div className={overLimit ? 'text-warning' : ''}>
      <p className="text-xs uppercase tracking-wider text-muted-foreground mb-1">{label}</p>
      <p className="text-lg font-mono">
        {check.current} / {limitText}
      </p>
      {overLimit && <p className="text-xs">Over limit (grandfathered)</p>}
    </div>
  )
}

Drop this widget into /admin/users and /account/users pages (modify those pages to import and render it).

  • Step 5: Type-check and smoke.
docker exec -w /app resolutionflow_frontend npx tsc -b
  • Step 6: Commit.
git add frontend/src/pages/l1/L1DraftsPage.tsx frontend/src/pages/l1/L1TicketsPage.tsx frontend/src/components/l1/L1CoverageBanner.tsx frontend/src/components/admin/SeatCounterWidget.tsx frontend/src/api/seats.ts
git commit -m "feat(l1): drafts/tickets pages + coverage banner + seat counter widget"

Task 25: E2E Playwright tests

Files:

  • Create: frontend/e2e/l1-workspace.spec.ts

  • Step 1: Write the E2E tests.

import { test, expect } from '@playwright/test'

test.describe('L1 Workspace', () => {
  test('L1 user lands on /l1, sees dashboard, can intake an adhoc walk and resolve', async ({ page }) => {
    await page.goto('/login')
    await page.fill('input[type=email]', 'l1@resolutionflow.example.com')
    await page.fill('input[type=password]', 'TestPass123!')
    await page.click('button[type=submit]')

    // L1 lands on /l1
    await expect(page).toHaveURL(/\/l1$/)
    await expect(page.getByRole('heading', { name: /good (morning|afternoon|evening)/i })).toBeVisible()

    // Intake
    await page.fill('textarea[placeholder*="calling about"]', 'Customer says Outlook is broken')
    await page.click('button:has-text("Start walk")')

    // Lands on walker (adhoc since no flow_id and no AI build)
    await expect(page).toHaveURL(/\/l1\/walk\//)
    await expect(page.getByText('Ad-hoc walk')).toBeVisible()

    // Take some notes
    await page.fill('textarea[placeholder*="customer say"]', 'Walked customer through restarting Outlook')

    // Resolve
    await page.click('button:has-text("Resolve")')
    await page.click('button:has-text("Yes")')
    await page.fill('textarea[placeholder="Resolution notes…"]', 'Fixed via restart')
    await page.click('button:has-text("Confirm")')

    // Back to /l1
    await expect(page).toHaveURL(/\/l1$/)
  })

  test('L1 cannot access /pilot, /trees/new, /escalations', async ({ page }) => {
    await page.goto('/login')
    // ... auth as L1 ...
    await page.goto('/pilot')
    await expect(page).toHaveURL(/\/l1$/)   // Redirected away
    await page.goto('/trees/new')
    await expect(page).not.toHaveURL(/\/trees\/new/)
    await page.goto('/escalations')
    await expect(page).not.toHaveURL(/\/escalations/)
  })

  test('Engineer with coverage flag sees L1 entry and coverage banner', async ({ page }) => {
    // Pre-seed: engineer user with can_cover_l1=true
    await page.goto('/login')
    await page.fill('input[type=email]', 'engineer-coverage@resolutionflow.example.com')
    await page.fill('input[type=password]', 'TestPass123!')
    await page.click('button[type=submit]')

    // Sidebar shows L1 Workspace
    await expect(page.getByRole('link', { name: /L1 Workspace/i })).toBeVisible()
    await page.click('a:has-text("L1 Workspace")')

    // Coverage banner visible
    await expect(page.getByText(/Covering L1/)).toBeVisible()
  })

  test('Owner inviting past engineer seat limit gets 402', async ({ page, request }) => {
    // Seed an account with seat_limit=2 and 2 active engineers (use API or DB fixture)
    // ... auth as owner ...
    const response = await request.post('/api/v1/invites', {
      data: { email: 'fourth@example.com', role: 'engineer' },
      headers: { Authorization: 'Bearer <owner-token>' },
    })
    expect(response.status()).toBe(402)
    const body = await response.json()
    expect(body.detail.code).toBe('seat_limit_exceeded')
  })
})

(Pre-seed users for l1@resolutionflow.example.com and engineer-coverage@resolutionflow.example.com need to be added to the existing seed script in backend/scripts/seed_test_users.py — include this as part of the task.)

  • Step 2: Add the seed users.

In backend/scripts/seed_test_users.py, add:

# Add to the seed list:
{
    'email': 'l1@resolutionflow.example.com',
    'name': 'L1 Test',
    'account_role': 'l1_tech',
    'is_super_admin': False,
},
{
    'email': 'engineer-coverage@resolutionflow.example.com',
    'name': 'Engineer Coverage',
    'account_role': 'engineer',
    'is_super_admin': False,
    'can_cover_l1': True,
},

Re-run the seed:

docker exec -w /app resolutionflow_backend python -m scripts.seed_test_users
  • Step 3: Run E2E tests.
cd frontend && npx playwright test e2e/l1-workspace.spec.ts

Expected: 4 tests pass.

  • Step 4: Commit.
git add frontend/e2e/l1-workspace.spec.ts backend/scripts/seed_test_users.py
git commit -m "test(l1): E2E Playwright suite + seed L1 + coverage engineer users"

Task 26: Phase 1 acceptance validation

This is the final task — manual verification against the spec's acceptance criteria (spec §15).

  • Step 1: Run the full backend test suite.
docker exec resolutionflow_backend pytest --override-ini="addopts="

Expected: all green, including the new L1/RLS/seat tests.

  • Step 2: Build frontend cleanly.
docker exec -w /app resolutionflow_frontend npm run build

Expected: success, no type errors.

  • Step 3: Run E2E.
cd frontend && npx playwright test
  • Step 4: Acceptance walkthrough — check each spec §15 line by line.

For each criterion in spec §15 (the v1 acceptance list), manually verify:

  • L1 role assignable; L1 user sees L1 sidebar only; no engineer route reachable (/pilot, /trees/new, /escalations redirect or 403)

  • L1 intake creates a ticket and lands in walker session (flow or adhoc)

  • Walker handles flow walks AND adhoc walks; resolve/escalate work in both

  • Concurrent sessions supported (start two intakes, both appear in "Resume in progress" widget)

  • Browser-close recovery (refresh mid-walk → state restored)

  • First-run empty-state card renders on a brand-new account

  • Escalate generates package + reassigns ticket (Phase 1: internal ticket only; PSA reassignment is a stub)

  • Resolve flips validated_by_outcome when walking a proposal with helpful=true (use seed data with a manually-created FlowProposal)

  • Coverage flag end-to-end: owner toggles can_cover_l1 on engineer; engineer sees /l1 nav and banner; audit log row has acting_as='l1_coverage'

  • Seat enforcement blocks engineer invite at limit; blocks L1 invite at limit; grandfathered over-seated account still functions

  • RLS: L1 in account A cannot read account B's tickets / walk sessions / drafts (covered by test_l1_rls.py)

  • L1 seat count tracked separately from engineer seats in SeatCounterWidget

  • L1 cannot access /account/kb (route guard redirects)

  • Step 5: If any acceptance criterion fails, file a small follow-up task.

Don't try to fix every issue inside Task 26 — Phase 1's job is to ship the foundation. File a follow-up commit for any non-blocking gap.

  • Step 6: Final commit (if anything was tweaked during acceptance walkthrough).
git status
git add <fixed files>
git commit -m "fix(l1): acceptance-walkthrough fixes for Phase 1"
  • Step 7: Push the branch.
git push -u origin design/l1-workspace
  • Step 8: Open PR.

Use the project's PR pattern (gh CLI on the GitHub mirror, or Gitea web UI). PR body should reference both the spec and this plan.


Phase 1 done

Once all 26 tasks ship, Phase 1 is complete. The L1 surface works end-to-end against authored flows and adhoc walks; AI build and KB connectors are deferred to Phase 2 and Phase 3 (separate plans).

The Phase 2 plan (docs/superpowers/plans/2026-MM-DD-l1-workspace-phase-2.md) will cover:

  • kb_documents + kb_document_chunks tables
  • Manual upload + paste KB ingestion (extending KB Accelerator)
  • match_or_build orchestrator
  • ai_tree_builder (Anthropic-backed)
  • SUGGEST_THRESHOLD near-miss UX
  • Full BuildAbortedNoKB screen with three CTAs
  • FlowProposal generation pipeline

The Phase 3 plan will cover the three KB connectors (IT Glue, Hudu, Microsoft Graph) — each can ship independently within the phase.