Files
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

1654 lines
58 KiB
Go

// Package handlers provides HTTP request handlers for the Bifrost HTTP transport.
// This file contains logging-related handlers for log search, stats, and management.
package handlers
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/bytedance/sonic"
"github.com/fasthttp/router"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/framework/configstore/tables"
"github.com/maximhq/bifrost/framework/logstore"
"github.com/maximhq/bifrost/plugins/logging"
"github.com/maximhq/bifrost/transports/bifrost-http/lib"
"github.com/valyala/fasthttp"
"golang.org/x/sync/errgroup"
)
// LoggingHandler manages HTTP requests for logging operations
type LoggingHandler struct {
logManager logging.LogManager
redactedKeysManager RedactedKeysManager
config *lib.Config
}
// Keep session log page size in one place so the session sheet limit is easy to tune later.
const sessionLogPageLimit = 500
func parseParentRequestIDFilter(ctx *fasthttp.RequestCtx) string {
if parentRequestID := string(ctx.QueryArgs().Peek("parent_request_id")); strings.TrimSpace(parentRequestID) != "" {
return parentRequestID
}
if sessionID := string(ctx.QueryArgs().Peek("session_id")); strings.TrimSpace(sessionID) != "" {
return sessionID
}
return ""
}
type RedactedKeysManager interface {
GetAllRedactedKeys(ctx context.Context, ids []string) []schemas.Key
GetAllRedactedVirtualKeys(ctx context.Context, ids []string) []tables.TableVirtualKey
GetAllRedactedRoutingRules(ctx context.Context, ids []string) []tables.TableRoutingRule
}
// NewLoggingHandler creates a new logging handler instance
func NewLoggingHandler(logManager logging.LogManager, redactedKeysManager RedactedKeysManager, config *lib.Config) *LoggingHandler {
return &LoggingHandler{
logManager: logManager,
redactedKeysManager: redactedKeysManager,
config: config,
}
}
func (h *LoggingHandler) shouldHideDeletedVirtualKeysInFilters() bool {
if h == nil || h.config == nil {
return false
}
return h.config.ClientConfig.HideDeletedVirtualKeysInFilters
}
// RegisterRoutes registers all logging-related routes
func (h *LoggingHandler) RegisterRoutes(r *router.Router, middlewares ...schemas.BifrostHTTPMiddleware) {
// LLM Log retrieval with filtering, search, and pagination
r.GET("/api/logs", lib.ChainMiddlewares(h.getLogs, middlewares...))
r.GET("/api/logs/sessions/{session_id}/summary", lib.ChainMiddlewares(h.getLogSessionSummaryByID, middlewares...))
r.GET("/api/logs/sessions/{session_id}", lib.ChainMiddlewares(h.getLogSessionByID, middlewares...))
r.GET("/api/logs/{id}", lib.ChainMiddlewares(h.getLogByID, middlewares...))
r.GET("/api/logs/stats", lib.ChainMiddlewares(h.getLogsStats, middlewares...))
r.GET("/api/logs/histogram", lib.ChainMiddlewares(h.getLogsHistogram, middlewares...))
r.GET("/api/logs/histogram/tokens", lib.ChainMiddlewares(h.getLogsTokenHistogram, middlewares...))
r.GET("/api/logs/histogram/cost", lib.ChainMiddlewares(h.getLogsCostHistogram, middlewares...))
r.GET("/api/logs/histogram/models", lib.ChainMiddlewares(h.getLogsModelHistogram, middlewares...))
r.GET("/api/logs/histogram/latency", lib.ChainMiddlewares(h.getLogsLatencyHistogram, middlewares...))
r.GET("/api/logs/histogram/cost/by-provider", lib.ChainMiddlewares(h.getLogsProviderCostHistogram, middlewares...))
r.GET("/api/logs/histogram/tokens/by-provider", lib.ChainMiddlewares(h.getLogsProviderTokenHistogram, middlewares...))
r.GET("/api/logs/histogram/latency/by-provider", lib.ChainMiddlewares(h.getLogsProviderLatencyHistogram, middlewares...))
r.GET("/api/logs/histogram/cost/by-dimension", lib.ChainMiddlewares(h.getLogsDimensionCostHistogram, middlewares...))
r.GET("/api/logs/histogram/tokens/by-dimension", lib.ChainMiddlewares(h.getLogsDimensionTokenHistogram, middlewares...))
r.GET("/api/logs/histogram/latency/by-dimension", lib.ChainMiddlewares(h.getLogsDimensionLatencyHistogram, middlewares...))
r.GET("/api/logs/dropped", lib.ChainMiddlewares(h.getDroppedRequests, middlewares...))
r.GET("/api/logs/filterdata", lib.ChainMiddlewares(h.getAvailableFilterData, middlewares...))
r.GET("/api/logs/rankings", lib.ChainMiddlewares(h.getModelRankings, middlewares...))
r.DELETE("/api/logs", lib.ChainMiddlewares(h.deleteLogs, middlewares...))
r.POST("/api/logs/recalculate-cost", lib.ChainMiddlewares(h.recalculateLogCosts, middlewares...))
// MCP Tool Log retrieval with filtering, search, and pagination
r.GET("/api/mcp-logs", lib.ChainMiddlewares(h.getMCPLogs, middlewares...))
r.GET("/api/mcp-logs/stats", lib.ChainMiddlewares(h.getMCPLogsStats, middlewares...))
r.GET("/api/mcp-logs/filterdata", lib.ChainMiddlewares(h.getMCPLogsFilterData, middlewares...))
r.GET("/api/mcp-logs/histogram", lib.ChainMiddlewares(h.getMCPHistogram, middlewares...))
r.GET("/api/mcp-logs/histogram/cost", lib.ChainMiddlewares(h.getMCPCostHistogram, middlewares...))
r.GET("/api/mcp-logs/histogram/top-tools", lib.ChainMiddlewares(h.getMCPTopTools, middlewares...))
r.DELETE("/api/mcp-logs", lib.ChainMiddlewares(h.deleteMCPLogs, middlewares...))
}
// getLogSessionByID handles GET /api/logs/sessions/{session_id} - Get logs in a single session.
func (h *LoggingHandler) getLogSessionByID(ctx *fasthttp.RequestCtx) {
rawSessionID, ok := ctx.UserValue("session_id").(string)
if !ok || strings.TrimSpace(rawSessionID) == "" {
SendError(ctx, fasthttp.StatusBadRequest, "session_id is required")
return
}
pagination := &logstore.PaginationOptions{
Limit: sessionLogPageLimit,
Offset: 0,
SortBy: "timestamp",
Order: "asc",
}
if limit := string(ctx.QueryArgs().Peek("limit")); limit != "" {
i, err := strconv.Atoi(limit)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, "invalid limit")
return
}
if i <= 0 {
SendError(ctx, fasthttp.StatusBadRequest, "limit must be greater than 0")
return
}
if i > sessionLogPageLimit {
SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("limit cannot exceed %d", sessionLogPageLimit))
return
}
pagination.Limit = i
}
if offset := string(ctx.QueryArgs().Peek("offset")); offset != "" {
i, err := strconv.Atoi(offset)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, "invalid offset")
return
}
if i < 0 {
SendError(ctx, fasthttp.StatusBadRequest, "offset cannot be negative")
return
}
pagination.Offset = i
}
if order := string(ctx.QueryArgs().Peek("order")); order != "" {
if order != "asc" && order != "desc" {
SendError(ctx, fasthttp.StatusBadRequest, "order must be 'asc' or 'desc'")
return
}
pagination.Order = order
}
result, err := h.logManager.GetSessionLogs(ctx, rawSessionID, pagination)
if err != nil {
logger.Error("failed to fetch session logs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Session fetch failed: %v", err))
return
}
selectedKeyIDs := make(map[string]struct{})
virtualKeyIDs := make(map[string]struct{})
routingRuleIDs := make(map[string]struct{})
for _, log := range result.Logs {
if log.SelectedKeyID != "" {
selectedKeyIDs[log.SelectedKeyID] = struct{}{}
}
if log.VirtualKeyID != nil && *log.VirtualKeyID != "" {
virtualKeyIDs[*log.VirtualKeyID] = struct{}{}
}
if log.RoutingRuleID != nil && *log.RoutingRuleID != "" {
routingRuleIDs[*log.RoutingRuleID] = struct{}{}
}
}
toSlice := func(m map[string]struct{}) []string {
if len(m) == 0 {
return nil
}
out := make([]string, 0, len(m))
for id := range m {
out = append(out, id)
}
return out
}
redactedKeys := h.redactedKeysManager.GetAllRedactedKeys(ctx, toSlice(selectedKeyIDs))
redactedVirtualKeys := h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, toSlice(virtualKeyIDs))
redactedRoutingRules := h.redactedKeysManager.GetAllRedactedRoutingRules(ctx, toSlice(routingRuleIDs))
for i, log := range result.Logs {
if log.SelectedKeyID != "" && log.SelectedKeyName != "" {
result.Logs[i].SelectedKey = findRedactedKey(redactedKeys, log.SelectedKeyID, log.SelectedKeyName)
}
if log.VirtualKeyID != nil && log.VirtualKeyName != nil && *log.VirtualKeyID != "" && *log.VirtualKeyName != "" {
result.Logs[i].VirtualKey = findRedactedVirtualKey(redactedVirtualKeys, *log.VirtualKeyID, *log.VirtualKeyName)
}
if log.RoutingRuleID != nil && log.RoutingRuleName != nil && *log.RoutingRuleID != "" && *log.RoutingRuleName != "" {
result.Logs[i].RoutingRule = findRedactedRoutingRule(redactedRoutingRules, *log.RoutingRuleID, *log.RoutingRuleName)
}
}
SendJSON(ctx, result)
}
// getLogSessionSummaryByID handles GET /api/logs/sessions/{session_id}/summary - Get aggregate totals for a single session.
func (h *LoggingHandler) getLogSessionSummaryByID(ctx *fasthttp.RequestCtx) {
rawSessionID, ok := ctx.UserValue("session_id").(string)
if !ok || strings.TrimSpace(rawSessionID) == "" {
SendError(ctx, fasthttp.StatusBadRequest, "session_id is required")
return
}
result, err := h.logManager.GetSessionSummary(ctx, rawSessionID)
if err != nil {
logger.Error("failed to fetch session summary: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Session summary fetch failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogs handles GET /api/logs - Get logs with filtering, search, and pagination via query parameters
func (h *LoggingHandler) getLogs(ctx *fasthttp.RequestCtx) {
// Parse query parameters into filters
filters := &logstore.SearchFilters{}
pagination := &logstore.PaginationOptions{}
// Extract filters from query parameters
if providers := string(ctx.QueryArgs().Peek("providers")); providers != "" {
filters.Providers = parseCommaSeparated(providers)
}
if models := string(ctx.QueryArgs().Peek("models")); models != "" {
filters.Models = parseCommaSeparated(models)
}
if aliases := string(ctx.QueryArgs().Peek("aliases")); aliases != "" {
filters.Aliases = parseCommaSeparated(aliases)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if objects := string(ctx.QueryArgs().Peek("objects")); objects != "" {
filters.Objects = parseCommaSeparated(objects)
}
if parentRequestID := parseParentRequestIDFilter(ctx); parentRequestID != "" {
filters.ParentRequestID = parentRequestID
}
if selectedKeyIDs := string(ctx.QueryArgs().Peek("selected_key_ids")); selectedKeyIDs != "" {
filters.SelectedKeyIDs = parseCommaSeparated(selectedKeyIDs)
}
if virtualKeyIDs := string(ctx.QueryArgs().Peek("virtual_key_ids")); virtualKeyIDs != "" {
filters.VirtualKeyIDs = parseCommaSeparated(virtualKeyIDs)
}
if routingRuleIDs := string(ctx.QueryArgs().Peek("routing_rule_ids")); routingRuleIDs != "" {
filters.RoutingRuleIDs = parseCommaSeparated(routingRuleIDs)
}
if teamIDs := string(ctx.QueryArgs().Peek("team_ids")); teamIDs != "" {
filters.TeamIDs = parseCommaSeparated(teamIDs)
}
if customerIDs := string(ctx.QueryArgs().Peek("customer_ids")); customerIDs != "" {
filters.CustomerIDs = parseCommaSeparated(customerIDs)
}
if userIDs := string(ctx.QueryArgs().Peek("user_ids")); userIDs != "" {
filters.UserIDs = parseCommaSeparated(userIDs)
}
if businessUnitIDs := string(ctx.QueryArgs().Peek("business_unit_ids")); businessUnitIDs != "" {
filters.BusinessUnitIDs = parseCommaSeparated(businessUnitIDs)
}
if routingEngines := string(ctx.QueryArgs().Peek("routing_engine_used")); routingEngines != "" {
filters.RoutingEngineUsed = parseCommaSeparated(routingEngines)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
filters.StartTime = &t
}
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
filters.EndTime = &t
}
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
if f, err := strconv.ParseFloat(minLatency, 64); err == nil {
filters.MinLatency = &f
}
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
if val, err := strconv.ParseFloat(maxLatency, 64); err == nil {
filters.MaxLatency = &val
}
}
if minTokens := string(ctx.QueryArgs().Peek("min_tokens")); minTokens != "" {
if val, err := strconv.Atoi(minTokens); err == nil {
filters.MinTokens = &val
}
}
if maxTokens := string(ctx.QueryArgs().Peek("max_tokens")); maxTokens != "" {
if val, err := strconv.Atoi(maxTokens); err == nil {
filters.MaxTokens = &val
}
}
if cost := string(ctx.QueryArgs().Peek("min_cost")); cost != "" {
if val, err := strconv.ParseFloat(cost, 64); err == nil {
filters.MinCost = &val
}
}
if maxCost := string(ctx.QueryArgs().Peek("max_cost")); maxCost != "" {
if val, err := strconv.ParseFloat(maxCost, 64); err == nil {
filters.MaxCost = &val
}
}
if missingCost := string(ctx.QueryArgs().Peek("missing_cost_only")); missingCost != "" {
if val, err := strconv.ParseBool(missingCost); err == nil {
filters.MissingCostOnly = val
}
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}
parseMetadataFilters(ctx, filters)
// Extract pagination parameters
pagination.Limit = 50 // Default limit
if limit := string(ctx.QueryArgs().Peek("limit")); limit != "" {
if i, err := strconv.Atoi(limit); err == nil {
if i <= 0 {
SendError(ctx, fasthttp.StatusBadRequest, "limit must be greater than 0")
return
}
if i > 1000 {
SendError(ctx, fasthttp.StatusBadRequest, "limit cannot exceed 1000")
return
}
pagination.Limit = i
}
}
pagination.Offset = 0 // Default offset
if offset := string(ctx.QueryArgs().Peek("offset")); offset != "" {
if i, err := strconv.Atoi(offset); err == nil {
if i < 0 {
SendError(ctx, fasthttp.StatusBadRequest, "offset cannot be negative")
return
}
pagination.Offset = i
}
}
// Sort parameters
pagination.SortBy = "timestamp" // Default sort field
if sortBy := string(ctx.QueryArgs().Peek("sort_by")); sortBy != "" {
if sortBy == "timestamp" || sortBy == "latency" || sortBy == "tokens" || sortBy == "cost" {
pagination.SortBy = sortBy
}
}
pagination.Order = "desc" // Default sort order
if order := string(ctx.QueryArgs().Peek("order")); order != "" {
if order == "asc" || order == "desc" {
pagination.Order = order
}
}
result, err := h.logManager.Search(ctx, filters, pagination)
if err != nil {
logger.Error("failed to search logs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Search failed: %v", err))
return
}
selectedKeyIDs := make(map[string]struct{})
virtualKeyIDs := make(map[string]struct{})
routingRuleIDs := make(map[string]struct{})
for _, log := range result.Logs {
if log.SelectedKeyID != "" {
selectedKeyIDs[log.SelectedKeyID] = struct{}{}
}
if log.VirtualKeyID != nil && *log.VirtualKeyID != "" {
virtualKeyIDs[*log.VirtualKeyID] = struct{}{}
}
if log.RoutingRuleID != nil && *log.RoutingRuleID != "" {
routingRuleIDs[*log.RoutingRuleID] = struct{}{}
}
}
toSlice := func(m map[string]struct{}) []string {
if len(m) == 0 {
return nil
}
out := make([]string, 0, len(m))
for id := range m {
out = append(out, id)
}
return out
}
redactedKeys := h.redactedKeysManager.GetAllRedactedKeys(ctx, toSlice(selectedKeyIDs))
redactedVirtualKeys := h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, toSlice(virtualKeyIDs))
redactedRoutingRules := h.redactedKeysManager.GetAllRedactedRoutingRules(ctx, toSlice(routingRuleIDs))
// Add selected key, virtual key, and routing rule to the result
for i, log := range result.Logs {
if log.SelectedKeyID != "" && log.SelectedKeyName != "" {
result.Logs[i].SelectedKey = findRedactedKey(redactedKeys, log.SelectedKeyID, log.SelectedKeyName)
}
if log.VirtualKeyID != nil && log.VirtualKeyName != nil && *log.VirtualKeyID != "" && *log.VirtualKeyName != "" {
result.Logs[i].VirtualKey = findRedactedVirtualKey(redactedVirtualKeys, *log.VirtualKeyID, *log.VirtualKeyName)
}
if log.RoutingRuleID != nil && log.RoutingRuleName != nil && *log.RoutingRuleID != "" && *log.RoutingRuleName != "" {
result.Logs[i].RoutingRule = findRedactedRoutingRule(redactedRoutingRules, *log.RoutingRuleID, *log.RoutingRuleName)
}
}
SendJSON(ctx, result)
}
// getLogByID handles GET /api/logs/{id} - Get a single log entry by ID including raw_request and raw_response
func (h *LoggingHandler) getLogByID(ctx *fasthttp.RequestCtx) {
id, ok := ctx.UserValue("id").(string)
if !ok || id == "" {
SendError(ctx, fasthttp.StatusBadRequest, "log id is required")
return
}
log, err := h.logManager.GetLog(ctx, id)
if err != nil {
if errors.Is(err, logstore.ErrNotFound) {
SendError(ctx, fasthttp.StatusNotFound, "log not found")
return
}
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("failed to get log: %v", err))
return
}
// Assemble virtual key, selected key, and routing rule objects (gorm:"-" fields not
// populated by GetLog) so the detail view receives the same structure as the list endpoint.
if log.SelectedKeyID != "" && log.SelectedKeyName != "" {
redactedKeys := h.redactedKeysManager.GetAllRedactedKeys(ctx, []string{log.SelectedKeyID})
log.SelectedKey = findRedactedKey(redactedKeys, log.SelectedKeyID, log.SelectedKeyName)
}
if log.VirtualKeyID != nil && log.VirtualKeyName != nil && *log.VirtualKeyID != "" && *log.VirtualKeyName != "" {
redactedVirtualKeys := h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, []string{*log.VirtualKeyID})
log.VirtualKey = findRedactedVirtualKey(redactedVirtualKeys, *log.VirtualKeyID, *log.VirtualKeyName)
}
if log.RoutingRuleID != nil && log.RoutingRuleName != nil && *log.RoutingRuleID != "" && *log.RoutingRuleName != "" {
redactedRoutingRules := h.redactedKeysManager.GetAllRedactedRoutingRules(ctx, []string{*log.RoutingRuleID})
log.RoutingRule = findRedactedRoutingRule(redactedRoutingRules, *log.RoutingRuleID, *log.RoutingRuleName)
}
SendJSON(ctx, log)
}
// getLogsStats handles GET /api/logs/stats - Get statistics for logs with filtering
func (h *LoggingHandler) getLogsStats(ctx *fasthttp.RequestCtx) {
// Parse query parameters into filters (same as getLogs)
filters := &logstore.SearchFilters{}
// Extract filters from query parameters
if providers := string(ctx.QueryArgs().Peek("providers")); providers != "" {
filters.Providers = parseCommaSeparated(providers)
}
if models := string(ctx.QueryArgs().Peek("models")); models != "" {
filters.Models = parseCommaSeparated(models)
}
if aliases := string(ctx.QueryArgs().Peek("aliases")); aliases != "" {
filters.Aliases = parseCommaSeparated(aliases)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if objects := string(ctx.QueryArgs().Peek("objects")); objects != "" {
filters.Objects = parseCommaSeparated(objects)
}
if parentRequestID := parseParentRequestIDFilter(ctx); parentRequestID != "" {
filters.ParentRequestID = parentRequestID
}
if selectedKeyIDs := string(ctx.QueryArgs().Peek("selected_key_ids")); selectedKeyIDs != "" {
filters.SelectedKeyIDs = parseCommaSeparated(selectedKeyIDs)
}
if virtualKeyIDs := string(ctx.QueryArgs().Peek("virtual_key_ids")); virtualKeyIDs != "" {
filters.VirtualKeyIDs = parseCommaSeparated(virtualKeyIDs)
}
if routingRuleIDs := string(ctx.QueryArgs().Peek("routing_rule_ids")); routingRuleIDs != "" {
filters.RoutingRuleIDs = parseCommaSeparated(routingRuleIDs)
}
if teamIDs := string(ctx.QueryArgs().Peek("team_ids")); teamIDs != "" {
filters.TeamIDs = parseCommaSeparated(teamIDs)
}
if customerIDs := string(ctx.QueryArgs().Peek("customer_ids")); customerIDs != "" {
filters.CustomerIDs = parseCommaSeparated(customerIDs)
}
if userIDs := string(ctx.QueryArgs().Peek("user_ids")); userIDs != "" {
filters.UserIDs = parseCommaSeparated(userIDs)
}
if businessUnitIDs := string(ctx.QueryArgs().Peek("business_unit_ids")); businessUnitIDs != "" {
filters.BusinessUnitIDs = parseCommaSeparated(businessUnitIDs)
}
if routingEngines := string(ctx.QueryArgs().Peek("routing_engine_used")); routingEngines != "" {
filters.RoutingEngineUsed = parseCommaSeparated(routingEngines)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
filters.StartTime = &t
}
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
filters.EndTime = &t
}
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
if f, err := strconv.ParseFloat(minLatency, 64); err == nil {
filters.MinLatency = &f
}
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
if val, err := strconv.ParseFloat(maxLatency, 64); err == nil {
filters.MaxLatency = &val
}
}
if minTokens := string(ctx.QueryArgs().Peek("min_tokens")); minTokens != "" {
if val, err := strconv.Atoi(minTokens); err == nil {
filters.MinTokens = &val
}
}
if maxTokens := string(ctx.QueryArgs().Peek("max_tokens")); maxTokens != "" {
if val, err := strconv.Atoi(maxTokens); err == nil {
filters.MaxTokens = &val
}
}
if cost := string(ctx.QueryArgs().Peek("min_cost")); cost != "" {
if val, err := strconv.ParseFloat(cost, 64); err == nil {
filters.MinCost = &val
}
}
if maxCost := string(ctx.QueryArgs().Peek("max_cost")); maxCost != "" {
if val, err := strconv.ParseFloat(maxCost, 64); err == nil {
filters.MaxCost = &val
}
}
if missingCost := string(ctx.QueryArgs().Peek("missing_cost_only")); missingCost != "" {
if val, err := strconv.ParseBool(missingCost); err == nil {
filters.MissingCostOnly = val
}
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}
parseMetadataFilters(ctx, filters)
stats, err := h.logManager.GetStats(ctx, filters)
if err != nil {
logger.Error("failed to get log stats: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Stats calculation failed: %v", err))
return
}
SendJSON(ctx, stats)
}
// getLogsHistogram handles GET /api/logs/histogram - Get time-bucketed request counts
func (h *LoggingHandler) getLogsHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get log histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// calculateBucketSize determines appropriate bucket size based on time range
func calculateBucketSize(start, end *time.Time) int64 {
if start == nil || end == nil {
return 3600 // Default 1 hour
}
duration := end.Sub(*start)
switch {
case duration >= 365*24*time.Hour: // >= 12 months
return 30 * 24 * 3600 // Monthly (30 days)
case duration >= 90*24*time.Hour: // >= 3 months
return 7 * 24 * 3600 // Weekly (7 days)
case duration >= 30*24*time.Hour: // >= 1 month
return 3 * 24 * 3600 // 3 days
case duration >= 7*24*time.Hour: // >= 7 days
return 24 * 3600 // Daily
case duration >= 3*24*time.Hour: // >= 3 days
return 8 * 3600 // 8 hours
case duration >= 24*time.Hour: // >= 24 hours
return 3600 // Hourly
case duration >= 2*time.Hour: // >= 2 hours
return 600 // 10 minutes
default:
return 60 // 1 minute buckets for < 2 hours
}
}
// parseHistogramFilters extracts common filter parameters from query args
func parseHistogramFilters(ctx *fasthttp.RequestCtx) *logstore.SearchFilters {
filters := &logstore.SearchFilters{}
if providers := string(ctx.QueryArgs().Peek("providers")); providers != "" {
filters.Providers = parseCommaSeparated(providers)
}
if models := string(ctx.QueryArgs().Peek("models")); models != "" {
filters.Models = parseCommaSeparated(models)
}
if aliases := string(ctx.QueryArgs().Peek("aliases")); aliases != "" {
filters.Aliases = parseCommaSeparated(aliases)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if objects := string(ctx.QueryArgs().Peek("objects")); objects != "" {
filters.Objects = parseCommaSeparated(objects)
}
if parentRequestID := parseParentRequestIDFilter(ctx); parentRequestID != "" {
filters.ParentRequestID = parentRequestID
}
if selectedKeyIDs := string(ctx.QueryArgs().Peek("selected_key_ids")); selectedKeyIDs != "" {
filters.SelectedKeyIDs = parseCommaSeparated(selectedKeyIDs)
}
if virtualKeyIDs := string(ctx.QueryArgs().Peek("virtual_key_ids")); virtualKeyIDs != "" {
filters.VirtualKeyIDs = parseCommaSeparated(virtualKeyIDs)
}
if routingRuleIDs := string(ctx.QueryArgs().Peek("routing_rule_ids")); routingRuleIDs != "" {
filters.RoutingRuleIDs = parseCommaSeparated(routingRuleIDs)
}
if teamIDs := string(ctx.QueryArgs().Peek("team_ids")); teamIDs != "" {
filters.TeamIDs = parseCommaSeparated(teamIDs)
}
if customerIDs := string(ctx.QueryArgs().Peek("customer_ids")); customerIDs != "" {
filters.CustomerIDs = parseCommaSeparated(customerIDs)
}
if userIDs := string(ctx.QueryArgs().Peek("user_ids")); userIDs != "" {
filters.UserIDs = parseCommaSeparated(userIDs)
}
if businessUnitIDs := string(ctx.QueryArgs().Peek("business_unit_ids")); businessUnitIDs != "" {
filters.BusinessUnitIDs = parseCommaSeparated(businessUnitIDs)
}
if routingEngines := string(ctx.QueryArgs().Peek("routing_engine_used")); routingEngines != "" {
filters.RoutingEngineUsed = parseCommaSeparated(routingEngines)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
filters.StartTime = &t
}
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
filters.EndTime = &t
}
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
if f, err := strconv.ParseFloat(minLatency, 64); err == nil {
filters.MinLatency = &f
}
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
if val, err := strconv.ParseFloat(maxLatency, 64); err == nil {
filters.MaxLatency = &val
}
}
if minTokens := string(ctx.QueryArgs().Peek("min_tokens")); minTokens != "" {
if val, err := strconv.Atoi(minTokens); err == nil {
filters.MinTokens = &val
}
}
if maxTokens := string(ctx.QueryArgs().Peek("max_tokens")); maxTokens != "" {
if val, err := strconv.Atoi(maxTokens); err == nil {
filters.MaxTokens = &val
}
}
if cost := string(ctx.QueryArgs().Peek("min_cost")); cost != "" {
if val, err := strconv.ParseFloat(cost, 64); err == nil {
filters.MinCost = &val
}
}
if maxCost := string(ctx.QueryArgs().Peek("max_cost")); maxCost != "" {
if val, err := strconv.ParseFloat(maxCost, 64); err == nil {
filters.MaxCost = &val
}
}
if missingCost := string(ctx.QueryArgs().Peek("missing_cost_only")); missingCost != "" {
if val, err := strconv.ParseBool(missingCost); err == nil {
filters.MissingCostOnly = val
}
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}
parseMetadataFilters(ctx, filters)
return filters
}
// getLogsTokenHistogram handles GET /api/logs/histogram/tokens - Get time-bucketed token usage
func (h *LoggingHandler) getLogsTokenHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetTokenHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get token histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Token histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsCostHistogram handles GET /api/logs/histogram/cost - Get time-bucketed cost data with model breakdown
func (h *LoggingHandler) getLogsCostHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetCostHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get cost histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Cost histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsModelHistogram handles GET /api/logs/histogram/models - Get time-bucketed model usage with success/error breakdown
func (h *LoggingHandler) getLogsModelHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetModelHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get model histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Model histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsLatencyHistogram handles GET /api/logs/histogram/latency - Get time-bucketed latency percentiles
func (h *LoggingHandler) getLogsLatencyHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetLatencyHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get latency histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Latency histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsProviderCostHistogram handles GET /api/logs/histogram/cost/by-provider - Get time-bucketed cost data with provider breakdown
func (h *LoggingHandler) getLogsProviderCostHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetProviderCostHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get provider cost histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Provider cost histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsProviderTokenHistogram handles GET /api/logs/histogram/tokens/by-provider - Get time-bucketed token usage with provider breakdown
func (h *LoggingHandler) getLogsProviderTokenHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetProviderTokenHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get provider token histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Provider token histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsProviderLatencyHistogram handles GET /api/logs/histogram/latency/by-provider - Get time-bucketed latency percentiles with provider breakdown
func (h *LoggingHandler) getLogsProviderLatencyHistogram(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetProviderLatencyHistogram(ctx, filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get provider latency histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Provider latency histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// parseDimension extracts and validates the "dimension" query parameter.
// Returns the validated HistogramDimension and true on success, or sends an error response and returns false.
func parseDimension(ctx *fasthttp.RequestCtx) (logstore.HistogramDimension, bool) {
dim := logstore.HistogramDimension(string(ctx.QueryArgs().Peek("dimension")))
if dim == "" {
SendError(ctx, fasthttp.StatusBadRequest, "Missing required query parameter: dimension. Valid values: provider, team_id, customer_id, user_id, business_unit_id")
return "", false
}
if !logstore.ValidHistogramDimensions[dim] {
SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid dimension: %s. Valid values: provider, team_id, customer_id, user_id, business_unit_id", dim))
return "", false
}
return dim, true
}
// getLogsDimensionCostHistogram handles GET /api/logs/histogram/cost/by-dimension
// Returns time-bucketed cost data grouped by the dimension specified in the "dimension" query param.
func (h *LoggingHandler) getLogsDimensionCostHistogram(ctx *fasthttp.RequestCtx) {
dimension, ok := parseDimension(ctx)
if !ok {
return
}
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetDimensionCostHistogram(ctx, filters, bucketSizeSeconds, dimension)
if err != nil {
logger.Error("failed to get dimension cost histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Dimension cost histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsDimensionTokenHistogram handles GET /api/logs/histogram/tokens/by-dimension
// Returns time-bucketed token usage grouped by the dimension specified in the "dimension" query param.
func (h *LoggingHandler) getLogsDimensionTokenHistogram(ctx *fasthttp.RequestCtx) {
dimension, ok := parseDimension(ctx)
if !ok {
return
}
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetDimensionTokenHistogram(ctx, filters, bucketSizeSeconds, dimension)
if err != nil {
logger.Error("failed to get dimension token histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Dimension token histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getLogsDimensionLatencyHistogram handles GET /api/logs/histogram/latency/by-dimension
// Returns time-bucketed latency percentiles grouped by the dimension specified in the "dimension" query param.
func (h *LoggingHandler) getLogsDimensionLatencyHistogram(ctx *fasthttp.RequestCtx) {
dimension, ok := parseDimension(ctx)
if !ok {
return
}
filters := parseHistogramFilters(ctx)
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetDimensionLatencyHistogram(ctx, filters, bucketSizeSeconds, dimension)
if err != nil {
logger.Error("failed to get dimension latency histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Dimension latency histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getDroppedRequests handles GET /api/logs/dropped - Get the number of dropped requests
func (h *LoggingHandler) getDroppedRequests(ctx *fasthttp.RequestCtx) {
droppedRequests := h.logManager.GetDroppedRequests(ctx)
SendJSON(ctx, map[string]int64{"dropped_requests": droppedRequests})
}
// getModelRankings handles GET /api/logs/rankings - Get models ranked by usage with trends
func (h *LoggingHandler) getModelRankings(ctx *fasthttp.RequestCtx) {
filters := parseHistogramFilters(ctx)
result, err := h.logManager.GetModelRankings(ctx, filters)
if err != nil {
logger.Error("failed to get model rankings: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Model rankings calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getAvailableFilterData handles GET /api/logs/filterdata - Get all unique filter data from logs
func (h *LoggingHandler) getAvailableFilterData(ctx *fasthttp.RequestCtx) {
hideDeletedVirtualKeys := h.shouldHideDeletedVirtualKeysInFilters()
var (
models []string
aliases []string
selectedKeys []logging.KeyPair
virtualKeys []logging.KeyPair
routingRules []logging.KeyPair
routingEngines []string
teams []logging.KeyPair
customers []logging.KeyPair
users []logging.KeyPair
businessUnits []logging.KeyPair
metadataKeys map[string][]string
mu sync.Mutex
)
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
result := h.logManager.GetAvailableModels(gCtx)
mu.Lock()
models = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableAliases(gCtx)
mu.Lock()
aliases = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableSelectedKeys(gCtx)
mu.Lock()
selectedKeys = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableVirtualKeys(gCtx)
mu.Lock()
virtualKeys = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableRoutingRules(gCtx)
mu.Lock()
routingRules = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableRoutingEngines(gCtx)
mu.Lock()
routingEngines = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableTeams(gCtx)
mu.Lock()
teams = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableCustomers(gCtx)
mu.Lock()
customers = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableUsers(gCtx)
mu.Lock()
users = result
mu.Unlock()
return nil
})
g.Go(func() error {
result := h.logManager.GetAvailableBusinessUnits(gCtx)
mu.Lock()
businessUnits = result
mu.Unlock()
return nil
})
g.Go(func() error {
result, err := h.logManager.GetAvailableMetadataKeys(gCtx)
if err != nil {
return err
}
mu.Lock()
metadataKeys = result
mu.Unlock()
return nil
})
if err := g.Wait(); err != nil {
logger.Error("failed to get filter data: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to get filter data: %v", err))
return
}
// Extract IDs for redaction lookup
selectedKeyIDs := make([]string, len(selectedKeys))
for i, key := range selectedKeys {
selectedKeyIDs[i] = key.ID
}
virtualKeyIDs := make([]string, len(virtualKeys))
for i, key := range virtualKeys {
virtualKeyIDs[i] = key.ID
}
routingRuleIDs := make([]string, len(routingRules))
for i, rule := range routingRules {
routingRuleIDs[i] = rule.ID
}
redactedSelectedKeys := make(map[string]schemas.Key)
for _, selectedKey := range h.redactedKeysManager.GetAllRedactedKeys(ctx, selectedKeyIDs) {
redactedSelectedKeys[selectedKey.ID] = selectedKey
}
redactedVirtualKeys := make(map[string]tables.TableVirtualKey)
for _, virtualKey := range h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, virtualKeyIDs) {
redactedVirtualKeys[virtualKey.ID] = virtualKey
}
redactedRoutingRules := make(map[string]tables.TableRoutingRule)
for _, routingRule := range h.redactedKeysManager.GetAllRedactedRoutingRules(ctx, routingRuleIDs) {
redactedRoutingRules[routingRule.ID] = routingRule
}
// Check if all selected key ids are present in the redacted selected keys (will not be present in case a key is deleted, but we still need to show its filter)
for _, selectedKey := range selectedKeys {
if _, ok := redactedSelectedKeys[selectedKey.ID]; !ok {
// Create a new key struct directly since we know it doesn't exist
redactedSelectedKeys[selectedKey.ID] = schemas.Key{
ID: selectedKey.ID,
Name: selectedKey.Name + " (deleted)",
}
}
}
// Check if all virtual key ids are present in the redacted virtual keys (will not be present in case a virtual key is deleted, but we still need to show its filter)
for _, virtualKey := range virtualKeys {
if _, ok := redactedVirtualKeys[virtualKey.ID]; !ok {
if hideDeletedVirtualKeys {
continue
}
// Create a new virtual key struct directly since we know it doesn't exist
redactedVirtualKeys[virtualKey.ID] = tables.TableVirtualKey{
ID: virtualKey.ID,
Name: virtualKey.Name + " (deleted)",
}
}
}
// Check if all routing rule ids are present in the redacted routing rules (will not be present in case a routing rule is deleted, but we still need to show its filter)
for _, routingRule := range routingRules {
if _, ok := redactedRoutingRules[routingRule.ID]; !ok {
// Create a new routing rule struct directly since we know it doesn't exist
redactedRoutingRules[routingRule.ID] = tables.TableRoutingRule{
ID: routingRule.ID,
Name: routingRule.Name + " (deleted)",
}
}
}
// Convert maps to arrays for frontend consumption
selectedKeysArray := make([]schemas.Key, 0, len(redactedSelectedKeys))
for _, key := range redactedSelectedKeys {
selectedKeysArray = append(selectedKeysArray, key)
}
virtualKeysArray := make([]tables.TableVirtualKey, 0, len(redactedVirtualKeys))
for _, key := range redactedVirtualKeys {
virtualKeysArray = append(virtualKeysArray, key)
}
routingRulesArray := make([]tables.TableRoutingRule, 0, len(redactedRoutingRules))
for _, rule := range redactedRoutingRules {
routingRulesArray = append(routingRulesArray, rule)
}
if metadataKeys == nil {
metadataKeys = make(map[string][]string)
}
SendJSON(ctx, map[string]interface{}{"models": models, "aliases": aliases, "selected_keys": selectedKeysArray, "virtual_keys": virtualKeysArray, "routing_rules": routingRulesArray, "routing_engines": routingEngines, "teams": teams, "customers": customers, "users": users, "business_units": businessUnits, "metadata_keys": metadataKeys})
}
// deleteLogs handles DELETE /api/logs - Delete logs by their IDs
func (h *LoggingHandler) deleteLogs(ctx *fasthttp.RequestCtx) {
var req struct {
IDs []string `json:"ids"`
}
if err := sonic.Unmarshal(ctx.PostBody(), &req); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, "Invalid JSON")
return
}
if len(req.IDs) == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "No log IDs provided")
return
}
if err := h.logManager.DeleteLogs(ctx, req.IDs); err != nil {
logger.Error("failed to delete logs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, "Failed to delete logs")
return
}
SendJSON(ctx, map[string]interface{}{
"message": "Logs deleted successfully",
})
}
// recalculateLogCosts handles POST /api/logs/recalculate-cost - recompute missing costs in batches
func (h *LoggingHandler) recalculateLogCosts(ctx *fasthttp.RequestCtx) {
var payload recalculateCostRequest
body := ctx.PostBody()
if len(body) > 0 {
if err := sonic.Unmarshal(body, &payload); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, "Invalid JSON")
return
}
}
limit := 200
if payload.Limit != nil {
limit = *payload.Limit
}
if limit <= 0 {
limit = 200
}
if limit > 1000 {
limit = 1000
}
filters := payload.Filters
filters.MissingCostOnly = true
result, err := h.logManager.RecalculateCosts(ctx, &filters, limit)
if err != nil {
logger.Error("failed to recalculate log costs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to recalculate costs: %v", err))
return
}
SendJSON(ctx, result)
}
// Helper functions
func findRedactedKey(redactedKeys []schemas.Key, id string, name string) *schemas.Key {
if len(redactedKeys) == 0 {
return &schemas.Key{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
for _, key := range redactedKeys {
if key.ID == id {
return &key
}
}
return &schemas.Key{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
func findRedactedVirtualKey(redactedVirtualKeys []tables.TableVirtualKey, id string, name string) *tables.TableVirtualKey {
if len(redactedVirtualKeys) == 0 {
return &tables.TableVirtualKey{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
for _, virtualKey := range redactedVirtualKeys {
if virtualKey.ID == id {
return &virtualKey
}
}
return &tables.TableVirtualKey{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
func findRedactedRoutingRule(redactedRoutingRules []tables.TableRoutingRule, id string, name string) *tables.TableRoutingRule {
if len(redactedRoutingRules) == 0 {
return &tables.TableRoutingRule{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
for _, routingRule := range redactedRoutingRules {
if routingRule.ID == id {
return &routingRule
}
}
return &tables.TableRoutingRule{
ID: id,
Name: func() string {
if name != "" {
return name + " (deleted)"
} else {
return ""
}
}(),
}
}
// parseCommaSeparated splits a comma-separated string into a slice
func parseCommaSeparated(s string) []string {
if s == "" {
return nil
}
var result []string
for _, item := range strings.Split(s, ",") {
if trimmed := strings.TrimSpace(item); trimmed != "" {
result = append(result, trimmed)
}
}
return result
}
// parseMetadataFilters extracts metadata_* query params and sets them on the filters.
func parseMetadataFilters(ctx *fasthttp.RequestCtx, filters *logstore.SearchFilters) {
var metadataFilters map[string]string
ctx.QueryArgs().VisitAll(func(key, value []byte) { //nolint:staticcheck
keyStr := string(key)
if strings.HasPrefix(keyStr, "metadata_") {
metadataKey := strings.TrimPrefix(keyStr, "metadata_")
if metadataKey != "" {
if metadataFilters == nil {
metadataFilters = make(map[string]string)
}
metadataFilters[metadataKey] = string(value)
}
}
})
if len(metadataFilters) > 0 {
filters.MetadataFilters = metadataFilters
}
}
type recalculateCostRequest struct {
Filters logstore.SearchFilters `json:"filters"`
Limit *int `json:"limit,omitempty"`
}
// parseMCPFiltersAndPagination parses MCP tool log filters and pagination from query parameters.
// Returns an error if any required parsing fails (e.g., invalid time format, invalid number format).
func parseMCPFiltersAndPagination(ctx *fasthttp.RequestCtx) (*logstore.MCPToolLogSearchFilters, *logstore.PaginationOptions, error) {
filters := &logstore.MCPToolLogSearchFilters{}
pagination := &logstore.PaginationOptions{}
// Extract filters from query parameters
if toolNames := string(ctx.QueryArgs().Peek("tool_names")); toolNames != "" {
filters.ToolNames = parseCommaSeparated(toolNames)
}
if serverLabels := string(ctx.QueryArgs().Peek("server_labels")); serverLabels != "" {
filters.ServerLabels = parseCommaSeparated(serverLabels)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if virtualKeyIDs := string(ctx.QueryArgs().Peek("virtual_key_ids")); virtualKeyIDs != "" {
filters.VirtualKeyIDs = parseCommaSeparated(virtualKeyIDs)
}
if llmRequestIDs := string(ctx.QueryArgs().Peek("llm_request_ids")); llmRequestIDs != "" {
filters.LLMRequestIDs = parseCommaSeparated(llmRequestIDs)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
t, err := time.Parse(time.RFC3339, startTime)
if err != nil {
return nil, nil, fmt.Errorf("invalid start_time format: %w", err)
}
filters.StartTime = &t
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
t, err := time.Parse(time.RFC3339, endTime)
if err != nil {
return nil, nil, fmt.Errorf("invalid end_time format: %w", err)
}
filters.EndTime = &t
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
f, err := strconv.ParseFloat(minLatency, 64)
if err != nil {
return nil, nil, fmt.Errorf("invalid min_latency format: %w", err)
}
filters.MinLatency = &f
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
val, err := strconv.ParseFloat(maxLatency, 64)
if err != nil {
return nil, nil, fmt.Errorf("invalid max_latency format: %w", err)
}
filters.MaxLatency = &val
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}
// Extract pagination parameters
pagination.Limit = 50 // Default limit
if limit := string(ctx.QueryArgs().Peek("limit")); limit != "" {
i, err := strconv.Atoi(limit)
if err != nil {
return nil, nil, fmt.Errorf("invalid limit format: %w", err)
}
if i <= 0 {
return nil, nil, fmt.Errorf("limit must be greater than 0")
}
if i > 1000 {
return nil, nil, fmt.Errorf("limit cannot exceed 1000")
}
pagination.Limit = i
}
pagination.Offset = 0 // Default offset
if offset := string(ctx.QueryArgs().Peek("offset")); offset != "" {
i, err := strconv.Atoi(offset)
if err != nil {
return nil, nil, fmt.Errorf("invalid offset format: %w", err)
}
if i < 0 {
return nil, nil, fmt.Errorf("offset cannot be negative")
}
pagination.Offset = i
}
// Sort parameters
pagination.SortBy = "timestamp" // Default sort field
if sortBy := string(ctx.QueryArgs().Peek("sort_by")); sortBy != "" {
if sortBy == "timestamp" || sortBy == "latency" || sortBy == "cost" {
pagination.SortBy = sortBy
} else {
return nil, nil, fmt.Errorf("invalid sort_by: must be 'timestamp', 'latency' or 'cost'")
}
}
pagination.Order = "desc" // Default sort order
if order := string(ctx.QueryArgs().Peek("order")); order != "" {
if order == "asc" || order == "desc" {
pagination.Order = order
} else {
return nil, nil, fmt.Errorf("invalid order: must be 'asc' or 'desc'")
}
}
return filters, pagination, nil
}
// parseMCPFilters parses MCP tool log filters from query parameters (without pagination).
// Returns an error if any required parsing fails.
func parseMCPFilters(ctx *fasthttp.RequestCtx) (*logstore.MCPToolLogSearchFilters, error) {
filters := &logstore.MCPToolLogSearchFilters{}
// Extract filters from query parameters
if toolNames := string(ctx.QueryArgs().Peek("tool_names")); toolNames != "" {
filters.ToolNames = parseCommaSeparated(toolNames)
}
if serverLabels := string(ctx.QueryArgs().Peek("server_labels")); serverLabels != "" {
filters.ServerLabels = parseCommaSeparated(serverLabels)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if virtualKeyIDs := string(ctx.QueryArgs().Peek("virtual_key_ids")); virtualKeyIDs != "" {
filters.VirtualKeyIDs = parseCommaSeparated(virtualKeyIDs)
}
if llmRequestIDs := string(ctx.QueryArgs().Peek("llm_request_ids")); llmRequestIDs != "" {
filters.LLMRequestIDs = parseCommaSeparated(llmRequestIDs)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
t, err := time.Parse(time.RFC3339, startTime)
if err != nil {
return nil, fmt.Errorf("invalid start_time format: %w", err)
}
filters.StartTime = &t
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
t, err := time.Parse(time.RFC3339, endTime)
if err != nil {
return nil, fmt.Errorf("invalid end_time format: %w", err)
}
filters.EndTime = &t
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
f, err := strconv.ParseFloat(minLatency, 64)
if err != nil {
return nil, fmt.Errorf("invalid min_latency format: %w", err)
}
filters.MinLatency = &f
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
val, err := strconv.ParseFloat(maxLatency, 64)
if err != nil {
return nil, fmt.Errorf("invalid max_latency format: %w", err)
}
filters.MaxLatency = &val
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}
return filters, nil
}
// ==================== MCP TOOL LOGGING HANDLERS ====================
// getMCPLogs handles GET /api/mcp-logs - Get MCP tool logs with filtering, search, and pagination via query parameters
func (h *LoggingHandler) getMCPLogs(ctx *fasthttp.RequestCtx) {
filters, pagination, err := parseMCPFiltersAndPagination(ctx)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, err.Error())
return
}
result, err := h.logManager.SearchMCPToolLogs(ctx, filters, pagination)
if err != nil {
logger.Error("failed to search MCP tool logs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Search failed: %v", err))
return
}
// Collect unique virtual key IDs from the logs
virtualKeyIDs := make(map[string]struct{})
for _, log := range result.Logs {
if log.VirtualKeyID != nil && *log.VirtualKeyID != "" {
virtualKeyIDs[*log.VirtualKeyID] = struct{}{}
}
}
toSlice := func(m map[string]struct{}) []string {
if len(m) == 0 {
return nil
}
out := make([]string, 0, len(m))
for id := range m {
out = append(out, id)
}
return out
}
redactedVirtualKeys := h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, toSlice(virtualKeyIDs))
// Add virtual key to the result
for i, log := range result.Logs {
if log.VirtualKeyID != nil && log.VirtualKeyName != nil && *log.VirtualKeyID != "" && *log.VirtualKeyName != "" {
result.Logs[i].VirtualKey = findRedactedVirtualKey(redactedVirtualKeys, *log.VirtualKeyID, *log.VirtualKeyName)
}
}
SendJSON(ctx, result)
}
// getMCPLogsStats handles GET /api/mcp-logs/stats - Get statistics for MCP tool logs with filtering
func (h *LoggingHandler) getMCPLogsStats(ctx *fasthttp.RequestCtx) {
filters, err := parseMCPFilters(ctx)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, err.Error())
return
}
stats, err := h.logManager.GetMCPToolLogStats(ctx, filters)
if err != nil {
logger.Error("failed to get MCP tool log stats: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Stats calculation failed: %v", err))
return
}
SendJSON(ctx, stats)
}
// getMCPLogsFilterData handles GET /api/mcp-logs/filterdata - Get all unique filter data from MCP tool logs
func (h *LoggingHandler) getMCPLogsFilterData(ctx *fasthttp.RequestCtx) {
hideDeletedVirtualKeys := h.shouldHideDeletedVirtualKeysInFilters()
toolNames, err := h.logManager.GetAvailableToolNames(ctx)
if err != nil {
logger.Error("failed to get available tool names: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to get available tool names: %v", err))
return
}
serverLabels, err := h.logManager.GetAvailableServerLabels(ctx)
if err != nil {
logger.Error("failed to get available server labels: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to get available server labels: %v", err))
return
}
virtualKeys := h.logManager.GetAvailableMCPVirtualKeys(ctx)
// Extract IDs for redaction lookup
virtualKeyIDs := make([]string, len(virtualKeys))
for i, key := range virtualKeys {
virtualKeyIDs[i] = key.ID
}
redactedVirtualKeys := make(map[string]tables.TableVirtualKey)
for _, virtualKey := range h.redactedKeysManager.GetAllRedactedVirtualKeys(ctx, virtualKeyIDs) {
redactedVirtualKeys[virtualKey.ID] = virtualKey
}
// Check if all virtual key ids are present in the redacted virtual keys (will not be present in case a virtual key is deleted, but we still need to show its filter)
for _, virtualKey := range virtualKeys {
if _, ok := redactedVirtualKeys[virtualKey.ID]; !ok {
if hideDeletedVirtualKeys {
continue
}
// Create a new virtual key struct directly since we know it doesn't exist
redactedVirtualKeys[virtualKey.ID] = tables.TableVirtualKey{
ID: virtualKey.ID,
Name: virtualKey.Name + " (deleted)",
}
}
}
// Convert maps to arrays for frontend consumption
virtualKeysArray := make([]tables.TableVirtualKey, 0, len(redactedVirtualKeys))
for _, key := range redactedVirtualKeys {
virtualKeysArray = append(virtualKeysArray, key)
}
SendJSON(ctx, map[string]interface{}{
"tool_names": toolNames,
"server_labels": serverLabels,
"virtual_keys": virtualKeysArray,
})
}
// deleteMCPLogs handles DELETE /api/mcp-logs - Delete MCP tool logs by their IDs
func (h *LoggingHandler) deleteMCPLogs(ctx *fasthttp.RequestCtx) {
var req struct {
IDs []string `json:"ids"`
}
if err := sonic.Unmarshal(ctx.PostBody(), &req); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, "Invalid JSON")
return
}
if len(req.IDs) == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "No log IDs provided")
return
}
if err := h.logManager.DeleteMCPToolLogs(ctx, req.IDs); err != nil {
logger.Error("failed to delete MCP tool logs: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, "Failed to delete MCP tool logs")
return
}
SendJSON(ctx, map[string]interface{}{
"message": "MCP tool logs deleted successfully",
})
}
// parseMCPHistogramFilters extracts time range and MCP-specific filters for histogram queries.
func parseMCPHistogramFilters(ctx *fasthttp.RequestCtx) (*logstore.MCPToolLogSearchFilters, error) {
return parseMCPFilters(ctx)
}
// getMCPHistogram handles GET /api/mcp-logs/histogram - Get time-bucketed MCP tool call volume
func (h *LoggingHandler) getMCPHistogram(ctx *fasthttp.RequestCtx) {
filters, err := parseMCPHistogramFilters(ctx)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, err.Error())
return
}
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetMCPHistogram(ctx, *filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get MCP histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("MCP histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getMCPCostHistogram handles GET /api/mcp-logs/histogram/cost - Get time-bucketed MCP cost data
func (h *LoggingHandler) getMCPCostHistogram(ctx *fasthttp.RequestCtx) {
filters, err := parseMCPHistogramFilters(ctx)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, err.Error())
return
}
bucketSizeSeconds := calculateBucketSize(filters.StartTime, filters.EndTime)
result, err := h.logManager.GetMCPCostHistogram(ctx, *filters, bucketSizeSeconds)
if err != nil {
logger.Error("failed to get MCP cost histogram: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("MCP cost histogram calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}
// getMCPTopTools handles GET /api/mcp-logs/histogram/top-tools - Get top 10 MCP tools by call count
func (h *LoggingHandler) getMCPTopTools(ctx *fasthttp.RequestCtx) {
filters, err := parseMCPHistogramFilters(ctx)
if err != nil {
SendError(ctx, fasthttp.StatusBadRequest, err.Error())
return
}
result, err := h.logManager.GetMCPTopTools(ctx, *filters, 10)
if err != nil {
logger.Error("failed to get MCP top tools: %v", err)
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("MCP top tools calculation failed: %v", err))
return
}
SendJSON(ctx, result)
}