Files
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

3546 lines
121 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package logstore
import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/bytedance/sonic"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/framework/configstore/tables"
"golang.org/x/sync/errgroup"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// validMetadataKeyRegex matches metadata keys made up solely of ASCII letters,
// digits, dots, underscores, and hyphens.
var validMetadataKeyRegex = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

// isValidMetadataKey reports whether key is safe to embed in a SQL JSON path or
// JSON fragment: it must be 1 to 256 bytes long and match validMetadataKeyRegex.
// Rejecting every other key is what prevents SQL injection through metadata filters.
func isValidMetadataKey(key string) bool {
	if n := len(key); n == 0 || n > 256 {
		return false
	}
	return validMetadataKeyRegex.MatchString(key)
}
// bulkUpdateCostChunkSize is the maximum number of log rows covered by a single
// UPDATE ... FROM (VALUES ...) statement in bulkUpdateCostPostgres.
const bulkUpdateCostChunkSize = 500
// sessionLogPageLimit is both the default and the maximum page size applied by
// GetSessionLogs.
const sessionLogPageLimit = 50
const (
// defaultMaxQueryLimit is a safety cap for unbounded queries (FindAll, FindAllDistinct).
defaultMaxQueryLimit = 10000
// defaultMaxSearchLimit is the maximum number of rows returned by SearchLogs / SearchMCPToolLogs.
// SearchLogs also falls back to this value when the requested limit is zero or negative.
defaultMaxSearchLimit = 1000
// defaultMaxRankingsLimit caps the number of model+provider groups returned by GetModelRankings.
defaultMaxRankingsLimit = 100
// defaultFilterDataCutoffDays limits GetDistinct* filter-data queries to recent data.
defaultFilterDataCutoffDays = 30
// defaultFilterDataLimit caps the number of distinct values returned by filter-data queries.
defaultFilterDataLimit = 500
)
// RDBLogStore is a log store backed by a relational database accessed through GORM.
// It is not SQLite-specific: its methods branch on the GORM dialect name
// ("postgres", "sqlite", "mysql") wherever the SQL syntax differs.
type RDBLogStore struct {
// db is the GORM handle every query in this store is issued through.
db *gorm.DB
// logger is the store's logger. NOTE(review): its usage is not visible in this
// part of the file.
logger schemas.Logger
// matViewsReady presumably records whether the PostgreSQL materialized views
// consulted via canUseMatView are available — TODO confirm where it is set.
matViewsReady atomic.Bool
}
// generateBucketTimestamps returns the Unix timestamps (in seconds) of every
// bucket covering the range [startTime, endTime].
//
// The start is aligned down to a multiple of bucketSizeSeconds using integer
// division, and buckets are emitted every bucketSizeSeconds up to and including
// endTime's Unix second. It returns nil when either bound is nil, when
// bucketSizeSeconds is not positive, or when the aligned start lies after the end.
func generateBucketTimestamps(startTime, endTime *time.Time, bucketSizeSeconds int64) []int64 {
	if startTime == nil || endTime == nil || bucketSizeSeconds <= 0 {
		return nil
	}
	endUnix := endTime.Unix()
	// Align the start time to a bucket boundary.
	alignedStart := (startTime.Unix() / bucketSizeSeconds) * bucketSizeSeconds
	if alignedStart > endUnix {
		return nil
	}
	// The number of buckets is known up front, so size the slice once instead of
	// letting append grow it repeatedly.
	bucketCount := (endUnix-alignedStart)/bucketSizeSeconds + 1
	timestamps := make([]int64, 0, bucketCount)
	for ts := alignedStart; ts <= endUnix; ts += bucketSizeSeconds {
		timestamps = append(timestamps, ts)
	}
	return timestamps
}
// applyFilters appends a WHERE condition to baseQuery for every populated field of
// filters and returns the resulting query. Fields left empty/nil add no condition.
//
// Dialect-specific SQL is chosen from s.db.Dialector.Name():
//   - routing engines: array overlap on PostgreSQL, delimiter-aware LIKE elsewhere;
//   - content search: full-text search on PostgreSQL, substring LIKE elsewhere;
//   - metadata: jsonb containment on PostgreSQL, json_extract/json_type elsewhere.
//
// Metadata keys that fail isValidMetadataKey are skipped, so an invalid key adds
// no condition at all rather than producing an error.
//
// NOTE(review): the LIKE-based branches do not escape '%' or '_' inside the
// user-supplied value, so those characters act as wildcards.
func (s *RDBLogStore) applyFilters(baseQuery *gorm.DB, filters SearchFilters) *gorm.DB {
	if len(filters.Providers) > 0 {
		baseQuery = baseQuery.Where("provider IN ?", filters.Providers)
	}
	if len(filters.Models) > 0 {
		baseQuery = baseQuery.Where("model IN ?", filters.Models)
	}
	if len(filters.Aliases) > 0 {
		baseQuery = baseQuery.Where("alias IN ?", filters.Aliases)
	}
	if len(filters.Status) > 0 {
		baseQuery = baseQuery.Where("status IN ?", filters.Status)
	}
	if len(filters.Objects) > 0 {
		baseQuery = baseQuery.Where("object_type IN ?", filters.Objects)
	}
	if filters.ParentRequestID != "" {
		baseQuery = baseQuery.Where("parent_request_id = ?", filters.ParentRequestID)
	}
	if len(filters.SelectedKeyIDs) > 0 {
		baseQuery = baseQuery.Where("selected_key_id IN ?", filters.SelectedKeyIDs)
	}
	if len(filters.VirtualKeyIDs) > 0 {
		baseQuery = baseQuery.Where("virtual_key_id IN ?", filters.VirtualKeyIDs)
	}
	if len(filters.RoutingRuleIDs) > 0 {
		baseQuery = baseQuery.Where("routing_rule_id IN ?", filters.RoutingRuleIDs)
	}
	if len(filters.TeamIDs) > 0 {
		baseQuery = baseQuery.Where("team_id IN ?", filters.TeamIDs)
	}
	if len(filters.CustomerIDs) > 0 {
		baseQuery = baseQuery.Where("customer_id IN ?", filters.CustomerIDs)
	}
	if len(filters.UserIDs) > 0 {
		baseQuery = baseQuery.Where("user_id IN ?", filters.UserIDs)
	}
	if len(filters.BusinessUnitIDs) > 0 {
		baseQuery = baseQuery.Where("business_unit_id IN ?", filters.BusinessUnitIDs)
	}
	if len(filters.RoutingEngineUsed) > 0 {
		// routing_engines_used holds comma-separated values; match logs containing
		// ANY of the requested engines.
		dialect := s.db.Dialector.Name()
		// Collect non-empty engine values.
		var engines []string
		for _, engine := range filters.RoutingEngineUsed {
			engine = strings.TrimSpace(engine)
			if engine != "" {
				engines = append(engines, engine)
			}
		}
		if len(engines) > 0 {
			switch dialect {
			case "postgres":
				// Use the array overlap operator, which can leverage the GIN index on
				// string_to_array(routing_engines_used, ',').
				placeholders := make([]string, len(engines))
				args := make([]any, len(engines))
				for i, e := range engines {
					placeholders[i] = "?"
					args[i] = e
				}
				baseQuery = baseQuery.Where(
					"string_to_array(routing_engines_used, ',') && ARRAY["+strings.Join(placeholders, ",")+"]::text[]",
					args...,
				)
			default:
				// SQLite and others: wrap the column in commas so each engine can be
				// matched as a whole ",engine," token.
				var concatExpr string
				if dialect == "sqlite" {
					concatExpr = "',' || routing_engines_used || ','"
				} else {
					concatExpr = "CONCAT(',', routing_engines_used, ',')"
				}
				engineConditions := make([]string, 0, len(engines))
				engineArgs := make([]any, 0, len(engines))
				for _, engine := range engines {
					engineConditions = append(engineConditions, concatExpr+" LIKE ?")
					engineArgs = append(engineArgs, "%,"+engine+",%")
				}
				baseQuery = baseQuery.Where(strings.Join(engineConditions, " OR "), engineArgs...)
			}
		}
	}
	if filters.StartTime != nil {
		baseQuery = baseQuery.Where("timestamp >= ?", *filters.StartTime)
	}
	if filters.EndTime != nil {
		baseQuery = baseQuery.Where("timestamp <= ?", *filters.EndTime)
	}
	if filters.MinLatency != nil {
		baseQuery = baseQuery.Where("latency >= ?", *filters.MinLatency)
	}
	if filters.MaxLatency != nil {
		baseQuery = baseQuery.Where("latency <= ?", *filters.MaxLatency)
	}
	if filters.MinTokens != nil {
		baseQuery = baseQuery.Where("total_tokens >= ?", *filters.MinTokens)
	}
	if filters.MaxTokens != nil {
		baseQuery = baseQuery.Where("total_tokens <= ?", *filters.MaxTokens)
	}
	if filters.MinCost != nil {
		baseQuery = baseQuery.Where("cost >= ?", *filters.MinCost)
	}
	if filters.MaxCost != nil {
		baseQuery = baseQuery.Where("cost <= ?", *filters.MaxCost)
	}
	if filters.MissingCostOnly {
		// Cost is NULL or non-positive, and the request did not end in an error.
		baseQuery = baseQuery.Where("(cost IS NULL OR cost <= 0) AND status NOT IN ('error')")
	}
	if filters.ContentSearch != "" {
		if s.db.Dialector.Name() == "postgres" {
			baseQuery = baseQuery.Where("to_tsvector('simple', content_summary) @@ plainto_tsquery('simple', ?)", filters.ContentSearch)
		} else {
			baseQuery = baseQuery.Where("content_summary LIKE ?", "%"+filters.ContentSearch+"%")
		}
	}
	if len(filters.MetadataFilters) > 0 {
		dialect := s.db.Dialector.Name()
		// Guard must match the partial-index predicate so the planner uses the GIN index.
		// SQLite does not support IS JSON OBJECT, so fall back to the equivalent json_type check.
		if dialect == "postgres" {
			baseQuery = baseQuery.Where("metadata IS NOT NULL AND metadata IS JSON OBJECT")
		} else {
			baseQuery = baseQuery.Where("metadata IS NOT NULL AND json_valid(metadata) AND json_type(metadata) = 'object'")
		}
		for key, value := range filters.MetadataFilters {
			if !isValidMetadataKey(key) {
				continue
			}
			switch dialect {
			case "postgres":
				// Use the @> containment operator to leverage the GIN index on metadata::jsonb.
				// The value keeps its JSON type: booleans and finite numbers are emitted
				// bare, everything else as a JSON string.
				var jsonValue string
				if value == "true" || value == "false" {
					jsonValue = value
				} else if f, err := strconv.ParseFloat(value, 64); err == nil && !math.IsNaN(f) && !math.IsInf(f, 0) {
					// NaN/Inf are rejected because they are not valid JSON; the number is normalized.
					jsonValue = strconv.FormatFloat(f, 'f', -1, 64)
				} else if encoded, err := sonic.Marshal(value); err == nil {
					// Encode with a real JSON encoder: Go's %q quoting emits \x.. and
					// \U........ escapes for some inputs, which are not valid JSON and
					// would make the ::jsonb cast fail.
					jsonValue = string(encoded)
				} else {
					// Best effort: keep the previous Go-quoted form if encoding fails.
					jsonValue = strconv.Quote(value)
				}
				// The key is restricted to [a-zA-Z0-9._-] by isValidMetadataKey, so %q
				// yields a valid JSON string for it.
				jsonFragment := fmt.Sprintf(`{%q: %s}`, key, jsonValue)
				baseQuery = baseQuery.Where("metadata::jsonb @> ?::jsonb", jsonFragment)
			default:
				// SQLite: quote the member name so dots/hyphens stay part of the key.
				path := `$."` + key + `"`
				if value == "true" {
					// json_extract returns 1 for true, but json_type returns 'true'.
					baseQuery = baseQuery.Where("json_type(metadata, ?) = 'true'", path)
				} else if value == "false" {
					baseQuery = baseQuery.Where("json_type(metadata, ?) = 'false'", path)
				} else {
					// Numeric and string values: compare both as-is and as text.
					baseQuery = baseQuery.Where("json_extract(metadata, ?) = ? OR CAST(json_extract(metadata, ?) AS TEXT) = ?", path, value, path, value)
				}
			}
		}
	}
	return baseQuery
}
// Create inserts entry as a new log row, returning any error reported by the
// database layer.
func (s *RDBLogStore) Create(ctx context.Context, entry *Log) error {
	result := s.db.WithContext(ctx).Create(entry)
	return result.Error
}
// CreateIfNotExists inserts entry unless a row with the same id already exists.
// The insert carries an ON CONFLICT (id) DO NOTHING clause, so a duplicate id is
// ignored instead of surfacing as an error.
func (s *RDBLogStore) CreateIfNotExists(ctx context.Context, entry *Log) error {
	skipDuplicateID := clause.OnConflict{
		Columns:   []clause.Column{{Name: "id"}},
		DoNothing: true,
	}
	return s.db.WithContext(ctx).Clauses(skipDuplicateID).Create(entry).Error
}
// BatchCreateIfNotExists inserts all entries through a single Create call.
// The insert carries an ON CONFLICT (id) DO NOTHING clause, which makes the call
// idempotent for entries whose id already exists. An empty slice is a no-op.
func (s *RDBLogStore) BatchCreateIfNotExists(ctx context.Context, entries []*Log) error {
	if len(entries) == 0 {
		return nil
	}
	skipDuplicateID := clause.OnConflict{
		Columns:   []clause.Column{{Name: "id"}},
		DoNothing: true,
	}
	return s.db.WithContext(ctx).Clauses(skipDuplicateID).Create(&entries).Error
}
// Ping verifies database connectivity by executing "SELECT 1" and returning the
// resulting error, if any.
func (s *RDBLogStore) Ping(ctx context.Context) error {
	result := s.db.WithContext(ctx).Exec("SELECT 1")
	return result.Error
}
// Update applies entry to the log row identified by id.
// The payload is first passed through serializeLogUpdateEntry. It returns the
// serialization or database error if one occurs, and ErrNotFound when the
// update affected no rows.
func (s *RDBLogStore) Update(ctx context.Context, id string, entry any) error {
	payload, err := serializeLogUpdateEntry(entry)
	if err != nil {
		return err
	}
	result := s.db.WithContext(ctx).Model(&Log{}).Where("id = ?", id).Updates(payload)
	switch {
	case result.Error != nil:
		return result.Error
	case result.RowsAffected == 0:
		return ErrNotFound
	default:
		return nil
	}
}
// BulkUpdateCost sets the cost column for every log id present in updates.
// On PostgreSQL the work is delegated to bulkUpdateCostPostgres (batched VALUES
// updates); on every other dialect each row is updated individually inside one
// transaction, which stops at the first failing update. An empty map is a no-op.
func (s *RDBLogStore) BulkUpdateCost(ctx context.Context, updates map[string]float64) error {
	switch {
	case len(updates) == 0:
		return nil
	case s.db.Dialector.Name() == "postgres":
		return s.bulkUpdateCostPostgres(ctx, updates)
	}
	applyRowByRow := func(tx *gorm.DB) error {
		for logID, newCost := range updates {
			result := tx.Model(&Log{}).Where("id = ?", logID).Update("cost", newCost)
			if result.Error != nil {
				return result.Error
			}
		}
		return nil
	}
	return s.db.WithContext(ctx).Transaction(applyRowByRow)
}
// serializeLogUpdateEntry prepares an update payload for GORM.
// A *Log is serialized in place via SerializeFields and returned as the same
// pointer; a Log value is serialized on a copy, which is returned. Any other
// payload type is handed back untouched. A SerializeFields failure is returned
// with a nil payload.
func serializeLogUpdateEntry(entry any) (any, error) {
	if logPtr, ok := entry.(*Log); ok {
		if err := logPtr.SerializeFields(); err != nil {
			return nil, err
		}
		return logPtr, nil
	}
	if logValue, ok := entry.(Log); ok {
		// logValue is already a copy of the caller's value, so serializing it
		// leaves the original untouched.
		if err := logValue.SerializeFields(); err != nil {
			return nil, err
		}
		return logValue, nil
	}
	return entry, nil
}
// buildBulkUpdateCostPostgresSQL renders one UPDATE ... FROM (VALUES ...)
// statement that sets logs.cost for every id in ids, together with its argument
// list.
//
// Each id contributes a "($n::text,$n+1::float8)" tuple, numbered in the order
// of ids, and the pair (id, updates[id]) to the arguments; the output is
// therefore deterministic for a given ids slice. An id missing from updates
// contributes a cost of 0.
func buildBulkUpdateCostPostgresSQL(ids []string, updates map[string]float64) (string, []interface{}) {
	tuples := make([]string, len(ids))
	args := make([]interface{}, 0, len(ids)*2)
	for idx, logID := range ids {
		// Positional parameters are 1-based, two per row.
		idParam := 2*idx + 1
		tuples[idx] = "($" + strconv.Itoa(idParam) + "::text,$" + strconv.Itoa(idParam+1) + "::float8)"
		args = append(args, logID, updates[logID])
	}
	query := "UPDATE logs SET cost = v.cost FROM (VALUES " +
		strings.Join(tuples, ",") +
		") AS v(id, cost) WHERE logs.id = v.id"
	return query, args
}
// bulkUpdateCostPostgres writes the costs in updates using batched PostgreSQL
// statements rather than one UPDATE per row.
// The ids are sorted, split into chunks of at most bulkUpdateCostChunkSize, and
// each chunk is executed as one statement built by
// buildBulkUpdateCostPostgresSQL. All chunks run inside a single transaction,
// which stops at the first failing statement.
func (s *RDBLogStore) bulkUpdateCostPostgres(ctx context.Context, updates map[string]float64) error {
	sortedIDs := make([]string, 0, len(updates))
	for logID := range updates {
		sortedIDs = append(sortedIDs, logID)
	}
	sort.Strings(sortedIDs)

	return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		for pending := sortedIDs; len(pending) > 0; {
			size := min(bulkUpdateCostChunkSize, len(pending))
			stmt, params := buildBulkUpdateCostPostgresSQL(pending[:size], updates)
			if err := tx.Exec(stmt, params...).Error; err != nil {
				return err
			}
			pending = pending[size:]
		}
		return nil
	})
}
// SearchLogs returns one page of logs matching filters together with the total
// number of matching rows; no other statistics are computed.
//
// Sorting: pagination.SortBy selects timestamp, latency, tokens (total_tokens) or
// cost, with timestamp used for any other value; the direction is ascending only
// when pagination.Order is "asc". The limit is replaced by defaultMaxSearchLimit
// when it is non-positive or exceeds that cap, and the effective limit is echoed
// back in the returned Pagination. HasLogs is true when the page is non-empty,
// otherwise it is taken from s.HasLogs.
func (s *RDBLogStore) SearchLogs(ctx context.Context, filters SearchFilters, pagination PaginationOptions) (*SearchResult, error) {
	// Resolve the ORDER BY clause up front; the data goroutine needs it.
	sortColumn := "timestamp"
	switch pagination.SortBy {
	case "latency":
		sortColumn = "latency"
	case "tokens":
		sortColumn = "total_tokens"
	case "cost":
		sortColumn = "cost"
	}
	orderClause := sortColumn + " DESC"
	if pagination.Order == "asc" {
		orderClause = sortColumn + " ASC"
	}

	if pagination.Limit <= 0 || pagination.Limit > defaultMaxSearchLimit {
		pagination.Limit = defaultMaxSearchLimit
	}
	limit := pagination.Limit

	// The COUNT on large tables is the bottleneck, so it runs concurrently with
	// the (fast) page fetch. Each goroutine builds its own *gorm.DB because
	// Count() mutates the session.
	var (
		totalCount int64
		logs       []Log
	)
	group, groupCtx := errgroup.WithContext(ctx)

	group.Go(func() error {
		if s.db.Dialector.Name() != "postgres" || !s.canUseMatView(filters) {
			countQuery := s.applyFilters(s.db.WithContext(groupCtx).Model(&Log{}), filters)
			return countQuery.Count(&totalCount).Error
		}
		count, err := s.getCountFromMatView(groupCtx, filters)
		totalCount = count
		return err
	})

	group.Go(func() error {
		dataQuery := s.applyFilters(s.db.WithContext(groupCtx).Model(&Log{}), filters).
			Order(orderClause).
			Select(s.listSelectColumns()).
			Limit(limit)
		if pagination.Offset > 0 {
			dataQuery = dataQuery.Offset(pagination.Offset)
		}
		// A "record not found" result is treated as an empty page, not a failure.
		if err := dataQuery.Find(&logs).Error; !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
		return nil
	})

	if err := group.Wait(); err != nil {
		return nil, err
	}

	hasLogs := len(logs) > 0
	if !hasLogs {
		found, err := s.HasLogs(ctx)
		if err != nil {
			return nil, err
		}
		hasLogs = found
	}

	result := &SearchResult{
		Logs:       logs,
		Pagination: pagination,
		Stats:      SearchStats{TotalRequests: totalCount},
		HasLogs:    hasLogs,
	}
	return result, nil
}
// GetSessionLogs returns one page of the logs whose parent_request_id equals
// sessionID, plus the total number of such logs.
//
// A blank sessionID is rejected with an error. The page size falls back to
// sessionLogPageLimit when it is non-positive or above that limit, and a
// negative offset is treated as 0. Rows are ordered by timestamp then id,
// ascending unless pagination.Order is "desc". The returned Pagination carries
// the effective limit, offset, SortBy ("timestamp") and TotalCount; HasMore is
// true while offset plus returned rows is below the total.
func (s *RDBLogStore) GetSessionLogs(ctx context.Context, sessionID string, pagination PaginationOptions) (*SessionDetailResult, error) {
	if strings.TrimSpace(sessionID) == "" {
		return nil, fmt.Errorf("sessionID cannot be empty")
	}

	if pagination.Limit <= 0 || pagination.Limit > sessionLogPageLimit {
		pagination.Limit = sessionLogPageLimit
	}
	if pagination.Offset < 0 {
		pagination.Offset = 0
	}
	pagination.SortBy = "timestamp"
	limit, offset := pagination.Limit, pagination.Offset

	orderClause := "timestamp ASC, id ASC"
	if pagination.Order == "desc" {
		orderClause = "timestamp DESC, id DESC"
	}

	// sessionScope builds a fresh query restricted to this session so the two
	// goroutines below never share a *gorm.DB.
	sessionScope := func(queryCtx context.Context) *gorm.DB {
		return s.db.WithContext(queryCtx).Model(&Log{}).Where("parent_request_id = ?", sessionID)
	}

	var (
		totalCount int64
		logs       []Log
	)
	group, groupCtx := errgroup.WithContext(ctx)

	group.Go(func() error {
		return sessionScope(groupCtx).Count(&totalCount).Error
	})

	group.Go(func() error {
		dataQuery := sessionScope(groupCtx).
			Order(orderClause).
			Select(s.listSelectColumns()).
			Limit(limit)
		if offset > 0 {
			dataQuery = dataQuery.Offset(offset)
		}
		// A "record not found" result is treated as an empty page, not a failure.
		if err := dataQuery.Find(&logs).Error; !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
		return nil
	})

	if err := group.Wait(); err != nil {
		return nil, err
	}

	pagination.TotalCount = totalCount
	returnedCount := len(logs)
	result := &SessionDetailResult{
		SessionID:     sessionID,
		Logs:          logs,
		Pagination:    pagination,
		Count:         totalCount,
		ReturnedCount: returnedCount,
		HasMore:       int64(offset+returnedCount) < totalCount,
	}
	return result, nil
}
// GetSessionSummary returns aggregate totals for the logs whose
// parent_request_id equals sessionID: row count, summed cost, summed tokens, and
// the earliest and latest timestamps.
//
// A blank sessionID is rejected with an error. Timestamps are normalized with
// normalizeAggregateTimestamp; DurationMs is the non-negative difference between
// them in milliseconds, or 0 when either is missing or not RFC 3339.
func (s *RDBLogStore) GetSessionSummary(ctx context.Context, sessionID string) (*SessionSummaryResult, error) {
	if strings.TrimSpace(sessionID) == "" {
		return nil, fmt.Errorf("sessionID cannot be empty")
	}

	// One aggregate SELECT keeps COUNT/SUM/MIN/MAX consistent against the same
	// row snapshot and needs a single round trip.
	const aggregateColumns = "COUNT(*) AS count, COALESCE(SUM(cost), 0) AS total_cost, COALESCE(SUM(total_tokens), 0) AS total_tokens, MIN(timestamp) AS started_at, MAX(timestamp) AS latest_at"

	var (
		count       int64
		totalCost   float64
		totalTokens int64
		// The timestamp aggregates are scanned untyped and normalized afterwards.
		startedRaw, latestRaw any
	)
	err := s.db.WithContext(ctx).
		Model(&Log{}).
		Where("parent_request_id = ?", sessionID).
		Select(aggregateColumns).
		Row().
		Scan(&count, &totalCost, &totalTokens, &startedRaw, &latestRaw)
	if err != nil {
		return nil, err
	}

	startedAt := normalizeAggregateTimestamp(startedRaw)
	latestAt := normalizeAggregateTimestamp(latestRaw)

	var durationMs int64
	if startedAt != "" && latestAt != "" {
		begin, beginErr := time.Parse(time.RFC3339Nano, startedAt)
		end, endErr := time.Parse(time.RFC3339Nano, latestAt)
		if beginErr == nil && endErr == nil {
			// Clamp at zero so an inverted pair never yields a negative duration.
			durationMs = max(end.Sub(begin).Milliseconds(), 0)
		}
	}

	summary := &SessionSummaryResult{
		SessionID:   sessionID,
		Count:       count,
		TotalCost:   totalCost,
		TotalTokens: totalTokens,
		StartedAt:   startedAt,
		LatestAt:    latestAt,
		DurationMs:  durationMs,
	}
	return summary, nil
}
// normalizeAggregateTimestamp converts a scanned MIN/MAX timestamp value into a
// UTC RFC 3339 (nanosecond) string.
//
//   - nil yields "".
//   - time.Time is converted to UTC and formatted.
//   - string and []byte are trimmed; blank input yields "". Otherwise the text is
//     tried against a list of known layouts and, on the first match, re-formatted
//     in UTC. Text matching no layout is returned trimmed but otherwise unchanged.
//   - any other type is rendered with fmt.Sprint.
func normalizeAggregateTimestamp(value any) string {
	var text string
	switch typed := value.(type) {
	case nil:
		return ""
	case time.Time:
		return typed.UTC().Format(time.RFC3339Nano)
	case string:
		text = typed
	case []byte:
		text = string(typed)
	default:
		return fmt.Sprint(typed)
	}

	text = strings.TrimSpace(text)
	if text == "" {
		return ""
	}

	// Tried in order; layouts without a zone are interpreted as UTC by time.Parse.
	knownLayouts := [...]string{
		time.RFC3339Nano,
		time.RFC3339,
		"2006-01-02 15:04:05.999999999-07:00",
		"2006-01-02 15:04:05.999999999Z07:00",
		"2006-01-02 15:04:05.999999999",
		"2006-01-02 15:04:05",
		"2006-01-02T15:04:05.999999999",
		"2006-01-02T15:04:05",
	}
	for _, layout := range knownLayouts {
		ts, err := time.Parse(layout, text)
		if err != nil {
			continue
		}
		return ts.UTC().Format(time.RFC3339Nano)
	}
	return text
}
// listSelectColumns builds the comma-separated SELECT list used by list queries.
//
// It names a fixed set of plain columns and appends three computed columns:
//   - input_history and responses_input_history, reduced by SQL JSON functions to
//     a one-element array holding only the last entry;
//   - output_message, returned as NULL.
//
// Rows with object_type 'realtime.turn' are exempt from both reductions and keep
// their full values, because the logs table renders them as a combined
// Tool/User/Assistant summary and needs the whole turn context. The JSON
// expressions use jsonb operators on PostgreSQL and json_extract elsewhere.
func (s *RDBLogStore) listSelectColumns() string {
	columns := []string{
		"id", "parent_request_id", "timestamp", "object_type", "provider", "model", "alias",
		"number_of_retries", "fallback_index",
		"selected_key_id", "selected_key_name",
		"virtual_key_id", "virtual_key_name",
		"routing_engines_used", "routing_rule_id", "routing_rule_name",
		"user_id", "team_id", "team_name", "customer_id", "customer_name",
		"business_unit_id", "business_unit_name",
		"speech_input", "transcription_input", "image_generation_input", "video_generation_input",
		"latency", "token_usage", "cost", "status", "error_details", "stream",
		"content_summary", "metadata",
		"is_large_payload_request", "is_large_payload_response",
		"prompt_tokens", "completion_tokens", "total_tokens",
		"created_at",
	}

	// The output_message expression is the same for every dialect.
	const outputMessageExpr = `CASE WHEN object_type = 'realtime.turn' THEN output_message ELSE NULL END AS output_message`

	var inputHistoryExpr, responsesInputExpr string
	if s.db.Dialector.Name() == "postgres" {
		inputHistoryExpr = `CASE
WHEN object_type = 'realtime.turn' THEN input_history
WHEN input_history IS NOT NULL AND input_history != '' AND input_history != '[]'
THEN jsonb_build_array(input_history::jsonb->-1)::text
ELSE input_history END AS input_history`
		responsesInputExpr = `CASE
WHEN object_type = 'realtime.turn' THEN responses_input_history
WHEN responses_input_history IS NOT NULL AND responses_input_history != '' AND responses_input_history != '[]'
THEN jsonb_build_array(responses_input_history::jsonb->-1)::text
ELSE responses_input_history END AS responses_input_history`
	} else {
		// SQLite (and any other dialect).
		inputHistoryExpr = `CASE
WHEN object_type = 'realtime.turn' THEN input_history
WHEN input_history IS NOT NULL AND input_history != '' AND input_history != '[]'
THEN json_array(json_extract(input_history, '$[' || (json_array_length(input_history) - 1) || ']'))
ELSE input_history END AS input_history`
		responsesInputExpr = `CASE
WHEN object_type = 'realtime.turn' THEN responses_input_history
WHEN responses_input_history IS NOT NULL AND responses_input_history != '' AND responses_input_history != '[]'
THEN json_array(json_extract(responses_input_history, '$[' || (json_array_length(responses_input_history) - 1) || ']'))
ELSE responses_input_history END AS responses_input_history`
	}

	columns = append(columns, inputHistoryExpr, responsesInputExpr, outputMessageExpr)
	return strings.Join(columns, ", ")
}
// GetStats calculates statistics for logs matching the given filters.
//
// On PostgreSQL, when canUseMatView accepts the filters, the work is delegated to
// getStatsFromMatView. Otherwise up to three queries run against the logs table:
//  1. a COUNT of every matching row (any status) -> TotalRequests;
//  2. aggregates over matching rows whose status is success or error ->
//     SuccessRate (percentage), AverageLatency, TotalTokens, TotalCost;
//  3. a per-chain success computation -> UserFacingTotalRequests and
//     UserFacingSuccessRate (percentage).
//
// Queries 2 and 3 are skipped when the count is zero, leaving those fields at
// their zero values. Any query error is returned with a nil result.
func (s *RDBLogStore) GetStats(ctx context.Context, filters SearchFilters) (*SearchStats, error) {
if s.db.Dialector.Name() == "postgres" && s.canUseMatView(filters) {
return s.getStatsFromMatView(ctx, filters)
}
baseQuery := s.db.WithContext(ctx).Model(&Log{})
baseQuery = s.applyFilters(baseQuery, filters)
// Get total count (includes processing status)
var totalCount int64
if err := baseQuery.Count(&totalCount).Error; err != nil {
return nil, err
}
stats := &SearchStats{
TotalRequests: totalCount,
}
if totalCount > 0 {
// Single query for all completed-request stats: counts, latency, tokens, cost
// The aggregates are scanned into nullable types because they can come back
// NULL (e.g. when no completed row matches).
var result struct {
CompletedCount sql.NullInt64 `gorm:"column:completed_count"`
SuccessCount sql.NullInt64 `gorm:"column:success_count"`
AvgLatency sql.NullFloat64 `gorm:"column:avg_latency"`
TotalTokens sql.NullInt64 `gorm:"column:total_tokens"`
TotalCost sql.NullFloat64 `gorm:"column:total_cost"`
}
// A fresh query is built rather than reusing baseQuery, which Count() has
// already been executed on.
statsQuery := s.db.WithContext(ctx).Model(&Log{})
statsQuery = s.applyFilters(statsQuery, filters)
statsQuery = statsQuery.Where("status IN ?", []string{"success", "error"})
if err := statsQuery.Select(`
COUNT(*) as completed_count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_count,
AVG(latency) as avg_latency,
SUM(total_tokens) as total_tokens,
SUM(cost) as total_cost
`).Scan(&result).Error; err != nil {
return nil, err
}
completedCount := result.CompletedCount.Int64
if completedCount > 0 {
// Success rate is a percentage of completed (success or error) requests.
stats.SuccessRate = float64(result.SuccessCount.Int64) / float64(completedCount) * 100
if result.AvgLatency.Valid {
stats.AverageLatency = result.AvgLatency.Float64
}
if result.TotalTokens.Valid {
stats.TotalTokens = result.TotalTokens.Int64
}
if result.TotalCost.Valid {
stats.TotalCost = result.TotalCost.Float64
}
}
// User-facing success rate: count each fallback chain as one user request.
// A chain is identified by the original request (fallback_index=0); any successful
// attempt (original or fallback) makes the whole chain a success.
// When scoped to a specific parent request, root rows are excluded by definition
// (they have parent_request_id = NULL), so fall back to the per-attempt success rate.
if filters.ParentRequestID != "" {
stats.UserFacingSuccessRate = stats.SuccessRate
} else {
var userFacingResult struct {
TotalUserRequests sql.NullInt64 `gorm:"column:total_user_requests"`
SuccessfulUserRequests sql.NullInt64 `gorm:"column:successful_user_requests"`
}
userFacingQuery := s.db.WithContext(ctx).Model(&Log{})
userFacingQuery = s.applyFilters(userFacingQuery, filters)
// Scope to root rows only so denominator and numerator are drawn from the same population.
// A chain is successful if the root itself succeeded or any of its fallbacks succeeded.
userFacingQuery = userFacingQuery.Where("fallback_index = ?", 0).Where("status IN ?", []string{"success", "error"})
// Use a LEFT JOIN instead of a correlated EXISTS subquery: the inner set is computed
// once and hash-joined, reducing complexity from O(N×M) to O(N+M).
// The inner subquery is bounded by the same time window as the outer query for
// performance — an unbounded scan of the full logs table is too expensive.
// Known tradeoff: fallbacks that complete outside the time window boundary will be
// missed, slightly under-counting success at the edges.
innerJoin := `LEFT JOIN (
SELECT DISTINCT parent_request_id
FROM logs
WHERE status = 'success' AND parent_request_id IS NOT NULL`
// innerArgs collects the bind values for the optional time bounds, in the same
// order their placeholders are appended to innerJoin.
var innerArgs []interface{}
if filters.StartTime != nil {
innerJoin += " AND timestamp >= ?"
innerArgs = append(innerArgs, *filters.StartTime)
}
if filters.EndTime != nil {
innerJoin += " AND timestamp <= ?"
innerArgs = append(innerArgs, *filters.EndTime)
}
innerJoin += `) fallback_success ON fallback_success.parent_request_id = logs.id`
userFacingQuery = userFacingQuery.Joins(innerJoin, innerArgs...)
// DISTINCT guards against counting a root row more than once.
if err := userFacingQuery.Select(`
COUNT(DISTINCT logs.id) as total_user_requests,
COUNT(DISTINCT CASE
WHEN logs.status = 'success' OR fallback_success.parent_request_id IS NOT NULL THEN logs.id
ELSE NULL
END) as successful_user_requests
`).Scan(&userFacingResult).Error; err != nil {
return nil, err
}
stats.UserFacingTotalRequests = userFacingResult.TotalUserRequests.Int64
if userFacingResult.TotalUserRequests.Int64 > 0 {
stats.UserFacingSuccessRate = float64(userFacingResult.SuccessfulUserRequests.Int64) / float64(userFacingResult.TotalUserRequests.Int64) * 100
}
}
}
return stats, nil
}
// GetHistogram returns request counts (total, success, error) grouped into time
// buckets of bucketSizeSeconds for the logs matching filters.
//
// A non-positive bucket size defaults to one hour. On PostgreSQL, when
// canUseMatView accepts the filters and the bucket is at least one hour, the
// work is delegated to getHistogramFromMatView. Only rows with status success or
// error are counted. When both StartTime and EndTime are set, the result holds
// one bucket for every slot in that range, zero-filled where no rows exist;
// otherwise it holds only the buckets returned by the query.
func (s *RDBLogStore) GetHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*HistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600 // default to 1 hour
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	// The bucket expression is the only dialect-specific part: each variant turns
	// the timestamp into Unix seconds and floors it to a bucket boundary.
	var selectTemplate string
	switch dialect {
	case "sqlite":
		// SQLite: strftime('%s') yields the Unix timestamp.
		selectTemplate = `
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`
	case "mysql":
		// MySQL: UNIX_TIMESTAMP.
		selectTemplate = `
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`
	default:
		// PostgreSQL (and others): EXTRACT(EPOCH FROM timestamp).
		selectTemplate = `
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`
	}
	selectClause := fmt.Sprintf(selectTemplate, bucketSizeSeconds, bucketSizeSeconds)

	// The bucket timestamp is scanned as int64 to avoid time-parsing issues.
	var results []struct {
		BucketTimestamp int64 `gorm:"column:bucket_timestamp"`
		Total           int64 `gorm:"column:total"`
		Success         int64 `gorm:"column:success"`
		Error           int64 `gorm:"column:error_count"`
	}
	query := s.applyFilters(s.db.WithContext(ctx).Model(&Log{}), filters).
		Where("status IN ?", []string{"success", "error"}).
		Select(selectClause).
		Group("bucket_timestamp").
		Order("bucket_timestamp ASC")
	if err := query.Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get histogram: %w", err)
	}

	slots := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	var buckets []HistogramBucket
	if len(slots) == 0 {
		// No usable time range: return exactly the buckets the query produced.
		buckets = make([]HistogramBucket, len(results))
		for i := range results {
			row := results[i]
			buckets[i] = HistogramBucket{
				Timestamp: time.Unix(row.BucketTimestamp, 0).UTC(),
				Count:     row.Total,
				Success:   row.Success,
				Error:     row.Error,
			}
		}
	} else {
		// Index the rows by bucket timestamp, then emit every slot in the range,
		// leaving the counters at zero where the query returned nothing.
		rowByBucket := make(map[int64]int, len(results))
		for i := range results {
			rowByBucket[results[i].BucketTimestamp] = i
		}
		buckets = make([]HistogramBucket, len(slots))
		for i, ts := range slots {
			bucket := HistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
			if idx, found := rowByBucket[ts]; found {
				bucket.Count = results[idx].Total
				bucket.Success = results[idx].Success
				bucket.Error = results[idx].Error
			}
			buckets[i] = bucket
		}
	}

	return &HistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
	}, nil
}
// GetTokenHistogram returns token usage (prompt, completion, total, cached-read)
// summed into time buckets of bucketSizeSeconds for the logs matching filters.
//
// A non-positive bucket size defaults to one hour. On PostgreSQL, when
// canUseMatView accepts the filters and the bucket is at least one hour, the
// work is delegated to getTokenHistogramFromMatView. Only rows with status
// success or error contribute. When both StartTime and EndTime are set, the
// result holds one bucket for every slot in that range, zero-filled where no
// rows exist; otherwise it holds only the buckets returned by the query.
func (s *RDBLogStore) GetTokenHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*TokenHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600 // default to 1 hour
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getTokenHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	// The bucket expression is the only dialect-specific part of the SELECT list.
	var selectTemplate string
	switch dialect {
	case "sqlite":
		selectTemplate = `
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens,
COALESCE(SUM(cached_read_tokens), 0) as cached_read_tokens
`
	case "mysql":
		selectTemplate = `
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens,
COALESCE(SUM(cached_read_tokens), 0) as cached_read_tokens
`
	default:
		selectTemplate = `
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens,
COALESCE(SUM(cached_read_tokens), 0) as cached_read_tokens
`
	}
	selectClause := fmt.Sprintf(selectTemplate, bucketSizeSeconds, bucketSizeSeconds)

	var results []struct {
		BucketTimestamp  int64 `gorm:"column:bucket_timestamp"`
		PromptTokens     int64 `gorm:"column:prompt_tokens"`
		CompletionTokens int64 `gorm:"column:completion_tokens"`
		TotalTokens      int64 `gorm:"column:total_tokens"`
		CachedReadTokens int64 `gorm:"column:cached_read_tokens"`
	}
	// Only completed requests count towards token stats.
	query := s.applyFilters(s.db.WithContext(ctx).Model(&Log{}), filters).
		Where("status IN ?", []string{"success", "error"}).
		Select(selectClause).
		Group("bucket_timestamp").
		Order("bucket_timestamp ASC")
	if err := query.Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get token histogram: %w", err)
	}

	slots := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	var buckets []TokenHistogramBucket
	if len(slots) == 0 {
		// No usable time range: return exactly the buckets the query produced.
		buckets = make([]TokenHistogramBucket, len(results))
		for i := range results {
			row := results[i]
			buckets[i] = TokenHistogramBucket{
				Timestamp:        time.Unix(row.BucketTimestamp, 0).UTC(),
				PromptTokens:     row.PromptTokens,
				CompletionTokens: row.CompletionTokens,
				TotalTokens:      row.TotalTokens,
				CachedReadTokens: row.CachedReadTokens,
			}
		}
	} else {
		// Index the rows by bucket timestamp, then emit every slot in the range,
		// leaving the token counters at zero where the query returned nothing.
		rowByBucket := make(map[int64]int, len(results))
		for i := range results {
			rowByBucket[results[i].BucketTimestamp] = i
		}
		buckets = make([]TokenHistogramBucket, len(slots))
		for i, ts := range slots {
			bucket := TokenHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
			if idx, found := rowByBucket[ts]; found {
				bucket.PromptTokens = results[idx].PromptTokens
				bucket.CompletionTokens = results[idx].CompletionTokens
				bucket.TotalTokens = results[idx].TotalTokens
				bucket.CachedReadTokens = results[idx].CachedReadTokens
			}
			buckets[i] = bucket
		}
	}

	return &TokenHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
	}, nil
}
// GetCostHistogram returns time-bucketed cost data with a per-model breakdown for the given filters.
//
// A non-positive bucketSizeSeconds defaults to 3600 (one hour). When the dialect
// is "postgres", canUseMatView accepts the filters, and the bucket size is at
// least one hour, the call is delegated to getCostHistogramFromMatView.
// Otherwise the logs are aggregated directly, counting only rows whose status is
// "success" or "error" and whose cost is non-NULL and positive.
//
// If generateBucketTimestamps yields timestamps for the filter's time range, one
// bucket is returned per timestamp and buckets without data are zero-filled.
// Otherwise only the buckets present in the query result are returned, sorted by
// timestamp. Models lists every model seen in the result in ascending order so
// the output is stable across calls.
func (s *RDBLogStore) GetCostHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*CostHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600 // Default to 1 hour
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getCostHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	// Only count completed requests with cost
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})
	baseQuery = baseQuery.Where("cost IS NOT NULL AND cost > 0")

	// One row per (bucket, model) pair.
	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Model           string  `gorm:"column:model"`
		TotalCost       float64 `gorm:"column:total_cost"`
	}

	// The bucket expression floors the epoch timestamp to a multiple of the
	// bucket size; its spelling differs per SQL dialect.
	var selectClause string
	switch dialect {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
model,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
model,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
model,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	if err := baseQuery.
		Select(selectClause).
		Group("bucket_timestamp, model").
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get cost histogram: %w", err)
	}

	// Aggregate results into buckets with model breakdown
	bucketMap := make(map[int64]*CostHistogramBucket)
	modelsSet := make(map[string]struct{})
	for _, r := range results {
		modelsSet[r.Model] = struct{}{}
		bucket, exists := bucketMap[r.BucketTimestamp]
		if !exists {
			bucket = &CostHistogramBucket{
				Timestamp: time.Unix(r.BucketTimestamp, 0).UTC(),
				ByModel:   make(map[string]float64),
			}
			bucketMap[r.BucketTimestamp] = bucket
		}
		bucket.TotalCost += r.TotalCost
		bucket.ByModel[r.Model] = r.TotalCost
	}

	// Extract unique models. Map iteration order is random, so sort to keep
	// the response deterministic.
	models := make([]string, 0, len(modelsSet))
	for model := range modelsSet {
		models = append(models, model)
	}
	sort.Strings(models)

	// Generate all bucket timestamps for the time range
	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// If no time range specified, just return what we have from the query
	if len(allTimestamps) == 0 {
		buckets := make([]CostHistogramBucket, 0, len(bucketMap))
		for _, bucket := range bucketMap {
			buckets = append(buckets, *bucket)
		}
		// The map lost the query ordering; restore ascending timestamps.
		sort.Slice(buckets, func(i, j int) bool {
			return buckets[i].Timestamp.Before(buckets[j].Timestamp)
		})
		return &CostHistogramResult{
			Buckets:           buckets,
			BucketSizeSeconds: bucketSizeSeconds,
			Models:            models,
		}, nil
	}

	// Fill in all buckets, using zeros for missing timestamps
	buckets := make([]CostHistogramBucket, len(allTimestamps))
	for i, ts := range allTimestamps {
		if bucket, exists := bucketMap[ts]; exists {
			buckets[i] = *bucket
			continue
		}
		buckets[i] = CostHistogramBucket{
			Timestamp: time.Unix(ts, 0).UTC(),
			TotalCost: 0,
			ByModel:   make(map[string]float64),
		}
	}
	return &CostHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
		Models:            models,
	}, nil
}
// GetModelHistogram returns time-bucketed model usage with a success/error breakdown for the given filters.
//
// A non-positive bucketSizeSeconds defaults to 3600 (one hour). When the dialect
// is "postgres", canUseMatView accepts the filters, and the bucket size is at
// least one hour, the call is delegated to getModelHistogramFromMatView.
// Otherwise the logs are aggregated directly, counting only rows whose status is
// "success" or "error".
//
// If generateBucketTimestamps yields timestamps for the filter's time range, one
// bucket is returned per timestamp and buckets without data get an empty
// ByModel map. Otherwise only the buckets present in the query result are
// returned, sorted by timestamp. Models lists every model seen in the result in
// ascending order so the output is stable across calls.
func (s *RDBLogStore) GetModelHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ModelHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600 // Default to 1 hour
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getModelHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})

	// One row per (bucket, model) pair with status counts.
	var results []struct {
		BucketTimestamp int64  `gorm:"column:bucket_timestamp"`
		Model           string `gorm:"column:model"`
		Total           int64  `gorm:"column:total"`
		Success         int64  `gorm:"column:success"`
		Error           int64  `gorm:"column:error_count"`
	}

	// The bucket expression floors the epoch timestamp to a multiple of the
	// bucket size; its spelling differs per SQL dialect.
	var selectClause string
	switch dialect {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
model,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
model,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
model,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	if err := baseQuery.
		Select(selectClause).
		Group("bucket_timestamp, model").
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get model histogram: %w", err)
	}

	// Aggregate results into buckets with model breakdown
	bucketMap := make(map[int64]*ModelHistogramBucket)
	modelsSet := make(map[string]struct{})
	for _, r := range results {
		modelsSet[r.Model] = struct{}{}
		bucket, exists := bucketMap[r.BucketTimestamp]
		if !exists {
			bucket = &ModelHistogramBucket{
				Timestamp: time.Unix(r.BucketTimestamp, 0).UTC(),
				ByModel:   make(map[string]ModelUsageStats),
			}
			bucketMap[r.BucketTimestamp] = bucket
		}
		bucket.ByModel[r.Model] = ModelUsageStats{
			Total:   r.Total,
			Success: r.Success,
			Error:   r.Error,
		}
	}

	// Extract unique models. Map iteration order is random, so sort to keep
	// the response deterministic.
	models := make([]string, 0, len(modelsSet))
	for model := range modelsSet {
		models = append(models, model)
	}
	sort.Strings(models)

	// Generate all bucket timestamps for the time range
	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// If no time range specified, just return what we have from the query
	if len(allTimestamps) == 0 {
		buckets := make([]ModelHistogramBucket, 0, len(bucketMap))
		for _, bucket := range bucketMap {
			buckets = append(buckets, *bucket)
		}
		// The map lost the query ordering; restore ascending timestamps.
		sort.Slice(buckets, func(i, j int) bool {
			return buckets[i].Timestamp.Before(buckets[j].Timestamp)
		})
		return &ModelHistogramResult{
			Buckets:           buckets,
			BucketSizeSeconds: bucketSizeSeconds,
			Models:            models,
		}, nil
	}

	// Fill in all buckets, using empty maps for missing timestamps
	buckets := make([]ModelHistogramBucket, len(allTimestamps))
	for i, ts := range allTimestamps {
		if bucket, exists := bucketMap[ts]; exists {
			buckets[i] = *bucket
			continue
		}
		buckets[i] = ModelHistogramBucket{
			Timestamp: time.Unix(ts, 0).UTC(),
			ByModel:   make(map[string]ModelUsageStats),
		}
	}
	return &ModelHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
		Models:            models,
	}, nil
}
// computePercentile computes the p-th percentile (p in 0–1) of a pre-sorted
// float64 slice using linear interpolation between the two nearest ranks.
//
// sorted must already be in ascending order; it is not modified. An empty slice
// yields 0 and a single-element slice yields that element. Values of p outside
// [0, 1] are clamped to the nearest bound (NaN is treated as 0) so that the
// computed rank can never index outside the slice.
func computePercentile(sorted []float64, p float64) float64 {
	n := len(sorted)
	if n == 0 {
		return 0
	}
	if n == 1 {
		return sorted[0]
	}

	// Clamp p: an out-of-range fraction would otherwise produce a negative or
	// past-the-end index and panic.
	switch {
	case math.IsNaN(p) || p < 0:
		p = 0
	case p > 1:
		p = 1
	}

	rank := p * float64(n-1)
	lower := int(math.Floor(rank))
	upper := int(math.Ceil(rank))
	if lower == upper {
		// rank landed exactly on an element; no interpolation needed.
		return sorted[lower]
	}
	frac := rank - float64(lower)
	return sorted[lower]*(1-frac) + sorted[upper]*frac
}
// GetLatencyHistogram returns time-bucketed latency statistics (average, p90,
// p95, p99) for the given filters.
//
// A non-positive bucketSizeSeconds falls back to 3600. When the dialect is
// "postgres", canUseMatView accepts the filters, and the bucket size is at
// least one hour, the call is served by getLatencyHistogramFromMatView.
// Otherwise a filtered query over completed requests ("success" or "error")
// with a non-NULL latency is handed to a dialect-specific helper: "sqlite" and
// "mysql" compute percentiles in Go from individual latency values, and every
// other dialect uses database-level percentile_cont aggregation.
func (s *RDBLogStore) GetLatencyHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}

	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getLatencyHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	query := s.applyFilters(s.db.WithContext(ctx).Model(&Log{}), filters).
		Where("status IN ?", []string{"success", "error"}).
		Where("latency IS NOT NULL")

	if dialect == "sqlite" {
		return s.getLatencyHistogramSQLite(ctx, query, filters, bucketSizeSeconds)
	}
	if dialect == "mysql" {
		return s.getLatencyHistogramMySQL(ctx, query, filters, bucketSizeSeconds)
	}
	return s.getLatencyHistogramPercentileCont(ctx, query, filters, bucketSizeSeconds)
}
// getLatencyHistogramPercentileCont computes the latency histogram with the
// database's percentile_cont aggregate, so the query yields one aggregated row
// per bucket rather than every individual latency value.
//
// baseQuery must already carry the filters to apply; the aggregated rows are
// passed to buildLatencyHistogramResult together with their query order.
func (s *RDBLogStore) getLatencyHistogramPercentileCont(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) {
	// Aggregates are scanned as nullable floats; a NULL reads back as 0.
	var rows []struct {
		BucketTimestamp int64           `gorm:"column:bucket_timestamp"`
		AvgLatency      sql.NullFloat64 `gorm:"column:avg_latency"`
		P90Latency      sql.NullFloat64 `gorm:"column:p90_latency"`
		P95Latency      sql.NullFloat64 `gorm:"column:p95_latency"`
		P99Latency      sql.NullFloat64 `gorm:"column:p99_latency"`
		TotalRequests   int64           `gorm:"column:total_requests"`
	}

	columns := fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
AVG(latency) as avg_latency,
percentile_cont(0.90) WITHIN GROUP (ORDER BY latency) as p90_latency,
percentile_cont(0.95) WITHIN GROUP (ORDER BY latency) as p95_latency,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) as p99_latency,
COUNT(*) as total_requests
`, bucketSizeSeconds, bucketSizeSeconds)

	query := baseQuery.
		Select(columns).
		Group("bucket_timestamp").
		Order("bucket_timestamp ASC")
	if err := query.Find(&rows).Error; err != nil {
		return nil, fmt.Errorf("failed to get latency histogram: %w", err)
	}

	byTimestamp := make(map[int64]LatencyHistogramBucket, len(rows))
	queryOrder := make([]int64, 0, len(rows))
	for i := range rows {
		row := &rows[i]
		queryOrder = append(queryOrder, row.BucketTimestamp)
		byTimestamp[row.BucketTimestamp] = LatencyHistogramBucket{
			Timestamp:     time.Unix(row.BucketTimestamp, 0).UTC(),
			AvgLatency:    row.AvgLatency.Float64,
			P90Latency:    row.P90Latency.Float64,
			P95Latency:    row.P95Latency.Float64,
			P99Latency:    row.P99Latency.Float64,
			TotalRequests: row.TotalRequests,
		}
	}
	return s.buildLatencyHistogramResult(byTimestamp, queryOrder, filters, bucketSizeSeconds)
}
// getLatencyHistogramSQLite builds the latency histogram for SQLite, which has
// no percentile_cont: it loads each matching row's bucket and latency and
// derives the average and percentiles in Go.
//
// baseQuery must already carry the filters to apply. The query orders rows by
// bucket and then by latency, so every per-bucket slice handed to
// computePercentile is already ascending.
func (s *RDBLogStore) getLatencyHistogramSQLite(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) {
	var rows []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Latency         float64 `gorm:"column:latency"`
	}

	columns := fmt.Sprintf(
		`(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp, latency`,
		bucketSizeSeconds, bucketSizeSeconds,
	)
	query := baseQuery.Select(columns).Order("bucket_timestamp ASC, latency ASC")
	if err := query.Find(&rows).Error; err != nil {
		return nil, fmt.Errorf("failed to get latency histogram: %w", err)
	}

	// Group latencies per bucket, remembering the order in which buckets first appear.
	latenciesByBucket := make(map[int64][]float64)
	var bucketOrder []int64
	for _, row := range rows {
		if _, seen := latenciesByBucket[row.BucketTimestamp]; !seen {
			bucketOrder = append(bucketOrder, row.BucketTimestamp)
		}
		latenciesByBucket[row.BucketTimestamp] = append(latenciesByBucket[row.BucketTimestamp], row.Latency)
	}

	stats := make(map[int64]LatencyHistogramBucket, len(latenciesByBucket))
	for ts, latencies := range latenciesByBucket {
		total := 0.0
		for _, latency := range latencies {
			total += latency
		}
		count := len(latencies)
		stats[ts] = LatencyHistogramBucket{
			Timestamp:     time.Unix(ts, 0).UTC(),
			AvgLatency:    total / float64(count),
			P90Latency:    computePercentile(latencies, 0.90),
			P95Latency:    computePercentile(latencies, 0.95),
			P99Latency:    computePercentile(latencies, 0.99),
			TotalRequests: int64(count),
		}
	}
	return s.buildLatencyHistogramResult(stats, bucketOrder, filters, bucketSizeSeconds)
}
// getLatencyHistogramMySQL builds the latency histogram for MySQL, which has no
// percentile_cont: it loads each matching row's bucket and latency and derives
// the average and percentiles in Go.
//
// baseQuery must already carry the filters to apply. The query orders rows by
// bucket and then by latency, so every per-bucket slice handed to
// computePercentile is already ascending.
func (s *RDBLogStore) getLatencyHistogramMySQL(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) {
	var rows []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Latency         float64 `gorm:"column:latency"`
	}

	columns := fmt.Sprintf(
		`(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp, latency`,
		bucketSizeSeconds, bucketSizeSeconds,
	)
	query := baseQuery.Select(columns).Order("bucket_timestamp ASC, latency ASC")
	if err := query.Find(&rows).Error; err != nil {
		return nil, fmt.Errorf("failed to get latency histogram: %w", err)
	}

	// Group latencies per bucket, remembering the order in which buckets first appear.
	latenciesByBucket := make(map[int64][]float64)
	var bucketOrder []int64
	for _, row := range rows {
		if _, seen := latenciesByBucket[row.BucketTimestamp]; !seen {
			bucketOrder = append(bucketOrder, row.BucketTimestamp)
		}
		latenciesByBucket[row.BucketTimestamp] = append(latenciesByBucket[row.BucketTimestamp], row.Latency)
	}

	stats := make(map[int64]LatencyHistogramBucket, len(latenciesByBucket))
	for ts, latencies := range latenciesByBucket {
		total := 0.0
		for _, latency := range latencies {
			total += latency
		}
		count := len(latencies)
		stats[ts] = LatencyHistogramBucket{
			Timestamp:     time.Unix(ts, 0).UTC(),
			AvgLatency:    total / float64(count),
			P90Latency:    computePercentile(latencies, 0.90),
			P95Latency:    computePercentile(latencies, 0.95),
			P99Latency:    computePercentile(latencies, 0.99),
			TotalRequests: int64(count),
		}
	}
	return s.buildLatencyHistogramResult(stats, bucketOrder, filters, bucketSizeSeconds)
}
// buildLatencyHistogramResult assembles the final latency histogram from
// already-computed buckets.
//
// If generateBucketTimestamps yields timestamps for the filter's time range, the
// result has one bucket per generated timestamp, and timestamps with no computed
// bucket get a bucket carrying only the timestamp. Otherwise the computed
// buckets are returned in the order given by orderedKeys. The error return is
// always nil.
func (s *RDBLogStore) buildLatencyHistogramResult(computedBuckets map[int64]LatencyHistogramBucket, orderedKeys []int64, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) {
	timeline := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	var buckets []LatencyHistogramBucket
	if len(timeline) == 0 {
		// No generated timeline: emit only what was computed, in key order.
		buckets = make([]LatencyHistogramBucket, 0, len(computedBuckets))
		for _, key := range orderedKeys {
			buckets = append(buckets, computedBuckets[key])
		}
	} else {
		buckets = make([]LatencyHistogramBucket, 0, len(timeline))
		for _, ts := range timeline {
			bucket, found := computedBuckets[ts]
			if !found {
				bucket = LatencyHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
			}
			buckets = append(buckets, bucket)
		}
	}

	return &LatencyHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
	}, nil
}
// GetModelRankings returns (model, provider) groups ranked by request count,
// each with a trend comparison against the immediately preceding period.
//
// When the dialect is "postgres" and canUseMatView accepts the filters, the call
// is delegated to getModelRankingsFromMatView. Otherwise the logs are aggregated
// directly over completed requests ("success" or "error") that have a non-empty
// model, returning at most defaultMaxRankingsLimit groups ordered by total
// requests descending.
//
// Trends are computed only when both filters.StartTime and filters.EndTime are
// set. The previous period has the same length as the current one and ends one
// nanosecond before StartTime. When the current period has no groups, the
// previous-period query is skipped because there is nothing to attach a trend to.
func (s *RDBLogStore) GetModelRankings(ctx context.Context, filters SearchFilters) (*ModelRankingResult, error) {
	if s.db.Dialector.Name() == "postgres" && s.canUseMatView(filters) {
		return s.getModelRankingsFromMatView(ctx, filters)
	}

	// rankingRow is one aggregated (model, provider) group; both period
	// queries select the same columns. Sums and averages are nullable.
	type rankingRow struct {
		Model         string          `gorm:"column:model"`
		Provider      string          `gorm:"column:provider"`
		TotalRequests int64           `gorm:"column:total_requests"`
		SuccessCount  int64           `gorm:"column:success_count"`
		TotalTokens   sql.NullInt64   `gorm:"column:total_tokens"`
		TotalCost     sql.NullFloat64 `gorm:"column:total_cost"`
		AvgLatency    sql.NullFloat64 `gorm:"column:avg_latency"`
	}
	type modelProviderKey struct {
		provider string
		model    string
	}

	selectClause := `
model,
provider,
COUNT(*) as total_requests,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_count,
SUM(total_tokens) as total_tokens,
COALESCE(SUM(cost), 0) as total_cost,
AVG(latency) as avg_latency
`

	// Query current period
	currentQuery := s.db.WithContext(ctx).Model(&Log{})
	currentQuery = s.applyFilters(currentQuery, filters)
	currentQuery = currentQuery.Where("status IN ?", []string{"success", "error"})
	currentQuery = currentQuery.Where("model IS NOT NULL AND model != ''")

	var currentResults []rankingRow
	if err := currentQuery.
		Select(selectClause).
		Group("model, provider").
		Order("total_requests DESC").
		Limit(defaultMaxRankingsLimit).
		Find(&currentResults).Error; err != nil {
		return nil, fmt.Errorf("failed to get model rankings: %w", err)
	}

	// Query previous period for trend comparison. Skipped when the current
	// period is empty: the result would be unused, and without the pair
	// restriction below the query would aggregate every group with no limit.
	prevMap := make(map[modelProviderKey]ModelRankingEntry)
	if len(currentResults) > 0 && filters.StartTime != nil && filters.EndTime != nil {
		duration := filters.EndTime.Sub(*filters.StartTime)
		prevStart := filters.StartTime.Add(-duration)
		prevEnd := filters.StartTime.Add(-time.Nanosecond)
		prevFilters := filters
		prevFilters.StartTime = &prevStart
		prevFilters.EndTime = &prevEnd

		prevQuery := s.db.WithContext(ctx).Model(&Log{})
		prevQuery = s.applyFilters(prevQuery, prevFilters)
		prevQuery = prevQuery.Where("status IN ?", []string{"success", "error"})
		prevQuery = prevQuery.Where("model IS NOT NULL AND model != ''")

		// Only fetch previous-period data for (model, provider) pairs that
		// appear in the current ranking so trend computation is accurate even
		// when the previous period has more groups than the limit.
		pairConditions := make([]string, len(currentResults))
		pairArgs := make([]any, 0, len(currentResults)*2)
		for i, r := range currentResults {
			pairConditions[i] = "(model = ? AND provider = ?)"
			pairArgs = append(pairArgs, r.Model, r.Provider)
		}
		prevQuery = prevQuery.Where(strings.Join(pairConditions, " OR "), pairArgs...)

		var prevResults []rankingRow
		if err := prevQuery.
			Select(selectClause).
			Group("model, provider").
			Find(&prevResults).Error; err != nil {
			return nil, fmt.Errorf("failed to get previous period model rankings: %w", err)
		}
		for _, r := range prevResults {
			key := modelProviderKey{provider: r.Provider, model: r.Model}
			prevMap[key] = ModelRankingEntry{
				Model:         r.Model,
				Provider:      r.Provider,
				TotalRequests: r.TotalRequests,
				TotalTokens:   r.TotalTokens.Int64,
				TotalCost:     r.TotalCost.Float64,
				AvgLatency:    r.AvgLatency.Float64,
			}
		}
	}

	// Build results with trends
	rankings := make([]ModelRankingWithTrend, len(currentResults))
	for i, r := range currentResults {
		entry := ModelRankingEntry{
			Model:         r.Model,
			Provider:      r.Provider,
			TotalRequests: r.TotalRequests,
			SuccessCount:  r.SuccessCount,
			TotalTokens:   r.TotalTokens.Int64,
			TotalCost:     r.TotalCost.Float64,
			AvgLatency:    r.AvgLatency.Float64,
		}
		if r.TotalRequests > 0 {
			entry.SuccessRate = float64(r.SuccessCount) / float64(r.TotalRequests) * 100
		}

		var trend ModelRankingTrend
		key := modelProviderKey{provider: r.Provider, model: r.Model}
		if prev, ok := prevMap[key]; ok && prev.TotalRequests > 0 {
			trend.HasPreviousPeriod = true
			trend.RequestsTrend = pctChange(float64(prev.TotalRequests), float64(r.TotalRequests))
			trend.TokensTrend = pctChange(float64(prev.TotalTokens), float64(r.TotalTokens.Int64))
			trend.CostTrend = pctChange(prev.TotalCost, r.TotalCost.Float64)
			// A latency trend is only set when the previous average is positive.
			if prev.AvgLatency > 0 {
				trend.LatencyTrend = pctChange(prev.AvgLatency, r.AvgLatency.Float64)
			}
		}

		rankings[i] = ModelRankingWithTrend{
			ModelRankingEntry: entry,
			Trend:             trend,
		}
	}
	return &ModelRankingResult{Rankings: rankings}, nil
}
// GetUserRankings returns users ranked by request count, each with a trend
// comparison against the immediately preceding period.
//
// When the dialect is "postgres" and canUseMatView accepts the filters, the call
// is delegated to getUserRankingsFromMatView. Otherwise the logs are aggregated
// directly over completed requests ("success" or "error") that have a non-empty
// user_id, returning at most defaultMaxRankingsLimit users ordered by total
// requests descending.
//
// Trends are computed only when both filters.StartTime and filters.EndTime are
// set. The previous period has the same length as the current one and ends one
// nanosecond before StartTime. When the current period has no users, the
// previous-period query is skipped because there is nothing to attach a trend to.
func (s *RDBLogStore) GetUserRankings(ctx context.Context, filters SearchFilters) (*UserRankingResult, error) {
	if s.db.Dialector.Name() == "postgres" && s.canUseMatView(filters) {
		return s.getUserRankingsFromMatView(ctx, filters)
	}

	// rankingRow is one aggregated per-user group; both period queries
	// select the same columns. Sums are nullable.
	type rankingRow struct {
		UserID        string          `gorm:"column:user_id"`
		TotalRequests int64           `gorm:"column:total_requests"`
		TotalTokens   sql.NullInt64   `gorm:"column:total_tokens"`
		TotalCost     sql.NullFloat64 `gorm:"column:total_cost"`
	}

	selectClause := `
user_id,
COUNT(*) as total_requests,
SUM(total_tokens) as total_tokens,
COALESCE(SUM(cost), 0) as total_cost
`

	// Query current period
	currentQuery := s.db.WithContext(ctx).Model(&Log{})
	currentQuery = s.applyFilters(currentQuery, filters)
	currentQuery = currentQuery.Where("status IN ?", []string{"success", "error"})
	currentQuery = currentQuery.Where("user_id IS NOT NULL AND user_id != ''")

	var currentResults []rankingRow
	if err := currentQuery.
		Select(selectClause).
		Group("user_id").
		Order("total_requests DESC").
		Limit(defaultMaxRankingsLimit).
		Find(&currentResults).Error; err != nil {
		return nil, fmt.Errorf("failed to get user rankings: %w", err)
	}

	// Query previous period for trend comparison. Skipped when the current
	// period is empty: the result would be unused, and without the user_id
	// restriction below the query would aggregate every user with no limit.
	prevMap := make(map[string]UserRankingEntry)
	if len(currentResults) > 0 && filters.StartTime != nil && filters.EndTime != nil {
		duration := filters.EndTime.Sub(*filters.StartTime)
		prevStart := filters.StartTime.Add(-duration)
		prevEnd := filters.StartTime.Add(-time.Nanosecond)
		prevFilters := filters
		prevFilters.StartTime = &prevStart
		prevFilters.EndTime = &prevEnd

		prevQuery := s.db.WithContext(ctx).Model(&Log{})
		prevQuery = s.applyFilters(prevQuery, prevFilters)
		prevQuery = prevQuery.Where("status IN ?", []string{"success", "error"})
		prevQuery = prevQuery.Where("user_id IS NOT NULL AND user_id != ''")

		// Restrict to users that appear in the current ranking.
		userIDs := make([]string, len(currentResults))
		for i, r := range currentResults {
			userIDs[i] = r.UserID
		}
		prevQuery = prevQuery.Where("user_id IN ?", userIDs)

		var prevResults []rankingRow
		if err := prevQuery.
			Select(selectClause).
			Group("user_id").
			Find(&prevResults).Error; err != nil {
			return nil, fmt.Errorf("failed to get previous period user rankings: %w", err)
		}
		for _, r := range prevResults {
			prevMap[r.UserID] = UserRankingEntry{
				UserID:        r.UserID,
				TotalRequests: r.TotalRequests,
				TotalTokens:   r.TotalTokens.Int64,
				TotalCost:     r.TotalCost.Float64,
			}
		}
	}

	// Build results with trends
	rankings := make([]UserRankingWithTrend, len(currentResults))
	for i, r := range currentResults {
		entry := UserRankingEntry{
			UserID:        r.UserID,
			TotalRequests: r.TotalRequests,
			TotalTokens:   r.TotalTokens.Int64,
			TotalCost:     r.TotalCost.Float64,
		}

		var trend UserRankingTrend
		if prev, ok := prevMap[r.UserID]; ok && prev.TotalRequests > 0 {
			trend.HasPreviousPeriod = true
			trend.RequestsTrend = pctChange(float64(prev.TotalRequests), float64(r.TotalRequests))
			trend.TokensTrend = pctChange(float64(prev.TotalTokens), float64(r.TotalTokens.Int64))
			trend.CostTrend = pctChange(prev.TotalCost, r.TotalCost.Float64)
		}

		rankings[i] = UserRankingWithTrend{
			UserRankingEntry: entry,
			Trend:            trend,
		}
	}
	return &UserRankingResult{Rankings: rankings}, nil
}
// pctChange reports how much new differs from old, expressed as a percentage
// of old. A zero baseline has no meaningful relative change, so it yields 0.
func pctChange(old, new float64) float64 {
	if old != 0 {
		return (new - old) / old * 100
	}
	return 0
}
// GetProviderCostHistogram returns time-bucketed cost data with a per-provider breakdown for the given filters.
//
// A non-positive bucketSizeSeconds defaults to 3600 (one hour). When the dialect
// is "postgres", canUseMatView accepts the filters, and the bucket size is at
// least one hour, the call is delegated to getProviderCostHistogramFromMatView.
// Otherwise the logs are aggregated directly, counting only rows whose status is
// "success" or "error" and whose cost is non-NULL and positive.
//
// If generateBucketTimestamps yields timestamps for the filter's time range, one
// bucket is returned per timestamp and buckets without data are zero-filled.
// Otherwise only the buckets present in the query result are returned, sorted by
// timestamp. Providers lists every provider seen in the result in ascending
// order so the output is stable across calls.
func (s *RDBLogStore) GetProviderCostHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderCostHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getProviderCostHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	// Only count completed requests with cost
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})
	baseQuery = baseQuery.Where("cost IS NOT NULL AND cost > 0")

	// One row per (bucket, provider) pair.
	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Provider        string  `gorm:"column:provider"`
		TotalCost       float64 `gorm:"column:total_cost"`
	}

	// The bucket expression floors the epoch timestamp to a multiple of the
	// bucket size; its spelling differs per SQL dialect.
	var selectClause string
	switch dialect {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
provider,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
provider,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
provider,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	if err := baseQuery.
		Select(selectClause).
		Group("bucket_timestamp, provider").
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get provider cost histogram: %w", err)
	}

	// Aggregate results into buckets with provider breakdown
	bucketMap := make(map[int64]*ProviderCostHistogramBucket)
	providersSet := make(map[string]struct{})
	for _, r := range results {
		providersSet[r.Provider] = struct{}{}
		bucket, exists := bucketMap[r.BucketTimestamp]
		if !exists {
			bucket = &ProviderCostHistogramBucket{
				Timestamp:  time.Unix(r.BucketTimestamp, 0).UTC(),
				ByProvider: make(map[string]float64),
			}
			bucketMap[r.BucketTimestamp] = bucket
		}
		bucket.TotalCost += r.TotalCost
		bucket.ByProvider[r.Provider] = r.TotalCost
	}

	// Extract unique providers. Map iteration order is random, so sort to
	// keep the response deterministic.
	providers := make([]string, 0, len(providersSet))
	for provider := range providersSet {
		providers = append(providers, provider)
	}
	sort.Strings(providers)

	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// No generated timeline: return only the buckets the query produced.
	if len(allTimestamps) == 0 {
		buckets := make([]ProviderCostHistogramBucket, 0, len(bucketMap))
		for _, bucket := range bucketMap {
			buckets = append(buckets, *bucket)
		}
		// The map lost the query ordering; restore ascending timestamps.
		sort.Slice(buckets, func(i, j int) bool {
			return buckets[i].Timestamp.Before(buckets[j].Timestamp)
		})
		return &ProviderCostHistogramResult{
			Buckets:           buckets,
			BucketSizeSeconds: bucketSizeSeconds,
			Providers:         providers,
		}, nil
	}

	// Fill in all buckets, using zeros for missing timestamps
	buckets := make([]ProviderCostHistogramBucket, len(allTimestamps))
	for i, ts := range allTimestamps {
		if bucket, exists := bucketMap[ts]; exists {
			buckets[i] = *bucket
			continue
		}
		buckets[i] = ProviderCostHistogramBucket{
			Timestamp:  time.Unix(ts, 0).UTC(),
			TotalCost:  0,
			ByProvider: make(map[string]float64),
		}
	}
	return &ProviderCostHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
		Providers:         providers,
	}, nil
}
// GetProviderTokenHistogram returns time-bucketed token usage with a per-provider breakdown for the given filters.
//
// A non-positive bucketSizeSeconds defaults to 3600 (one hour). When the dialect
// is "postgres", canUseMatView accepts the filters, and the bucket size is at
// least one hour, the call is delegated to getProviderTokenHistogramFromMatView.
// Otherwise the logs are aggregated directly, counting only rows whose status is
// "success" or "error".
//
// If generateBucketTimestamps yields timestamps for the filter's time range, one
// bucket is returned per timestamp and buckets without data get an empty
// ByProvider map. Otherwise only the buckets present in the query result are
// returned, sorted by timestamp. Providers lists every provider seen in the
// result in ascending order so the output is stable across calls.
func (s *RDBLogStore) GetProviderTokenHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderTokenHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getProviderTokenHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	// Only count completed requests for token stats
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})

	// One row per (bucket, provider) pair.
	var results []struct {
		BucketTimestamp  int64  `gorm:"column:bucket_timestamp"`
		Provider         string `gorm:"column:provider"`
		PromptTokens     int64  `gorm:"column:prompt_tokens"`
		CompletionTokens int64  `gorm:"column:completion_tokens"`
		TotalTokens      int64  `gorm:"column:total_tokens"`
	}

	// The bucket expression floors the epoch timestamp to a multiple of the
	// bucket size; its spelling differs per SQL dialect.
	var selectClause string
	switch dialect {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
provider,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
provider,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
provider,
COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,
COALESCE(SUM(completion_tokens), 0) as completion_tokens,
COALESCE(SUM(total_tokens), 0) as total_tokens
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	if err := baseQuery.
		Select(selectClause).
		Group("bucket_timestamp, provider").
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get provider token histogram: %w", err)
	}

	// Aggregate results into buckets with provider breakdown
	bucketMap := make(map[int64]*ProviderTokenHistogramBucket)
	providersSet := make(map[string]struct{})
	for _, r := range results {
		providersSet[r.Provider] = struct{}{}
		bucket, exists := bucketMap[r.BucketTimestamp]
		if !exists {
			bucket = &ProviderTokenHistogramBucket{
				Timestamp:  time.Unix(r.BucketTimestamp, 0).UTC(),
				ByProvider: make(map[string]ProviderTokenStats),
			}
			bucketMap[r.BucketTimestamp] = bucket
		}
		bucket.ByProvider[r.Provider] = ProviderTokenStats{
			PromptTokens:     r.PromptTokens,
			CompletionTokens: r.CompletionTokens,
			TotalTokens:      r.TotalTokens,
		}
	}

	// Extract unique providers. Map iteration order is random, so sort to
	// keep the response deterministic.
	providers := make([]string, 0, len(providersSet))
	for provider := range providersSet {
		providers = append(providers, provider)
	}
	sort.Strings(providers)

	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// No generated timeline: return only the buckets the query produced.
	if len(allTimestamps) == 0 {
		buckets := make([]ProviderTokenHistogramBucket, 0, len(bucketMap))
		for _, bucket := range bucketMap {
			buckets = append(buckets, *bucket)
		}
		// The map lost the query ordering; restore ascending timestamps.
		sort.Slice(buckets, func(i, j int) bool {
			return buckets[i].Timestamp.Before(buckets[j].Timestamp)
		})
		return &ProviderTokenHistogramResult{
			Buckets:           buckets,
			BucketSizeSeconds: bucketSizeSeconds,
			Providers:         providers,
		}, nil
	}

	// Fill in all buckets, using empty maps for missing timestamps
	buckets := make([]ProviderTokenHistogramBucket, len(allTimestamps))
	for i, ts := range allTimestamps {
		if bucket, exists := bucketMap[ts]; exists {
			buckets[i] = *bucket
			continue
		}
		buckets[i] = ProviderTokenHistogramBucket{
			Timestamp:  time.Unix(ts, 0).UTC(),
			ByProvider: make(map[string]ProviderTokenStats),
		}
	}
	return &ProviderTokenHistogramResult{
		Buckets:           buckets,
		BucketSizeSeconds: bucketSizeSeconds,
		Providers:         providers,
	}, nil
}
// GetProviderLatencyHistogram buckets request latency over time and breaks every bucket down by provider.
// A non-positive bucketSizeSeconds falls back to 3600.
// On PostgreSQL the materialized-view path is taken when canUseMatView accepts the filters and the
// bucket is at least one hour wide; otherwise percentile_cont is evaluated by the database.
// MySQL and SQLite load the raw latencies and compute the percentiles in Go.
// Only rows with status "success" or "error" and a non-NULL latency are considered by the raw paths.
func (s *RDBLogStore) GetProviderLatencyHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}

	driver := s.db.Dialector.Name()
	if driver == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getProviderLatencyHistogramFromMatView(ctx, filters, bucketSizeSeconds)
	}

	query := s.applyFilters(s.db.WithContext(ctx).Model(&Log{}), filters).
		Where("status IN ?", []string{"success", "error"}).
		Where("latency IS NOT NULL")

	if driver == "sqlite" {
		return s.getProviderLatencyHistogramSQLite(ctx, query, filters, bucketSizeSeconds)
	}
	if driver == "mysql" {
		return s.getProviderLatencyHistogramMySQL(ctx, query, filters, bucketSizeSeconds)
	}
	return s.getProviderLatencyHistogramPercentileCont(ctx, query, filters, bucketSizeSeconds)
}
// getProviderLatencyHistogramPercentileCont computes the per-provider latency histogram with
// database-level percentile_cont aggregation (PostgreSQL).
// The query returns one aggregated row per (bucket, provider) instead of every individual latency value.
// NULL aggregates are reported as 0.
// The Providers list of the result is sorted so the output does not depend on Go's randomized
// map iteration order.
func (s *RDBLogStore) getProviderLatencyHistogramPercentileCont(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) {
	var results []struct {
		BucketTimestamp int64           `gorm:"column:bucket_timestamp"`
		Provider        string          `gorm:"column:provider"`
		AvgLatency      sql.NullFloat64 `gorm:"column:avg_latency"`
		P90Latency      sql.NullFloat64 `gorm:"column:p90_latency"`
		P95Latency      sql.NullFloat64 `gorm:"column:p95_latency"`
		P99Latency      sql.NullFloat64 `gorm:"column:p99_latency"`
		TotalRequests   int64           `gorm:"column:total_requests"`
	}
	selectClause := fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
provider,
AVG(latency) as avg_latency,
percentile_cont(0.90) WITHIN GROUP (ORDER BY latency) as p90_latency,
percentile_cont(0.95) WITHIN GROUP (ORDER BY latency) as p95_latency,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) as p99_latency,
COUNT(*) as total_requests
`, bucketSizeSeconds, bucketSizeSeconds)
	if err := baseQuery.
		Select(selectClause).
		Group("bucket_timestamp, provider").
		Order("bucket_timestamp ASC, provider ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get provider latency histogram: %w", err)
	}

	providersSet := make(map[string]bool)
	computedBuckets := make(map[int64]*ProviderLatencyHistogramBucket)
	// orderedBuckets keeps the bucket timestamps in the order the (sorted) query returned them.
	var orderedBuckets []int64
	for _, r := range results {
		providersSet[r.Provider] = true
		stats := ProviderLatencyStats{
			AvgLatency:    r.AvgLatency.Float64,
			P90Latency:    r.P90Latency.Float64,
			P95Latency:    r.P95Latency.Float64,
			P99Latency:    r.P99Latency.Float64,
			TotalRequests: r.TotalRequests,
		}
		if bucket, exists := computedBuckets[r.BucketTimestamp]; exists {
			bucket.ByProvider[r.Provider] = stats
			continue
		}
		// First row for this bucket: remember its position and create the bucket.
		orderedBuckets = append(orderedBuckets, r.BucketTimestamp)
		computedBuckets[r.BucketTimestamp] = &ProviderLatencyHistogramBucket{
			Timestamp:  time.Unix(r.BucketTimestamp, 0).UTC(),
			ByProvider: map[string]ProviderLatencyStats{r.Provider: stats},
		}
	}

	providers := make([]string, 0, len(providersSet))
	for provider := range providersSet {
		providers = append(providers, provider)
	}
	// Map iteration order is random; sort for a stable response.
	sort.Strings(providers)

	return s.buildProviderLatencyHistogramResult(computedBuckets, orderedBuckets, providers, filters, bucketSizeSeconds)
}
// getProviderLatencyHistogramSQLite uses Go-based percentile computation for SQLite,
// which lacks percentile_cont.
// Every matching row is loaded (bucket, provider, latency), grouped per (bucket, provider) and
// reduced to average / P90 / P95 / P99 / count in memory.
// Rows are fetched ordered by latency, so each slice handed to computePercentile is ascending.
// The Providers list of the result is sorted so the output does not depend on Go's randomized
// map iteration order.
func (s *RDBLogStore) getProviderLatencyHistogramSQLite(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) {
	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Provider        string  `gorm:"column:provider"`
		Latency         float64 `gorm:"column:latency"`
	}
	selectClause := fmt.Sprintf(
		`(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp, provider, latency`,
		bucketSizeSeconds, bucketSizeSeconds,
	)
	if err := baseQuery.
		Select(selectClause).
		Order("bucket_timestamp ASC, provider ASC, latency ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get provider latency histogram: %w", err)
	}

	type providerBucketKey struct {
		BucketTimestamp int64
		Provider        string
	}
	latencyMap := make(map[providerBucketKey][]float64)
	providersSet := make(map[string]bool)
	// orderedBuckets keeps the bucket timestamps in the order the (sorted) query returned them.
	var orderedBuckets []int64
	seenBuckets := make(map[int64]bool)
	for _, r := range results {
		providersSet[r.Provider] = true
		key := providerBucketKey{BucketTimestamp: r.BucketTimestamp, Provider: r.Provider}
		latencyMap[key] = append(latencyMap[key], r.Latency)
		if !seenBuckets[r.BucketTimestamp] {
			seenBuckets[r.BucketTimestamp] = true
			orderedBuckets = append(orderedBuckets, r.BucketTimestamp)
		}
	}

	providers := make([]string, 0, len(providersSet))
	for provider := range providersSet {
		providers = append(providers, provider)
	}
	// Map iteration order is random; sort for a stable response.
	sort.Strings(providers)

	computedBuckets := make(map[int64]*ProviderLatencyHistogramBucket, len(orderedBuckets))
	for key, latencies := range latencyMap {
		var sum float64
		for _, v := range latencies {
			sum += v
		}
		// latencies is never empty here: a key only exists once a value was appended to it.
		stats := ProviderLatencyStats{
			AvgLatency:    sum / float64(len(latencies)),
			P90Latency:    computePercentile(latencies, 0.90),
			P95Latency:    computePercentile(latencies, 0.95),
			P99Latency:    computePercentile(latencies, 0.99),
			TotalRequests: int64(len(latencies)),
		}
		if bucket, exists := computedBuckets[key.BucketTimestamp]; exists {
			bucket.ByProvider[key.Provider] = stats
			continue
		}
		computedBuckets[key.BucketTimestamp] = &ProviderLatencyHistogramBucket{
			Timestamp:  time.Unix(key.BucketTimestamp, 0).UTC(),
			ByProvider: map[string]ProviderLatencyStats{key.Provider: stats},
		}
	}

	return s.buildProviderLatencyHistogramResult(computedBuckets, orderedBuckets, providers, filters, bucketSizeSeconds)
}
// getProviderLatencyHistogramMySQL uses Go-based percentile computation for MySQL,
// which lacks percentile_cont.
// Every matching row is loaded (bucket, provider, latency), grouped per (bucket, provider) and
// reduced to average / P90 / P95 / P99 / count in memory.
// Rows are fetched ordered by latency, so each slice handed to computePercentile is ascending.
// The Providers list of the result is sorted so the output does not depend on Go's randomized
// map iteration order.
func (s *RDBLogStore) getProviderLatencyHistogramMySQL(ctx context.Context, baseQuery *gorm.DB, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) {
	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		Provider        string  `gorm:"column:provider"`
		Latency         float64 `gorm:"column:latency"`
	}
	selectClause := fmt.Sprintf(
		`(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp, provider, latency`,
		bucketSizeSeconds, bucketSizeSeconds,
	)
	if err := baseQuery.
		Select(selectClause).
		Order("bucket_timestamp ASC, provider ASC, latency ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get provider latency histogram: %w", err)
	}

	type bucketProviderKey struct {
		BucketTimestamp int64
		Provider        string
	}
	latencyMap := make(map[bucketProviderKey][]float64)
	providersSet := make(map[string]bool)
	// orderedBuckets keeps the bucket timestamps in the order the (sorted) query returned them.
	var orderedBuckets []int64
	seenBuckets := make(map[int64]bool)
	for _, r := range results {
		key := bucketProviderKey{r.BucketTimestamp, r.Provider}
		latencyMap[key] = append(latencyMap[key], r.Latency)
		providersSet[r.Provider] = true
		if !seenBuckets[r.BucketTimestamp] {
			seenBuckets[r.BucketTimestamp] = true
			orderedBuckets = append(orderedBuckets, r.BucketTimestamp)
		}
	}

	providers := make([]string, 0, len(providersSet))
	for provider := range providersSet {
		providers = append(providers, provider)
	}
	// Map iteration order is random; sort for a stable response.
	sort.Strings(providers)

	computedBuckets := make(map[int64]*ProviderLatencyHistogramBucket, len(orderedBuckets))
	for key, latencies := range latencyMap {
		var sum float64
		for _, v := range latencies {
			sum += v
		}
		// latencies is never empty here: a key only exists once a value was appended to it.
		stats := ProviderLatencyStats{
			AvgLatency:    sum / float64(len(latencies)),
			P90Latency:    computePercentile(latencies, 0.90),
			P95Latency:    computePercentile(latencies, 0.95),
			P99Latency:    computePercentile(latencies, 0.99),
			TotalRequests: int64(len(latencies)),
		}
		if bucket, exists := computedBuckets[key.BucketTimestamp]; exists {
			bucket.ByProvider[key.Provider] = stats
			continue
		}
		computedBuckets[key.BucketTimestamp] = &ProviderLatencyHistogramBucket{
			Timestamp:  time.Unix(key.BucketTimestamp, 0).UTC(),
			ByProvider: map[string]ProviderLatencyStats{key.Provider: stats},
		}
	}

	return s.buildProviderLatencyHistogramResult(computedBuckets, orderedBuckets, providers, filters, bucketSizeSeconds)
}
// buildProviderLatencyHistogramResult assembles the final histogram from the computed buckets.
// When generateBucketTimestamps yields a timeline for the filter's start/end time, one bucket is
// emitted per timeline entry and entries without data get an empty ByProvider map.
// When it yields nothing, only the computed buckets are returned, in the order given by orderedBuckets.
func (s *RDBLogStore) buildProviderLatencyHistogramResult(computedBuckets map[int64]*ProviderLatencyHistogramBucket, orderedBuckets []int64, providers []string, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) {
	result := &ProviderLatencyHistogramResult{
		BucketSizeSeconds: bucketSizeSeconds,
		Providers:         providers,
	}

	timeline := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)
	if len(timeline) == 0 {
		result.Buckets = make([]ProviderLatencyHistogramBucket, 0, len(computedBuckets))
		for _, ts := range orderedBuckets {
			if found, ok := computedBuckets[ts]; ok {
				result.Buckets = append(result.Buckets, *found)
			}
		}
		return result, nil
	}

	result.Buckets = make([]ProviderLatencyHistogramBucket, 0, len(timeline))
	for _, ts := range timeline {
		found, ok := computedBuckets[ts]
		if !ok {
			// No data in this slot: emit an empty placeholder so the series stays contiguous.
			result.Buckets = append(result.Buckets, ProviderLatencyHistogramBucket{
				Timestamp:  time.Unix(ts, 0).UTC(),
				ByProvider: make(map[string]ProviderLatencyStats),
			})
			continue
		}
		result.Buckets = append(result.Buckets, *found)
	}
	return result, nil
}
// ---------------------------------------------------------------------------
// Generic dimension histogram methods
// ---------------------------------------------------------------------------
// GetDimensionCostHistogram returns time-bucketed cost data grouped by the specified dimension.
// Uses the mv_logs_hourly materialized view on PostgreSQL when eligible; falls back to raw queries otherwise.
//
// The dimension must be present in ValidHistogramDimensions, because it is interpolated into the
// SQL as a column name. A non-positive bucketSizeSeconds falls back to 3600.
// Only rows with status "success" or "error" and a positive cost are aggregated.
// The bucket expression is chosen per dialect: strftime for SQLite, UNIX_TIMESTAMP for MySQL and
// EXTRACT(EPOCH ...) for everything else (PostgreSQL).
func (s *RDBLogStore) GetDimensionCostHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64, dimension HistogramDimension) (*DimensionCostHistogramResult, error) {
	if !ValidHistogramDimensions[dimension] {
		return nil, fmt.Errorf("invalid histogram dimension: %s", dimension)
	}
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getDimensionCostHistogramFromMatView(ctx, filters, bucketSizeSeconds, dimension)
	}

	dimCol := string(dimension)
	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})
	baseQuery = baseQuery.Where("cost IS NOT NULL AND cost > 0")

	var bucketExpr string
	switch dialect {
	case "sqlite":
		bucketExpr = fmt.Sprintf("CAST((CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d AS INTEGER)", bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		// MySQL has no EXTRACT(EPOCH ...); mirror the expression used by the provider latency histogram.
		bucketExpr = fmt.Sprintf("CAST(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d AS SIGNED)", bucketSizeSeconds, bucketSizeSeconds)
	default:
		bucketExpr = fmt.Sprintf("CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT)", bucketSizeSeconds, bucketSizeSeconds)
	}

	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		DimValue        string  `gorm:"column:dim_value"`
		Cost            float64 `gorm:"column:cost"`
	}
	selectClause := fmt.Sprintf(`
%s AS bucket_timestamp,
COALESCE(%s, '') AS dim_value,
SUM(cost) AS cost
`, bucketExpr, dimCol)
	if err := baseQuery.
		Select(selectClause).
		Group(fmt.Sprintf("bucket_timestamp, %s", dimCol)).
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get dimension cost histogram: %w", err)
	}

	type bucketAgg struct {
		totalCost   float64
		byDimension map[string]float64
	}
	grouped := make(map[int64]*bucketAgg)
	dimSet := make(map[string]struct{})
	for _, r := range results {
		a, ok := grouped[r.BucketTimestamp]
		if !ok {
			a = &bucketAgg{byDimension: make(map[string]float64)}
			grouped[r.BucketTimestamp] = a
		}
		// Accumulate rather than assign: NULL and '' are separate SQL groups but share dim_value "".
		a.totalCost += r.Cost
		a.byDimension[r.DimValue] += r.Cost
		dimSet[r.DimValue] = struct{}{}
	}
	dimValues := sortedStringKeys(dimSet)

	result := &DimensionCostHistogramResult{
		BucketSizeSeconds: bucketSizeSeconds,
		Dimension:         dimension,
		DimensionValues:   dimValues,
	}

	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)
	// If no time range specified, build buckets directly from query results
	if len(allTimestamps) == 0 {
		keys := make([]int64, 0, len(grouped))
		for ts := range grouped {
			keys = append(keys, ts)
		}
		sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
		result.Buckets = make([]DimensionCostHistogramBucket, 0, len(keys))
		for _, ts := range keys {
			a := grouped[ts]
			result.Buckets = append(result.Buckets, DimensionCostHistogramBucket{
				Timestamp:   time.Unix(ts, 0).UTC(),
				TotalCost:   a.totalCost,
				ByDimension: a.byDimension,
			})
		}
		return result, nil
	}

	// Otherwise emit one bucket per timeline slot; slots without data stay empty.
	result.Buckets = make([]DimensionCostHistogramBucket, 0, len(allTimestamps))
	for _, ts := range allTimestamps {
		b := DimensionCostHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
		if a, ok := grouped[ts]; ok {
			b.TotalCost = a.totalCost
			b.ByDimension = a.byDimension
		} else {
			b.ByDimension = make(map[string]float64)
		}
		result.Buckets = append(result.Buckets, b)
	}
	return result, nil
}
// GetDimensionTokenHistogram returns time-bucketed token usage grouped by the specified dimension.
// Uses the mv_logs_hourly materialized view on PostgreSQL when eligible; falls back to raw queries otherwise.
//
// The dimension must be present in ValidHistogramDimensions, because it is interpolated into the
// SQL as a column name. A non-positive bucketSizeSeconds falls back to 3600.
// Only rows with status "success" or "error" are aggregated.
// The bucket expression is chosen per dialect: strftime for SQLite, UNIX_TIMESTAMP for MySQL and
// EXTRACT(EPOCH ...) for everything else (PostgreSQL).
func (s *RDBLogStore) GetDimensionTokenHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64, dimension HistogramDimension) (*DimensionTokenHistogramResult, error) {
	if !ValidHistogramDimensions[dimension] {
		return nil, fmt.Errorf("invalid histogram dimension: %s", dimension)
	}
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getDimensionTokenHistogramFromMatView(ctx, filters, bucketSizeSeconds, dimension)
	}

	dimCol := string(dimension)
	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})

	var bucketExpr string
	switch dialect {
	case "sqlite":
		bucketExpr = fmt.Sprintf("CAST((CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d AS INTEGER)", bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		// MySQL has no EXTRACT(EPOCH ...); mirror the expression used by the provider latency histogram.
		bucketExpr = fmt.Sprintf("CAST(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d AS SIGNED)", bucketSizeSeconds, bucketSizeSeconds)
	default:
		bucketExpr = fmt.Sprintf("CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT)", bucketSizeSeconds, bucketSizeSeconds)
	}

	var results []struct {
		BucketTimestamp  int64  `gorm:"column:bucket_timestamp"`
		DimValue         string `gorm:"column:dim_value"`
		PromptTokens     int64  `gorm:"column:prompt_tokens"`
		CompletionTokens int64  `gorm:"column:completion_tokens"`
		TotalTokens      int64  `gorm:"column:total_tkns"`
	}
	selectClause := fmt.Sprintf(`
%s AS bucket_timestamp,
COALESCE(%s, '') AS dim_value,
COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
COALESCE(SUM(total_tokens), 0) AS total_tkns
`, bucketExpr, dimCol)
	if err := baseQuery.
		Select(selectClause).
		Group(fmt.Sprintf("bucket_timestamp, %s", dimCol)).
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get dimension token histogram: %w", err)
	}

	// grouped maps bucket timestamp -> dimension value -> accumulated token counts.
	grouped := make(map[int64]map[string]DimensionTokenStats)
	dimSet := make(map[string]struct{})
	for _, r := range results {
		byDim, ok := grouped[r.BucketTimestamp]
		if !ok {
			byDim = make(map[string]DimensionTokenStats)
			grouped[r.BucketTimestamp] = byDim
		}
		// Accumulate rather than assign: NULL and '' are separate SQL groups but share dim_value "".
		stats := byDim[r.DimValue]
		stats.PromptTokens += r.PromptTokens
		stats.CompletionTokens += r.CompletionTokens
		stats.TotalTokens += r.TotalTokens
		byDim[r.DimValue] = stats
		dimSet[r.DimValue] = struct{}{}
	}
	dimValues := sortedStringKeys(dimSet)

	result := &DimensionTokenHistogramResult{
		BucketSizeSeconds: bucketSizeSeconds,
		Dimension:         dimension,
		DimensionValues:   dimValues,
	}

	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)
	// If no time range specified, build buckets directly from query results
	if len(allTimestamps) == 0 {
		keys := make([]int64, 0, len(grouped))
		for ts := range grouped {
			keys = append(keys, ts)
		}
		sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
		result.Buckets = make([]DimensionTokenHistogramBucket, 0, len(keys))
		for _, ts := range keys {
			result.Buckets = append(result.Buckets, DimensionTokenHistogramBucket{
				Timestamp:   time.Unix(ts, 0).UTC(),
				ByDimension: grouped[ts],
			})
		}
		return result, nil
	}

	// Otherwise emit one bucket per timeline slot; slots without data stay empty.
	result.Buckets = make([]DimensionTokenHistogramBucket, 0, len(allTimestamps))
	for _, ts := range allTimestamps {
		b := DimensionTokenHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
		if byDim, ok := grouped[ts]; ok {
			b.ByDimension = byDim
		} else {
			b.ByDimension = make(map[string]DimensionTokenStats)
		}
		result.Buckets = append(result.Buckets, b)
	}
	return result, nil
}
// GetDimensionLatencyHistogram returns time-bucketed latency statistics grouped by the specified dimension.
// Uses the mv_logs_hourly materialized view on PostgreSQL when eligible; falls back to raw queries otherwise.
// The fallback path computes AVG latency only (no percentiles) since percentile_cont is Postgres-specific.
//
// The dimension must be present in ValidHistogramDimensions, because it is interpolated into the
// SQL as a column name. A non-positive bucketSizeSeconds falls back to 3600.
// Only rows with status "success" or "error" and a non-NULL latency are aggregated.
// The bucket expression is chosen per dialect: strftime for SQLite, UNIX_TIMESTAMP for MySQL and
// EXTRACT(EPOCH ...) for everything else (PostgreSQL).
func (s *RDBLogStore) GetDimensionLatencyHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64, dimension HistogramDimension) (*DimensionLatencyHistogramResult, error) {
	if !ValidHistogramDimensions[dimension] {
		return nil, fmt.Errorf("invalid histogram dimension: %s", dimension)
	}
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}
	dialect := s.db.Dialector.Name()
	if dialect == "postgres" && s.canUseMatView(filters) && bucketSizeSeconds >= 3600 {
		return s.getDimensionLatencyHistogramFromMatView(ctx, filters, bucketSizeSeconds, dimension)
	}

	dimCol := string(dimension)
	baseQuery := s.db.WithContext(ctx).Model(&Log{})
	baseQuery = s.applyFilters(baseQuery, filters)
	baseQuery = baseQuery.Where("status IN ?", []string{"success", "error"})
	baseQuery = baseQuery.Where("latency IS NOT NULL")

	var bucketExpr string
	switch dialect {
	case "sqlite":
		bucketExpr = fmt.Sprintf("CAST((CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d AS INTEGER)", bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		// MySQL has no EXTRACT(EPOCH ...); mirror the expression used by the provider latency histogram.
		bucketExpr = fmt.Sprintf("CAST(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d AS SIGNED)", bucketSizeSeconds, bucketSizeSeconds)
	default:
		bucketExpr = fmt.Sprintf("CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT)", bucketSizeSeconds, bucketSizeSeconds)
	}

	var results []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		DimValue        string  `gorm:"column:dim_value"`
		AvgLatency      float64 `gorm:"column:avg_lat"`
		TotalRequests   int64   `gorm:"column:total_requests"`
	}
	selectClause := fmt.Sprintf(`
%s AS bucket_timestamp,
COALESCE(%s, '') AS dim_value,
COALESCE(AVG(latency), 0) AS avg_lat,
COUNT(*) AS total_requests
`, bucketExpr, dimCol)
	if err := baseQuery.
		Select(selectClause).
		Group(fmt.Sprintf("bucket_timestamp, %s", dimCol)).
		Order("bucket_timestamp ASC").
		Find(&results).Error; err != nil {
		return nil, fmt.Errorf("failed to get dimension latency histogram: %w", err)
	}

	// grouped maps bucket timestamp -> dimension value -> latency stats.
	grouped := make(map[int64]map[string]DimensionLatencyStats)
	dimSet := make(map[string]struct{})
	for _, r := range results {
		byDim, ok := grouped[r.BucketTimestamp]
		if !ok {
			byDim = make(map[string]DimensionLatencyStats)
			grouped[r.BucketTimestamp] = byDim
		}
		dimSet[r.DimValue] = struct{}{}

		prev, seen := byDim[r.DimValue]
		if !seen {
			byDim[r.DimValue] = DimensionLatencyStats{
				AvgLatency:    r.AvgLatency,
				TotalRequests: r.TotalRequests,
			}
			continue
		}
		// The query groups by the raw column, so NULL and '' arrive as two rows that both map
		// to dim_value "". Merge them with a request-weighted average instead of overwriting.
		total := prev.TotalRequests + r.TotalRequests
		if total > 0 {
			prev.AvgLatency = (prev.AvgLatency*float64(prev.TotalRequests) + r.AvgLatency*float64(r.TotalRequests)) / float64(total)
		}
		prev.TotalRequests = total
		byDim[r.DimValue] = prev
	}
	dimValues := sortedStringKeys(dimSet)

	result := &DimensionLatencyHistogramResult{
		BucketSizeSeconds: bucketSizeSeconds,
		Dimension:         dimension,
		DimensionValues:   dimValues,
	}

	allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)
	// If no time range specified, build buckets directly from query results
	if len(allTimestamps) == 0 {
		keys := make([]int64, 0, len(grouped))
		for ts := range grouped {
			keys = append(keys, ts)
		}
		sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
		result.Buckets = make([]DimensionLatencyHistogramBucket, 0, len(keys))
		for _, ts := range keys {
			result.Buckets = append(result.Buckets, DimensionLatencyHistogramBucket{
				Timestamp:   time.Unix(ts, 0).UTC(),
				ByDimension: grouped[ts],
			})
		}
		return result, nil
	}

	// Otherwise emit one bucket per timeline slot; slots without data stay empty.
	result.Buckets = make([]DimensionLatencyHistogramBucket, 0, len(allTimestamps))
	for _, ts := range allTimestamps {
		b := DimensionLatencyHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()}
		if byDim, ok := grouped[ts]; ok {
			b.ByDimension = byDim
		} else {
			b.ByDimension = make(map[string]DimensionLatencyStats)
		}
		result.Buckets = append(result.Buckets, b)
	}
	return result, nil
}
// HasLogs reports whether the store holds at least one log row.
// Only the id column of a single row is fetched; a "record not found" result means false, not an error.
func (s *RDBLogStore) HasLogs(ctx context.Context) (bool, error) {
	var probe Log
	err := s.db.WithContext(ctx).Select("id").Limit(1).Take(&probe).Error
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return false, nil
	default:
		return false, err
	}
}
// FindByID loads the log entry with the given ID.
// It returns ErrNotFound when no such row exists; any other database error is returned unchanged.
func (s *RDBLogStore) FindByID(ctx context.Context, id string) (*Log, error) {
	entry := new(Log)
	err := s.db.WithContext(ctx).Where("id = ?", id).First(entry).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	return entry, nil
}
// IsLogEntryPresent reports whether a log entry with the given ID exists.
// Only the id column is selected, so the full entry is never loaded into memory.
// A "record not found" result means false, not an error.
func (s *RDBLogStore) IsLogEntryPresent(ctx context.Context, id string) (bool, error) {
	var probe Log
	err := s.db.WithContext(ctx).Select("id").Where("id = ?", id).First(&probe).Error
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return false, nil
	default:
		return false, err
	}
}
// FindFirst returns the first log entry matching query, selecting only the given fields.
// It returns ErrNotFound when nothing matches; any other database error is returned unchanged.
func (s *RDBLogStore) FindFirst(ctx context.Context, query any, fields ...string) (*Log, error) {
	entry := new(Log)
	err := s.db.WithContext(ctx).Select(fields).Where(query).First(entry).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	return entry, nil
}
// Flush removes stale in-flight entries: rows whose status is still "processing"
// and whose created_at is older than since. Rows in any other status are left untouched.
func (s *RDBLogStore) Flush(ctx context.Context, since time.Time) error {
	err := s.db.WithContext(ctx).
		Where("status = ? AND created_at < ?", "processing", since).
		Delete(&Log{}).
		Error
	if err != nil {
		return fmt.Errorf("failed to cleanup old processing logs: %w", err)
	}
	return nil
}
// GetDistinctModels returns the unique non-empty model values seen in recent logs.
// On PostgreSQL with materialized views ready, the lookup is delegated to the mat-view variant.
// Otherwise a SELECT DISTINCT is run over the last defaultFilterDataCutoffDays days and capped at
// defaultFilterDataLimit values, to avoid full table scans.
func (s *RDBLogStore) GetDistinctModels(ctx context.Context) ([]string, error) {
	if s.db.Dialector.Name() == "postgres" && s.matViewsReady.Load() {
		return s.getDistinctModelsFromMatView(ctx)
	}

	since := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)
	query := s.db.WithContext(ctx).Model(&Log{}).
		Where("model IS NOT NULL AND model != '' AND timestamp >= ?", since).
		Distinct("model").
		Limit(defaultFilterDataLimit)

	var models []string
	if err := query.Pluck("model", &models).Error; err != nil {
		return nil, fmt.Errorf("failed to get distinct models: %w", err)
	}
	return models, nil
}
// GetDistinctAliases returns the unique non-empty alias values seen in recent logs.
// A SELECT DISTINCT is run over the last defaultFilterDataCutoffDays days and capped at
// defaultFilterDataLimit values, to avoid full table scans.
func (s *RDBLogStore) GetDistinctAliases(ctx context.Context) ([]string, error) {
	since := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)
	query := s.db.WithContext(ctx).Model(&Log{}).
		Where("alias IS NOT NULL AND alias != '' AND timestamp >= ?", since).
		Distinct("alias").
		Limit(defaultFilterDataLimit)

	var aliases []string
	if err := query.Pluck("alias", &aliases).Error; err != nil {
		return nil, fmt.Errorf("failed to get distinct aliases: %w", err)
	}
	return aliases, nil
}
// allowedKeyPairColumns is the allowlist of column names accepted by GetDistinctKeyPairs.
// The column names are interpolated into SQL, so anything outside this set is rejected
// to prevent SQL injection. Entries are grouped as id/name pairs.
var allowedKeyPairColumns = map[string]struct{}{
	"selected_key_id": {}, "selected_key_name": {},
	"virtual_key_id": {}, "virtual_key_name": {},
	"routing_rule_id": {}, "routing_rule_name": {},
	"team_id": {}, "team_name": {},
	"customer_id": {}, "customer_name": {},
	"user_id":          {},
	"business_unit_id": {}, "business_unit_name": {},
}
// GetDistinctKeyPairs returns unique non-empty ID-Name pairs for the given columns using SELECT DISTINCT.
// idCol and nameCol must be column names listed in allowedKeyPairColumns
// (e.g., "selected_key_id", "selected_key_name"); anything else is rejected with an error
// before any query is built, on every code path.
// On PostgreSQL with materialized views ready, the lookup is delegated to the mat-view variant.
// Otherwise the raw query is limited to the last defaultFilterDataCutoffDays days and capped at
// defaultFilterDataLimit rows.
func (s *RDBLogStore) GetDistinctKeyPairs(ctx context.Context, idCol, nameCol string) ([]KeyPairResult, error) {
	// Validate first: the column names are interpolated into SQL below and are also handed to
	// the materialized-view path, so neither path may see an unchecked identifier.
	if _, ok := allowedKeyPairColumns[idCol]; !ok {
		return nil, fmt.Errorf("invalid id column: %s", idCol)
	}
	if _, ok := allowedKeyPairColumns[nameCol]; !ok {
		return nil, fmt.Errorf("invalid name column: %s", nameCol)
	}

	if s.db.Dialector.Name() == "postgres" && s.matViewsReady.Load() {
		return s.getDistinctKeyPairsFromMatView(ctx, idCol, nameCol)
	}

	cutoff := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)
	var results []KeyPairResult
	err := s.db.WithContext(ctx).Model(&Log{}).
		Select(fmt.Sprintf("DISTINCT %s as id, %s as name", idCol, nameCol)).
		Where(fmt.Sprintf("%s IS NOT NULL AND %s != '' AND %s IS NOT NULL AND %s != '' AND timestamp >= ?", idCol, idCol, nameCol, nameCol), cutoff).
		Limit(defaultFilterDataLimit).
		Find(&results).Error
	if err != nil {
		return nil, fmt.Errorf("failed to get distinct key pairs (%s, %s): %w", idCol, nameCol, err)
	}
	return results, nil
}
// GetDistinctRoutingEngines returns the unique routing engine names found in the comma-separated
// routing_engines_used column, sorted alphabetically.
// On PostgreSQL with materialized views ready, the lookup is delegated to the mat-view variant.
// Otherwise the raw query is scoped to the last defaultFilterDataCutoffDays days and capped at
// defaultFilterDataLimit distinct column values, to avoid full table scans.
func (s *RDBLogStore) GetDistinctRoutingEngines(ctx context.Context) ([]string, error) {
	if s.db.Dialector.Name() == "postgres" && s.matViewsReady.Load() {
		return s.getDistinctRoutingEnginesFromMatView(ctx)
	}

	cutoff := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)
	var rawValues []string
	err := s.db.WithContext(ctx).Model(&Log{}).
		Where("routing_engines_used IS NOT NULL AND routing_engines_used != '' AND timestamp >= ?", cutoff).
		Distinct("routing_engines_used").Limit(defaultFilterDataLimit).Pluck("routing_engines_used", &rawValues).Error
	if err != nil {
		return nil, fmt.Errorf("failed to get distinct routing engines: %w", err)
	}

	// Each row may contain comma-separated values; deduplicate across all rows
	uniqueEngines := make(map[string]struct{})
	for _, raw := range rawValues {
		for _, engine := range strings.Split(raw, ",") {
			engine = strings.TrimSpace(engine)
			if engine != "" {
				uniqueEngines[engine] = struct{}{}
			}
		}
	}

	engines := make([]string, 0, len(uniqueEngines))
	for engine := range uniqueEngines {
		engines = append(engines, engine)
	}
	// Map iteration order is random; sort so callers get a stable list.
	sort.Strings(engines)
	return engines, nil
}
// metadataSystemKeys lists metadata keys that are added by the system rather than by users;
// GetDistinctMetadataKeys skips them so they never show up in filter data.
var metadataSystemKeys = map[string]struct{}{"isAsyncRequest": {}}
// Bounds applied by GetDistinctMetadataKeys when collecting metadata filter data.
const (
// maxMetadataRows is the maximum number of recent rows to scan for metadata keys.
maxMetadataRows = 1000
// maxMetadataValuesPerKey caps the number of distinct values collected per metadata key.
// Once a key has reached this many values, further values for it are ignored.
maxMetadataValuesPerKey = 100
)
// GetDistinctMetadataKeys returns unique metadata keys and their distinct values from recent logs.
// It scans at most maxMetadataRows of the newest rows from the last defaultFilterDataCutoffDays
// days to avoid memory bloat on large tables, and keeps at most maxMetadataValuesPerKey values per key.
//
// Rows whose metadata cannot be parsed are skipped. System keys (metadataSystemKeys) and keys that
// fail isValidMetadataKey are dropped. Only string, number and boolean values are collected; empty
// strings are ignored. Numbers are rendered in plain decimal notation (1000000, not 1e+06).
// The values of every key are returned sorted.
func (s *RDBLogStore) GetDistinctMetadataKeys(ctx context.Context) (map[string][]string, error) {
	cutoff := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)
	var metadataStrings []string

	// Guard must match the partial-index predicate so the planner uses the GIN index.
	var metadataGuard string
	if s.db.Dialector.Name() == "postgres" {
		metadataGuard = "metadata IS NOT NULL AND metadata IS JSON OBJECT AND metadata != '{}' AND timestamp >= ?"
	} else {
		metadataGuard = "metadata IS NOT NULL AND json_valid(metadata) AND json_type(metadata) = 'object' AND metadata != '{}' AND timestamp >= ?"
	}
	err := s.db.WithContext(ctx).Model(&Log{}).
		Where(metadataGuard, cutoff).
		Order("timestamp DESC").
		Limit(maxMetadataRows).
		Pluck("metadata", &metadataStrings).Error
	if err != nil {
		return nil, fmt.Errorf("failed to get metadata: %w", err)
	}

	// Collect unique key-value pairs with bounded sizes
	keyValues := make(map[string]map[string]struct{})
	for _, raw := range metadataStrings {
		var parsed map[string]any
		if err := sonic.UnmarshalString(raw, &parsed); err != nil {
			// Best effort: one malformed row must not fail the whole filter-data request.
			continue
		}
		for key, val := range parsed {
			if _, isSystem := metadataSystemKeys[key]; isSystem {
				continue
			}
			if !isValidMetadataKey(key) {
				continue
			}
			// Register the key even if none of its values end up being collectable.
			values, ok := keyValues[key]
			if !ok {
				values = make(map[string]struct{})
				keyValues[key] = values
			}
			if len(values) >= maxMetadataValuesPerKey {
				continue
			}
			var strVal string
			switch v := val.(type) {
			case string:
				strVal = v
			case float64:
				// 'f' with precision -1 is the shortest plain-decimal form; fmt.Sprint would
				// switch to exponent notation for large values (e.g. 1000000 -> "1e+06").
				strVal = strconv.FormatFloat(v, 'f', -1, 64)
			case bool:
				strVal = strconv.FormatBool(v)
			default:
				// Nested objects, arrays and nulls are not usable as filter values.
				continue
			}
			if strVal != "" {
				values[strVal] = struct{}{}
			}
		}
	}

	result := make(map[string][]string, len(keyValues))
	for key, vals := range keyValues {
		values := make([]string, 0, len(vals))
		for v := range vals {
			values = append(values, v)
		}
		sort.Strings(values)
		result[key] = values
	}
	return result, nil
}
// FindAll returns the log entries matching query, selecting only the given fields.
// The result is capped at defaultMaxQueryLimit rows. A "record not found" error is
// translated into an empty, non-nil slice.
func (s *RDBLogStore) FindAll(ctx context.Context, query any, fields ...string) ([]*Log, error) {
	var logs []*Log
	err := s.db.WithContext(ctx).
		Select(fields).
		Where(query).
		Limit(defaultMaxQueryLimit).
		Find(&logs).
		Error
	switch {
	case err == nil:
		return logs, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return []*Log{}, nil
	default:
		return nil, err
	}
}
// allowedDistinctLogColumns is the allowlist of column names accepted by FindAllDistinct.
// GORM's Distinct() does not parameterize column identifiers, so every requested field
// is checked against this set to prevent SQL injection.
var allowedDistinctLogColumns = map[string]struct{}{
	"id":                   {},
	"parent_request_id":    {},
	"timestamp":            {},
	"object_type":          {},
	"provider":             {},
	"model":                {},
	"number_of_retries":    {},
	"fallback_index":       {},
	"selected_key_id":      {},
	"selected_key_name":    {},
	"virtual_key_id":       {},
	"virtual_key_name":     {},
	"routing_engines_used": {},
	"routing_rule_id":      {},
	"routing_rule_name":    {},
	"status":               {},
	"stream":               {},
}
// FindAllDistinct returns the distinct combinations of the given fields among the log
// entries matching query. It relies on SQL DISTINCT so that only unique values are
// loaded (e.g., for filter dropdowns) rather than every row.
// Every field must appear in allowedDistinctLogColumns; the first unknown field aborts
// the call with an error. With no fields, no DISTINCT clause is applied.
// The result is capped at defaultMaxQueryLimit rows.
func (s *RDBLogStore) FindAllDistinct(ctx context.Context, query any, fields ...string) ([]*Log, error) {
	tx := s.db.WithContext(ctx).Where(query)
	if len(fields) != 0 {
		columns := make([]interface{}, 0, len(fields))
		for _, name := range fields {
			if _, allowed := allowedDistinctLogColumns[name]; !allowed {
				return nil, fmt.Errorf("invalid distinct field: %s", name)
			}
			columns = append(columns, name)
		}
		tx = tx.Distinct(columns...)
	}

	var logs []*Log
	err := tx.Limit(defaultMaxQueryLimit).Find(&logs).Error
	switch {
	case err == nil:
		return logs, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return []*Log{}, nil
	default:
		return nil, err
	}
}
// DeleteLogsBatch deletes up to batchSize logs whose created_at is older than cutoff
// and returns the number of rows removed.
// It works in two steps: pick the IDs of one batch, then delete exactly those IDs.
func (s *RDBLogStore) DeleteLogsBatch(ctx context.Context, cutoff time.Time, batchSize int) (deletedCount int64, err error) {
	// Step 1: collect one batch of candidate IDs.
	var batch []string
	err = s.db.WithContext(ctx).
		Model(&Log{}).
		Select("id").
		Where("created_at < ?", cutoff).
		Limit(batchSize).
		Pluck("id", &batch).
		Error
	if err != nil {
		return 0, err
	}
	if len(batch) == 0 {
		// Nothing old enough is left.
		return 0, nil
	}

	// Step 2: remove the rows picked above.
	res := s.db.WithContext(ctx).Where("id IN ?", batch).Delete(&Log{})
	if res.Error != nil {
		return 0, res.Error
	}
	return res.RowsAffected, nil
}
// Close shuts down the log store by closing the underlying *sql.DB handle.
// It returns any error from obtaining the handle or from closing it.
func (s *RDBLogStore) Close(ctx context.Context) error {
	conn, err := s.db.WithContext(ctx).DB()
	if err == nil {
		err = conn.Close()
	}
	return err
}
// DeleteLog removes the log entry with the given ID from the database.
// It returns the error reported by the delete statement, if any.
func (s *RDBLogStore) DeleteLog(ctx context.Context, id string) error {
	return s.db.WithContext(ctx).Where("id = ?", id).Delete(&Log{}).Error
}
// DeleteLogs removes every log entry whose ID appears in ids.
// An empty ids slice is a no-op and issues no query.
func (s *RDBLogStore) DeleteLogs(ctx context.Context, ids []string) error {
	if len(ids) == 0 {
		return nil
	}
	return s.db.WithContext(ctx).Where("id IN ?", ids).Delete(&Log{}).Error
}
// ============================================================================
// MCP Tool Log Methods
// ============================================================================
// applyMCPFilters narrows baseQuery with every filter that is set in filters
// and returns the resulting query. Unset filters (empty slices, nil pointers,
// empty search string) add no condition.
func (s *RDBLogStore) applyMCPFilters(baseQuery *gorm.DB, filters MCPToolLogSearchFilters) *gorm.DB {
	q := baseQuery

	// Set-membership filters.
	if len(filters.ToolNames) > 0 {
		q = q.Where("tool_name IN ?", filters.ToolNames)
	}
	if len(filters.ServerLabels) > 0 {
		q = q.Where("server_label IN ?", filters.ServerLabels)
	}
	if len(filters.Status) > 0 {
		q = q.Where("status IN ?", filters.Status)
	}
	if len(filters.VirtualKeyIDs) > 0 {
		q = q.Where("virtual_key_id IN ?", filters.VirtualKeyIDs)
	}
	if len(filters.LLMRequestIDs) > 0 {
		q = q.Where("llm_request_id IN ?", filters.LLMRequestIDs)
	}

	// Inclusive range filters.
	if start := filters.StartTime; start != nil {
		q = q.Where("timestamp >= ?", *start)
	}
	if end := filters.EndTime; end != nil {
		q = q.Where("timestamp <= ?", *end)
	}
	if lo := filters.MinLatency; lo != nil {
		q = q.Where("latency >= ?", *lo)
	}
	if hi := filters.MaxLatency; hi != nil {
		q = q.Where("latency <= ?", *hi)
	}

	// Free-text search across both the arguments and result columns.
	if term := filters.ContentSearch; term != "" {
		switch s.db.Dialector.Name() {
		case "postgres":
			// Postgres uses full-text matching.
			q = q.Where("(to_tsvector('simple', arguments) @@ plainto_tsquery('simple', ?) OR to_tsvector('simple', result) @@ plainto_tsquery('simple', ?))", term, term)
		default:
			// Other dialects fall back to a substring match.
			pattern := "%" + term + "%"
			q = q.Where("(arguments LIKE ? OR result LIKE ?)", pattern, pattern)
		}
	}

	return q
}
// CreateMCPToolLog stores entry as a new MCP tool log row and returns the
// error reported by the insert, if any.
func (s *RDBLogStore) CreateMCPToolLog(ctx context.Context, entry *MCPToolLog) error {
	tx := s.db.WithContext(ctx).Create(entry)
	return tx.Error
}
// FindMCPToolLog looks up one MCP tool log entry by ID.
// It returns ErrNotFound when no row has that ID; any other database error is
// returned unchanged.
func (s *RDBLogStore) FindMCPToolLog(ctx context.Context, id string) (*MCPToolLog, error) {
	var entry MCPToolLog
	err := s.db.WithContext(ctx).Where("id = ?", id).First(&entry).Error
	switch {
	case err == nil:
		return &entry, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil, ErrNotFound
	default:
		return nil, err
	}
}
// UpdateMCPToolLog applies entry as an update to the MCP tool log with the
// given ID. The payload is first passed through serializeMCPToolLogUpdateEntry.
// It returns ErrNotFound when the update affects no rows.
func (s *RDBLogStore) UpdateMCPToolLog(ctx context.Context, id string, entry any) error {
	payload, err := serializeMCPToolLogUpdateEntry(entry)
	if err != nil {
		return err
	}

	result := s.db.WithContext(ctx).
		Model(&MCPToolLog{}).
		Where("id = ?", id).
		Updates(payload)

	switch {
	case result.Error != nil:
		return result.Error
	case result.RowsAffected == 0:
		return ErrNotFound
	}
	return nil
}
// serializeMCPToolLogUpdateEntry prepares an update payload for GORM by calling
// SerializeFields on it when it is an MCPToolLog.
//
//   - *MCPToolLog: serialized in place and the same pointer is returned.
//   - MCPToolLog:  a copy is serialized and returned by value.
//   - anything else is returned unchanged.
//
// A serialization failure yields (nil, err).
func serializeMCPToolLogUpdateEntry(entry any) (any, error) {
	if ptr, ok := entry.(*MCPToolLog); ok {
		if err := ptr.SerializeFields(); err != nil {
			return nil, err
		}
		return ptr, nil
	}

	if val, ok := entry.(MCPToolLog); ok {
		// val is already a copy produced by the type assertion, so the
		// caller's value is not touched by SerializeFields.
		if err := val.SerializeFields(); err != nil {
			return nil, err
		}
		return val, nil
	}

	return entry, nil
}
// SearchMCPToolLogs returns one page of MCP tool logs matching filters.
//
// The result carries:
//   - Logs: the page of rows, ordered by pagination.SortBy ("timestamp",
//     "latency" or "cost"; anything else falls back to "timestamp") in
//     pagination.Order ("asc" for ascending, otherwise descending).
//   - Pagination: the input pagination with Limit clamped to
//     (0, defaultMaxSearchLimit] and TotalCount set to the number of rows
//     matching filters.
//   - Stats.TotalExecutions: the same filtered total.
//   - HasLogs: true when the page is non-empty, otherwise whatever
//     HasMCPToolLogs reports for the whole table.
//
// A not-found error from the page query is treated as an empty page and goes
// through the same return path as every other result, so HasLogs is always
// populated.
func (s *RDBLogStore) SearchMCPToolLogs(ctx context.Context, filters MCPToolLogSearchFilters, pagination PaginationOptions) (*MCPToolLogSearchResult, error) {
	baseQuery := s.db.WithContext(ctx).Model(&MCPToolLog{})
	baseQuery = s.applyMCPFilters(baseQuery, filters)

	// Total number of matching rows, independent of limit/offset.
	var totalCount int64
	if err := baseQuery.Count(&totalCount).Error; err != nil {
		return nil, err
	}

	// Build the ORDER BY clause from fixed strings only; user input is never
	// interpolated into SQL.
	direction := "DESC"
	if pagination.Order == "asc" {
		direction = "ASC"
	}
	sortColumn := "timestamp"
	switch pagination.SortBy {
	case "latency":
		sortColumn = "latency"
	case "cost":
		sortColumn = "cost"
	}

	// Clamp the page size and report the effective value back to the caller.
	limit := pagination.Limit
	if limit <= 0 || limit > defaultMaxSearchLimit {
		limit = defaultMaxSearchLimit
	}
	pagination.Limit = limit

	mainQuery := baseQuery.Order(sortColumn + " " + direction).Limit(limit)
	if pagination.Offset > 0 {
		mainQuery = mainQuery.Offset(pagination.Offset)
	}

	var logs []MCPToolLog
	if err := mainQuery.Find(&logs).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}

	// Populate virtual key objects for logs that have virtual key information.
	for i := range logs {
		if logs[i].VirtualKeyID != nil && logs[i].VirtualKeyName != nil {
			logs[i].VirtualKey = &tables.TableVirtualKey{
				ID:   *logs[i].VirtualKeyID,
				Name: *logs[i].VirtualKeyName,
			}
		}
	}

	// An empty page does not imply an empty table: check the table itself.
	hasLogs := len(logs) > 0
	if !hasLogs {
		var err error
		hasLogs, err = s.HasMCPToolLogs(ctx)
		if err != nil {
			return nil, err
		}
	}

	pagination.TotalCount = totalCount
	return &MCPToolLogSearchResult{
		Logs:       logs,
		Pagination: pagination,
		Stats: MCPToolLogStats{
			TotalExecutions: totalCount,
		},
		HasLogs: hasLogs,
	}, nil
}
// GetMCPToolLogStats computes aggregate statistics for the MCP tool logs that
// match filters.
//
// TotalExecutions counts every matching row regardless of status. SuccessRate
// (a percentage), AverageLatency and TotalCost are computed only over rows
// whose status is "success" or "error", and stay at their zero values when no
// such rows exist.
func (s *RDBLogStore) GetMCPToolLogStats(ctx context.Context, filters MCPToolLogSearchFilters) (*MCPToolLogStats, error) {
	// Total count includes rows in any status.
	var totalCount int64
	countQuery := s.applyMCPFilters(s.db.WithContext(ctx).Model(&MCPToolLog{}), filters)
	if err := countQuery.Count(&totalCount).Error; err != nil {
		return nil, err
	}

	stats := &MCPToolLogStats{TotalExecutions: totalCount}
	if totalCount == 0 {
		return stats, nil
	}

	// One aggregate query covers all completed-execution statistics. The
	// nullable types absorb NULL aggregates returned when no row qualifies.
	var agg struct {
		CompletedCount sql.NullInt64   `gorm:"column:completed_count"`
		SuccessCount   sql.NullInt64   `gorm:"column:success_count"`
		AvgLatency     sql.NullFloat64 `gorm:"column:avg_latency"`
		TotalCost      sql.NullFloat64 `gorm:"column:total_cost"`
	}
	aggQuery := s.applyMCPFilters(s.db.WithContext(ctx).Model(&MCPToolLog{}), filters).
		Where("status IN ?", []string{"success", "error"})
	err := aggQuery.Select(`
COUNT(*) as completed_count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_count,
AVG(latency) as avg_latency,
SUM(cost) as total_cost
`).Scan(&agg).Error
	if err != nil {
		return nil, err
	}

	completed := agg.CompletedCount.Int64
	if completed <= 0 {
		return stats, nil
	}

	stats.SuccessRate = float64(agg.SuccessCount.Int64) / float64(completed) * 100
	if agg.AvgLatency.Valid {
		stats.AverageLatency = agg.AvgLatency.Float64
	}
	if agg.TotalCost.Valid {
		stats.TotalCost = agg.TotalCost.Float64
	}
	return stats, nil
}
// HasMCPToolLogs reports whether at least one MCP tool log row exists.
// It fetches a single row's id; a not-found result means (false, nil).
func (s *RDBLogStore) HasMCPToolLogs(ctx context.Context) (bool, error) {
	var probe MCPToolLog
	err := s.db.WithContext(ctx).Select("id").Limit(1).Take(&probe).Error
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return false, nil
	default:
		return false, err
	}
}
// DeleteMCPToolLogs removes every MCP tool log entry whose ID appears in ids.
// An empty ids slice is a no-op and issues no query.
func (s *RDBLogStore) DeleteMCPToolLogs(ctx context.Context, ids []string) error {
	if len(ids) == 0 {
		return nil
	}
	return s.db.WithContext(ctx).Where("id IN ?", ids).Delete(&MCPToolLog{}).Error
}
// FlushMCPToolLogs removes MCP tool log entries that are still in the
// "processing" status and were created before since. Any database error is
// wrapped and returned.
func (s *RDBLogStore) FlushMCPToolLogs(ctx context.Context, since time.Time) error {
	err := s.db.WithContext(ctx).
		Where("status = ? AND created_at < ?", "processing", since).
		Delete(&MCPToolLog{}).
		Error
	if err != nil {
		return fmt.Errorf("failed to cleanup old processing MCP tool logs: %w", err)
	}
	return nil
}
// GetAvailableToolNames lists the distinct, non-empty tool names seen in MCP
// tool logs. Only logs from the last defaultFilterDataCutoffDays days are
// considered and at most defaultFilterDataLimit names are returned, which keeps
// the query from scanning the whole table.
func (s *RDBLogStore) GetAvailableToolNames(ctx context.Context) ([]string, error) {
	since := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)

	var names []string
	err := s.db.WithContext(ctx).
		Model(&MCPToolLog{}).
		Where("tool_name IS NOT NULL AND tool_name != '' AND timestamp >= ?", since).
		Distinct("tool_name").
		Limit(defaultFilterDataLimit).
		Pluck("tool_name", &names).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get available tool names: %w", err)
	}
	return names, nil
}
// GetAvailableServerLabels lists the distinct, non-empty server labels seen in
// MCP tool logs. Only logs from the last defaultFilterDataCutoffDays days are
// considered and at most defaultFilterDataLimit labels are returned, which
// keeps the query from scanning the whole table.
func (s *RDBLogStore) GetAvailableServerLabels(ctx context.Context) ([]string, error) {
	since := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)

	var labels []string
	err := s.db.WithContext(ctx).
		Model(&MCPToolLog{}).
		Where("server_label IS NOT NULL AND server_label != '' AND timestamp >= ?", since).
		Distinct("server_label").
		Limit(defaultFilterDataLimit).
		Pluck("server_label", &labels).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get available server labels: %w", err)
	}
	return labels, nil
}
// GetAvailableMCPVirtualKeys lists the distinct (virtual_key_id,
// virtual_key_name) pairs seen in MCP tool logs, skipping rows where either
// value is NULL or empty. Only those two columns are selected for the returned
// MCPToolLog values. Only logs from the last defaultFilterDataCutoffDays days
// are considered and at most defaultFilterDataLimit pairs are returned.
func (s *RDBLogStore) GetAvailableMCPVirtualKeys(ctx context.Context) ([]MCPToolLog, error) {
	since := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays)

	var pairs []MCPToolLog
	err := s.db.WithContext(ctx).
		Model(&MCPToolLog{}).
		Select("DISTINCT virtual_key_id, virtual_key_name").
		Where("virtual_key_id IS NOT NULL AND virtual_key_id != '' AND virtual_key_name IS NOT NULL AND virtual_key_name != '' AND timestamp >= ?", since).
		Limit(defaultFilterDataLimit).
		Find(&pairs).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get available virtual keys from MCP logs: %w", err)
	}
	return pairs, nil
}
// GetMCPHistogram returns MCP tool call volume grouped into fixed-width time
// buckets for the logs matching filters. Only rows with status "success" or
// "error" are counted.
//
// bucketSizeSeconds is the bucket width; non-positive values default to 3600.
// When generateBucketTimestamps yields timestamps for the filter's time range,
// one bucket is returned per timestamp, with zero counts where no rows were
// found. When it yields none, only the buckets returned by the query are
// emitted, in ascending time order.
func (s *RDBLogStore) GetMCPHistogram(ctx context.Context, filters MCPToolLogSearchFilters, bucketSizeSeconds int64) (*MCPHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}

	// Each dialect needs its own expression to floor a timestamp to the start
	// of its bucket, expressed in epoch seconds.
	var selectClause string
	switch s.db.Dialector.Name() {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
COUNT(*) as count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
COUNT(*) as count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
COUNT(*) as count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	var rows []struct {
		BucketTimestamp int64 `gorm:"column:bucket_timestamp"`
		Count           int64 `gorm:"column:count"`
		Success         int64 `gorm:"column:success"`
		Error           int64 `gorm:"column:error"`
	}
	query := s.applyMCPFilters(s.db.WithContext(ctx).Model(&MCPToolLog{}), filters).
		Where("status IN ?", []string{"success", "error"})
	err := query.
		Select(selectClause).
		Group("bucket_timestamp").
		Order("bucket_timestamp ASC").
		Find(&rows).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get mcp histogram: %w", err)
	}

	timeline := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// No generated timeline: emit exactly the buckets the query produced.
	if len(timeline) == 0 {
		sparse := make([]MCPHistogramBucket, 0, len(rows))
		for _, r := range rows {
			sparse = append(sparse, MCPHistogramBucket{
				Timestamp: time.Unix(r.BucketTimestamp, 0).UTC(),
				Count:     r.Count,
				Success:   r.Success,
				Error:     r.Error,
			})
		}
		return &MCPHistogramResult{Buckets: sparse, BucketSizeSeconds: bucketSizeSeconds}, nil
	}

	// Index the query rows by bucket start so the timeline can be filled in.
	type tally struct {
		count, success, failed int64
	}
	byBucket := make(map[int64]tally)
	for _, r := range rows {
		byBucket[r.BucketTimestamp] = tally{count: r.Count, success: r.Success, failed: r.Error}
	}

	dense := make([]MCPHistogramBucket, 0, len(timeline))
	for _, ts := range timeline {
		// A missing key yields the zero tally, i.e. an empty bucket.
		t := byBucket[ts]
		dense = append(dense, MCPHistogramBucket{
			Timestamp: time.Unix(ts, 0).UTC(),
			Count:     t.count,
			Success:   t.success,
			Error:     t.failed,
		})
	}
	return &MCPHistogramResult{Buckets: dense, BucketSizeSeconds: bucketSizeSeconds}, nil
}
// GetMCPCostHistogram returns the summed MCP tool cost grouped into fixed-width
// time buckets for the logs matching filters. Only rows with status "success"
// or "error" contribute.
//
// bucketSizeSeconds is the bucket width; non-positive values default to 3600.
// When generateBucketTimestamps yields timestamps for the filter's time range,
// one bucket is returned per timestamp, with zero cost where no rows were
// found. When it yields none, only the buckets returned by the query are
// emitted, in ascending time order.
func (s *RDBLogStore) GetMCPCostHistogram(ctx context.Context, filters MCPToolLogSearchFilters, bucketSizeSeconds int64) (*MCPCostHistogramResult, error) {
	if bucketSizeSeconds <= 0 {
		bucketSizeSeconds = 3600
	}

	// Each dialect needs its own expression to floor a timestamp to the start
	// of its bucket, expressed in epoch seconds.
	var selectClause string
	switch s.db.Dialector.Name() {
	case "sqlite":
		selectClause = fmt.Sprintf(`
(CAST(strftime('%%s', timestamp) AS INTEGER) / %d) * %d as bucket_timestamp,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	case "mysql":
		selectClause = fmt.Sprintf(`
(FLOOR(UNIX_TIMESTAMP(timestamp) / %d) * %d) as bucket_timestamp,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	default:
		selectClause = fmt.Sprintf(`
CAST(FLOOR(EXTRACT(EPOCH FROM timestamp) / %d) * %d AS BIGINT) as bucket_timestamp,
COALESCE(SUM(cost), 0) as total_cost
`, bucketSizeSeconds, bucketSizeSeconds)
	}

	var rows []struct {
		BucketTimestamp int64   `gorm:"column:bucket_timestamp"`
		TotalCost       float64 `gorm:"column:total_cost"`
	}
	query := s.applyMCPFilters(s.db.WithContext(ctx).Model(&MCPToolLog{}), filters).
		Where("status IN ?", []string{"success", "error"})
	err := query.
		Select(selectClause).
		Group("bucket_timestamp").
		Order("bucket_timestamp ASC").
		Find(&rows).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get mcp cost histogram: %w", err)
	}

	timeline := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds)

	// No generated timeline: emit exactly the buckets the query produced.
	if len(timeline) == 0 {
		sparse := make([]MCPCostHistogramBucket, 0, len(rows))
		for _, r := range rows {
			sparse = append(sparse, MCPCostHistogramBucket{
				Timestamp: time.Unix(r.BucketTimestamp, 0).UTC(),
				TotalCost: r.TotalCost,
			})
		}
		return &MCPCostHistogramResult{Buckets: sparse, BucketSizeSeconds: bucketSizeSeconds}, nil
	}

	// Index the query rows by bucket start so the timeline can be filled in.
	costByBucket := make(map[int64]float64)
	for _, r := range rows {
		costByBucket[r.BucketTimestamp] = r.TotalCost
	}

	dense := make([]MCPCostHistogramBucket, 0, len(timeline))
	for _, ts := range timeline {
		// A missing key yields 0, i.e. a bucket with no cost.
		dense = append(dense, MCPCostHistogramBucket{
			Timestamp: time.Unix(ts, 0).UTC(),
			TotalCost: costByBucket[ts],
		})
	}
	return &MCPCostHistogramResult{Buckets: dense, BucketSizeSeconds: bucketSizeSeconds}, nil
}
// GetMCPTopTools returns the most frequently called MCP tools among the logs
// matching filters, ordered by call count descending, together with each
// tool's summed cost. Only rows with status "success" or "error" are counted.
// A non-positive limit defaults to 10.
func (s *RDBLogStore) GetMCPTopTools(ctx context.Context, filters MCPToolLogSearchFilters, limit int) (*MCPTopToolsResult, error) {
	if limit <= 0 {
		limit = 10
	}

	var rows []struct {
		ToolName string  `gorm:"column:tool_name"`
		Count    int64   `gorm:"column:count"`
		Cost     float64 `gorm:"column:cost"`
	}
	query := s.applyMCPFilters(s.db.WithContext(ctx).Model(&MCPToolLog{}), filters).
		Where("status IN ?", []string{"success", "error"})
	err := query.
		Select("tool_name, COUNT(*) as count, COALESCE(SUM(cost), 0) as cost").
		Group("tool_name").
		Order("count DESC").
		Limit(limit).
		Find(&rows).
		Error
	if err != nil {
		return nil, fmt.Errorf("failed to get mcp top tools: %w", err)
	}

	ranked := make([]MCPTopToolResult, 0, len(rows))
	for _, r := range rows {
		ranked = append(ranked, MCPTopToolResult{
			ToolName: r.ToolName,
			Count:    r.Count,
			Cost:     r.Cost,
		})
	}
	return &MCPTopToolsResult{Tools: ranked}, nil
}
// CreateAsyncJob stores job as a new async job row and returns the error
// reported by the insert, if any.
func (s *RDBLogStore) CreateAsyncJob(ctx context.Context, job *AsyncJob) error {
	tx := s.db.WithContext(ctx).Create(job)
	return tx.Error
}
// FindAsyncJobByID looks up an async job by ID. Jobs whose expires_at is set
// and not later than the current UTC time are treated as missing. It returns
// ErrNotFound when no matching, unexpired job exists.
func (s *RDBLogStore) FindAsyncJobByID(ctx context.Context, id string) (*AsyncJob, error) {
	var job AsyncJob
	now := time.Now().UTC()

	err := s.db.WithContext(ctx).
		Where("id = ? AND (expires_at IS NULL OR expires_at > ?)", id, now).
		First(&job).
		Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	return &job, nil
}
// UpdateAsyncJob writes the column/value pairs in updates to the async job
// with the given ID and returns the error reported by the update, if any.
func (s *RDBLogStore) UpdateAsyncJob(ctx context.Context, id string, updates map[string]interface{}) error {
	tx := s.db.WithContext(ctx).
		Model(&AsyncJob{}).
		Where("id = ?", id).
		Updates(updates)
	return tx.Error
}
// DeleteExpiredAsyncJobs deletes async jobs whose expires_at has passed and
// returns the total number of rows removed. Jobs with a NULL expires_at are
// never touched.
//
// Work is done in batches of batchLimit to avoid long-running statements that
// hold row locks. Each batch first selects the candidate IDs and then deletes
// them by ID (the same pattern as DeleteLogsBatch) rather than using
// "id IN (SELECT ... LIMIT n)", because MySQL rejects LIMIT inside an IN
// subquery.
//
// On error, the count of rows deleted so far is returned alongside the error.
func (s *RDBLogStore) DeleteExpiredAsyncJobs(ctx context.Context) (int64, error) {
	const batchLimit = 100

	// Use one cutoff for the whole run so every batch applies the same predicate.
	now := time.Now().UTC()

	var totalDeleted int64
	for {
		var ids []string
		if err := s.db.WithContext(ctx).
			Model(&AsyncJob{}).
			Where("expires_at IS NOT NULL AND expires_at < ?", now).
			Limit(batchLimit).
			Pluck("id", &ids).Error; err != nil {
			return totalDeleted, err
		}
		if len(ids) == 0 {
			return totalDeleted, nil
		}

		// Re-check the expiry predicate on delete so a job whose expires_at
		// changed between the select and the delete is left alone.
		result := s.db.WithContext(ctx).
			Where("id IN ? AND expires_at IS NOT NULL AND expires_at < ?", ids, now).
			Delete(&AsyncJob{})
		if result.Error != nil {
			return totalDeleted, result.Error
		}
		totalDeleted += result.RowsAffected

		// Stop after a short batch (nothing left) or when a pass removed no
		// rows, which guarantees the loop always makes progress.
		if len(ids) < batchLimit || result.RowsAffected == 0 {
			return totalDeleted, nil
		}
	}
}
// DeleteStaleAsyncJobs removes async jobs that are still in the "processing"
// status and were created before staleSince, returning the number of rows
// deleted. It cleans up jobs left permanently stuck by edge cases such as
// marshal failures or server crashes.
func (s *RDBLogStore) DeleteStaleAsyncJobs(ctx context.Context, staleSince time.Time) (int64, error) {
	tx := s.db.WithContext(ctx).
		Where("status = ? AND created_at < ?", "processing", staleSince).
		Delete(&AsyncJob{})
	return tx.RowsAffected, tx.Error
}