Multi-Process Async Communication System - An Independent Buffered Queue for Each Client Connection

 [Producer] --->
               \
 [Producer] ---> \                     ┌────────────┐
                   --> [UDS Server] -->│  asyncio   │
 [Producer] ---> /                     │   Queue    │--> process / persist / SSE
               /                       └────────────┘
 [Producer] --->

Below is a complete multi-process asynchronous communication system, built with:

  • Unix Domain Socket (safe and fast cross-process transport)
  • asyncio.Queue (asynchronous buffering to avoid blocking; see the minimal sketch after this list)
  • asyncio.start_unix_server to serve multiple clients concurrently
  • ✅ Subprocesses act as producers; the main control service acts as the event hub
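
If asyncio.Queue is unfamiliar, here is a minimal, self-contained sketch (hypothetical file, not part of the project) of the buffering pattern the broker relies on: the producer enqueues without waiting for the consumer, and the consumer awaits items without blocking the event loop.

# queue_sketch.py (hypothetical)
import asyncio

async def produce(queue: asyncio.Queue):
    for i in range(3):
        await queue.put(f"item-{i}")  # returns immediately while the queue has room
        print(f"put item-{i}")

async def consume(queue: asyncio.Queue):
    for _ in range(3):
        item = await queue.get()  # suspends this coroutine only, not the whole loop
        print(f"got {item}")

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(produce(queue), consume(queue))

asyncio.run(main())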

📦 Project Structure

uds_demo/
├── broker.py       # communication hub (main service)
├── producer.py     # subprocess client (run multiple instances)
└── run_all.py      # launches all processes (example)

🔧 1. broker.py: Communication Hub + Async Queue Forwarding

# broker.py
import asyncio
import os
import uuid

SOCKET_PATH = "/tmp/uds_demo.sock"
clients = {}

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    client_id = str(uuid.uuid4())
    queue = asyncio.Queue()
    clients[client_id] = {"writer": writer, "queue": queue}

    print(f"[+] Client connected: {client_id}")
    send_task = asyncio.create_task(send_messages(client_id))

    try:
        while data := await reader.readline():
            message = data.decode().strip()
            print(f"[{client_id}] Received: {message}")
            # e.g., broadcast to every other client
            for cid, client in clients.items():
                if cid != client_id:
                    await client["queue"].put(f"[{client_id}] {message}")
    except Exception as e:
        print(f"[!] {client_id} error:", e)
    finally:
        print(f"[-] Client disconnected: {client_id}")
        clients.pop(client_id, None)
        send_task.cancel()  # stop forwarding; otherwise the task waits on the queue forever
        writer.close()
        await writer.wait_closed()

async def send_messages(client_id):
    client = clients[client_id]
    writer = client["writer"]
    queue = client["queue"]
    try:
        while True:
            msg = await queue.get()
            writer.write(f"{msg}\n".encode())
            await writer.drain()
    except Exception as e:
        print(f"[!] Send failed to {client_id}:", e)

async def main():
    if os.path.exists(SOCKET_PATH):
        os.remove(SOCKET_PATH)
    server = await asyncio.start_unix_server(handle_client, path=SOCKET_PATH)
    print(f"🚀 Broker started at {SOCKET_PATH}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())
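
To smoke-test the broker on its own (before writing a full producer), a throwaway client can connect and send a single line; the script name below is hypothetical:

# quick_test.py (hypothetical) - one-shot client for a running broker
import asyncio

async def main():
    reader, writer = await asyncio.open_unix_connection("/tmp/uds_demo.sock")
    writer.write(b"hello from quick_test\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

asyncio.run(main())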

🔧 2. producer.py: Client (Multiple Instances Allowed)

# producer.py
import asyncio
import sys
import random

SOCKET_PATH = "/tmp/uds_demo.sock"

async def producer(name: str):
    reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    print(f"[{name}] Connected to broker")

    # listen for broadcasts relayed back from the broker
    async def read_messages():
        try:
            while True:
                data = await reader.readline()
                if not data:
                    break
                print(f"[{name}] Got message: {data.decode().strip()}")
        except asyncio.CancelledError:
            pass  # cancelled once the producer finishes sending

    read_task = asyncio.create_task(read_messages())

    # send a few messages on a randomized interval
    for i in range(5):
        msg = f"msg-{i} from {name}"
        writer.write(f"{msg}\n".encode())
        await writer.drain()
        await asyncio.sleep(random.uniform(0.5, 2.0))

    print(f"[{name}] Done sending")
    writer.close()

if __name__ == "__main__":
    name = sys.argv[1] if len(sys.argv) > 1 else "client"
    asyncio.run(producer(name))
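
You can also run the pieces by hand in separate terminals, which makes the broadcast behavior easier to observe:

# terminal 1
python3 broker.py

# terminals 2 and 3
python3 producer.py alice
python3 producer.py bob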

🧪 3. run_all.py: Launch the Broker + Multiple Producers (Across Processes)

# run_all.py
import subprocess
import time

if __name__ == "__main__":
    # use Popen directly so terminate() reaches the actual broker process
    # (wrapping subprocess.run in a multiprocessing.Process would leave the
    # broker orphaned when the wrapper is terminated)
    broker_proc = subprocess.Popen(["python3", "broker.py"])
    time.sleep(1)  # give the broker a moment to create the socket

    producer_procs = [
        subprocess.Popen(["python3", "producer.py", f"client{i}"])
        for i in range(3)
    ]

    for p in producer_procs:
        p.wait()

    broker_proc.terminate()
    broker_proc.wait()

✅ How to Run

cd uds_demo
python3 run_all.py

🚀 Expected Behavior

  • The broker prints each client connection and each message it receives, then broadcasts the message to all other clients
  • Each producer sends several messages while listening for broadcasts from the other producers
  • asyncio.Queue buffers outgoing messages per client, so a slow socket write never blocks the receive loop
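
With three producers, the broker's log will look roughly like this (client IDs, ordering, and timing vary from run to run):

🚀 Broker started at /tmp/uds_demo.sock
[+] Client connected: 3fca97e2-...
[+] Client connected: 8d1b42a0-...
[3fca97e2-...] Received: msg-0 from client0
[8d1b42a0-...] Received: msg-0 from client1
[3fca97e2-...] Received: msg-1 from client0
[-] Client disconnected: 3fca97e2-...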

✅ Adaptation Suggestions (for Your Project)

You can map the pieces onto your project as follows:

  • Nextflow sub-workflows → rewrite them as producer scripts that send workflow status and logs to the broker
  • FastAPI main control service → host the broker itself, or consume the broker's messages and forward them to the frontend via SSE or WebSocket (the module below combines these pieces)
🧩 4. workflow_event_system.py: Integrated Event System

The module below combines the suggestions above into one FastAPI-oriented event system: a UDS listener receives JSON-line events from producers, a per-workflow queue manager buffers and dispatches them through a pub/sub hub, and an SSE manager streams them to the browser.

# brave/workflow_event_system.py

import asyncio
import time
import json
import os
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Coroutine, Union, Literal
from fastapi import Request
from starlette.responses import StreamingResponse


# Type alias
Callback = Union[Callable[[dict], None], Callable[[dict], Coroutine]]


# ================================
# PubSubManager
# ================================
class PubSubManager:
    def __init__(self):
        self.subscribers: dict[str, set[Callback]] = defaultdict(set)

    def subscribe(self, topic: str, callback: Callback):
        self.subscribers[topic].add(callback)

    def unsubscribe(self, topic: str, callback: Callback):
        self.subscribers[topic].discard(callback)

    async def publish(self, topic: str, message: dict):
        for cb in list(self.subscribers.get(topic, [])):
            try:
                if asyncio.iscoroutinefunction(cb):
                    await cb(message)
                else:
                    cb(message)
            except Exception as e:
                print(f"[PubSub ERROR] {e}")


# ================================
# WorkflowQueueManager
# ================================
@dataclass
class WorkflowQueue:
    queue: asyncio.Queue
    task: asyncio.Task
    last_active: float = field(default_factory=time.time)
    subscribers: int = 0

class WorkflowQueueManager:
    def __init__(self, pubsub: PubSubManager):
        self.workflow_map: dict[str, WorkflowQueue] = {}
        self.workflow_status_map: dict[str, Literal["running", "completed", "failed", "cancelled"]] = {}
        self.pubsub = pubsub

    def register(self, workflow_id: str):
        if workflow_id not in self.workflow_map:
            queue = asyncio.Queue()
            task = asyncio.create_task(self._consume_loop(workflow_id, queue))
            self.workflow_map[workflow_id] = WorkflowQueue(queue=queue, task=task)

    def update_status(self, workflow_id: str, status: Literal["running", "completed", "failed", "cancelled"]):
        self.workflow_status_map[workflow_id] = status

    async def put(self, workflow_id: str, msg: dict):
        self.register(workflow_id)
        wfq = self.workflow_map[workflow_id]
        wfq.last_active = time.time()
        await wfq.queue.put(msg)

    def get(self, workflow_id: str) -> WorkflowQueue:
        return self.workflow_map[workflow_id]

    async def _consume_loop(self, workflow_id: str, queue: asyncio.Queue):
        while True:
            msg = await queue.get()
            try:
                self.workflow_map[workflow_id].last_active = time.time()
                await self.pubsub.publish(workflow_id, msg)
            except Exception as e:
                print(f"[Consumer ERROR] workflow {workflow_id}: {e}")

    async def cleanup(self, timeout: int = 300):
        now = time.time()
        to_delete = []
        for wf_id, wfq in self.workflow_map.items():
            status = self.workflow_status_map.get(wf_id)
            if (
                wfq.subscribers == 0 and
                wfq.queue.empty() and
                (now - wfq.last_active > timeout) and
                status in ["completed", "failed", "cancelled"]
            ):
                wfq.task.cancel()
                to_delete.append(wf_id)

        for wf_id in to_delete:
            del self.workflow_map[wf_id]
            self.workflow_status_map.pop(wf_id, None)
            print(f"[Cleanup] Removed workflow {wf_id}")


# ================================
# SSEManager
# ================================
class SSEManager:
    def __init__(self, wq_manager: WorkflowQueueManager):
        self.wq_manager = wq_manager

    def create_endpoint(self):
        async def sse_endpoint(workflow_id: str, request: Request):
            self.wq_manager.register(workflow_id)
            wfq = self.wq_manager.get(workflow_id)
            wfq.subscribers += 1
            wfq.last_active = time.time()

            # Reading wfq.queue here directly would steal messages from
            # _consume_loop (two consumers on one queue); instead each SSE
            # connection gets its own queue, fed through the pub/sub hub.
            local_queue: asyncio.Queue = asyncio.Queue()

            async def on_message(msg: dict):
                await local_queue.put(msg)

            self.wq_manager.pubsub.subscribe(workflow_id, on_message)

            async def event_stream():
                try:
                    while True:
                        if await request.is_disconnected():
                            break
                        try:
                            # wake up periodically so disconnects are noticed
                            # even when no messages arrive
                            msg = await asyncio.wait_for(local_queue.get(), timeout=15)
                        except asyncio.TimeoutError:
                            yield ": keep-alive\n\n"
                            continue
                        yield f"data: {json.dumps(msg)}\n\n"
                finally:
                    self.wq_manager.pubsub.unsubscribe(workflow_id, on_message)
                    wfq.subscribers -= 1
                    wfq.last_active = time.time()

            return StreamingResponse(event_stream(), media_type="text/event-stream")

        return sse_endpoint


# ================================
# UDSListener using asyncio.start_unix_server
# ================================
class UDSListener:
    def __init__(self, uds_path: str, wq_manager: WorkflowQueueManager):
        self.uds_path = uds_path
        self.wq_manager = wq_manager

    async def start(self):
        if os.path.exists(self.uds_path):
            os.remove(self.uds_path)

        server = await asyncio.start_unix_server(self.handle_client, path=self.uds_path)
        print(f"[UDS] Listening at {self.uds_path}")
        async with server:
            await server.serve_forever()

    async def handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        try:
            while line := await reader.readline():
                try:
                    msg = json.loads(line.decode().strip())
                    workflow_id = msg.get("workflow_id")
                    if workflow_id:
                        await self.wq_manager.put(workflow_id, msg)
                except Exception as e:
                    print(f"[UDS] Message error: {e}")
        except Exception as e:
            print(f"[UDS] Client error: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
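
Finally, a minimal sketch of wiring the module into a FastAPI app. The file name, route path, and socket path below are assumptions for illustration, not part of the original module:

# main.py (hypothetical) - mounts the SSE endpoint and starts the UDS listener
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from brave.workflow_event_system import (
    PubSubManager, WorkflowQueueManager, SSEManager, UDSListener,
)

pubsub = PubSubManager()
wq_manager = WorkflowQueueManager(pubsub)
sse_manager = SSEManager(wq_manager)
uds_listener = UDSListener("/tmp/workflow_events.sock", wq_manager)

@asynccontextmanager
async def lifespan(app: FastAPI):
    listener_task = asyncio.create_task(uds_listener.start())
    yield
    listener_task.cancel()

app = FastAPI(lifespan=lifespan)

# GET /workflows/{workflow_id}/events streams that workflow's events as SSE
app.get("/workflows/{workflow_id}/events")(sse_manager.create_endpoint())

On the producer side (a Nextflow task hook, for example), each event is a single JSON line carrying a workflow_id, which is exactly what UDSListener.handle_client parses:

# emit_event.py (hypothetical) - a producer sending one event over the UDS
import asyncio
import json

async def emit(workflow_id: str, event: dict):
    _, writer = await asyncio.open_unix_connection("/tmp/workflow_events.sock")
    writer.write((json.dumps({"workflow_id": workflow_id, **event}) + "\n").encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

asyncio.run(emit("wf-42", {"status": "running", "step": "align_reads"}))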