eventbus

最后发布时间:2026-02-12 15:39:28 浏览量:
package eventbus

import (
	"context"
	"log"
	"sync"

	"golang.org/x/sync/errgroup"
)

type EventType string

// const (
// 	EventLLMChunk EventType = "llm.chunk"
// 	EventLLMDone  EventType = "llm.done"
// 	EventTTSChunk EventType = "tts.chunk"
// 	EventError    EventType = "error"
// )

type RingBuffer struct {
	mu     sync.RWMutex
	size   int
	buffer []Event
	index  int64
}

// type Bus struct {
// 	mu     sync.RWMutex
// 	ring   *RingBuffer
// 	subs   map[EventType]map[chan Event]struct{}
// 	closed bool
// }

type Subscriber struct {
	// eventType EventType
	Ch     chan Event
	Ctx    context.Context
	cancel context.CancelFunc
	G      *errgroup.Group
	// Closed    chan struct{}
}

type Bus struct {
	mu      sync.RWMutex
	ring    *RingBuffer
	subs    map[EventType]map[*Subscriber]struct{} // 多播 / 广播 一个 EventType 可以有多个 Subscriber
	SubSize chan int                               // 用于标记没有订阅者的事件类型
	closed  bool
}

// type Bus struct {
// 	mu     sync.RWMutex
// 	ring   *RingBuffer
// 	subs   map[EventType]*Subscriber
// 	closed bool
// }

// type Bus struct {
// 	mu     sync.RWMutex
// 	ring   *RingBuffer
// 	subs   map[EventType][]*Subscriber
// 	closed bool
// }

type Event struct {
	Type EventType
	Data any
}

func NewRingBuffer(size int) *RingBuffer {
	return &RingBuffer{
		size:   size,
		buffer: make([]Event, size),
	}
}

func NewBus(bufferSize int) *Bus {
	return &Bus{
		subs:    make(map[EventType]map[*Subscriber]struct{}),
		ring:    NewRingBuffer(bufferSize),
		SubSize: make(chan int),
	}
}
func (r *RingBuffer) Replay(fromID int64) []Event {
	r.mu.RLock()
	defer r.mu.RUnlock()

	var result []Event

	for i := fromID; i < r.index; i++ {
		if i < r.index-int64(r.size) {
			continue
		}
		result = append(result, r.buffer[i%int64(r.size)])
	}

	return result
}

func (b *Bus) Subscribe(ctx context.Context, types ...EventType) (*Subscriber, func()) {
	// ch := make(chan Event, 16)
	ctx, cancel := context.WithCancel(ctx)
	g, ctx := errgroup.WithContext(ctx)
	sub := &Subscriber{
		// eventType: eventType,
		Ctx:    ctx,
		cancel: cancel,
		Ch:     make(chan Event, 16),
		G:      g,
		// Closed:    make(chan struct{}),
	}

	// if b.closed {
	// 	// close(ch)
	// 	b.mu.Unlock()
	// 	return nil, nil
	// }

	// if _, ok := b.subs[t]; !ok {
	// 	b.subs[t] = make(map[chan Event]struct{})
	// }
	// b.subs[t][ch] = struct{}{}
	b.mu.Lock()

	for _, t := range types {
		if b.subs[t] == nil {
			b.subs[t] = make(map[*Subscriber]struct{})
		}
		b.subs[t][sub] = struct{}{}
	}
	b.mu.Unlock()

	// ctx 结束时自动取消订阅
	// go func() {
	// 	select {
	// 	case <-ctx.Done():
	// 		b.unsubscribe(t, ch)
	// 	}
	// }()
	unsub := func() {
		b.mu.Lock()
		// if subs, ok := b.subs[t]; ok {
		// 	if _, ok := subs[ch]; ok {
		// 		delete(subs, ch)
		// 		close(ch)
		// 	}
		// }
		for _, t := range types {
			if subs, ok := b.subs[t]; ok {
				if _, ok := subs[sub]; ok {
					sub.cancel() // 取消订阅者的 context,触发自动取消逻辑
					// 死锁风险:取消逻辑中需要 sub.G.Wait() 等待 goroutine 结束,cancel本身就在等待 goroutine 结束,导致死锁
					// if err := sub.G.Wait(); err != nil {
					// 	log.Printf("Error waiting for subscriber goroutines to finish: %v", err)
					// }
					delete(subs, sub)
					if len(subs) == 0 {
						delete(b.subs, t)
						// close(b.ZeroSubscribers) // 关闭 zeroSubscribers 通知没有订阅者的事件类型
						// close(sub.Ch)
					}
					// log.Println("len(subs):", len(subs))
				}
			}
		}
		log.Printf(" %s unsubscribe ch!", types)
		if len(b.subs) == 0 {
			// log.Printf("No subscribers left, all events will be ignored.")
			close(b.SubSize) // 关闭 subSize 通知没有订阅者的事件类型
		} else {
			// log.Printf("Remaining subscribers: %d", len(b.subs))
			b.SubSize <- len(b.subs) // 发送信号通知没有订阅者的事件类型

		}

		b.mu.Unlock()
	}

	history := b.ring.Replay(0)
	if len(history) != 0 {
		log.Printf("%s Replay history events: %v", types, history)
	}
	go func() {
		for _, e := range history {
			sub.Ch <- e
		}
	}()

	return sub, unsub
}

func (b *Bus) Unsubscribe(t EventType, sub *Subscriber) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if subs, ok := b.subs[t]; ok {
		// if _, exists := subs[sub]; exists {
		// 	delete(subs, sub)
		// }
		sub.cancel() // 取消订阅者的 context,触发自动取消逻辑
		delete(subs, sub)
		if len(subs) == 0 {
			delete(b.subs, t)
			// close(sub.Ch)
		}
	}
}

func (r *RingBuffer) Add(event Event) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// event := Event{
	// 	ID:      r.index,
	// 	Content: content,
	// 	Time:    time.Now(),
	// }
	//
	r.buffer[r.index%int64(r.size)] = event
	r.index++

	// return event
}

func (b *Bus) Publish(e Event) {
	if e.Type == EventLLMChunk {
		b.ring.Add(e)
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.closed {
		return
	}

	subs := b.subs[e.Type]
	for sub := range subs {
		sub.Ch <- e
		// select {
		// case ch <- e:
		// default:
		// 	log.Printf(">>>>>>>>>> %s: slow consumer, save buffer", e.Type)
		// }
	}
}

func (b *Bus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return
	}
	b.closed = true

	for _, subs := range b.subs {
		for sub := range subs {
			close(sub.Ch)
		}
	}
	b.subs = nil
}

// func (b *Bus) Subscribe(t EventType) <-chan Event {
// 	ch := make(chan Event, 16)

// 	b.mu.Lock()
// 	b.subs[t] = append(b.subs[t], ch)
// 	b.mu.Unlock()

// 	return ch
// }

// func (b *Bus) Publish(e Event) {
// 	b.mu.RLock()
// 	defer b.mu.RUnlock()

// 	for _, ch := range b.subs[e.Type] {
// 		select {
// 		case ch <- e:
// 		default:
// 			// 慢消费者直接丢(或打点)
// 		}
// 	}
// }

快捷入口
Go 思维导图 浏览PDF 下载PDF
分享到:
标签