From ed3b195b26fea2a736a1ec38a4e01db6c9a5edba Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Thu, 12 Dec 2024 21:13:43 +0300 Subject: [PATCH] Dynamic audio buf * Ugly audio buf * Use dynamic Opus frames with config --- pkg/config/config.yaml | 5 ++ pkg/config/loader_test.go | 13 +++- pkg/config/worker.go | 2 +- pkg/worker/coordinatorhandlers.go | 2 +- pkg/worker/media/buffer.go | 119 ++++++++++++++++++++++++++++++ pkg/worker/media/buffer_test.go | 77 +++++++++++++++++++ pkg/worker/media/media.go | 105 ++++++-------------------- pkg/worker/media/media_test.go | 64 ---------------- pkg/worker/room/room_test.go | 2 +- 9 files changed, 235 insertions(+), 154 deletions(-) create mode 100644 pkg/worker/media/buffer.go create mode 100644 pkg/worker/media/buffer_test.go diff --git a/pkg/config/config.yaml b/pkg/config/config.yaml index 48d78ed38..9c1ee1aae 100644 --- a/pkg/config/config.yaml +++ b/pkg/config/config.yaml @@ -300,7 +300,12 @@ encoder: # audio frame duration needed for WebRTC (Opus) # most of the emulators have ~1400 samples per a video frame, # so we keep the frame buffer roughly half of that size or 2 RTC packets per frame + # (deprecated) due to frames frame: 10 + # dynamic frames for Opus encoder + frames: + - 10 + - 5 video: # h264, vpx (vp8) or vp9 codec: h264 diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 355f19a48..08e17dd34 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -9,8 +9,10 @@ import ( func TestConfigEnv(t *testing.T) { var out WorkerConfig - _ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAME", "33") - defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAME") }() + _ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]", "10") + _ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]", "5") + defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]") }() + defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]") }() _ = os.Setenv("CLOUD_GAME_EMULATOR_LIBRETRO_CORES_LIST_PCSX_OPTIONS__PCSX_REARMED_DRC", "x") defer func() { @@ -22,8 +24,11 @@ func TestConfigEnv(t *testing.T) { t.Fatal(err) } - if out.Encoder.Audio.Frame != 33 { - t.Errorf("%v is not 33", out.Encoder.Audio.Frame) + for i, x := range []float32{10, 5} { + if out.Encoder.Audio.Frames[i] != x { + t.Errorf("%v is not [10, 5]", out.Encoder.Audio.Frames) + t.Failed() + } } v := out.Emulator.Libretro.Cores.List["pcsx"].Options["pcsx_rearmed_drc"] diff --git a/pkg/config/worker.go b/pkg/config/worker.go index 1a476b05e..5a509b0c1 100644 --- a/pkg/config/worker.go +++ b/pkg/config/worker.go @@ -51,7 +51,7 @@ type Encoder struct { } type Audio struct { - Frame float32 + Frames []float32 } type Video struct { diff --git a/pkg/worker/coordinatorhandlers.go b/pkg/worker/coordinatorhandlers.go index 97cb27849..536c6ed69 100644 --- a/pkg/worker/coordinatorhandlers.go +++ b/pkg/worker/coordinatorhandlers.go @@ -168,7 +168,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke } m.AudioSrcHz = app.AudioSampleRate() - m.AudioFrame = w.conf.Encoder.Audio.Frame + m.AudioFrames = w.conf.Encoder.Audio.Frames m.VideoW, m.VideoH = app.ViewportSize() m.VideoScale = app.Scale() diff --git a/pkg/worker/media/buffer.go b/pkg/worker/media/buffer.go new file mode 100644 index 000000000..bba289592 --- /dev/null +++ b/pkg/worker/media/buffer.go @@ -0,0 +1,119 @@ +package media + +import ( + "errors" + "math" + "unsafe" +) + +// buffer is a simple non-concurrent safe buffer for audio samples. +type buffer struct { + stretch bool + frameHz []int + + raw samples + buckets []Bucket + cur *Bucket +} + +type Bucket struct { + mem samples + ms float32 + lv int + dst int +} + +func newBuffer(frames []float32, hz int) (*buffer, error) { + if hz < 2000 { + return nil, errors.New("hz should be > than 2000") + } + + buf := buffer{} + + // preallocate continuous array + s := 0 + for _, f := range frames { + s += frame(hz, f) + } + buf.raw = make(samples, s) + + next := 0 + for _, f := range frames { + s := frame(hz, f) + buf.buckets = append(buf.buckets, Bucket{ + mem: buf.raw[next : next+s], + ms: f, + }) + next += s + } + buf.cur = &buf.buckets[len(buf.buckets)-1] + return &buf, nil +} + +func (b *buffer) choose(l int) { + for _, bb := range b.buckets { + if l >= len(bb.mem) { + b.cur = &bb + break + } + } +} + +func (b *buffer) resample(hz int) { + b.stretch = true + for i := range b.buckets { + b.buckets[i].dst = frame(hz, float32(b.buckets[i].ms)) + } +} + +// write fills the buffer until it's full and then passes the gathered data into a callback. +// +// There are two cases to consider: +// 1. Underflow, when the length of the written data is less than the buffer's available space. +// 2. Overflow, when the length exceeds the current available buffer space. +// +// We overwrite any previous values in the buffer and move the internal write pointer +// by the length of the written data. +// In the first case, we won't call the callback, but it will be called every time +// when the internal buffer overflows until all samples are read. +func (b *buffer) write(s samples, onFull func(samples, float32)) (r int) { + for r < len(s) { + buf := b.cur + w := copy(buf.mem[buf.lv:], s[r:]) + r += w + buf.lv += w + if buf.lv == len(buf.mem) { + if b.stretch { + onFull(buf.mem.stretch(buf.dst), buf.ms) + } else { + onFull(buf.mem, buf.ms) + } + b.choose(len(s) - r) + b.cur.lv = 0 + } + } + return +} + +// frame calculates an audio stereo frame size, i.e. 48k*frame/1000*2 +// with round(x / 2) * 2 for the closest even number +func frame(hz int, frame float32) int { + return int(math.Round(float64(hz)*float64(frame)/1000/2) * 2 * 2) +} + +// stretch does a simple stretching of audio samples. +// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6] +func (s samples) stretch(size int) []int16 { + out := buf[:size] + n := len(s) + ratio := float32(size) / float32(n) + sPtr := unsafe.Pointer(&s[0]) + for i, l, r := 0, 0, 0; i < n; i += 2 { + l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16 + for j := l; j < r; j += 2 { + *(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1] + } + sPtr = unsafe.Add(sPtr, uintptr(4)) + } + return out +} diff --git a/pkg/worker/media/buffer_test.go b/pkg/worker/media/buffer_test.go new file mode 100644 index 000000000..29f2fc6a9 --- /dev/null +++ b/pkg/worker/media/buffer_test.go @@ -0,0 +1,77 @@ +package media + +import ( + "reflect" + "testing" +) + +type bufWrite struct { + sample int16 + len int +} + +func TestBufferWrite(t *testing.T) { + tests := []struct { + bufLen int + writes []bufWrite + expect samples + }{ + { + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 10}, + {sample: 2, len: 20}, + {sample: 3, len: 30}, + }, + expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}, + }, + { + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 3}, + {sample: 2, len: 18}, + {sample: 3, len: 2}, + }, + expect: samples{2, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}, + }, + } + + for _, test := range tests { + var lastResult samples + buf, err := newBuffer([]float32{10, 5}, test.bufLen) + if err != nil { + t.Fatalf("oof, %v", err) + } + for _, w := range test.writes { + buf.write(samplesOf(w.sample, w.len), + func(s samples, ms float32) { lastResult = s }, + ) + } + if !reflect.DeepEqual(test.expect, lastResult) { + t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, len(buf.cur.mem)) + } + } +} + +func BenchmarkBufferWrite(b *testing.B) { + fn := func(_ samples, _ float32) {} + l := 2000 + buf, err := newBuffer([]float32{10}, l) + if err != nil { + b.Fatalf("oof: %v", err) + } + samples1 := samplesOf(1, l/2) + samples2 := samplesOf(2, l*2) + for i := 0; i < b.N; i++ { + buf.write(samples1, fn) + buf.write(samples2, fn) + } +} + +func samplesOf(v int16, len int) (s samples) { + s = make(samples, len) + for i := range s { + s[i] = v + } + return +} diff --git a/pkg/worker/media/media.go b/pkg/worker/media/media.go index ed356308a..bece8a097 100644 --- a/pkg/worker/media/media.go +++ b/pkg/worker/media/media.go @@ -2,16 +2,13 @@ package media import ( "fmt" - "math" - "sync" - "time" - "unsafe" - "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/encoder" "github.com/giongto35/cloud-game/v3/pkg/encoder/opus" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" + "sync" + "time" ) const ( @@ -19,16 +16,7 @@ const ( sampleBufLen = 1024 * 4 ) -// buffer is a simple non-concurrent safe ring buffer for audio samples. -type ( - buffer struct { - s samples - wi int - dst int - stretch bool - } - samples []int16 -) +type samples []int16 var ( encoderOnce = sync.Once{} @@ -36,39 +24,6 @@ var ( buf = make([]int16, sampleBufLen) ) -func newBuffer(srcLen int) buffer { return buffer{s: make(samples, srcLen)} } - -// enableStretch adds a simple stretching of buffer to a desired size before -// the onFull callback call. -func (b *buffer) enableStretch(l int) { b.stretch = true; b.dst = l } - -// write fills the buffer until it's full and then passes the gathered data into a callback. -// -// There are two cases to consider: -// 1. Underflow, when the length of the written data is less than the buffer's available space. -// 2. Overflow, when the length exceeds the current available buffer space. -// -// We overwrite any previous values in the buffer and move the internal write pointer -// by the length of the written data. -// In the first case, we won't call the callback, but it will be called every time -// when the internal buffer overflows until all samples are read. -func (b *buffer) write(s samples, onFull func(samples)) (r int) { - for r < len(s) { - w := copy(b.s[b.wi:], s[r:]) - r += w - b.wi += w - if b.wi == len(b.s) { - b.wi = 0 - if b.stretch { - onFull(b.s.stretch(b.dst)) - } else { - onFull(b.s) - } - } - } - return -} - func DefaultOpus() (*opus.Encoder, error) { var err error encoderOnce.Do(func() { opusCoder, err = opus.NewEncoder(audioHz) }) @@ -81,34 +36,11 @@ func DefaultOpus() (*opus.Encoder, error) { return opusCoder, nil } -// frame calculates an audio stereo frame size, i.e. 48k*frame/1000*2 -// with round(x / 2) * 2 for the closest even number -func frame(hz int, frame float32) int { - return int(math.Round(float64(hz)*float64(frame)/1000/2) * 2 * 2) -} - -// stretch does a simple stretching of audio samples. -// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6] -func (s samples) stretch(size int) []int16 { - out := buf[:size] - n := len(s) - ratio := float32(size) / float32(n) - sPtr := unsafe.Pointer(&s[0]) - for i, l, r := 0, 0, 0; i < n; i += 2 { - l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16 - for j := l; j < r; j += 2 { - *(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1] - } - sPtr = unsafe.Add(sPtr, uintptr(4)) - } - return out -} - type WebrtcMediaPipe struct { a *opus.Encoder v *encoder.Video - onAudio func([]byte) - audioBuf buffer + onAudio func([]byte, float32) + audioBuf *buffer log *logger.Logger mua sync.RWMutex @@ -118,7 +50,7 @@ type WebrtcMediaPipe struct { vConf config.Video AudioSrcHz int - AudioFrame float32 + AudioFrames []float32 VideoW, VideoH int VideoScale float64 @@ -135,8 +67,9 @@ func NewWebRtcMediaPipe(ac config.Audio, vc config.Video, log *logger.Logger) *W } func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) { - fr := int32(time.Duration(wmp.AudioFrame) * time.Millisecond) - wmp.onAudio = func(bytes []byte) { cb(bytes, fr) } + wmp.onAudio = func(bytes []byte, ms float32) { + cb(bytes, int32(time.Duration(ms)*time.Millisecond)) + } } func (wmp *WebrtcMediaPipe) Destroy() { v := wmp.Video() @@ -144,10 +77,12 @@ func (wmp *WebrtcMediaPipe) Destroy() { v.Stop() } } -func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) } +func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { + wmp.audioBuf.write(audio, wmp.encodeAudio) +} func (wmp *WebrtcMediaPipe) Init() error { - if err := wmp.initAudio(wmp.AudioSrcHz, wmp.AudioFrame); err != nil { + if err := wmp.initAudio(wmp.AudioSrcHz, wmp.AudioFrames); err != nil { return err } if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil { @@ -166,30 +101,34 @@ func (wmp *WebrtcMediaPipe) Init() error { return nil } -func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize float32) error { +func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSizes []float32) error { au, err := DefaultOpus() if err != nil { return fmt.Errorf("opus fail: %w", err) } wmp.log.Debug().Msgf("Opus: %v", au.GetInfo()) wmp.SetAudio(au) - buf := newBuffer(frame(srcHz, frameSize)) + buf, err := newBuffer(frameSizes, srcHz) + if err != nil { + return err + } + wmp.log.Debug().Msgf("Opus frames (ms): %v", frameSizes) dstHz, _ := au.SampleRate() if srcHz != dstHz { - buf.enableStretch(frame(dstHz, frameSize)) + buf.resample(dstHz) wmp.log.Debug().Msgf("Resample %vHz -> %vHz", srcHz, dstHz) } wmp.audioBuf = buf return nil } -func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) { +func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples, ms float32) { data, err := wmp.Audio().Encode(pcm) if err != nil { wmp.log.Error().Err(err).Msgf("opus encode fail") return } - wmp.onAudio(data) + wmp.onAudio(data, ms) } func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video) (err error) { diff --git a/pkg/worker/media/media_test.go b/pkg/worker/media/media_test.go index ab27f7fa8..4b9a431b4 100644 --- a/pkg/worker/media/media_test.go +++ b/pkg/worker/media/media_test.go @@ -3,7 +3,6 @@ package media import ( "image" "math/rand/v2" - "reflect" "testing" "github.com/giongto35/cloud-game/v3/pkg/config" @@ -154,69 +153,6 @@ func gen(l int) []int16 { return nums } -type bufWrite struct { - sample int16 - len int -} - -func TestBufferWrite(t *testing.T) { - tests := []struct { - bufLen int - writes []bufWrite - expect samples - }{ - { - bufLen: 20, - writes: []bufWrite{ - {sample: 1, len: 10}, - {sample: 2, len: 20}, - {sample: 3, len: 30}, - }, - expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}, - }, - { - bufLen: 11, - writes: []bufWrite{ - {sample: 1, len: 3}, - {sample: 2, len: 18}, - {sample: 3, len: 2}, - }, - expect: samples{3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3}, - }, - } - - for _, test := range tests { - var lastResult samples - buf := newBuffer(test.bufLen) - for _, w := range test.writes { - buf.write(samplesOf(w.sample, w.len), func(s samples) { lastResult = s }) - } - if !reflect.DeepEqual(test.expect, lastResult) { - t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, buf.s) - } - } -} - -func BenchmarkBufferWrite(b *testing.B) { - fn := func(_ samples) {} - l := 1920 - buf := newBuffer(l) - samples1 := samplesOf(1, l/2) - samples2 := samplesOf(2, l*2) - for i := 0; i < b.N; i++ { - buf.write(samples1, fn) - buf.write(samples2, fn) - } -} - -func samplesOf(v int16, len int) (s samples) { - s = make(samples, len) - for i := range s { - s[i] = v - } - return -} - func TestFrame(t *testing.T) { type args struct { hz int diff --git a/pkg/worker/room/room_test.go b/pkg/worker/room/room_test.go index a33e4d826..9a4bdd738 100644 --- a/pkg/worker/room/room_test.go +++ b/pkg/worker/room/room_test.go @@ -229,7 +229,7 @@ func room(cfg conf) testRoom { m := media.NewWebRtcMediaPipe(conf.Encoder.Audio, conf.Encoder.Video, l) m.AudioSrcHz = emu.AudioSampleRate() - m.AudioFrame = conf.Encoder.Audio.Frame + m.AudioFrames = conf.Encoder.Audio.Frames m.VideoW, m.VideoH = emu.ViewportSize() m.VideoScale = emu.Scale() if err := m.Init(); err != nil {