```
[Producer] ---> \
[Producer] ---> \                    ┌────────────┐
                 --> [UDS Server] -->│  asyncio   │
[Producer] ---> /                    │   Queue    │--> process / persist / SSE
                /                    └────────────┘
[Producer] --->
```
Alright, below is a complete multi-process asynchronous communication system built with:
Unix Domain Socket
asyncio.Queue
asyncio.start_unix_server
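Before diving into the full demo, here is a minimal sketch of the pattern in the diagram above: a single asyncio.start_unix_server whose connection handler pushes every received line into one shared asyncio.Queue, drained by a worker task. The file name and socket path are purely illustrative and not part of the demo project.

```python
# minimal_core.py -- illustrative sketch of the diagram above (not part of uds_demo)
import asyncio
import os

SOCKET_PATH = "/tmp/uds_core_demo.sock"  # assumed path for this sketch
queue: asyncio.Queue = asyncio.Queue()


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    # Every line a producer sends is pushed into the shared queue.
    while line := await reader.readline():
        await queue.put(line.decode().strip())
    writer.close()
    await writer.wait_closed()


async def consumer():
    # Single consumer: this is where processing / persistence / SSE fan-out would go.
    while True:
        msg = await queue.get()
        print(f"[consumer] {msg}")


async def main():
    if os.path.exists(SOCKET_PATH):
        os.remove(SOCKET_PATH)
    consumer_task = asyncio.create_task(consumer())
    server = await asyncio.start_unix_server(handle_client, path=SOCKET_PATH)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
```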
Project layout:

```
uds_demo/
├── broker.py      # message broker (the main service)
├── producer.py    # client run in a child process (multiple instances)
├── run_all.py     # launches all processes (example)
```
broker.py
```python
# broker.py
import asyncio
import os
import uuid

SOCKET_PATH = "/tmp/uds_demo.sock"

clients = {}


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    client_id = str(uuid.uuid4())
    queue = asyncio.Queue()
    clients[client_id] = {"writer": writer, "queue": queue}
    print(f"[+] Client connected: {client_id}")

    # Per-client sender task: drains this client's queue and writes to its socket.
    sender = asyncio.create_task(send_messages(client_id))

    try:
        while data := await reader.readline():
            message = data.decode().strip()
            print(f"[{client_id}] Received: {message}")
            # Example: broadcast to every other client
            for cid, client in list(clients.items()):
                if cid != client_id:
                    await client["queue"].put(f"[{client_id}] {message}")
    except Exception as e:
        print(f"[!] {client_id} error:", e)
    finally:
        print(f"[-] Client disconnected: {client_id}")
        clients.pop(client_id, None)
        sender.cancel()
        writer.close()


async def send_messages(client_id):
    client = clients[client_id]
    writer = client["writer"]
    queue = client["queue"]
    try:
        while True:
            msg = await queue.get()
            writer.write(f"{msg}\n".encode())
            await writer.drain()
    except Exception as e:
        print(f"[!] Send failed to {client_id}:", e)


async def main():
    if os.path.exists(SOCKET_PATH):
        os.remove(SOCKET_PATH)

    server = await asyncio.start_unix_server(handle_client, path=SOCKET_PATH)
    print(f"🚀 Broker started at {SOCKET_PATH}")
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
```
producer.py
```python
# producer.py
import asyncio
import sys
import random

SOCKET_PATH = "/tmp/uds_demo.sock"


async def producer(name: str):
    reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    print(f"[{name}] Connected to broker")

    # Listen for messages relayed back by the broker
    async def read_messages():
        try:
            while True:
                data = await reader.readline()
                if not data:
                    break
                print(f"[{name}] Got message: {data.decode().strip()}")
        except Exception:
            pass

    reader_task = asyncio.create_task(read_messages())

    # Send a few messages at random intervals
    for i in range(5):
        msg = f"msg-{i} from {name}"
        writer.write(f"{msg}\n".encode())
        await writer.drain()
        await asyncio.sleep(random.uniform(0.5, 2.0))

    print(f"[{name}] Done sending")
    reader_task.cancel()
    writer.close()
    await writer.wait_closed()


if __name__ == "__main__":
    name = sys.argv[1] if len(sys.argv) > 1 else "client"
    asyncio.run(producer(name))
```
run_all.py
```python
# run_all.py
import subprocess
import time
import multiprocessing


def run_broker():
    subprocess.run(["python3", "broker.py"])


def run_producer(name):
    subprocess.run(["python3", "producer.py", name])


if __name__ == "__main__":
    broker_proc = multiprocessing.Process(target=run_broker)
    broker_proc.start()

    time.sleep(1)  # give the broker time to start and create the socket

    producer_procs = []
    for i in range(3):
        p = multiprocessing.Process(target=run_producer, args=(f"client{i}",))
        p.start()
        producer_procs.append(p)

    for p in producer_procs:
        p.join()

    broker_proc.terminate()
```
```bash
cd uds_demo
python3 run_all.py
```
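run_all.py mixes multiprocessing with subprocess mainly for demonstration. Since everything else in the demo is asyncio-based, the launcher could also be written with asyncio's own subprocess support. A sketch under that assumption (run_all_asyncio.py is a hypothetical file name, not part of the project above):

```python
# run_all_asyncio.py -- alternative launcher sketch using asyncio subprocesses
import asyncio


async def main():
    broker = await asyncio.create_subprocess_exec("python3", "broker.py")
    await asyncio.sleep(1)  # give the broker time to create the socket

    producers = [
        await asyncio.create_subprocess_exec("python3", "producer.py", f"client{i}")
        for i in range(3)
    ]
    # Wait for all producers to finish, then shut the broker down.
    await asyncio.gather(*(p.wait() for p in producers))

    broker.terminate()
    await broker.wait()


if __name__ == "__main__":
    asyncio.run(main())
```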
You can turn this into a more complete workflow event system, for example:
```python
# brave/workflow_event_system.py
import asyncio
import time
import json
import os
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Coroutine, Union, Literal

from fastapi import Request
from starlette.responses import StreamingResponse

# Type alias
Callback = Union[Callable[[dict], None], Callable[[dict], Coroutine]]


# ================================
# PubSubManager
# ================================
class PubSubManager:
    def __init__(self):
        self.subscribers: dict[str, set[Callback]] = defaultdict(set)

    def subscribe(self, topic: str, callback: Callback):
        self.subscribers[topic].add(callback)

    def unsubscribe(self, topic: str, callback: Callback):
        self.subscribers[topic].discard(callback)

    async def publish(self, topic: str, message: dict):
        for cb in list(self.subscribers.get(topic, [])):
            try:
                if asyncio.iscoroutinefunction(cb):
                    await cb(message)
                else:
                    cb(message)
            except Exception as e:
                print(f"[PubSub ERROR] {e}")


# ================================
# WorkflowQueueManager
# ================================
@dataclass
class WorkflowQueue:
    queue: asyncio.Queue
    task: asyncio.Task
    last_active: float = field(default_factory=time.time)
    subscribers: int = 0


class WorkflowQueueManager:
    def __init__(self, pubsub: PubSubManager):
        self.workflow_map: dict[str, WorkflowQueue] = {}
        self.workflow_status_map: dict[str, Literal["running", "completed", "failed", "cancelled"]] = {}
        self.pubsub = pubsub

    def register(self, workflow_id: str):
        if workflow_id not in self.workflow_map:
            queue = asyncio.Queue()
            task = asyncio.create_task(self._consume_loop(workflow_id, queue))
            self.workflow_map[workflow_id] = WorkflowQueue(queue=queue, task=task)

    def update_status(self, workflow_id: str, status: Literal["running", "completed", "failed", "cancelled"]):
        self.workflow_status_map[workflow_id] = status

    async def put(self, workflow_id: str, msg: dict):
        self.register(workflow_id)
        wfq = self.workflow_map[workflow_id]
        wfq.last_active = time.time()
        await wfq.queue.put(msg)

    def get(self, workflow_id: str) -> WorkflowQueue:
        return self.workflow_map[workflow_id]

    async def _consume_loop(self, workflow_id: str, queue: asyncio.Queue):
        while True:
            msg = await queue.get()
            try:
                self.workflow_map[workflow_id].last_active = time.time()
                await self.pubsub.publish(workflow_id, msg)
            except Exception as e:
                print(f"[Consumer ERROR] workflow {workflow_id}: {e}")

    async def cleanup(self, timeout: int = 300):
        now = time.time()
        to_delete = []
        for wf_id, wfq in self.workflow_map.items():
            status = self.workflow_status_map.get(wf_id)
            if (
                wfq.subscribers == 0
                and wfq.queue.empty()
                and (now - wfq.last_active > timeout)
                and status in ["completed", "failed", "cancelled"]
            ):
                wfq.task.cancel()
                to_delete.append(wf_id)
        for wf_id in to_delete:
            del self.workflow_map[wf_id]
            self.workflow_status_map.pop(wf_id, None)
            print(f"[Cleanup] Removed workflow {wf_id}")


# ================================
# SSEManager
# ================================
class SSEManager:
    def __init__(self, wq_manager: WorkflowQueueManager):
        self.wq_manager = wq_manager

    def create_endpoint(self):
        async def sse_endpoint(workflow_id: str, request: Request):
            self.wq_manager.register(workflow_id)
            wfq = self.wq_manager.get(workflow_id)
            wfq.subscribers += 1
            wfq.last_active = time.time()

            async def event_stream():
                try:
                    while True:
                        if await request.is_disconnected():
                            break
                        msg = await wfq.queue.get()
                        yield f"data: {json.dumps(msg)}\n\n"
                finally:
                    wfq.subscribers -= 1
                    wfq.last_active = time.time()

            return StreamingResponse(event_stream(), media_type="text/event-stream")

        return sse_endpoint


# ================================
# UDSListener using asyncio.start_unix_server
# ================================
class UDSListener:
    def __init__(self, uds_path: str, wq_manager: WorkflowQueueManager):
        self.uds_path = uds_path
        self.wq_manager = wq_manager

    async def start(self):
        if os.path.exists(self.uds_path):
            os.remove(self.uds_path)
        server = await asyncio.start_unix_server(self.handle_client, path=self.uds_path)
        print(f"[UDS] Listening at {self.uds_path}")
        async with server:
            await server.serve_forever()

    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        try:
            while line := await reader.readline():
                try:
                    msg = json.loads(line.decode().strip())
                    workflow_id = msg.get("workflow_id")
                    if workflow_id:
                        await self.wq_manager.put(workflow_id, msg)
                except Exception as e:
                    print(f"[UDS] Message error: {e}")
        except Exception as e:
            print(f"[UDS] Client error: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
```
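The module above only defines the building blocks; it does not show how they are wired into an application. Below is a minimal sketch of one way to assemble them, assuming a FastAPI app, that the module is importable as brave.workflow_event_system, and a UDS path of /tmp/workflow_events.sock (all of these names and the route are illustrative, not part of the original code):

```python
# app.py -- illustrative wiring only; module path, route, and socket path are assumptions
import asyncio

from fastapi import FastAPI

from brave.workflow_event_system import (
    PubSubManager,
    WorkflowQueueManager,
    SSEManager,
    UDSListener,
)

UDS_PATH = "/tmp/workflow_events.sock"  # assumed socket path

pubsub = PubSubManager()
wq_manager = WorkflowQueueManager(pubsub)
sse = SSEManager(wq_manager)
listener = UDSListener(UDS_PATH, wq_manager)

app = FastAPI()

# Expose the SSE endpoint created by SSEManager under an illustrative route.
app.add_api_route(
    "/workflows/{workflow_id}/events",
    sse.create_endpoint(),
    methods=["GET"],
)


@app.on_event("startup")
async def start_uds_listener():
    # Run the UDS server in the background, next to the HTTP server.
    asyncio.create_task(listener.start())
```

On the producing side, any process can push events to the listener by writing one JSON object per line containing a workflow_id, which is the framing UDSListener.handle_client expects. A sketch (file name and sample payload are hypothetical):

```python
# publish_event.py -- sketch of a worker-side publisher
import asyncio
import json

UDS_PATH = "/tmp/workflow_events.sock"  # must match the listener's path


async def publish_event(workflow_id: str, payload: dict):
    reader, writer = await asyncio.open_unix_connection(UDS_PATH)
    event = {"workflow_id": workflow_id, **payload}
    writer.write((json.dumps(event) + "\n").encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


if __name__ == "__main__":
    asyncio.run(publish_event("wf-123", {"step": "done"}))
```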