122 lines
3.9 KiB
Python
122 lines
3.9 KiB
Python
"""Unit tests for the in-memory escalation pub/sub bus."""
|
|
import asyncio
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from app.core.escalation_bus import EscalationBus
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_publish_with_no_subscribers_returns_zero():
    """Publishing to an account with no subscribers reports zero deliveries."""
    bus = EscalationBus()
    unwatched_account = uuid4()
    payload = {"type": "handoff_created"}

    receiver_count = await bus.publish(unwatched_account, payload)

    assert receiver_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_subscribe_then_publish_delivers_event():
    """A lone subscriber receives the event published to its account."""
    expected = {"type": "handoff_created", "id": "x"}
    bus = EscalationBus()
    account_id = uuid4()
    inbox = await bus.subscribe(account_id)
    try:
        # Publish a separate dict object so the equality check below compares
        # against an independent expected value, not the published instance.
        receiver_count = await bus.publish(account_id, dict(expected))
        assert receiver_count == 1

        # Bounded wait: a missing delivery fails fast instead of hanging.
        received = await asyncio.wait_for(inbox.get(), timeout=1.0)
        assert received == expected
    finally:
        await bus.unsubscribe(account_id, inbox)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_two_subscribers_same_account_both_receive():
    """Two subscribers on one account both receive a single published event."""
    expected = {"type": "x"}
    bus = EscalationBus()
    account_id = uuid4()
    first_inbox = await bus.subscribe(account_id)
    second_inbox = await bus.subscribe(account_id)
    inboxes = (first_inbox, second_inbox)
    try:
        # Publish a separate dict object so the comparison target stays
        # independent of whatever instance the bus hands to subscribers.
        receiver_count = await bus.publish(account_id, dict(expected))
        assert receiver_count == 2

        # Drain the queues one after the other, each with a bounded wait.
        first_event, second_event = [
            await asyncio.wait_for(inbox.get(), timeout=1.0) for inbox in inboxes
        ]
        assert first_event == second_event
        assert second_event == expected
    finally:
        for inbox in inboxes:
            await bus.unsubscribe(account_id, inbox)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_subscriber_in_other_account_does_not_receive():
    """Cross-tenant isolation: a publish to one account skips other accounts."""
    bus = EscalationBus()
    target_account, bystander_account = uuid4(), uuid4()
    target_inbox = await bus.subscribe(target_account)
    bystander_inbox = await bus.subscribe(bystander_account)
    try:
        # Only the target account's single subscriber should be counted.
        receiver_count = await bus.publish(target_account, {"type": "x"})
        assert receiver_count == 1

        received = await asyncio.wait_for(target_inbox.get(), timeout=1.0)
        assert received == {"type": "x"}

        # The bystander's queue must stay empty, so a get() on it has to
        # time out rather than return an event.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(bystander_inbox.get(), timeout=0.1)
    finally:
        await bus.unsubscribe(target_account, target_inbox)
        await bus.unsubscribe(bystander_account, bystander_inbox)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_publish_normalizes_string_uuid_account_id():
    """A string account id reaches a subscriber registered under the UUID.

    ORM-created objects can briefly carry string UUIDs in memory, so both
    publish and unsubscribe are exercised with the ``str`` form of the id.
    """
    bus = EscalationBus()
    account_uuid = uuid4()
    account_text = str(account_uuid)

    # Subscribe with the UUID object; everything afterwards uses the string.
    inbox = await bus.subscribe(account_uuid)
    try:
        receiver_count = await bus.publish(account_text, {"type": "x"})
        assert receiver_count == 1

        received = await asyncio.wait_for(inbox.get(), timeout=1.0)
        assert received == {"type": "x"}
    finally:
        await bus.unsubscribe(account_text, inbox)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unsubscribe_drops_subscriber_count_to_zero():
    """subscriber_count reads 1 after subscribe and 0 after unsubscribe."""
    bus = EscalationBus()
    account_id = uuid4()

    inbox = await bus.subscribe(account_id)
    count_while_subscribed = bus.subscriber_count(account_id)
    assert count_while_subscribed == 1

    await bus.unsubscribe(account_id, inbox)
    count_after_unsubscribe = bus.subscriber_count(account_id)
    assert count_after_unsubscribe == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_publish_drops_events_when_subscriber_queue_is_full():
    """A stuck subscriber must not back-pressure publishers.

    Publishes one more event than the subscriber queue is expected to hold,
    without ever consuming. Every publish is bounded by a timeout so that a
    publisher which blocks on the full queue fails this test with
    ``asyncio.TimeoutError`` instead of hanging the whole test run.
    """
    bus = EscalationBus()
    account = uuid4()
    queue = await bus.subscribe(account)
    try:
        # Stuff the queue past capacity (maxsize is 64) without consuming.
        for _ in range(65):
            # Bounded wait: back-pressure shows up as a timeout, not a hang.
            await asyncio.wait_for(
                bus.publish(account, {"type": "x"}), timeout=1.0
            )
        # Sanity: queue holds at most maxsize.
        assert queue.qsize() <= 64
        # Getting here means the over-capacity publish neither raised nor
        # blocked — the extra event was dropped silently.
    finally:
        await bus.unsubscribe(account, queue)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unsubscribe_unknown_queue_is_noop():
    """Unsubscribing an account/queue pair that was never registered is safe.

    Defensive check: cleanup code in ``finally`` blocks relies on unsubscribe
    not raising when the pair is unknown to the bus.
    """
    bus = EscalationBus()
    unregistered_queue: asyncio.Queue = asyncio.Queue()
    never_subscribed_account = uuid4()

    # Completing without an exception is the entire assertion.
    await bus.unsubscribe(never_subscribed_account, unregistered_queue)
|