Files
bifrost/core/providers/openai/large_payload.go
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

157 lines
6.2 KiB
Go

// Package openai provides the OpenAI provider implementation for the Bifrost framework.
package openai
import (
"net/http"
"time"
"github.com/bytedance/sonic"
"github.com/valyala/fasthttp"
providerUtils "github.com/maximhq/bifrost/core/providers/utils"
schemas "github.com/maximhq/bifrost/core/schemas"
)
// largePayloadResult holds the lightweight metadata extracted from a large payload passthrough.
// It is the success value produced by handleOpenAILargePayloadPassthrough and, on the
// large-response path, by finalizeOpenAIResponse.
type largePayloadResult struct {
// Usage is the token usage extracted from the upstream response bytes (or from the response
// preview on the large-response path); nil when no usage could be extracted.
Usage *schemas.BifrostLLMUsage
// Latency is the upstream request latency in milliseconds.
Latency int64
// ResponseBody is populated only on the normal (non-large-response) path; it is left nil
// when a large response is detected.
ResponseBody []byte // non-nil for request types that need the raw upstream response (transcription, speech, etc.)
}
// setStreamingRequestBody populates the request body for streaming handlers.
//
// It first offers the request to ApplyLargePayloadRequestBodyWithModelNormalization, which
// (in large payload mode) streams the original request body to upstream with model prefix
// rewriting. When that helper returns false, the already-marshaled jsonBody is set on req
// instead.
func setStreamingRequestBody(ctx *schemas.BifrostContext, req *fasthttp.Request, jsonBody []byte, providerName schemas.ModelProvider) {
	if providerUtils.ApplyLargePayloadRequestBodyWithModelNormalization(ctx, req, providerName) {
		// The large payload helper handled the body; nothing more to set.
		return
	}
	req.SetBody(jsonBody)
}
// handleOpenAILargePayloadPassthrough runs a full request/response cycle for
// OpenAI-compatible providers when large payload mode is active. The request body is
// supplied by ApplyLargePayloadRequestBodyWithModelNormalization and the response is handed
// to finalizeOpenAIResponse, which decides between the normal and large-response paths.
//
// The third return value reports whether this function handled the request:
//   - (result, nil, true)  — success.
//   - (nil, err, true)     — the upstream call or response handling failed.
//   - (nil, nil, false)    — large payload mode is not active (or the request body helper
//     declined); the caller should use the normal path.
func handleOpenAILargePayloadPassthrough(
	ctx *schemas.BifrostContext,
	client *fasthttp.Client,
	url string,
	key schemas.Key,
	extraHeaders map[string]string,
	providerName schemas.ModelProvider,
	logger schemas.Logger,
) (*largePayloadResult, *schemas.BifrostError, bool) {
	// A missing or non-bool context value counts as "mode disabled".
	if active, ok := ctx.Value(schemas.BifrostContextKeyLargePayloadMode).(bool); !ok || !active {
		return nil, nil, false
	}

	upstreamReq := fasthttp.AcquireRequest()
	// The response is released explicitly on every exit path below, because ownership may
	// be transferred to finalizeOpenAIResponse instead of ending with this function.
	upstreamResp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(upstreamReq)

	providerUtils.SetExtraHeaders(ctx, upstreamReq, extraHeaders, nil)
	upstreamReq.SetRequestURI(url)
	upstreamReq.Header.SetMethod(http.MethodPost)
	if token := key.Value.GetValue(); token != "" {
		upstreamReq.Header.Set("Authorization", "Bearer "+token)
	}

	// Attach the request body (with model prefix rewriting). If the helper declines,
	// fall back to the caller's normal path.
	if !providerUtils.ApplyLargePayloadRequestBodyWithModelNormalization(ctx, upstreamReq, providerName) {
		fasthttp.ReleaseResponse(upstreamResp)
		return nil, nil, false
	}
	// Default to JSON only when no content type has been set on the request.
	if len(upstreamReq.Header.ContentType()) == 0 {
		upstreamReq.Header.SetContentType("application/json")
	}

	// PrepareResponseStreaming selects the client to use for this call.
	httpClient := providerUtils.PrepareResponseStreaming(ctx, client, upstreamResp)
	elapsed, requestErr, wait := providerUtils.MakeRequestWithContext(ctx, httpClient, upstreamReq, upstreamResp)
	wait()
	if requestErr != nil {
		fasthttp.ReleaseResponse(upstreamResp)
		return nil, requestErr, true
	}

	// Publish provider response headers before any branching so that both the error path
	// and the large-response path can see them.
	providerHeaders := providerUtils.ExtractProviderResponseHeaders(upstreamResp)
	if providerHeaders != nil {
		ctx.SetValue(schemas.BifrostContextKeyProviderResponseHeaders, providerHeaders)
	}

	// Any status other than 200 is treated as an error: materialize the body so it can be
	// parsed, then release the response.
	if upstreamResp.StatusCode() != fasthttp.StatusOK {
		providerUtils.MaterializeStreamErrorBody(ctx, upstreamResp)
		upstreamErr := ParseOpenAIError(upstreamResp)
		fasthttp.ReleaseResponse(upstreamResp)
		return nil, upstreamErr, true
	}

	// From here on finalizeOpenAIResponse owns upstreamResp.
	rawBody, largeResult, finalizeErr := finalizeOpenAIResponse(ctx, upstreamResp, elapsed, providerName, logger)
	switch {
	case finalizeErr != nil:
		return nil, finalizeErr, true
	case largeResult != nil:
		return largeResult, nil, true
	}

	// Normal path: the passthrough does not parse a structured response, so usage is
	// pulled directly from the raw bytes.
	return &largePayloadResult{
		Usage:        extractOpenAIUsageFromBytes(rawBody),
		Latency:      elapsed.Milliseconds(),
		ResponseBody: rawBody,
	}, nil, true
}
// finalizeOpenAIResponse processes the upstream response body, delegating the
// normal-vs-large decision to FinalizeResponseWithLargeDetection.
//
// This function takes ownership of resp: the caller must not release it (or defer a
// release) once it has been passed in.
//
// Exactly one of the three return values is meaningful:
//   - (body, nil, nil)   — normal path; body is ready for parsing and resp has been released.
//   - (nil, result, nil) — large response detected; usage is taken from the response preview
//     stored in ctx, and resp is NOT released here (it is owned by the
//     large response reader held in the context).
//   - (nil, nil, err)    — failure; resp has been released.
func finalizeOpenAIResponse(
	ctx *schemas.BifrostContext,
	resp *fasthttp.Response,
	latency time.Duration,
	providerName schemas.ModelProvider,
	logger schemas.Logger,
) ([]byte, *largePayloadResult, *schemas.BifrostError) {
	payload, large, detectErr := providerUtils.FinalizeResponseWithLargeDetection(ctx, resp, logger)

	switch {
	case detectErr != nil:
		fasthttp.ReleaseResponse(resp)
		return nil, nil, detectErr
	case !large:
		// The shared utility has already copied the body, so resp can go back to the pool.
		fasthttp.ReleaseResponse(resp)
		return payload, nil, nil
	}

	// Large response: resp must stay alive, so it is deliberately not released here.
	// Usage comes from the preview that FinalizeResponseWithLargeDetection stored in ctx;
	// a missing or non-string preview yields an empty input.
	previewText, _ := ctx.Value(schemas.BifrostContextKeyLargePayloadResponsePreview).(string)
	return nil, &largePayloadResult{
		Usage:   extractOpenAIUsageFromBytes([]byte(previewText)),
		Latency: latency.Milliseconds(),
	}, nil
}
// extractOpenAIUsageFromBytes pulls the top-level "usage" object out of an OpenAI response
// payload with sonic.Get and hands its raw JSON to ParseOpenAIUsageFromBytes.
// OpenAI responses carry prompt_tokens, completion_tokens and total_tokens under that key.
//
// It returns nil when the "usage" key cannot be looked up, when its raw JSON cannot be
// obtained, or when that raw JSON is empty.
func extractOpenAIUsageFromBytes(data []byte) *schemas.BifrostLLMUsage {
	usageNode, lookupErr := sonic.Get(data, "usage")
	if lookupErr != nil {
		return nil
	}

	usageJSON, rawErr := usageNode.Raw()
	if rawErr != nil {
		return nil
	}
	if len(usageJSON) == 0 {
		return nil
	}

	return providerUtils.ParseOpenAIUsageFromBytes([]byte(usageJSON))
}