Files
bifrost/plugins/logging/writer.go
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

425 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package logging
import (
"sync"
"time"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/framework/logstore"
)
// Tuning knobs for the asynchronous log writer: how entries are batched, how
// much can be queued, and how long request-side data may linger in memory.
const (
	// maxBatchSize caps how many queued entries are collected before a flush.
	maxBatchSize = 1000
	// maxBatchBytes is a rough byte-size ceiling for a single batch (100 MiB).
	// When the estimated size of the queued entries reaches it, the batch is
	// flushed right away even if maxBatchSize has not been reached.
	maxBatchBytes = 100 << 20
	// batchInterval bounds how long a partially filled batch may wait before
	// it is flushed.
	batchInterval = 2 * time.Second

	// writeQueueCapacity is the buffer size of the write queue channel.
	writeQueueCapacity = 10_000
	// maxDeferredUsageConcurrency limits concurrent deferred usage DB updates.
	maxDeferredUsageConcurrency = 5
	// pendingLogTTL is how long a pending log entry may stay in memory before
	// it is treated as stale and cleaned up.
	pendingLogTTL = 5 * time.Minute
)
// PendingLogData holds PreLLMHook input data until PostLLMHook fires.
// Values are stored as *PendingLogData in the plugin's pendingLogsEntries
// sync.Map, keyed by request ID, and are evicted by cleanupStalePendingLogs
// once CreatedAt is older than pendingLogTTL.
type PendingLogData struct {
RequestID string
// ParentRequestID is optional; an empty string means the log has no parent.
ParentRequestID string
// Timestamp is copied into the log entry's Timestamp and CreatedAt fields.
Timestamp time.Time
FallbackIndex int
Status string
RoutingEnginesUsed []string
// InitialData holds the captured request inputs (object, provider, model,
// params, inputs). The log-entry builders dereference it unconditionally, so
// it must be non-nil when they are called.
InitialData *InitialLogData
CreatedAt time.Time // For cleanup of stale entries
}
// pendingInjectEntries wraps a slice of log entries so it can be used with sync.Map.
// The mutex protects concurrent appends to the entries slice within the same traceID.
// Values are stored as *pendingInjectEntries in the plugin's pendingLogsToInject
// map and are evicted by cleanupStalePendingLogs once createdAt is older than
// pendingLogTTL.
// NOTE(review): cleanupStalePendingLogs reads createdAt without holding mu; that
// is only race-free if createdAt is set once before the value is published —
// confirm at the construction site.
type pendingInjectEntries struct {
mu sync.Mutex
entries []*logstore.Log
createdAt time.Time
}
// writeQueueEntry is an entry pushed to the batch write queue.
// It is created by enqueueLogEntry and consumed by batchWriter via processBatch.
// log may be nil: processBatch skips nil logs when inserting, but still invokes
// callback (passing the nil log) if one is set. The callback also runs when the
// insert for its log failed.
type writeQueueEntry struct {
log *logstore.Log // Complete log entry ready for INSERT
callback func(entry *logstore.Log) // Invoked asynchronously after the insert attempt with log (no DB re-read needed)
}
// batchWriter is the single writer goroutine that drains p.writeQueue and
// hands accumulated entries to safeProcessBatch.
//
// A batch is flushed when any of these happens:
//   - it reaches maxBatchSize entries,
//   - its estimated size reaches maxBatchBytes,
//   - batchInterval elapses after the first entry of the batch arrived.
//
// When the queue channel is closed, whatever is left is processed and the
// goroutine returns, calling p.wg.Done on the way out.
func (p *LoggerPlugin) batchWriter() {
	defer p.wg.Done()

	pending := make([]*writeQueueEntry, 0, maxBatchSize)
	pendingBytes := 0

	// The flush timer starts stopped and is only armed when the first entry of
	// a new batch arrives, so an idle writer does not wake up periodically.
	flushTimer := time.NewTimer(batchInterval)
	flushTimer.Stop()
	armed := false

	// disarm stops an armed timer and drains a tick that raced with Stop, so a
	// later Reset cannot deliver a stale fire.
	disarm := func() {
		if !armed {
			return
		}
		if !flushTimer.Stop() {
			select {
			case <-flushTimer.C:
			default:
			}
		}
		armed = false
	}

	// flushPending processes the current batch and resets it, keeping the
	// backing array for reuse (clear drops the stale pointers for the GC).
	flushPending := func() {
		disarm()
		p.safeProcessBatch(pending)
		clear(pending)
		pending = pending[:0]
		pendingBytes = 0
	}

	for {
		select {
		case <-flushTimer.C:
			armed = false
			if len(pending) > 0 {
				flushPending()
			}
		case item, ok := <-p.writeQueue:
			if !ok {
				// Queue closed and fully drained: persist the remainder and stop.
				p.safeProcessBatch(pending)
				return
			}
			pending = append(pending, item)
			pendingBytes += estimateLogEntrySize(item.log)
			switch {
			case len(pending) >= maxBatchSize, pendingBytes >= maxBatchBytes:
				flushPending()
			case !armed:
				flushTimer.Reset(batchInterval)
				armed = true
			}
		}
	}
}
// safeProcessBatch runs processBatch behind a deferred recover so that a panic
// while handling one batch is logged and counted instead of terminating the
// batchWriter goroutine.
//
// On a recovered panic the whole batch is counted in droppedRequests, even if
// some of its entries were already inserted before the panic.
func (p *LoggerPlugin) safeProcessBatch(batch []*writeQueueEntry) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		lost := len(batch)
		p.logger.Error("panic in batch writer processBatch (recovered, %d entries dropped): %v", lost, r)
		p.droppedRequests.Add(int64(lost))
	}()
	p.processBatch(batch)
}
// processBatch writes one batch of queued log entries to the store and then
// schedules their callbacks.
//
// Persistence: every non-nil log in the batch goes into a single
// BatchCreateIfNotExists call. If that call fails, each log is retried on its
// own so one bad row cannot lose the whole batch. Logs that still fail are
// logged and counted in droppedRequests.
//
// Callbacks: entries that carry a callback are collected and invoked in order
// on ONE goroutine per batch. Running them synchronously stalled the writer
// (1+ second during WebSocket broadcast), while one goroutine per entry led to
// goroutine explosion (13K+). Each callback runs under its own recover, so a
// panic in one callback is logged and the remaining callbacks in the batch
// still run.
//
// Callbacks are invoked for every entry that has one, including entries whose
// insert failed and entries whose log is nil.
func (p *LoggerPlugin) processBatch(batch []*writeQueueEntry) {
	if len(batch) == 0 {
		return
	}

	logs := make([]*logstore.Log, 0, len(batch))
	for _, entry := range batch {
		if entry.log != nil {
			logs = append(logs, entry.log)
		}
	}
	if len(logs) > 0 {
		if err := p.store.BatchCreateIfNotExists(p.ctx, logs); err != nil {
			p.logger.Warn("batch insert failed for %d entries, falling back to individual inserts: %v", len(logs), err)
			// Individual fallback — isolate the bad entry instead of losing the whole batch.
			for _, row := range logs {
				if err := p.store.BatchCreateIfNotExists(p.ctx, []*logstore.Log{row}); err != nil {
					p.logger.Warn("individual insert failed for log %s: %v", row.ID, err)
					p.droppedRequests.Add(1)
				}
			}
		}
	}

	type cbPair struct {
		cb  func(*logstore.Log)
		log *logstore.Log
	}
	var callbacks []cbPair
	for _, entry := range batch {
		if entry.callback != nil {
			callbacks = append(callbacks, cbPair{cb: entry.callback, log: entry.log})
		}
	}
	if len(callbacks) == 0 {
		return
	}

	// invoke runs one callback and contains any panic it raises. A recover in a
	// deferred call ends the *enclosing* function, so it must wrap each callback
	// individually for the loop below to continue after a panic.
	invoke := func(pair cbPair) {
		defer func() {
			if r := recover(); r != nil {
				p.logger.Warn("log callback panicked: %v", r)
			}
		}()
		pair.cb(pair.log)
	}
	go func(pending []cbPair) {
		for _, pair := range pending {
			invoke(pair)
		}
	}(callbacks)
}
// cleanupStalePendingLogs evicts entries older than pendingLogTTL from
// pendingLogsEntries and pendingLogsToInject. This handles requests whose
// PostLLMHook never fires (e.g. the request was cancelled before reaching the
// provider), whose pending data would otherwise never be consumed.
//
// Eviction uses CompareAndDelete rather than Delete. Only the exact stale
// value observed during Range is removed. If another goroutine stored a new
// value under the same key between the load and the delete, that newer entry
// is kept.
//
// NOTE(review): pendingInjectEntries.createdAt is read without holding its
// mutex; this assumes createdAt is set once before the value is stored.
func (p *LoggerPlugin) cleanupStalePendingLogs() {
	cutoff := time.Now().Add(-pendingLogTTL)
	p.pendingLogsEntries.Range(func(key, value any) bool {
		if pending, ok := value.(*PendingLogData); ok && pending.CreatedAt.Before(cutoff) {
			p.pendingLogsEntries.CompareAndDelete(key, value)
		}
		return true
	})
	p.pendingLogsToInject.Range(func(key, value any) bool {
		if pending, ok := value.(*pendingInjectEntries); ok && pending.createdAt.Before(cutoff) {
			p.pendingLogsToInject.CompareAndDelete(key, value)
		}
		return true
	})
}
// enqueueLogEntry pushes a complete log entry (and optional callback) to the
// write queue without blocking.
//
// If the plugin is closed the entry is ignored. If the queue is full, the
// entry is dropped and counted in droppedRequests, so Postgres slowness cannot
// cascade into request handling goroutines. entry may be nil: processBatch
// skips nil logs but still runs the callback. The drop path therefore must not
// dereference it.
func (p *LoggerPlugin) enqueueLogEntry(entry *logstore.Log, callback func(entry *logstore.Log)) {
	if p.closed.Load() {
		return
	}
	defer func() {
		if r := recover(); r != nil {
			// Channel was closed between the check and send; entry is dropped
			p.droppedRequests.Add(1)
		}
	}()
	select {
	case p.writeQueue <- &writeQueueEntry{log: entry, callback: callback}:
		// enqueued successfully
	default:
		p.droppedRequests.Add(1)
		// Guard the nil case: a panic here would be caught by the recover
		// above, counting the drop twice and losing this warning.
		id := "<nil>"
		if entry != nil {
			id = entry.ID
		}
		p.logger.Warn("log write queue full, dropping log entry %s", id)
	}
}
// estimateLogEntrySize gives a cheap, approximate byte size for a log entry,
// used only to cap how much a single batch holds in memory. It measures the
// lengths of the large serialized text/blob fields and adds a flat 512-byte
// baseline for everything else. A nil entry counts as 0.
//
// NOTE: At enqueue time the string fields may still be empty (data lives in the
// Parsed struct fields until GORM's BeforeCreate hook serializes them), so this
// can undercount significantly. That is acceptable — the byte limit is a
// coarse safety valve, not a precise memory cap. Overshooting by 2× is fine;
// maxBatchSize is the primary batching control.
func estimateLogEntrySize(log *logstore.Log) int {
	if log == nil {
		return 0
	}

	// Fixed-width columns (IDs, timestamps, numbers, flags) and struct overhead
	// are covered by this baseline instead of being measured.
	size := 512

	// Fields are grouped by name only for readability; order is irrelevant.
	size += len(log.InputHistory) + len(log.ResponsesInputHistory)
	size += len(log.Params) + len(log.Tools)
	size += len(log.SpeechInput) + len(log.TranscriptionInput)
	size += len(log.ImageGenerationInput) + len(log.VideoGenerationInput)
	size += len(log.RawRequest) + len(log.PassthroughRequestBody)

	size += len(log.OutputMessage) + len(log.ResponsesOutput) + len(log.ToolCalls)
	size += len(log.EmbeddingOutput) + len(log.RerankOutput) + len(log.OCROutput)
	size += len(log.SpeechOutput) + len(log.TranscriptionOutput)
	size += len(log.ImageGenerationOutput) + len(log.VideoGenerationOutput)
	size += len(log.VideoRetrieveOutput) + len(log.VideoDownloadOutput)
	size += len(log.VideoListOutput) + len(log.VideoDeleteOutput)
	size += len(log.ListModelsOutput)
	size += len(log.RawResponse) + len(log.PassthroughResponseBody)

	size += len(log.TokenUsage) + len(log.ErrorDetails) + len(log.ContentSummary)
	size += len(log.CacheDebug) + len(log.RoutingEngineLogs)

	return size
}
// buildInitialLogEntry constructs a "processing" logstore.Log from
// PendingLogData (input) without writing to the database. Used for the UI
// callback in PreLLMHook.
//
// Only object/provider/model, fallback index, chat and responses input
// history, params, tools and the passthrough request body are populated.
// Speech, transcription, OCR, image and video inputs are not set here.
//
// ParentRequestID and RoutingEnginesUsed are copied rather than aliased. The
// entry is handed to other goroutines, and pointing into the shared pending
// record (or sharing its slice backing array) would tie the entry to later
// changes of that record. The *Parsed input fields still share data with
// pending.InitialData.
//
// pending and pending.InitialData must be non-nil.
func buildInitialLogEntry(pending *PendingLogData) *logstore.Log {
	entry := &logstore.Log{
		ID:                          pending.RequestID,
		Timestamp:                   pending.Timestamp,
		Object:                      pending.InitialData.Object,
		Provider:                    pending.InitialData.Provider,
		Model:                       pending.InitialData.Model,
		FallbackIndex:               pending.FallbackIndex,
		Status:                      "processing",
		Stream:                      false,
		CreatedAt:                   pending.Timestamp,
		InputHistoryParsed:          pending.InitialData.InputHistory,
		ResponsesInputHistoryParsed: pending.InitialData.ResponsesInputHistory,
		ParamsParsed:                pending.InitialData.Params,
		ToolsParsed:                 pending.InitialData.Tools,
		PassthroughRequestBody:      pending.InitialData.PassthroughRequestBody,
	}
	if pending.ParentRequestID != "" {
		parentID := pending.ParentRequestID
		entry.ParentRequestID = &parentID
	}
	if len(pending.RoutingEnginesUsed) > 0 {
		entry.RoutingEnginesUsed = append([]string(nil), pending.RoutingEnginesUsed...)
	}
	return entry
}
// buildCompleteLogEntryFromPending constructs a logstore.Log whose input-side
// fields are populated from PendingLogData, with Status defaulting to
// "success".
//
// Output-specific fields are NOT set here, and the function takes no callback
// for them. Callers set those on the returned entry afterwards, presumably via
// helpers such as applyOutputFieldsToEntry/applyModelAlias — confirm at call
// sites.
//
// The *Parsed fields are serialized later by GORM hooks. ParentRequestID and
// RoutingEnginesUsed are copied rather than aliased, so the entry does not
// point into the shared pending record. The *Parsed input fields still share
// data with pending.InitialData.
//
// pending and pending.InitialData must be non-nil.
func buildCompleteLogEntryFromPending(pending *PendingLogData) *logstore.Log {
	entry := &logstore.Log{
		ID:            pending.RequestID,
		Timestamp:     pending.Timestamp,
		Object:        pending.InitialData.Object,
		Provider:      pending.InitialData.Provider,
		Model:         pending.InitialData.Model,
		FallbackIndex: pending.FallbackIndex,
		Status:        "success",
		CreatedAt:     pending.Timestamp,
		// Set parsed fields for serialization via GORM hooks
		InputHistoryParsed:          pending.InitialData.InputHistory,
		ResponsesInputHistoryParsed: pending.InitialData.ResponsesInputHistory,
		ParamsParsed:                pending.InitialData.Params,
		ToolsParsed:                 pending.InitialData.Tools,
		SpeechInputParsed:           pending.InitialData.SpeechInput,
		TranscriptionInputParsed:    pending.InitialData.TranscriptionInput,
		OCRInputParsed:              pending.InitialData.OCRInput,
		ImageGenerationInputParsed:  pending.InitialData.ImageGenerationInput,
		ImageEditInputParsed:        pending.InitialData.ImageEditInput,
		ImageVariationInputParsed:   pending.InitialData.ImageVariationInput,
		VideoGenerationInputParsed:  pending.InitialData.VideoGenerationInput,
		PassthroughRequestBody:      pending.InitialData.PassthroughRequestBody,
	}
	if pending.ParentRequestID != "" {
		parentID := pending.ParentRequestID
		entry.ParentRequestID = &parentID
	}
	if len(pending.RoutingEnginesUsed) > 0 {
		entry.RoutingEnginesUsed = append([]string(nil), pending.RoutingEnginesUsed...)
	}
	return entry
}
// applyModelAlias records which model actually served a request and, when an
// alias mapping was applied, which name the caller originally asked for.
//
//   - Both names non-empty and different: Model = resolvedModel and
//     Alias = requestedModel.
//   - Otherwise Alias is cleared and Model is set to whichever name is
//     non-empty (resolvedModel preferred). If both are empty, Model is left
//     unchanged.
//
// An empty requestedModel never produces an alias. Previously a non-empty
// resolvedModel with an empty requestedModel stored a pointer to "" as the
// alias.
func applyModelAlias(entry *logstore.Log, requestedModel, resolvedModel string) {
	switch {
	case resolvedModel != "" && requestedModel != "" && resolvedModel != requestedModel:
		entry.Model = resolvedModel
		entry.Alias = &requestedModel
		return
	case resolvedModel != "":
		entry.Model = resolvedModel
	case requestedModel != "":
		entry.Model = requestedModel
	}
	entry.Alias = nil
}
// applyOutputFieldsToEntry copies request attribution and outcome metadata
// onto a log entry.
//
// SelectedKeyID and SelectedKeyName are always overwritten. Each optional
// pointer field (virtual key, routing rule, prompt, team, customer, user,
// business unit) is set to a copy of its argument only when that argument is
// non-empty. NumberOfRetries and Latency are set only when non-zero, and
// AttemptTrailParsed only when attemptTrail is non-empty. Fields whose argument
// is empty or zero keep whatever value entry already had. latency is stored as
// a float64 conversion of the given int64, in whatever unit the caller uses.
func applyOutputFieldsToEntry(
	entry *logstore.Log,
	selectedKeyID, selectedKeyName string,
	virtualKeyID, virtualKeyName string,
	routingRuleID, routingRuleName string,
	selectedPromptID, selectedPromptName, selectedPromptVersion string,
	teamID, teamName string,
	customerID, customerName string,
	userID, userName string,
	businessUnitID, businessUnitName string,
	numberOfRetries int,
	latency int64,
	attemptTrail []schemas.KeyAttemptRecord,
) {
	entry.SelectedKeyID = selectedKeyID
	entry.SelectedKeyName = selectedKeyName

	// setIfPresent points *dst at its own copy of value unless value is empty.
	setIfPresent := func(dst **string, value string) {
		if value != "" {
			*dst = &value
		}
	}
	setIfPresent(&entry.VirtualKeyID, virtualKeyID)
	setIfPresent(&entry.VirtualKeyName, virtualKeyName)
	setIfPresent(&entry.RoutingRuleID, routingRuleID)
	setIfPresent(&entry.RoutingRuleName, routingRuleName)
	setIfPresent(&entry.SelectedPromptID, selectedPromptID)
	setIfPresent(&entry.SelectedPromptName, selectedPromptName)
	setIfPresent(&entry.SelectedPromptVersion, selectedPromptVersion)
	setIfPresent(&entry.TeamID, teamID)
	setIfPresent(&entry.TeamName, teamName)
	setIfPresent(&entry.CustomerID, customerID)
	setIfPresent(&entry.CustomerName, customerName)
	setIfPresent(&entry.UserID, userID)
	setIfPresent(&entry.UserName, userName)
	setIfPresent(&entry.BusinessUnitID, businessUnitID)
	setIfPresent(&entry.BusinessUnitName, businessUnitName)

	if numberOfRetries != 0 {
		entry.NumberOfRetries = numberOfRetries
	}
	if latency != 0 {
		asFloat := float64(latency)
		entry.Latency = &asFloat
	}
	if len(attemptTrail) > 0 {
		entry.AttemptTrailParsed = attemptTrail
	}
}