Add ability to close active sessions directly from the Session History page via an inline popover with outcome selection and optional notes. Adds two new outcomes: cancelled and resolved_externally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1442 lines
49 KiB
Python
1442 lines
49 KiB
Python
"""Integration tests for session endpoints."""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
|
|
class TestSessions:
|
|
"""Test suite for troubleshooting session endpoints."""
|
|
|
|
@pytest.mark.asyncio
async def test_create_session(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Start a session with ticket and client metadata and verify the new record.

    Expects a 201 whose body echoes the submitted fields, begins with an
    empty path and decision list, and carries no completion timestamp.
    """
    payload = {
        "tree_id": test_tree["id"],
        "ticket_number": "TICKET-123",
        "client_name": "Test Client",
    }

    resp = await client.post("/api/v1/sessions", headers=auth_headers, json=payload)
    assert resp.status_code == 201

    body = resp.json()
    assert body["tree_id"] == test_tree["id"]
    # Submitted metadata is echoed back unchanged.
    for field in ("ticket_number", "client_name"):
        assert body[field] == payload[field]
    # A fresh session has no navigation history.
    for field in ("path_taken", "decisions"):
        assert body[field] == []
    assert body["completed_at"] is None
    for field in ("id", "started_at"):
        assert field in body
|
|
|
|
@pytest.mark.asyncio
async def test_get_session(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Fetch a freshly created session by id and check id and tree id match."""
    # Seed a session to look up.
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    detail_url = f"/api/v1/sessions/{session_id}"
    fetched = await client.get(detail_url, headers=auth_headers)
    assert fetched.status_code == 200

    body = fetched.json()
    assert body["id"] == session_id
    assert body["tree_id"] == test_tree["id"]
|
|
|
|
@pytest.mark.asyncio
async def test_list_sessions(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """List sessions after creating one and expect a non-empty JSON array."""
    sessions_url = "/api/v1/sessions"

    # Seed one session so the listing has something to return.
    await client.post(
        sessions_url,
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )

    listing = await client.get(sessions_url, headers=auth_headers)
    assert listing.status_code == 200

    sessions = listing.json()
    assert isinstance(sessions, list)
    assert len(sessions) >= 1
|
|
|
|
@pytest.mark.asyncio
async def test_update_session_path(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """PUT a new ``path_taken`` onto a session and expect it echoed back."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    new_path = ["root", "solution1"]
    updated = await client.put(
        f"/api/v1/sessions/{session_id}",
        headers=auth_headers,
        json={"path_taken": new_path},
    )

    assert updated.status_code == 200
    assert updated.json()["path_taken"] == new_path
|
|
|
|
@pytest.mark.asyncio
async def test_update_session_ticket(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """PUT new ticket number and client name and expect both echoed back."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    changes = {
        "ticket_number": "UPDATED-456",
        "client_name": "Updated Client",
    }
    updated = await client.put(
        f"/api/v1/sessions/{session_id}",
        headers=auth_headers,
        json=changes,
    )
    assert updated.status_code == 200

    body = updated.json()
    for field in ("ticket_number", "client_name"):
        assert body[field] == changes[field]
|
|
|
|
@pytest.mark.asyncio
async def test_complete_session(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Complete a session as "resolved" with notes and verify the stored result."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    completion = {
        "outcome": "resolved",
        "outcome_notes": "Issue fixed after restarting service",
    }
    completed = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json=completion,
    )
    assert completed.status_code == 200

    body = completed.json()
    # Completion stamps the session and stores outcome plus notes.
    assert body["completed_at"] is not None
    assert body["outcome"] == "resolved"
    assert body["outcome_notes"] == "Issue fixed after restarting service"
|
|
|
|
@pytest.mark.asyncio
async def test_complete_session_with_cancelled_outcome(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Complete a session with the 'cancelled' outcome and verify it is stored."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    completion = {
        "outcome": "cancelled",
        "outcome_notes": "Ticket withdrawn by client",
    }
    completed = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json=completion,
    )
    assert completed.status_code == 200

    body = completed.json()
    assert body["outcome"] == "cancelled"
    assert body["outcome_notes"] == "Ticket withdrawn by client"
    assert body["completed_at"] is not None
