实现异步解耦通信(如:任务状态推送、日志流、模块间事件传递)而不依赖第三方中间件(如 Redis、Kafka、MQTT),通常依赖的是语言自身提供的并发机制,如 Python 的 asyncio
、队列、协程 + 多线程/多进程 + 内部事件总线机制。
通信方式 | 是否跨进程 | 是否异步 | 是否解耦 | 特点 |
---|---|---|---|---|
multiprocessing.Queue/Pipe | ✅ | ❌(同步) | ✅ | 简单易用,仅适用于父子进程 |
socket (TCP/UNIX) | ✅ | ✅ | ✅ | 可扩展、可网络通信 |
asyncio + UNIX socket | ✅ | ✅ | ✅ | 内部事件驱动,轻量无依赖 |
SQLite 轮询 + 文件锁 | ✅ | ✅(模拟) | ✅ | 低频事件、状态同步适合 |
FIFO 命名管道(mkfifo) | ✅ | ✅ | ✅ | Shell 友好,支持管道通信 |
watchdog + 文件触发器 | ✅ | ❌(可补偿) | ✅ | 文件变更感知适合低频通知 |
ZeroMQ(pyzmq) | ✅ | ✅ | ✅ | 很强大,但轻度第三方依赖 |
你可以说明你的具体需求,但这里是几个常见用例:
场景 | 举例 | 推荐机制 |
---|---|---|
前端实时获取任务进度或日志 | 运行 Nextflow / 后台脚本 | SSE + asyncio.Queue |
后台任务间解耦传递信息 | 模块 A 通知模块 B | 内部事件总线 + 队列 |
多个子进程通信 | FastAPI 控制多个子任务 | multiprocessing.Queue / asyncio 协调 |
同主机多服务通信 | 模块独立部署运行 | Unix socket / HTTP 本地请求 |
单进程内广播机制 | 发布订阅 | 自定义事件中心 EventBus |
asyncio.Queue
的事件驱动通信(FastAPI)# event_bus.py
from collections import defaultdict
import asyncio
class EventBus:
def __init__(self):
self.listeners = defaultdict(list) # topic -> List[Queue]
def register(self, topic: str, queue: asyncio.Queue):
self.listeners[topic].append(queue)
def unregister(self, topic: str, queue: asyncio.Queue):
if queue in self.listeners[topic]:
self.listeners[topic].remove(queue)
async def publish(self, topic: str, message: str):
for queue in self.listeners[topic]:
await queue.put(message)
event_bus = EventBus()
# main.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from event_bus import event_bus
import asyncio
app = FastAPI()
@app.get("/sse/{topic}")
async def sse_topic(request: Request, topic: str):
queue = asyncio.Queue()
event_bus.register(topic, queue)
async def event_stream():
try:
while True:
if await request.is_disconnected():
break
try:
message = await asyncio.wait_for(queue.get(), timeout=10)
yield f"data: {message}\n\n"
except asyncio.TimeoutError:
yield ": keep-alive\n\n"
finally:
event_bus.unregister(topic, queue)
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/notify/{topic}")
async def notify(topic: str, msg: str):
await event_bus.publish(topic, msg)
return {"status": "sent"}
n
个客户端监听同一个事件)EventSource
获取日志流或状态更新--workers 1
或用 --reload=False
否则队列在多进程间不能共享如需支持:
目标 | 技术 |
---|---|
跨进程共享 | multiprocessing.Queue 、Manager().Queue() |
跨服务通信 | HTTP + SSE / Unix Socket / ZeroMQ |
长时间可靠传递 | 使用 SQLite 持久化队列 |
前端虚拟滚动 + 日志分页 | 持久化日志 + 按 offset 拉取历史日志 |
限流/广播队列 | 加入 ring-buffer + max size 队列 |
如果你能补充更具体场景(例如:任务之间、模块之间、服务之间、前后端之间),我可以进一步提供适配模型(如:模块间事件发布、实时日志流式传输等)。是否需要我提供日志流 + SSE + 虚拟滚动分页的完整例子?