[Producer] --->
              \
[Producer] ---> \                     ┌────────────┐
                  --> [UDS Server] -->│  asyncio   │
[Producer] ---> /                     │   Queue    │--> 处理 / 落盘 / SSE
              /                       └────────────┘
[Producer] --->

好的,下面是一套完整的多进程异步通信系统,使用 Unix Domain Socket(UDS)+ asyncio 实现:

📦 项目结构

uds_demo/
├── broker.py       # 通信中心(主服务)
├── producer.py     # 子进程客户端(多个)
└── run_all.py      # 启动所有进程(示例)

🔧 1. broker.py:通信中心 + 异步队列转发

# broker.py
import asyncio
import os
import uuid

SOCKET_PATH = "/tmp/uds_demo.sock"
clients = {}

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one client connection: register it, relay its lines, then clean up.

    Each connection gets a random id and a private outbound queue stored in
    the module-level ``clients`` registry. Every line received from this
    client is put on the queue of every *other* registered client; a
    companion ``send_messages`` task drains this client's own queue to its
    socket.

    Args:
        reader: Stream the client's newline-terminated messages are read from.
        writer: Stream used to push broadcast messages back to the client.
    """
    client_id = str(uuid.uuid4())
    queue = asyncio.Queue()
    clients[client_id] = {"writer": writer, "queue": queue}

    print(f"[+] Client connected: {client_id}")
    # Keep a handle on the sender task: without it the task could neither be
    # cancelled on disconnect (it would wait on queue.get() forever) nor be
    # protected from garbage collection while running.
    sender_task = asyncio.create_task(send_messages(client_id))

    try:
        while data := await reader.readline():
            message = data.decode().strip()
            print(f"[{client_id}] Received: {message}")
            # Example: broadcast to every other client. Iterate over a
            # snapshot so a client registering or leaving while a put() is
            # suspended cannot invalidate the iteration.
            for cid, client in list(clients.items()):
                if cid != client_id:
                    await client["queue"].put(f"[{client_id}] {message}")
    except Exception as e:
        print(f"[!] {client_id} error:", e)
    finally:
        print(f"[-] Client disconnected: {client_id}")
        clients.pop(client_id, None)
        sender_task.cancel()
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The peer already dropped the connection; nothing left to flush.
            pass

async def send_messages(client_id):
    """Drain one client's outbound queue onto its socket, one line per message.

    Looks the client up in the module-level ``clients`` registry (raising
    ``KeyError`` if it is not registered), then loops forever: take the next
    queued message, write it newline-terminated, and wait for the transport
    to flush. Any ``Exception`` while sending is printed and ends the loop.

    Args:
        client_id: Key of the target client in ``clients``.
    """
    entry = clients[client_id]
    sink, outbox = entry["writer"], entry["queue"]
    try:
        while True:
            pending = await outbox.get()
            line = f"{pending}\n".encode()
            sink.write(line)
            await sink.drain()
    except Exception as err:
        print(f"[!] Send failed to {client_id}:", err)

async def main():
    """Start the broker on ``SOCKET_PATH`` and serve clients until cancelled."""
    # Remove a socket file left behind by a previous run. Unlinking directly,
    # rather than testing os.path.exists() first, avoids a check-then-remove
    # race and also clears a dangling symlink (for which exists() is False
    # and the subsequent bind would fail).
    try:
        os.remove(SOCKET_PATH)
    except FileNotFoundError:
        pass
    server = await asyncio.start_unix_server(handle_client, path=SOCKET_PATH)
    print(f"🚀 Broker started at {SOCKET_PATH}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())

🔧 2. producer.py:客户端(可多实例)

# producer.py
import asyncio
import sys
import random

SOCKET_PATH = "/tmp/uds_demo.sock"

async def producer(name: str):
    """Connect to the broker, send five messages, and print relayed messages.

    A background task prints every line the broker sends back while the main
    flow sends ``msg-0`` .. ``msg-4``, pausing a random 0.5-2.0 s after each.
    The connection and the background task are always cleaned up, even if
    sending fails part-way.

    Args:
        name: Label used in log output and embedded in each message.
    """
    reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    print(f"[{name}] Connected to broker")

    # Background listener that prints whatever the broker relays to us.
    async def read_messages():
        try:
            while data := await reader.readline():
                print(f"[{name}] Got message: {data.decode().strip()}")
        except (OSError, ValueError) as e:
            # OSError: connection problems. ValueError: over-long line or
            # undecodable bytes. Report instead of silently swallowing.
            print(f"[{name}] Read failed: {e}")

    # Hold a reference so the task is not garbage-collected mid-run and can
    # be cancelled once sending is finished.
    reader_task = asyncio.create_task(read_messages())

    try:
        # Send messages at random intervals.
        for i in range(5):
            msg = f"msg-{i} from {name}"
            writer.write(f"{msg}\n".encode())
            await writer.drain()
            await asyncio.sleep(random.uniform(0.5, 2.0))

        print(f"[{name}] Done sending")
    finally:
        reader_task.cancel()
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # Broker already closed the connection; nothing left to flush.
            pass

if __name__ == "__main__":
    # The first CLI argument names this producer; fall back to "client".
    cli_args = sys.argv[1:]
    name = cli_args[0] if cli_args else "client"
    asyncio.run(producer(name))

🧪 3. run_all.py:同时启动 broker + 多个生产者(跨进程)

# run_all.py
import subprocess
import time
import multiprocessing

def run_broker():
    """Run ``broker.py`` under ``python3`` and block until it exits."""
    command = ["python3", "broker.py"]
    subprocess.run(command)

def run_producer(name):
    """Run ``producer.py`` under ``python3`` with ``name`` as its argument.

    Blocks until the producer process exits.
    """
    command = ["python3", "producer.py", name]
    subprocess.run(command)

if __name__ == "__main__":
    # Launch the broker directly as a child process. Wrapping subprocess.run
    # in a multiprocessing.Process and terminating that wrapper would stop
    # only the wrapper: the real `python3 broker.py` child would be orphaned
    # and keep running (and keep holding the socket).
    broker_proc = subprocess.Popen(["python3", "broker.py"])
    try:
        time.sleep(1)  # give the broker time to create its socket

        producer_procs = []
        for i in range(3):
            p = multiprocessing.Process(target=run_producer, args=(f"client{i}",))
            p.start()
            producer_procs.append(p)

        for p in producer_procs:
            p.join()
    finally:
        # Always stop the broker, even if starting or joining producers failed.
        broker_proc.terminate()
        broker_proc.wait()

✅ 运行方式

cd uds_demo
python3 run_all.py

🚀 效果

3 个 producer 进程各发送 5 条消息;broker 收到后打印,并把每条消息广播给其他已连接的客户端,由它们各自打印。

✅ 适配建议(给你的项目)

你可以将上面的 UDS 通信模式接入项目的事件系统,例如:

# brave/workflow_event_system.py

import asyncio
import inspect
import json
import os
import socket
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Coroutine, Literal, Union

from fastapi import Request
from starlette.responses import StreamingResponse


# Type alias
Callback = Union[Callable[[dict], None], Callable[[dict], Coroutine]]


# ================================
# PubSubManager
# ================================
class PubSubManager:
    """In-process registry mapping topics to callbacks, with async fan-out.

    Callbacks receive the published message dict. They may be plain
    functions, coroutine functions, or any callable that returns an
    awaitable; awaitable results are awaited before the next callback runs.
    """

    def __init__(self):
        # topic -> set of callbacks; a set makes duplicate subscriptions a no-op.
        self.subscribers: dict[str, set[Callback]] = defaultdict(set)

    def subscribe(self, topic: str, callback: Callback):
        """Register ``callback`` to be invoked for messages on ``topic``."""
        self.subscribers[topic].add(callback)

    def unsubscribe(self, topic: str, callback: Callback):
        """Remove ``callback`` from ``topic``; unknown topics/callbacks are ignored."""
        # Use .get() so unsubscribing from an unknown topic does not create an
        # empty defaultdict entry, and drop topics whose last subscriber left
        # so the mapping does not grow without bound.
        callbacks = self.subscribers.get(topic)
        if callbacks is None:
            return
        callbacks.discard(callback)
        if not callbacks:
            del self.subscribers[topic]

    async def publish(self, topic: str, message: dict):
        """Deliver ``message`` to every callback currently subscribed to ``topic``.

        A failing callback is reported and skipped; it never prevents
        delivery to the remaining callbacks.
        """
        # Iterate over a snapshot: callbacks may (un)subscribe while we await.
        for cb in list(self.subscribers.get(topic, ())):
            try:
                result = cb(message)
                # Decide on the *result*, not the callable: a lambda or a
                # callable object may return a coroutine even though it is
                # not itself a coroutine function.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                print(f"[PubSub ERROR] {e}")


# ================================
# WorkflowQueueManager
# ================================
@dataclass
class WorkflowQueue:
    """Bookkeeping record kept for a single workflow's message queue."""

    # Queue holding the workflow's pending messages.
    queue: asyncio.Queue
    # Task associated with this queue (cancelled when the workflow is cleaned up).
    task: asyncio.Task
    # time.time() of the most recent activity; defaults to creation time.
    last_active: float = field(default_factory=time.time)
    # Number of currently attached subscribers.
    subscribers: int = field(default=0)

