实现异步解耦通信(如:任务状态推送、日志流、模块间事件传递)而不依赖第三方中间件(如 Redis、Kafka、MQTT),通常依赖的是语言自身提供的并发机制,如 Python 的 asyncio、队列、协程 + 多线程/多进程 + 内部事件总线机制。
asyncio
multiprocessing.Queue/Pipe
socket
asyncio + UNIX socket
SQLite 轮询 + 文件锁
FIFO 命名管道(mkfifo)
watchdog + 文件触发器
你可以说明你的具体需求,但这里是几个常见用例:
asyncio.Queue
multiprocessing.Queue
EventBus
# event_bus.py from collections import defaultdict import asyncio class EventBus: def __init__(self): self.listeners = defaultdict(list) # topic -> List[Queue] def register(self, topic: str, queue: asyncio.Queue): self.listeners[topic].append(queue) def unregister(self, topic: str, queue: asyncio.Queue): if queue in self.listeners[topic]: self.listeners[topic].remove(queue) async def publish(self, topic: str, message: str): for queue in self.listeners[topic]: await queue.put(message) event_bus = EventBus()
# main.py from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from event_bus import event_bus import asyncio app = FastAPI() @app.get("/sse/{topic}") async def sse_topic(request: Request, topic: str): queue = asyncio.Queue() event_bus.register(topic, queue) async def event_stream(): try: while True: if await request.is_disconnected(): break try: message = await asyncio.wait_for(queue.get(), timeout=10) yield f"data: {message}\n\n" except asyncio.TimeoutError: yield ": keep-alive\n\n" finally: event_bus.unregister(topic, queue) return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/notify/{topic}") async def notify(topic: str, msg: str): await event_bus.publish(topic, msg) return {"status": "sent"}
n
EventSource
--workers 1
--reload=False
如需支持:
Manager().Queue()
如果你能补充更具体场景(例如:任务之间、模块之间、服务之间、前后端之间),我可以进一步提供适配模型(如:模块间事件发布、实时日志流式传输等)。是否需要我提供日志流 + SSE + 虚拟滚动分页的完整例子?