feat(ai-session): add Phase 2 PSA integration, escalation handoff, and session management

Phase 2 of the FlowPilot-First Pivot connecting AI sessions to ConnectWise PSA:

Slice 1 — PSA Ticket Intake:
- FlowPilotEngine accepts psa_ticket intake with graceful CW API fallback
- Ticket picker on intake screen (refactored TicketPickerModal for dual-mode)
- Ticket context card in session sidebar

Slice 2 — Auto Documentation Push:
- PSA documentation service with resolution/escalation note formatting
- Time entry creation via new ConnectWise provider method
- Automatic retry scheduler (APScheduler, 5min interval, 3 retries)
- PSA push status indicators in frontend with manual retry button
- Member mapping warning when CW member not mapped

Slice 3 — Session Pause/Resume & Escalation Handoff:
- Pause/resume endpoints for same-engineer session bookmarking
- Escalation flow: requesting_escalation status, self-escalation blocked
- Enhanced escalation package with LLM-generated hypotheses/suggestions
- Pickup endpoint with continue/fresh resume modes and briefing step
- Escalation queue (sidebar nav + dedicated page)
- SessionBriefing component with continue/fresh choice UI
- EscalateModal with PSA-aware button text

Slice 4 — Mid-Session Ticket Linking:
- Link ticket retroactively with context injection into system prompt
- Link Ticket button in session sidebar

Slice 5 — FlowPilot PSA Settings:
- Settings tab on IntegrationsPage with 7 configurable options
- Stored as flowpilot_settings JSONB on PsaConnection

Database: 2 migrations (flowpilot_settings, psa_post_log changes, status constraint)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 01:30:05 +00:00
parent 2063a799b0
commit bbe590bfec
37 changed files with 3698 additions and 121 deletions

View File

@@ -0,0 +1,78 @@
"""phase2 psa flowpilot integration
Revision ID: bb2101378a61
Revises: f1a2b3c4d5e6
Create Date: 2026-03-18 23:05:01.099910
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'bb2101378a61'
down_revision: Union[str, None] = 'f1a2b3c4d5e6'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# 1. Add flowpilot_settings JSONB to psa_connections
op.add_column('psa_connections', sa.Column(
'flowpilot_settings',
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
server_default='{}',
comment='FlowPilot-specific settings: auto_push, time_rounding, note_visibility, etc.',
))
# 2. Add ai_session_id FK to psa_post_log
op.add_column('psa_post_log', sa.Column(
'ai_session_id',
sa.Uuid(),
nullable=True,
comment='FK to AI sessions (Phase 2). Original session_id FK remains for legacy sessions.',
))
op.create_index(
op.f('ix_psa_post_log_ai_session_id'),
'psa_post_log',
['ai_session_id'],
)
op.create_foreign_key(
'fk_psa_post_log_ai_session_id',
'psa_post_log',
'ai_sessions',
['ai_session_id'],
['id'],
ondelete='CASCADE',
)
# 3. Make original session_id nullable (was NOT NULL — legacy sessions only)
op.alter_column('psa_post_log', 'session_id', nullable=True)
# 4. Add retry_count and next_retry_at for automatic retries
op.add_column('psa_post_log', sa.Column(
'retry_count',
sa.Integer(),
nullable=False,
server_default='0',
comment='Number of retry attempts for failed PSA pushes',
))
op.add_column('psa_post_log', sa.Column(
'next_retry_at',
sa.DateTime(timezone=True),
nullable=True,
comment='When to attempt the next retry',
))
def downgrade() -> None:
op.drop_column('psa_post_log', 'next_retry_at')
op.drop_column('psa_post_log', 'retry_count')
op.alter_column('psa_post_log', 'session_id', nullable=False)
op.drop_constraint('fk_psa_post_log_ai_session_id', 'psa_post_log', type_='foreignkey')
op.drop_index(op.f('ix_psa_post_log_ai_session_id'), table_name='psa_post_log')
op.drop_column('psa_post_log', 'ai_session_id')
op.drop_column('psa_connections', 'flowpilot_settings')

View File

@@ -0,0 +1,35 @@
"""add requesting_escalation status
Revision ID: cc3201489b72
Revises: bb2101378a61
Create Date: 2026-03-18 23:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = 'cc3201489b72'
down_revision: Union[str, None] = 'bb2101378a61'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Drop old status constraint and recreate with new values
op.drop_constraint('ck_ai_sessions_status', 'ai_sessions', type_='check')
op.create_check_constraint(
'ck_ai_sessions_status',
'ai_sessions',
"status IN ('active', 'paused', 'resolved', 'escalated', 'requesting_escalation', 'abandoned')",
)
def downgrade() -> None:
op.drop_constraint('ck_ai_sessions_status', 'ai_sessions', type_='check')
op.create_check_constraint(
'ck_ai_sessions_status',
'ai_sessions',
"status IN ('active', 'paused', 'resolved', 'escalated', 'abandoned')",
)

View File

