Merge main into feat/flowpilot-migration
Some checks failed
Mirror to GitHub / mirror (push) Successful in 11s
CI / backend (pull_request) Failing after 36s
CI / frontend (pull_request) Failing after 1m7s
CI / e2e (pull_request) Has been skipped

Brings in PR #141 (PSA ticket management) so FlowPilot can ship on top
of a unified main. Two manual conflict resolutions:

1. CLAUDE.md — kept the FlowPilot ai-handoff rewrite (`.ai/`-driven
   protocol). The pre-rewrite reference content (CW integration notes,
   lessons archive, env vars table) lives in `docs/connectwise/`,
   `docs/LESSONS-ARCHIVE.md`, and DEV-ENV.md by design.

2. frontend/src/pages/AssistantChatPage.tsx — both conflict regions
   were purely additive. Concatenated FlowPilot's Phase 2-9 state hooks
   (facts, activeFix, preview*, scriptPanelOpen, templatizeQueue) with
   PSA's spin-off ticket state (linkedTicket, showNewTicket, spinOffHint).
   Both modal mounts (TemplatizePrompt, ShortcutsHelpOverlay,
   NewTicketModal) kept. All setters wired by either branch are intact.

Verification:
- `tsc -b` clean across the merged tree.
- Browser smoke-test (Session B fixture): Phase 9 ProposalBanner
  ("Run AI-drafted PowerShell to recover SSL VPN") renders alongside
  PSA's new Tickets sidebar icon. Console clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-25 01:03:33 -04:00
45 changed files with 9951 additions and 106 deletions

View File

