// Package openai provides the OpenAI provider implementation for the Bifrost framework. package openai import ( "net/http" "time" "github.com/bytedance/sonic" "github.com/valyala/fasthttp" providerUtils "github.com/maximhq/bifrost/core/providers/utils" schemas "github.com/maximhq/bifrost/core/schemas" ) // largePayloadResult holds the lightweight metadata extracted from a large payload passthrough. type largePayloadResult struct { Usage *schemas.BifrostLLMUsage Latency int64 ResponseBody []byte // non-nil for request types that need the raw upstream response (transcription, speech, etc.) } // setStreamingRequestBody sets the request body for streaming handlers. // In normal mode it uses the marshaled jsonBody. In large payload mode it delegates to // ApplyLargePayloadRequestBodyWithModelNormalization which streams the original request // body to upstream with model prefix rewriting. func setStreamingRequestBody(ctx *schemas.BifrostContext, req *fasthttp.Request, jsonBody []byte, providerName schemas.ModelProvider) { if !providerUtils.ApplyLargePayloadRequestBodyWithModelNormalization(ctx, req, providerName) { req.SetBody(jsonBody) } } // handleOpenAILargePayloadPassthrough handles a complete large payload request-response cycle // for OpenAI-compatible providers. When large payload mode is active, it streams the request // body to upstream and optionally streams the response back without full materialization. // // Returns (result, nil, true) on success, (nil, err, true) on error, or (nil, nil, false) when // large payload mode is not active and the caller should use the normal path. func handleOpenAILargePayloadPassthrough( ctx *schemas.BifrostContext, client *fasthttp.Client, url string, key schemas.Key, extraHeaders map[string]string, providerName schemas.ModelProvider, logger schemas.Logger, ) (*largePayloadResult, *schemas.BifrostError, bool) { isLargePayload, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadMode).(bool) if !isLargePayload { return nil, nil, false } req := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseRequest(req) // resp lifecycle: managed manually when large response streaming is active providerUtils.SetExtraHeaders(ctx, req, extraHeaders, nil) req.SetRequestURI(url) req.Header.SetMethod(http.MethodPost) if key.Value.GetValue() != "" { req.Header.Set("Authorization", "Bearer "+key.Value.GetValue()) } // Rewrite model prefix and stream request body to upstream. // Sets content-type from context; falls back to JSON if not set. if !providerUtils.ApplyLargePayloadRequestBodyWithModelNormalization(ctx, req, providerName) { fasthttp.ReleaseResponse(resp) return nil, nil, false } if len(req.Header.ContentType()) == 0 { req.Header.SetContentType("application/json") } // Choose client: enable response body streaming when threshold is configured activeClient := providerUtils.PrepareResponseStreaming(ctx, client, resp) latency, bifrostErr, wait := providerUtils.MakeRequestWithContext(ctx, activeClient, req, resp) wait() if bifrostErr != nil { fasthttp.ReleaseResponse(resp) return nil, bifrostErr, true } // Extract provider response headers early so they're available on error and large-response paths if headers := providerUtils.ExtractProviderResponseHeaders(resp); headers != nil { ctx.SetValue(schemas.BifrostContextKeyProviderResponseHeaders, headers) } // Error responses are always small — materialize stream body for error parsing if resp.StatusCode() != fasthttp.StatusOK { providerUtils.MaterializeStreamErrorBody(ctx, resp) parsedErr := ParseOpenAIError(resp) fasthttp.ReleaseResponse(resp) return nil, parsedErr, true } // Delegate response body handling (large detection + resp lifecycle) to finalizeOpenAIResponse body, result, respErr := finalizeOpenAIResponse(ctx, resp, latency, providerName, logger) if respErr != nil { return nil, respErr, true } if result != nil { return result, nil, true } // Normal path — extract usage from raw bytes (passthrough doesn't parse structured response) usage := extractOpenAIUsageFromBytes(body) return &largePayloadResult{Usage: usage, Latency: latency.Milliseconds(), ResponseBody: body}, nil, true } // finalizeOpenAIResponse handles response body processing with optional large response detection. // Delegates to FinalizeResponseWithLargeDetection for the core branching logic. // Takes ownership of resp — caller must NOT defer ReleaseResponse and must set respOwned = false // after this call returns. // // Returns: // - (body, nil, nil) — normal path; body ready for parsing; resp released. // - (nil, result, nil) — large response detected; context flags set for streaming; resp // wrapped in reader (released on reader Close). // - (nil, nil, err) — error; resp released. func finalizeOpenAIResponse( ctx *schemas.BifrostContext, resp *fasthttp.Response, latency time.Duration, providerName schemas.ModelProvider, logger schemas.Logger, ) ([]byte, *largePayloadResult, *schemas.BifrostError) { body, isLarge, bifrostErr := providerUtils.FinalizeResponseWithLargeDetection(ctx, resp, logger) if bifrostErr != nil { fasthttp.ReleaseResponse(resp) return nil, nil, bifrostErr } if isLarge { // Extract usage from the response preview stored in context by FinalizeResponseWithLargeDetection preview, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadResponsePreview).(string) usage := extractOpenAIUsageFromBytes([]byte(preview)) // resp owned by LargeResponseReader in context — don't release return nil, &largePayloadResult{Usage: usage, Latency: latency.Milliseconds()}, nil } // Normal path — body already copied by shared utility, safe to release resp fasthttp.ReleaseResponse(resp) return body, nil, nil } // extractOpenAIUsageFromBytes extracts usage metadata from OpenAI response bytes using sonic.Get. // OpenAI responses have "usage" at the top level with prompt_tokens, completion_tokens, total_tokens. func extractOpenAIUsageFromBytes(data []byte) *schemas.BifrostLLMUsage { node, err := sonic.Get(data, "usage") if err != nil { return nil } raw, err := node.Raw() if err != nil || raw == "" { return nil } return providerUtils.ParseOpenAIUsageFromBytes([]byte(raw)) }