@@ -35,12 +35,15 @@ from app.schemas.ai_session import (
SessionCloseResponse,
SessionDocumentation,
RateSessionRequest,
PickupSessionRequest,
LinkTicketRequest,
AISessionSummary,
AISessionDetail,
AISessionStepResponse,
StepOptionSchema,
)
from app.services import flowpilot_engine
from app.services.psa_documentation_service import retry_failed_push
logger = logging.getLogger(__name__)
@@ -272,6 +275,184 @@ async def escalate_session(
return result
# ── Pause ──
@router.post("/{session_id}/pause", status_code=204)
@limiter.limit("15/minute")
async def pause_session(
request: Request,
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Pause an active FlowPilot session for later resume."""
try:
await flowpilot_engine.pause_session(
session_id=session_id,
user_id=current_user.id,
db=db,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
await db.commit()
# ── Resume ──
@router.post("/{session_id}/resume", status_code=204)
@limiter.limit("15/minute")
async def resume_session(
request: Request,
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Resume a paused FlowPilot session."""
try:
await flowpilot_engine.resume_session(
session_id=session_id,
user_id=current_user.id,
db=db,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
await db.commit()
# ── Escalation Queue ──
@router.get("/escalation-queue", response_model=list[AISessionSummary])
@limiter.limit("30/minute")
async def get_escalation_queue(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""List sessions requesting escalation for the current user's team."""
if not current_user.team_id:
return []
result = await db.execute(
select(AISession)
.where(
AISession.team_id == current_user.team_id,
AISession.status == "requesting_escalation",
AISession.user_id != current_user.id, # Don't show own escalated sessions
)
.order_by(AISession.created_at.desc())
)
sessions = result.scalars().all()
return [AISessionSummary.model_validate(s) for s in sessions]
# ── Pickup Escalated Session ──
@router.post("/{session_id}/pickup", response_model=StepResponseResponse)
@limiter.limit("5/minute")
async def pickup_session(
request: Request,
session_id: UUID,
data: PickupSessionRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Pick up an escalated session as a new engineer."""
_require_ai_enabled()
await _check_quota(current_user, db)
try:
result = await flowpilot_engine.pickup_session(
session_id=session_id,
resume_mode=data.resume_mode,
additional_context=data.additional_context,
user_id=current_user.id,
team_id=current_user.team_id,
db=db,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
except Exception as e:
logger.exception("FlowPilot pickup failed: %s", e)
await _record_usage(
current_user, db,
generation_type="flowpilot_pickup",
input_tokens=0, output_tokens=0,
succeeded=False,
session_id=session_id,
error_code=type(e).__name__,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"AI provider error ({type(e).__name__}). Please try again.",
)
await _record_usage(
current_user, db,
generation_type="flowpilot_pickup",
input_tokens=0, output_tokens=0,
succeeded=True,
session_id=session_id,
)
await db.commit()
return result
# ── Link Ticket ──
@router.post("/{session_id}/link-ticket", response_model=AISessionDetail)
@limiter.limit("10/minute")
async def link_ticket_to_session(
request: Request,
session_id: UUID,
data: LinkTicketRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Link a PSA ticket to an in-progress session retroactively."""
try:
await flowpilot_engine.link_ticket(
session_id=session_id,
psa_ticket_id=data.psa_ticket_id,
psa_connection_id=data.psa_connection_id,
user_id=current_user.id,
db=db,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
await db.commit()
# Return updated session detail
result = await db.execute(
select(AISession)
.options(selectinload(AISession.steps))
.where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
detail = AISessionDetail.model_validate(session)
return detail
# ── List sessions ──
@router.get("", response_model=list[AISessionSummary])
@@ -323,8 +504,10 @@ async def get_session(
if not session:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
# Allow access if user is owner or escalation target
if session.user_id != current_user.id and session.escalated_to_id != current_user.id:
# Allow access if user is owner, escalation target, or picked-up handler
pkg = session.escalation_package or {}
is_handler = pkg.get("picked_up_by") == str(current_user.id)
if session.user_id != current_user.id and session.escalated_to_id != current_user.id and not is_handler:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")
# Build step responses
@@ -409,3 +592,48 @@ async def rate_session(
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
await db.commit()
# ── Retry PSA Push ──
@router.post("/{session_id}/retry-psa-push")
@limiter.limit("5/minute")
async def retry_psa_push_endpoint(
request: Request,
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Manually retry a failed PSA documentation push."""
from app.models.psa_post_log import PsaPostLog
# Find the latest failed push log for this session
result = await db.execute(
select(PsaPostLog)
.where(
PsaPostLog.ai_session_id == session_id,
PsaPostLog.status.in_(["failed", "pending_retry"]),
)
.order_by(PsaPostLog.posted_at.desc())
.limit(1)
)
log_entry = result.scalar_one_or_none()
if not log_entry:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No failed PSA push found for this session",
)
# Reset to pending_retry and attempt immediately
log_entry.status = "pending_retry"
log_entry.retry_count = max(0, log_entry.retry_count - 1) # Give one more attempt
success = await retry_failed_push(log_entry, db)
await db.commit()
return {
"psa_push_status": "sent" if success else log_entry.status,
"psa_push_error": log_entry.error_message if not success else None,
}

View File

@@ -279,6 +279,69 @@ async def test_connection(
return result
# ── FlowPilot PSA Settings ──────────────────────────────────────
@router.get("/connections/{connection_id}/flowpilot-settings")
async def get_flowpilot_settings(
connection_id: UUID,
current_user: Annotated[User, Depends(require_account_owner)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Get FlowPilot-specific settings for a PSA connection."""
conn = await _get_connection_or_404(connection_id, current_user, db)
# Return settings with defaults filled in
defaults = {
"auto_push": True,
"auto_time_entry": True,
"time_rounding": "15min",
"note_visibility": "internal",
"include_diagnostic_steps": True,
"prompt_status_on_resolution": False,
"prompt_status_on_escalation": False,
}
settings_data = {**defaults, **(conn.flowpilot_settings or {})}
return settings_data
@router.put("/connections/{connection_id}/flowpilot-settings")
async def update_flowpilot_settings(
connection_id: UUID,
data: dict,
current_user: Annotated[User, Depends(require_account_owner)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Update FlowPilot-specific settings for a PSA connection."""
conn = await _get_connection_or_404(connection_id, current_user, db)
# Validate allowed keys
allowed_keys = {
"auto_push", "auto_time_entry", "time_rounding",
"note_visibility", "include_diagnostic_steps",
"prompt_status_on_resolution", "prompt_status_on_escalation",
}
filtered = {k: v for k, v in data.items() if k in allowed_keys}
# Merge with existing
current = conn.flowpilot_settings or {}
current.update(filtered)
conn.flowpilot_settings = current
await db.commit()
await db.refresh(conn)
defaults = {
"auto_push": True,
"auto_time_entry": True,
"time_rounding": "15min",
"note_visibility": "internal",
"include_diagnostic_steps": True,
"prompt_status_on_resolution": False,
"prompt_status_on_escalation": False,
}
return {**defaults, **(conn.flowpilot_settings or {})}
# ── ticket / status / company endpoints ──────────────────────────

View File

@@ -180,6 +180,16 @@ async def lifespan(app: FastAPI):
replace_existing=True,
)
# PSA push retry (every 5 minutes)
from app.services.psa_retry_scheduler import process_pending_retries
scheduler.add_job(
process_pending_retries,
trigger="interval",
minutes=5,
id="psa_push_retry",
replace_existing=True,
)
# Auto-seed trees in background on PR environments
seed_task = None
if settings.SEED_ON_DEPLOY:

View File

@@ -35,7 +35,7 @@ class AISession(Base):
name="ck_ai_sessions_intake_type",
),
CheckConstraint(
"status IN ('active', 'paused', 'resolved', 'escalated', 'abandoned')",
"status IN ('active', 'paused', 'resolved', 'escalated', 'requesting_escalation', 'abandoned')",
name="ck_ai_sessions_status",
),
CheckConstraint(

View File

@@ -1,11 +1,11 @@
"""PSA connection model — one per account."""
import uuid
from datetime import datetime, timezone
from typing import Optional
from typing import Optional, Any
from sqlalchemy import String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
@@ -43,6 +43,10 @@ class PsaConnection(Base):
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
flowpilot_settings: Mapped[Optional[dict[str, Any]]] = mapped_column(
JSONB, nullable=True, server_default="{}",
comment="FlowPilot-specific settings: auto_push, time_rounding, note_visibility, etc.",
)
# Relationships
account = relationship("Account", back_populates="psa_connection")

View File

@@ -3,7 +3,7 @@ import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import String, DateTime, Text, ForeignKey
from sqlalchemy import String, DateTime, Text, Integer, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
@@ -16,10 +16,18 @@ class PsaPostLog(Base):
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
session_id: Mapped[uuid.UUID] = mapped_column(
# Legacy sessions FK (nullable for AI sessions)
session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("sessions.id", ondelete="CASCADE"),
nullable=False,
nullable=True,
index=True,
)
# AI sessions FK (Phase 2)
ai_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("ai_sessions.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
psa_connection_id: Mapped[Optional[uuid.UUID]] = mapped_column(
@@ -35,8 +43,16 @@ class PsaPostLog(Base):
)
status: Mapped[str] = mapped_column(
String(20), nullable=False
) # 'success' or 'failed'
) # 'success', 'failed', 'pending_retry'
error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
retry_count: Mapped[int] = mapped_column(
Integer, nullable=False, default=0,
comment="Number of retry attempts for failed PSA pushes",
)
next_retry_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True,
comment="When to attempt the next retry",
)
status_changed_from: Mapped[Optional[str]] = mapped_column(
String(100), nullable=True
)
@@ -54,5 +70,6 @@ class PsaPostLog(Base):
# Relationships
session = relationship("Session", foreign_keys=[session_id])
ai_session = relationship("AISession", foreign_keys=[ai_session_id])
psa_connection = relationship("PsaConnection", foreign_keys=[psa_connection_id])
user = relationship("User", foreign_keys=[posted_by])

View File

@@ -42,6 +42,7 @@ class AISessionCreateResponse(BaseModel):
matched_flow_name: str | None = None
match_score: float | None = None
first_step: AISessionStepResponse
psa_context_status: str | None = None # loaded | unavailable | None (no PSA)
# ── Step interaction ──
@@ -131,6 +132,9 @@ class SessionCloseResponse(BaseModel):
session_id: UUID
status: str
documentation: SessionDocumentation
psa_push_status: str = "no_psa" # sent | pending_retry | no_psa | failed
psa_push_error: str | None = None
member_mapping_warning: str | None = None
class RateSessionRequest(BaseModel):
@@ -139,6 +143,18 @@ class RateSessionRequest(BaseModel):
feedback: str | None = None
class PickupSessionRequest(BaseModel):
"""Pick up an escalated session as a new engineer."""
resume_mode: str = Field("continue", pattern="^(continue|fresh)$")
additional_context: str | None = None
class LinkTicketRequest(BaseModel):
"""Link a PSA ticket to an in-progress session."""
psa_ticket_id: str
psa_connection_id: UUID
# ── List / Detail ──
class AISessionSummary(BaseModel):
@@ -151,6 +167,8 @@ class AISessionSummary(BaseModel):
confidence_tier: str
step_count: int
session_rating: int | None = None
psa_ticket_id: str | None = None
escalation_reason: str | None = None
created_at: datetime
resolved_at: datetime | None = None
@@ -166,6 +184,9 @@ class AISessionDetail(AISessionSummary):
resolution_action: str | None = None
escalation_reason: str | None = None
session_feedback: str | None = None
psa_ticket_id: str | None = None
psa_connection_id: UUID | None = None
ticket_data: dict[str, Any] | None = None
steps: list[AISessionStepResponse] = []
model_config = {"from_attributes": True}

View File

@@ -170,8 +170,32 @@ async def start_session(
) -> AISessionCreateResponse:
"""Start a new FlowPilot session: classify intake, match flows, get first step."""
# 0. Process PSA ticket intake if applicable
ticket_context_block = None
ticket_data = None
psa_context_status = None
if request.intake_type == "psa_ticket" and request.psa_connection_id and request.psa_ticket_id:
ticket_context_block, ticket_data, psa_context_status = await _process_ticket_intake(
psa_connection_id=request.psa_connection_id,
psa_ticket_id=request.psa_ticket_id,
db=db,
)
# Enrich intake content with ticket context for classification
if ticket_data:
enriched_content = dict(request.intake_content)
enriched_content["ticket_data"] = {
"summary": ticket_data.get("ticket", {}).get("summary", ""),
"company": ticket_data.get("company", {}).get("name", ""),
"priority": ticket_data.get("ticket", {}).get("priority", ""),
}
request = request.model_copy(update={"intake_content": enriched_content})
# 1. Classify intake via fast LLM call
intake_text = _extract_intake_text(request.intake_content)
# Include ticket context in classification text if available
if ticket_context_block:
intake_text = f"{ticket_context_block}\n\n{intake_text}"
classification = await _classify_intake(intake_text)
# 2. Try to match existing flows
@@ -199,9 +223,14 @@ async def start_session(
f"Use it as a guide but adapt to the specific situation."
)
# Include ticket context in system prompt if available
ticket_prompt_section = ""
if ticket_context_block:
ticket_prompt_section = f"\n## PSA TICKET CONTEXT\n{ticket_context_block}\n"
system_prompt = FLOWPILOT_SYSTEM_PROMPT.format(
structured_output_schema=STRUCTURED_OUTPUT_SCHEMA,
team_context="", # Phase 2: team-specific context
team_context=ticket_prompt_section,
matched_flow_context=matched_flow_context,
)
@@ -261,6 +290,7 @@ async def start_session(
match_score=match_score,
psa_ticket_id=request.psa_ticket_id,
psa_connection_id=request.psa_connection_id,
ticket_data=ticket_data,
total_input_tokens=input_tokens,
total_output_tokens=output_tokens,
step_count=1,
@@ -294,6 +324,7 @@ async def start_session(
matched_flow_name=matched_flow_name,
match_score=match_score,
first_step=_build_step_response(step, session),
psa_context_status=psa_context_status,
)
@@ -419,10 +450,14 @@ async def resolve_session(
await db.flush()
# Push documentation to PSA if ticket is linked
psa_result = await _push_to_psa(session, user_id, db)
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
**psa_result,
)
@@ -432,31 +467,276 @@ async def escalate_session(
user_id: UUID,
db: AsyncSession,
) -> SessionCloseResponse:
"""Escalate a session to another engineer."""
"""Escalate a session — sets status to requesting_escalation for pickup."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot escalate session in status: {session.status}")
session.status = "escalated"
session.resolved_at = datetime.now(timezone.utc)
# Block self-escalation
if request.escalated_to_id and request.escalated_to_id == user_id:
raise ValueError("Cannot escalate a session to yourself. Use pause instead.")
session.status = "requesting_escalation"
# Don't set resolved_at — session isn't done yet
session.escalation_reason = request.escalation_reason
session.escalated_to_id = request.escalated_to_id
# Build escalation package
session.escalation_package = _build_escalation_package(session)
# Build enhanced escalation package
session.escalation_package = await _build_escalation_package_enhanced(session, user_id)
documentation = _generate_documentation(session)
await db.flush()
# Push documentation to PSA if ticket is linked
psa_result = await _push_to_psa(session, user_id, db)
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
**psa_result,
)
async def pickup_session(
session_id: UUID,
resume_mode: str,
additional_context: Optional[str],
user_id: UUID,
team_id: Optional[UUID],
db: AsyncSession,
) -> StepResponseResponse:
"""Pick up an escalated session as a new engineer.
Generates a briefing step summarizing prior work, then either continues
the conversation or starts fresh with the new engineer's context.
"""
session = await _load_session(
session_id, user_id, db,
allow_team_access=True, team_id=team_id,
)
if session.status != "requesting_escalation":
raise ValueError(f"Session is {session.status}, not requesting_escalation")
# Can't pick up your own session
if session.user_id == user_id:
raise ValueError("Cannot pick up your own escalated session")
# Record the pickup in the escalation package
pkg = session.escalation_package or {}
pkg["picked_up_by"] = str(user_id)
pkg["picked_up_at"] = datetime.now(timezone.utc).isoformat()
session.escalation_package = pkg
# Reactivate the session
session.status = "active"
# Build a briefing message for the new engineer
original_user_name = "the previous engineer"
if session.user and hasattr(session.user, 'display_name') and session.user.display_name:
original_user_name = session.user.display_name
briefing_parts = [
f"## Escalation Briefing",
f"**Escalated by:** {original_user_name}",
f"**Reason:** {session.escalation_reason or 'Not specified'}",
"",
f"**Problem:** {session.problem_summary or 'Unknown'}",
]
steps_tried = pkg.get("steps_tried", [])
if steps_tried:
briefing_parts.append("")
briefing_parts.append("**Steps already taken:**")
for i, step in enumerate(steps_tried, 1):
desc = step.get("description", "")
resp = step.get("response", "")
briefing_parts.append(f"{i}. {desc}")
if resp:
briefing_parts.append(f"{resp}")
if hypotheses := pkg.get("remaining_hypotheses"):
briefing_parts.append("")
briefing_parts.append("**Remaining hypotheses:**")
if isinstance(hypotheses, list):
for h in hypotheses:
briefing_parts.append(f"- {h}")
else:
briefing_parts.append(str(hypotheses))
if suggestions := pkg.get("suggested_next_steps"):
briefing_parts.append("")
briefing_parts.append("**Suggested next steps:**")
if isinstance(suggestions, list):
for s in suggestions:
briefing_parts.append(f"- {s}")
else:
briefing_parts.append(str(suggestions))
briefing_text = "\n".join(briefing_parts)
# Create a briefing step (special intake_analysis type)
briefing_step = AISessionStep(
id=uuid.uuid4(),
session_id=session.id,
step_order=session.step_count,
step_type="action",
content={
"text": briefing_text,
"type": "briefing",
"allow_free_text": False,
"allow_skip": False,
},
context_message="Escalation briefing — here's what was tried before you.",
confidence_at_step=session.confidence_score,
ai_reasoning="Escalation handoff briefing for receiving engineer",
input_tokens=0,
output_tokens=0,
)
db.add(briefing_step)
session.step_count += 1
# Now generate the next step based on resume_mode
if resume_mode == "fresh" and additional_context:
# Engineer B provides their own input
user_message = f"[Picking up escalated session] {additional_context}"
else:
# Continue where A left off
user_message = (
"[Picking up escalated session] I've reviewed the briefing above. "
"Please continue the diagnosis based on everything tried so far."
)
# Append to conversation
session.conversation_messages = session.conversation_messages + [
{"role": "user", "content": user_message}
]
# Call LLM for next step
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=session.conversation_messages,
max_tokens=2048,
)
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
retry_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
session.conversation_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response}
]
confidence = parsed.get("confidence", session.confidence_score)
session.confidence_score = confidence
session.confidence_tier = _confidence_to_tier(confidence)
session.total_input_tokens += input_tokens
session.total_output_tokens += output_tokens
session.step_count += 1
next_step = _create_step_from_parsed(
session_id=session.id,
step_order=session.step_count - 1,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(next_step)
await db.flush()
return StepResponseResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
next_step=_build_step_response(next_step, session),
resolution_suggested=parsed["type"] == "resolution_suggestion",
resolution_summary=parsed.get("resolution_summary") if parsed["type"] == "resolution_suggestion" else None,
)
async def link_ticket(
session_id: UUID,
psa_ticket_id: str,
psa_connection_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Link a PSA ticket to an in-progress session and inject context."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot link ticket to session in status: {session.status}")
# Store the ticket link
session.psa_ticket_id = psa_ticket_id
session.psa_connection_id = psa_connection_id
# Try to fetch ticket context
ticket_context_block, ticket_data, _ = await _process_ticket_intake(
psa_connection_id=psa_connection_id,
psa_ticket_id=psa_ticket_id,
db=db,
)
if ticket_data:
session.ticket_data = ticket_data
# Inject ticket context into the system prompt for subsequent steps
if ticket_context_block and session.system_prompt_snapshot:
ticket_section = f"\n\n## PSA TICKET CONTEXT (linked mid-session)\n{ticket_context_block}\n"
session.system_prompt_snapshot = session.system_prompt_snapshot + ticket_section
await db.flush()
async def pause_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Pause an active session for the same engineer to resume later."""
session = await _load_session(session_id, user_id, db)
if session.status != "active":
raise ValueError(f"Cannot pause session in status: {session.status}")
session.status = "paused"
await db.flush()
async def resume_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Resume a paused session for the same engineer."""
session = await _load_session(session_id, user_id, db)
if session.status != "paused":
raise ValueError(f"Cannot resume session in status: {session.status}")
session.status = "active"
await db.flush()
async def rate_session(
session_id: UUID,
rating: int,
@@ -487,11 +767,23 @@ async def _load_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
allow_team_access: bool = False,
team_id: Optional[UUID] = None,
) -> AISession:
"""Load session with steps, verifying ownership."""
"""Load session with steps and user relationships, verifying ownership.
Args:
allow_team_access: If True, same-team users can access sessions in
'requesting_escalation' status (for escalation pickup).
team_id: Required when allow_team_access is True.
"""
result = await db.execute(
select(AISession)
.options(selectinload(AISession.steps))
.options(
selectinload(AISession.steps),
selectinload(AISession.user),
selectinload(AISession.escalated_to),
)
.where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
@@ -499,11 +791,21 @@ async def _load_session(
if not session:
raise ValueError("Session not found")
# Allow access if user is the session owner or the escalation target
if session.user_id != user_id and session.escalated_to_id != user_id:
raise PermissionError("Not authorized to access this session")
# Owner or escalation target always has access
if session.user_id == user_id or session.escalated_to_id == user_id:
return session
return session
# Engineer who picked up an escalated session has access
pkg = session.escalation_package or {}
if pkg.get("picked_up_by") == str(user_id):
return session
# Team-based access for escalation pickup
if allow_team_access and team_id and session.team_id == team_id:
if session.status == "requesting_escalation":
return session
raise PermissionError("Not authorized to access this session")
async def _classify_intake(intake_text: str) -> dict[str, Any]:
@@ -708,8 +1010,67 @@ def _generate_documentation(session: AISession) -> SessionDocumentation:
)
def _build_escalation_package(session: AISession) -> dict[str, Any]:
"""Build context package for the receiving engineer."""
async def _push_to_psa(
session: AISession,
user_id: UUID,
db: AsyncSession,
) -> dict[str, Any]:
"""Push documentation to PSA if session has a linked ticket.
Returns dict with psa_push_status, psa_push_error, member_mapping_warning.
"""
if not session.psa_ticket_id or not session.psa_connection_id:
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
try:
from app.services.psa_documentation_service import push_documentation
return await push_documentation(session, user_id, db)
except Exception as e:
logger.warning("PSA documentation push failed for session %s: %s", session.id, e)
return {
"psa_push_status": "failed",
"psa_push_error": str(e)[:200],
"member_mapping_warning": None,
}
async def _process_ticket_intake(
psa_connection_id: UUID,
psa_ticket_id: str,
db: AsyncSession,
) -> tuple[Optional[str], Optional[dict[str, Any]], str]:
"""Fetch ticket context from PSA and format for AI prompt.
Returns:
(ticket_context_block, ticket_data_dict, psa_context_status)
- ticket_context_block: formatted text for system prompt, or None on failure
- ticket_data_dict: serialized TicketContext for storage, or None on failure
- psa_context_status: "loaded" or "unavailable"
"""
try:
from app.services.psa.registry import get_provider_for_connection
from app.services.psa.ticket_context import format_ticket_context_for_prompt
provider = await get_provider_for_connection(psa_connection_id, db)
ticket_context = await provider.get_ticket_context(
int(psa_ticket_id), str(psa_connection_id)
)
ticket_prompt_block = format_ticket_context_for_prompt(ticket_context)
ticket_data = ticket_context.model_dump(mode="json")
return ticket_prompt_block, ticket_data, "loaded"
except Exception as e:
logger.warning(
"Failed to fetch ticket context for ticket %s (connection %s): %s",
psa_ticket_id, psa_connection_id, e,
)
return None, None, "unavailable"
async def _build_escalation_package_enhanced(
session: AISession,
user_id: UUID,
) -> dict[str, Any]:
"""Build enhanced context package with LLM-generated hypotheses."""
steps_tried = []
for step in session.steps:
content = step.content or {}
@@ -727,7 +1088,8 @@ def _build_escalation_package(session: AISession) -> dict[str, Any]:
entry["action_result"] = step.action_result
steps_tried.append(entry)
return {
package = {
"original_user_id": str(user_id),
"problem_summary": session.problem_summary,
"problem_domain": session.problem_domain,
"intake_content": session.intake_content,
@@ -735,3 +1097,36 @@ def _build_escalation_package(session: AISession) -> dict[str, Any]:
"steps_tried": steps_tried,
"escalation_reason": session.escalation_reason,
}
# LLM call for remaining hypotheses and suggested next steps (fast model)
try:
conversation_summary = "\n".join(
f"- {s.get('description', '')}{s.get('response', 'no response')}"
for s in steps_tried
)
prompt = (
"Based on this diagnostic conversation for an IT troubleshooting session:\n\n"
f"Problem: {session.problem_summary}\n"
f"Domain: {session.problem_domain}\n\n"
f"Steps taken:\n{conversation_summary}\n\n"
f"Escalation reason: {session.escalation_reason}\n\n"
"Respond with ONLY a JSON object:\n"
'{"remaining_hypotheses": ["hypothesis1", "hypothesis2"], '
'"suggested_next_steps": ["step1", "step2"], '
'"steps_ruled_out": ["ruled_out1"]}'
)
provider = get_ai_provider(settings.get_model_for_action("quick_action"))
raw, _, _ = await provider.generate_json(
system_prompt="You are an expert IT diagnostic assistant. Analyze the escalation context and provide concise insights.",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
)
insights = json.loads(raw.strip().strip("`").lstrip("json\n"))
package["remaining_hypotheses"] = insights.get("remaining_hypotheses", [])
package["suggested_next_steps"] = insights.get("suggested_next_steps", [])
package["steps_ruled_out"] = insights.get("steps_ruled_out", [])
except Exception as e:
logger.warning("Failed to generate escalation insights: %s", e)
# Fall back gracefully — don't block the escalation
return package

View File

@@ -11,6 +11,7 @@ from .types import (
PSACompany,
PSAMember,
PSAConfiguration,
PSATimeEntry,
)
@@ -66,3 +67,14 @@ class PSAProvider(ABC):
@abstractmethod
async def get_ticket_configurations(self, ticket_id: str) -> list[PSAConfiguration]:
...
@abstractmethod
async def create_time_entry(
self,
ticket_id: str,
member_id: str,
hours: float,
notes: str | None = None,
work_type: str | None = None,
) -> PSATimeEntry:
...

View File

@@ -15,6 +15,7 @@ from app.services.psa.types import (
PSACompany,
PSAMember,
PSAConfiguration,
PSATimeEntry,
)
from .client import ConnectWiseClient
@@ -514,6 +515,37 @@ class ConnectWiseProvider(PSAProvider):
psa_cache.set(cache_key, ctx, ttl_seconds=300)
return ctx
async def create_time_entry(
self,
ticket_id: str,
member_id: str,
hours: float,
notes: str | None = None,
work_type: str | None = None,
) -> PSATimeEntry:
"""Create a time entry on a CW ticket via POST /time/entries."""
payload: dict = {
"chargeToId": int(ticket_id),
"chargeToType": "ServiceTicket",
"member": {"id": int(member_id)},
"timeStart": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"actualHours": hours,
}
if notes:
payload["notes"] = notes[:2000] # CW limit
if work_type:
payload["workType"] = {"name": work_type}
data = await self._client.post("/time/entries", payload)
return PSATimeEntry(
id=str(data["id"]),
ticket_id=ticket_id,
member_id=member_id,
hours=data.get("actualHours", hours),
notes=data.get("notes"),
created_at=data.get("timeStart"),
)
# ── Private helpers ───────────────────────────────────────────────
@staticmethod

View File

@@ -31,6 +31,32 @@ async def get_provider_for_account(
provider="unknown",
)
return _instantiate_provider(connection)
async def get_provider_for_connection(
connection_id: UUID, db: AsyncSession
) -> PSAProvider:
"""Look up a specific PSA connection by ID, decrypt credentials, instantiate provider."""
result = await db.execute(
select(PsaConnection).where(
PsaConnection.id == connection_id,
PsaConnection.is_active.is_(True),
)
)
connection = result.scalar_one_or_none()
if not connection:
raise PSAConnectionError(
"PSA connection not found or inactive.",
provider="unknown",
)
return _instantiate_provider(connection)
def _instantiate_provider(connection: PsaConnection) -> PSAProvider:
"""Create the appropriate provider instance from a connection record."""
if connection.provider == "connectwise":
from app.services.psa.connectwise.client import ConnectWiseClient
from app.services.psa.connectwise.provider import ConnectWiseProvider

View File

@@ -57,6 +57,16 @@ class PSAConfiguration(BaseModel):
company_name: str | None = None
class PSATimeEntry(BaseModel):
id: str
ticket_id: str
member_id: str | None = None
hours: float
notes: str | None = None
work_type: str | None = None
created_at: str | None = None
class NoteType:
INTERNAL_ANALYSIS = "internal_analysis"
RESOLUTION = "resolution"

View File

@@ -0,0 +1,402 @@
"""PSA Documentation Push Service.
Generates structured documentation from FlowPilot AI sessions and pushes
it back to ConnectWise as internal notes + optional time entries.
"""
import logging
import math
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.psa_connection import PsaConnection
from app.models.psa_member_mapping import PsaMemberMapping
from app.models.psa_post_log import PsaPostLog
from app.services.psa.registry import get_provider_for_connection
from app.services.psa.types import NoteType
from app.services.redaction_service import apply_redaction_to_text
logger = logging.getLogger(__name__)
# Default flowpilot_settings values
DEFAULT_SETTINGS = {
"auto_push": True,
"auto_time_entry": True,
"time_rounding": "15min", # "15min", "30min", "exact", "none"
"note_visibility": "internal", # "internal", "both"
"include_diagnostic_steps": True,
}
def _get_setting(connection: PsaConnection, key: str) -> Any:
"""Get a flowpilot setting with default fallback."""
settings = connection.flowpilot_settings or {}
return settings.get(key, DEFAULT_SETTINGS.get(key))
def _round_hours(hours: float, rounding: str) -> float:
"""Round hours according to the rounding setting."""
if rounding == "exact":
return round(hours, 2)
elif rounding == "30min":
return math.ceil(hours * 2) / 2
else: # default 15min
return math.ceil(hours * 4) / 4
def _format_datetime(dt: datetime | None) -> str:
"""Format a datetime for display in notes."""
if not dt:
return "N/A"
return dt.strftime("%Y-%m-%d %I:%M %p UTC")
def format_resolution_note(session: AISession, include_steps: bool = True) -> str:
"""Format a resolved session as a plain-text note for CW."""
lines = [
"═══ FlowPilot Session Documentation ═══",
f"Session: {session.id}",
]
# Engineer name from relationship if loaded, otherwise user_id
engineer_name = getattr(session, 'user', None)
if engineer_name and hasattr(engineer_name, 'display_name'):
lines.append(f"Engineer: {engineer_name.display_name}")
lines.extend([
f"Date: {_format_datetime(session.resolved_at)}",
f"Started: {_format_datetime(session.created_at)}",
f"Ended: {_format_datetime(session.resolved_at)}",
])
# Duration
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
if minutes < 60:
lines.append(f"Duration: {minutes}m")
else:
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m")
lines.append("")
lines.append("── Problem ──")
lines.append(session.problem_summary or "No summary available")
if session.problem_domain:
lines.append(f"Domain: {session.problem_domain}")
# Diagnostic steps
if include_steps and session.steps:
lines.append("")
lines.append("── Diagnosis Path ──")
for step in session.steps:
content = step.content or {}
step_type = content.get("type", step.step_type).capitalize()
description = content.get("text", "")
response_text = ""
if step.was_skipped:
response_text = "Skipped"
elif step.selected_option:
# Try to find the label
if step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
response_text = opt.get("label", step.selected_option)
break
else:
response_text = step.selected_option
else:
response_text = step.selected_option
elif step.free_text_input:
response_text = step.free_text_input
lines.append(f"{step.step_order + 1}. [{step_type}] {description}")
if response_text:
lines.append(f" → Response: {response_text}")
if step.action_result:
result = step.action_result
outcome = "Succeeded" if result.get("success") else "Did not resolve"
if details := result.get("details"):
outcome += f"{details}"
lines.append(f" → Result: {outcome}")
# Resolution
lines.append("")
lines.append("── Resolution ──")
lines.append(session.resolution_summary or "No resolution summary")
if session.resolution_action:
lines.append(session.resolution_action)
# Confidence
lines.append("")
lines.append("── AI Confidence ──")
lines.append(f"Final confidence: {session.confidence_tier} ({session.confidence_score:.0%})")
# Timing section (always present)
lines.append("")
lines.append("── Session Timing ──")
lines.append(f"Start: {_format_datetime(session.created_at)}")
lines.append(f"End: {_format_datetime(session.resolved_at)}")
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
lines.append("")
lines.append("Generated by ResolutionFlow FlowPilot")
return "\n".join(lines)
def format_escalation_note(session: AISession, include_steps: bool = True) -> str:
"""Format an escalated session as a plain-text note for CW."""
lines = [
"═══ FlowPilot Escalation Documentation ═══",
f"Session: {session.id}",
]
engineer_name = getattr(session, 'user', None)
if engineer_name and hasattr(engineer_name, 'display_name'):
lines.append(f"Escalated by: {engineer_name.display_name}")
escalated_to = getattr(session, 'escalated_to', None)
if escalated_to and hasattr(escalated_to, 'display_name'):
lines.append(f"Escalated to: {escalated_to.display_name}")
else:
lines.append("Escalated to: Unassigned")
lines.extend([
f"Date: {_format_datetime(session.resolved_at or datetime.now(timezone.utc))}",
f"Started: {_format_datetime(session.created_at)}",
])
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Duration: {minutes}m")
lines.append("")
lines.append("── Problem ──")
lines.append(session.problem_summary or "No summary available")
# Work completed
if include_steps and session.steps:
lines.append("")
lines.append("── Work Completed ──")
for step in session.steps:
content = step.content or {}
description = content.get("text", "")
lines.append(f"{step.step_order + 1}. {description}")
# Escalation reason
lines.append("")
lines.append("── Escalation Reason ──")
lines.append(session.escalation_reason or "No reason provided")
# Escalation package details
pkg = session.escalation_package or {}
if hypotheses := pkg.get("remaining_hypotheses"):
lines.append("")
lines.append("── Remaining Hypotheses ──")
if isinstance(hypotheses, list):
for h in hypotheses:
lines.append(f"- {h}")
else:
lines.append(str(hypotheses))
if suggestions := pkg.get("suggested_next_steps"):
lines.append("")
lines.append("── Suggested Next Steps ──")
if isinstance(suggestions, list):
for s in suggestions:
lines.append(f"- {s}")
else:
lines.append(str(suggestions))
# Timing
lines.append("")
lines.append("── Session Timing ──")
lines.append(f"Start: {_format_datetime(session.created_at)}")
escalated_at = session.resolved_at or datetime.now(timezone.utc)
lines.append(f"Escalated: {_format_datetime(escalated_at)}")
if session.created_at:
delta = escalated_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
lines.append("")
lines.append("Generated by ResolutionFlow FlowPilot")
return "\n".join(lines)
async def push_documentation(
session: AISession,
user_id: UUID,
db: AsyncSession,
) -> dict[str, Any]:
"""Push session documentation to PSA ticket.
Returns:
{
"psa_push_status": "sent" | "pending_retry" | "failed" | "no_psa",
"psa_push_error": str | None,
"member_mapping_warning": str | None,
}
"""
if not session.psa_ticket_id or not session.psa_connection_id:
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
# Load connection and check settings
result = await db.execute(
select(PsaConnection).where(PsaConnection.id == session.psa_connection_id)
)
connection = result.scalar_one_or_none()
if not connection:
return {"psa_push_status": "failed", "psa_push_error": "PSA connection not found", "member_mapping_warning": None}
if not _get_setting(connection, "auto_push"):
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
# Format the note
include_steps = _get_setting(connection, "include_diagnostic_steps")
if session.status == "resolved":
note_text = format_resolution_note(session, include_steps=include_steps)
else:
note_text = format_escalation_note(session, include_steps=include_steps)
# Redact sensitive data
note_text, _ = apply_redaction_to_text(note_text)
# Determine note type
visibility = _get_setting(connection, "note_visibility")
note_type = NoteType.INTERNAL_ANALYSIS if visibility == "internal" else NoteType.DESCRIPTION
# Check member mapping for time entry
member_mapping_warning = None
member_mapping = None
if _get_setting(connection, "auto_time_entry") and _get_setting(connection, "time_rounding") != "none":
mapping_result = await db.execute(
select(PsaMemberMapping).where(
PsaMemberMapping.psa_connection_id == session.psa_connection_id,
PsaMemberMapping.user_id == user_id,
)
)
member_mapping = mapping_result.scalar_one_or_none()
if not member_mapping:
member_mapping_warning = "Map your CW account in Settings → Integrations to enable auto-logged time entries."
# Push to PSA
try:
provider = await get_provider_for_connection(session.psa_connection_id, db)
# Post the note
posted_note = await provider.post_note(
ticket_id=session.psa_ticket_id,
text=note_text,
note_type=note_type,
)
# Create time entry if member mapping exists
if member_mapping and session.resolved_at and session.created_at:
try:
delta = session.resolved_at - session.created_at
hours = delta.total_seconds() / 3600
rounding = _get_setting(connection, "time_rounding")
rounded_hours = _round_hours(hours, rounding)
if rounded_hours > 0:
await provider.create_time_entry(
ticket_id=session.psa_ticket_id,
member_id=member_mapping.external_member_id,
hours=rounded_hours,
notes=f"FlowPilot session: {session.problem_summary or 'Troubleshooting'}",
)
except Exception as e:
logger.warning("Failed to create time entry for session %s: %s", session.id, e)
# Don't fail the note push just because time entry failed
# Log success
log_entry = PsaPostLog(
id=uuid.uuid4(),
ai_session_id=session.id,
psa_connection_id=session.psa_connection_id,
ticket_id=session.psa_ticket_id,
note_type=note_type,
content_posted=note_text[:10000], # Truncate for storage
external_note_id=posted_note.id,
status="success",
posted_by=user_id,
)
db.add(log_entry)
return {
"psa_push_status": "sent",
"psa_push_error": None,
"member_mapping_warning": member_mapping_warning,
}
except Exception as e:
logger.warning("PSA push failed for session %s: %s", session.id, e)
# Log failure with retry scheduling
log_entry = PsaPostLog(
id=uuid.uuid4(),
ai_session_id=session.id,
psa_connection_id=session.psa_connection_id,
ticket_id=session.psa_ticket_id,
note_type=note_type,
content_posted=note_text[:10000],
status="pending_retry",
error_message=str(e)[:500],
retry_count=0,
next_retry_at=datetime.now(timezone.utc) + timedelta(minutes=5),
posted_by=user_id,
)
db.add(log_entry)
return {
"psa_push_status": "pending_retry",
"psa_push_error": str(e)[:200],
"member_mapping_warning": member_mapping_warning,
}
async def retry_failed_push(
log_entry: PsaPostLog,
db: AsyncSession,
) -> bool:
"""Retry a failed PSA push. Returns True on success."""
try:
provider = await get_provider_for_connection(log_entry.psa_connection_id, db)
posted_note = await provider.post_note(
ticket_id=log_entry.ticket_id,
text=log_entry.content_posted,
note_type=log_entry.note_type,
)
log_entry.status = "success"
log_entry.external_note_id = posted_note.id
log_entry.error_message = None
log_entry.next_retry_at = None
return True
except Exception as e:
log_entry.retry_count += 1
log_entry.error_message = str(e)[:500]
if log_entry.retry_count >= 3:
log_entry.status = "failed"
log_entry.next_retry_at = None
else:
# Exponential backoff: 5min, 15min, 45min
backoff_minutes = 5 * (3 ** log_entry.retry_count)
log_entry.next_retry_at = datetime.now(timezone.utc) + timedelta(minutes=backoff_minutes)
logger.warning(
"PSA retry %d failed for log %s: %s",
log_entry.retry_count, log_entry.id, e,
)
return False

View File

@@ -0,0 +1,52 @@
"""Background scheduler for retrying failed PSA documentation pushes.
Runs every 5 minutes via APScheduler, picks up PsaPostLog entries
with status='pending_retry' and next_retry_at <= now.
"""
import logging
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session_maker
from app.models.psa_post_log import PsaPostLog
from app.services.psa_documentation_service import retry_failed_push
logger = logging.getLogger(__name__)
async def process_pending_retries() -> None:
"""Process all pending PSA push retries that are due."""
async with async_session_maker() as db:
try:
result = await db.execute(
select(PsaPostLog)
.where(
PsaPostLog.status == "pending_retry",
PsaPostLog.next_retry_at <= datetime.now(timezone.utc),
PsaPostLog.retry_count < 3,
)
.limit(20) # Process in batches
)
entries = result.scalars().all()
if not entries:
return
logger.info("Processing %d pending PSA push retries", len(entries))
for entry in entries:
success = await retry_failed_push(entry, db)
if success:
logger.info("PSA retry succeeded for log %s", entry.id)
else:
logger.warning(
"PSA retry %d/%d failed for log %s",
entry.retry_count, 3, entry.id,
)
await db.commit()
except Exception as e:
logger.error("PSA retry scheduler error: %s", e)
await db.rollback()