# event_bus.py
from typing import Callable, Dict, List
from .models import JobEvent
class EventBus:
def __init__(self):
self.listeners: List[Callable[[JobEvent], None]] = []
def subscribe(self, listener: Callable[[JobEvent], None]):
self.listeners.append(listener)
def publish(self, event: JobEvent):
for l in self.listeners:
l(event)
event_bus = EventBus()
event_bus.publish(JobEvent(job_id=job.id, status=JobStatus.PENDING, message="Submitting Docker job"))
# sse_notifier.py
import asyncio
from ..core.event_bus import event_bus
from ..core.models import JobEvent
clients: List[asyncio.Queue] = []
def sse_listener(event: JobEvent):
for q in clients:
q.put_nowait(event)
event_bus.subscribe(sse_listener)
async def get_event_stream(queue: asyncio.Queue):
try:
while True:
event = await queue.get()
yield f"data: {event.json()}\n\n"
except asyncio.CancelledError:
clients.remove(queue)