feat: ConnectWise PSA integration (#106)
PSA abstraction layer with provider pattern, ConnectWise integration (connection management, ticket linking, note posting, status updates, member mapping), Integrations page UI, Fernet credential encryption, in-memory TTL cache, 6 DB migrations, ConnectWise API reference docs.
This commit was merged in pull request #106.
This commit is contained in:
59
backend/tests/test_psa_connections.py
Normal file
59
backend/tests/test_psa_connections.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Tests for PSA connection endpoints — routing and RBAC only.
|
||||
|
||||
We cannot fully test create/update/test endpoints in CI because they
|
||||
call the ConnectWise API. These tests verify routing and authorization.
|
||||
"""
|
||||
import pytest
|
||||
from sqlalchemy import select, update
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_connection_empty(client, admin_auth_headers):
|
||||
"""GET returns null when no connection exists."""
|
||||
response = await client.get(
|
||||
"/api/v1/integrations/psa/connections",
|
||||
headers=admin_auth_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json() is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_connection_requires_owner(client, test_user, auth_headers, test_db):
|
||||
"""Engineer (non-owner) should get 403 on create."""
|
||||
# Downgrade the test user from owner to engineer so require_account_owner rejects
|
||||
user_id = test_user["user_data"]["id"]
|
||||
await test_db.execute(
|
||||
update(User).where(User.id == user_id).values(account_role="engineer")
|
||||
)
|
||||
await test_db.commit()
|
||||
|
||||
payload = {
|
||||
"provider": "connectwise",
|
||||
"display_name": "Test CW",
|
||||
"site_url": "https://na.myconnectwise.net",
|
||||
"company_id": "testmsp",
|
||||
"public_key": "pub123",
|
||||
"private_key": "priv456",
|
||||
"client_id": "client789",
|
||||
}
|
||||
response = await client.post(
|
||||
"/api/v1/integrations/psa/connections",
|
||||
json=payload,
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_nonexistent_returns_404(client, admin_auth_headers):
|
||||
"""DELETE with a nonexistent ID returns 404."""
|
||||
import uuid
|
||||
|
||||
fake_id = uuid.uuid4()
|
||||
response = await client.delete(
|
||||
f"/api/v1/integrations/psa/connections/{fake_id}",
|
||||
headers=admin_auth_headers,
|
||||
)
|
||||
assert response.status_code == 404
|
||||
44
backend/tests/test_psa_encryption.py
Normal file
44
backend/tests/test_psa_encryption.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""Tests for PSA credential encryption/decryption."""
|
||||
import pytest
|
||||
from app.services.psa.encryption import encrypt_credentials, decrypt_credentials
|
||||
|
||||
|
||||
class TestCredentialEncryption:
|
||||
def test_round_trip(self):
|
||||
"""Encrypt then decrypt returns original credentials."""
|
||||
creds = {
|
||||
"public_key": "abc123",
|
||||
"private_key": "secret456",
|
||||
"client_id": "my-client-id",
|
||||
}
|
||||
encrypted = encrypt_credentials(creds)
|
||||
|
||||
# Encrypted should be a non-empty string, different from input
|
||||
assert isinstance(encrypted, str)
|
||||
assert len(encrypted) > 0
|
||||
assert "secret456" not in encrypted
|
||||
|
||||
decrypted = decrypt_credentials(encrypted)
|
||||
assert decrypted == creds
|
||||
|
||||
def test_different_inputs_produce_different_outputs(self):
|
||||
creds1 = {"public_key": "key1", "private_key": "priv1", "client_id": "cid1"}
|
||||
creds2 = {"public_key": "key2", "private_key": "priv2", "client_id": "cid2"}
|
||||
|
||||
enc1 = encrypt_credentials(creds1)
|
||||
enc2 = encrypt_credentials(creds2)
|
||||
assert enc1 != enc2
|
||||
|
||||
def test_tampered_ciphertext_raises(self):
|
||||
creds = {"public_key": "k", "private_key": "p", "client_id": "c"}
|
||||
encrypted = encrypt_credentials(creds)
|
||||
tampered = encrypted[:-5] + "XXXXX"
|
||||
with pytest.raises(Exception):
|
||||
decrypt_credentials(tampered)
|
||||
|
||||
def test_mask_private_key(self):
|
||||
from app.services.psa.encryption import mask_credential
|
||||
assert mask_credential("abcdefghij") == "\u2022\u2022\u2022\u2022\u2022\u2022ghij"
|
||||
assert mask_credential("abc") == "\u2022\u2022\u2022\u2022\u2022\u2022abc"
|
||||
assert mask_credential("") == "\u2022\u2022\u2022\u2022\u2022\u2022"
|
||||
assert mask_credential(None) == "\u2022\u2022\u2022\u2022\u2022\u2022"
|
||||
Reference in New Issue
Block a user