class WorkflowQueueManager:
    """Owns one queue plus one consumer task per workflow id.

    Messages put on a workflow's queue are taken by that workflow's consumer
    task and published on the pub/sub topic named after the workflow id.
    Workflow statuses are tracked separately and used by ``cleanup``.
    """

    def __init__(self, pubsub: PubSubManager):
        self.pubsub = pubsub
        self.workflow_map: dict[str, WorkflowQueue] = {}
        self.workflow_status_map: dict[str, Literal["running", "completed", "failed", "cancelled"]] = {}

    def register(self, workflow_id: str):
        """Create the queue and consumer task for ``workflow_id`` if missing."""
        if workflow_id in self.workflow_map:
            return
        inbox = asyncio.Queue()
        consumer = asyncio.create_task(self._consume_loop(workflow_id, inbox))
        self.workflow_map[workflow_id] = WorkflowQueue(queue=inbox, task=consumer)

    def update_status(self, workflow_id: str, status: Literal["running", "completed", "failed", "cancelled"]):
        """Record the latest status reported for ``workflow_id``."""
        self.workflow_status_map[workflow_id] = status

    async def put(self, workflow_id: str, msg: dict):
        """Enqueue ``msg`` for ``workflow_id``, registering the workflow first."""
        self.register(workflow_id)
        entry = self.workflow_map[workflow_id]
        entry.last_active = time.time()
        await entry.queue.put(msg)

    def get(self, workflow_id: str) -> WorkflowQueue:
        """Return the record for ``workflow_id``; raises ``KeyError`` if unknown."""
        return self.workflow_map[workflow_id]

    async def _consume_loop(self, workflow_id: str, queue: asyncio.Queue):
        """Forever: take the next message from ``queue`` and publish it."""
        while True:
            item = await queue.get()
            try:
                self.workflow_map[workflow_id].last_active = time.time()
                await self.pubsub.publish(workflow_id, item)
            except Exception as e:
                # Report and keep consuming; one bad message must not stop the loop.
                print(f"[Consumer ERROR] workflow {workflow_id}: {e}")

    async def cleanup(self, timeout: int = 300):
        """Drop workflows that are finished, idle, drained and unobserved.

        A workflow is removed only when it has no subscribers, its queue is
        empty, it has been inactive for more than ``timeout`` seconds, and
        its recorded status is completed, failed or cancelled.
        """
        now = time.time()
        finished_states = ("completed", "failed", "cancelled")
        stale_ids = [
            wf_id
            for wf_id, entry in self.workflow_map.items()
            if entry.subscribers == 0
            and entry.queue.empty()
            and now - entry.last_active > timeout
            and self.workflow_status_map.get(wf_id) in finished_states
        ]

        # Stop every stale consumer first, then forget the workflows.
        for wf_id in stale_ids:
            self.workflow_map[wf_id].task.cancel()

        for wf_id in stale_ids:
            del self.workflow_map[wf_id]
            self.workflow_status_map.pop(wf_id, None)
            print(f"[Cleanup] Removed workflow {wf_id}")


