"""PSA integration endpoints — connection CRUD and test.""" from __future__ import annotations from datetime import datetime, timezone from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import delete from app.api.deps import get_current_active_user, require_account_owner, require_engineer_or_admin from app.core.database import get_db from app.models.psa_connection import PsaConnection from app.models.psa_member_mapping import PsaMemberMapping from app.models.user import User from app.schemas.psa_connection import ( PsaConnectionCreate, PsaConnectionResponse, PsaConnectionTestResponse, PsaConnectionUpdate, PSATicketSearchResult, PSATicketStatusItem, PsaMemberMappingResponse, PsaMemberMappingSaveRequest, PsaMemberResponse, AutoMatchResult, ) from app.core.config import settings from app.services.psa.encryption import ( decrypt_credentials, encrypt_credentials, mask_credential, ) router = APIRouter(prefix="/integrations/psa", tags=["integrations"]) # ── helpers ────────────────────────────────────────────────────────── def _to_response(conn: PsaConnection) -> PsaConnectionResponse: """Build a response DTO with masked credential hints.""" creds = decrypt_credentials(conn.credentials_encrypted) return PsaConnectionResponse( id=conn.id, account_id=conn.account_id, provider=conn.provider, display_name=conn.display_name, site_url=conn.site_url, company_id=conn.company_id, is_active=conn.is_active, last_validated_at=conn.last_validated_at, created_at=conn.created_at, updated_at=conn.updated_at, public_key_hint=mask_credential(creds.get("public_key")), private_key_hint=mask_credential(creds.get("private_key")), ) async def _get_connection( account_id: UUID, db: AsyncSession ) -> PsaConnection | None: result = await db.execute( select(PsaConnection).where(PsaConnection.account_id == account_id) ) return result.scalar_one_or_none() async def _test_credentials( provider: str, site_url: str, company_id: str, public_key: str, private_key: str, client_id: str, ) -> PsaConnectionTestResponse: """Instantiate a provider and run test_connection.""" if provider == "connectwise": from app.services.psa.connectwise.client import ConnectWiseClient from app.services.psa.connectwise.provider import ConnectWiseProvider client = ConnectWiseClient( site_url=site_url, company_id=company_id, public_key=public_key, private_key=private_key, client_id=client_id, ) result = await ConnectWiseProvider(client).test_connection() return PsaConnectionTestResponse( success=result.success, message=result.message, server_version=result.server_version, ) return PsaConnectionTestResponse( success=False, message=f"Unsupported provider: {provider}", ) # ── endpoints ──────────────────────────────────────────────────────── @router.get("/connections", response_model=PsaConnectionResponse | None) async def get_connection( current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ): """Return the account's PSA connection (redacted credentials) or null.""" if not current_user.account_id: return None conn = await _get_connection(current_user.account_id, db) if not conn: return None return _to_response(conn) @router.post( "/connections", response_model=PsaConnectionResponse, status_code=status.HTTP_201_CREATED, ) async def create_connection( data: PsaConnectionCreate, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Create a new PSA connection. Tests credentials before saving.""" if not current_user.account_id: raise HTTPException(status.HTTP_400_BAD_REQUEST, "No account associated with user") if not settings.CW_CLIENT_ID: raise HTTPException(status.HTTP_503_SERVICE_UNAVAILABLE, "ConnectWise integration is not configured on this server") # Check for existing connection existing = await _get_connection(current_user.account_id, db) if existing: raise HTTPException( status.HTTP_409_CONFLICT, "A PSA connection already exists for this account. Update or delete the existing one.", ) # Test connection before saving test_result = await _test_credentials( provider=data.provider, site_url=data.site_url, company_id=data.company_id, public_key=data.public_key, private_key=data.private_key, client_id=settings.CW_CLIENT_ID, ) if not test_result.success: raise HTTPException( status.HTTP_422_UNPROCESSABLE_ENTITY, f"Connection test failed: {test_result.message}", ) credentials = { "public_key": data.public_key, "private_key": data.private_key, } conn = PsaConnection( account_id=current_user.account_id, provider=data.provider, display_name=data.display_name, site_url=data.site_url, company_id=data.company_id, credentials_encrypted=encrypt_credentials(credentials), is_active=True, last_validated_at=datetime.now(timezone.utc), ) db.add(conn) await db.commit() await db.refresh(conn) return _to_response(conn) @router.put("/connections/{connection_id}", response_model=PsaConnectionResponse) async def update_connection( connection_id: UUID, data: PsaConnectionUpdate, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Update an existing PSA connection. Re-tests if credentials change.""" conn = await _get_connection_or_404(connection_id, current_user, db) # Decrypt existing credentials creds = decrypt_credentials(conn.credentials_encrypted) # Track whether credential fields changed cred_fields = {"public_key", "private_key"} cred_changed = False # Apply updates update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): if field in cred_fields: if value is not None and value != creds.get(field): creds[field] = value cred_changed = True else: setattr(conn, field, value) # Re-test if credentials changed if cred_changed: site_url = update_data.get("site_url", conn.site_url) company_id_val = update_data.get("company_id", conn.company_id) test_result = await _test_credentials( provider=conn.provider, site_url=site_url, company_id=company_id_val, public_key=creds["public_key"], private_key=creds["private_key"], client_id=settings.CW_CLIENT_ID or "", ) if not test_result.success: raise HTTPException( status.HTTP_422_UNPROCESSABLE_ENTITY, f"Connection test failed: {test_result.message}", ) conn.credentials_encrypted = encrypt_credentials(creds) conn.last_validated_at = datetime.now(timezone.utc) conn.updated_at = datetime.now(timezone.utc) await db.commit() await db.refresh(conn) return _to_response(conn) @router.delete( "/connections/{connection_id}", status_code=status.HTTP_204_NO_CONTENT, ) async def delete_connection( connection_id: UUID, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Delete a PSA connection.""" conn = await _get_connection_or_404(connection_id, current_user, db) await db.delete(conn) await db.commit() @router.post( "/connections/{connection_id}/test", response_model=PsaConnectionTestResponse, ) async def test_connection( connection_id: UUID, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Test an existing PSA connection.""" conn = await _get_connection_or_404(connection_id, current_user, db) creds = decrypt_credentials(conn.credentials_encrypted) result = await _test_credentials( provider=conn.provider, site_url=conn.site_url, company_id=conn.company_id, public_key=creds["public_key"], private_key=creds["private_key"], client_id=settings.CW_CLIENT_ID or "", ) if result.success: conn.last_validated_at = datetime.now(timezone.utc) await db.commit() # Invalidate cached PSA data when connection is re-validated from app.services.psa.cache import psa_cache psa_cache.clear() return result # ── FlowPilot PSA Settings ────────────────────────────────────── @router.get("/connections/{connection_id}/flowpilot-settings") async def get_flowpilot_settings( connection_id: UUID, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get FlowPilot-specific settings for a PSA connection.""" conn = await _get_connection_or_404(connection_id, current_user, db) # Return settings with defaults filled in defaults = { "auto_push": True, "auto_time_entry": True, "time_rounding": "15min", "note_visibility": "internal", "include_diagnostic_steps": True, "prompt_status_on_resolution": False, "prompt_status_on_escalation": False, } settings_data = {**defaults, **(conn.flowpilot_settings or {})} return settings_data @router.put("/connections/{connection_id}/flowpilot-settings") async def update_flowpilot_settings( connection_id: UUID, data: dict, current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Update FlowPilot-specific settings for a PSA connection.""" conn = await _get_connection_or_404(connection_id, current_user, db) # Validate allowed keys allowed_keys = { "auto_push", "auto_time_entry", "time_rounding", "note_visibility", "include_diagnostic_steps", "prompt_status_on_resolution", "prompt_status_on_escalation", } filtered = {k: v for k, v in data.items() if k in allowed_keys} # Merge with existing current = conn.flowpilot_settings or {} current.update(filtered) conn.flowpilot_settings = current await db.commit() await db.refresh(conn) defaults = { "auto_push": True, "auto_time_entry": True, "time_rounding": "15min", "note_visibility": "internal", "include_diagnostic_steps": True, "prompt_status_on_resolution": False, "prompt_status_on_escalation": False, } return {**defaults, **(conn.flowpilot_settings or {})} # ── ticket / status / company endpoints ────────────────────────── @router.get("/tickets/search", response_model=list[PSATicketSearchResult]) async def search_tickets( current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], query: str = "", board_id: int | None = None, status_id: int | None = None, include_closed: bool = False, ): """Search ConnectWise tickets.""" if not current_user.account_id: raise HTTPException(status_code=400, detail="User has no account") from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError try: provider = await get_provider_for_account(current_user.account_id, db) tickets = await provider.search_tickets( query, board_id=board_id, status_id=status_id, include_closed=include_closed ) return [ PSATicketSearchResult( id=t.id, summary=t.summary, company_name=t.company_name, board_name=t.board_name, status_name=t.status_name, priority_name=t.priority_name, closed=t.closed, ) for t in tickets ] except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) @router.get("/tickets/{ticket_id}/context") async def get_ticket_context( ticket_id: int, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get rich ticket context (company, contact, configs, notes, related tickets) for AI prompt injection.""" from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import ( PSAError, PSAAuthError, PSAPermissionError, PSANotFoundError, PSAConnectionError, ) from app.schemas.psa_context import TicketContext if not current_user.account_id: raise HTTPException(status_code=400, detail="User has no account") # Look up the active connection for connection_id conn_result = await db.execute( select(PsaConnection).where( PsaConnection.account_id == current_user.account_id, PsaConnection.is_active.is_(True), ) ) connection = conn_result.scalar_one_or_none() if not connection: raise HTTPException(status_code=404, detail="No active PSA connection configured") try: provider = await get_provider_for_account(current_user.account_id, db) except PSAConnectionError: raise HTTPException(status_code=404, detail="No active PSA connection configured") except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) try: ctx: TicketContext = await provider.get_ticket_context( ticket_id=ticket_id, connection_id=str(connection.id), ) return ctx except (PSAAuthError, PSAPermissionError): raise HTTPException( status_code=502, detail={"error": "psa_auth_failed", "message": "PSA credentials may have expired."}, ) except PSANotFoundError: raise HTTPException(status_code=404, detail="Ticket not found") except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) @router.get("/tickets/{ticket_id}") async def get_ticket( ticket_id: str, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get a single CW ticket by ID.""" if not current_user.account_id: raise HTTPException(status_code=400, detail="User has no account") from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError, PSANotFoundError try: provider = await get_provider_for_account(current_user.account_id, db) ticket = await provider.get_ticket(ticket_id) return ticket except PSANotFoundError: raise HTTPException(status_code=404, detail="Ticket not found") except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) @router.get("/tickets/{ticket_id}/statuses", response_model=list[PSATicketStatusItem]) async def get_ticket_statuses( ticket_id: str, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get available statuses for a ticket's board.""" if not current_user.account_id: raise HTTPException(status_code=400, detail="User has no account") from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError, PSANotFoundError try: provider = await get_provider_for_account(current_user.account_id, db) ticket = await provider.get_ticket(ticket_id) if not ticket.board_id: raise HTTPException(status_code=400, detail="Ticket has no board") statuses = await provider.get_ticket_statuses(ticket.board_id) return [PSATicketStatusItem(id=s.id, name=s.name, is_closed=s.is_closed) for s in statuses] except PSANotFoundError: raise HTTPException(status_code=404, detail="Ticket not found") except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) # ── member mapping endpoints ───────────────────────────────────────── @router.get("/members", response_model=list[PsaMemberResponse]) async def list_members( current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """List CW members (from CW API).""" if not current_user.account_id: raise HTTPException(status_code=400, detail="User has no account") from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError try: provider = await get_provider_for_account(current_user.account_id, db) members = await provider.list_members() return [ PsaMemberResponse(id=m.id, identifier=m.identifier, name=m.name, email=m.email) for m in members ] except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) @router.get("/member-mappings", response_model=list[PsaMemberMappingResponse]) async def get_member_mappings( current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get all member mappings for the account.""" conn = await _get_account_connection(current_user.account_id, db) if not conn: return [] result = await db.execute( select(PsaMemberMapping).where(PsaMemberMapping.psa_connection_id == conn.id) ) mappings = result.scalars().all() response = [] for m in mappings: user_result = await db.execute(select(User).where(User.id == m.user_id)) user = user_result.scalar_one_or_none() if user: response.append(PsaMemberMappingResponse( id=str(m.id), user_id=str(m.user_id), user_email=user.email, user_name=user.name, external_member_id=m.external_member_id, external_member_name=m.external_member_name, matched_by=m.matched_by, )) return response @router.post("/member-mappings", response_model=list[PsaMemberMappingResponse]) async def save_member_mappings( mappings: list[PsaMemberMappingSaveRequest], current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Save/update member mappings (batch). Replaces all existing mappings.""" conn = await _get_account_connection(current_user.account_id, db) if not conn: raise HTTPException(status_code=400, detail="No PSA connection configured") # Delete existing mappings await db.execute( delete(PsaMemberMapping).where(PsaMemberMapping.psa_connection_id == conn.id) ) # Insert new mappings for m in mappings: mapping = PsaMemberMapping( psa_connection_id=conn.id, user_id=UUID(m.user_id), external_member_id=m.external_member_id, external_member_name=m.external_member_name, matched_by="manual_admin", ) db.add(mapping) await db.commit() # Return the saved mappings return await get_member_mappings(current_user, db) @router.post("/member-mappings/auto-match", response_model=AutoMatchResult) async def auto_match_members( current_user: Annotated[User, Depends(require_account_owner)], db: Annotated[AsyncSession, Depends(get_db)], ): """Auto-match RF users to CW members by email.""" conn = await _get_account_connection(current_user.account_id, db) if not conn: raise HTTPException(status_code=400, detail="No PSA connection configured") from app.services.psa.registry import get_provider_for_account from app.services.psa.exceptions import PSAError try: provider = await get_provider_for_account(current_user.account_id, db) cw_members = await provider.list_members() except PSAError as e: raise HTTPException(status_code=502, detail=str(e)) # Build email → member lookup email_to_member: dict = {} for m in cw_members: if m.email: email_to_member[m.email.lower()] = m # Get account users users_result = await db.execute( select(User).where(User.account_id == current_user.account_id, User.is_active.is_(True)) ) users = users_result.scalars().all() matched = [] unmatched_count = 0 for user in users: cw_member = email_to_member.get(user.email.lower()) if cw_member: # Check if mapping already exists existing = await db.execute( select(PsaMemberMapping).where( PsaMemberMapping.psa_connection_id == conn.id, PsaMemberMapping.user_id == user.id, ) ) if not existing.scalar_one_or_none(): mapping = PsaMemberMapping( psa_connection_id=conn.id, user_id=user.id, external_member_id=cw_member.id, external_member_name=cw_member.name, matched_by="auto_email", ) db.add(mapping) matched.append((mapping, user)) else: unmatched_count += 1 await db.commit() # Build response matched_response = [ PsaMemberMappingResponse( id=str(m.id), user_id=str(m.user_id), user_email=u.email, user_name=u.name, external_member_id=m.external_member_id, external_member_name=m.external_member_name, matched_by=m.matched_by, ) for m, u in matched ] return AutoMatchResult(matched=matched_response, unmatched_users=unmatched_count) # ── internal helpers ───────────────────────────────────────────────── async def _get_account_connection( account_id: UUID | None, db: AsyncSession ) -> PsaConnection | None: """Get the PSA connection for an account.""" if not account_id: return None result = await db.execute( select(PsaConnection).where(PsaConnection.account_id == account_id) ) return result.scalar_one_or_none() async def _get_connection_or_404( connection_id: UUID, user: User, db: AsyncSession ) -> PsaConnection: """Fetch a connection by ID, ensuring it belongs to the user's account.""" result = await db.execute( select(PsaConnection).where(PsaConnection.id == connection_id) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status.HTTP_404_NOT_FOUND, "PSA connection not found") if conn.account_id != user.account_id: raise HTTPException(status.HTTP_404_NOT_FOUND, "PSA connection not found") return conn