|
|
|
|
@pytest.mark.asyncio
async def test_complete_session_with_resolved_externally_outcome(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Complete a session with 'resolved_externally' (no notes) and verify it."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    # Notes are omitted on purpose: only the outcome is sent.
    completed = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json={"outcome": "resolved_externally"},
    )
    assert completed.status_code == 200

    body = completed.json()
    assert body["outcome"] == "resolved_externally"
    assert body["completed_at"] is not None
|
|
|
|
@pytest.mark.asyncio
async def test_complete_session_requires_outcome(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Posting to the complete endpoint without a body must yield 422."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    complete_url = f"/api/v1/sessions/{session_id}/complete"
    # No JSON payload at all is sent with this request.
    rejected = await client.post(complete_url, headers=auth_headers)

    assert rejected.status_code == 422
|
|
|
|
@pytest.mark.asyncio
async def test_complete_already_completed_session(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Completing the same session twice must fail with 400 the second time."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    complete_url = f"/api/v1/sessions/{session_id}/complete"
    completion = {"outcome": "resolved"}

    # First completion: result is not inspected here.
    await client.post(complete_url, headers=auth_headers, json=completion)

    # Second completion with the same payload is the request under test.
    second = await client.post(complete_url, headers=auth_headers, json=completion)

    assert second.status_code == 400
|
|
|
|
@pytest.mark.asyncio
async def test_export_session_markdown(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Export a session as markdown and check ticket number and a '#' appear."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"], "ticket_number": "EXP-001"},
    )
    session_id = created.json()["id"]

    export_options = {
        "format": "markdown",
        "include_timestamps": True,
        "include_tree_info": True,
    }
    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json=export_options,
    )
    assert exported.status_code == 200

    text = exported.text
    assert "EXP-001" in text  # ticket number is carried into the export
    assert "#" in text  # at least one markdown heading marker
