一个简洁的事件驱动状态机作为“状态中心”,结合订阅系统,解析消息并根据状态触发流程变化。
workflow_id
当前状态。from typing import Dict, Callable, Awaitable, Optional
import asyncio
class StateMachine:
def __init__(self):
self._states: Dict[str, str] = {} # workflow_id -> current_state
self._transitions: Dict[str, Dict[str, str]] = {} # state -> event -> next_state
self._callbacks: Dict[str, Callable[[str, str, dict], Awaitable]] = {} # state -> async callback
def add_transition(self, from_state: str, event: str, to_state: str):
self._transitions.setdefault(from_state, {})[event] = to_state
def on_enter(self, state: str, callback: Callable[[str, str, dict], Awaitable]):
self._callbacks[state] = callback
async def handle_event(self, workflow_id: str, event: str, payload: dict):
current_state = self._states.get(workflow_id, "initial")
next_state = self._transitions.get(current_state, {}).get(event)
if next_state:
self._states[workflow_id] = next_state
cb = self._callbacks.get(next_state)
if cb:
await cb(workflow_id, next_state, payload)
else:
print(f"No transition from {current_state} on event {event}")
# ---------------------------------------
# 事件驱动的订阅示例
class WorkflowEventCenter:
def __init__(self, pubsub):
self.pubsub = pubsub
self.state_machine = StateMachine()
# 定义状态机转换
self.state_machine.add_transition("initial", "flow_begin", "running")
self.state_machine.add_transition("running", "process_complete", "completed")
self.state_machine.add_transition("running", "process_error", "error")
# 订阅状态进入事件,做异步处理
self.state_machine.on_enter("running", self.on_running)
self.state_machine.on_enter("completed", self.on_completed)
self.state_machine.on_enter("error", self.on_error)
# 订阅 pubsub 事件,所有事件进入状态机处理
self.pubsub.subscribe("workflow_events", self.handle_pubsub_event)
async def handle_pubsub_event(self, msg: dict):
workflow_id = msg.get("workflow_id", "unknown")
event = msg.get("event")
if event:
await self.state_machine.handle_event(workflow_id, event, msg)
async def on_running(self, workflow_id: str, state: str, payload: dict):
print(f"[State] Workflow {workflow_id} started running")
# 这里触发一些业务逻辑
async def on_completed(self, workflow_id: str, state: str, payload: dict):
print(f"[State] Workflow {workflow_id} completed")
# 这里持久化或通知等
async def on_error(self, workflow_id: str, state: str, payload: dict):
print(f"[State] Workflow {workflow_id} error")
# 处理错误,触发报警等
pubsub = PubSubManager()
center = WorkflowEventCenter(pubsub)
# 发布事件模拟
asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "flow_begin"}))
asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "process_complete"}))
StateMachine
:管理状态与事件驱动的转移,支持回调异步处理。WorkflowEventCenter
:订阅 pubsub
事件,驱动状态机,做状态相关逻辑。