package recording import ( "database/sql" "log/slog" "time" "github.com/klauspost/compress/zstd" "git.destefano.cloud/texasmade/RocketLeagueBot-Renderer/internal/hub" "git.destefano.cloud/texasmade/RocketLeagueBot-Renderer/internal/stats" ) const ( flushBatchSize = 64 flushInterval = 100 * time.Millisecond framesChanCap = 1024 idleTimeout = 5 * time.Second ) type rawFrame struct { t time.Time data []byte } // Recorder receives raw UDP frames and persists them in batched, optionally // zstd-compressed inserts to SQLite. A single writer goroutine owns the DB // handles so there is no contention on the UDP hot path. type Recorder struct { db *sql.DB hub *hub.Hub frames chan rawFrame compress bool enc *zstd.Encoder // session bookkeeping (writer-goroutine-owned) sessionID int64 sessionName string startTime time.Time frameCount int lastFrameAt time.Time insertFrame *sql.Stmt insertSession *sql.Stmt updateSession *sql.Stmt doneCh chan struct{} stoppedCh chan struct{} } // NewRecorder constructs a Recorder. Call Start to launch the writer goroutine. func NewRecorder(db *sql.DB, h *hub.Hub, compress bool) (*Recorder, error) { r := &Recorder{ db: db, hub: h, frames: make(chan rawFrame, framesChanCap), compress: compress, doneCh: make(chan struct{}), stoppedCh: make(chan struct{}), } var err error if r.insertFrame, err = db.Prepare(`INSERT INTO frames(recording_id, offset_ms, data, codec) VALUES(?,?,?,?)`); err != nil { return nil, err } if r.insertSession, err = db.Prepare(`INSERT INTO recordings(name) VALUES(?)`); err != nil { return nil, err } if r.updateSession, err = db.Prepare(`UPDATE recordings SET frames=?, duration_ms=? WHERE id=?`); err != nil { return nil, err } if compress { enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) if err != nil { return nil, err } r.enc = enc } return r, nil } // Submit hands a UDP packet to the recorder. Non-blocking: if the queue is // full the frame is dropped and counted in stats.FramesDropped. func (r *Recorder) Submit(data []byte) { cp := make([]byte, len(data)) copy(cp, data) select { case r.frames <- rawFrame{t: time.Now(), data: cp}: default: stats.Global.FramesDropped.Add(1) } } // Start launches the writer goroutine. Stop must be called to flush. func (r *Recorder) Start() { go r.run() } // Stop signals the writer to drain the queue and exit, then waits for it. func (r *Recorder) Stop() { close(r.doneCh) <-r.stoppedCh } func (r *Recorder) run() { defer close(r.stoppedCh) ticker := time.NewTicker(flushInterval) defer ticker.Stop() idleCheck := time.NewTicker(idleTimeout / 2) defer idleCheck.Stop() batch := make([]rawFrame, 0, flushBatchSize) flush := func() { if len(batch) == 0 { return } if err := r.writeBatch(batch); err != nil { slog.Error("flush batch failed", "err", err, "n", len(batch)) } batch = batch[:0] } for { select { case <-r.doneCh: drain: for { select { case f := <-r.frames: batch = append(batch, f) if len(batch) >= flushBatchSize { flush() } default: break drain } } flush() r.finalizeSession() return case f := <-r.frames: batch = append(batch, f) if len(batch) >= flushBatchSize { flush() } case <-ticker.C: flush() case <-idleCheck.C: if r.sessionID != 0 && time.Since(r.lastFrameAt) > idleTimeout { flush() r.finalizeSession() } } } } // writeBatch persists frames within a single transaction. If no session is // active, one is started using the first frame's timestamp. func (r *Recorder) writeBatch(batch []rawFrame) error { tx, err := r.db.Begin() if err != nil { return err } if r.sessionID == 0 { first := batch[0].t r.sessionName = first.Format("2006-01-02_15-04-05") res, err := tx.Stmt(r.insertSession).Exec(r.sessionName) if err != nil { _ = tx.Rollback() return err } r.sessionID, _ = res.LastInsertId() r.startTime = first r.frameCount = 0 slog.Info("recording started", "name", r.sessionName, "id", r.sessionID) r.hub.PublishEvent(map[string]any{ "type": "event", "kind": "recording_started", "recording": map[string]any{"id": r.sessionID, "name": r.sessionName}, }) } stmt := tx.Stmt(r.insertFrame) for _, f := range batch { payload := f.data codec := 0 stats.Global.BytesBeforeCompr.Add(int64(len(payload))) if r.compress && r.enc != nil { payload = r.enc.EncodeAll(f.data, make([]byte, 0, len(f.data)/2+64)) codec = 1 } stats.Global.BytesAfterCompr.Add(int64(len(payload))) offset := f.t.Sub(r.startTime).Milliseconds() if _, err := stmt.Exec(r.sessionID, offset, payload, codec); err != nil { _ = tx.Rollback() return err } r.frameCount++ r.lastFrameAt = f.t stats.Global.FramesWritten.Add(1) } return tx.Commit() } func (r *Recorder) finalizeSession() { if r.sessionID == 0 { return } duration := r.lastFrameAt.Sub(r.startTime).Milliseconds() if _, err := r.updateSession.Exec(r.frameCount, duration, r.sessionID); err != nil { slog.Error("finalize session failed", "err", err) } slog.Info("recording stopped", "name", r.sessionName, "frames", r.frameCount, "duration_ms", duration) r.hub.PublishEvent(map[string]any{ "type": "event", "kind": "recording_stopped", "recording": map[string]any{ "id": r.sessionID, "name": r.sessionName, "frames": r.frameCount, "duration_ms": duration, }, }) r.sessionID = 0 r.sessionName = "" r.frameCount = 0 } // CurrentSession returns a snapshot of the active recording, or zero values // if none. Reads are not strictly synchronized; slight staleness is acceptable // for the /api/stats endpoint. func (r *Recorder) CurrentSession() (id int64, name string, frames int, elapsedMs int64) { id = r.sessionID name = r.sessionName frames = r.frameCount if id != 0 { elapsedMs = time.Since(r.startTime).Milliseconds() } return }