|
|
|
|
@pytest.mark.asyncio
async def test_export_markdown_includes_outcome_and_step_duration(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Export a completed session as markdown and check outcome and duration.

    Records one decision with a 90-second duration, completes the session
    with the "workaround" outcome, then expects the export to contain the
    outcome label and the duration rendered as "1m 30s".
    """
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"], "ticket_number": "EXP-OUTCOME-001"},
    )
    session_id = created.json()["id"]

    step_timestamp = "2026-02-11T10:10:00Z"
    decision = {
        "node_id": "root",
        "question": "Is this a test?",
        "answer": "Yes",
        "action_performed": None,
        "notes": "Validated quickly",
        "automation_used": False,
        "timestamp": step_timestamp,
        "entered_at": "2026-02-11T10:08:30Z",
        "exited_at": step_timestamp,
        "duration_seconds": 90,
        "attachments": [],
    }
    update_response = await client.put(
        f"/api/v1/sessions/{session_id}",
        headers=auth_headers,
        json={"decisions": [decision]},
    )
    assert update_response.status_code == 200

    complete_response = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json={"outcome": "workaround", "outcome_notes": "Temporary mitigation applied"},
    )
    assert complete_response.status_code == 200

    export_response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "markdown", "include_timestamps": True, "include_tree_info": True},
    )
    assert export_response.status_code == 200

    markdown = export_response.text
    expected_fragments = (
        "**Outcome:** Workaround Applied",
        "**Duration:**",
        "**Duration:** 1m 30s",  # 90 seconds, formatted
    )
    for fragment in expected_fragments:
        assert fragment in markdown
|
|
|
|
@pytest.mark.asyncio
async def test_export_session_text(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Export a session as text and check the plain-text content type header."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "text"},
    )

    assert exported.status_code == 200
    content_type = exported.headers["content-type"]
    assert content_type == "text/plain; charset=utf-8"
|
|
|
|
@pytest.mark.asyncio
async def test_export_session_html(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Export a session as HTML and check doctype and ``<html>`` tag appear."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "html"},
    )
    assert exported.status_code == 200

    document = exported.text
    for marker in ("<!DOCTYPE html>", "<html>"):
        assert marker in document
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_completion(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter the session list by ``completed=true`` and ``completed=false``.

    Two sessions are created and only the first is completed, so each filter
    should return at least one row, all with the matching completion state.
    """
    new_session = {"tree_id": test_tree["id"]}

    create1 = await client.post("/api/v1/sessions", headers=auth_headers, json=new_session)
    session1_id = create1.json()["id"]

    # Second session stays open.
    await client.post("/api/v1/sessions", headers=auth_headers, json=new_session)

    # Close only the first session.
    await client.post(
        f"/api/v1/sessions/{session1_id}/complete",
        headers=auth_headers,
        json={"outcome": "resolved"},
    )

    finished = await client.get("/api/v1/sessions?completed=true", headers=auth_headers)
    assert finished.status_code == 200
    finished_rows = finished.json()
    assert len(finished_rows) >= 1
    assert all(s["completed_at"] is not None for s in finished_rows)

    pending = await client.get("/api/v1/sessions?completed=false", headers=auth_headers)
    assert pending.status_code == 200
    pending_rows = pending.json()
    assert len(pending_rows) >= 1
    assert all(s["completed_at"] is None for s in pending_rows)
|
|
|
|
@pytest.mark.asyncio
async def test_create_session_has_scratchpad(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """A newly created session exposes a ``scratchpad`` field that is empty."""
    payload = {"tree_id": test_tree["id"]}

    resp = await client.post("/api/v1/sessions", headers=auth_headers, json=payload)
    assert resp.status_code == 201

    body = resp.json()
    assert "scratchpad" in body
    assert body["scratchpad"] == ""
|
|
|
|
@pytest.mark.asyncio
async def test_update_scratchpad_via_put(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Set the scratchpad through the general PUT endpoint and read it back."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    notes = "- Server IP: 192.168.1.50\n- Error: 0x80070005"
    updated = await client.put(
        f"/api/v1/sessions/{session_id}",
        headers=auth_headers,
        json={"scratchpad": notes},
    )

    assert updated.status_code == 200
    assert updated.json()["scratchpad"] == notes
|
|
|
|
@pytest.mark.asyncio
async def test_patch_scratchpad(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """PATCH the scratchpad endpoint and expect the new text in the response."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    patched = await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": "- IP: 10.0.0.1\n- User: jsmith"},
    )

    assert patched.status_code == 200
    body = patched.json()
    assert body["scratchpad"] == "- IP: 10.0.0.1\n- User: jsmith"
|
|
|
|
@pytest.mark.asyncio
async def test_patch_scratchpad_not_found(
    self, client: AsyncClient, auth_headers: dict
):
    """PATCH the scratchpad of a random, never-created session id; expect 404."""
    import uuid

    # A freshly generated UUID that no session was created with.
    fake_id = str(uuid.uuid4())
    missing_url = f"/api/v1/sessions/{fake_id}/scratchpad"

    resp = await client.patch(
        missing_url,
        headers=auth_headers,
        json={"scratchpad": "test"},
    )

    assert resp.status_code == 404
|
|
|
|
@pytest.mark.asyncio
async def test_patch_scratchpad_empty_string(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Clear a previously filled scratchpad by PATCHing an empty string."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    scratchpad_url = f"/api/v1/sessions/{session_id}/scratchpad"

    # Put some content in first so that clearing is a real change.
    await client.patch(
        scratchpad_url,
        headers=auth_headers,
        json={"scratchpad": "some notes"},
    )

    cleared = await client.patch(
        scratchpad_url,
        headers=auth_headers,
        json={"scratchpad": ""},
    )

    assert cleared.status_code == 200
    assert cleared.json()["scratchpad"] == ""
|
|
|
|
@pytest.mark.asyncio
async def test_patch_scratchpad_completed_session(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """The scratchpad stays editable after the session has been completed."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    # Close the session before touching the scratchpad.
    await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json={"outcome": "resolved"},
    )

    patched = await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": "post-resolution notes"},
    )

    assert patched.status_code == 200
    assert patched.json()["scratchpad"] == "post-resolution notes"
|
|
|
|
@pytest.mark.asyncio
async def test_export_includes_scratchpad_markdown(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Markdown export carries an evidence heading plus the scratchpad text."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"], "ticket_number": "SP-001"},
    )
    session_id = created.json()["id"]

    # Fill the scratchpad with two recognisable values.
    await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": "- Server IP: 192.168.1.50\n- Error: 0x80070005"},
    )

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "markdown", "include_tree_info": True},
    )
    assert exported.status_code == 200

    markdown = exported.text
    for fragment in ("## Evidence / Reference", "192.168.1.50", "0x80070005"):
        assert fragment in markdown