# ================================
# SSEManager
# ================================
class SSEManager:
    """Builds a Server-Sent-Events endpoint that streams a workflow's messages.

    Every connection gets its own queue, fed through the manager's pub/sub
    topic for the workflow. Reading directly from the workflow's shared queue
    would compete with the manager's consumer loop (and with other SSE
    clients), so each message would reach only one of them.
    """

    def __init__(self, wq_manager: WorkflowQueueManager):
        self.wq_manager = wq_manager

    def create_endpoint(self, keepalive_interval: float = 15.0):
        """Return an async handler ``(workflow_id, request) -> StreamingResponse``.

        Args:
            keepalive_interval: Seconds to wait for a message before emitting
                an SSE comment line and re-checking whether the client is
                still connected.
        """
        async def sse_endpoint(workflow_id: str, request: Request):
            self.wq_manager.register(workflow_id)
            wfq = self.wq_manager.get(workflow_id)
            wfq.last_active = time.time()
            pubsub = self.wq_manager.pubsub

            async def event_stream():
                # Per-connection inbox; put_nowait is a plain callable, so the
                # pub/sub layer invokes it synchronously for each message.
                inbox: asyncio.Queue = asyncio.Queue()
                deliver = inbox.put_nowait
                # Acquire inside the generator so the `finally` below always
                # pairs with it, even if the response body is never iterated.
                pubsub.subscribe(workflow_id, deliver)
                wfq.subscribers += 1
                try:
                    while not await request.is_disconnected():
                        try:
                            # Bounded wait: an unbounded get() would keep a
                            # disconnected client's stream alive until the
                            # next message happened to arrive.
                            msg = await asyncio.wait_for(inbox.get(), timeout=keepalive_interval)
                        except asyncio.TimeoutError:
                            # Lines starting with ':' are SSE comments and are
                            # ignored by clients.
                            yield ": keep-alive\n\n"
                        else:
                            yield f"data: {json.dumps(msg)}\n\n"
                finally:
                    pubsub.unsubscribe(workflow_id, deliver)
                    wfq.subscribers -= 1
                    wfq.last_active = time.time()

            return StreamingResponse(event_stream(), media_type="text/event-stream")

        return sse_endpoint


