resolutionflow/backend/app/services/psa/connectwise/provider.py
chihlasm 8534dbfb5f feat: command palette, PSA ticket context, session-to-flow converter (#108)
* feat: add paletteIntent utility for command palette query classification

Detects query intent ('question' | 'keyword' | 'page' | 'empty') to drive
smart result ordering in the enhanced command palette.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add recentFlows localStorage utility for command palette empty state

Tracks recently visited flows (capped at 10) with deduplication by id,
surfaced in command palette when query is empty.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: rewrite CommandPalette with categorized results and smart ranking

- Adds FlowPilot AI result (always present when query is non-empty)
- Intent-aware ordering: question → FlowPilot prominent; page → pages first;
  keyword → FlowPilot at top with flows/sessions/tags below
- Pages section with admin-gated items (uses useAuthStore)
- Tags extracted from flow search results with ?tag= navigation
- Quick Actions for create/import/scripts
- Empty state shows recent flows + quick actions
- Grouped rendering with section labels per design system
- Keyboard nav flattened across groups

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add FlowPilot prefill handoff from command palette to AssistantChatPage

When navigated to /assistant with location.state.prefill, automatically
creates a new chat and sends the prefill message without user interaction.
Clears location state after handling to prevent re-trigger on back navigation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: track recently visited flows for command palette empty state

Calls addRecentFlow after tree data loads in both TreeNavigationPage and
ProceduralNavigationPage so the command palette can surface recent flows
when the query is empty.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: use useMemo instead of useCallback for groups builder in CommandPalette

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add PSA ticket context Pydantic schemas (Task 6)

Add TicketDetails, CompanyInfo, ContactInfo, ConfigItem, TicketNote,
RelatedTicket, and TicketContext models in schemas/psa_context.py for
structured ticket context enrichment used by AI prompt injection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add ticket context prompt formatter (Task 7)

format_ticket_context_for_prompt() in services/psa/ticket_context.py
serializes TicketContext into structured text for AI system prompts,
with 10-note limit, 200-char text previews, and human-readable timestamps.
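
The limits described above can be sketched as follows. This is a minimal illustration, not the project's formatter: the `Note` dataclass, the `preview` and `format_notes_for_prompt` names, and the exact timestamp layout are assumptions made for the example. Only the 10-note cap and the 200-character preview come from the description.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone

MAX_NOTES = 10        # note limit stated in the commit description
PREVIEW_CHARS = 200   # preview length stated in the commit description


@dataclass
class Note:
    """Hypothetical stand-in for the TicketNote schema."""
    text: str
    member: str | None
    date_created: datetime


def preview(text: str, limit: int = PREVIEW_CHARS) -> str:
    """Collapse whitespace and truncate to at most `limit` characters."""
    flat = " ".join(text.split())
    return flat if len(flat) <= limit else flat[: limit - 3] + "..."


def format_notes_for_prompt(notes: list[Note]) -> str:
    """Render up to MAX_NOTES notes as plain-text lines for a system prompt."""
    lines = ["Recent notes:"]
    for note in notes[:MAX_NOTES]:
        # Assumed timestamp layout; timestamps are normalised to UTC first.
        when = note.date_created.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        who = note.member or "Unknown"
        lines.append(f"- [{when}] {who}: {preview(note.text)}")
    return "\n".join(lines)
```

Truncating before the text reaches the prompt keeps the system prompt size bounded regardless of how long individual ticket notes are.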

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add get_ticket_context() to ConnectWise provider (Task 8)

Fetches ticket details, company, contact, configurations, notes, and
related open tickets in parallel via asyncio.gather with partial failure
tolerance. Results are cached with a 5-minute TTL per ticket/connection.
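
The partial failure tolerance mentioned above relies on `asyncio.gather(..., return_exceptions=True)`, which returns raised exceptions as ordinary results instead of propagating the first one. A minimal sketch of the pattern, using hypothetical `fetch_ok` and `fetch_broken` coroutines in place of the real ConnectWise client calls:

```python
import asyncio


async def fetch_ok(name: str) -> dict:
    """Hypothetical stand-in for a PSA API call that succeeds."""
    await asyncio.sleep(0)
    return {"source": name}


async def fetch_broken(name: str) -> dict:
    """Hypothetical stand-in for a PSA API call that fails."""
    await asyncio.sleep(0)
    raise RuntimeError(f"{name} unavailable")


async def gather_context() -> dict:
    """Run independent fetches concurrently; a failed fetch becomes None."""
    calls = {
        "configurations": fetch_ok("configurations"),
        "notes": fetch_broken("notes"),
        "company": fetch_ok("company"),
    }
    # With return_exceptions=True, exceptions are returned in the result
    # list in the same order as the awaitables that were passed in.
    results = await asyncio.gather(*calls.values(), return_exceptions=True)
    context = {}
    for key, result in zip(calls, results):
        context[key] = None if isinstance(result, Exception) else result
    return context


context = asyncio.run(gather_context())
```

Keying the results by name, as in this sketch, is one way to avoid depending on positional order when some of the fetches are optional.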

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add GET /integrations/psa/tickets/{id}/context endpoint (Task 9)

Returns rich TicketContext for a ticket ID. Handles PSA auth failures
(returns structured error), ticket-not-found (404), and general PSA
errors (502). Requires active PSA connection for the user's account.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: inject PSA ticket context into copilot system prompt (Task 10)

When a copilot conversation has an associated session with a linked PSA
ticket, fetch the ticket context and append it to the system prompt.
Failure is non-critical — errors are logged and the copilot proceeds
without context rather than failing the request.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add PSA context API client with TypeScript interfaces

Defines TicketDetails, CompanyInfo, ContactInfo, ConfigItemInfo,
TicketNote, RelatedTicket, and TicketContext interfaces matching backend
psa_context.py schemas. Exports psaContextApi with getTicketContext().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add useTicketContext hook for PSA ticket context fetching

Accepts psaTicketId and psaConnectionId, fetches context on mount
when both IDs are present, and exposes refresh() for manual re-fetch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add TicketContextPanel component with accordion sections

Glass-card panel showing ticket summary, status/priority/SLA, and
accordion sections for Client, Contact, Devices, Notes, and Related
tickets. Matches design system with font-label labels and ice-cyan accents.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: mount TicketContextPanel in session runners when ticket is linked

ProceduralNavigationPage renders panel in left sidebar below step checklist.
TreeNavigationPage renders panel above breadcrumb trail. Both use
useTicketContext hook and show panel only when psa_ticket_id is set.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add fallback_steps to TypeScript types (Task 15)

Add optional fallback_steps field to ProceduralStep interface.
Add FallbackStepRecord interface and fallback_decisions field to Session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add backend validation for fallback steps (Task 16)

Validate fallback_steps in procedural flow validation: required fields,
no nested fallback_steps, no duplicate IDs. Add FallbackStepRecord schema
and fallback_decisions field to SessionResponse.
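
The three rules listed above can be sketched as a small validator. This is an illustration under assumptions rather than the backend's code: the `validate_fallback_steps` name, the dict-based step shape, and the choice of `id` and `title` as the required fields are all hypothetical.

```python
from __future__ import annotations

# Assumed required fields; the actual list is not given in the commit message.
REQUIRED_FIELDS = ("id", "title")


def validate_fallback_steps(step: dict) -> list[str]:
    """Return human-readable validation errors for a step's fallback_steps."""
    errors: list[str] = []
    seen_ids: set[str] = set()
    for index, fallback in enumerate(step.get("fallback_steps") or []):
        label = f"fallback_steps[{index}]"
        # Rule 1: required fields must be present and non-empty.
        for field in REQUIRED_FIELDS:
            if not fallback.get(field):
                errors.append(f"{label}: missing required field '{field}'")
        # Rule 2: a fallback step may not carry its own fallback_steps.
        if "fallback_steps" in fallback:
            errors.append(f"{label}: nested fallback_steps are not allowed")
        # Rule 3: IDs must be unique within the parent step.
        fallback_id = fallback.get("id")
        if fallback_id:
            if fallback_id in seen_ids:
                errors.append(f"{label}: duplicate id '{fallback_id}'")
            seen_ids.add(fallback_id)
    return errors
```

Collecting every error in a list, rather than raising on the first one, lets an editor UI report all problems in a single pass.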

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: create FallbackSteps UI component (Task 17)

Collapsible component supporting edit and execute modes. Edit mode
provides title/description inputs with add/remove controls. Execute
mode shows "This worked" / "Didn't help" action buttons with emerald/
rose styling. Amber accent styling throughout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: integrate FallbackSteps into editor and session runner (Task 18)

Wire FallbackSteps edit mode into StepEditor for procedure_step type
with add/remove/update handlers using crypto.randomUUID(). Add execute
mode rendering in ProceduralNavigationPage with fallbackDecisions state
tracking per parent step.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add session-to-flow request/response schemas (Task 19)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add session-to-flow AI generation service (Task 20)

Converts completed troubleshooting sessions into reusable procedural flows
with fallback branches. Includes PSA ticket context integration and
AI-generated step validation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add POST /ai/session-to-flow endpoint (Task 21)

Converts a completed session into a reusable procedural flow using AI.
Includes quota checking, usage recording, and proper error handling.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add Create Flow from Session button to session detail page (Task 22)

Adds sessionToFlow API client, exports from api/index.ts, and integrates
a prominent "Create Flow from Session" button on SessionDetailPage for
completed sessions. Generates a procedural flow via AI then navigates
to the procedural editor.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: cast tree_type to TreeType in session-to-flow creation

Fixes build error where string was not assignable to TreeType.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: update Playwright test selectors to match actual UI

- Use Control+k instead of Meta+k (Linux/CI compatibility)
- Use 'AI Assistant' group label instead of 'FlowPilot AI'
- Match actual FlowPilot chat page elements (Start a Conversation, New Chat, textarea)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: update Playwright test selectors to match actual UI

- Use specific command palette placeholder to avoid ambiguous matches
- Fix 'Quick Actions' scoping (two elements with same text)
- Fix 'Resolved' exact match on session detail page
- Fix tree editor to use getByText instead of getByDisplayValue
- Fix 'Add Step' strict mode by using .first()
- Fix fallback description placeholder text
- Update playwright.config.ts to use port 5433 and resolutionflow DB
- Update FlowPilot chat selectors to match actual page layout

11/17 new tests now passing. Remaining 6 need procedural session
navigation investigation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: resolve all Playwright test failures — 16/16 passing

- Fix procedural session tests: sessions auto-start, no Start button
- Fix strict mode violations: use getByRole('heading') for step titles
- Fix FlowPilot chat: use button role selector for New Chat
- Fix command palette page nav: scope Analytics click to palette modal
- Fix fallback runner: remove non-existent Start button click
- Update playwright.config to port 5433 for local Docker

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 13:39:17 -04:00

535 lines
21 KiB
Python

"""ConnectWise implementation of PSAProvider."""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

from app.services.psa.base import PSAProvider
from app.services.psa.cache import psa_cache
from app.services.psa.types import (
    ConnectionTestResult,
    PSATicket,
    PSANote,
    PSAStatus,
    PSACompany,
    PSAMember,
    PSAConfiguration,
)
from .client import ConnectWiseClient

logger = logging.getLogger(__name__)


class ConnectWiseProvider(PSAProvider):
    """ConnectWise PSA provider implementation."""

    def __init__(self, client: ConnectWiseClient):
        self.client = client

    async def test_connection(self) -> ConnectionTestResult:
        """Test the CW connection by fetching system info."""
        try:
            info = await self.client.get("/system/info")
            return ConnectionTestResult(
                success=True,
                message="Connected successfully.",
                server_version=info.get("version", None),
            )
        except Exception as e:
            return ConnectionTestResult(
                success=False,
                message=str(e),
                server_version=None,
            )

    # ── Tickets ───────────────────────────────────────────────────────

    async def get_ticket(self, ticket_id: str) -> PSATicket:
        """Fetch a single ticket by ID from ConnectWise."""
        data = await self.client.get(
            f"/service/tickets/{ticket_id}",
            params={"fields": "id,summary,company,board,status,priority,closedFlag"},
        )
        return self._map_ticket(data)

    async def search_tickets(self, query: str, **filters) -> list[PSATicket]:
        """Search CW tickets by summary. Supports board_id and status_id filters."""
        params: dict = {
            "fields": "id,summary,company,board,status,priority,closedFlag",
            "orderBy": "id desc",
            "pageSize": 25,
        }
        # Build CW condition query
        conditions: list[str] = []
        if query:
            conditions.append(f"summary contains '{query}'")
        if filters.get("board_id"):
            conditions.append(f"board/id = {filters['board_id']}")
        if filters.get("status_id"):
            conditions.append(f"status/id = {filters['status_id']}")
        if not filters.get("include_closed", False):
            conditions.append("closedFlag = false")
        if conditions:
            params["conditions"] = " and ".join(conditions)
        data = await self.client.get("/service/tickets", params=params)
        return [
            self._map_ticket(t)
            for t in (data if isinstance(data, list) else [])
        ]

    async def get_ticket_configurations(
        self, ticket_id: str
    ) -> list[PSAConfiguration]:
        """Get configurations (assets) attached to a ticket."""
        data = await self.client.get(
            f"/service/tickets/{ticket_id}/configurations",
            params={"fields": "id,deviceIdentifier,type,company"},
        )
        return [
            PSAConfiguration(
                id=str(c["id"]),
                name=c.get("deviceIdentifier", ""),
                type=c.get("type", {}).get("name") if c.get("type") else None,
                company_name=c.get("company", {}).get("name") if c.get("company") else None,
            )
            for c in (data if isinstance(data, list) else [])
        ]

    # ── Board statuses (cached) ───────────────────────────────────────

    async def get_ticket_statuses(self, board_id: int) -> list[PSAStatus]:
        """Get available statuses for a CW service board (cached 1 hour)."""
        cache_key = f"board_statuses:{board_id}"
        cached = psa_cache.get(cache_key)
        if cached is not None:
            return cached
        data = await self.client.get(
            f"/service/boards/{board_id}/statuses",
            params={"fields": "id,name,closedStatus", "pageSize": 100},
        )
        result = [
            PSAStatus(
                id=s["id"],
                name=s["name"],
                is_closed=s.get("closedStatus", False),
            )
            for s in (data if isinstance(data, list) else [])
        ]
        psa_cache.set(cache_key, result, ttl_seconds=3600)
        return result

    # ── Companies ─────────────────────────────────────────────────────

    async def list_companies(self, **filters) -> list[PSACompany]:
        """List companies from CW, optionally filtered by status."""
        params: dict = {
            "fields": "id,name,status",
            "pageSize": 100,
            "orderBy": "name asc",
        }
        conditions: list[str] = []
        if filters.get("status"):
            conditions.append(f"status/name = '{filters['status']}'")
        if conditions:
            params["conditions"] = " and ".join(conditions)
        data = await self.client.get("/company/companies", params=params)
        return [
            PSACompany(
                id=str(c["id"]),
                name=c.get("name", ""),
                status=c.get("status", {}).get("name") if c.get("status") else None,
            )
            for c in (data if isinstance(data, list) else [])
        ]

    async def get_company(self, company_id: str) -> PSACompany:
        """Fetch a single company by ID."""
        data = await self.client.get(
            f"/company/companies/{company_id}",
            params={"fields": "id,name,status"},
        )
        return PSACompany(
            id=str(data["id"]),
            name=data.get("name", ""),
            status=data.get("status", {}).get("name") if data.get("status") else None,
        )

    # ── Notes & status updates ───────────────────────────────────────

    async def post_note(
        self,
        ticket_id: str,
        text: str,
        note_type: str,
        member_id: str | None = None,
    ) -> PSANote:
        """Post a note to a CW ticket.

        Maps ResolutionFlow note types to CW flag fields:
        - internal_analysis → internalAnalysisFlag (internal only)
        - resolution → resolutionFlag (internal, triggers notifications)
        - description → detailDescriptionFlag (external, triggers notifications)
        """
        from app.services.psa.types import NoteType

        flags = {
            NoteType.INTERNAL_ANALYSIS: {
                "internalAnalysisFlag": True,
                "resolutionFlag": False,
                "detailDescriptionFlag": False,
                "internalFlag": True,
                "processNotifications": False,
            },
            NoteType.RESOLUTION: {
                "internalAnalysisFlag": False,
                "resolutionFlag": True,
                "detailDescriptionFlag": False,
                "internalFlag": True,
                "processNotifications": True,
            },
            NoteType.DESCRIPTION: {
                "internalAnalysisFlag": False,
                "resolutionFlag": False,
                "detailDescriptionFlag": True,
                "internalFlag": False,
                "processNotifications": True,
            },
        }
        note_flags = flags.get(note_type, flags[NoteType.INTERNAL_ANALYSIS])
        # NOTE: CW Developer Guide states \n is "Not Supported" in JSON bodies
        # and may be collapsed to a single space. CW does support markdown in ticket
        # notes (see PSA-Markdown.md). This needs sandbox testing — if newlines are
        # lost, consider using double-space line breaks or HTML <br> tags instead.
        body: dict = {
            "text": text,
            **note_flags,
        }
        if member_id:
            body["member"] = {"id": int(member_id)}
        data = await self.client.post(
            f"/service/tickets/{ticket_id}/notes", json_body=body
        )
        return PSANote(
            id=str(data.get("id", "")),
            text=data.get("text", ""),
            note_type=note_type,
            created_at=data.get("dateCreated"),
        )

    async def update_ticket_status(
        self, ticket_id: str, status_id: int
    ) -> PSATicket:
        """Update a CW ticket's status using JSON Patch format."""
        patch_body = [
            {"op": "replace", "path": "status", "value": {"id": status_id}}
        ]
        data = await self.client.patch(
            f"/service/tickets/{ticket_id}", json_body=patch_body
        )
        return self._map_ticket(data)

    async def list_members(self) -> list[PSAMember]:
        """List CW system members (cached 15 minutes)."""
        cache_key = "members:all"
        cached = psa_cache.get(cache_key)
        if cached is not None:
            return cached
        data = await self.client.get_paginated(
            "/system/members",
            params={
                "fields": "id,identifier,firstName,lastName,officeEmail",
                "conditions": "inactiveFlag = false",
                "pageSize": 1000,
            },
        )
        result = [
            PSAMember(
                id=str(m["id"]),
                identifier=m.get("identifier", ""),
                name=f"{m.get('firstName', '')} {m.get('lastName', '')}".strip(),
                email=m.get("officeEmail"),
            )
            for m in data
        ]
        psa_cache.set(cache_key, result, ttl_seconds=900)
        return result

    # ── Ticket Context ────────────────────────────────────────────────

    async def get_ticket_context(
        self, ticket_id: int, connection_id: str | None = None
    ):
        """Fetch rich ticket context for AI prompt injection.

        Returns a TicketContext with ticket details, company, contact,
        configurations, recent notes, and related open tickets.
        Results are cached for 5 minutes per ticket.
        """
        from app.schemas.psa_context import (
            TicketContext,
            TicketDetails,
            CompanyInfo,
            ContactInfo,
            ConfigItem,
            TicketNote,
            RelatedTicket,
        )

        cache_key = f"{connection_id or 'default'}:ticket_context:{ticket_id}"
        cached = psa_cache.get(cache_key)
        if cached is not None:
            return cached

        # Fetch ticket first to get company_id and contact_id
        ticket_data = await self.client.get(
            f"/service/tickets/{ticket_id}",
            params={
                "fields": "id,summary,status,priority,board,sla,dateEntered,resources,company,contact"
            },
        )
        company_id = ticket_data.get("company", {}).get("id") if ticket_data.get("company") else None
        contact_id = ticket_data.get("contact", {}).get("id") if ticket_data.get("contact") else None

        # Build parallel fetch tasks
        configs_task = asyncio.create_task(
            self.client.get(
                f"/service/tickets/{ticket_id}/configurations",
                params={
                    "fields": "id,deviceIdentifier,type,osType,serialNumber,ipAddress,modelNumber"
                },
            )
        )
        notes_task = asyncio.create_task(
            self.client.get(
                f"/service/tickets/{ticket_id}/notes",
                params={
                    "pageSize": "20",
                    "orderBy": "dateCreated desc",
                    "fields": "id,text,member,dateCreated,internalAnalysisFlag",
                },
            )
        )
        company_task = asyncio.create_task(
            self.client.get(
                f"/company/companies/{company_id}",
                params={
                    "fields": "id,name,site,addressLine1,city,state,zip,phoneNumber,type,territory"
                },
            )
        ) if company_id else None
        related_task = asyncio.create_task(
            self.client.get(
                "/service/tickets",
                params={
                    "conditions": f"company/id={company_id} AND closedFlag=false AND id != {ticket_id}",
                    "pageSize": "5",
                    "orderBy": "id desc",
                    "fields": "id,summary,status,priority,board",
                },
            )
        ) if company_id else None
        contact_task = asyncio.create_task(
            self.client.get(
                f"/company/contacts/{contact_id}",
                params={
                    "fields": "id,firstName,lastName,title,defaultPhoneNbr,communicationItems"
                },
            )
        ) if contact_id else None

        # Gather all tasks with partial failure tolerance
        tasks_to_await = [t for t in [configs_task, notes_task, company_task, related_task, contact_task] if t is not None]
        task_results = await asyncio.gather(*tasks_to_await, return_exceptions=True)

        # Unpack results in order (skipping None tasks)
        result_iter = iter(task_results)
        configs_raw = next(result_iter)
        notes_raw = next(result_iter)
        company_raw = next(result_iter) if company_task else None
        related_raw = next(result_iter) if related_task else None
        contact_raw = next(result_iter) if contact_task else None

        # Map ticket details
        def _parse_dt(val: str | None) -> datetime:
            if not val:
                return datetime.now(timezone.utc)
            try:
                # CW returns ISO 8601 strings — ensure timezone aware
                dt = datetime.fromisoformat(val.replace("Z", "+00:00"))
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)
                return dt
            except (ValueError, AttributeError):
                return datetime.now(timezone.utc)

        ticket_details = TicketDetails(
            id=ticket_data["id"],
            summary=ticket_data.get("summary", ""),
            status=ticket_data.get("status", {}).get("name", "") if isinstance(ticket_data.get("status"), dict) else str(ticket_data.get("status", "")),
            priority=ticket_data.get("priority", {}).get("name", "") if isinstance(ticket_data.get("priority"), dict) else str(ticket_data.get("priority", "")),
            board=ticket_data.get("board", {}).get("name", "") if isinstance(ticket_data.get("board"), dict) else str(ticket_data.get("board", "")),
            sla=ticket_data.get("sla", {}).get("name") if isinstance(ticket_data.get("sla"), dict) else ticket_data.get("sla"),
            date_entered=_parse_dt(ticket_data.get("dateEntered")),
            resources=ticket_data.get("resources"),
        )

        # Map company
        company_info: CompanyInfo
        if isinstance(company_raw, dict):
            addr_parts = [
                company_raw.get("addressLine1"),
                company_raw.get("city"),
                company_raw.get("state"),
                company_raw.get("zip"),
            ]
            address = ", ".join(p for p in addr_parts if p) or None
            company_info = CompanyInfo(
                id=company_raw["id"],
                name=company_raw.get("name", ""),
                site=company_raw.get("site", {}).get("name") if isinstance(company_raw.get("site"), dict) else company_raw.get("site"),
                address=address,
                phone=company_raw.get("phoneNumber"),
                type=company_raw.get("type", {}).get("name") if isinstance(company_raw.get("type"), dict) else company_raw.get("type"),
                territory=company_raw.get("territory", {}).get("name") if isinstance(company_raw.get("territory"), dict) else company_raw.get("territory"),
            )
        else:
            if isinstance(company_raw, Exception):
                logger.warning("Failed to fetch company for ticket %s: %s", ticket_id, company_raw)
            # Fallback: use data from ticket itself
            company_info = CompanyInfo(
                id=company_id or 0,
                name=ticket_data.get("company", {}).get("name", "") if isinstance(ticket_data.get("company"), dict) else "",
            )

        # Map contact
        contact_info: ContactInfo | None = None
        if isinstance(contact_raw, dict):
            first = contact_raw.get("firstName", "")
            last = contact_raw.get("lastName", "")
            full_name = f"{first} {last}".strip() or "Unknown"
            # Extract email from communicationItems
            email: str | None = None
            comm_items = contact_raw.get("communicationItems", [])
            if isinstance(comm_items, list):
                for item in comm_items:
                    if isinstance(item, dict) and item.get("communicationType") == "Email":
                        email = item.get("value")
                        break
            contact_info = ContactInfo(
                name=full_name,
                email=email,
                phone=contact_raw.get("defaultPhoneNbr"),
                title=contact_raw.get("title"),
            )
        elif isinstance(contact_raw, Exception):
            logger.warning("Failed to fetch contact for ticket %s: %s", ticket_id, contact_raw)

        # Map configurations
        configurations: list[ConfigItem] = []
        if isinstance(configs_raw, list):
            for cfg in configs_raw:
                if not isinstance(cfg, dict):
                    continue
                configurations.append(ConfigItem(
                    device_identifier=cfg.get("deviceIdentifier", ""),
                    type=cfg.get("type", {}).get("name") if isinstance(cfg.get("type"), dict) else cfg.get("type"),
                    os_type=cfg.get("osType", {}).get("name") if isinstance(cfg.get("osType"), dict) else cfg.get("osType"),
                    serial_number=cfg.get("serialNumber"),
                    ip_address=cfg.get("ipAddress"),
                    model_number=cfg.get("modelNumber"),
                ))
        elif isinstance(configs_raw, Exception):
            logger.warning("Failed to fetch configs for ticket %s: %s", ticket_id, configs_raw)

        # Map notes
        notes: list[TicketNote] = []
        if isinstance(notes_raw, list):
            for note in notes_raw:
                if not isinstance(note, dict):
                    continue
                member_name: str | None = None
                member_obj = note.get("member")
                if isinstance(member_obj, dict):
                    first = member_obj.get("firstName", "")
                    last = member_obj.get("lastName", "")
                    member_name = f"{first} {last}".strip() or member_obj.get("identifier")
                elif isinstance(member_obj, str):
                    member_name = member_obj
                notes.append(TicketNote(
                    text=note.get("text", ""),
                    member=member_name,
                    date_created=_parse_dt(note.get("dateCreated")),
                    internal_analysis_flag=note.get("internalAnalysisFlag", False),
                ))
        elif isinstance(notes_raw, Exception):
            logger.warning("Failed to fetch notes for ticket %s: %s", ticket_id, notes_raw)

        # Map related tickets
        related_tickets: list[RelatedTicket] = []
        if isinstance(related_raw, list):
            for rt in related_raw:
                if not isinstance(rt, dict):
                    continue
                related_tickets.append(RelatedTicket(
                    id=rt["id"],
                    summary=rt.get("summary", ""),
                    status=rt.get("status", {}).get("name", "") if isinstance(rt.get("status"), dict) else str(rt.get("status", "")),
                    priority=rt.get("priority", {}).get("name", "") if isinstance(rt.get("priority"), dict) else str(rt.get("priority", "")),
                    board=rt.get("board", {}).get("name", "") if isinstance(rt.get("board"), dict) else str(rt.get("board", "")),
                ))
        elif isinstance(related_raw, Exception):
            logger.warning("Failed to fetch related tickets for ticket %s: %s", ticket_id, related_raw)

        ctx = TicketContext(
            ticket=ticket_details,
            company=company_info,
            contact=contact_info,
            configurations=configurations,
            notes=notes,
            related_tickets=related_tickets,
            fetched_at=datetime.now(timezone.utc),
        )
        psa_cache.set(cache_key, ctx, ttl_seconds=300)
        return ctx

    # ── Private helpers ───────────────────────────────────────────────

    @staticmethod
    def _map_ticket(data: dict) -> PSATicket:
        """Map a CW ticket JSON dict to a PSATicket."""
        # A nested reference may be present but null, in which case
        # data.get(key, {}) returns None and .get() on it would raise.
        # Fall back to an empty dict before reading nested fields.
        company = data.get("company") or {}
        board = data.get("board") or {}
        status = data.get("status") or {}
        priority = data.get("priority") or {}
        return PSATicket(
            id=str(data["id"]),
            summary=data.get("summary", ""),
            company_name=company.get("name"),
            company_id=str(company["id"]) if company else None,
            board_name=board.get("name"),
            board_id=board.get("id"),
            status_name=status.get("name"),
            status_id=status.get("id"),
            priority_name=priority.get("name"),
            priority_id=priority.get("id"),
            closed=data.get("closedFlag", False),
        )