事件驱动状态机作为“状态中心”,结合订阅系统,解析消息并根据状态触发流程变化

最后发布时间:2025-07-14 11:30:11 浏览量:

一个简洁的事件驱动状态机作为“状态中心”,结合订阅系统,解析消息并根据状态触发流程变化。

设计思路

  • 状态中心 StateMachine 管理每个 workflow_id 当前状态。
  • 状态根据事件变更,驱动业务逻辑(如下一步动作、通知等)。
  • 事件通过订阅系统传入,StateMachine 订阅相关事件。
  • 其他模块订阅状态变化,进行持久化、通知等操作。

核心代码示例

from typing import Dict, Callable, Awaitable, Optional
import asyncio

class StateMachine:
    def __init__(self):
        self._states: Dict[str, str] = {}  # workflow_id -> current_state
        self._transitions: Dict[str, Dict[str, str]] = {}  # state -> event -> next_state
        self._callbacks: Dict[str, Callable[[str, str, dict], Awaitable]] = {}  # state -> async callback

    def add_transition(self, from_state: str, event: str, to_state: str):
        self._transitions.setdefault(from_state, {})[event] = to_state

    def on_enter(self, state: str, callback: Callable[[str, str, dict], Awaitable]):
        self._callbacks[state] = callback

    async def handle_event(self, workflow_id: str, event: str, payload: dict):
        current_state = self._states.get(workflow_id, "initial")
        next_state = self._transitions.get(current_state, {}).get(event)

        if next_state:
            self._states[workflow_id] = next_state
            cb = self._callbacks.get(next_state)
            if cb:
                await cb(workflow_id, next_state, payload)
        else:
            print(f"No transition from {current_state} on event {event}")

# ---------------------------------------
# 事件驱动的订阅示例

class WorkflowEventCenter:
    def __init__(self, pubsub):
        self.pubsub = pubsub
        self.state_machine = StateMachine()

        # 定义状态机转换
        self.state_machine.add_transition("initial", "flow_begin", "running")
        self.state_machine.add_transition("running", "process_complete", "completed")
        self.state_machine.add_transition("running", "process_error", "error")

        # 订阅状态进入事件,做异步处理
        self.state_machine.on_enter("running", self.on_running)
        self.state_machine.on_enter("completed", self.on_completed)
        self.state_machine.on_enter("error", self.on_error)

        # 订阅 pubsub 事件,所有事件进入状态机处理
        self.pubsub.subscribe("workflow_events", self.handle_pubsub_event)

    async def handle_pubsub_event(self, msg: dict):
        workflow_id = msg.get("workflow_id", "unknown")
        event = msg.get("event")
        if event:
            await self.state_machine.handle_event(workflow_id, event, msg)

    async def on_running(self, workflow_id: str, state: str, payload: dict):
        print(f"[State] Workflow {workflow_id} started running")
        # 这里触发一些业务逻辑

    async def on_completed(self, workflow_id: str, state: str, payload: dict):
        print(f"[State] Workflow {workflow_id} completed")
        # 这里持久化或通知等

    async def on_error(self, workflow_id: str, state: str, payload: dict):
        print(f"[State] Workflow {workflow_id} error")
        # 处理错误,触发报警等

用法示例

pubsub = PubSubManager()
center = WorkflowEventCenter(pubsub)

# 发布事件模拟
asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "flow_begin"}))
asyncio.run(pubsub.publish("workflow_events", {"workflow_id": "wf1", "event": "process_complete"}))

说明

  • StateMachine:管理状态与事件驱动的转移,支持回调异步处理。
  • WorkflowEventCenter:订阅 pubsub 事件,驱动状态机,做状态相关逻辑。
  • 通过事件驱动方式解析消息,推动流程状态变化。
  • 订阅系统与状态机解耦,方便扩展和测试。
快捷入口
Python 思维导图 浏览PDF 下载PDF
分享到:
标签