# ================================
# UDSListener using asyncio.start_unix_server
# ================================
class UDSListener:
    """Unix-domain-socket server that feeds JSON-line messages into workflow queues.

    Each line a client sends must be a JSON object. Objects carrying a truthy
    ``workflow_id`` are forwarded to ``wq_manager.put``; everything else is
    ignored or reported.
    """

    def __init__(self, uds_path: str, wq_manager: WorkflowQueueManager):
        self.uds_path = uds_path
        self.wq_manager = wq_manager

    async def start(self):
        """Bind the socket at ``uds_path`` and serve until cancelled."""
        # Unlink any stale socket directly instead of checking exists() first:
        # no check-then-remove race, and a dangling symlink (for which
        # exists() is False) is cleared as well.
        try:
            os.remove(self.uds_path)
        except FileNotFoundError:
            pass

        server = await asyncio.start_unix_server(self.handle_client, path=self.uds_path)
        print(f"[UDS] Listening at {self.uds_path}")
        async with server:
            await server.serve_forever()

    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Read newline-delimited JSON from one client until it disconnects.

        A malformed line is reported and skipped; it does not end the
        connection.
        """
        try:
            while line := await reader.readline():
                text = line.decode(errors="replace").strip()
                if not text:
                    # Blank line (e.g. a stray newline): nothing to parse.
                    continue
                try:
                    msg = json.loads(text)
                    if not isinstance(msg, dict):
                        # Valid JSON but not an object: give a clear reason
                        # rather than an AttributeError from msg.get().
                        raise TypeError(f"expected a JSON object, got {type(msg).__name__}")
                    workflow_id = msg.get("workflow_id")
                    if workflow_id:
                        await self.wq_manager.put(workflow_id, msg)
                except Exception as e:
                    print(f"[UDS] Message error: {e}")
        except Exception as e:
            print(f"[UDS] Client error: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as e:
                # The peer reset the connection while closing; without this
                # guard the error would escape the handler.
                print(f"[UDS] Close error: {e}")

# Module-level singletons wiring the event system together.
pubsub = PubSubManager()
wq_manager = WorkflowQueueManager(pubsub)
# Required by the /events/{workflow_id} route, which looks up `sse_manager`
# at request time; without this binding the first request raises NameError.
sse_manager = SSEManager(wq_manager)
uds_listener = UDSListener("/tmp/brave.sock", wq_manager)

@app.get("/events/{workflow_id}")
async def sse_events(workflow_id: str, request: Request):
    """Stream events for ``workflow_id`` to the caller as Server-Sent Events.

    Builds a fresh endpoint from the module-level ``sse_manager`` on each
    request and delegates to it.
    NOTE(review): ``app`` is not defined in the visible code; presumably the
    project's FastAPI instance. Confirm it is in scope here.
    """
    endpoint = sse_manager.create_endpoint()
    response = await endpoint(workflow_id, request)
    return response

async def cleanup_worker():
    """Run ``wq_manager.cleanup`` once a minute, forever.

    Each pass uses a 300-second idle timeout.
    """
    interval_s, idle_timeout_s = 60, 300
    while True:
        await asyncio.sleep(interval_s)
        await wq_manager.cleanup(timeout=idle_timeout_s)

# Strong references to long-running background tasks. The event loop itself
# keeps only weak references to tasks, so a task whose handle is discarded
# may be garbage-collected before it finishes.
_background_tasks: set = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Release a finished background task and report it if it failed."""
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        print(f"[Startup] Background task failed: {task.exception()!r}")


@app.on_event("startup")
async def startup_event():
    """Launch the UDS listener and the periodic cleanup worker at app startup."""
    for coro in (uds_listener.start(), cleanup_worker()):
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_on_background_task_done)