@@ -0,0 +1,238 @@
|
||||
package recording
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
|
||||
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/hub"
|
||||
"git.destefano.cloud/fdestefano/RocketLeagueBot-Renderer/internal/stats"
|
||||
)
|
||||
|
||||
const (
|
||||
flushBatchSize = 64
|
||||
flushInterval = 100 * time.Millisecond
|
||||
framesChanCap = 1024
|
||||
idleTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
type rawFrame struct {
|
||||
t time.Time
|
||||
data []byte
|
||||
}
|
||||
|
||||
// Recorder receives raw UDP frames and persists them in batched, optionally
|
||||
// zstd-compressed inserts to SQLite. A single writer goroutine owns the DB
|
||||
// handles so there is no contention on the UDP hot path.
|
||||
type Recorder struct {
|
||||
db *sql.DB
|
||||
hub *hub.Hub
|
||||
frames chan rawFrame
|
||||
compress bool
|
||||
enc *zstd.Encoder
|
||||
|
||||
// session bookkeeping (writer-goroutine-owned)
|
||||
sessionID int64
|
||||
sessionName string
|
||||
startTime time.Time
|
||||
frameCount int
|
||||
lastFrameAt time.Time
|
||||
|
||||
insertFrame *sql.Stmt
|
||||
insertSession *sql.Stmt
|
||||
updateSession *sql.Stmt
|
||||
|
||||
doneCh chan struct{}
|
||||
stoppedCh chan struct{}
|
||||
}
|
||||
|
||||
// NewRecorder constructs a Recorder. Call Start to launch the writer goroutine.
|
||||
func NewRecorder(db *sql.DB, h *hub.Hub, compress bool) (*Recorder, error) {
|
||||
r := &Recorder{
|
||||
db: db,
|
||||
hub: h,
|
||||
frames: make(chan rawFrame, framesChanCap),
|
||||
compress: compress,
|
||||
doneCh: make(chan struct{}),
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
var err error
|
||||
if r.insertFrame, err = db.Prepare(`INSERT INTO frames(recording_id, offset_ms, data, codec) VALUES(?,?,?,?)`); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.insertSession, err = db.Prepare(`INSERT INTO recordings(name) VALUES(?)`); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.updateSession, err = db.Prepare(`UPDATE recordings SET frames=?, duration_ms=? WHERE id=?`); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if compress {
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.enc = enc
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Submit hands a UDP packet to the recorder. Non-blocking: if the queue is
|
||||
// full the frame is dropped and counted in stats.FramesDropped.
|
||||
func (r *Recorder) Submit(data []byte) {
|
||||
cp := make([]byte, len(data))
|
||||
copy(cp, data)
|
||||
select {
|
||||
case r.frames <- rawFrame{t: time.Now(), data: cp}:
|
||||
default:
|
||||
stats.Global.FramesDropped.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the writer goroutine. Stop must be called to flush.
|
||||
func (r *Recorder) Start() { go r.run() }
|
||||
|
||||
// Stop signals the writer to drain the queue and exit, then waits for it.
|
||||
func (r *Recorder) Stop() {
|
||||
close(r.doneCh)
|
||||
<-r.stoppedCh
|
||||
}
|
||||
|
||||
func (r *Recorder) run() {
|
||||
defer close(r.stoppedCh)
|
||||
|
||||
ticker := time.NewTicker(flushInterval)
|
||||
defer ticker.Stop()
|
||||
idleCheck := time.NewTicker(idleTimeout / 2)
|
||||
defer idleCheck.Stop()
|
||||
|
||||
batch := make([]rawFrame, 0, flushBatchSize)
|
||||
flush := func() {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
if err := r.writeBatch(batch); err != nil {
|
||||
slog.Error("flush batch failed", "err", err, "n", len(batch))
|
||||
}
|
||||
batch = batch[:0]
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.doneCh:
|
||||
drain:
|
||||
for {
|
||||
select {
|
||||
case f := <-r.frames:
|
||||
batch = append(batch, f)
|
||||
if len(batch) >= flushBatchSize {
|
||||
flush()
|
||||
}
|
||||
default:
|
||||
break drain
|
||||
}
|
||||
}
|
||||
flush()
|
||||
r.finalizeSession()
|
||||
return
|
||||
|
||||
case f := <-r.frames:
|
||||
batch = append(batch, f)
|
||||
if len(batch) >= flushBatchSize {
|
||||
flush()
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
flush()
|
||||
|
||||
case <-idleCheck.C:
|
||||
if r.sessionID != 0 && time.Since(r.lastFrameAt) > idleTimeout {
|
||||
flush()
|
||||
r.finalizeSession()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writeBatch persists frames within a single transaction. If no session is
|
||||
// active, one is started using the first frame's timestamp.
|
||||
func (r *Recorder) writeBatch(batch []rawFrame) error {
|
||||
tx, err := r.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if r.sessionID == 0 {
|
||||
first := batch[0].t
|
||||
r.sessionName = first.Format("2006-01-02_15-04-05")
|
||||
res, err := tx.Stmt(r.insertSession).Exec(r.sessionName)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
r.sessionID, _ = res.LastInsertId()
|
||||
r.startTime = first
|
||||
r.frameCount = 0
|
||||
slog.Info("recording started", "name", r.sessionName, "id", r.sessionID)
|
||||
r.hub.PublishEvent(map[string]any{
|
||||
"type": "event", "kind": "recording_started",
|
||||
"recording": map[string]any{"id": r.sessionID, "name": r.sessionName},
|
||||
})
|
||||
}
|
||||
|
||||
stmt := tx.Stmt(r.insertFrame)
|
||||
for _, f := range batch {
|
||||
payload := f.data
|
||||
codec := 0
|
||||
stats.Global.BytesBeforeCompr.Add(int64(len(payload)))
|
||||
if r.compress && r.enc != nil {
|
||||
payload = r.enc.EncodeAll(f.data, make([]byte, 0, len(f.data)/2+64))
|
||||
codec = 1
|
||||
}
|
||||
stats.Global.BytesAfterCompr.Add(int64(len(payload)))
|
||||
offset := f.t.Sub(r.startTime).Milliseconds()
|
||||
if _, err := stmt.Exec(r.sessionID, offset, payload, codec); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
r.frameCount++
|
||||
r.lastFrameAt = f.t
|
||||
stats.Global.FramesWritten.Add(1)
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (r *Recorder) finalizeSession() {
|
||||
if r.sessionID == 0 {
|
||||
return
|
||||
}
|
||||
duration := r.lastFrameAt.Sub(r.startTime).Milliseconds()
|
||||
if _, err := r.updateSession.Exec(r.frameCount, duration, r.sessionID); err != nil {
|
||||
slog.Error("finalize session failed", "err", err)
|
||||
}
|
||||
slog.Info("recording stopped", "name", r.sessionName, "frames", r.frameCount, "duration_ms", duration)
|
||||
r.hub.PublishEvent(map[string]any{
|
||||
"type": "event", "kind": "recording_stopped",
|
||||
"recording": map[string]any{
|
||||
"id": r.sessionID, "name": r.sessionName,
|
||||
"frames": r.frameCount, "duration_ms": duration,
|
||||
},
|
||||
})
|
||||
r.sessionID = 0
|
||||
r.sessionName = ""
|
||||
r.frameCount = 0
|
||||
}
|
||||
|
||||
// CurrentSession returns a snapshot of the active recording, or zero values
|
||||
// if none. Reads are not strictly synchronized; slight staleness is acceptable
|
||||
// for the /api/stats endpoint.
|
||||
func (r *Recorder) CurrentSession() (id int64, name string, frames int, elapsedMs int64) {
|
||||
id = r.sessionID
|
||||
name = r.sessionName
|
||||
frames = r.frameCount
|
||||
if id != 0 {
|
||||
elapsedMs = time.Since(r.startTime).Milliseconds()
|
||||
}
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user