|
|
|
|
@pytest.mark.asyncio
async def test_export_includes_scratchpad_text(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Text export carries an upper-case evidence heading plus scratchpad text."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": "Error code: 12345"},
    )

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "text"},
    )
    assert exported.status_code == 200

    text = exported.text
    for fragment in ("EVIDENCE / REFERENCE", "Error code: 12345"):
        assert fragment in text
|
|
|
|
@pytest.mark.asyncio
async def test_export_includes_scratchpad_html(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """HTML export carries the evidence heading plus the scratchpad text."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": "DNS server: 10.0.0.5"},
    )

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "html"},
    )
    assert exported.status_code == 200

    document = exported.text
    for fragment in ("Evidence / Reference", "DNS server: 10.0.0.5"):
        assert fragment in document
|
|
|
|
@pytest.mark.asyncio
async def test_export_excludes_empty_scratchpad(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Markdown export has no evidence section when the scratchpad was never set."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    # The scratchpad is deliberately left untouched before exporting.
    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "markdown"},
    )

    assert exported.status_code == 200
    markdown = exported.text
    assert "Evidence / Reference" not in markdown
|
|
|
|
@pytest.mark.asyncio
async def test_export_excludes_whitespace_only_scratchpad(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Markdown export has no evidence section for a whitespace-only scratchpad."""
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    session_id = created.json()["id"]

    # Only spaces and newlines are stored.
    await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        headers=auth_headers,
        json={"scratchpad": " \n \n "},
    )

    exported = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        headers=auth_headers,
        json={"format": "markdown"},
    )

    assert exported.status_code == 200
    markdown = exported.text
    assert "Evidence / Reference" not in markdown
|
|
|
|
@pytest.mark.asyncio
async def test_html_export_escapes_script_tags(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that HTML export escapes script tags in user content (XSS prevention).

    Creates a session whose ticket number and client name contain markup,
    exports it as HTML, and checks that the raw ``<script>`` tag is absent
    while the entity-escaped forms of both values are present.
    """
    session_data = {
        "tree_id": test_tree["id"],
        "ticket_number": '<script>alert("xss")</script>',
        "client_name": '<img onerror="alert(1)" src=x>',
    }
    create_response = await client.post(
        "/api/v1/sessions", json=session_data, headers=auth_headers
    )
    session_id = create_response.json()["id"]

    response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "html", "include_tree_info": True},
        headers=auth_headers,
    )

    assert response.status_code == 200
    content = response.text
    # The raw tag must never reach the exported document.
    assert '<script>' not in content
    # The positive checks target the entity-escaped text. Asserting the raw
    # tag here would contradict the line above and could never pass.
    assert '&lt;script&gt;' in content
    assert '&lt;img' in content
|
|
|
|
@pytest.mark.asyncio
async def test_html_export_escapes_special_chars(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that HTML export properly escapes special characters.

    The ticket number contains ``<``, ``>``, ``&`` and both quote styles; the
    exported HTML is expected to contain the entity forms of the ampersand
    and of the ``<b>`` tag.
    """
    session_data = {
        "tree_id": test_tree["id"],
        "ticket_number": 'TICK-001 <b>"bold"</b> & \'quoted\'',
    }
    create_response = await client.post(
        "/api/v1/sessions", json=session_data, headers=auth_headers
    )
    session_id = create_response.json()["id"]

    response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "html", "include_tree_info": True},
        headers=auth_headers,
    )

    assert response.status_code == 200
    content = response.text
    # Check for the escaped entities, not the raw characters: a bare '&' or a
    # literal '<b>' in the output would indicate missing escaping rather
    # than prove it.
    assert '&amp;' in content
    assert '&lt;b&gt;' in content
