Files
bifrost/plugins/logging/main.go
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

1294 lines
51 KiB
Go

// Package logging provides a GORM-based logging plugin for Bifrost.
// This plugin stores comprehensive logs of all requests and responses with search,
// filter, and pagination capabilities.
package logging
import (
"context"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bytedance/sonic"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/mcp"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/framework/configstore/tables"
"github.com/maximhq/bifrost/framework/logstore"
"github.com/maximhq/bifrost/framework/mcpcatalog"
"github.com/maximhq/bifrost/framework/modelcatalog"
"github.com/maximhq/bifrost/framework/streaming"
)
// PluginName is the name this plugin reports from GetName.
const PluginName = "logging"
// LogOperation identifies the type of logging operation carried by a LogMessage.
type LogOperation string

// The logging operations.
const (
	LogOperationCreate       = LogOperation("create")
	LogOperationUpdate       = LogOperation("update")
	LogOperationStreamUpdate = LogOperation("stream_update")
)
// UpdateLogData holds the output-side data applied when a log entry is updated.
type UpdateLogData struct {
	// Status is the status value to record on the entry.
	Status string

	// TokenUsage is the token usage reported for the request.
	TokenUsage *schemas.BifrostLLMUsage

	// Cost is the cost in dollars, as supplied by the pricing plugin.
	Cost *float64

	// ListModelsOutput is the output of a list-models request.
	ListModelsOutput []schemas.Model

	// ChatOutput is the chat message output.
	ChatOutput *schemas.ChatMessage

	// ResponsesOutput is the responses-API message output.
	ResponsesOutput []schemas.ResponsesMessage

	// EmbeddingOutput is the embedding output.
	EmbeddingOutput []schemas.EmbeddingData

	// RerankOutput is the rerank output.
	RerankOutput []schemas.RerankResult

	// OCROutput is the output of an OCR request.
	OCROutput *schemas.BifrostOCRResponse

	// ErrorDetails is the error to record on the entry.
	ErrorDetails *schemas.BifrostError

	// SpeechOutput is the output of a non-streaming speech request.
	SpeechOutput *schemas.BifrostSpeechResponse

	// TranscriptionOutput is the output of a non-streaming transcription request.
	TranscriptionOutput *schemas.BifrostTranscriptionResponse

	// ImageGenerationOutput is the output of a non-streaming image generation request.
	ImageGenerationOutput *schemas.BifrostImageGenerationResponse

	// VideoGenerationOutput is the output of a non-streaming video generation request.
	VideoGenerationOutput *schemas.BifrostVideoGenerationResponse

	// VideoRetrieveOutput is the output of a non-streaming video retrieve request.
	VideoRetrieveOutput *schemas.BifrostVideoGenerationResponse

	// VideoDownloadOutput is the output of a non-streaming video download request.
	VideoDownloadOutput *schemas.BifrostVideoDownloadResponse

	// VideoListOutput is the output of a non-streaming video list request.
	VideoListOutput *schemas.BifrostVideoListResponse

	// VideoDeleteOutput is the output of a non-streaming video delete request.
	VideoDeleteOutput *schemas.BifrostVideoDeleteResponse

	// RawRequest is the raw request payload; see IsLargePayloadRequest.
	RawRequest any

	// RawResponse is the raw response payload; see IsLargePayloadResponse.
	RawResponse any

	// IsLargePayloadRequest reports that RawRequest is a truncated preview
	// string, so it must not be passed through sonic.Marshal.
	IsLargePayloadRequest bool

	// IsLargePayloadResponse reports that RawResponse is a truncated preview
	// string, so it must not be passed through sonic.Marshal.
	IsLargePayloadResponse bool
}
// applyLargePayloadPreviews copies the large-payload preview strings found in ctx
// onto updateData so that the truncated text is logged in place of the full payload.
//
// The request preview is applied only when the large-payload mode flag is set and a
// non-empty request preview is present; the response preview is applied only when the
// large-response mode flag is set and a non-empty response preview is present. In each
// case the matching IsLargePayload* flag on updateData is set to true. Fields are left
// untouched when the conditions are not met.
func applyLargePayloadPreviews(ctx *schemas.BifrostContext, updateData *UpdateLogData) {
	// A failed type assertion yields the zero value, which is treated as "not set".
	requestMode, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadMode).(bool)
	if requestMode {
		requestPreview, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestPreview).(string)
		if requestPreview != "" {
			updateData.RawRequest = requestPreview
			updateData.IsLargePayloadRequest = true
		}
	}

	responseMode, _ := ctx.Value(schemas.BifrostContextKeyLargeResponseMode).(bool)
	if responseMode {
		responsePreview, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadResponsePreview).(string)
		if responsePreview != "" {
			updateData.RawResponse = responsePreview
			updateData.IsLargePayloadResponse = true
		}
	}
}
// applyLargePayloadPreviewsToEntry marks entry as a large-payload request and/or
// response based on the previews present in ctx, and stores the preview text on the
// entry when raw storage is requested in ctx and content logging is enabled.
//
// It is a no-op when ctx or entry is nil. The IsLargePayload* flags are set whenever a
// preview is present; the preview text itself is only copied into RawRequest /
// RawResponse when both the should-store-raw context flag and contentLoggingEnabled
// are true.
func applyLargePayloadPreviewsToEntry(ctx *schemas.BifrostContext, entry *logstore.Log, contentLoggingEnabled bool) {
	if ctx == nil || entry == nil {
		return
	}

	// Reuse the UpdateLogData-based helper by collecting previews into a scratch value.
	var previews UpdateLogData
	applyLargePayloadPreviews(ctx, &previews)

	rawAllowed, _ := ctx.Value(schemas.BifrostContextKeyShouldStoreRawInLogs).(bool)
	keepPreviewText := rawAllowed && contentLoggingEnabled

	if previews.IsLargePayloadRequest {
		entry.IsLargePayloadRequest = true
		if text, isString := previews.RawRequest.(string); keepPreviewText && isString {
			entry.RawRequest = text
		}
	}

	if previews.IsLargePayloadResponse {
		entry.IsLargePayloadResponse = true
		if text, isString := previews.RawResponse.(string); keepPreviewText && isString {
			entry.RawResponse = text
		}
	}
}
// scheduleDeferredUsageUpdate starts a background goroutine that waits for token usage
// delivered later through the deferred-usage channel in ctx and writes it onto the log
// entry identified by requestID.
//
// It does nothing when usageAlreadyPresent is true, when ctx is nil, or when ctx carries
// no deferred-usage channel. The goroutine is tracked by p.wg.
//
// Inside the goroutine:
//   - a closed channel or a nil usage value ends the goroutine without any write;
//   - the update is dropped (with a warning) when every deferredUsageSem slot is busy;
//   - the log entry is looked up at most three times, waiting 2s and then 4s between
//     attempts. The wait applies after lookup errors as well as after "not found"
//     results, and there is no wait after the final attempt;
//   - a wait is abandoned as soon as p.done is closed, so a shutting-down plugin is not
//     held up by the backoff.
func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, requestID string, usageAlreadyPresent bool) {
	if usageAlreadyPresent || ctx == nil {
		return
	}
	deferredChan, ok := ctx.Value(schemas.BifrostContextKeyDeferredUsage).(<-chan *schemas.BifrostLLMUsage)
	if !ok || deferredChan == nil {
		return
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// Large-response phase B closes this channel after trailing usage extraction completes.
		deferredUsage, chanOpen := <-deferredChan
		if !chanOpen || deferredUsage == nil {
			return
		}
		// Acquire semaphore — drop if all slots busy to prevent unbounded goroutines
		// from exhausting DB connections when Postgres is slow
		select {
		case p.deferredUsageSem <- struct{}{}:
			defer func() { <-p.deferredUsageSem }()
		default:
			p.logger.Warn("deferred usage update dropped for request %s: semaphore full", requestID)
			return
		}
		usageUpdates := map[string]interface{}{
			"prompt_tokens":     deferredUsage.PromptTokens,
			"completion_tokens": deferredUsage.CompletionTokens,
			"total_tokens":      deferredUsage.TotalTokens,
		}
		// Serialize through a scratch entry to obtain the stored token_usage form;
		// on serialization failure only the plain counters above are written.
		tempEntry := &logstore.Log{TokenUsageParsed: deferredUsage}
		if serErr := tempEntry.SerializeFields(); serErr == nil {
			usageUpdates["token_usage"] = tempEntry.TokenUsage
			usageUpdates["cached_read_tokens"] = tempEntry.CachedReadTokens
		}
		// The entry may not have been written yet, so poll for it with exponential
		// backoff before giving up.
		const maxAttempts = 3
		var found bool
		var findErr error
		for attempt := 0; attempt < maxAttempts; attempt++ {
			if attempt > 0 {
				// Back off before every retry (2s, then 4s), including retries that
				// follow a lookup error, and stop waiting if the plugin shuts down.
				backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
				timer := time.NewTimer(backoff)
				select {
				case <-timer.C:
				case <-p.done:
					timer.Stop()
					p.logger.Warn("deferred usage update aborted for request %s: plugin is shutting down", requestID)
					return
				}
			}
			found, findErr = p.store.IsLogEntryPresent(p.ctx, requestID)
			if findErr != nil {
				p.logger.Warn("failed to check if log entry is present for request %s: %v", requestID, findErr)
				continue
			}
			if found {
				break
			}
		}
		if !found {
			p.logger.Warn("log entry not found for request %s after 3 retries. failed to update deferred usage for large payload request", requestID)
			return
		}
		if updErr := p.store.Update(p.ctx, requestID, usageUpdates); updErr != nil {
			p.logger.Warn("failed to update deferred usage for request %s: %v", requestID, updErr)
		}
	}()
}
// RecalculateCostResult is the summary returned by a cost backfill operation.
// The JSON field names are fixed by the struct tags below.
type RecalculateCostResult struct {
	// TotalMatched is serialized as "total_matched".
	TotalMatched int64 `json:"total_matched"`

	// Updated is serialized as "updated".
	Updated int `json:"updated"`

	// Skipped is serialized as "skipped".
	Skipped int `json:"skipped"`

	// Remaining is serialized as "remaining".
	Remaining int64 `json:"remaining"`
}
// LogMessage is a single message in the logging queue.
type LogMessage struct {
	// Operation is the kind of logging operation this message carries.
	Operation LogOperation

	// RequestID is the unique ID for the request.
	RequestID string

	// ParentRequestID is the unique ID of the parent request (used for fallback requests).
	ParentRequestID string

	// NumberOfRetries is the number of retries.
	NumberOfRetries int

	// FallbackIndex is the fallback index.
	FallbackIndex int

	// SelectedKeyID is the ID of the selected key.
	SelectedKeyID string

	// SelectedKeyName is the name of the selected key.
	SelectedKeyName string

	// AttemptTrail is the per-attempt key selection history.
	AttemptTrail []schemas.KeyAttemptRecord

	// VirtualKeyID is the virtual key ID.
	VirtualKeyID string

	// VirtualKeyName is the virtual key name.
	VirtualKeyName string

	// RoutingEnginesUsed lists the routing engines used.
	RoutingEnginesUsed []string

	// RoutingRuleID is the routing rule ID.
	RoutingRuleID string

	// RoutingRuleName is the routing rule name.
	RoutingRuleName string

	// Timestamp is the time of the preHook/postHook call.
	Timestamp time.Time

	// Latency is used for latency updates.
	Latency int64

	// InitialData is used for create operations.
	InitialData *InitialLogData

	// SemanticCacheDebug is used for semantic cache operations.
	SemanticCacheDebug *schemas.BifrostCacheDebug

	// UpdateData is used for update operations.
	UpdateData *UpdateLogData

	// StreamResponse is used for streaming delta updates.
	StreamResponse *streaming.ProcessedStreamResponse

	// RoutingEngineLogs holds the formatted routing engine decision logs.
	RoutingEngineLogs string
}
// InitialLogData holds the input-side data captured when a log entry is first created.
type InitialLogData struct {
	// Status is the initial status of the entry.
	Status string

	// Provider is the provider name for the request.
	Provider string

	// Model is the model name for the request.
	Model string

	// Object is the request type recorded for the entry.
	Object string

	// InputHistory is the chat-message input history.
	InputHistory []schemas.ChatMessage

	// ResponsesInputHistory is the responses-API input history.
	ResponsesInputHistory []schemas.ResponsesMessage

	// Params holds the request parameters; the concrete type depends on the request type.
	Params any

	// SpeechInput is the input of a speech request.
	SpeechInput *schemas.SpeechInput

	// TranscriptionInput is the input of a transcription request.
	TranscriptionInput *schemas.TranscriptionInput

	// OCRInput is the document of an OCR request.
	OCRInput *schemas.OCRDocument

	// ImageGenerationInput is the input of an image generation request.
	ImageGenerationInput *schemas.ImageGenerationInput

	// ImageEditInput is the input of an image edit request.
	ImageEditInput *schemas.ImageEditInput

	// ImageVariationInput is the input of an image variation request.
	ImageVariationInput *schemas.ImageVariationInput

	// VideoGenerationInput is the input of a video generation or remix request.
	VideoGenerationInput *schemas.VideoGenerationInput

	// Tools lists the tools supplied with the request.
	Tools []schemas.ChatTool

	// RoutingEngineUsed lists the routing engines used for the request.
	RoutingEngineUsed []string

	// Metadata holds captured logging headers and system entries.
	Metadata map[string]any

	// PassthroughRequestBody is the raw body for passthrough requests (UTF-8).
	PassthroughRequestBody string
}
type (
	// LogCallback is invoked when a new log entry is created.
	LogCallback func(ctx context.Context, logEntry *logstore.Log)

	// MCPToolLogCallback is invoked when an MCP tool log entry is created or updated.
	MCPToolLogCallback func(*logstore.MCPToolLog)
)
// Config is the configuration accepted by Init for the logging plugin.
type Config struct {
	// DisableContentLogging, when it points to true, turns off content logging.
	// A nil pointer is treated the same as false.
	DisableContentLogging *bool `json:"disable_content_logging"`

	// LoggingHeaders points to the live config slice of header names to capture;
	// changes to that slice are reflected immediately without a restart.
	LoggingHeaders *[]string `json:"logging_headers"`
}
// LoggerPlugin implements the schemas.LLMPlugin and schemas.MCPPlugin interfaces.
type LoggerPlugin struct {
	// ctx is the context supplied to Init; it is passed to store operations and callbacks.
	ctx context.Context

	// store is the log store that entries are written to.
	store logstore.LogStore

	// disableContentLogging turns content logging off when it points to true.
	disableContentLogging *bool

	// loggingHeaders points to the live config slice of headers to capture in metadata.
	loggingHeaders *[]string

	// pricingManager is the model catalog used for LLM cost calculation.
	pricingManager *modelcatalog.ModelCatalog

	// mcpCatalog is the MCP catalog used for tool cost calculation.
	mcpCatalog *mcpcatalog.MCPCatalog

	// mu guards logCallback.
	mu sync.Mutex

	// done signals background workers to stop.
	done chan struct{}

	// cleanupOnce ensures cleanup only runs once.
	cleanupOnce sync.Once

	// wg tracks the plugin's background goroutines.
	wg sync.WaitGroup

	// logger is the logger used for plugin diagnostics.
	logger schemas.Logger

	// logCallback is called for each log entry.
	logCallback LogCallback

	// mcpToolLogCallback is the callback for MCP tool log entries.
	mcpToolLogCallback MCPToolLogCallback

	// droppedRequests is an atomic counter of dropped requests.
	droppedRequests atomic.Int64

	// cleanupTicker drives the periodic cleanup of old processing logs.
	cleanupTicker *time.Ticker

	// logMsgPool is a pool for reusing LogMessage structs.
	logMsgPool sync.Pool

	// updateDataPool is a pool for reusing UpdateLogData structs.
	updateDataPool sync.Pool

	// pendingLogsEntries maps requestID -> *PendingLogData (PreLLMHook input data
	// awaiting PostLLMHook).
	pendingLogsEntries sync.Map

	// pendingLogsToInject maps traceID -> *pendingInjectEntries (log entries to
	// inject; supports multiple per trace).
	pendingLogsToInject sync.Map

	// writeQueue is the buffered channel feeding the batch writer.
	writeQueue chan *writeQueueEntry

	// closed is set during cleanup to prevent sends on a closed writeQueue.
	closed atomic.Bool

	// deferredUsageSem limits concurrent deferred usage DB updates.
	deferredUsageSem chan struct{}
}
// Init builds a LoggerPlugin backed by logsStore and starts its background goroutines
// (the cleanup worker and the batch writer).
//
// It returns an error when config or logsStore is nil. A nil pricingManager or
// mcpCatalog is allowed; each only produces a warning that the corresponding cost
// calculations will be skipped.
func Init(ctx context.Context, config *Config, logger schemas.Logger, logsStore logstore.LogStore, pricingManager *modelcatalog.ModelCatalog, mcpCatalog *mcpcatalog.MCPCatalog) (*LoggerPlugin, error) {
	switch {
	case config == nil:
		return nil, fmt.Errorf("config is required")
	case logsStore == nil:
		return nil, fmt.Errorf("logs store cannot be nil")
	}

	if pricingManager == nil {
		logger.Warn("logging plugin requires model catalog to calculate cost, all LLM cost calculations will be skipped.")
	}
	if mcpCatalog == nil {
		logger.Warn("logging plugin requires MCP catalog to calculate cost, all MCP cost calculations will be skipped.")
	}

	p := &LoggerPlugin{
		ctx:                   ctx,
		store:                 logsStore,
		pricingManager:        pricingManager,
		mcpCatalog:            mcpCatalog,
		disableContentLogging: config.DisableContentLogging,
		loggingHeaders:        config.LoggingHeaders,
		done:                  make(chan struct{}),
		logger:                logger,
		writeQueue:            make(chan *writeQueueEntry, writeQueueCapacity),
		deferredUsageSem:      make(chan struct{}, maxDeferredUsageConcurrency),
		logMsgPool: sync.Pool{
			New: func() any { return new(LogMessage) },
		},
		updateDataPool: sync.Pool{
			New: func() any { return new(UpdateLogData) },
		},
	}

	// Seed both pools up front so early requests can reuse pre-allocated structs.
	for i := 0; i < 1000; i++ {
		p.logMsgPool.Put(new(LogMessage))
		p.updateDataPool.Put(new(UpdateLogData))
	}

	// The cleanup worker is driven by a one-minute ticker; the batch writer is the
	// single writer for all DB writes. Both are tracked by the wait group.
	p.cleanupTicker = time.NewTicker(time.Minute)
	p.wg.Add(2)
	go p.cleanupWorker()
	go p.batchWriter()

	return p, nil
}
// cleanupWorker runs cleanupOldProcessingLogs on every cleanupTicker tick and exits
// once the done channel yields. It marks the wait group as done on return.
func (p *LoggerPlugin) cleanupWorker() {
	defer p.wg.Done()

	ticks := p.cleanupTicker.C
	for {
		select {
		case <-p.done:
			return
		case <-ticks:
			p.cleanupOldProcessingLogs()
		}
	}
}
// cleanupOldProcessingLogs flushes LLM and MCP tool processing logs older than
// 30 minutes from the store, then cleans up stale pending log entries held in memory.
// Flush failures are logged as warnings and do not stop the remaining steps.
func (p *LoggerPlugin) cleanupOldProcessingLogs() {
	// The cutoff is computed in UTC to match log entry timestamps.
	cutoff := time.Now().UTC().Add(-30 * time.Minute)

	if flushErr := p.store.Flush(p.ctx, cutoff); flushErr != nil {
		p.logger.Warn("failed to cleanup old processing LLM logs: %v", flushErr)
	}

	if flushErr := p.store.FlushMCPToolLogs(p.ctx, cutoff); flushErr != nil {
		p.logger.Warn("failed to cleanup old processing MCP tool logs: %v", flushErr)
	}

	// Pending entries go stale when PostLLMHook never fired for the request.
	p.cleanupStalePendingLogs()
}
// SetLogCallback registers the callback invoked for each log entry, replacing any
// previously registered one. The assignment is made while holding p.mu.
func (p *LoggerPlugin) SetLogCallback(callback LogCallback) {
	p.mu.Lock()
	p.logCallback = callback
	p.mu.Unlock()
}
// GetName returns the name of the plugin, which is the PluginName constant.
func (p *LoggerPlugin) GetName() string {
return PluginName
}
// HTTPTransportPreHook is not used for this plugin.
// It ignores its arguments and always returns a nil response and a nil error.
func (p *LoggerPlugin) HTTPTransportPreHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest) (*schemas.HTTPResponse, error) {
return nil, nil
}
// HTTPTransportPostHook is not used for this plugin.
// It ignores its arguments and always returns nil.
func (p *LoggerPlugin) HTTPTransportPostHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, resp *schemas.HTTPResponse) error {
return nil
}
// HTTPTransportStreamChunkHook passes streaming chunks through unchanged.
// It returns the chunk it was given and a nil error.
func (p *LoggerPlugin) HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, chunk *schemas.BifrostStreamChunk) (*schemas.BifrostStreamChunk, error) {
return chunk, nil
}
// captureLoggingHeaders builds a metadata map from the request headers stored in ctx.
//
// Two sources are captured, in this order:
//  1. headers named in the configured logging-headers list, keyed by the lowercased
//     header name;
//  2. headers whose name starts with "x-bf-lh-", keyed by the remainder of the name
//     after that prefix (a bare "x-bf-lh-" with nothing after it is ignored).
//
// Because prefixed headers are applied second, they overwrite a configured header
// that maps to the same key. The map is allocated lazily, so the result is nil when
// ctx carries no headers or nothing was captured. System entries (e.g. isAsyncRequest)
// should be set AFTER calling this so they take precedence.
func (p *LoggerPlugin) captureLoggingHeaders(ctx *schemas.BifrostContext) map[string]interface{} {
	headers, _ := ctx.Value(schemas.BifrostContextKeyRequestHeaders).(map[string]string)
	if headers == nil {
		return nil
	}

	var captured map[string]interface{}
	record := func(name, value string) {
		if captured == nil {
			captured = make(map[string]interface{})
		}
		captured[name] = value
	}

	if configured := p.loggingHeaders; configured != nil {
		for _, header := range *configured {
			name := strings.ToLower(header)
			if value, present := headers[name]; present {
				record(name, value)
			}
		}
	}

	const labelPrefix = "x-bf-lh-"
	for name, value := range headers {
		label, hasPrefix := strings.CutPrefix(name, labelPrefix)
		if !hasPrefix || label == "" {
			continue
		}
		record(label, value)
	}

	return captured
}
// PreLLMHook is called before a request is processed. It performs no database I/O:
// it captures the request's input data, stores it in pendingLogsEntries under the
// effective request ID for PostLLMHook to pick up, and synchronously invokes the
// registered log callback (if any) with an initial "processing" entry.
// Parameters:
// - ctx: The Bifrost context
// - req: The Bifrost request
//
// Returns:
// - *schemas.BifrostRequest: The request, returned unmodified on every path
// - *schemas.LLMPluginShortCircuit: Always nil; this hook never short-circuits
// - error: Always nil; problems are logged instead of failing the request
func (p *LoggerPlugin) PreLLMHook(ctx *schemas.BifrostContext, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.LLMPluginShortCircuit, error) {
if ctx == nil {
// Log error but don't fail the request
p.logger.Error("context is nil in PreLLMHook")
return req, nil, nil
}
// Extract request ID from context
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
if !ok || requestID == "" {
// Log error but don't fail the request
p.logger.Error("request-id not found in context or is empty")
return req, nil, nil
}
createdTimestamp := time.Now().UTC()
// NOTE(review): req is dereferenced from here on without a nil check — confirm
// callers never pass a nil request.
// If request type is streaming we create a stream accumulator via the tracer
// Skip for passthrough streams — they carry raw bytes, not LLM response chunks
if bifrost.IsStreamRequestType(req.RequestType) && req.RequestType != schemas.PassthroughStreamRequest {
tracer, traceID, err := bifrost.GetTracerFromContext(ctx)
if err == nil && tracer != nil && traceID != "" {
tracer.CreateStreamAccumulator(traceID, createdTimestamp)
}
}
provider, model, _ := req.GetRequestFields()
initialData := &InitialLogData{
Provider: string(provider),
Model: model,
Object: string(req.RequestType),
}
// Realtime requests are recorded under a fixed object name instead of the request type.
if req.RequestType == schemas.RealtimeRequest {
initialData.Object = "realtime.turn"
}
// Input content (history, params, tools, media inputs) is captured only when
// content logging is enabled, i.e. the disable flag is unset or false.
if p.disableContentLogging == nil || !*p.disableContentLogging {
inputHistory, responsesInputHistory := p.extractInputHistory(req)
initialData.InputHistory = inputHistory
initialData.ResponsesInputHistory = responsesInputHistory
// NOTE(review): each case below dereferences the request sub-struct matching
// req.RequestType (only the realtime case nil-checks it) — presumably the core
// guarantees it is populated; confirm.
switch req.RequestType {
case schemas.TextCompletionRequest, schemas.TextCompletionStreamRequest:
initialData.Params = req.TextCompletionRequest.Params
case schemas.ChatCompletionRequest, schemas.ChatCompletionStreamRequest:
initialData.Params = req.ChatRequest.Params
// NOTE(review): Params.Tools is read without checking Params; if Params is a
// pointer that can be nil this panics — confirm against the schema.
initialData.Tools = req.ChatRequest.Params.Tools
case schemas.ResponsesRequest, schemas.ResponsesStreamRequest, schemas.WebSocketResponsesRequest:
initialData.Params = req.ResponsesRequest.Params
// Responses-API tools are converted to chat tools so both APIs log tools in one shape.
// NOTE(review): same unchecked Params access as the chat case, and the result of
// ToChatTool is dereferenced directly — confirm it never returns nil.
var tools []schemas.ChatTool
for _, tool := range req.ResponsesRequest.Params.Tools {
tools = append(tools, *tool.ToChatTool())
}
initialData.Tools = tools
case schemas.RealtimeRequest:
if req.ResponsesRequest != nil {
initialData.Params = req.ResponsesRequest.Params
}
case schemas.EmbeddingRequest:
initialData.Params = req.EmbeddingRequest.Params
case schemas.RerankRequest:
initialData.Params = req.RerankRequest.Params
case schemas.OCRRequest:
initialData.Params = req.OCRRequest.Params
initialData.OCRInput = &req.OCRRequest.Document
case schemas.SpeechRequest, schemas.SpeechStreamRequest:
initialData.Params = req.SpeechRequest.Params
initialData.SpeechInput = req.SpeechRequest.Input
case schemas.TranscriptionRequest, schemas.TranscriptionStreamRequest:
initialData.Params = req.TranscriptionRequest.Params
input := req.TranscriptionRequest.Input
if input != nil {
// A missing or non-int64 threshold yields 0, which disables stripping.
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
if reqThreshold > 0 && int64(len(input.File)) > reqThreshold {
// Strip binary file content when it exceeds the large payload threshold
// to avoid serializing multi-MB audio into the log database.
// The input is shallow-copied so the live request keeps its file.
logInput := *input
logInput.File = nil
initialData.TranscriptionInput = &logInput
} else {
initialData.TranscriptionInput = input
}
}
case schemas.ImageGenerationRequest, schemas.ImageGenerationStreamRequest:
initialData.Params = req.ImageGenerationRequest.Params
initialData.ImageGenerationInput = req.ImageGenerationRequest.Input
case schemas.ImageEditRequest, schemas.ImageEditStreamRequest:
params := req.ImageEditRequest.Params
input := req.ImageEditRequest.Input
if input != nil {
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
if reqThreshold > 0 {
// Images are dropped from the logged copy when their combined size exceeds
// the threshold; the mask is checked against the threshold on its own.
var totalSize int64
for _, img := range input.Images {
totalSize += int64(len(img.Image))
}
if totalSize > reqThreshold {
logInput := *input
logInput.Images = nil
initialData.ImageEditInput = &logInput
} else {
initialData.ImageEditInput = input
}
if params != nil && int64(len(params.Mask)) > reqThreshold {
logParams := *params
logParams.Mask = nil
initialData.Params = &logParams
} else {
initialData.Params = params
}
} else {
initialData.ImageEditInput = input
initialData.Params = params
}
} else {
// With no input, params are logged as-is; the mask is not size-checked on this path.
initialData.Params = params
}
case schemas.ImageVariationRequest:
initialData.Params = req.ImageVariationRequest.Params
input := req.ImageVariationRequest.Input
if input != nil {
reqThreshold, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadRequestThreshold).(int64)
if reqThreshold > 0 && int64(len(input.Image.Image)) > reqThreshold {
// Replace the oversized image with an empty ImageInput in the logged copy.
logInput := *input
logInput.Image = schemas.ImageInput{}
initialData.ImageVariationInput = &logInput
} else {
initialData.ImageVariationInput = input
}
}
case schemas.VideoGenerationRequest:
initialData.Params = req.VideoGenerationRequest.Params
initialData.VideoGenerationInput = req.VideoGenerationRequest.Input
case schemas.VideoRemixRequest:
initialData.Params = &schemas.VideoLogParams{
VideoID: req.VideoRemixRequest.ID,
}
initialData.VideoGenerationInput = req.VideoRemixRequest.Input
case schemas.VideoRetrieveRequest:
initialData.Params = &schemas.VideoLogParams{
VideoID: req.VideoRetrieveRequest.ID,
}
case schemas.VideoDownloadRequest:
initialData.Params = &schemas.VideoLogParams{
VideoID: req.VideoDownloadRequest.ID,
}
case schemas.VideoDeleteRequest:
initialData.Params = &schemas.VideoLogParams{
VideoID: req.VideoDeleteRequest.ID,
}
case schemas.PassthroughRequest, schemas.PassthroughStreamRequest:
initialData.Params = &schemas.PassthroughLogParams{
Method: req.PassthroughRequest.Method,
Path: req.PassthroughRequest.Path,
RawQuery: req.PassthroughRequest.RawQuery,
}
// The passthrough body is kept only when the content-type header mentions JSON.
if len(req.PassthroughRequest.Body) > 0 {
ct := strings.ToLower(req.PassthroughRequest.SafeHeaders["content-type"])
if strings.Contains(ct, "application/json") {
initialData.PassthroughRequestBody = string(req.PassthroughRequest.Body)
}
}
}
}
// Capture configured logging headers and x-bf-lh-* headers into metadata first
initialData.Metadata = mergeRealtimeMetadata(p.captureLoggingHeaders(ctx), ctx)
// System entries are set after so they take precedence over dynamic header values
if isAsync, ok := ctx.Value(schemas.BifrostIsAsyncRequest).(bool); ok && isAsync {
if initialData.Metadata == nil {
initialData.Metadata = make(map[string]interface{})
}
initialData.Metadata["isAsyncRequest"] = true
}
// Determine the effective request ID: a fallback request ID, when present, replaces
// the primary request ID. An explicit parent request ID from the context is kept;
// otherwise the primary request ID becomes the parent of the fallback request.
effectiveRequestID := requestID
var parentRequestID string
if directParentRequestID, ok := ctx.Value(schemas.BifrostContextKeyParentRequestID).(string); ok && directParentRequestID != "" {
parentRequestID = directParentRequestID
}
fallbackRequestID, ok := ctx.Value(schemas.BifrostContextKeyFallbackRequestID).(string)
if ok && fallbackRequestID != "" {
effectiveRequestID = fallbackRequestID
if parentRequestID == "" {
parentRequestID = requestID
}
}
fallbackIndex := bifrost.GetIntFromContext(ctx, schemas.BifrostContextKeyFallbackIndex)
// Get routing engines array
routingEngines := []string{}
if engines, ok := ctx.Value(schemas.BifrostContextKeyRoutingEnginesUsed).([]string); ok {
routingEngines = engines
}
initialData.RoutingEngineUsed = routingEngines
initialData.Status = "processing"
// Store input data in pendingLogs for later combination with PostLLMHook output.
// No DB write here - the write is deferred to PostLLMHook to halve total writes.
pending := &PendingLogData{
RequestID: effectiveRequestID,
ParentRequestID: parentRequestID,
Timestamp: createdTimestamp,
FallbackIndex: fallbackIndex,
RoutingEnginesUsed: routingEngines,
InitialData: initialData,
CreatedAt: time.Now(),
Status: "processing",
}
p.pendingLogsEntries.Store(effectiveRequestID, pending)
// Call callback synchronously for immediate UI feedback (WebSocket "processing" notification).
// The entry does not exist in the DB yet - it will be written when PostLLMHook fires.
// The callback is read under the mutex but invoked after releasing it.
p.mu.Lock()
callback := p.logCallback
p.mu.Unlock()
if callback != nil {
callback(p.ctx, buildInitialLogEntry(pending))
}
return req, nil, nil
}
// PostLLMHook is called after a response is received - FULLY ASYNC, NO DATABASE I/O
// Parameters:
// - ctx: The Bifrost context
// - result: The Bifrost response to be processed
// - bifrostErr: The Bifrost error to be processed
//
// Returns:
// - *schemas.BifrostResponse: The processed response
// - *schemas.BifrostError: The processed error
// - error: Any error that occurred during processing
func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) {
if ctx == nil {
// Log error but don't fail the request
p.logger.Error("context is nil in PostLLMHook")
return result, bifrostErr, nil
}
requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
if !ok || requestID == "" {
p.logger.Error("request-id not found in context or is empty")
return result, bifrostErr, nil
}
// If fallback request ID is present, use it instead of the primary request ID
fallbackRequestID, ok := ctx.Value(schemas.BifrostContextKeyFallbackRequestID).(string)
if ok && fallbackRequestID != "" {
requestID = fallbackRequestID
}
selectedKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedKeyID)
selectedKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedKeyName)
virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)
routingRuleID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceRoutingRuleID)
routingRuleName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceRoutingRuleName)
selectedPromptName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptName)
selectedPromptVersion := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptVersion)
selectedPromptID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeySelectedPromptID)
teamID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceTeamID)
teamName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceTeamName)
customerID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceCustomerID)
customerName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceCustomerName)
userID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyUserID)
userName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyUserName)
businessUnitID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceBusinessUnitID)
businessUnitName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceBusinessUnitName)
numberOfRetries := bifrost.GetIntFromContext(ctx, schemas.BifrostContextKeyNumberOfRetries)
attemptTrail, _ := ctx.Value(schemas.BifrostContextKeyAttemptTrail).([]schemas.KeyAttemptRecord)
requestType, _, originalModelRequested, resolvedModelUsed := bifrost.GetResponseFields(result, bifrostErr)
shouldStoreRaw, _ := ctx.Value(schemas.BifrostContextKeyShouldStoreRawInLogs).(bool)
contentLoggingEnabled := p.disableContentLogging == nil || !*p.disableContentLogging
isFinalChunk := bifrost.IsFinalChunk(ctx)
var tracer schemas.Tracer
var traceID string
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.PassthroughStreamRequest && requestType != schemas.RealtimeRequest {
var err error
tracer, traceID, err = bifrost.GetTracerFromContext(ctx)
if err != nil {
p.logger.Debug("tracer not available in logging plugin posthook: %v", err)
// Continue with nil tracer — the rest of the code handles this gracefully
// via `if tracer != nil && traceID != ""` guards
}
}
// For non-final streaming chunks, process the accumulator synchronously
// and skip the write queue entirely. The accumulator work (ProcessStreamingChunk)
// is fast (mutex + append). Only final chunks, errors, and non-streaming
// responses need a DB write.
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.PassthroughStreamRequest && requestType != schemas.RealtimeRequest && !isFinalChunk && result != nil && bifrostErr == nil {
if tracer != nil && traceID != "" {
tracer.ProcessStreamingChunk(traceID, false, result, bifrostErr)
}
return result, bifrostErr, nil
}
// Extract routing engine logs from context before entering goroutine
routingEngineLogs := formatRoutingEngineLogs(ctx.GetRoutingEngineLogs())
// Retrieve pending input data from PreLLMHook
pendingVal, hasPending := p.pendingLogsEntries.LoadAndDelete(requestID)
if !hasPending {
// If we have an error (e.g., cancellation/timeout), still write a minimal error entry
// so the error is visible in logs. Without PreLLMHook's DB insert, silently returning
// here means the error is completely lost.
if bifrostErr != nil {
p.logger.Warn("no pending log data found for request %s, writing minimal error entry", requestID)
entry := &logstore.Log{
ID: requestID,
Provider: string(bifrostErr.ExtraFields.Provider),
Status: "error",
Stream: bifrost.IsStreamRequestType(requestType),
Timestamp: time.Now().UTC(),
CreatedAt: time.Now().UTC(),
}
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
if data, err := sonic.Marshal(bifrostErr); err == nil {
entry.ErrorDetails = string(data)
}
entry.ErrorDetailsParsed = bifrostErr
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
} else {
p.logger.Warn("no pending log data found for request %s, skipping log write", requestID)
}
return result, bifrostErr, nil
}
pending := pendingVal.(*PendingLogData)
if requestType == schemas.RealtimeRequest {
if resolvedRealtimeSessionID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyRealtimeSessionID); resolvedRealtimeSessionID != "" {
pending.ParentRequestID = resolvedRealtimeSessionID
}
}
// Build the complete log entry with input (from PreLLMHook) + output (from PostLLMHook)
entry := buildCompleteLogEntryFromPending(pending)
// Apply common output fields
var latency int64
if result != nil {
latency = result.GetExtraFields().Latency
}
applyOutputFieldsToEntry(entry, selectedKeyID, selectedKeyName, virtualKeyID, virtualKeyName, routingRuleID, routingRuleName, selectedPromptID, selectedPromptName, selectedPromptVersion, teamID, teamName, customerID, customerName, userID, userName, businessUnitID, businessUnitName, numberOfRetries, latency, attemptTrail)
entry.MetadataParsed = pending.InitialData.Metadata
entry.MetadataParsed = mergeRealtimeMetadata(entry.MetadataParsed, ctx)
entry.RoutingEngineLogs = routingEngineLogs
// Branch based on response type to populate output-specific fields
// Path A: Error with nil result
if result == nil && bifrostErr != nil {
entry.Status = "error"
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
if bifrost.IsStreamRequestType(requestType) {
entry.Stream = true
}
// For streaming errors, finalize and read accumulated chunks so logs retain pre-error stream metadata
if bifrost.IsStreamRequestType(requestType) &&
requestType != schemas.PassthroughStreamRequest &&
requestType != schemas.RealtimeRequest &&
tracer != nil &&
traceID != "" {
if accResult := tracer.ProcessStreamingChunk(traceID, true, result, bifrostErr); accResult != nil {
if streamResponse := convertToProcessedStreamResponse(accResult, requestType); streamResponse != nil {
p.applyStreamingOutputToEntry(entry, streamResponse, shouldStoreRaw)
}
}
tracer.CleanupStreamAccumulator(traceID)
}
// Serialize error details immediately since bifrostErr may be released
// back to the pool before the async batch writer processes this entry.
// Also set ErrorDetailsParsed for UI callback (JSON serialization uses this field).
if data, err := sonic.Marshal(bifrostErr); err == nil {
entry.ErrorDetails = string(data)
}
entry.ErrorDetailsParsed = bifrostErr
if shouldStoreRaw && contentLoggingEnabled {
if bifrostErr.ExtraFields.RawRequest != nil {
rawReqBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawRequest)
if err == nil {
entry.RawRequest = string(rawReqBytes)
}
}
if entry.RawResponse == "" && bifrostErr.ExtraFields.RawResponse != nil {
rawRespBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawResponse)
if err == nil {
entry.RawResponse = string(rawRespBytes)
}
}
}
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
return result, bifrostErr, nil
}
// Path B: Streaming final chunk
if bifrost.IsStreamRequestType(requestType) && requestType != schemas.RealtimeRequest {
var streamResponse *streaming.ProcessedStreamResponse
if requestType != schemas.PassthroughStreamRequest && tracer != nil && traceID != "" {
accResult := tracer.ProcessStreamingChunk(traceID, isFinalChunk, result, bifrostErr)
if accResult != nil {
streamResponse = convertToProcessedStreamResponse(accResult, requestType)
}
}
if bifrostErr != nil {
entry.Status = "error"
entry.Stream = true
applyModelAlias(entry, originalModelRequested, resolvedModelUsed)
if data, err := sonic.Marshal(bifrostErr); err == nil {
entry.ErrorDetails = string(data)
}
entry.ErrorDetailsParsed = bifrostErr
} else if streamResponse == nil {
// tracer or traceID not available, or accumulator returned nil - still write what we have
entry.Status = "success"
entry.Stream = true
applyModelAlias(entry, originalModelRequested, resolvedModelUsed)
} else if isFinalChunk {
// Apply streaming output fields to the entry
entry.Stream = true
p.applyStreamingOutputToEntry(entry, streamResponse, shouldStoreRaw)
}
// Backfill passthrough status_code from response (streaming path)
if result != nil && result.PassthroughResponse != nil {
if params, ok := entry.ParamsParsed.(*schemas.PassthroughLogParams); ok {
params.StatusCode = result.PassthroughResponse.StatusCode
}
if contentLoggingEnabled && len(result.PassthroughResponse.Body) > 0 {
entry.PassthroughResponseBody = string(result.PassthroughResponse.Body)
if shouldStoreRaw {
entry.RawResponse = string(result.PassthroughResponse.Body)
}
}
// Flip status for passthrough error responses (4xx/5xx from provider)
if isPassthroughErrorResponse(result) {
entry.Status = "error"
}
}
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
if requestType != schemas.PassthroughStreamRequest && tracer != nil && traceID != "" {
tracer.CleanupStreamAccumulator(traceID)
}
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
return result, bifrostErr, nil
}
// Path C: Non-streaming response
if bifrostErr != nil {
entry.Status = "error"
applyModelAlias(entry, bifrostErr.ExtraFields.OriginalModelRequested, bifrostErr.ExtraFields.ResolvedModelUsed)
// Serialize error details immediately since bifrostErr may be released
// back to the pool before the async batch writer processes this entry.
// Also set ErrorDetailsParsed for UI callback (JSON serialization uses this field).
if data, err := sonic.Marshal(bifrostErr); err == nil {
entry.ErrorDetails = string(data)
}
entry.ErrorDetailsParsed = bifrostErr
// Realtime turns that fail mid-stream still need their input transcript
// surfaced — backfill from bifrostErr.ExtraFields.RawRequest if present.
if requestType == schemas.RealtimeRequest {
applyRealtimeRawRequestBackfill(entry, bifrostErr.ExtraFields.RawRequest, contentLoggingEnabled, shouldStoreRaw)
}
} else if result != nil {
entry.Status = "success"
extraFields := result.GetExtraFields()
applyModelAlias(entry, extraFields.OriginalModelRequested, extraFields.ResolvedModelUsed)
if requestType == schemas.RealtimeRequest {
p.applyRealtimeOutputToEntry(entry, result, shouldStoreRaw)
} else {
p.applyNonStreamingOutputToEntry(entry, result, shouldStoreRaw)
}
// Flip status for passthrough error responses (4xx/5xx from provider)
if isPassthroughErrorResponse(result) {
entry.Status = "error"
}
}
applyLargePayloadPreviewsToEntry(ctx, entry, contentLoggingEnabled)
// Calculate cost
var cacheDebug *schemas.BifrostCacheDebug
if result != nil {
cacheDebug = result.GetExtraFields().CacheDebug
}
entry.CacheDebugParsed = cacheDebug
if p.pricingManager != nil {
pricingScopes := modelcatalog.PricingLookupScopesFromContext(ctx, string(entry.Provider))
if cost := p.pricingManager.CalculateCost(result, pricingScopes); cost > 0 {
entry.Cost = &cost
}
}
// Pre-apply denormalized fields for WebSocket callback enrichment
if entry.SelectedKeyID != "" && entry.SelectedKeyName != "" {
entry.SelectedKey = &schemas.Key{
ID: entry.SelectedKeyID,
Name: entry.SelectedKeyName,
}
}
if entry.VirtualKeyID != nil && entry.VirtualKeyName != nil && *entry.VirtualKeyID != "" && *entry.VirtualKeyName != "" {
entry.VirtualKey = &tables.TableVirtualKey{
ID: *entry.VirtualKeyID,
Name: *entry.VirtualKeyName,
}
}
if entry.RoutingRuleID != nil && entry.RoutingRuleName != nil && *entry.RoutingRuleID != "" && *entry.RoutingRuleName != "" {
entry.RoutingRule = &tables.TableRoutingRule{
ID: *entry.RoutingRuleID,
Name: *entry.RoutingRuleName,
}
}
p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil))
p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil)
return result, bifrostErr, nil
}
// Cleanup shuts the plugin down. The shutdown sequence is guarded by cleanupOnce,
// so only the first call does any work; later calls are no-ops. It always returns nil.
func (p *LoggerPlugin) Cleanup() error {
	shutdown := func() {
		// Halt the periodic cleanup ticker, if one was ever started.
		if ticker := p.cleanupTicker; ticker != nil {
			ticker.Stop()
		}

		// Tell the cleanup worker to exit.
		close(p.done)

		// Ordering matters here: the write queue is closed BEFORE the closed flag
		// is set. Closing the queue lets the batch writer drain what is left and
		// exit; the flag then stops new sends. A send that slips in between the
		// two steps hits a closed channel — enqueueLogEntry is expected to
		// recover from that (defer/recover) rather than crash.
		close(p.writeQueue)
		p.closed.Store(true)

		// Block until the cleanup worker and the batch writer have both returned.
		p.wg.Wait()

		// Stream accumulators belong to the tracer and are cleaned up there;
		// GORM takes care of its own connection cleanup.
	}

	p.cleanupOnce.Do(shutdown)
	return nil
}
// storeOrEnqueueEntry parks a finished log entry until the owning trace completes,
// or writes it straight away when there is no trace to wait for.
//
//   - With a trace ID in ctx, the entry is added to pendingLogsToInject under that
//     ID so Inject() can pick it up later. Several entries may accumulate for one
//     trace (e.g. fallback/retry attempts). The callback is not used on this path.
//   - Without a trace ID (Go SDK path), the entry is handed to enqueueLogEntry
//     together with callback.
func (p *LoggerPlugin) storeOrEnqueueEntry(ctx *schemas.BifrostContext, entry *logstore.Log, callback func(entry *logstore.Log)) {
	traceID, _ := ctx.Value(schemas.BifrostContextKeyTraceID).(string)

	// No tracing available: enqueue directly.
	if traceID == "" {
		p.enqueueLogEntry(entry, callback)
		return
	}

	// Optimistically register a fresh bucket that already holds this entry.
	fresh := &pendingInjectEntries{
		entries:   []*logstore.Log{entry},
		createdAt: time.Now(),
	}
	stored, alreadyPresent := p.pendingLogsToInject.LoadOrStore(traceID, fresh)
	if !alreadyPresent {
		// Our bucket won; the entry is already inside it.
		return
	}

	// A bucket for this trace existed before us: append to it under its lock.
	bucket := stored.(*pendingInjectEntries)
	bucket.mu.Lock()
	bucket.entries = append(bucket.entries, entry)
	bucket.mu.Unlock()
}
// Inject receives a completed trace and writes the log entries with plugin logs to DB.
// This implements the ObservabilityPlugin interface.
//
// Parameters:
//   - trace: the completed trace; a nil trace is ignored.
//
// Returns:
//   - error: always nil. A trace with no pending entries is simply a no-op.
func (p *LoggerPlugin) Inject(_ context.Context, trace *schemas.Trace) error {
	if trace == nil {
		return nil
	}
	// Retrieve (and remove) the pending log entries stored under this trace ID.
	entryVal, ok := p.pendingLogsToInject.LoadAndDelete(trace.TraceID)
	if !ok {
		return nil
	}
	pending, ok := entryVal.(*pendingInjectEntries)
	if !ok {
		return nil
	}
	// Snapshot the entries while holding the bucket lock. storeOrEnqueueEntry
	// appends to pending.entries under pending.mu, and a writer that obtained
	// the bucket pointer just before LoadAndDelete may still be appending;
	// reading the slice without the lock would be a data race.
	pending.mu.Lock()
	entries := pending.entries
	pending.mu.Unlock()

	// Serialize plugin logs once for all entries. A marshal failure leaves the
	// field empty rather than blocking the log write.
	var pluginLogsJSON string
	if len(trace.PluginLogs) > 0 {
		grouped := schemas.GroupPluginLogsByName(trace.PluginLogs)
		if data, err := sonic.Marshal(grouped); err == nil {
			pluginLogsJSON = string(data)
		}
	}
	// Enqueue each log entry (supports multiple attempts per trace)
	for _, entry := range entries {
		entry.PluginLogs = pluginLogsJSON
		p.enqueueLogEntry(entry, p.makePostWriteCallback(nil))
	}
	return nil
}
// MCP Plugin Interface Implementation
// SetMCPToolLogCallback registers the function to call for each MCP tool log entry.
// The stored callback is replaced while holding p.mu.
func (p *LoggerPlugin) SetMCPToolLogCallback(callback MCPToolLogCallback) {
	p.mu.Lock()
	p.mcpToolLogCallback = callback
	p.mu.Unlock()
}
// PreMCPHook is called before an MCP tool execution - creates initial log entry.
// The entry is inserted with status "processing" from a background goroutine so the
// tool call itself is never blocked on the log store.
//
// Parameters:
//   - ctx: The Bifrost context
//   - req: The MCP request containing tool call information
//
// Returns:
//   - *schemas.BifrostMCPRequest: The unmodified request
//   - *schemas.MCPPluginShortCircuit: nil (no short-circuiting)
//   - error: nil (errors are logged but don't fail the request)
func (p *LoggerPlugin) PreMCPHook(ctx *schemas.BifrostContext, req *schemas.BifrostMCPRequest) (*schemas.BifrostMCPRequest, *schemas.MCPPluginShortCircuit, error) {
	if ctx == nil {
		p.logger.Error("context is nil in PreMCPHook")
		return req, nil, nil
	}
	requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
	if !ok || requestID == "" {
		p.logger.Error("request-id not found in context or is empty in PreMCPHook")
		return req, nil, nil
	}
	// Get parent request ID if this MCP call is part of a larger LLM request (using the MCP agent original request ID)
	parentRequestID, _ := ctx.Value(schemas.BifrostMCPAgentOriginalRequestID).(string)
	createdTimestamp := time.Now().UTC()

	// Extract tool name and arguments from the request
	fullToolName := req.GetToolName()
	arguments := req.GetToolArguments()

	// Codemode tools are not logged
	if bifrost.IsCodemodeTool(fullToolName) {
		return req, nil, nil
	}

	// Extract server label from tool name (format: {client}-{tool_name}).
	// Only a non-empty prefix before the first hyphen counts as a label; a name
	// without a hyphen, or starting with one, is kept whole as the tool name.
	var toolName, serverLabel string
	if fullToolName != "" {
		if label, rest, found := strings.Cut(fullToolName, "-"); found && label != "" {
			serverLabel, toolName = label, rest
		} else {
			toolName = fullToolName
		}
		switch toolName {
		case mcp.ToolTypeListToolFiles, mcp.ToolTypeReadToolFile, mcp.ToolTypeExecuteToolCode:
			if serverLabel == "" {
				serverLabel = "codemode"
			}
		}
	}

	// Get virtual key information from context - using same method as normal LLM logging
	virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
	virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)

	// Use the per-tool-call unique MCP log ID (set by agent executor per goroutine) as the
	// primary key. Fall back to requestID if not set (e.g. direct single tool call).
	mcpLogID, ok := ctx.Value(schemas.BifrostContextKeyMCPLogID).(string)
	if !ok || mcpLogID == "" {
		mcpLogID = requestID
	}

	// Capture configured logging headers and x-bf-lh-* headers into metadata NOW,
	// while the hook still owns ctx. Like every other context-derived value above,
	// it is snapshotted before the goroutine starts so the background insert never
	// reads the request-scoped context after this hook has returned.
	metadata := p.captureLoggingHeaders(ctx)

	go func() {
		entry := &logstore.MCPToolLog{
			ID:          mcpLogID,
			RequestID:   requestID,
			Timestamp:   createdTimestamp,
			ToolName:    toolName,
			ServerLabel: serverLabel,
			Status:      "processing",
			CreatedAt:   createdTimestamp,
		}
		if parentRequestID != "" {
			entry.LLMRequestID = &parentRequestID
		}
		if virtualKeyID != "" {
			entry.VirtualKeyID = &virtualKeyID
		}
		if virtualKeyName != "" {
			entry.VirtualKeyName = &virtualKeyName
		}
		// Set arguments if content logging is enabled
		if p.disableContentLogging == nil || !*p.disableContentLogging {
			entry.ArgumentsParsed = arguments
		}
		entry.MetadataParsed = metadata

		// The insert uses the plugin's own context, not the request context.
		if err := p.store.CreateMCPToolLog(p.ctx, entry); err != nil {
			p.logger.Warn("Failed to insert initial MCP tool log entry for request %s: %v", requestID, err)
			return
		}

		// Capture callback under lock, then call it outside the critical section
		p.mu.Lock()
		callback := p.mcpToolLogCallback
		p.mu.Unlock()
		if callback != nil {
			callback(entry)
		}
	}()
	return req, nil, nil
}
// PostMCPHook is called after an MCP tool execution - updates the log entry with results.
//
// Everything that reads resp or bifrostErr (status, latency, cost, serialized
// result / error details) is computed synchronously before the hook returns.
// Only the log-store update and the optional callback run in a background
// goroutine, so the goroutine never dereferences response or error objects that
// the caller may reuse or release once the hook has returned.
//
// Parameters:
//   - ctx: The Bifrost context
//   - resp: The MCP response containing tool execution result
//   - bifrostErr: Any error that occurred during execution
//
// Returns:
//   - *schemas.BifrostMCPResponse: The unmodified response
//   - *schemas.BifrostError: The unmodified error
//   - error: nil (errors are logged but don't fail the request)
func (p *LoggerPlugin) PostMCPHook(ctx *schemas.BifrostContext, resp *schemas.BifrostMCPResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostMCPResponse, *schemas.BifrostError, error) {
	if ctx == nil {
		p.logger.Error("context is nil in PostMCPHook")
		return resp, bifrostErr, nil
	}
	// Skip logging for codemode tools (executeToolCode, listToolFiles, readToolFile)
	// We check the tool name from the response instead of context flags
	if resp != nil && bifrost.IsCodemodeTool(resp.ExtraFields.ToolName) {
		return resp, bifrostErr, nil
	}
	requestID, ok := ctx.Value(schemas.BifrostContextKeyRequestID).(string)
	if !ok || requestID == "" {
		p.logger.Error("request-id not found in context or is empty in PostMCPHook")
		return resp, bifrostErr, nil
	}
	// Use the per-tool-call unique MCP log ID to find the correct log entry.
	mcpLogID, ok := ctx.Value(schemas.BifrostContextKeyMCPLogID).(string)
	if !ok || mcpLogID == "" {
		mcpLogID = requestID
	}
	// Extract virtual key ID and name from context (set by governance plugin)
	virtualKeyID := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyID)
	virtualKeyName := bifrost.GetStringFromContext(ctx, schemas.BifrostContextKeyGovernanceVirtualKeyName)

	// Build the complete update set now, while resp and bifrostErr are still
	// owned by this call.
	updates := p.buildMCPToolLogUpdates(resp, bifrostErr, virtualKeyID, virtualKeyName)

	go func() {
		// The update and lookup use the plugin's own context, not the request context.
		processingErr := retryOnNotFound(p.ctx, func() error {
			return p.store.UpdateMCPToolLog(p.ctx, mcpLogID, updates)
		})
		if processingErr != nil {
			p.logger.Warn("failed to process MCP tool log update for request %s: %v", requestID, processingErr)
			return
		}
		// Capture callback under lock, then perform DB I/O and invoke callback outside critical section
		p.mu.Lock()
		callback := p.mcpToolLogCallback
		p.mu.Unlock()
		if callback == nil {
			return
		}
		updatedEntry, getErr := p.store.FindMCPToolLog(p.ctx, mcpLogID)
		if getErr != nil {
			p.logger.Warn("failed to find updated entry for callback: %v", getErr)
			return
		}
		callback(updatedEntry)
	}()
	return resp, bifrostErr, nil
}

