Files
RocketLeagueBot-Renderer/internal/recording/recorder.go
T
fdestefano d5e65fbb03
release / goreleaser (push) Failing after 51s
Release v1.0.0 #major
2026-06-02 23:12:36 -05:00

239 lines
5.9 KiB
Go

package recording
import (
"database/sql"
"log/slog"
"time"
"github.com/klauspost/compress/zstd"
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub"
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats"
)
const (
flushBatchSize = 64
flushInterval = 100 * time.Millisecond
framesChanCap = 1024
idleTimeout = 5 * time.Second
)
type rawFrame struct {
t time.Time
data []byte
}
// Recorder receives raw UDP frames and persists them in batched, optionally
// zstd-compressed inserts to SQLite. A single writer goroutine owns the DB
// handles so there is no contention on the UDP hot path.
type Recorder struct {
db *sql.DB
hub *hub.Hub
frames chan rawFrame
compress bool
enc *zstd.Encoder
// session bookkeeping (writer-goroutine-owned)
sessionID int64
sessionName string
startTime time.Time
frameCount int
lastFrameAt time.Time
insertFrame *sql.Stmt
insertSession *sql.Stmt
updateSession *sql.Stmt
doneCh chan struct{}
stoppedCh chan struct{}
}
// NewRecorder constructs a Recorder. Call Start to launch the writer goroutine.
func NewRecorder(db *sql.DB, h *hub.Hub, compress bool) (*Recorder, error) {
r := &Recorder{
db: db,
hub: h,
frames: make(chan rawFrame, framesChanCap),
compress: compress,
doneCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
var err error
if r.insertFrame, err = db.Prepare(`INSERT INTO frames(recording_id, offset_ms, data, codec) VALUES(?,?,?,?)`); err != nil {
return nil, err
}
if r.insertSession, err = db.Prepare(`INSERT INTO recordings(name) VALUES(?)`); err != nil {
return nil, err
}
if r.updateSession, err = db.Prepare(`UPDATE recordings SET frames=?, duration_ms=? WHERE id=?`); err != nil {
return nil, err
}
if compress {
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
if err != nil {
return nil, err
}
r.enc = enc
}
return r, nil
}
// Submit hands a UDP packet to the recorder. Non-blocking: if the queue is
// full the frame is dropped and counted in stats.FramesDropped.
func (r *Recorder) Submit(data []byte) {
cp := make([]byte, len(data))
copy(cp, data)
select {
case r.frames <- rawFrame{t: time.Now(), data: cp}:
default:
stats.Global.FramesDropped.Add(1)
}
}
// Start launches the writer goroutine. Stop must be called to flush.
func (r *Recorder) Start() { go r.run() }
// Stop signals the writer to drain the queue and exit, then waits for it.
func (r *Recorder) Stop() {
close(r.doneCh)
<-r.stoppedCh
}
func (r *Recorder) run() {
defer close(r.stoppedCh)
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
idleCheck := time.NewTicker(idleTimeout / 2)
defer idleCheck.Stop()
batch := make([]rawFrame, 0, flushBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
if err := r.writeBatch(batch); err != nil {
slog.Error("flush batch failed", "err", err, "n", len(batch))
}
batch = batch[:0]
}
for {
select {
case <-r.doneCh:
drain:
for {
select {
case f := <-r.frames:
batch = append(batch, f)
if len(batch) >= flushBatchSize {
flush()
}
default:
break drain
}
}
flush()
r.finalizeSession()
return
case f := <-r.frames:
batch = append(batch, f)
if len(batch) >= flushBatchSize {
flush()
}
case <-ticker.C:
flush()
case <-idleCheck.C:
if r.sessionID != 0 && time.Since(r.lastFrameAt) > idleTimeout {
flush()
r.finalizeSession()
}
}
}
}
// writeBatch persists frames within a single transaction. If no session is
// active, one is started using the first frame's timestamp.
func (r *Recorder) writeBatch(batch []rawFrame) error {
tx, err := r.db.Begin()
if err != nil {
return err
}
if r.sessionID == 0 {
first := batch[0].t
r.sessionName = first.Format("2006-01-02_15-04-05")
res, err := tx.Stmt(r.insertSession).Exec(r.sessionName)
if err != nil {
_ = tx.Rollback()
return err
}
r.sessionID, _ = res.LastInsertId()
r.startTime = first
r.frameCount = 0
slog.Info("recording started", "name", r.sessionName, "id", r.sessionID)
r.hub.PublishEvent(map[string]any{
"type": "event", "kind": "recording_started",
"recording": map[string]any{"id": r.sessionID, "name": r.sessionName},
})
}
stmt := tx.Stmt(r.insertFrame)
for _, f := range batch {
payload := f.data
codec := 0
stats.Global.BytesBeforeCompr.Add(int64(len(payload)))
if r.compress && r.enc != nil {
payload = r.enc.EncodeAll(f.data, make([]byte, 0, len(f.data)/2+64))
codec = 1
}
stats.Global.BytesAfterCompr.Add(int64(len(payload)))
offset := f.t.Sub(r.startTime).Milliseconds()
if _, err := stmt.Exec(r.sessionID, offset, payload, codec); err != nil {
_ = tx.Rollback()
return err
}
r.frameCount++
r.lastFrameAt = f.t
stats.Global.FramesWritten.Add(1)
}
return tx.Commit()
}
func (r *Recorder) finalizeSession() {
if r.sessionID == 0 {
return
}
duration := r.lastFrameAt.Sub(r.startTime).Milliseconds()
if _, err := r.updateSession.Exec(r.frameCount, duration, r.sessionID); err != nil {
slog.Error("finalize session failed", "err", err)
}
slog.Info("recording stopped", "name", r.sessionName, "frames", r.frameCount, "duration_ms", duration)
r.hub.PublishEvent(map[string]any{
"type": "event", "kind": "recording_stopped",
"recording": map[string]any{
"id": r.sessionID, "name": r.sessionName,
"frames": r.frameCount, "duration_ms": duration,
},
})
r.sessionID = 0
r.sessionName = ""
r.frameCount = 0
}
// CurrentSession returns a snapshot of the active recording, or zero values
// if none. Reads are not strictly synchronized; slight staleness is acceptable
// for the /api/stats endpoint.
func (r *Recorder) CurrentSession() (id int64, name string, frames int, elapsedMs int64) {
id = r.sessionID
name = r.sessionName
frames = r.frameCount
if id != 0 {
elapsedMs = time.Since(r.startTime).Milliseconds()
}
return
}