@@ -1,6 +1,7 @@
"""PSA integration endpoints — connection CRUD and test."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Annotated
from uuid import UUID
@@ -11,6 +12,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete
logger = logging.getLogger(__name__)
from app.api.deps import get_current_active_user, require_account_owner, require_engineer_or_admin
from app.core.database import get_db
from app.models.psa_connection import PsaConnection
@@ -30,6 +33,17 @@ from app.schemas.psa_connection import (
PSABoardResponse,
)
from app.core.config import settings
from app.schemas.psa_tickets import (
PSAResourceSchema,
PSATicketCreatedSchema,
PSATicketStatusUpdateSchema,
TicketCreatePayloadSchema,
PSAPrioritySchema,
TicketListResponseSchema,
AiParseRequestSchema,
AiParseResponseSchema,
)
import app.services.ticket_service as ticket_svc
from app.services.psa.encryption import (
decrypt_credentials,
encrypt_credentials,
@@ -362,33 +376,36 @@ async def list_boards(
provider = await get_provider_for_account(current_user.account_id, db)
boards = await provider.list_boards()
return [PSABoardResponse(id=b.id, name=b.name) for b in boards]
except PSAError:
except PSAError as e:
# Boards are optional UI chrome — degrade gracefully rather than surfacing a toast
logger.warning("list_boards failed: %s", e)
return []
@router.get("/tickets/search", response_model=list[PSATicketSearchResult])
@router.get("/tickets/search", response_model=TicketListResponseSchema)
async def search_tickets(
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
query: str = "",
board_id: int | None = None,
status_id: int | None = None,
status_name: str | None = None,
include_closed: bool = False,
assigned_to_me: bool = False,
unassigned: bool = False,
board_ids: str = "",
priority: str | None = None,
company_id: int | None = None,
page: int = 1,
page_size: int = 10,
page_size: int = 25,
):
"""Search ConnectWise tickets."""
"""Search ConnectWise tickets — returns paginated TicketListResponse."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.registry import get_provider_for_account
from app.services.psa.exceptions import PSAError
# Resolve assigned_to_me → member_identifier (CW login name for resources contains filter)
member_identifier: str | None = None
if assigned_to_me:
conn_result = await db.execute(
@@ -407,23 +424,18 @@ async def search_tickets(
)
mapping = mapping_result.scalar_one_or_none()
if not mapping:
# No mapping for this user — return empty list
return []
from app.services.psa.registry import get_provider_for_account as _get_provider
from app.services.psa.exceptions import PSAError as _PSAError
return {"items": [], "total": 0, "page": page, "page_size": page_size}
try:
_provider = await _get_provider(current_user.account_id, db)
_provider = await get_provider_for_account(current_user.account_id, db)
cw_members = await _provider.list_members()
matched = next((m for m in cw_members if m.id == mapping.external_member_id), None)
if matched:
member_identifier = matched.identifier
else:
return []
except _PSAError:
return []
return {"items": [], "total": 0, "page": page, "page_size": page_size}
except PSAError:
return {"items": [], "total": 0, "page": page, "page_size": page_size}
# Parse comma-separated board_ids
parsed_board_ids: list[int] = []
if board_ids:
try:
@@ -433,33 +445,250 @@ async def search_tickets(
try:
provider = await get_provider_for_account(current_user.account_id, db)
tickets = await provider.search_tickets(
result = await provider.search_tickets(
query,
board_id=board_id,
status_id=status_id,
status_name=status_name,
include_closed=include_closed,
member_identifier=member_identifier,
unassigned=unassigned,
board_ids=parsed_board_ids,
company_id=company_id,
page=page,
page_size=page_size,
)
return [
items = [
PSATicketSearchResult(
id=t.id,
summary=t.summary,
company_name=t.company_name,
company_id=t.company_id,
board_name=t.board_name,
board_id=t.board_id,
status_name=t.status_name,
status_id=t.status_id,
priority_name=t.priority_name,
priority_id=t.priority_id,
closed=t.closed,
)
for t in tickets
for t in result.items
]
return {"items": items, "total": result.total, "page": result.page, "page_size": result.page_size}
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.post("/tickets", response_model=PSATicketCreatedSchema, status_code=201)
async def create_ticket(
data: TicketCreatePayloadSchema,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Create a new PSA ticket."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.exceptions import PSAError
from app.services.psa.types import TicketCreatePayload
try:
return await ticket_svc.create_ticket(
current_user.account_id,
TicketCreatePayload(**data.model_dump()),
db,
)
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.post("/tickets/ai-parse", response_model=AiParseResponseSchema)
async def ai_parse_ticket(
data: AiParseRequestSchema,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Parse natural language into a ticket pre-fill payload using Claude."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.registry import get_provider_for_account
from app.services.psa.exceptions import PSAError
import anthropic
import json
# Fetch boards + members for context (both cached)
boards = []
members = []
try:
provider = await get_provider_for_account(current_user.account_id, db)
boards = await provider.list_boards()
members = await provider.list_members()
except PSAError:
pass
boards_list = [{"id": b.id, "name": b.name} for b in boards]
members_list = [{"id": m.id, "name": m.name, "identifier": m.identifier} for m in members]
system_prompt = """You are a ticket triage assistant for an MSP help desk.
Extract structured ticket information from the engineer's natural language description.
Return ONLY valid JSON matching this exact schema — no other text:
{
"summary": "short one-line ticket title or null",
"board_id": "integer matching one of the provided boards or null",
"priority_name": "one of: Critical, High, Medium, Low, or null",
"description": "expanded description or null",
"assignee_identifier": "member identifier string from the provided members list or null",
"warnings": ["list of strings explaining what could not be resolved"]
}"""
user_msg = f"""Available boards: {json.dumps(boards_list)}
Available members: {json.dumps(members_list[:50])}
Engineer's description: {data.prompt}"""
missing_fields: list[str] = []
warnings: list[str] = []
response_data = AiParseResponseSchema()
try:
client = anthropic.AsyncAnthropic(
api_key=settings.ANTHROPIC_API_KEY,
max_retries=1,
)
msg = await client.messages.create(
model=settings.get_model_for_action("default"),
max_tokens=512,
system=system_prompt,
messages=[{"role": "user", "content": user_msg}],
)
raw = msg.content[0].text.strip()
# Strip markdown fences if present
if raw.startswith("```"):
import re
raw = re.sub(r'^```(?:json)?\s*', '', raw)
raw = re.sub(r'\s*```$', '', raw.strip())
parsed = json.loads(raw)
response_data.summary = parsed.get("summary")
response_data.description = parsed.get("description")
warnings = parsed.get("warnings", [])
# Resolve board_id
if parsed.get("board_id"):
board_match = next((b for b in boards if b.id == int(parsed["board_id"])), None)
if board_match:
response_data.board_id = board_match.id
else:
missing_fields.append("board_id")
warnings.append(f"Board ID {parsed['board_id']} not found")
else:
missing_fields.append("board_id")
# Resolve assignee
if parsed.get("assignee_identifier"):
member = next((m for m in members if m.identifier == parsed["assignee_identifier"]), None)
if member:
response_data.assigned_member_id = int(member.id)
else:
warnings.append(f"Member '{parsed['assignee_identifier']}' not found")
# Priority/status/company always need manual selection
missing_fields.extend(["status_id", "priority_id", "company_id"])
except Exception as e:
logger.warning("AI parse failed: %s", e)
missing_fields = ["summary", "board_id", "status_id", "priority_id", "company_id"]
warnings = ["AI parsing failed — please fill in manually"]
response_data.missing_fields = missing_fields
response_data.warnings = warnings
return response_data
@router.patch("/tickets/{ticket_id}/status", response_model=PSATicketStatusUpdateSchema)
async def update_ticket_status_endpoint(
ticket_id: int,
status_id: int,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Update a ticket's status."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.exceptions import PSAError
try:
return await ticket_svc.update_status(current_user.account_id, ticket_id, status_id, db)
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get("/tickets/{ticket_id}/resources", response_model=list[PSAResourceSchema])
async def list_ticket_resources(
ticket_id: int,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.exceptions import PSAError
try:
return await ticket_svc.list_resources(current_user.account_id, ticket_id, db)
except PSAError as e:
# Resources are optional display data — degrade gracefully rather than surfacing a toast
logger.warning("list_resources(%s) failed: %s", ticket_id, e)
return []
@router.post("/tickets/{ticket_id}/resources", response_model=PSAResourceSchema, status_code=201)
async def add_ticket_resource(
ticket_id: int,
member_id: int,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.exceptions import PSAError
try:
return await ticket_svc.add_resource(current_user.account_id, ticket_id, member_id, db)
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.delete("/tickets/{ticket_id}/resources/{member_id}", status_code=204)
async def remove_ticket_resource(
ticket_id: int,
member_id: int,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.exceptions import PSAError
try:
await ticket_svc.remove_resource(current_user.account_id, ticket_id, member_id, db)
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get("/priorities", response_model=list[PSAPrioritySchema])
async def list_priorities(
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""List PSA priority levels for ticket creation form."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.registry import get_provider_for_account
from app.services.psa.exceptions import PSAError
try:
provider = await get_provider_for_account(current_user.account_id, db)
raw = await provider.list_priorities()
return [PSAPrioritySchema(id=p["id"], name=p["name"]) for p in raw if p.get("id")]
except PSAError as e:
logger.warning("list_priorities failed: %s", e)
return []
@router.get("/tickets/{ticket_id}/context")
async def get_ticket_context(
ticket_id: int,
@@ -561,7 +790,30 @@ async def get_ticket_statuses(
except PSANotFoundError:
raise HTTPException(status_code=404, detail="Ticket not found")
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
logger.warning("get_ticket_statuses(%s) failed: %s", ticket_id, e)
return []
@router.get("/boards/{board_id}/statuses", response_model=list[PSATicketStatusItem])
async def get_board_statuses(
board_id: int,
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Get available statuses for a service board directly (no ticket lookup required)."""
if not current_user.account_id:
raise HTTPException(status_code=400, detail="User has no account")
from app.services.psa.registry import get_provider_for_account
from app.services.psa.exceptions import PSAError
try:
provider = await get_provider_for_account(current_user.account_id, db)
statuses = await provider.get_ticket_statuses(board_id)
return [PSATicketStatusItem(id=s.id, name=s.name, is_closed=s.is_closed) for s in statuses]
except PSAError as e:
logger.warning("get_board_statuses(%s) failed: %s", board_id, e)
return []
# ── member mapping endpoints ─────────────────────────────────────────
@@ -569,7 +821,7 @@ async def get_ticket_statuses(
@router.get("/members", response_model=list[PsaMemberResponse])
async def list_members(
current_user: Annotated[User, Depends(require_account_owner)],
current_user: Annotated[User, Depends(require_engineer_or_admin)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""List CW members (from CW API)."""
@@ -587,7 +839,9 @@ async def list_members(
for m in members
]
except PSAError as e:
raise HTTPException(status_code=502, detail=str(e))
# Members are optional display data — degrade gracefully
logger.warning("list_members failed: %s", e)
return []
@router.get("/member-mappings", response_model=list[PsaMemberMappingResponse])

View File

@@ -53,9 +53,13 @@ class PSATicketSearchResult(BaseModel):
id: str
summary: str
company_name: str | None = None
company_id: str | None = None
board_name: str | None = None
board_id: int | None = None
status_name: str | None = None
status_id: int | None = None
priority_name: str | None = None
priority_id: int | None = None
closed: bool = False

View File

@@ -0,0 +1,65 @@
"""Normalized DTOs for ticket management endpoints."""
from __future__ import annotations
from pydantic import BaseModel
class PSAResourceSchema(BaseModel):
member_id: int
member_name: str
member_identifier: str
is_rf_user: bool = False
class PSATicketCreatedSchema(BaseModel):
id: int
summary: str
board_name: str
status_name: str
priority_name: str
company_name: str
resources: list[PSAResourceSchema] = []
class PSATicketStatusUpdateSchema(BaseModel):
ticket_id: int
previous_status: str
new_status: str
new_status_id: int
class TicketCreatePayloadSchema(BaseModel):
summary: str
company_id: int
board_id: int
status_id: int
priority_id: int
description: str | None = None
assigned_member_id: int | None = None
class TicketListResponseSchema(BaseModel):
items: list = []
total: int = 0
page: int = 1
page_size: int = 25
class AiParseRequestSchema(BaseModel):
prompt: str
class AiParseResponseSchema(BaseModel):
summary: str | None = None
company_id: int | None = None
board_id: int | None = None
priority_id: int | None = None
status_id: int | None = None
assigned_member_id: int | None = None
description: str | None = None
missing_fields: list[str] = []
warnings: list[str] = []
class PSAPrioritySchema(BaseModel):
id: int
name: str

View File

@@ -295,6 +295,23 @@ To create a fork, append this marker AFTER your [QUESTIONS]/[ACTIONS] markers:
- If a question is clearly outside your domain, say so briefly and redirect.
- Never fabricate error codes, KB article numbers, or CLI flags. If unsure, say so.
## SPIN-OFF TICKET CREATION
When you identify a second distinct issue that is clearly separate from the primary topic \
of this session, suggest creating a spin-off ticket using the [ACTIONS] marker below. \
Use this sparingly — only when the issue is genuinely independent, not for every tangential mention.
Format:
[ACTIONS]
[
{
"label": "Create ticket: <brief issue title>",
"command": "create_spin_off_ticket",
"description": "<one sentence description of the separate issue>"
}
]
[/ACTIONS]
## FINAL REMINDER — THIS OVERRIDES EVERYTHING ABOVE
Every single response MUST contain [QUESTIONS] and/or [ACTIONS] markers with valid JSON. \
No exceptions. Not even when forking. A response without at least one of these markers \

View File

@@ -12,6 +12,10 @@ from app.services.psa.types import (
PSAConfiguration,
PSATimeEntry,
PSABoard,
PaginatedTicketResult,
PSAResource,
PSACreatedTicket,
TicketCreatePayload,
)
@@ -28,7 +32,7 @@ class AutotaskProvider(PSAProvider):
async def get_ticket(self, ticket_id: str) -> PSATicket:
raise NotImplementedError("Autotask integration coming soon")
async def search_tickets(self, query: str, **filters) -> list[PSATicket]:
async def search_tickets(self, query: str, **filters) -> PaginatedTicketResult:
raise NotImplementedError("Autotask integration coming soon")
async def post_note(
@@ -74,3 +78,18 @@ class AutotaskProvider(PSAProvider):
work_type: str | None = None,
) -> PSATimeEntry:
raise NotImplementedError("Autotask integration coming soon")
async def list_resources(self, ticket_id: int) -> list[PSAResource]:
raise NotImplementedError("Autotask integration coming soon")
async def add_resource(self, ticket_id: int, member_id: int) -> PSAResource:
raise NotImplementedError("Autotask integration coming soon")
async def remove_resource(self, ticket_id: int, member_id: int) -> None:
raise NotImplementedError("Autotask integration coming soon")
async def create_ticket(self, payload: TicketCreatePayload) -> PSACreatedTicket:
raise NotImplementedError("Autotask integration coming soon")
async def list_priorities(self) -> list[dict]:
raise NotImplementedError("Autotask integration coming soon")

View File

@@ -13,6 +13,10 @@ from .types import (
PSAConfiguration,
PSATimeEntry,
PSABoard,
PaginatedTicketResult,
PSAResource,
PSACreatedTicket,
TicketCreatePayload,
)
@@ -28,7 +32,7 @@ class PSAProvider(ABC):
...
@abstractmethod
async def search_tickets(self, query: str, **filters) -> list[PSATicket]:
async def search_tickets(self, query: str, **filters) -> PaginatedTicketResult:
...
@abstractmethod
@@ -83,3 +87,23 @@ class PSAProvider(ABC):
work_type: str | None = None,
) -> PSATimeEntry:
...
@abstractmethod
async def list_resources(self, ticket_id: int) -> list[PSAResource]:
...
@abstractmethod
async def add_resource(self, ticket_id: int, member_id: int) -> PSAResource:
...
@abstractmethod
async def remove_resource(self, ticket_id: int, member_id: int) -> None:
...
@abstractmethod
async def create_ticket(self, payload: TicketCreatePayload) -> PSACreatedTicket:
...
@abstractmethod
async def list_priorities(self) -> list[dict]:
...

View File

@@ -7,6 +7,7 @@ from datetime import datetime, timezone
from app.services.psa.base import PSAProvider
from app.services.psa.cache import psa_cache
from app.services.psa.exceptions import PSAError
from app.services.psa.types import (
ConnectionTestResult,
PSATicket,
@@ -17,6 +18,10 @@ from app.services.psa.types import (
PSAConfiguration,
PSATimeEntry,
PSABoard,
PaginatedTicketResult,
PSAResource,
PSACreatedTicket,
TicketCreatePayload,
)
from .client import ConnectWiseClient
@@ -55,27 +60,31 @@ class ConnectWiseProvider(PSAProvider):
)
return self._map_ticket(data)
async def search_tickets(self, query: str, **filters) -> list[PSATicket]:
"""Search CW tickets by summary. Supports board_id, status_id, member_id,
unassigned, board_ids, page, and page_size filters."""
async def search_tickets(self, query: str, **filters) -> PaginatedTicketResult:
"""Search CW tickets by summary. Supports board_id, status_id, member_identifier,
unassigned, board_ids, page, and page_size filters. Returns paginated result."""
page_size = filters.get("page_size", 10)
page = filters.get("page", 1)
params: dict = {
"fields": "id,summary,company,board,status,priority,closedFlag",
"orderBy": "id desc",
"orderBy": "priority/sort asc,dateEntered desc",
"pageSize": page_size,
"page": page,
}
# Build CW condition query
conditions: list[str] = []
if query:
conditions.append(f"summary contains '{query}'")
# Sanitize: strip single quotes to prevent CW condition injection
safe_query = query.replace("'", "")
conditions.append(f"summary contains '{safe_query}'")
if filters.get("board_id"):
conditions.append(f"board/id = {filters['board_id']}")
if filters.get("status_id"):
conditions.append(f"status/id = {filters['status_id']}")
elif filters.get("status_name"):
safe_status = str(filters["status_name"]).replace("'", "")
conditions.append(f"status/name = '{safe_status}'")
if not filters.get("include_closed", False):
conditions.append("closedFlag = false")
if filters.get("member_identifier") is not None:
@@ -86,16 +95,27 @@ class ConnectWiseProvider(PSAProvider):
if board_ids:
board_list = ", ".join(str(bid) for bid in board_ids)
conditions.append(f"board/id in ({board_list})")
if filters.get("company_id"):
conditions.append(f"company/id = {int(filters['company_id'])}")
if conditions:
params["conditions"] = " and ".join(conditions)
condition_str = " and ".join(conditions) if conditions else ""
if condition_str:
params["conditions"] = condition_str
data = await self.client.get("/service/tickets", params=params)
count_params: dict = {}
if condition_str:
count_params["conditions"] = condition_str
return [
self._map_ticket(t)
for t in (data if isinstance(data, list) else [])
]
# Fire page fetch + count in parallel
data, count_data = await asyncio.gather(
self.client.get("/service/tickets", params=params),
self.client.get("/service/tickets/count", params=count_params),
)
items = [self._map_ticket(t) for t in (data if isinstance(data, list) else [])]
total = count_data.get("count", len(items)) if isinstance(count_data, dict) else len(items)
return PaginatedTicketResult(items=items, total=total, page=page, page_size=page_size)
async def get_ticket_configurations(
self, ticket_id: str
@@ -246,13 +266,30 @@ class ConnectWiseProvider(PSAProvider):
async def update_ticket_status(
self, ticket_id: str, status_id: int
) -> PSATicket:
"""Update a CW ticket's status using JSON Patch format."""
"""Update a CW ticket's status using JSON Patch format.
Verifies CW actually applied the change — CW silently returns 200 when
a status id is invalid for the ticket's board. We check the response
body's status.id matches what we sent, and raise PSAError if not.
"""
patch_body = [
{"op": "replace", "path": "status", "value": {"id": status_id}}
]
data = await self.client.patch(
f"/service/tickets/{ticket_id}", json_body=patch_body
)
applied = (data.get("status") or {}) if isinstance(data, dict) else {}
applied_id = applied.get("id")
if applied_id != status_id:
logger.warning(
"CW status PATCH for ticket %s returned status id=%s instead of %s",
ticket_id, applied_id, status_id,
)
raise PSAError(
f"ConnectWise did not apply status {status_id} "
f"(still {applied.get('name') or applied_id}). "
"The status may not be valid for this ticket's board."
)
return self._map_ticket(data)
async def list_members(self) -> list[PSAMember]:
@@ -591,16 +628,247 @@ class ConnectWiseProvider(PSAProvider):
@staticmethod
def _map_ticket(data: dict) -> PSATicket:
"""Map a CW ticket JSON dict to a PSATicket."""
company = data.get("company") or {}
board = data.get("board") or {}
status = data.get("status") or {}
priority = data.get("priority") or {}
return PSATicket(
id=str(data["id"]),
id=str(data.get("id", "")),
summary=data.get("summary", ""),
company_name=data.get("company", {}).get("name"),
company_id=str(data["company"]["id"]) if data.get("company") else None,
board_name=data.get("board", {}).get("name"),
board_id=data.get("board", {}).get("id"),
status_name=data.get("status", {}).get("name"),
status_id=data.get("status", {}).get("id"),
priority_name=data.get("priority", {}).get("name"),
priority_id=data.get("priority", {}).get("id"),
company_name=company.get("name"),
company_id=str(company.get("id")) if company.get("id") else None,
board_name=board.get("name"),
board_id=board.get("id"),
status_name=status.get("name"),
status_id=status.get("id"),
priority_name=priority.get("name"),
priority_id=priority.get("id"),
closed=data.get("closedFlag", False),
)
# ── Resource management ───────────────────────────────────────────
# Schedule type id for "Service Ticket" resources — CW's canonical type for ticket co-assignees
_SCHEDULE_TYPE_SERVICE_TICKET = 4
async def _get_ticket_owner(self, ticket_id: int) -> dict | None:
"""Fetch the ticket's current owner (MemberReference) or None if unassigned."""
data = await self.client.get(
f"/service/tickets/{ticket_id}",
params={"fields": "id,owner"},
)
if not isinstance(data, dict):
return None
owner_raw = data.get("owner")
return owner_raw if isinstance(owner_raw, dict) and owner_raw.get("id") else None
async def _list_ticket_schedule_entries(self, ticket_id: int) -> list[dict]:
"""List schedule entries for a ticket's co-assignees.
Returns raw CW schedule entry dicts with at least id and member info.
"""
data = await self.client.get(
"/schedule/entries",
params={
"conditions": (
f"type/id={self._SCHEDULE_TYPE_SERVICE_TICKET} AND objectId={ticket_id}"
),
"fields": "id,member,name",
"pageSize": 100,
},
)
return data if isinstance(data, list) else []
async def list_resources(self, ticket_id: int) -> list[PSAResource]:
"""List members assigned to a CW ticket.
Merges the `owner` MemberReference (primary assignee) with schedule entries
of type 4 (Service Ticket resources — co-assignees). Deduped by member id.
"""
owner = await self._get_ticket_owner(ticket_id)
entries = await self._list_ticket_schedule_entries(ticket_id)
members = await self.list_members()
by_id = {str(m.id): m for m in members}
seen_ids: set[str] = set()
results: list[PSAResource] = []
if owner is not None:
owner_id = str(owner.get("id"))
m = by_id.get(owner_id)
if m:
results.append(PSAResource(
member_id=int(m.id),
member_name=m.name,
member_identifier=m.identifier,
))
else:
results.append(PSAResource(
member_id=int(owner.get("id") or 0),
member_name=str(owner.get("name") or ""),
member_identifier=str(owner.get("identifier") or ""),
))
seen_ids.add(owner_id)
for entry in entries:
entry_member = entry.get("member") if isinstance(entry, dict) else None
if not isinstance(entry_member, dict):
continue
mid = str(entry_member.get("id") or "")
if not mid or mid in seen_ids:
continue
m = by_id.get(mid)
if m:
results.append(PSAResource(
member_id=int(m.id),
member_name=m.name,
member_identifier=m.identifier,
))
else:
results.append(PSAResource(
member_id=int(entry_member.get("id") or 0),
member_name=str(entry_member.get("name") or ""),
member_identifier=str(entry_member.get("identifier") or ""),
))
seen_ids.add(mid)
return results
async def add_resource(self, ticket_id: int, member_id: int) -> PSAResource:
"""Assign a member to a CW ticket.
- If the ticket has no owner, set the target as `owner` (CW's canonical
primary assignee field). CW typically mirrors this into the derived
`resources` string automatically.
- If the ticket is already owned by someone else, add the target as a
co-assignee via a schedule entry of type 4 (Service Ticket). The
existing owner is not changed.
- Idempotent when target is already owner or already has a schedule entry.
"""
members = await self.list_members()
target = next((m for m in members if str(m.id) == str(member_id)), None)
if target is None:
raise PSAError(f"Member {member_id} not found")
current_owner = await self._get_ticket_owner(ticket_id)
if current_owner is None:
# Primary assign — set owner
await self.client.patch(
f"/service/tickets/{ticket_id}",
json_body=[{"op": "replace", "path": "owner", "value": {"id": int(target.id)}}],
)
elif str(current_owner.get("id")) != str(target.id):
# Ticket owned by someone else — add as co-assignee via schedule entry.
# Idempotent: skip if a schedule entry already exists for this member.
existing = await self._list_ticket_schedule_entries(ticket_id)
already_assigned = any(
str((e.get("member") or {}).get("id") or "") == str(target.id)
for e in existing
)
if not already_assigned:
await self.client.post(
"/schedule/entries",
json_body={
"member": {"id": int(target.id)},
"objectId": int(ticket_id),
"type": {"id": self._SCHEDULE_TYPE_SERVICE_TICKET},
"name": target.name or target.identifier or f"Member {target.id}",
},
)
# else: already the owner — idempotent no-op
return PSAResource(
member_id=int(target.id),
member_name=target.name,
member_identifier=target.identifier,
)
async def remove_resource(self, ticket_id: int, member_id: int) -> None:
"""Remove a member from a CW ticket (idempotent).
- If the target is the current owner, clear the owner field.
- Otherwise, delete their schedule entry (Service Ticket type).
"""
members = await self.list_members()
target = next((m for m in members if str(m.id) == str(member_id)), None)
if target is None:
return
current_owner = await self._get_ticket_owner(ticket_id)
if current_owner is not None and str(current_owner.get("id")) == str(target.id):
# Unassign the owner. Try RFC 6902 "remove" first; fall back to
# "replace" with null if CW rejects it.
try:
await self.client.patch(
f"/service/tickets/{ticket_id}",
json_body=[{"op": "remove", "path": "owner"}],
)
except PSAError:
await self.client.patch(
f"/service/tickets/{ticket_id}",
json_body=[{"op": "replace", "path": "owner", "value": None}],
)
return
# Not the owner — find and delete the schedule entry for this member.
entries = await self._list_ticket_schedule_entries(ticket_id)
for entry in entries:
entry_member = entry.get("member") if isinstance(entry, dict) else None
if isinstance(entry_member, dict) and str(entry_member.get("id") or "") == str(target.id):
entry_id = entry.get("id")
if entry_id:
await self.client.delete(f"/schedule/entries/{entry_id}")
break
# ── Ticket creation ───────────────────────────────────────────────
async def create_ticket(self, payload: TicketCreatePayload) -> PSACreatedTicket:
"""Create a new CW service ticket."""
body: dict = {
"summary": payload.summary,
"board": {"id": payload.board_id},
"company": {"id": payload.company_id},
"status": {"id": payload.status_id},
"priority": {"id": payload.priority_id},
}
if payload.description:
body["initialDescription"] = payload.description
if payload.assigned_member_id:
body["owner"] = {"id": payload.assigned_member_id}
data = await self.client.post("/service/tickets", json_body=body)
ticket_id = data.get("id") if isinstance(data, dict) else None
resources: list[PSAResource] = []
if ticket_id and payload.assigned_member_id:
try:
resources = await self.list_resources(ticket_id)
except Exception:
pass
company = (data.get("company") or {}) if isinstance(data, dict) else {}
board = (data.get("board") or {}) if isinstance(data, dict) else {}
status = (data.get("status") or {}) if isinstance(data, dict) else {}
priority = (data.get("priority") or {}) if isinstance(data, dict) else {}
return PSACreatedTicket(
id=ticket_id or 0,
summary=data.get("summary", payload.summary) if isinstance(data, dict) else payload.summary,
board_name=board.get("name", ""),
status_name=status.get("name", ""),
priority_name=priority.get("name", ""),
company_name=company.get("name", ""),
resources=resources,
)
# ── Priorities ────────────────────────────────────────────────────
async def list_priorities(self) -> list[dict]:
"""List CW service priorities."""
data = await self.client.get("/service/priorities", params={"pageSize": 50})
return [
{"id": p.get("id"), "name": p.get("name")}
for p in (data if isinstance(data, list) else [])
]

View File

@@ -12,6 +12,10 @@ from app.services.psa.types import (
PSAConfiguration,
PSATimeEntry,
PSABoard,
PaginatedTicketResult,
PSAResource,
PSACreatedTicket,
TicketCreatePayload,
)
@@ -28,7 +32,7 @@ class HaloPSAProvider(PSAProvider):
async def get_ticket(self, ticket_id: str) -> PSATicket:
raise NotImplementedError("Halo PSA integration coming soon")
async def search_tickets(self, query: str, **filters) -> list[PSATicket]:
async def search_tickets(self, query: str, **filters) -> PaginatedTicketResult:
raise NotImplementedError("Halo PSA integration coming soon")
async def post_note(
@@ -74,3 +78,18 @@ class HaloPSAProvider(PSAProvider):
work_type: str | None = None,
) -> PSATimeEntry:
raise NotImplementedError("Halo PSA integration coming soon")
async def list_resources(self, ticket_id: int) -> list[PSAResource]:
raise NotImplementedError("Halo PSA integration coming soon")
async def add_resource(self, ticket_id: int, member_id: int) -> PSAResource:
raise NotImplementedError("Halo PSA integration coming soon")
async def remove_resource(self, ticket_id: int, member_id: int) -> None:
raise NotImplementedError("Halo PSA integration coming soon")
async def create_ticket(self, payload: TicketCreatePayload) -> PSACreatedTicket:
raise NotImplementedError("Halo PSA integration coming soon")
async def list_priorities(self) -> list[dict]:
raise NotImplementedError("Halo PSA integration coming soon")

View File

@@ -73,6 +73,40 @@ class PSABoard(BaseModel):
inactive: bool = False
class PaginatedTicketResult(BaseModel):
items: list[PSATicket]
total: int
page: int
page_size: int
class PSAResource(BaseModel):
member_id: int
member_name: str
member_identifier: str
is_rf_user: bool = False
class PSACreatedTicket(BaseModel):
id: int
summary: str
board_name: str
status_name: str
priority_name: str
company_name: str
resources: list[PSAResource] = []
class TicketCreatePayload(BaseModel):
summary: str
company_id: int
board_id: int
status_id: int
priority_id: int
description: str | None = None
assigned_member_id: int | None = None
class NoteType:
INTERNAL_ANALYSIS = "internal_analysis"
RESOLUTION = "resolution"

View File

@@ -0,0 +1,116 @@
"""Ticket mutation service — wraps PSA provider, resolves is_rf_user flag."""
from __future__ import annotations
import logging
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.psa_connection import PsaConnection
from app.models.psa_member_mapping import PsaMemberMapping
from app.schemas.psa_tickets import (
PSAResourceSchema,
PSATicketCreatedSchema,
PSATicketStatusUpdateSchema,
)
from app.services.psa.registry import get_provider_for_account
from app.services.psa.types import TicketCreatePayload
logger = logging.getLogger(__name__)
async def _get_mapped_member_ids(account_id: UUID, db: AsyncSession) -> set[int]:
"""Return set of external_member_id ints that are mapped to RF users."""
conn_result = await db.execute(
select(PsaConnection).where(PsaConnection.account_id == account_id)
)
conn = conn_result.scalar_one_or_none()
if not conn:
return set()
mappings = await db.execute(
select(PsaMemberMapping).where(PsaMemberMapping.psa_connection_id == conn.id)
)
return {int(m.external_member_id) for m in mappings.scalars().all() if m.external_member_id}
async def list_resources(
account_id: UUID, ticket_id: int, db: AsyncSession
) -> list[PSAResourceSchema]:
provider = await get_provider_for_account(account_id, db)
mapped_ids = await _get_mapped_member_ids(account_id, db)
resources = await provider.list_resources(ticket_id)
return [
PSAResourceSchema(
member_id=r.member_id,
member_name=r.member_name,
member_identifier=r.member_identifier,
is_rf_user=r.member_id in mapped_ids,
)
for r in resources
]
async def add_resource(
account_id: UUID, ticket_id: int, member_id: int, db: AsyncSession
) -> PSAResourceSchema:
provider = await get_provider_for_account(account_id, db)
mapped_ids = await _get_mapped_member_ids(account_id, db)
resource = await provider.add_resource(ticket_id, member_id)
return PSAResourceSchema(
member_id=resource.member_id,
member_name=resource.member_name,
member_identifier=resource.member_identifier,
is_rf_user=resource.member_id in mapped_ids,
)
async def remove_resource(
account_id: UUID, ticket_id: int, member_id: int, db: AsyncSession
) -> None:
provider = await get_provider_for_account(account_id, db)
await provider.remove_resource(ticket_id, member_id)
async def update_status(
account_id: UUID, ticket_id: int, status_id: int, db: AsyncSession
) -> PSATicketStatusUpdateSchema:
provider = await get_provider_for_account(account_id, db)
# get current status before updating
ticket = await provider.get_ticket(str(ticket_id))
previous_status = ticket.status_name or ""
await provider.update_ticket_status(str(ticket_id), status_id)
# get new status name from statuses list
statuses = await provider.get_ticket_statuses(ticket.board_id or 0)
new_status = next((s.name for s in statuses if s.id == status_id), str(status_id))
return PSATicketStatusUpdateSchema(
ticket_id=ticket_id,
previous_status=previous_status,
new_status=new_status,
new_status_id=status_id,
)
async def create_ticket(
account_id: UUID, payload: TicketCreatePayload, db: AsyncSession
) -> PSATicketCreatedSchema:
provider = await get_provider_for_account(account_id, db)
mapped_ids = await _get_mapped_member_ids(account_id, db)
result = await provider.create_ticket(payload)
return PSATicketCreatedSchema(
id=result.id,
summary=result.summary,
board_name=result.board_name,
status_name=result.status_name,
priority_name=result.priority_name,
company_name=result.company_name,
resources=[
PSAResourceSchema(
member_id=r.member_id,
member_name=r.member_name,
member_identifier=r.member_identifier,
is_rf_user=r.member_id in mapped_ids,
)
for r in result.resources
],
)

View File

@@ -0,0 +1,55 @@
# backend/tests/test_psa_tickets.py
"""Routing and auth tests for new ticket management endpoints."""
import pytest
@pytest.mark.asyncio
async def test_create_ticket_requires_auth(client):
"""POST /tickets returns 401 without auth."""
response = await client.post(
"/api/v1/integrations/psa/tickets",
json={
"summary": "Test", "company_id": 1, "board_id": 1,
"status_id": 1, "priority_id": 1
},
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_list_resources_requires_auth(client):
response = await client.get("/api/v1/integrations/psa/tickets/1/resources")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_search_tickets_returns_paginated_shape(client, auth_headers):
"""search endpoint returns TicketListResponse shape when no PSA connected."""
response = await client.get(
"/api/v1/integrations/psa/tickets/search",
headers=auth_headers,
)
# No PSA connection → 400 or 502; with PSA → 200
assert response.status_code in (200, 400, 502)
if response.status_code == 200:
data = response.json()
assert "items" in data
assert "total" in data
assert "page" in data
@pytest.mark.asyncio
async def test_update_status_requires_auth(client):
response = await client.patch(
"/api/v1/integrations/psa/tickets/1/status?status_id=5"
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_ai_parse_requires_auth(client):
response = await client.post(
"/api/v1/integrations/psa/tickets/ai-parse",
json={"prompt": "New ticket for Acme"},
)
assert response.status_code == 401