|
|
|
|
@pytest.mark.asyncio
async def test_html_export_escapes_scratchpad(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that HTML export escapes scratchpad content.

    Stores a ``<script>`` snippet in the scratchpad, exports the session as
    HTML, and checks that only the entity-escaped form appears in the output.
    """
    create_response = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_id = create_response.json()["id"]

    await client.patch(
        f"/api/v1/sessions/{session_id}/scratchpad",
        json={"scratchpad": '<script>document.cookie</script>'},
        headers=auth_headers,
    )

    response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "html"},
        headers=auth_headers,
    )

    assert response.status_code == 200
    content = response.text
    # The raw tag must be absent from the export ...
    assert '<script>' not in content
    # ... and its escaped form present. Asserting the raw tag on this line
    # would contradict the check above and make the test unpassable.
    assert '&lt;script&gt;' in content
|
|
|
|
@pytest.mark.asyncio
async def test_start_session_on_others_private_tree_forbidden(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """A second user starting a session on ``test_tree`` must receive 403.

    Registers and logs in a separate account, then tries to start a session
    on the tree supplied by the ``test_tree`` fixture using that account.
    """
    other_email = "other@example.com"
    other_password = "OtherPassword123!"

    # Create the second account.
    await client.post(
        "/api/v1/auth/register",
        json={
            "email": other_email,
            "password": other_password,
            "name": "Other User",
        },
    )
    login_resp = await client.post(
        "/api/v1/auth/login/json",
        json={"email": other_email, "password": other_password},
    )
    other_headers = {"Authorization": f"Bearer {login_resp.json()['access_token']}"}

    # Attempt to use the tree with the second user's token.
    # NOTE(review): presumably the fixture tree belongs to the primary test
    # user and is neither public nor default -- confirm against the fixture.
    denied = await client.post(
        "/api/v1/sessions",
        headers=other_headers,
        json={"tree_id": test_tree["id"]},
    )
    assert denied.status_code == 403
|
|
|
|
@pytest.mark.asyncio
async def test_start_session_super_admin_any_tree(
    self, client: AsyncClient, admin_auth_headers: dict, test_tree: dict
):
    """Starting a session on ``test_tree`` with admin headers must return 201."""
    payload = {"tree_id": test_tree["id"]}

    created = await client.post(
        "/api/v1/sessions",
        headers=admin_auth_headers,
        json=payload,
    )

    assert created.status_code == 201
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_ticket_number(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter by the ticket prefix "INC" and expect only the matching session."""
    # Two sessions whose ticket numbers share no prefix.
    for ticket in ("INC-12345", "REQ-67890"):
        await client.post(
            "/api/v1/sessions",
            headers=auth_headers,
            json={"tree_id": test_tree["id"], "ticket_number": ticket},
        )

    filtered = await client.get(
        "/api/v1/sessions?ticket_number=INC",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) == 1
    assert rows[0]["ticket_number"] == "INC-12345"
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_client_name(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter by lower-case "acme" and expect only "Acme Corporation" back."""
    # Two sessions for two different clients.
    for customer in ("Acme Corporation", "TechStart Inc"):
        await client.post(
            "/api/v1/sessions",
            headers=auth_headers,
            json={"tree_id": test_tree["id"], "client_name": customer},
        )

    # The query is a lower-case fragment of the stored name.
    filtered = await client.get(
        "/api/v1/sessions?client_name=acme",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) == 1
    assert rows[0]["client_name"] == "Acme Corporation"
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_tree_name(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter by the first five characters of the tree name.

    Expects at least one result whose ``tree_snapshot`` name contains the
    full name of the fixture tree.
    """
    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"]},
    )
    assert created.status_code == 201

    full_name = test_tree["name"]
    tree_name_part = full_name[:5]  # partial-match query fragment
    filtered = await client.get(
        f"/api/v1/sessions?tree_name={tree_name_part}",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) >= 1
    assert full_name in rows[0]["tree_snapshot"]["name"]
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_tree_id(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter sessions by ``tree_id`` and expect only rows for that tree.

    A second tree is created from the fixture tree's category and structure
    so that one session exists per tree before filtering.
    """
    second_tree = {
        "name": "Other Tree",
        "description": "Second tree for filter test",
        "category": test_tree["category"],
        "tree_structure": test_tree["tree_structure"],
    }
    other_tree_response = await client.post(
        "/api/v1/trees",
        headers=auth_headers,
        json=second_tree,
    )
    assert other_tree_response.status_code == 201
    other_tree_id = other_tree_response.json()["id"]

    # One session per tree, tagged so they are easy to tell apart.
    for tree_id, ticket in ((test_tree["id"], "TREE-A"), (other_tree_id, "TREE-B")):
        await client.post(
            "/api/v1/sessions",
            headers=auth_headers,
            json={"tree_id": tree_id, "ticket_number": ticket},
        )

    filtered = await client.get(
        f"/api/v1/sessions?tree_id={test_tree['id']}",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) >= 1
    assert all(item["tree_id"] == test_tree["id"] for item in rows)
|
|
|
|
@pytest.mark.asyncio
async def test_list_sessions_supports_size_and_page_params(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Page through three sessions two at a time using ``size`` and ``page``.

    Page 1 must hold exactly two rows, page 2 at least one, and the two
    pages must not share any session id.
    """
    for ticket in ("P1", "P2", "P3"):
        await client.post(
            "/api/v1/sessions",
            headers=auth_headers,
            json={"tree_id": test_tree["id"], "ticket_number": ticket},
        )

    first_page = await client.get("/api/v1/sessions?size=2&page=1", headers=auth_headers)
    assert first_page.status_code == 200
    first_rows = first_page.json()
    assert len(first_rows) == 2

    second_page = await client.get("/api/v1/sessions?size=2&page=2", headers=auth_headers)
    assert second_page.status_code == 200
    second_rows = second_page.json()
    assert len(second_rows) >= 1

    # No session may appear on both pages.
    ids_on_first = {item["id"] for item in first_rows}
    ids_on_second = {item["id"] for item in second_rows}
    assert ids_on_first.isdisjoint(ids_on_second)
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_started_date_range(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter by a started-at window of one day either side of now.

    The session created here must be the first row of the filtered result.
    """
    from datetime import datetime, timezone, timedelta
    from urllib.parse import quote

    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"], "ticket_number": "TEST-001"},
    )
    assert created.status_code == 201

    # Window bounds: yesterday and tomorrow, in UTC.
    one_day = timedelta(days=1)
    now = datetime.now(timezone.utc)
    lower_bound = now - one_day
    upper_bound = now + one_day

    # ISO timestamps are percent-encoded before going into the query string.
    started_after = quote(lower_bound.isoformat())
    started_before = quote(upper_bound.isoformat())

    filtered = await client.get(
        f"/api/v1/sessions?started_after={started_after}&started_before={started_before}",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) >= 1
    assert rows[0]["ticket_number"] == "TEST-001"
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_by_completed_date_range(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Filter by a completed-at window of one day either side of now.

    The session completed here must appear somewhere in the filtered result.
    """
    from datetime import datetime, timezone, timedelta
    from urllib.parse import quote

    created = await client.post(
        "/api/v1/sessions",
        headers=auth_headers,
        json={"tree_id": test_tree["id"], "ticket_number": "TEST-002"},
    )
    session_id = created.json()["id"]

    # Complete it so that it has a completion timestamp to filter on.
    await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        headers=auth_headers,
        json={"outcome": "resolved"},
    )

    # Window bounds: yesterday and tomorrow, in UTC.
    one_day = timedelta(days=1)
    now = datetime.now(timezone.utc)
    lower_bound = now - one_day
    upper_bound = now + one_day

    completed_after = quote(lower_bound.isoformat())
    completed_before = quote(upper_bound.isoformat())

    filtered = await client.get(
        f"/api/v1/sessions?completed_after={completed_after}&completed_before={completed_before}",
        headers=auth_headers,
    )
    assert filtered.status_code == 200

    rows = filtered.json()
    assert len(rows) >= 1
    assert any(s["ticket_number"] == "TEST-002" for s in rows)
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_combined(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test combining multiple filters (AND logic).

    Two sessions are created with different ticket numbers and client
    names; filtering on the first session's ticket number AND client name
    must return exactly that session.
    """
    # Create sessions with various attributes
    await client.post(
        "/api/v1/sessions",
        json={
            "tree_id": test_tree["id"],
            "ticket_number": "INC-111",
            "client_name": "Client A"
        },
        headers=auth_headers
    )
    await client.post(
        "/api/v1/sessions",
        json={
            "tree_id": test_tree["id"],
            "ticket_number": "INC-222",
            "client_name": "Client B"
        },
        headers=auth_headers
    )

    # Filter by both ticket and client (should match only one).
    # The filters are passed via ``params`` so the space in "Client A" is
    # percent-encoded by the client instead of being sent as a raw space
    # inside a hand-written URL.
    response = await client.get(
        "/api/v1/sessions",
        params={"ticket_number": "INC-111", "client_name": "Client A"},
        headers=auth_headers
    )

    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["ticket_number"] == "INC-111"
    assert data[0]["client_name"] == "Client A"
|
|
|
|
@pytest.mark.asyncio
async def test_filter_sessions_no_results(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """A ticket-number filter that matches nothing yields 200 and an empty list."""
    url = "/api/v1/sessions?ticket_number=NONEXISTENT-999"
    result = await client.get(url, headers=auth_headers)

    assert result.status_code == 200
    payload = result.json()
    # Must be a JSON array, and that array must be empty
    assert isinstance(payload, list)
    assert not payload
|
|
|
|
@pytest.mark.asyncio
async def test_complete_session_with_next_steps(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Completing a session echoes back the submitted next_steps and outcome_notes."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_id = created.json()["id"]

    # Payload sent to the completion endpoint; the response is compared against it
    completion = {
        "outcome": "resolved",
        "outcome_notes": "Fixed the issue",
        "next_steps": "Monitor for 48 hours",
    }
    completed = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        json=completion,
        headers=auth_headers,
    )

    assert completed.status_code == 200
    body = completed.json()
    assert body["next_steps"] == completion["next_steps"]
    assert body["outcome_notes"] == completion["outcome_notes"]
|
|
|
|
@pytest.mark.asyncio
async def test_update_session_next_steps(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """A PUT on the session with next_steps returns the updated value."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_url = f"/api/v1/sessions/{created.json()['id']}"

    follow_up = "Schedule follow-up call"
    updated = await client.put(
        session_url,
        json={"next_steps": follow_up},
        headers=auth_headers,
    )

    assert updated.status_code == 200
    assert updated.json()["next_steps"] == follow_up
|
|
|
|
@pytest.mark.asyncio
async def test_export_includes_outcome_notes_in_resolution(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that outcome_notes appear as Resolution section in exports.

    A session is completed with outcome notes and next steps, then exported
    in every format checked here (markdown, text, html, psa). Each export
    must succeed and contain both texts; markdown, text and html must also
    contain their format-specific section headings.
    """
    create_response = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers
    )
    session_id = create_response.json()["id"]

    outcome_notes = "Replaced failed DIMM in slot A2"
    next_steps = "Monitor for 24 hours"
    complete_response = await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        json={
            "outcome": "resolved",
            "outcome_notes": outcome_notes,
            "next_steps": next_steps
        },
        headers=auth_headers
    )
    # Fail here, with a clear cause, if the session could not be completed
    assert complete_response.status_code == 200

    # Section headings expected per export format. No headings are asserted
    # for the PSA export; only the note texts are checked there.
    expected_headings = {
        "markdown": ("## Resolution", "## Next Steps"),
        "text": ("RESOLUTION", "NEXT STEPS"),
        "html": ("Resolution", "Next Steps"),
        "psa": (),
    }

    for export_format, headings in expected_headings.items():
        response = await client.post(
            f"/api/v1/sessions/{session_id}/export",
            json={"format": export_format},
            headers=auth_headers
        )
        # Every format gets the same status check, not only markdown
        assert response.status_code == 200, f"{export_format} export failed"
        content = response.text
        for expected in (*headings, outcome_notes, next_steps):
            assert expected in content, (
                f"{expected!r} missing from {export_format} export"
            )
|
|
|
|
@pytest.mark.asyncio
async def test_export_omits_empty_resolution_and_next_steps(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that empty outcome_notes/next_steps don't create empty sections.

    The session is completed with an outcome only, so the markdown export
    must contain neither a "## Resolution" nor a "## Next Steps" heading.
    """
    create_response = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers
    )
    session_id = create_response.json()["id"]

    await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        json={"outcome": "resolved"},
        headers=auth_headers
    )

    response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "markdown"},
        headers=auth_headers
    )
    # The checks below are all "not in"; without this status assertion an
    # error response would satisfy them and the test would pass vacuously.
    assert response.status_code == 200
    content = response.text
    assert "## Resolution" not in content
    assert "## Next Steps" not in content
