From fdaea49d3b31fc0b57da44773d9aea191211d826 Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Sat, 14 Mar 2026 21:51:49 -0400 Subject: [PATCH] feat(psa): add ConnectWise HTTP client with auth, URL resolution, pagination, retry Implements ConnectWiseClient in services/psa/connectwise/client.py with: - API key auth (Base64 companyId+publicKey:privateKey) + clientId header - Accept header pinned to CW API version 2025.16 - Dynamic base URL resolution via /login/companyinfo (cloud vs on-premise) - SSRF prevention: validates against known CW domains, rejects private IPs - Retry with exponential backoff for timeouts and 5xx errors - 429 rate limit handling with Retry-After header respect - Error mapping: 401/403/404/429/5xx to typed PSA exceptions - Paginated GET with while-loop pattern (max 1000 per page) - JSON Patch array format for PATCH requests Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/services/psa/connectwise/__init__.py | 1 + .../app/services/psa/connectwise/client.py | 288 ++++++++++++++++++ 2 files changed, 289 insertions(+) create mode 100644 backend/app/services/psa/connectwise/__init__.py create mode 100644 backend/app/services/psa/connectwise/client.py diff --git a/backend/app/services/psa/connectwise/__init__.py b/backend/app/services/psa/connectwise/__init__.py new file mode 100644 index 00000000..fe4f48c4 --- /dev/null +++ b/backend/app/services/psa/connectwise/__init__.py @@ -0,0 +1 @@ +"""ConnectWise PSA provider implementation.""" diff --git a/backend/app/services/psa/connectwise/client.py b/backend/app/services/psa/connectwise/client.py new file mode 100644 index 00000000..4faaf176 --- /dev/null +++ b/backend/app/services/psa/connectwise/client.py @@ -0,0 +1,288 @@ +"""Low-level HTTP client for ConnectWise PSA REST API. + +Handles auth headers, base URL resolution (cloud vs on-premise), +pagination, retry with backoff, and error mapping. +""" +from __future__ import annotations + +import asyncio +import base64 +import ipaddress +import logging +import socket +from typing import Any +from urllib.parse import urlparse + +import httpx + +from app.services.psa.exceptions import ( + PSAAuthError, + PSAConnectionError, + PSANotFoundError, + PSAPermissionError, + PSARateLimitError, + PSAServerError, + PSATimeoutError, +) + +logger = logging.getLogger(__name__) + +# Pinned CW API version per best-practices/PSA-Versioning.md +CW_API_VERSION = "2025.16" +CW_ACCEPT_HEADER = f"application/vnd.connectwise.com+json; version={CW_API_VERSION}" + +# Known CW cloud domains (for SSRF prevention) +CW_ALLOWED_DOMAINS = { + "myconnectwise.net", + "connectwisedev.com", +} + +REQUEST_TIMEOUT = 30.0 +MAX_RETRIES = 2 +MAX_PAGE_SIZE = 1000 + + +def _validate_site_url(site_url: str) -> None: + """Validate site_url is a known CW domain (SSRF prevention). + + Rejects any hostname that is not a recognized ConnectWise domain + and any hostname that resolves to a private/loopback/link-local IP. + """ + # Ensure scheme for parsing + url = site_url if "://" in site_url else f"https://{site_url}" + parsed = urlparse(url) + hostname = parsed.hostname or "" + + # Check against allowed domains + if not any( + hostname.endswith(f".{domain}") or hostname == domain + for domain in CW_ALLOWED_DOMAINS + ): + raise PSAConnectionError( + f"Invalid ConnectWise site URL: {hostname}. " + "Must be a *.myconnectwise.net or *.connectwisedev.com domain.", + provider="connectwise", + ) + + # Resolve and check for private IPs + try: + addrs = socket.getaddrinfo(hostname, None) + for _, _, _, _, sockaddr in addrs: + ip = ipaddress.ip_address(sockaddr[0]) + if ip.is_private or ip.is_loopback or ip.is_link_local: + raise PSAConnectionError( + f"Site URL resolves to a private/internal address: {sockaddr[0]}", + provider="connectwise", + ) + except socket.gaierror: + raise PSAConnectionError( + f"Cannot resolve hostname: {hostname}", + provider="connectwise", + ) + + +class ConnectWiseClient: + """Async HTTP client for the ConnectWise PSA API. + + Auth: Authorization: Basic {base64(companyId+publicKey:privateKey)} + clientId header + Accept: application/vnd.connectwise.com+json; version=2025.16 + Base URL: resolved dynamically via /login/companyinfo/{companyId} + Pagination: page/pageSize params, max 1000 per page, while-loop pattern + Retry: respects 429 Retry-After, max 2 retries with exponential backoff for 5xx + Timeout: 30 seconds per request + """ + + def __init__( + self, + site_url: str, + company_id: str, + public_key: str, + private_key: str, + client_id: str, + ): + self.site_url = site_url.rstrip("/") + self.company_id = company_id + self.client_id = client_id + + # Auth: Base64(companyId+publicKey:privateKey) + auth_string = f"{company_id}+{public_key}:{private_key}" + self._auth_b64 = base64.b64encode(auth_string.encode()).decode() + + # Base URL resolved lazily on first request + self._base_url: str | None = None + + async def _resolve_base_url(self) -> str: + """Resolve the CW API base URL using /login/companyinfo/{companyId}. + + Cloud environments return a versioned codebase (e.g., v2025_3/) requiring + an 'api-' prefix on the hostname. On-premise returns v4_6_release/ with + no prefix needed. + """ + if self._base_url: + return self._base_url + + _validate_site_url(self.site_url) + + info_url = f"https://{self.site_url}/login/companyinfo/{self.company_id}" + + async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client: + try: + resp = await client.get(info_url) + resp.raise_for_status() + except httpx.TimeoutException: + raise PSATimeoutError( + "Timed out resolving CW base URL", provider="connectwise" + ) + except httpx.HTTPError as e: + raise PSAConnectionError( + f"Failed to resolve CW base URL: {e}", provider="connectwise" + ) + + data = resp.json() + codebase = data.get("Codebase", "v4_6_release/") + site_url = data.get("SiteUrl", self.site_url) + + # Cloud codebase (e.g., v2025_3/) requires api- prefix + if codebase != "v4_6_release/": + if not site_url.startswith("api-"): + site_url = f"api-{site_url}" + + self._base_url = f"https://{site_url}/{codebase}apis/3.0" + logger.info("Resolved CW base URL: %s", self._base_url) + return self._base_url + + def _headers(self) -> dict[str, str]: + return { + "Authorization": f"Basic {self._auth_b64}", + "clientId": self.client_id, + "Accept": CW_ACCEPT_HEADER, + "Content-Type": "application/json", + } + + async def _request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json_body: Any = None, + retries: int = MAX_RETRIES, + ) -> Any: + """Make an authenticated request to the CW API with retry and error mapping.""" + base_url = await self._resolve_base_url() + url = f"{base_url}/{path.lstrip('/')}" + + async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client: + for attempt in range(retries + 1): + try: + resp = await client.request( + method, + url, + headers=self._headers(), + params=params, + json=json_body, + ) + except httpx.TimeoutException: + if attempt < retries: + await asyncio.sleep(2 ** attempt) + continue + raise PSATimeoutError( + "ConnectWise request timed out", provider="connectwise" + ) + except httpx.ConnectError: + raise PSAConnectionError( + "Cannot reach ConnectWise server", provider="connectwise" + ) + + # Rate limit — retry with Retry-After backoff + if resp.status_code == 429: + if attempt < retries: + retry_after = int(resp.headers.get("Retry-After", "5")) + await asyncio.sleep(retry_after) + continue + raise PSARateLimitError( + "ConnectWise rate limit exceeded", + retry_after_seconds=int( + resp.headers.get("Retry-After", "60") + ), + provider="connectwise", + ) + + # Map error status codes to typed exceptions + if resp.status_code == 401: + raise PSAAuthError( + "Invalid credentials. Check your API keys.", + provider="connectwise", + ) + if resp.status_code == 403: + raise PSAPermissionError( + "Insufficient permissions. Check the API member's security role.", + provider="connectwise", + ) + if resp.status_code == 404: + raise PSANotFoundError( + "Resource not found.", provider="connectwise" + ) + if resp.status_code >= 500: + if attempt < retries: + await asyncio.sleep(2 ** attempt) + continue + raise PSAServerError( + "ConnectWise is experiencing issues. Try again.", + provider="connectwise", + ) + + resp.raise_for_status() + if resp.status_code == 204: + return None + return resp.json() + + # Should not reach here, but satisfy type checker + raise PSAConnectionError( + "Request failed after all retries", provider="connectwise" + ) + + async def get(self, path: str, params: dict[str, Any] | None = None) -> Any: + """GET request to CW API.""" + return await self._request("GET", path, params=params) + + async def post(self, path: str, json_body: Any = None) -> Any: + """POST request to CW API.""" + return await self._request("POST", path, json_body=json_body) + + async def patch(self, path: str, json_body: Any = None) -> Any: + """PATCH request to CW API (JSON Patch array format). + + CW uses JSON Patch syntax: [{"op": "replace", "path": "field", "value": ...}] + """ + return await self._request("PATCH", path, json_body=json_body) + + async def delete(self, path: str) -> Any: + """DELETE request to CW API.""" + return await self._request("DELETE", path) + + async def get_paginated( + self, + path: str, + params: dict[str, Any] | None = None, + max_pages: int = 10, + ) -> list[Any]: + """Fetch all pages of a paginated CW endpoint. + + Uses navigable pagination with page/pageSize params. + Stops when a page returns fewer results than pageSize or max_pages is reached. + """ + params = dict(params or {}) + params.setdefault("pageSize", MAX_PAGE_SIZE) + all_results: list[Any] = [] + + for page in range(1, max_pages + 1): + params["page"] = page + results = await self.get(path, params=params) + if not results: + break + all_results.extend(results) + if len(results) < params["pageSize"]: + break + + return all_results