package tts

import (
    "context"
    "encoding/json"
    "errors"
    "log"
    "math/rand"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/gorilla/websocket"
)

// --- Protocol constants (mirroring dashscope.protocol.websocket) ---

const (
    headerKey       = "header"
    actionKey       = "action"
    eventKey        = "event"
    taskIDKey       = "task_id"
    streamingKey    = "streaming"
    streamingDuplex = "duplex"
)

type actionType string

const (
    actionStart    actionType = "start"
    actionContinue actionType = "continue"
    actionFinished actionType = "finished"
)

type eventType string

const (
    eventStarted   eventType = "task-started"
    eventFinished  eventType = "task-finished"
    eventFailed    eventType = "task-failed"
    eventGenerated eventType = "result-generated"
)

// ResultCallback defines hooks for receiving synthesis results.
type ResultCallback interface {
    OnOpen()
    OnComplete()
    OnError(message string)
    OnClose()
    OnEvent(message string)
    OnData(data []byte)
}

// AudioFormat represents the desired output format.
type AudioFormat struct {
    Format     string // mp3, wav, pcm, opus
    SampleRate int
    Channels   string // mono
    BitRate    int    // for opus/mp3
}

// Common presets (subset of the Python enum).
var (
    AudioFormatDefault            = AudioFormat{Format: "mp3", SampleRate: 22050, Channels: "mono", BitRate: 256}
    AudioFormatWAV16000Mono16bit  = AudioFormat{Format: "wav", SampleRate: 16000, Channels: "mono", BitRate: 16}
    AudioFormatMP316000Mono128kbp = AudioFormat{Format: "mp3", SampleRate: 16000, Channels: "mono", BitRate: 128}
    AudioFormatOpus24000Mono32kbp = AudioFormat{Format: "opus", SampleRate: 24000, Channels: "mono", BitRate: 32}
    PCM_22050HZ_MONO_16BIT        = AudioFormat{Format: "pcm", SampleRate: 22050, Channels: "mono", BitRate: 16}
)
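// Illustrative sketch (not part of the original source): a minimal ResultCallback
// implementation that logs lifecycle events and accumulates streamed audio in
// memory. The type name collectingCallback is hypothetical; adapt it to your needs
// (e.g. write to a file or an audio player instead).
type collectingCallback struct {
    audio []byte
}

func (c *collectingCallback) OnOpen()                { log.Println("tts session opened") }
func (c *collectingCallback) OnComplete()            { log.Println("tts synthesis complete") }
func (c *collectingCallback) OnError(message string) { log.Println("tts error:", message) }
func (c *collectingCallback) OnClose()               { log.Println("tts session closed") }
func (c *collectingCallback) OnEvent(message string) { log.Println("tts event:", message) }
func (c *collectingCallback) OnData(data []byte)     { c.audio = append(c.audio, data...) }
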
// SpeechSynthesizer encapsulates the websocket session.
type SpeechSynthesizer struct {
    // immutable setup
    url       string
    apiKey    string
    headers   http.Header
    workspace string

    // request params
    model       string
    voice       string
    aformat     AudioFormat
    volume      int
    speechRate  float64
    pitchRate   float64
    seed        int
    synthType   int
    instruction *string
    langHints   []string
    additional  map[string]any

    // runtime
    conn            *websocket.Conn
    dialer          *websocket.Dialer
    startCh         chan struct{}
    completeCh      chan struct{}
    closed          chan struct{}
    isStarted       bool
    audioBuf        []byte
    asyncCall       bool
    callback        ResultCallback
    lastResponse    map[string]any
    lastRequestID   string
    closeWSAfterUse bool

    // timing
    startStreamTS   int64
    firstPkgTS      int64
    recvAudioMillis float64

    taskID string
}

// NewSpeechSynthesizer creates a synthesizer.
func NewSpeechSynthesizer(apiKey, model, voice string, format AudioFormat, callback ResultCallback) (*SpeechSynthesizer, error) {
    if model == "" {
        return nil, errors.New("model is required")
    }
    if format.Format == "" {
        return nil, errors.New("format is required")
    }
    if apiKey == "" {
        return nil, errors.New("apikey is required")
    }
    s := &SpeechSynthesizer{
        url:             "wss://dashscope.aliyuncs.com/api-ws/v1/inference/",
        apiKey:          apiKey,
        headers:         http.Header{},
        workspace:       "",
        model:           model,
        voice:           voice,
        aformat:         format,
        volume:          50,
        speechRate:      1.0,
        pitchRate:       1.0,
        seed:            0,
        synthType:       0,
        instruction:     nil,
        langHints:       nil,
        additional:      nil,
        dialer:          &websocket.Dialer{HandshakeTimeout: 5 * time.Second},
        startCh:         make(chan struct{}, 1),
        completeCh:      make(chan struct{}, 1),
        closed:          make(chan struct{}, 1),
        asyncCall:       callback != nil,
        callback:        callback,
        closeWSAfterUse: true,
        taskID:          uuid.NewString(),
    }
    return s, nil
}

func (r *SpeechSynthesizer) startPayload() map[string]any {
    runTaskCmd := map[string]any{
        "header": map[string]any{
            "action":    "run-task",
            "task_id":   r.taskID,
            "streaming": "duplex",
        },
        "payload": map[string]any{
            "task_group": "audio",
            "task":       "tts",
            "function":   "SpeechSynthesizer",
            "model":      r.model,
            "parameters": map[string]any{
                "text_type":   "PlainText",
                "voice":       r.voice,
                "format":      r.aformat.Format,
                "sample_rate": r.aformat.SampleRate,
                "volume":      r.volume,
                "rate":        r.speechRate,
                "pitch":       r.pitchRate,
                // If enable_ssml is set to true, only a single continue-task instruction may be
                // sent; otherwise the service returns "Text request limit violated, expected 1."
                "enable_ssml": r.synthType,
            },
            "input": map[string]any{},
        },
    }
    return runTaskCmd
}

func (r *SpeechSynthesizer) continuePayload(text string) map[string]any {
    continueTaskCmd := map[string]any{
        "header": map[string]any{
            "action":    "continue-task",
            "task_id":   r.taskID,
            "streaming": "duplex",
        },
        "payload": map[string]any{
            "input": map[string]any{
                "text": text,
            },
        },
    }
    return continueTaskCmd
}

func (r *SpeechSynthesizer) finishPayload() map[string]any {
    finishTaskCmd := map[string]any{
        "header": map[string]any{
            "action":    "finish-task",
            "task_id":   r.taskID,
            "streaming": "duplex",
        },
        "payload": map[string]any{
            "input": map[string]any{},
        },
    }
    return finishTaskCmd
}

func (s *SpeechSynthesizer) String() string {
    return "[SpeechSynthesizer] model:" + s.model + ", voice:" + s.voice + ", format:" + s.aformat.Format
}
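// For reference (added commentary), the three client messages built above serialize
// to JSON roughly as follows (values shown are illustrative). The server answers with
// "task-started", binary audio frames plus "result-generated" events, and finally
// "task-finished" (or "task-failed"), which readLoop below dispatches:
//
//  run-task:      {"header":{"action":"run-task","task_id":"<uuid>","streaming":"duplex"},
//                  "payload":{"task_group":"audio","task":"tts","function":"SpeechSynthesizer",
//                             "model":"...","parameters":{...},"input":{}}}
//  continue-task: {"header":{"action":"continue-task","task_id":"<uuid>","streaming":"duplex"},
//                  "payload":{"input":{"text":"..."}}}
//  finish-task:   {"header":{"action":"finish-task","task_id":"<uuid>","streaming":"duplex"},
//                  "payload":{"input":{}}}
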
// Connect establishes the websocket connection.
func (s *SpeechSynthesizer) Connect(ctx context.Context) error {
    if s.conn != nil {
        return nil
    }
    hdr := s.headers.Clone()
    hdr.Set("Authorization", "bearer "+s.apiKey)
    hdr.Set("X-DashScope-DataInspection", "enable")
    if s.workspace != "" {
        hdr.Set("X-DashScope-WorkSpace", s.workspace)
    }
    c, _, err := s.dialer.DialContext(ctx, s.url, hdr)
    if err != nil {
        return err
    }
    s.conn = c
    go s.readLoop()
    return nil
}

func (s *SpeechSynthesizer) isConnected() bool {
    return s.conn != nil
}

func (s *SpeechSynthesizer) reset() {
    s.startStreamTS = -1
    s.firstPkgTS = -1
    s.recvAudioMillis = 0
    s.isStarted = false
    s.audioBuf = nil
    s.lastResponse = nil
}

// readLoop reads messages and dispatches events.
func (s *SpeechSynthesizer) readLoop() {
    defer func() {
        close(s.closed)
        log.Println("SpeechSynthesizer readLoop exited")
    }()
    for {
        if s.conn == nil {
            return
        }
        mt, payload, err := s.conn.ReadMessage()
        if err != nil {
            // connection closed
            return
        }
        switch mt {
        case websocket.TextMessage:
            var obj map[string]any
            if json.Unmarshal(payload, &obj) != nil {
                continue
            }
            s.lastResponse = obj
            hdr, _ := obj[headerKey].(map[string]any)
            if hdr == nil {
                continue
            }
            evs, _ := hdr[eventKey].(string)
            switch eventType(strings.ToLower(evs)) {
            case eventStarted:
                select {
                case s.startCh <- struct{}{}:
                default:
                }
            case eventFinished:
                select {
                case s.completeCh <- struct{}{}:
                default:
                }
                if s.callback != nil {
                    s.callback.OnComplete()
                    s.callback.OnClose()
                }
            case eventFailed:
                select {
                case s.startCh <- struct{}{}:
                default:
                }
                select {
                case s.completeCh <- struct{}{}:
                default:
                }
                if s.callback != nil {
                    s.callback.OnError(string(payload))
                    s.callback.OnClose()
                }
            case eventGenerated:
                if s.callback != nil {
                    s.callback.OnEvent(string(payload))
                }
            default:
            }
        case websocket.BinaryMessage:
            if s.recvAudioMillis == 0 {
                s.firstPkgTS = time.Now().UnixMilli()
            }
            // approximate received audio duration in ms, assuming 16-bit mono samples
            s.recvAudioMillis += float64(len(payload)) / (2 * float64(s.aformat.SampleRate) / 1000.0)
            if s.callback == nil {
                // non-async mode: collect audio for Call to return
                s.audioBuf = append(s.audioBuf, payload...)
            } else {
                s.callback.OnData(payload)
            }
        }
    }
}

func (s *SpeechSynthesizer) sendJSON(obj map[string]any) error {
    b, err := json.Marshal(obj)
    if err != nil {
        return err
    }
    return s.conn.WriteMessage(websocket.TextMessage, b)
}

func (s *SpeechSynthesizer) startStream(ctx context.Context) error {
    s.startStreamTS = time.Now().UnixMilli()
    s.firstPkgTS = -1
    s.recvAudioMillis = 0
    s.asyncCall = s.callback != nil
    if s.conn == nil {
        if err := s.Connect(ctx); err != nil {
            return err
        }
    }
    payload := s.startPayload()
    if err := s.sendJSON(payload); err != nil {
        return err
    }
    select {
    case <-s.startCh:
        s.isStarted = true
        if s.callback != nil {
            s.callback.OnOpen()
        }
        log.Println("SpeechSynthesizer started!")
        return nil
    case <-time.After(10 * time.Second):
        return errors.New("start speech synthesizer timeout")
    }
}

func (s *SpeechSynthesizer) submitText(text string) error {
    if !s.isStarted {
        return errors.New("speech synthesizer not started")
    }
    payload := s.continuePayload(text)
    return s.sendJSON(payload)
}
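// Design note (added commentary): startCh and completeCh are buffered with capacity 1
// and readLoop uses non-blocking sends, so a "task-started" or "task-finished" event is
// not lost if no goroutine is waiting yet, and duplicate events cannot block the read
// loop. On "task-failed" both channels are signalled so that startStream and
// StreamingComplete unblock. firstPkgTS is stamped on the first binary frame, which is
// what FirstPackageDelay uses to report time-to-first-audio.
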
// StreamingCall starts the session (if needed) and sends one text chunk.
func (s *SpeechSynthesizer) StreamingCall(ctx context.Context, text string) error {
    if !s.isStarted {
        if err := s.startStream(ctx); err != nil {
            return err
        }
    }
    return s.submitText(text)
}

// StreamingComplete stops the session and waits for remaining audio.
func (s *SpeechSynthesizer) StreamingComplete(timeout time.Duration) error {
    if !s.isStarted {
        return errors.New("speech synthesizer not started")
    }
    payload := s.finishPayload()
    if err := s.sendJSON(payload); err != nil {
        return err
    }
    if timeout > 0 {
        select {
        case <-s.completeCh:
        case <-time.After(timeout):
            return errors.New("speech synthesizer wait complete timeout")
        }
    } else {
        <-s.completeCh
    }
    s.isStarted = false
    if s.closeWSAfterUse {
        s.Close()
    }
    return nil
}

// AsyncStreamingComplete returns immediately; completion will trigger callbacks.
func (s *SpeechSynthesizer) AsyncStreamingComplete(timeout time.Duration) error {
    if !s.isStarted {
        return errors.New("speech synthesizer not started")
    }
    payload := s.finishPayload()
    if err := s.sendJSON(payload); err != nil {
        return err
    }
    go func() {
        if timeout > 0 {
            select {
            case <-s.completeCh:
            case <-time.After(timeout):
            }
        } else {
            <-s.completeCh
        }
        s.isStarted = false
        if s.closeWSAfterUse {
            s.Close()
        }
    }()
    return nil
}

// Call performs a simple one-shot synthesis. If no callback is set, it returns the audio bytes.
func (s *SpeechSynthesizer) Call(ctx context.Context, text string, timeout time.Duration) ([]byte, error) {
    // NOTE: additional is recorded here, but startPayload does not currently merge it
    // into the request parameters.
    if s.additional == nil {
        s.additional = map[string]any{"enable_ssml": true}
    } else {
        s.additional["enable_ssml"] = true
    }
    if s.callback == nil {
        s.asyncCall = false
    }
    if err := s.startStream(ctx); err != nil {
        return nil, err
    }
    if err := s.submitText(text); err != nil {
        return nil, err
    }
    if s.asyncCall {
        if err := s.AsyncStreamingComplete(timeout); err != nil {
            return nil, err
        }
        return nil, nil
    }
    if err := s.StreamingComplete(timeout); err != nil {
        return nil, err
    }
    return s.audioBuf, nil
}

func (s *SpeechSynthesizer) Close() {
    if s.conn != nil {
        _ = s.conn.Close()
        s.conn = nil
    }
}

// LastRequestID returns lastRequestID; note that it is currently never populated,
// since the session identifier lives in taskID (generated in NewSpeechSynthesizer).
func (s *SpeechSynthesizer) LastRequestID() string { return s.lastRequestID }

// FirstPackageDelay returns the latency in milliseconds between sending run-task and
// receiving the first audio packet, or -1 if unknown.
func (s *SpeechSynthesizer) FirstPackageDelay() int64 {
    if s.firstPkgTS <= 0 || s.startStreamTS <= 0 {
        return -1
    }
    return s.firstPkgTS - s.startStreamTS
}

func (s *SpeechSynthesizer) Response() map[string]any { return s.lastResponse }

// --- Object pool implementation ---

type SpeechSynthesizerObjectPool struct {
    mu            sync.Mutex
    pool          []*poolObj
    available     []bool
    borrowed      int
    maxSize       int
    stop          bool
    url           string
    apiKey        string
    headers       http.Header
    workspace     string
    model         string
    voice         string
    reconnectBase int
}

type poolObj struct {
    syn         *SpeechSynthesizer
    connectedAt time.Time
}

func NewSpeechSynthesizerObjectPool(maxSize int, url, apiKey string, headers http.Header, workspace, model, voice string) (*SpeechSynthesizerObjectPool, error) {
    if maxSize <= 0 || maxSize > 100 {
        return nil, errors.New("max_size must be 1..100")
    }
    // url, headers, and workspace are stored for reference, but NewSpeechSynthesizer
    // currently hard-codes the DashScope endpoint and an empty header set.
    p := &SpeechSynthesizerObjectPool{
        maxSize:       maxSize,
        url:           url,
        apiKey:        apiKey,
        headers:       headers.Clone(),
        workspace:     workspace,
        model:         model,
        voice:         voice,
        reconnectBase: 30,
    }
    for i := 0; i < maxSize; i++ {
        syn, err := NewSpeechSynthesizer(apiKey, model, voice, AudioFormatDefault, nil)
        if err != nil {
            return nil, err
        }
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        _ = syn.Connect(ctx)
        cancel()
        p.pool = append(p.pool, &poolObj{syn: syn, connectedAt: time.Now()})
        p.available = append(p.available, true)
    }
    go p.autoReconnect()
    return p, nil
}

func (p *SpeechSynthesizerObjectPool) autoReconnect() {
    for {
        time.Sleep(1 * time.Second)
        p.mu.Lock()
        if p.stop {
            p.mu.Unlock()
            return
        }
        now := time.Now()
        toRenew := []*poolObj{}
        for i, po := range p.pool {
            if !p.available[i] {
                continue
            }
            if po.syn == nil || !po.syn.isConnected() ||
                now.Sub(po.connectedAt) > time.Duration(p.reconnectBase+rand.Intn(10)-5)*time.Second {
                p.available[i] = false
                toRenew = append(toRenew, po)
            }
        }
        p.mu.Unlock()
        for _, po := range toRenew {
            syn, err := NewSpeechSynthesizer(p.apiKey, p.model, p.voice, AudioFormatDefault, nil)
            if err != nil {
                continue
            }
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            _ = syn.Connect(ctx)
            cancel()
            po.syn = syn
            po.connectedAt = time.Now()
            p.mu.Lock()
            // mark the renewed object as available again
            for i := range p.pool {
                if p.pool[i] == po {
                    p.available[i] = true
                    break
                }
            }
            p.mu.Unlock()
        }
    }
}

func (p *SpeechSynthesizerObjectPool) Shutdown() {
    p.mu.Lock()
    p.stop = true
    for _, po := range p.pool {
        if po.syn != nil {
            po.syn.Close()
        }
    }
    p.pool = nil
    p.available = nil
    p.mu.Unlock()
}

// Borrow returns a synthesizer; the caller should Return it when done.
func (p *SpeechSynthesizerObjectPool) Borrow() *SpeechSynthesizer {
    p.mu.Lock()
    defer p.mu.Unlock()
    for i, po := range p.pool {
        if p.available[i] && po.syn != nil && po.syn.isConnected() {
            p.available[i] = false
            p.borrowed++
            return po.syn
        }
    }
    // pool exhausted: create a new, not-yet-connected object
    syn, _ := NewSpeechSynthesizer(p.apiKey, p.model, p.voice, AudioFormatDefault, nil)
    return syn
}

// Return puts a synthesizer back into the pool.
func (p *SpeechSynthesizerObjectPool) Return(s *SpeechSynthesizer) bool {
    if s == nil {
        return false
    }
    p.mu.Lock()
    defer p.mu.Unlock()
    for i, po := range p.pool {
        if po.syn == s {
            if p.available[i] {
                return false
            }
            p.available[i] = true
            if !s.isConnected() {
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                _ = s.Connect(ctx)
                cancel()
            }
            po.connectedAt = time.Now()
            p.borrowed--
            return true
        }
    }
    // not from this pool: drop it
    return false
}
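// exampleOneShotCall is an illustrative sketch (not part of the original source) of the
// simplest usage: a one-shot Call with no callback, which blocks until "task-finished"
// and returns the collected audio bytes. The model and voice names below are placeholder
// assumptions; substitute the values enabled for your DashScope account.
func exampleOneShotCall(ctx context.Context, apiKey string) ([]byte, error) {
    syn, err := NewSpeechSynthesizer(apiKey, "cosyvoice-v1", "longxiaochun", AudioFormatDefault, nil)
    if err != nil {
        return nil, err
    }
    // Call connects, sends run-task, one continue-task carrying the text, then
    // finish-task, and waits up to the given timeout for the remaining audio.
    return syn.Call(ctx, "Hello from the DashScope speech synthesizer.", 30*time.Second)
}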