1294 lines
51 KiB
Go
1294 lines
51 KiB
Go
// Package logging provides a GORM-based logging plugin for Bifrost.
|
|
// This plugin stores comprehensive logs of all requests and responses with search,
|
|
// filter, and pagination capabilities.
|
|
package logging
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/bytedance/sonic"
|
|
bifrost "github.com/maximhq/bifrost/core"
|
|
"github.com/maximhq/bifrost/core/mcp"
|
|
"github.com/maximhq/bifrost/core/schemas"
|
|
"github.com/maximhq/bifrost/framework/configstore/tables"
|
|
"github.com/maximhq/bifrost/framework/logstore"
|
|
"github.com/maximhq/bifrost/framework/mcpcatalog"
|
|
"github.com/maximhq/bifrost/framework/modelcatalog"
|
|
"github.com/maximhq/bifrost/framework/streaming"
|
|
)
|
|
|
|
const (
|
|
PluginName = "logging"
|
|
)
|
|
|
|
// LogOperation represents the type of logging operation
|
|
type LogOperation string
|
|
|
|
const (
|
|
LogOperationCreate LogOperation = "create"
|
|
LogOperationUpdate LogOperation = "update"
|
|
LogOperationStreamUpdate LogOperation = "stream_update"
|
|
)
|
|
|
|
// UpdateLogData contains data for log entry updates
|
|
type UpdateLogData struct {
|
|
Status string
|
|
TokenUsage *schemas.BifrostLLMUsage
|
|
Cost *float64 // Cost in dollars from pricing plugin
|
|
ListModelsOutput []schemas.Model // For list models requests
|
|
ChatOutput *schemas.ChatMessage
|
|
ResponsesOutput []schemas.ResponsesMessage
|
|
EmbeddingOutput []schemas.EmbeddingData
|
|
RerankOutput []schemas.RerankResult
|
|
OCROutput *schemas.BifrostOCRResponse // For OCR responses
|
|
ErrorDetails *schemas.BifrostError
|
|
SpeechOutput *schemas.BifrostSpeechResponse // For non-streaming speech responses
|
|
TranscriptionOutput *schemas.BifrostTranscriptionResponse // For non-streaming transcription responses
|
|
ImageGenerationOutput *schemas.BifrostImageGenerationResponse // For non-streaming image generation responses
|
|
VideoGenerationOutput *schemas.BifrostVideoGenerationResponse // For non-streaming video generation responses
|
|
VideoRetrieveOutput *schemas.BifrostVideoGenerationResponse // For non-streaming video retrieve responses
|
|
VideoDownloadOutput *schemas.BifrostVideoDownloadResponse // For non-streaming video download responses
|
|
VideoListOutput *schemas.BifrostVideoListResponse // For non-streaming video list responses
|
|
VideoDeleteOutput *schemas.BifrostVideoDeleteResponse // For non-streaming video delete responses
|
|
RawRequest interface{}
|
|
RawResponse interface{}
|
|
IsLargePayloadRequest bool // When true, RawRequest is a truncated preview string (skip sonic.Marshal)
|
|
IsLargePayloadResponse bool // When true, RawResponse is a truncated preview string (skip sonic.Marshal)
|
|
}
|
|
|
|
// applyLargePayloadPreviews reads large payload/response preview strings from context
|
|
// and overrides RawRequest/RawResponse on updateData for truncated logging.
|
|
func applyLargePayloadPreviews(ctx *schemas.BifrostContext, updateData *UpdateLogData) {
|
|
if isLargePayload, ok := ctx.Value(schemas.BifrostContextKeyLargePayloadMode).(bool); ok && isLargePayload {
|
|
if preview, ok := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestPreview).(string); ok && preview != "" {
|
|
updateData.RawRequest = preview
|
|
updateData.IsLargePayloadRequest = true
|
|
}
|
|
}
|
|
if isLargeResponse, ok := ctx.Value(schemas.BifrostContextKeyLargeResponseMode).(bool); ok && isLargeResponse {
|
|
if preview, ok := ctx.Value(schemas.BifrostContextKeyLargePayloadResponsePreview).(string); ok && preview != "" {
|
|
updateData.RawResponse = preview
|
|
updateData.IsLargePayloadResponse = true
|
|
}
|
|
}
|
|
}
|
|
|
|
func applyLargePayloadPreviewsToEntry(ctx *schemas.BifrostContext, entry *logstore.Log, contentLoggingEnabled bool) {
|
|
if ctx == nil || entry == nil {
|
|
return
|
|
}
|
|
|
|
updateData := &UpdateLogData{}
|
|
applyLargePayloadPreviews(ctx, updateData)
|
|
shouldStoreRaw, _ := ctx.Value(schemas.BifrostContextKeyShouldStoreRawInLogs).(bool)
|
|
|
|
if updateData.IsLargePayloadRequest {
|
|
entry.IsLargePayloadRequest = true
|
|
if shouldStoreRaw && contentLoggingEnabled {
|
|
if preview, ok := updateData.RawRequest.(string); ok {
|
|
entry.RawRequest = preview
|
|
}
|
|
}
|
|
}
|
|
if updateData.IsLargePayloadResponse {
|
|
entry.IsLargePayloadResponse = true
|
|
if shouldStoreRaw && contentLoggingEnabled {
|
|
if preview, ok := updateData.RawResponse.(string); ok {
|
|
entry.RawResponse = preview
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// scheduleDeferredUsageUpdate schedules a deferred usage update for the request.
|
|
func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, requestID string, usageAlreadyPresent bool) {
|
|
if usageAlreadyPresent || ctx == nil {
|
|
return
|
|
}
|
|
|
|
deferredChan, ok := ctx.Value(schemas.BifrostContextKeyDeferredUsage).(<-chan *schemas.BifrostLLMUsage)
|
|
if !ok || deferredChan == nil {
|
|
return
|
|
}
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
// Large-response phase B closes this channel after trailing usage extraction completes.
|
|
deferredUsage, chanOpen := <-deferredChan
|
|
if !chanOpen || deferredUsage == nil {
|
|
return
|
|
}
|
|
|
|
// Acquire semaphore — drop if all slots busy to prevent unbounded goroutines
|
|
// from exhausting DB connections when Postgres is slow
|
|
select {
|
|
case p.deferredUsageSem <- struct{}{}:
|
|
defer func() { <-p.deferredUsageSem }()
|
|
default:
|
|
p.logger.Warn("deferred usage update dropped for request %s: semaphore full", requestID)
|
|
return
|
|
}
|
|
usageUpdates := map[string]interface{}{
|
|
"prompt_tokens": deferredUsage.PromptTokens,
|
|
"completion_tokens": deferredUsage.CompletionTokens,
|
|
"total_tokens": deferredUsage.TotalTokens,
|
|
}
|
|
tempEntry := &logstore.Log{TokenUsageParsed: deferredUsage}
|
|
if serErr := tempEntry.SerializeFields(); serErr == nil {
|
|
usageUpdates["token_usage"] = tempEntry.TokenUsage
|
|
usageUpdates["cached_read_tokens"] = tempEntry.CachedReadTokens
|
|
}
|
|
|
|
// Check if log entry present in the store
|
|
// exponential backoff with jitter and 3 retries
|
|
// then fail
|
|
var found bool
|
|
var findErr error
|
|
for i := 0; i < 3; i++ {
|
|
found, findErr = p.store.IsLogEntryPresent(p.ctx, requestID)
|
|
if findErr != nil {
|
|
p.logger.Warn("failed to check if log entry is present for request %s: %v", requestID, findErr)
|
|
continue
|
|
}
|
|
if found {
|
|
break
|
|
}
|
|
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second * 2)
|
|
}
|
|
if !found {
|
|
p.logger.Warn("log entry not found for request %s after 3 retries. failed to update deferred usage for large payload request", requestID)
|
|
return
|
|
}
|
|
if updErr := p.store.Update(p.ctx, requestID, usageUpdates); updErr != nil {
|
|
p.logger.Warn("failed to update deferred usage for request %s: %v", requestID, updErr)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// RecalculateCostResult represents summary stats from a cost backfill operation
|
|
type RecalculateCostResult struct {
|
|
TotalMatched int64 `json:"total_matched"`
|
|
Updated int `json:"updated"`
|
|
Skipped int `json:"skipped"`
|
|
Remaining int64 `json:"remaining"`
|
|
}
|
|
|
|
// LogMessage represents a message in the logging queue
|
|
type LogMessage struct {
|
|
Operation LogOperation
|
|
RequestID string // Unique ID for the request
|
|
ParentRequestID string // Unique ID for the parent request (used for fallback requests)
|
|
NumberOfRetries int // Number of retries
|
|
FallbackIndex int // Fallback index
|
|
SelectedKeyID string // Selected key ID
|
|
SelectedKeyName string // Selected key name
|
|
AttemptTrail []schemas.KeyAttemptRecord // Per-attempt key selection history
|
|
VirtualKeyID string // Virtual key ID
|
|
VirtualKeyName string // Virtual key name
|
|
RoutingEnginesUsed []string // List of routing engines used
|
|
RoutingRuleID string // Routing rule ID
|
|
RoutingRuleName string // Routing rule name
|
|
Timestamp time.Time // Of the preHook/postHook call
|
|
Latency int64 // For latency updates
|
|
InitialData *InitialLogData // For create operations
|
|
SemanticCacheDebug *schemas.BifrostCacheDebug // For semantic cache operations
|
|
UpdateData *UpdateLogData // For update operations
|
|
StreamResponse *streaming.ProcessedStreamResponse // For streaming delta updates
|
|
RoutingEngineLogs string // Formatted routing engine decision logs
|
|
}
|
|
|
|
// InitialLogData contains data for initial log entry creation
|
|
type InitialLogData struct {
|
|
Status string
|
|
Provider string
|
|
Model string
|
|
Object string
|
|
InputHistory []schemas.ChatMessage
|
|
ResponsesInputHistory []schemas.ResponsesMessage
|
|
Params any
|
|
SpeechInput *schemas.SpeechInput
|
|
TranscriptionInput *schemas.TranscriptionInput
|
|
OCRInput *schemas.OCRDocument
|
|
ImageGenerationInput *schemas.ImageGenerationInput
|
|
ImageEditInput *schemas.ImageEditInput
|
|
ImageVariationInput *schemas.ImageVariationInput
|
|
VideoGenerationInput *schemas.VideoGenerationInput
|
|
Tools []schemas.ChatTool
|
|
RoutingEngineUsed []string
|
|
Metadata map[string]any
|
|
PassthroughRequestBody string // Raw body for passthrough requests (UTF-8)
|
|
}
|
|
|
|
// LogCallback is a function that gets called when a new log entry is created
|
|
type LogCallback func(ctx context.Context, logEntry *logstore.Log)
|
|
|
|
// MCPToolLogCallback is a function that gets called when a new MCP tool log entry is created or updated
|
|
type MCPToolLogCallback func(*logstore.MCPToolLog)
|
|
|
|
type Config struct {
|
|
DisableContentLogging *bool `json:"disable_content_logging"`
|
|
LoggingHeaders *[]string `json:"logging_headers"` // Pointer to live config slice; changes are reflected immediately without restart
|
|
}
|
|
|
|
// LoggerPlugin implements the schemas.LLMPlugin and schemas.MCPPlugin interfaces
|
|
type LoggerPlugin struct {
|
|
ctx context.Context
|
|
store logstore.LogStore
|
|
disableContentLogging *bool
|
|
loggingHeaders *[]string // Pointer to live config slice for headers to capture in metadata
|
|
pricingManager *modelcatalog.ModelCatalog
|
|
mcpCatalog *mcpcatalog.MCPCatalog // MCP catalog for tool cost calculation
|
|
mu sync.Mutex
|
|
done chan struct{}
|
|
cleanupOnce sync.Once // Ensures cleanup only runs once
|
|
wg sync.WaitGroup
|
|
logger schemas.Logger
|
|
logCallback LogCallback
|
|
mcpToolLogCallback MCPToolLogCallback // Callback for MCP tool log entries
|
|
droppedRequests atomic.Int64
|
|
cleanupTicker *time.Ticker // Ticker for cleaning up old processing logs
|
|
logMsgPool sync.Pool // Pool for reusing LogMessage structs
|
|
updateDataPool sync.Pool // Pool for reusing UpdateLogData structs
|
|
pendingLogsEntries sync.Map // Maps requestID -> *PendingLogData (PreLLMHook input data awaiting PostLLMHook)
|
|
pendingLogsToInject sync.Map // Maps traceID -> *pendingInjectEntries (log entries to inject, supports multiple per trace)
|
|
writeQueue chan *writeQueueEntry // Buffered channel for batch write queue
|
|
closed atomic.Bool // Set during cleanup to prevent sends on closed writeQueue
|
|
deferredUsageSem chan struct{} // Limits concurrent deferred usage DB updates
|
|
}
|
|
|
|
// Init creates new logger plugin with given log store
|
|
func Init(ctx context.Context, config *Config, logger schemas.Logger, logsStore logstore.LogStore, pricingManager *modelcatalog.ModelCatalog, mcpCatalog *mcpcatalog.MCPCatalog) (*LoggerPlugin, error) {
|
|
if config == nil {
|
|
return nil, fmt.Errorf("config is required")
|
|
}
|
|
if logsStore == nil {
|
|
return nil, fmt.Errorf("logs store cannot be nil")
|
|
}
|
|
if pricingManager == nil {
|
|
logger.Warn("logging plugin requires model catalog to calculate cost, all LLM cost calculations will be skipped.")
|
|
}
|
|
if mcpCatalog == nil {
|
|
logger.Warn("logging plugin requires MCP catalog to calculate cost, all MCP cost calculations will be skipped.")
|
|
}
|
|
|
|
plugin := &LoggerPlugin{
|
|
ctx: ctx,
|
|
store: logsStore,
|
|
pricingManager: pricingManager,
|
|
mcpCatalog: mcpCatalog,
|
|
disableContentLogging: config.DisableContentLogging,
|
|
loggingHeaders: config.LoggingHeaders,
|
|
done: make(chan struct{}),
|
|
logger: logger,
|
|
writeQueue: make(chan *writeQueueEntry, writeQueueCapacity),
|
|
deferredUsageSem: make(chan struct{}, maxDeferredUsageConcurrency),
|
|
logMsgPool: sync.Pool{
|
|
New: func() any {
|
|
return &LogMessage{}
|
|
},
|
|
},
|
|
updateDataPool: sync.Pool{
|
|
New: func() any {
|
|
return &UpdateLogData{}
|
|
},
|
|
},
|
|
}
|
|
|
|
// Prewarm the pools for better performance at startup
|
|
for range 1000 {
|
|
plugin.logMsgPool.Put(&LogMessage{})
|
|
plugin.updateDataPool.Put(&UpdateLogData{})
|
|
}
|
|
|
|
// Start cleanup ticker (runs every 1 minute)
|
|
plugin.cleanupTicker = time.NewTicker(1 * time.Minute)
|
|
plugin.wg.Add(1)
|
|
go plugin.cleanupWorker()
|
|
|
|
// Start the batch writer goroutine (single writer for all DB writes)
|
|
plugin.wg.Add(1)
|
|
go plugin.batchWriter()
|
|
|
|
return plugin, nil
|
|
}
|
|
|
|
// cleanupWorker periodically removes old processing logs
|
|
func (p *LoggerPlugin) cleanupWorker() {
|
|
defer p.wg.Done()
|
|
for {
|
|
select {
|
|
case <-p.cleanupTicker.C:
|
|
p.cleanupOldProcessingLogs()
|
|
case <-p.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupOldProcessingLogs removes processing logs older than 30 minutes
|
|
// and stale pending log entries from the in-memory map
|
|
func (p *LoggerPlugin) cleanupOldProcessingLogs() {
|
|
// Calculate timestamp for 30 minutes ago in UTC to match log entry timestamps
|
|
thirtyMinutesAgo := time.Now().UTC().Add(-1 * 30 * time.Minute)
|
|
|
|
// Delete LLM processing logs older than 30 minutes
|
|
if err := p.store.Flush(p.ctx, thirtyMinutesAgo); err != nil {
|
|
p.logger.Warn("failed to cleanup old processing LLM logs: %v", err)
|
|
}
|
|
|
|
// Delete MCP tool processing logs older than 30 minutes
|
|
if err := p.store.FlushMCPToolLogs(p.ctx, thirtyMinutesAgo); err != nil {
|
|
p.logger.Warn("failed to cleanup old processing MCP tool logs: %v", err)
|
|
}
|
|
|
|
// Clean up stale pending log entries (requests where PostLLMHook never fired)
|
|
p.cleanupStalePendingLogs()
|
|
}
|
|
|
|
// SetLogCallback sets a callback function that will be called for each log entry
|
|
func (p *LoggerPlugin) SetLogCallback(callback LogCallback) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.logCallback = callback
|
|
}
|
|
|
|
// GetName returns the name of the plugin
|
|
func (p *LoggerPlugin) GetName() string {
|
|
return PluginName
|
|
}
|
|
|
|
// HTTPTransportPreHook is not used for this plugin
|
|
func (p *LoggerPlugin) HTTPTransportPreHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest) (*schemas.HTTPResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// HTTPTransportPostHook is not used for this plugin
|
|
func (p *LoggerPlugin) HTTPTransportPostHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, resp *schemas.HTTPResponse) error {
|
|
return nil
|
|
}
|
|
|
|
// HTTPTransportStreamChunkHook passes through streaming chunks unchanged
|
|
func (p *LoggerPlugin) HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, chunk *schemas.BifrostStreamChunk) (*schemas.BifrostStreamChunk, error) {
|
|
return chunk, nil
|
|
}
|
|
|
|
// captureLoggingHeaders extracts configured logging headers and x-bf-lh-* prefixed headers
|
|
// from the request context. Returns a new metadata map, or nil if no headers were captured.
|
|
// System entries (e.g. isAsyncRequest) should be set AFTER calling this so they take precedence.
|
|
func (p *LoggerPlugin) captureLoggingHeaders(ctx *schemas.BifrostContext) map[string]interface{} {
|
|
allHeaders, _ := ctx.Value(schemas.BifrostContextKeyRequestHeaders).(map[string]string)
|
|
if allHeaders == nil {
|
|
return nil
|
|
}
|
|
|
|
var metadata map[string]interface{}
|
|
|
|
// Check configured logging headers
|
|
if p.loggingHeaders != nil {
|
|
for _, h := range *p.loggingHeaders {
|
|
key := strings.ToLower(h)
|
|
if val, ok := allHeaders[key]; ok {
|
|
if metadata == nil {
|
|
metadata = make(map[string]interface{})
|
|
}
|
|
metadata[key] = val
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check x-bf-lh-* prefixed headers
|
|
for key, val := range allHeaders {
|
|
if labelName, ok := strings.CutPrefix(key, "x-bf-lh-"); ok && labelName != "" {
|
|
if metadata == nil {
|
|
metadata = make(map[string]interface{})
|
|
}
|
|
metadata[labelName] = val
|
|
}
|
|
}
|
|
|
|
return metadata
|
|
}
|
|
|
|
// PreLLMHook is called before a request is processed - FULLY ASYNC, NO DATABASE I/O
|
|
// Parameters:
|
|
// - ctx: The Bifrost context
|
|
// - req: The Bifrost request
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostRequest: The processed request
|
|
// - *schemas.LLMPluginShortCircuit: The plugin short circuit if the request is not allowed
|
|
// - error: Any error that occurred during processing
|
|
func (p *LoggerPlugin) PreLLMHook(ctx *schemas.BifrostContext, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.LLMPluginShortCircuit, error) {
|
|
if ctx == nil {
|
|
// Log error but don't fail the request
|
|
p.logger.Error("context is nil in PreLLMHook")
|
|
return req, nil, nil
|
|
}
|
|
|
|
// Extract request ID from context
|
|
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
|
|
if !ok || requestID == "" {
|
|
// Log error but don't fail the request
|
|
p.logger.Error("request-id not found in context or is empty")
|
|
return req, nil, nil
|
|
}
|
|
|
|
createdTimestamp := time.Now().UTC()
|
|
|
|
// If request type is streaming we create a stream accumulator via the tracer
|
|
// Skip for passthrough streams — they carry raw bytes, not LLM response chunks
|
|
if bifrost.IsStreamRequestType(req.RequestType) && req.RequestType != schemas.PassthroughStreamRequest {
|
|
tracer, traceID, err := bifrost.GetTracerFromContext(ctx)
|
|
if err == nil && tracer != nil && traceID != "" {
|
|
tracer.CreateStreamAccumulator(traceID, createdTimestamp)
|
|
}
|
|
}
|
|
|
|
provider, model, _ := req.GetRequestFields()
|
|
|
|
initialData := &InitialLogData{
|
|
Provider: string(provider),
|
|
Model: model,
|
|
Object: string(req.RequestType),
|
|
}
|
|
if req.RequestType == schemas.RealtimeRequest {
|
|
initialData.Object = "realtime.turn"
|
|
}
|
|
|
|
if p.disableContentLogging == nil || !*p.disableContentLogging {
|
|
inputHistory, responsesInputHistory := p.extractInputHistory(req)
|
|
initialData.InputHistory = inputHistory
|
|
initialData.ResponsesInputHistory = responsesInputHistory
|
|
|
|
switch req.RequestType {
|
|
case schemas.TextCompletionRequest, schemas.TextCompletionStreamRequest:
|
|
initialData.Params = req.TextCompletionRequest.Params
|
|
case schemas.ChatCompletionRequest, schemas.ChatCompletionStreamRequest:
|
|
initialData.Params = req.ChatRequest.Params
|
|
initialData.Tools = req.ChatRequest.Params.Tools
|
|
case schemas.ResponsesRequest, schemas.ResponsesStreamRequest, schemas.WebSocketResponsesRequest:
|
|
initialData.Params = req.ResponsesRequest.Params
|
|
|
|
var tools []schemas.ChatTool
|
|
for _, tool := range req.ResponsesRequest.Params.Tools {
|
|
tools = append(tools, *tool.ToChatTool())
|
|
}
|
|
initialData.Tools = tools
|
|
case schemas.RealtimeRequest:
|
|
if req.ResponsesRequest != nil {
|
|
initialData.Params = req.ResponsesRequest.Params
|
|
}
|
|
case schemas.EmbeddingRequest:
|
|
initialData.Params = req.EmbeddingRequest.Params
|
|
case schemas.RerankRequest:
|
|
initialData.Params = req.RerankRequest.Params
|
|
case schemas.OCRRequest:
|
|
initialData.Params = req.OCRRequest.Params
|
|
initialData.OCRInput = &req.OCRRequest.Document
|
|
case schemas.SpeechRequest, schemas.SpeechStreamRequest:
|
|
initialData.Params = req.SpeechRequest.Params
|
|
initialData.SpeechInput = req.SpeechRequest.Input
|
|
case schemas.TranscriptionRequest, schemas.TranscriptionStreamRequest:
|
|
initialData.Params = req.TranscriptionRequest.Params
|
|
input := req.TranscriptionRequest.Input
|
|
if input != nil {
|
|
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
|
|
if reqThreshold > 0 && int64(len(input.File)) > reqThreshold {
|
|
// Strip binary file content when it exceeds the large payload threshold
|
|
// to avoid serializing multi-MB audio into the log database.
|
|
logInput := *input
|
|
logInput.File = nil
|
|
initialData.TranscriptionInput = &logInput
|
|
} else {
|
|
initialData.TranscriptionInput = input
|
|
}
|
|
}
|
|
case schemas.ImageGenerationRequest, schemas.ImageGenerationStreamRequest:
|
|
initialData.Params = req.ImageGenerationRequest.Params
|
|
initialData.ImageGenerationInput = req.ImageGenerationRequest.Input
|
|
case schemas.ImageEditRequest, schemas.ImageEditStreamRequest:
|
|
params := req.ImageEditRequest.Params
|
|
input := req.ImageEditRequest.Input
|
|
if input != nil {
|
|
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
|
|
if reqThreshold > 0 {
|
|
var totalSize int64
|
|
for _, img := range input.Images {
|
|
totalSize += int64(len(img.Image))
|
|
}
|
|
if totalSize > reqThreshold {
|
|
logInput := *input
|
|
logInput.Images = nil
|
|
initialData.ImageEditInput = &logInput
|
|
} else {
|
|
initialData.ImageEditInput = input
|
|
}
|
|
if params != nil && int64(len(params.Mask)) > reqThreshold {
|
|
logParams := *params
|
|
logParams.Mask = nil
|
|
initialData.Params = &logParams
|
|
} else {
|
|
initialData.Params = params
|
|
}
|
|
} else {
|
|
initialData.ImageEditInput = input
|
|
initialData.Params = params
|
|
}
|
|
} else {
|
|
initialData.Params = params
|
|
}
|
|
case schemas.ImageVariationRequest:
|
|
initialData.Params = req.ImageVariationRequest.Params
|
|
input := req.ImageVariationRequest.Input
|
|
if input != nil {
|
|
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
|
|
if reqThreshold > 0 && int64(len(input.Image.Image)) > reqThreshold {
|
|
logInput := *input
|
|
logInput.Image = schemas.ImageInput{}
|
|
initialData.ImageVariationInput = &logInput
|
|
} else {
|
|
initialData.ImageVariationInput = input
|
|
}
|
|
}
|
|
case schemas.VideoGenerationRequest:
|
|
initialData.Params = req.VideoGenerationRequest.Params
|
|
initialData.VideoGenerationInput = req.VideoGenerationRequest.Input
|
|
case schemas.VideoRemixRequest:
|
|
initialData.Params = &schemas.VideoLogParams{
|
|
VideoID: req.VideoRemixRequest.ID,
|
|
}
|
|
initialData.VideoGenerationInput = req.VideoRemixRequest.Input
|
|
case schemas.VideoRetrieveRequest:
|
|
initialData.Params = &schemas.VideoLogParams{
|
|
VideoID: req.VideoRetrieveRequest.ID,
|
|
}
|
|
case schemas.VideoDownloadRequest:
|
|
initialData.Params = &schemas.VideoLogParams{
|
|
VideoID: req.VideoDownloadRequest.ID,
|
|
}
|
|
case schemas.VideoDeleteRequest:
|
|
initialData.Params = &schemas.VideoLogParams{
|
|
VideoID: req.VideoDeleteRequest.ID,
|
|
}
|
|
case schemas.PassthroughRequest, schemas.PassthroughStreamRequest:
|
|
initialData.Params = &schemas.PassthroughLogParams{
|
|
Method: req.PassthroughRequest.Method,
|
|
Path: req.PassthroughRequest.Path,
|
|
RawQuery: req.PassthroughRequest.RawQuery,
|
|
}
|
|
if len(req.PassthroughRequest.Body) > 0 {
|
|
ct := strings.ToLower(req.PassthroughRequest.SafeHeaders["content-type"])
|
|
if strings.Contains(ct, "application/json") {
|
|
initialData.PassthroughRequestBody = string(req.PassthroughRequest.Body)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Capture configured logging headers and x-bf-lh-* headers into metadata first
|
|
initialData.Metadata = mergeRealtimeMetadata(p.captureLoggingHeaders(ctx), ctx)
|
|
|
|
// System entries are set after so they take precedence over dynamic header values
|
|
if isAsync, ok := ctx.Value(schemas.BifrostIsAsyncRequest).(bool); ok && isAsync {
|
|
if initialData.Metadata == nil {
|
|
initialData.Metadata = make(map[string]interface{})
|
|
}
|
|
initialData.Metadata["isAsyncRequest"] = true
|
|
}
|
|
|
|
// If fallback request ID is present, use it instead of the primary request ID
|
|
// Determine effective request ID (fallback override)
|
|
effectiveRequestID := requestID
|
|
var parentRequestID string
|
|
if directParentRequestID, ok := ctx.Value(schemas.BifrostContextKeyParentRequestID).(string); ok && directParentRequestID != "" {
|
|
parentRequestID = directParentRequestID
|
|
}
|
|
fallbackRequestID, ok := ctx.Value(schemas.BifrostContextKeyFallbackRequestID).(string)
|
|
if ok && fallbackRequestID != "" {
|
|
effectiveRequestID = fallbackRequestID
|
|
if parentRequestID == "" {
|
|
parentRequestID = requestID
|
|
}
|
|
}
|
|
|
|
fallbackIndex := bifrost.GetIntFromContext(ctx, schemas.BifrostContextKeyFallbackIndex)
|
|
// Get routing engines array
|
|
routingEngines := []string{}
|
|
if engines, ok := ctx.Value(schemas.BifrostContextKeyRoutingEnginesUsed).([]string); ok {
|
|
routingEngines = engines
|
|
}
|
|
|
|
initialData.RoutingEngineUsed = routingEngines
|
|
initialData.Status = "processing"
|
|
|
|
// Store input data in pendingLogs for later combination with PostLLMHook output.
|
|
// No DB write here - the write is deferred to PostLLMHook to halve total writes.
|
|
pending := &PendingLogData{
|
|
RequestID: effectiveRequestID,
|
|
ParentRequestID: parentRequestID,
|
|
Timestamp: createdTimestamp,
|
|
FallbackIndex: fallbackIndex,
|
|
RoutingEnginesUsed: routingEngines,
|
|
InitialData: initialData,
|
|
CreatedAt: time.Now(),
|
|
Status: "processing",
|
|
}
|
|
p.pendingLogsEntries.Store(effectiveRequestID, pending)
|
|
// Call callback synchronously for immediate UI feedback (WebSocket "processing" notification).
|
|
// The entry does not exist in the DB yet - it will be written when PostLLMHook fires.
|
|
p.mu.Lock()
|
|
callback := p.logCallback
|
|
p.mu.Unlock()
|
|
if callback != nil {
|
|
callback(p.ctx, buildInitialLogEntry(pending))
|
|
}
|
|
return req, nil, nil
|
|
}
|
|
|
|
// PostLLMHook is called after a response is received - FULLY ASYNC, NO DATABASE I/O
|
|
// Parameters:
|
|
// - ctx: The Bifrost context
|
|
// - result: The Bifrost response to be processed
|
|
// - bifrostErr: The Bifrost error to be processed
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostResponse: The processed response
|
|
// - *schemas.BifrostError: The processed error
|
|
// - error: Any error that occurred during processing
|
|
func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) {
|
|
if ctx == nil {
|
|
// Log error but don't fail the request
|
|
p.logger.Error("context is nil in PostLLMHook")
|
|
return result, bifrostErr, nil
|
|
}
|
|
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
|
|
if !ok || requestID == "" {
|
|
p.logger.Error("request-id not found in context or is empty")
|
|
return result, bifrostErr, nil
|
|
}
|
|
// If fallback request ID is present, use it instead of the primary request ID
|
|
fallbackRequestID, ok := ctx.Value(schemas.BifrostContextKeyFallbackRequestID).(string)
|
|
if ok && fallbackRequestID != "" {
|
|
requestID = fallbackRequestID
|
|
}
|
|
selectedKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedKeyID)
|
|
selectedKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedKeyName)
|
|
virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
|
|
virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)
|
|
routingRuleID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceRoutingRuleID)
|
|
routingRuleName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceRoutingRuleName)
|
|
selectedPromptName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptName)
|
|
selectedPromptVersion := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptVersion)
|
|
selectedPromptID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptID)
|
|
teamID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceTeamID)
|
|
teamName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceTeamName)
|
|
customerID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceCustomerID)
|
|
customerName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceCustomerName)
|
|
userID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyUserID)
|
|
userName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyUserName)
|
|
businessUnitID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceBusinessUnitID)
|
|
businessUnitName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceBusinessUnitName)
|
|
numberOfRetries := bifrost.GetIntFromContext(ctx, schemas.BifrostContextKeyNumberOfRetries)
|
|
attemptTrail, _ := ctx.Value(schemas.BifrostContextKeyAttemptTrail).([]schemas.KeyAttemptRecord)
|
|
|
|
requestType, _, originalModelRequested, resolvedModelUsed := bifrost.GetResponseFields(result, bifrostErr)
|
|
shouldStoreRaw, _ := ctx.Value(schemas.BifrostContextKeyShouldStoreRawInLogs).(bool)
|
|
contentLoggingEnabled := p.disableContentLogging == nil || !*p.disableContentLogging
|
|
|
|
isFinalChunk := bifrost.IsFinalChunk(ctx)
|
|
|
|
var tracer schemas.Tracer
|
|
var traceID string
|
|
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.PassthroughStreamRequest && requestType != schemas.RealtimeRequest {
|
|
var err error
|
|
tracer, traceID, err = bifrost.GetTracerFromContext(ctx)
|
|
if err != nil {
|
|
p.logger.Debug("tracer not available in logging plugin posthook: %v", err)
|
|
// Continue with nil tracer — the rest of the code handles this gracefully
|
|
// via `if tracer != nil && traceID != ""` guards
|
|
}
|
|
}
|
|
|
|
// For non-final streaming chunks, process the accumulator synchronously
|
|
// and skip the write queue entirely. The accumulator work (ProcessStreamingChunk)
|
|
// is fast (mutex + append). Only final chunks, errors, and non-streaming
|
|
// responses need a DB write.
|
|
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.PassthroughStreamRequest && requestType != schemas.RealtimeRequest && !isFinalChunk && result != nil && bifrostErr == nil {
|
|
if tracer != nil && traceID != "" {
|
|
tracer.ProcessStreamingChunk(traceID, false, result, bifrostErr)
|
|
}
|
|
return result, bifrostErr, nil
|
|
}
|
|
// Extract routing engine logs from context before entering goroutine
|
|
routingEngineLogs := formatRoutingEngineLogs(ctx.GetRoutingEngineLogs())
|
|
|
|
// Retrieve pending input data from PreLLMHook
|
|
pendingVal, hasPending := p.pendingLogsEntries.LoadAndDelete(requestID)
|
|
if !hasPending {
|
|
// If we have an error (e.g., cancellation/timeout), still write a minimal error entry
|
|
// so the error is visible in logs. Without PreLLMHook's DB insert, silently returning
|
|
// here means the error is completely lost.
|
|
if bifrostErr != nil {
|
|
p.logger.Warn("no pending log data found for request %s, writing minimal error entry", requestID)
|
|
entry := &logstore.Log{
|
|
ID: requestID,
|
|
Provider: string(bifrostErr.ExtraFields.Provider),
|
|
Status: "error",
|
|
Stream: bifrost.IsStreamRequestType(requestType),
|
|
Timestamp: time.Now().UTC(),
|
|
CreatedAt: time.Now().UTC(),
|
|
}
|
|
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
|
|
if data, err := sonic.Marshal(bifrostErr); err == nil {
|
|
entry.ErrorDetails = string(data)
|
|
}
|
|
entry.ErrorDetailsParsed = bifrostErr
|
|
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
|
|
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
|
|
} else {
|
|
p.logger.Warn("no pending log data found for request %s, skipping log write", requestID)
|
|
}
|
|
return result, bifrostErr, nil
|
|
}
|
|
|
|
pending := pendingVal.(*PendingLogData)
|
|
if requestType == schemas.RealtimeRequest {
|
|
if resolvedRealtimeSessionID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyRealtimeSessionID); resolvedRealtimeSessionID != "" {
|
|
pending.ParentRequestID = resolvedRealtimeSessionID
|
|
}
|
|
}
|
|
|
|
// Build the complete log entry with input (from PreLLMHook) + output (from PostLLMHook)
|
|
entry := buildCompleteLogEntryFromPending(pending)
|
|
// Apply common output fields
|
|
var latency int64
|
|
if result != nil {
|
|
latency = result.GetExtraFields().Latency
|
|
}
|
|
applyOutputFieldsToEntry(entry, selectedKeyID, selectedKeyName, virtualKeyID, virtualKeyName, routingRuleID, routingRuleName, selectedPromptID, selectedPromptName, selectedPromptVersion, teamID, teamName, customerID, customerName, userID, userName, businessUnitID, businessUnitName, numberOfRetries, latency, attemptTrail)
|
|
entry.MetadataParsed = pending.InitialData.Metadata
|
|
entry.MetadataParsed = mergeRealtimeMetadata(entry.MetadataParsed, ctx)
|
|
entry.RoutingEngineLogs = routingEngineLogs
|
|
|
|
// Branch based on response type to populate output-specific fields
|
|
|
|
// Path A: Error with nil result
|
|
if result == nil && bifrostErr != nil {
|
|
entry.Status = "error"
|
|
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
|
|
if bifrost.IsStreamRequestType(requestType) {
|
|
entry.Stream = true
|
|
}
|
|
|
|
// For streaming errors, finalize and read accumulated chunks so logs retain pre-error stream metadata
|
|
if bifrost.IsStreamRequestType(requestType) &&
|
|
requestType != schemas.PassthroughStreamRequest &&
|
|
requestType != schemas.RealtimeRequest &&
|
|
tracer != nil &&
|
|
traceID != "" {
|
|
if accResult := tracer.ProcessStreamingChunk(traceID, true, result, bifrostErr); accResult != nil {
|
|
if streamResponse := convertToProcessedStreamResponse(accResult, requestType); streamResponse != nil {
|
|
p.applyStreamingOutputToEntry(entry, streamResponse, shouldStoreRaw)
|
|
}
|
|
}
|
|
tracer.CleanupStreamAccumulator(traceID)
|
|
}
|
|
|
|
// Serialize error details immediately since bifrostErr may be released
|
|
// back to the pool before the async batch writer processes this entry.
|
|
// Also set ErrorDetailsParsed for UI callback (JSON serialization uses this field).
|
|
if data, err := sonic.Marshal(bifrostErr); err == nil {
|
|
entry.ErrorDetails = string(data)
|
|
}
|
|
entry.ErrorDetailsParsed = bifrostErr
|
|
if shouldStoreRaw && contentLoggingEnabled {
|
|
if bifrostErr.ExtraFields.RawRequest != nil {
|
|
rawReqBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawRequest)
|
|
if err == nil {
|
|
entry.RawRequest = string(rawReqBytes)
|
|
}
|
|
}
|
|
|
|
if entry.RawResponse == "" && bifrostErr.ExtraFields.RawResponse != nil {
|
|
rawRespBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawResponse)
|
|
if err == nil {
|
|
entry.RawResponse = string(rawRespBytes)
|
|
}
|
|
}
|
|
}
|
|
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
|
|
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
|
|
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
|
|
return result, bifrostErr, nil
|
|
}
|
|
|
|
// Path B: Streaming final chunk
|
|
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.RealtimeRequest {
|
|
var streamResponse *streaming.ProcessedStreamResponse
|
|
if requestType != schemas.PassthroughStreamRequest && tracer != nil && traceID != "" {
|
|
accResult := tracer.ProcessStreamingChunk(traceID, isFinalChunk, result, bifrostErr)
|
|
if accResult != nil {
|
|
streamResponse = convertToProcessedStreamResponse(accResult, requestType)
|
|
}
|
|
}
|
|
|
|
if bifrostErr != nil {
|
|
entry.Status = "error"
|
|
entry.Stream = true
|
|
applyModelAlias(entry, originalModelRequested, resolvedModelUsed)
|
|
if data, err := sonic.Marshal(bifrostErr); err == nil {
|
|
entry.ErrorDetails = string(data)
|
|
}
|
|
entry.ErrorDetailsParsed = bifrostErr
|
|
} else if streamResponse == nil {
|
|
// tracer or traceID not available, or accumulator returned nil - still write what we have
|
|
entry.Status = "success"
|
|
entry.Stream = true
|
|
applyModelAlias(entry, originalModelRequested, resolvedModelUsed)
|
|
} else if isFinalChunk {
|
|
// Apply streaming output fields to the entry
|
|
entry.Stream = true
|
|
p.applyStreamingOutputToEntry(entry, streamResponse, shouldStoreRaw)
|
|
}
|
|
// Backfill passthrough status_code from response (streaming path)
|
|
if result != nil && result.PassthroughResponse != nil {
|
|
if params, ok := entry.ParamsParsed.(*schemas.PassthroughLogParams); ok {
|
|
params.StatusCode = result.PassthroughResponse.StatusCode
|
|
}
|
|
if contentLoggingEnabled && len(result.PassthroughResponse.Body) > 0 {
|
|
entry.PassthroughResponseBody = string(result.PassthroughResponse.Body)
|
|
if shouldStoreRaw {
|
|
entry.RawResponse = string(result.PassthroughResponse.Body)
|
|
}
|
|
}
|
|
// Flip status for passthrough error responses (4xx/5xx from provider)
|
|
if isPassthroughErrorResponse(result) {
|
|
entry.Status = "error"
|
|
}
|
|
}
|
|
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
|
|
|
|
if requestType != schemas.PassthroughStreamRequest && tracer != nil && traceID != "" {
|
|
tracer.CleanupStreamAccumulator(traceID)
|
|
}
|
|
|
|
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
|
|
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
|
|
return result, bifrostErr, nil
|
|
}
|
|
|
|
// Path C: Non-streaming response
|
|
if bifrostErr != nil {
|
|
entry.Status = "error"
|
|
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
|
|
// Serialize error details immediately since bifrostErr may be released
|
|
// back to the pool before the async batch writer processes this entry.
|
|
// Also set ErrorDetailsParsed for UI callback (JSON serialization uses this field).
|
|
if data, err := sonic.Marshal(bifrostErr); err == nil {
|
|
entry.ErrorDetails = string(data)
|
|
}
|
|
entry.ErrorDetailsParsed = bifrostErr
|
|
// Realtime turns that fail mid-stream still need their input transcript
|
|
// surfaced — backfill from bifrostErr.ExtraFields.RawRequest if present.
|
|
if requestType == schemas.RealtimeRequest {
|
|
applyRealtimeRawRequestBackfill(entry, bifrostErr.ExtraFields.RawRequest, contentLoggingEnabled, shouldStoreRaw)
|
|
}
|
|
} else if result != nil {
|
|
entry.Status = "success"
|
|
extraFields := result.GetExtraFields()
|
|
applyModelAlias(entry, extraFields.OriginalModelRequested, extraFields.ResolvedModelUsed)
|
|
if requestType == schemas.RealtimeRequest {
|
|
p.applyRealtimeOutputToEntry(entry, result, shouldStoreRaw)
|
|
} else {
|
|
p.applyNonStreamingOutputToEntry(entry, result, shouldStoreRaw)
|
|
}
|
|
// Flip status for passthrough error responses (4xx/5xx from provider)
|
|
if isPassthroughErrorResponse(result) {
|
|
entry.Status = "error"
|
|
}
|
|
}
|
|
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
|
|
|
|
// Calculate cost
|
|
var cacheDebug *schemas.BifrostCacheDebug
|
|
if result != nil {
|
|
cacheDebug = result.GetExtraFields().CacheDebug
|
|
}
|
|
entry.CacheDebugParsed = cacheDebug
|
|
if p.pricingManager != nil {
|
|
pricingScopes := modelcatalog.PricingLookupScopesFromContext(ctx, string(entry.Provider))
|
|
if cost := p.pricingManager.CalculateCost(result, pricingScopes); cost > 0 {
|
|
entry.Cost = &cost
|
|
}
|
|
}
|
|
|
|
// Pre-apply denormalized fields for WebSocket callback enrichment
|
|
if entry.SelectedKeyID != "" && entry.SelectedKeyName != "" {
|
|
entry.SelectedKey = &schemas.Key{
|
|
ID: entry.SelectedKeyID,
|
|
Name: entry.SelectedKeyName,
|
|
}
|
|
}
|
|
if entry.VirtualKeyID != nil && entry.VirtualKeyName != nil && *entry.VirtualKeyID != "" && *entry.VirtualKeyName != "" {
|
|
entry.VirtualKey = &tables.TableVirtualKey{
|
|
ID: *entry.VirtualKeyID,
|
|
Name: *entry.VirtualKeyName,
|
|
}
|
|
}
|
|
if entry.RoutingRuleID != nil && entry.RoutingRuleName != nil && *entry.RoutingRuleID != "" && *entry.RoutingRuleName != "" {
|
|
entry.RoutingRule = &tables.TableRoutingRule{
|
|
ID: *entry.RoutingRuleID,
|
|
Name: *entry.RoutingRuleName,
|
|
}
|
|
}
|
|
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
|
|
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
|
|
return result, bifrostErr, nil
|
|
}
|
|
|
|
// Cleanup is called when the plugin is being shut down
|
|
func (p *LoggerPlugin) Cleanup() error {
|
|
p.cleanupOnce.Do(func() {
|
|
// Stop the cleanup ticker
|
|
if p.cleanupTicker != nil {
|
|
p.cleanupTicker.Stop()
|
|
}
|
|
// Signal the cleanup worker to stop
|
|
close(p.done)
|
|
// Close write queue FIRST — batchWriter drains remaining entries and exits.
|
|
// THEN set closed flag — this prevents panics from sends-on-closed-channel
|
|
// in enqueueLogEntry (the defer/recover there catches the race window).
|
|
close(p.writeQueue)
|
|
p.closed.Store(true)
|
|
// Wait for the cleanup worker and batch writer to finish
|
|
p.wg.Wait()
|
|
// Note: Accumulator cleanup is handled by the tracer, not the logging plugin
|
|
// GORM handles connection cleanup automatically
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// storeOrEnqueueEntry stores a log entry in pendingLogs keyed by traceID for later
|
|
// retrieval by Inject(), or enqueues directly if no traceID is available (Go SDK path).
|
|
// Multiple entries per traceID are supported (e.g. fallback/retry attempts within the same trace).
|
|
func (p *LoggerPlugin) storeOrEnqueueEntry(ctx *schemas.BifrostContext, entry *logstore.Log, callback func(entry *logstore.Log)) {
|
|
traceID, _ := ctx.Value(schemas.BifrostContextKeyTraceID).(string)
|
|
if traceID != "" {
|
|
// Append to slice for Inject() to pick up — supports multiple attempts per trace
|
|
existing, loaded := p.pendingLogsToInject.LoadOrStore(traceID, &pendingInjectEntries{entries: []*logstore.Log{entry}, createdAt: time.Now()})
|
|
if !loaded {
|
|
return
|
|
}
|
|
pending := existing.(*pendingInjectEntries)
|
|
pending.mu.Lock()
|
|
pending.entries = append(pending.entries, entry)
|
|
pending.mu.Unlock()
|
|
} else {
|
|
// Fallback: no tracing (Go SDK path), enqueue directly
|
|
p.enqueueLogEntry(entry, callback)
|
|
}
|
|
}
|
|
|
|
// Inject receives a completed trace and writes the log entries with plugin logs to DB.
|
|
// This implements the ObservabilityPlugin interface.
|
|
func (p *LoggerPlugin) Inject(_ context.Context, trace *schemas.Trace) error {
|
|
if trace == nil {
|
|
return nil
|
|
}
|
|
// Retrieve pending log entries built by PostLLMHook (stored by traceID)
|
|
entryVal, ok := p.pendingLogsToInject.LoadAndDelete(trace.TraceID)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
pending, ok := entryVal.(*pendingInjectEntries)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
// Serialize plugin logs once for all entries
|
|
var pluginLogsJSON string
|
|
if len(trace.PluginLogs) > 0 {
|
|
grouped := schemas.GroupPluginLogsByName(trace.PluginLogs)
|
|
if data, err := sonic.Marshal(grouped); err == nil {
|
|
pluginLogsJSON = string(data)
|
|
}
|
|
}
|
|
|
|
// Enqueue each log entry (supports multiple attempts per trace)
|
|
for _, entry := range pending.entries {
|
|
entry.PluginLogs = pluginLogsJSON
|
|
p.enqueueLogEntry(entry, p.makePostWriteCallback(nil))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MCP Plugin Interface Implementation
|
|
|
|
// SetMCPToolLogCallback sets a callback function that will be called for each MCP tool log entry
|
|
func (p *LoggerPlugin) SetMCPToolLogCallback(callback MCPToolLogCallback) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.mcpToolLogCallback = callback
|
|
}
|
|
|
|
// PreMCPHook is called before an MCP tool execution - creates initial log entry
|
|
// Parameters:
|
|
// - ctx: The Bifrost context
|
|
// - req: The MCP request containing tool call information
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostMCPRequest: The unmodified request
|
|
// - *schemas.MCPPluginShortCircuit: nil (no short-circuiting)
|
|
// - error: nil (errors are logged but don't fail the request)
|
|
func (p *LoggerPlugin) PreMCPHook(ctx *schemas.BifrostContext, req *schemas.BifrostMCPRequest) (*schemas.BifrostMCPRequest, *schemas.MCPPluginShortCircuit, error) {
|
|
if ctx == nil {
|
|
p.logger.Error("context is nil in PreMCPHook")
|
|
return req, nil, nil
|
|
}
|
|
|
|
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
|
|
if !ok || requestID == "" {
|
|
p.logger.Error("request-id not found in context or is empty in PreMCPHook")
|
|
return req, nil, nil
|
|
}
|
|
|
|
// Get parent request ID if this MCP call is part of a larger LLM request (using the MCP agent original request ID)
|
|
parentRequestID, _ := ctx.Value(schemas.BifrostMCPAgentOriginalRequestID).(string)
|
|
|
|
createdTimestamp := time.Now().UTC()
|
|
|
|
// Extract tool name and arguments from the request
|
|
var toolName string
|
|
var serverLabel string
|
|
|
|
fullToolName := req.GetToolName()
|
|
arguments := req.GetToolArguments()
|
|
// Skip execution for codemode tools
|
|
if bifrost.IsCodemodeTool(fullToolName) {
|
|
return req, nil, nil
|
|
}
|
|
|
|
// Extract server label from tool name (format: {client}-{tool_name})
|
|
// The first part before hyphen is the client/server label
|
|
if fullToolName != "" {
|
|
if idx := strings.Index(fullToolName, "-"); idx > 0 {
|
|
serverLabel = fullToolName[:idx]
|
|
toolName = fullToolName[idx+1:]
|
|
} else {
|
|
toolName = fullToolName
|
|
}
|
|
switch toolName {
|
|
case mcp.ToolTypeListToolFiles, mcp.ToolTypeReadToolFile, mcp.ToolTypeExecuteToolCode:
|
|
if serverLabel == "" {
|
|
serverLabel = "codemode"
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get virtual key information from context - using same method as normal LLM logging
|
|
virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
|
|
virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)
|
|
|
|
// Use the per-tool-call unique MCP log ID (set by agent executor per goroutine) as the
|
|
// primary key. Fall back to requestID if not set (e.g. direct single tool call).
|
|
mcpLogID, ok := ctx.Value(schemas.BifrostContextKeyMCPLogID).(string)
|
|
if !ok || mcpLogID == "" {
|
|
mcpLogID = requestID
|
|
}
|
|
|
|
go func() {
|
|
entry := &logstore.MCPToolLog{
|
|
ID: mcpLogID,
|
|
RequestID: requestID,
|
|
Timestamp: createdTimestamp,
|
|
ToolName: toolName,
|
|
ServerLabel: serverLabel,
|
|
Status: "processing",
|
|
CreatedAt: createdTimestamp,
|
|
}
|
|
|
|
if parentRequestID != "" {
|
|
entry.LLMRequestID = &parentRequestID
|
|
}
|
|
|
|
if virtualKeyID != "" {
|
|
entry.VirtualKeyID = &virtualKeyID
|
|
}
|
|
if virtualKeyName != "" {
|
|
entry.VirtualKeyName = &virtualKeyName
|
|
}
|
|
|
|
// Set arguments if content logging is enabled
|
|
if p.disableContentLogging == nil || !*p.disableContentLogging {
|
|
entry.ArgumentsParsed = arguments
|
|
}
|
|
|
|
// Capture configured logging headers and x-bf-lh-* headers into metadata
|
|
entry.MetadataParsed = p.captureLoggingHeaders(ctx)
|
|
|
|
if err := p.store.CreateMCPToolLog(p.ctx, entry); err != nil {
|
|
p.logger.Warn("Failed to insert initial MCP tool log entry for request %s: %v", requestID, err)
|
|
} else {
|
|
// Capture callback under lock, then call it outside the critical section
|
|
p.mu.Lock()
|
|
callback := p.mcpToolLogCallback
|
|
p.mu.Unlock()
|
|
|
|
if callback != nil {
|
|
callback(entry)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return req, nil, nil
|
|
}
|
|
|
|
// PostMCPHook is called after an MCP tool execution - updates the log entry with results
|
|
// Parameters:
|
|
// - ctx: The Bifrost context
|
|
// - resp: The MCP response containing tool execution result
|
|
// - bifrostErr: Any error that occurred during execution
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostMCPResponse: The unmodified response
|
|
// - *schemas.BifrostError: The unmodified error
|
|
// - error: nil (errors are logged but don't fail the request)
|
|
func (p *LoggerPlugin) PostMCPHook(ctx *schemas.BifrostContext, resp *schemas.BifrostMCPResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostMCPResponse, *schemas.BifrostError, error) {
|
|
if ctx == nil {
|
|
p.logger.Error("context is nil in PostMCPHook")
|
|
return resp, bifrostErr, nil
|
|
}
|
|
|
|
// Skip logging for codemode tools (executeToolCode, listToolFiles, readToolFile)
|
|
// We check the tool name from the response instead of context flags
|
|
if resp != nil && bifrost.IsCodemodeTool(resp.ExtraFields.ToolName) {
|
|
return resp, bifrostErr, nil
|
|
}
|
|
|
|
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
|
|
if !ok || requestID == "" {
|
|
p.logger.Error("request-id not found in context or is empty in PostMCPHook")
|
|
return resp, bifrostErr, nil
|
|
}
|
|
|
|
// Use the per-tool-call unique MCP log ID to find the correct log entry.
|
|
mcpLogID, ok := ctx.Value(schemas.BifrostContextKeyMCPLogID).(string)
|
|
if !ok || mcpLogID == "" {
|
|
mcpLogID = requestID
|
|
}
|
|
|
|
// Extract virtual key ID and name from context (set by governance plugin)
|
|
virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
|
|
virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)
|
|
|
|
go func() {
|
|
updates := make(map[string]interface{})
|
|
|
|
// Update virtual key ID and name if they are set (from governance plugin)
|
|
if virtualKeyID != "" {
|
|
updates["virtual_key_id"] = virtualKeyID
|
|
}
|
|
if virtualKeyName != "" {
|
|
updates["virtual_key_name"] = virtualKeyName
|
|
}
|
|
|
|
// Get latency from response ExtraFields
|
|
if resp != nil {
|
|
updates["latency"] = float64(resp.ExtraFields.Latency)
|
|
}
|
|
|
|
// Calculate MCP tool cost from catalog if available
|
|
var toolCost float64
|
|
success := (resp != nil && bifrostErr == nil)
|
|
if success && resp != nil && p.mcpCatalog != nil && resp.ExtraFields.ClientName != "" && resp.ExtraFields.ToolName != "" {
|
|
// Use separate client name and tool name fields
|
|
if pricingEntry, ok := p.mcpCatalog.GetPricingData(resp.ExtraFields.ClientName, resp.ExtraFields.ToolName); ok {
|
|
toolCost = pricingEntry.CostPerExecution
|
|
updates["cost"] = toolCost
|
|
p.logger.Debug("MCP tool cost for %s.%s: $%.6f", resp.ExtraFields.ClientName, resp.ExtraFields.ToolName, toolCost)
|
|
}
|
|
}
|
|
|
|
if bifrostErr != nil {
|
|
updates["status"] = "error"
|
|
// Serialize error details
|
|
tempEntry := &logstore.MCPToolLog{}
|
|
tempEntry.ErrorDetailsParsed = bifrostErr
|
|
if err := tempEntry.SerializeFields(); err == nil {
|
|
updates["error_details"] = tempEntry.ErrorDetails
|
|
}
|
|
} else if resp != nil {
|
|
updates["status"] = "success"
|
|
// Store result if content logging is enabled
|
|
if p.disableContentLogging == nil || !*p.disableContentLogging {
|
|
var result interface{}
|
|
if resp.ChatMessage != nil {
|
|
// For ChatMessage, try to parse the content as JSON if it's a string
|
|
if resp.ChatMessage.Content != nil && resp.ChatMessage.Content.ContentStr != nil {
|
|
contentStr := *resp.ChatMessage.Content.ContentStr
|
|
var parsedContent interface{}
|
|
if err := sonic.Unmarshal([]byte(contentStr), &parsedContent); err == nil {
|
|
// Content is valid JSON, use parsed version
|
|
result = parsedContent
|
|
} else {
|
|
// Content is not valid JSON or failed to parse, store the whole message
|
|
result = resp.ChatMessage
|
|
}
|
|
} else {
|
|
result = resp.ChatMessage
|
|
}
|
|
} else if resp.ResponsesMessage != nil {
|
|
result = resp.ResponsesMessage
|
|
}
|
|
if result != nil {
|
|
tempEntry := &logstore.MCPToolLog{}
|
|
tempEntry.ResultParsed = result
|
|
if err := tempEntry.SerializeFields(); err == nil {
|
|
updates["result"] = tempEntry.Result
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
updates["status"] = "error"
|
|
tempEntry := &logstore.MCPToolLog{}
|
|
tempEntry.ErrorDetailsParsed = &schemas.BifrostError{
|
|
IsBifrostError: true,
|
|
Error: &schemas.ErrorField{
|
|
Message: "MCP tool execution returned nil response",
|
|
},
|
|
}
|
|
if err := tempEntry.SerializeFields(); err == nil {
|
|
updates["error_details"] = tempEntry.ErrorDetails
|
|
}
|
|
}
|
|
|
|
processingErr := retryOnNotFound(p.ctx, func() error {
|
|
return p.store.UpdateMCPToolLog(p.ctx, mcpLogID, updates)
|
|
})
|
|
if processingErr != nil {
|
|
p.logger.Warn("failed to process MCP tool log update for request %s: %v", requestID, processingErr)
|
|
} else {
|
|
// Capture callback under lock, then perform DB I/O and invoke callback outside critical section
|
|
p.mu.Lock()
|
|
callback := p.mcpToolLogCallback
|
|
p.mu.Unlock()
|
|
|
|
if callback != nil {
|
|
if updatedEntry, getErr := p.store.FindMCPToolLog(p.ctx, mcpLogID); getErr == nil {
|
|
callback(updatedEntry)
|
|
} else {
|
|
p.logger.Warn("failed to find updated entry for callback: %v", getErr)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return resp, bifrostErr, nil
|
|
}
|