|
|
|
|
@pytest.mark.asyncio
async def test_export_exclude_outcome_notes_flag(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test include_outcome_notes=False suppresses resolution section.

    The session is completed with outcome notes, then exported as markdown
    with ``include_outcome_notes`` set to False; neither the Resolution
    heading nor the note text may appear in the export.
    """
    create_response = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers
    )
    session_id = create_response.json()["id"]

    await client.post(
        f"/api/v1/sessions/{session_id}/complete",
        json={
            "outcome": "resolved",
            "outcome_notes": "Should not appear"
        },
        headers=auth_headers
    )

    response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "markdown", "include_outcome_notes": False},
        headers=auth_headers
    )
    # Only negative assertions follow, so confirm the export succeeded first;
    # a rejected request would otherwise make this test pass for the wrong reason.
    assert response.status_code == 200
    content = response.text
    assert "## Resolution" not in content
    assert "Should not appear" not in content
|
|
|
|
@pytest.mark.asyncio
async def test_export_max_step_index(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """With max_step_index=2, only the first two of three recorded steps are exported."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_id = created.json()["id"]

    # (node_id, question, answer, timestamp) for each recorded decision
    recorded_steps = [
        ("n1", "Step one?", "Yes", "2026-02-13T10:00:00Z"),
        ("n2", "Step two?", "No", "2026-02-13T10:01:00Z"),
        ("n3", "Step three?", "Maybe", "2026-02-13T10:02:00Z"),
    ]
    decisions = [
        {
            "node_id": node_id,
            "question": question,
            "answer": answer,
            "timestamp": timestamp,
            "attachments": [],
        }
        for node_id, question, answer, timestamp in recorded_steps
    ]
    await client.put(
        f"/api/v1/sessions/{session_id}",
        json={"decisions": decisions},
        headers=auth_headers,
    )

    export = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "markdown", "max_step_index": 2},
        headers=auth_headers,
    )
    exported_text = export.text
    assert "Step one?" in exported_text
    assert "Step two?" in exported_text
    assert "Step three?" not in exported_text
