EventBus(发布/订阅)

最后发布时间:2025-07-15 16:27:39 浏览量:
# event_bus.py
from typing import Callable, Dict, List
from .models import JobEvent

class EventBus:
    """Minimal in-process publish/subscribe hub for ``JobEvent`` objects.

    Listeners are plain callables that take the published event as their only
    argument. They are invoked synchronously, in subscription order, on the
    caller's thread of ``publish``.
    """

    def __init__(self) -> None:
        # Public attribute kept for backward compatibility with existing callers.
        self.listeners: List[Callable[["JobEvent"], None]] = []

    def subscribe(self, listener: Callable[["JobEvent"], None]) -> None:
        """Register *listener* to be called for every subsequently published event.

        The same callable may be registered more than once; it is then called
        once per registration.
        """
        self.listeners.append(listener)

    def publish(self, event: "JobEvent") -> None:
        """Deliver *event* to every listener registered at the time of the call.

        Any exception raised by a listener propagates to the caller and stops
        delivery to the remaining listeners.
        """
        # Iterate over a snapshot: a listener may call ``subscribe`` while it is
        # being notified, and mutating the list mid-iteration would otherwise
        # extend this very loop (or never terminate if the listener keeps
        # re-subscribing).
        for listener in tuple(self.listeners):
            listener(event)

# Shared module-level bus instance; the SSE notifier snippet imports `event_bus` from this module.
event_bus = EventBus()
# Usage example: publish a job status event to every subscribed listener.
# NOTE(review): `job` and `JobStatus` are not defined or imported in this snippet —
# presumably this line belongs in the calling code rather than in event_bus.py; confirm.
event_bus.publish(JobEvent(job_id=job.id, status=JobStatus.PENDING, message="Submitting Docker job"))
# sse_notifier.py
import asyncio
from ..core.event_bus import event_bus
from ..core.models import JobEvent

# Registry of per-client queues: `sse_listener` pushes every event into each queue
# listed here, and `get_event_stream` removes its queue when the stream ends.
# NOTE(review): `List` is not imported by this snippet's own import lines; if
# sse_notifier.py is a separate module it needs `from typing import List` — confirm.
clients: List[asyncio.Queue] = []

def sse_listener(event: JobEvent):
    """Fan *event* out to every queue currently registered in ``clients``.

    Uses the non-blocking ``put_nowait``, so this stays a plain synchronous
    callable. If a queue is bounded and full, ``asyncio.QueueFull`` propagates
    to the publisher and the remaining queues are skipped.
    """
    for client_queue in clients:
        client_queue.put_nowait(event)

# Register the fan-out listener as an import-time side effect, so events published
# on the shared bus after this module is imported are pushed to the client queues.
event_bus.subscribe(sse_listener)

async def get_event_stream(queue: asyncio.Queue):
    """Yield one Server-Sent-Events ``data:`` frame per event taken from *queue*.

    Runs until the consumer stops iterating. Each item taken from the queue
    must provide a ``json()`` method; its result becomes the frame payload.

    Args:
        queue: The client's queue. It is removed from ``clients`` when the
            stream ends, however it ends.

    Yields:
        str: ``"data: <json>\\n\\n"`` for each event.
    """
    try:
        while True:
            event = await queue.get()
            yield f"data: {event.json()}\n\n"
    finally:
        # Clean up on every exit path: task cancellation, the consumer closing
        # the generator (GeneratorExit), or an error while serialising an event.
        # Using ``finally`` instead of ``except CancelledError`` also lets the
        # cancellation propagate instead of being swallowed.
        try:
            clients.remove(queue)
        except ValueError:
            # Queue was never registered or has already been removed; there is
            # nothing left to clean up.
            pass

快捷入口
Python 思维导图 浏览PDF 下载PDF
分享到:
标签