// buildMCPToolLogUpdates converts the outcome of an MCP tool execution into the
// column updates applied to its log row. All serialization happens here, so the
// returned map holds no references to resp or bifrostErr themselves.
func (p *LoggerPlugin) buildMCPToolLogUpdates(resp *schemas.BifrostMCPResponse, bifrostErr *schemas.BifrostError, virtualKeyID, virtualKeyName string) map[string]interface{} {
	updates := make(map[string]interface{})

	// Update virtual key ID and name if they are set (from governance plugin)
	if virtualKeyID != "" {
		updates["virtual_key_id"] = virtualKeyID
	}
	if virtualKeyName != "" {
		updates["virtual_key_name"] = virtualKeyName
	}

	// Get latency from response ExtraFields
	if resp != nil {
		updates["latency"] = float64(resp.ExtraFields.Latency)
	}

	// Calculate MCP tool cost from catalog if available. Cost is only recorded
	// for successful executions that identify both the client and the tool.
	if resp != nil && bifrostErr == nil && p.mcpCatalog != nil && resp.ExtraFields.ClientName != "" && resp.ExtraFields.ToolName != "" {
		if pricingEntry, ok := p.mcpCatalog.GetPricingData(resp.ExtraFields.ClientName, resp.ExtraFields.ToolName); ok {
			toolCost := pricingEntry.CostPerExecution
			updates["cost"] = toolCost
			p.logger.Debug("MCP tool cost for %s.%s: $%.6f", resp.ExtraFields.ClientName, resp.ExtraFields.ToolName, toolCost)
		}
	}

	switch {
	case bifrostErr != nil:
		updates["status"] = "error"
		// Serialize error details (best-effort: skipped if serialization fails)
		tempEntry := &logstore.MCPToolLog{}
		tempEntry.ErrorDetailsParsed = bifrostErr
		if err := tempEntry.SerializeFields(); err == nil {
			updates["error_details"] = tempEntry.ErrorDetails
		}

	case resp != nil:
		updates["status"] = "success"
		// Store result only if content logging is enabled
		if p.disableContentLogging != nil && *p.disableContentLogging {
			break
		}
		var result interface{}
		switch {
		case resp.ChatMessage != nil:
			// Default to the whole message; prefer the parsed content when the
			// message carries a string body that is valid JSON.
			result = resp.ChatMessage
			if resp.ChatMessage.Content != nil && resp.ChatMessage.Content.ContentStr != nil {
				var parsedContent interface{}
				if err := sonic.Unmarshal([]byte(*resp.ChatMessage.Content.ContentStr), &parsedContent); err == nil {
					result = parsedContent
				}
			}
		case resp.ResponsesMessage != nil:
			result = resp.ResponsesMessage
		}
		if result != nil {
			tempEntry := &logstore.MCPToolLog{}
			tempEntry.ResultParsed = result
			if err := tempEntry.SerializeFields(); err == nil {
				updates["result"] = tempEntry.Result
			}
		}

	default:
		// Neither a response nor an error: record a synthetic error so the row
		// does not stay in "processing" forever.
		updates["status"] = "error"
		tempEntry := &logstore.MCPToolLog{}
		tempEntry.ErrorDetailsParsed = &schemas.BifrostError{
			IsBifrostError: true,
			Error: &schemas.ErrorField{
				Message: "MCP tool execution returned nil response",
			},
		}
		if err := tempEntry.SerializeFields(); err == nil {
			updates["error_details"] = tempEntry.ErrorDetails
		}
	}

	return updates
}