239 lines
5.9 KiB
Go
239 lines
5.9 KiB
Go
package recording
|
|
|
|
import (
|
|
"database/sql"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/zstd"
|
|
|
|
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub"
|
|
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats"
|
|
)
|
|
|
|
// Tuning parameters for the recorder's queue and writer loop.
const (
	// framesChanCap is the capacity of the buffered channel that carries
	// frames from Submit to the writer goroutine.
	framesChanCap = 1024

	// flushBatchSize is the number of pending frames at which the writer
	// flushes immediately instead of waiting for the next flush tick.
	flushBatchSize = 64

	// flushInterval is the period of the ticker that flushes whatever is
	// pending, even if the batch is not full.
	flushInterval = 100 * time.Millisecond

	// idleTimeout is how long the active session may go without a written
	// frame before the writer finalizes it. The idle check itself runs
	// every idleTimeout/2.
	idleTimeout = 5 * time.Second
)
|
|
|
|
// rawFrame is one queued packet together with the time it was submitted.
type rawFrame struct {
	// t is the timestamp taken when the frame was handed to Submit.
	t time.Time
	// data is the recorder's private copy of the packet bytes.
	data []byte
}
|
|
|
|
// Recorder receives raw UDP frames and persists them in batched, optionally
|
|
// zstd-compressed inserts to SQLite. A single writer goroutine owns the DB
|
|
// handles so there is no contention on the UDP hot path.
|
|
type Recorder struct {
|
|
db *sql.DB
|
|
hub *hub.Hub
|
|
frames chan rawFrame
|
|
compress bool
|
|
enc *zstd.Encoder
|
|
|
|
// session bookkeeping (writer-goroutine-owned)
|
|
sessionID int64
|
|
sessionName string
|
|
startTime time.Time
|
|
frameCount int
|
|
lastFrameAt time.Time
|
|
|
|
insertFrame *sql.Stmt
|
|
insertSession *sql.Stmt
|
|
updateSession *sql.Stmt
|
|
|
|
doneCh chan struct{}
|
|
stoppedCh chan struct{}
|
|
}
|
|
|
|
// NewRecorder constructs a Recorder. Call Start to launch the writer goroutine.
|
|
func NewRecorder(db *sql.DB, h *hub.Hub, compress bool) (*Recorder, error) {
|
|
r := &Recorder{
|
|
db: db,
|
|
hub: h,
|
|
frames: make(chan rawFrame, framesChanCap),
|
|
compress: compress,
|
|
doneCh: make(chan struct{}),
|
|
stoppedCh: make(chan struct{}),
|
|
}
|
|
var err error
|
|
if r.insertFrame, err = db.Prepare(`INSERT INTO frames(recording_id, offset_ms, data, codec) VALUES(?,?,?,?)`); err != nil {
|
|
return nil, err
|
|
}
|
|
if r.insertSession, err = db.Prepare(`INSERT INTO recordings(name) VALUES(?)`); err != nil {
|
|
return nil, err
|
|
}
|
|
if r.updateSession, err = db.Prepare(`UPDATE recordings SET frames=?, duration_ms=? WHERE id=?`); err != nil {
|
|
return nil, err
|
|
}
|
|
if compress {
|
|
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.enc = enc
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Submit hands a UDP packet to the recorder. Non-blocking: if the queue is
|
|
// full the frame is dropped and counted in stats.FramesDropped.
|
|
func (r *Recorder) Submit(data []byte) {
|
|
cp := make([]byte, len(data))
|
|
copy(cp, data)
|
|
select {
|
|
case r.frames <- rawFrame{t: time.Now(), data: cp}:
|
|
default:
|
|
stats.Global.FramesDropped.Add(1)
|
|
}
|
|
}
|
|
|
|
// Start launches the writer goroutine. Stop must be called to flush.
|
|
func (r *Recorder) Start() { go r.run() }
|
|
|
|
// Stop signals the writer to drain the queue and exit, then waits for it.
|
|
func (r *Recorder) Stop() {
|
|
close(r.doneCh)
|
|
<-r.stoppedCh
|
|
}
|
|
|
|
func (r *Recorder) run() {
|
|
defer close(r.stoppedCh)
|
|
|
|
ticker := time.NewTicker(flushInterval)
|
|
defer ticker.Stop()
|
|
idleCheck := time.NewTicker(idleTimeout / 2)
|
|
defer idleCheck.Stop()
|
|
|
|
batch := make([]rawFrame, 0, flushBatchSize)
|
|
flush := func() {
|
|
if len(batch) == 0 {
|
|
return
|
|
}
|
|
if err := r.writeBatch(batch); err != nil {
|
|
slog.Error("flush batch failed", "err", err, "n", len(batch))
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-r.doneCh:
|
|
drain:
|
|
for {
|
|
select {
|
|
case f := <-r.frames:
|
|
batch = append(batch, f)
|
|
if len(batch) >= flushBatchSize {
|
|
flush()
|
|
}
|
|
default:
|
|
break drain
|
|
}
|
|
}
|
|
flush()
|
|
r.finalizeSession()
|
|
return
|
|
|
|
case f := <-r.frames:
|
|
batch = append(batch, f)
|
|
if len(batch) >= flushBatchSize {
|
|
flush()
|
|
}
|
|
|
|
case <-ticker.C:
|
|
flush()
|
|
|
|
case <-idleCheck.C:
|
|
if r.sessionID != 0 && time.Since(r.lastFrameAt) > idleTimeout {
|
|
flush()
|
|
r.finalizeSession()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// writeBatch persists frames within a single transaction. If no session is
|
|
// active, one is started using the first frame's timestamp.
|
|
func (r *Recorder) writeBatch(batch []rawFrame) error {
|
|
tx, err := r.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if r.sessionID == 0 {
|
|
first := batch[0].t
|
|
r.sessionName = first.Format("2006-01-02_15-04-05")
|
|
res, err := tx.Stmt(r.insertSession).Exec(r.sessionName)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
r.sessionID, _ = res.LastInsertId()
|
|
r.startTime = first
|
|
r.frameCount = 0
|
|
slog.Info("recording started", "name", r.sessionName, "id", r.sessionID)
|
|
r.hub.PublishEvent(map[string]any{
|
|
"type": "event", "kind": "recording_started",
|
|
"recording": map[string]any{"id": r.sessionID, "name": r.sessionName},
|
|
})
|
|
}
|
|
|
|
stmt := tx.Stmt(r.insertFrame)
|
|
for _, f := range batch {
|
|
payload := f.data
|
|
codec := 0
|
|
stats.Global.BytesBeforeCompr.Add(int64(len(payload)))
|
|
if r.compress && r.enc != nil {
|
|
payload = r.enc.EncodeAll(f.data, make([]byte, 0, len(f.data)/2+64))
|
|
codec = 1
|
|
}
|
|
stats.Global.BytesAfterCompr.Add(int64(len(payload)))
|
|
offset := f.t.Sub(r.startTime).Milliseconds()
|
|
if _, err := stmt.Exec(r.sessionID, offset, payload, codec); err != nil {
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
r.frameCount++
|
|
r.lastFrameAt = f.t
|
|
stats.Global.FramesWritten.Add(1)
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (r *Recorder) finalizeSession() {
|
|
if r.sessionID == 0 {
|
|
return
|
|
}
|
|
duration := r.lastFrameAt.Sub(r.startTime).Milliseconds()
|
|
if _, err := r.updateSession.Exec(r.frameCount, duration, r.sessionID); err != nil {
|
|
slog.Error("finalize session failed", "err", err)
|
|
}
|
|
slog.Info("recording stopped", "name", r.sessionName, "frames", r.frameCount, "duration_ms", duration)
|
|
r.hub.PublishEvent(map[string]any{
|
|
"type": "event", "kind": "recording_stopped",
|
|
"recording": map[string]any{
|
|
"id": r.sessionID, "name": r.sessionName,
|
|
"frames": r.frameCount, "duration_ms": duration,
|
|
},
|
|
})
|
|
r.sessionID = 0
|
|
r.sessionName = ""
|
|
r.frameCount = 0
|
|
}
|
|
|
|
// CurrentSession returns a snapshot of the active recording, or zero values
|
|
// if none. Reads are not strictly synchronized; slight staleness is acceptable
|
|
// for the /api/stats endpoint.
|
|
func (r *Recorder) CurrentSession() (id int64, name string, frames int, elapsedMs int64) {
|
|
id = r.sessionID
|
|
name = r.sessionName
|
|
frames = r.frameCount
|
|
if id != 0 {
|
|
elapsedMs = time.Since(r.startTime).Milliseconds()
|
|
}
|
|
return
|
|
}
|