一个简洁的事件驱动状态机作为“状态中心”,结合订阅系统,解析消息并根据状态触发流程变化。
workflow_id
from typing import Dict, Callable, Awaitable, Optional import asyncio class StateMachine: def __init__(self): self._states: Dict[str, str] = {} # workflow_id -> current_state self._transitions: Dict[str, Dict[str, str]] = {} # state -> event -> next_state self._callbacks: Dict[str, Callable[[str, str, dict], Awaitable]] = {} # state -> async callback def add_transition(self, from_state: str, event: str, to_state: str): self._transitions.setdefault(from_state, {})[event] = to_state def on_enter(self, state: str, callback: Callable[[str, str, dict], Awaitable]): self._callbacks[state] = callback async def handle_event(self, workflow_id: str, event: str, payload: dict): current_state = self._states.get(workflow_id, "initial") next_state = self._transitions.get(current_state, {}).get(event) if next_state: self._states[workflow_id] = next_state cb = self._callbacks.get(next_state) if cb: await cb(workflow_id, next_state, payload) else: print(f"No transition from {current_state} on event {event}") # --------------------------------------- # 事件驱动的订阅示例 class WorkflowEventCenter: def __init__(self, pubsub): self.pubsub = pubsub self.state_machine = StateMachine() # 定义状态机转换 self.state_machine.add_transition("initial", "flow_begin", "running") self.state_machine.add_transition("running", "process_complete", "completed") self.state_machine.add_transition("running", "process_error", "error") # 订阅状态进入事件,做异步处理 self.state_machine.on_enter("running", self.on_running) self.state_machine.on_enter("completed", self.on_completed) self.state_machine.on_enter("error", self.on_error) # 订阅 pubsub 事件,所有事件进入状态机处理 self.pubsub.subscribe("workflow_events", self.handle_pubsub_event) async def handle_pubsub_event(self, msg: dict): workflow_id = msg.get("workflow_id", "unknown") event = msg.get("event") if event: await self.state_machine.handle_event(workflow_id, event, msg) async def on_running(self, workflow_id: str, state: str, payload: dict): print(f"[State] Workflow {workflow_id} started running") # 这里触发一些业务逻辑 async def on_completed(self, workflow_id: str, state: str, payload: dict): print(f"[State] Workflow {workflow_id} completed") # 这里持久化或通知等 async def on_error(self, workflow_id: str, state: str, payload: dict): print(f"[State] Workflow {workflow_id} error") # 处理错误,触发报警等
pubsub = PubSubManager() center = WorkflowEventCenter(pubsub) # 发布事件模拟 asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "flow_begin"})) asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "process_complete"}))
StateMachine
WorkflowEventCenter
pubsub