|
|
|
|
@pytest.mark.asyncio
async def test_export_max_step_index_exceeds_count(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """A max_step_index above the number of decisions still exports every step."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_id = created.json()["id"]

    # Record a single decision on the session
    single_decision = {
        "node_id": "n1",
        "question": "Only step",
        "answer": "Done",
        "timestamp": "2026-02-13T10:00:00Z",
        "attachments": [],
    }
    await client.put(
        f"/api/v1/sessions/{session_id}",
        json={"decisions": [single_decision]},
        headers=auth_headers,
    )

    # Ask for far more steps than exist
    export = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "markdown", "max_step_index": 100},
        headers=auth_headers,
    )
    assert export.status_code == 200
    assert "Only step" in export.text
|
|
|
|
@pytest.mark.asyncio
async def test_export_max_step_index_zero_returns_422(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """An export request with max_step_index=0 is rejected with HTTP 422."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    export_url = f"/api/v1/sessions/{created.json()['id']}/export"

    rejected = await client.post(
        export_url,
        json={"format": "markdown", "max_step_index": 0},
        headers=auth_headers,
    )
    assert rejected.status_code == 422
|
|
|
|
@pytest.mark.asyncio
async def test_export_in_progress_session_does_not_mark_exported(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Test that exporting an in-progress session does NOT set exported=True.

    The session is created but never completed, exported as markdown, and
    then re-fetched to check that its ``exported`` flag is still False.
    """
    create_response = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers
    )
    session_id = create_response.json()["id"]

    export_response = await client.post(
        f"/api/v1/sessions/{session_id}/export",
        json={"format": "markdown"},
        headers=auth_headers
    )
    # The export must actually have succeeded; if it had failed, the flag
    # would stay False and the check below would pass without proving anything.
    assert export_response.status_code == 200

    response = await client.get(
        f"/api/v1/sessions/{session_id}",
        headers=auth_headers
    )
    assert response.status_code == 200
    assert response.json()["exported"] is False
|
|
|
|
@pytest.mark.asyncio
async def test_export_completed_session_marks_exported(
    self, client: AsyncClient, auth_headers: dict, test_tree: dict
):
    """Exporting a session after completing it leaves exported=True on the session."""
    created = await client.post(
        "/api/v1/sessions",
        json={"tree_id": test_tree["id"]},
        headers=auth_headers,
    )
    session_url = f"/api/v1/sessions/{created.json()['id']}"

    # Complete first, then export, in that order
    await client.post(
        f"{session_url}/complete",
        json={"outcome": "resolved"},
        headers=auth_headers,
    )
    await client.post(
        f"{session_url}/export",
        json={"format": "markdown"},
        headers=auth_headers,
    )

    # Re-read the session to observe the flag
    fetched = await client.get(session_url, headers=auth_headers)
    assert fetched.json()["exported"] is True
|