# event_bus.py from typing import Callable, Dict, List from .models import JobEvent class EventBus: def __init__(self): self.listeners: List[Callable[[JobEvent], None]] = [] def subscribe(self, listener: Callable[[JobEvent], None]): self.listeners.append(listener) def publish(self, event: JobEvent): for l in self.listeners: l(event) event_bus = EventBus()
event_bus.publish(JobEvent(job_id=job.id, status=JobStatus.PENDING, message="Submitting Docker job"))
# sse_notifier.py import asyncio from ..core.event_bus import event_bus from ..core.models import JobEvent clients: List[asyncio.Queue] = [] def sse_listener(event: JobEvent): for q in clients: q.put_nowait(event) event_bus.subscribe(sse_listener) async def get_event_stream(queue: asyncio.Queue): try: while True: event = await queue.get() yield f"data: {event.json()}\n\n" except asyncio.CancelledError: clients.remove(queue)