diff --git a/backend/app/schemas/notification.py b/backend/app/schemas/notification.py index 63b6bf9d..d1276b8e 100644 --- a/backend/app/schemas/notification.py +++ b/backend/app/schemas/notification.py @@ -11,6 +11,7 @@ VALID_EVENTS = { "proposal.pending", "proposal.approved", "knowledge_gap.detected", + "l1.session.escalated", } diff --git a/backend/app/services/l1_session_service.py b/backend/app/services/l1_session_service.py index c86cd239..d339cc8e 100644 --- a/backend/app/services/l1_session_service.py +++ b/backend/app/services/l1_session_service.py @@ -7,6 +7,7 @@ from datetime import datetime, timezone from typing import Optional from uuid import UUID +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.audit import log_audit @@ -15,6 +16,7 @@ from app.models.l1_walk_session import L1WalkSession from app.models.user import User from app.services import ai_tree_builder from app.services import internal_ticket_service +from app.services.notification_service import notify def _resolve_acting_as(user: User) -> Optional[str]: @@ -263,6 +265,24 @@ async def resolve( if proposal: proposal.validated_by_outcome = True + # Flywheel capture: persist a validated FlowProposal for ai_build sessions + # resolved as helpful. Captures the AI-generated path as training signal. + if helpful and session.session_kind == "ai_build" and session.walked_path: + tree_structure = ai_tree_builder.normalize_walked_path(session.walked_path) + db.add(FlowProposal( + account_id=session.account_id, + l1_session_id=session.id, + source_session_id=None, + proposal_type="new_flow", + title=(session.resolution_notes or "AI L1 resolution")[:255], + proposed_flow_data={"tree_structure": tree_structure, "match_keywords": []}, + source="ai_realtime_l1", + validated_by_outcome=True, + linked_ticket_id=session.ticket_id, + linked_ticket_kind=session.ticket_kind, + status="pending", + )) + if session.ticket_kind == "internal": await internal_ticket_service.update_status( db, @@ -339,6 +359,28 @@ async def escalate( account_id=session.account_id, acting_as=session.acting_as, ) + + # Notify engineers (owner/admin/engineer roles) about the escalation. + eng_rows = await db.execute( + select(User.id).where( + User.account_id == session.account_id, + User.is_active.is_(True), + User.account_role.in_(("owner", "admin", "engineer")), + ) + ) + target_ids = [r[0] for r in eng_rows.all()] + await notify( + "l1.session.escalated", + session.account_id, + { + "problem_summary": session.ticket_id, + "session_id": str(session.id), + "reason_category": reason_category, + }, + db, + target_user_ids=target_ids, + ) + await db.flush() return session diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index edf1bf7d..e249f451 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -381,6 +381,7 @@ def _build_notification_title(event: str, payload: dict[str, Any]) -> str: "proposal.pending": "New flow proposal: {title}", "proposal.approved": "Flow proposal approved: {title}", "knowledge_gap.detected": "Knowledge gap detected: {gap_type}", + "l1.session.escalated": "L1 session escalated: {problem_summary}", "test": "Test Notification from ResolutionFlow", } @@ -415,6 +416,7 @@ def _build_notification_body(event: str, payload: dict[str, Any]) -> str: "proposal.pending": "A new flow proposal \"{title}\" is awaiting review in the review queue.", "proposal.approved": "The flow proposal \"{title}\" has been approved and is ready for use.", "knowledge_gap.detected": "A {gap_type} knowledge gap has been identified. Review recommended.", + "l1.session.escalated": "L1 escalated a ticket: {problem_summary}", "test": "This is a test notification to verify your notification channel is working correctly.", } template = bodies.get(event, f"Event: {event}") @@ -437,6 +439,9 @@ def _build_notification_link(event: str, payload: dict[str, Any]) -> Optional[st "proposal.pending": "/review-queue", "proposal.approved": "/review-queue", "knowledge_gap.detected": "/analytics/flowpilot", + # L1 AI-build escalations go to the escalations dashboard — not to + # a specific pilot session, which may not have a pickup flow. + "l1.session.escalated": "/escalations", } template = links.get(event) if template is None: diff --git a/backend/tests/test_l1_session_service.py b/backend/tests/test_l1_session_service.py index bf4e2669..217b30cc 100644 --- a/backend/tests/test_l1_session_service.py +++ b/backend/tests/test_l1_session_service.py @@ -869,6 +869,74 @@ async def test_advance_ai_build_wrong_session_kind_raises(test_db: AsyncSession, test_db, session_id=s.id, problem_text="printer", category="printer") +# --------------------------------------------------------------------------- +# T9: flywheel capture on resolve + engineer notification on escalate +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_resolve_ai_build_creates_outcome_validated_proposal(test_db: AsyncSession, monkeypatch): + """resolve(helpful=True) on an ai_build session creates a FlowProposal with validated_by_outcome=True.""" + from app.services import l1_session_service as svc + account = await _make_account(test_db) + l1_user = await _make_user(test_db, account_id=account.id) + ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) + + s = await svc.start_ai_build_session( + test_db, account_id=account.id, user=l1_user, + ticket_id=str(ticket.id), ticket_kind="internal", + ) + # Populate walked_path with at least one node (needed for normalize_walked_path) + s.walked_path = [ + {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"}, + {"node_type": "resolved", "id": "n2", "text": "Fixed."}, + ] + await test_db.flush() + + await svc.resolve(test_db, session_id=s.id, helpful=True, resolution_notes="ok") + + props = (await test_db.execute( + select(FlowProposal).where(FlowProposal.l1_session_id == s.id) + )).scalars().all() + assert len(props) == 1 + assert props[0].source == "ai_realtime_l1" + assert props[0].validated_by_outcome is True + assert props[0].source_session_id is None + assert props[0].proposed_flow_data["tree_structure"]["id"] == "n1" + assert props[0].proposal_type == "new_flow" + assert props[0].proposed_flow_data["match_keywords"] == [] + + +@pytest.mark.asyncio +async def test_escalate_notifies_engineers(test_db: AsyncSession, monkeypatch): + """escalate() calls notify with event='l1.session.escalated' and explicit engineer recipients.""" + from app.services import l1_session_service as svc + calls = {} + + async def fake_notify(event, account_id, payload, db, target_user_ids=None): + calls["event"] = event + calls["target_user_ids"] = target_user_ids + + monkeypatch.setattr(svc, "notify", fake_notify) + + account = await _make_account(test_db) + # l1_user is the session owner (account_role="l1_tech" by default — NOT in the recipient query) + l1_user = await _make_user(test_db, account_id=account.id) + # Seed an eligible recipient: account_role="engineer" matches the production query + # (owner/admin/engineer). Without this user, target_ids would be [] and the + # eng.id assertion below would fail, proving the assertion is non-vacuous. + eng = await _make_user(test_db, account_id=account.id, account_role="engineer") + ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) + + s = await svc.start_ai_build_session( + test_db, account_id=account.id, user=l1_user, + ticket_id=str(ticket.id), ticket_kind="internal", + ) + await svc.escalate(test_db, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps") + assert calls["event"] == "l1.session.escalated" + assert isinstance(calls["target_user_ids"], list) and len(calls["target_user_ids"]) >= 1 + assert eng.id in calls["target_user_ids"] # the eligible engineer is a recipient + + # --------------------------------------------------------------------------- # T14 audit log tests (spec §5.6.1) # ---------------------------------------------------------------------------