"""Tests for event system.""" import pytest from fastapi import APIRouter from fastapi_route_loader.events import ( EventDispatcher, RouterEvent, RouterEventType, RouterLoadedEvent, RouterUnloadedEvent, RouterUpdatedEvent, ) class TestRouterEvents: """Test router event classes.""" def test_router_loaded_event(self): router = APIRouter() event = RouterLoadedEvent("test_router", router, {"key": "value"}) assert event.event_type == RouterEventType.LOADED assert event.router_name == "test_router" assert event.router is router assert event.metadata == {"key": "value"} def test_router_loaded_event_no_metadata(self): router = APIRouter() event = RouterLoadedEvent("test_router", router) assert event.event_type == RouterEventType.LOADED assert event.metadata == {} def test_router_unloaded_event(self): router = APIRouter() event = RouterUnloadedEvent("test_router", router, {"reason": "cleanup"}) assert event.event_type == RouterEventType.UNLOADED assert event.router_name == "test_router" assert event.router is router assert event.metadata == {"reason": "cleanup"} def test_router_updated_event(self): old_router = APIRouter() new_router = APIRouter() event = RouterUpdatedEvent( "test_router", new_router, old_router, {"version": "2.0"} ) assert event.event_type == RouterEventType.UPDATED assert event.router_name == "test_router" assert event.router is new_router assert event.old_router is old_router assert event.metadata == {"version": "2.0"} def test_event_immutability(self): router = APIRouter() event = RouterLoadedEvent("test_router", router) with pytest.raises(AttributeError): event.router_name = "new_name" class TestEventDispatcher: """Test event dispatcher.""" def test_subscribe_and_dispatch(self): dispatcher = EventDispatcher() router = APIRouter() events_received = [] def handler(event: RouterEvent) -> None: events_received.append(event) dispatcher.subscribe(RouterEventType.LOADED, handler) event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(event) assert len(events_received) == 1 assert events_received[0] is event def test_subscribe_to_all_events(self): dispatcher = EventDispatcher() router = APIRouter() events_received = [] def handler(event: RouterEvent) -> None: events_received.append(event) dispatcher.subscribe(None, handler) loaded_event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(loaded_event) unloaded_event = RouterUnloadedEvent("test_router", router) dispatcher.dispatch(unloaded_event) assert len(events_received) == 2 assert events_received[0] is loaded_event assert events_received[1] is unloaded_event def test_multiple_handlers(self): dispatcher = EventDispatcher() router = APIRouter() handler1_calls = [] handler2_calls = [] def handler1(event: RouterEvent) -> None: handler1_calls.append(event) def handler2(event: RouterEvent) -> None: handler2_calls.append(event) dispatcher.subscribe(RouterEventType.LOADED, handler1) dispatcher.subscribe(RouterEventType.LOADED, handler2) event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(event) assert len(handler1_calls) == 1 assert len(handler2_calls) == 1 def test_unsubscribe(self): dispatcher = EventDispatcher() router = APIRouter() events_received = [] def handler(event: RouterEvent) -> None: events_received.append(event) dispatcher.subscribe(RouterEventType.LOADED, handler) dispatcher.unsubscribe(RouterEventType.LOADED, handler) event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(event) assert len(events_received) == 0 def test_unsubscribe_global_handler(self): dispatcher = EventDispatcher() router = APIRouter() events_received = [] def handler(event: RouterEvent) -> None: events_received.append(event) dispatcher.subscribe(None, handler) dispatcher.unsubscribe(None, handler) event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(event) assert len(events_received) == 0 def test_clear_handlers(self): dispatcher = EventDispatcher() router = APIRouter() events_received = [] def handler(event: RouterEvent) -> None: events_received.append(event) dispatcher.subscribe(RouterEventType.LOADED, handler) dispatcher.subscribe(None, handler) dispatcher.clear() event = RouterLoadedEvent("test_router", router) dispatcher.dispatch(event) assert len(events_received) == 0 def test_handler_exception_does_not_stop_other_handlers(self): dispatcher = EventDispatcher() router = APIRouter() handler2_calls = [] def handler1(event: RouterEvent) -> None: raise ValueError("Handler error") def handler2(event: RouterEvent) -> None: handler2_calls.append(event) dispatcher.subscribe(RouterEventType.LOADED, handler1) dispatcher.subscribe(RouterEventType.LOADED, handler2) event = RouterLoadedEvent("test_router", router) with pytest.raises(ValueError, match="Handler error"): dispatcher.dispatch(event)