你这个问题已经进入 **“事件系统语义设计”**层面了,而不是简单并发问题了 👍
你问的是:
为了确保订阅者数据完整性,Publish 时是否不应该丢数据?
答案是:
❗ 取决于你想实现的语义
不是“应不应该”,而是“你要什么保证”
我们拆开讲清楚。
for _, sub := range b.subs {
select {
case sub.Ch <- event:
default:
// 丢弃
}
}
这代表:
at-most-once delivery
best-effort
允许丢数据
慢消费者自动丢弃
这是一种合法的设计。
那你要的是:
at-least-once
或 exactly-once
那 Publish 不能丢数据。
但代价是什么?
如果你改成:
sub.Ch <- event
那语义变成:
强一致广播
所有订阅者必须接收
但问题来了:
Publish 阻塞
→ 整个系统背压
→ 所有生产者停住
这在高并发系统里是灾难。
生产系统通常是:
不要往 subscriber channel 推。
改成:
Publish 只写 RingBuffer
Subscriber 自己拉
也就是:
类似 Kafka 模型
因为:
1️⃣ 所有数据只写入 Log(RingBuffer)
2️⃣ Consumer 自己维护 offset
3️⃣ Publish 不关心 consumer
Publish 永远不阻塞。
现在你的模型是:
Push 模型
问题是:
Channel 满 = 数据丢失
Replay 只能补订阅前的历史
但订阅之后:
实时数据丢失
Replay 无法补救。
推荐架构:
func (b *EventBus) Publish(content string) {
b.ring.Add(content)
}
type Subscriber struct {
offset int64
...
}
订阅后:
go func() {
for {
events := b.ring.Replay(sub.offset)
for _, e := range events {
sub.Ch <- e
sub.offset = e.ID + 1
}
time.Sleep(10 * time.Millisecond)
}
}()
这才是:
真正的生产级流式架构
你的模型适合:
LLM streaming
日志广播
UI 实时刷新
弱一致通知
不适合:
金融
交易
任务状态可靠投递
审计日志
| 模型 | 是否丢数据 | 是否阻塞 | 适合场景 |
|---|---|---|---|
| 你当前(select default) | 会丢 | 不阻塞 | 实时 UI |
| 阻塞写 channel | 不丢 | 会阻塞 | 小规模 |
| RingBuffer + offset 拉 | 不丢 | 不阻塞 | 生产级 |
| Kafka 模型 | 不丢 | 不阻塞 | 高可靠 |
答案:
❌ Publish 不能负责完整性
✅ RingBuffer 负责完整性
✅ Subscriber 负责消费进度
Push 模型保证实时性
Pull + Offset 模型保证可靠性
如果你愿意,我可以帮你设计一个:
这会是一个真正工业级的 Event Streaming 内核 🚀