Files
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

2863 lines
127 KiB
Go

// Package integrations provides a generic router framework for handling different LLM provider APIs.
//
// CENTRALIZED STREAMING ARCHITECTURE:
//
// This package implements a centralized streaming approach where all stream handling logic
// is consolidated in the GenericRouter, eliminating the need for provider-specific StreamHandler
// implementations. The key components are:
//
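// 1. StreamConfig: Defines streaming configuration for each route, including:
// - Per-request-type stream converters (TextStreamResponseConverter, ChatStreamResponseConverter,
// ResponsesStreamResponseConverter, ...): convert Bifrost stream responses to the provider-specific streaming format
// - ErrorConverter: Converts BifrostError to provider-specific streaming error format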
//
// 2. Centralized Stream Processing: The GenericRouter handles all streaming logic:
// - SSE header management
// - Stream channel processing
// - Error handling and conversion
// - Response formatting and flushing
// - Stream closure (handled automatically by provider implementation)
//
// 3. Provider-Specific Type Conversion: Integration types.go files only handle type conversion:
// - Derive{Provider}StreamFromBifrostResponse: Convert responses to streaming format
// - Derive{Provider}StreamFromBifrostError: Convert errors to streaming error format
//
// BENEFITS:
// - Eliminates code duplication across provider-specific stream handlers
// - Centralizes streaming logic for consistency and maintainability
// - Separates concerns: routing logic vs type conversion
// - Automatic stream closure management by provider implementations
// - Consistent error handling across all providers
//
// USAGE EXAMPLE:
//
// routes := []RouteConfig{
// {
// Path: "/openai/chat/completions",
// Method: "POST",
// // ... other configs ...
// StreamConfig: &StreamConfig{
// // Stream converters return (event type, payload, error).
// ChatStreamResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostChatResponse) (string, interface{}, error) {
// return "", DeriveOpenAIStreamFromBifrostResponse(resp), nil
// },
// ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
// return DeriveOpenAIStreamFromBifrostError(err)
// },
// },
// },
// }
package integrations
import (
"bytes"
"context"
"fmt"
"io"
"errors"
"mime"
"mime/multipart"
"strconv"
"strings"
"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"
"github.com/bytedance/sonic"
"github.com/fasthttp/router"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/providers/bedrock"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/framework/logstore"
"github.com/maximhq/bifrost/transports/bifrost-http/lib"
"github.com/valyala/fasthttp"
)
// ExtensionRouter defines the interface that all integration routers must implement
// to register their routes with the main HTTP router.
type ExtensionRouter interface {
// RegisterRoutes registers the integration's routes on r. The variadic
// middlewares are supplied by the caller; how they are applied to each route
// is up to the implementation.
RegisterRoutes(r *router.Router, middlewares ...schemas.BifrostHTTPMiddleware)
}
// StreamingRequest is implemented by request types that can indicate whether
// the caller asked for a streamed response.
type StreamingRequest interface {
// IsStreamingRequested reports whether streaming was requested for this request.
IsStreamingRequested() bool
}
// RequestWithSettableExtraParams is implemented by request types that accept
// provider-specific extra parameters via the extra_params JSON key. The
// integration router extracts extra_params from the raw request body and
// passes them through so they propagate to the downstream provider.
type RequestWithSettableExtraParams interface {
// SetExtraParams stores the extracted extra parameters on the request.
SetExtraParams(params map[string]interface{})
}
// BatchRequest wraps a Bifrost batch request with its type information.
// It carries one pointer field per batch operation (create, list, retrieve,
// cancel, delete, results).
// NOTE(review): presumably only the field matching Type is populated — confirm
// against the BatchRequestConverter implementations.
type BatchRequest struct {
// Type identifies which batch operation this request represents.
Type schemas.RequestType
CreateRequest *schemas.BifrostBatchCreateRequest
ListRequest *schemas.BifrostBatchListRequest
RetrieveRequest *schemas.BifrostBatchRetrieveRequest
CancelRequest *schemas.BifrostBatchCancelRequest
DeleteRequest *schemas.BifrostBatchDeleteRequest
ResultsRequest *schemas.BifrostBatchResultsRequest
}
// FileRequest wraps a Bifrost file request with its type information.
// It carries one pointer field per file operation (upload, list, retrieve,
// delete, content).
// NOTE(review): presumably only the field matching Type is populated — confirm
// against the FileRequestConverter implementations.
type FileRequest struct {
// Type identifies which file operation this request represents.
Type schemas.RequestType
UploadRequest *schemas.BifrostFileUploadRequest
ListRequest *schemas.BifrostFileListRequest
RetrieveRequest *schemas.BifrostFileRetrieveRequest
DeleteRequest *schemas.BifrostFileDeleteRequest
ContentRequest *schemas.BifrostFileContentRequest
}
// ContainerRequest wraps a Bifrost container request with its type information.
// It carries one pointer field per container operation (create, list,
// retrieve, delete).
// NOTE(review): presumably only the field matching Type is populated — confirm
// against the ContainerRequestConverter implementations.
type ContainerRequest struct {
// Type identifies which container operation this request represents.
Type schemas.RequestType
CreateRequest *schemas.BifrostContainerCreateRequest
ListRequest *schemas.BifrostContainerListRequest
RetrieveRequest *schemas.BifrostContainerRetrieveRequest
DeleteRequest *schemas.BifrostContainerDeleteRequest
}
// ContainerFileRequest wraps a Bifrost container file request with its type information.
// It carries one pointer field per container file operation (create, list,
// retrieve, content, delete).
// NOTE(review): presumably only the field matching Type is populated — confirm
// against the ContainerFileRequestConverter implementations.
type ContainerFileRequest struct {
// Type identifies which container file operation this request represents.
Type schemas.RequestType
CreateRequest *schemas.BifrostContainerFileCreateRequest
ListRequest *schemas.BifrostContainerFileListRequest
RetrieveRequest *schemas.BifrostContainerFileRetrieveRequest
ContentRequest *schemas.BifrostContainerFileContentRequest
DeleteRequest *schemas.BifrostContainerFileDeleteRequest
}
// BatchRequestConverter converts an integration-specific batch request into a BatchRequest.
// It receives the Bifrost context and the parsed request object, and returns the wrapped request or an error.
type BatchRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error)
// FileRequestConverter converts an integration-specific file request into a FileRequest.
// It receives the Bifrost context and the parsed request object, and returns the wrapped request or an error.
type FileRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*FileRequest, error)
// ContainerRequestConverter converts an integration-specific container request into a ContainerRequest.
// It receives the Bifrost context and the parsed request object, and returns the wrapped request or an error.
type ContainerRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*ContainerRequest, error)
// ContainerFileRequestConverter converts an integration-specific container file request into a ContainerFileRequest.
// It receives the Bifrost context and the parsed request object, and returns the wrapped request or an error.
type ContainerFileRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*ContainerFileRequest, error)
// RequestConverter converts an integration-specific request to Bifrost format.
// It receives the Bifrost context and the parsed request object, and returns a
// BifrostRequest ready for processing, or an error.
type RequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error)
// ListModelsResponseConverter converts a BifrostListModelsResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ListModelsResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostListModelsResponse) (interface{}, error)
// TextResponseConverter converts a BifrostTextCompletionResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type TextResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTextCompletionResponse) (interface{}, error)
// ChatResponseConverter converts a BifrostChatResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ChatResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostChatResponse) (interface{}, error)
// AsyncChatResponseConverter converts an async job response to an integration-specific format.
// It receives the async job response together with the ChatResponseConverter to use for the
// chat response, and returns the integration-specific format, extra headers, and an error.
type AsyncChatResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.AsyncJobResponse, chatResponseConverter ChatResponseConverter) (interface{}, map[string]string, error)
// ResponsesResponseConverter converts a BifrostResponsesResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ResponsesResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesResponse) (interface{}, error)
// AsyncResponsesResponseConverter converts an async job response to an integration-specific format.
// It receives the async job response together with the ResponsesResponseConverter to use for the
// responses response, and returns the integration-specific format, extra headers, and an error.
type AsyncResponsesResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.AsyncJobResponse, responsesResponseConverter ResponsesResponseConverter) (interface{}, map[string]string, error)
// EmbeddingResponseConverter converts a BifrostEmbeddingResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type EmbeddingResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostEmbeddingResponse) (interface{}, error)
// RerankResponseConverter converts a BifrostRerankResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type RerankResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostRerankResponse) (interface{}, error)
// OCRResponseConverter converts a BifrostOCRResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type OCRResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostOCRResponse) (interface{}, error)
// SpeechResponseConverter converts a BifrostSpeechResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type SpeechResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostSpeechResponse) (interface{}, error)
// TranscriptionResponseConverter converts a BifrostTranscriptionResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type TranscriptionResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTranscriptionResponse) (interface{}, error)
// BatchCreateResponseConverter converts a BifrostBatchCreateResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCreateResponse) (interface{}, error)
// BatchListResponseConverter converts a BifrostBatchListResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchListResponse) (interface{}, error)
// BatchRetrieveResponseConverter converts a BifrostBatchRetrieveResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchRetrieveResponse) (interface{}, error)
// BatchCancelResponseConverter converts a BifrostBatchCancelResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchCancelResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCancelResponse) (interface{}, error)
// BatchResultsResponseConverter converts a BifrostBatchResultsResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchResultsResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchResultsResponse) (interface{}, error)
// BatchDeleteResponseConverter converts a BifrostBatchDeleteResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type BatchDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchDeleteResponse) (interface{}, error)
// FileUploadResponseConverter converts a BifrostFileUploadResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type FileUploadResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileUploadResponse) (interface{}, error)
// FileListResponseConverter converts a BifrostFileListResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type FileListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileListResponse) (interface{}, error)
// FileRetrieveResponseConverter converts a BifrostFileRetrieveResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type FileRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileRetrieveResponse) (interface{}, error)
// FileDeleteResponseConverter converts a BifrostFileDeleteResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type FileDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileDeleteResponse) (interface{}, error)
// FileContentResponseConverter converts a BifrostFileContentResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
// Note: This may return binary data or a wrapper object depending on the integration.
type FileContentResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileContentResponse) (interface{}, error)
// ContainerCreateResponseConverter converts a BifrostContainerCreateResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerCreateResponse) (interface{}, error)
// ContainerListResponseConverter converts a BifrostContainerListResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerListResponse) (interface{}, error)
// ContainerRetrieveResponseConverter converts a BifrostContainerRetrieveResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerRetrieveResponse) (interface{}, error)
// ContainerDeleteResponseConverter converts a BifrostContainerDeleteResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerDeleteResponse) (interface{}, error)
// ContainerFileCreateResponseConverter converts a BifrostContainerFileCreateResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerFileCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileCreateResponse) (interface{}, error)
// ContainerFileListResponseConverter converts a BifrostContainerFileListResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerFileListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileListResponse) (interface{}, error)
// ContainerFileRetrieveResponseConverter converts a BifrostContainerFileRetrieveResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerFileRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileRetrieveResponse) (interface{}, error)
// ContainerFileContentResponseConverter converts a BifrostContainerFileContentResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerFileContentResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileContentResponse) (interface{}, error)
// ContainerFileDeleteResponseConverter converts a BifrostContainerFileDeleteResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ContainerFileDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileDeleteResponse) (interface{}, error)
// CountTokensResponseConverter converts a BifrostCountTokensResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type CountTokensResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostCountTokensResponse) (interface{}, error)
// TextStreamResponseConverter converts a BifrostTextCompletionResponse to the integration-specific streaming format.
// It returns the event type, the streaming payload expected by the specific integration, and an error.
type TextStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTextCompletionResponse) (string, interface{}, error)
// ChatStreamResponseConverter converts a BifrostChatResponse to the integration-specific streaming format.
// It returns the event type, the streaming payload expected by the specific integration, and an error.
type ChatStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostChatResponse) (string, interface{}, error)
// ResponsesStreamResponseConverter converts a BifrostResponsesStreamResponse to the integration-specific streaming format.
// It returns a single event type and payload (plus an error); the payload can itself encode
// one or more SSE events if needed by the integration.
type ResponsesStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesStreamResponse) (string, interface{}, error)
// SpeechStreamResponseConverter converts a BifrostSpeechStreamResponse to the integration-specific streaming format.
// It returns the event type, the streaming payload expected by the specific integration, and an error.
type SpeechStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostSpeechStreamResponse) (string, interface{}, error)
// TranscriptionStreamResponseConverter converts a BifrostTranscriptionStreamResponse to the integration-specific streaming format.
// It returns the event type, the streaming payload expected by the specific integration, and an error.
type TranscriptionStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTranscriptionStreamResponse) (string, interface{}, error)
// ImageGenerationResponseConverter converts a BifrostImageGenerationResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type ImageGenerationResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationResponse) (interface{}, error)
// ImageGenerationStreamResponseConverter converts a BifrostImageGenerationStreamResponse to the integration-specific streaming format.
// It returns the event type, the streaming payload expected by the specific integration, and an error.
type ImageGenerationStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationStreamResponse) (string, interface{}, error)
// ImageEditResponseConverter converts a BifrostImageGenerationResponse to the integration-specific format.
// It has the same signature as ImageGenerationResponseConverter: image edits reuse the
// image generation response type.
type ImageEditResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationResponse) (interface{}, error)
// VideoGenerationResponseConverter converts a BifrostVideoGenerationResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type VideoGenerationResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoGenerationResponse) (interface{}, error)
// VideoDownloadResponseConverter converts a BifrostVideoDownloadResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type VideoDownloadResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoDownloadResponse) (interface{}, error)
// VideoRetrieveAsDownloadConverter converts a BifrostVideoGenerationResponse to the integration-specific format.
// It has the same signature as VideoGenerationResponseConverter.
// NOTE(review): the name suggests it is used when a retrieve call should be answered as a download — confirm at the call site.
type VideoRetrieveAsDownloadConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoGenerationResponse) (interface{}, error)
// VideoDeleteResponseConverter converts a BifrostVideoDeleteResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type VideoDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoDeleteResponse) (interface{}, error)
// VideoListResponseConverter converts a BifrostVideoListResponse to the integration-specific format.
// It receives the Bifrost context and the response, and returns the converted value or an error.
type VideoListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoListResponse) (interface{}, error)
// ErrorConverter converts a BifrostError to the integration-specific error format.
// It receives the Bifrost context and the error, and returns the value expected by the
// specific integration. Unlike the response converters it has no error return.
type ErrorConverter func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{}
// StreamErrorConverter converts a BifrostError to the integration-specific streaming error format.
// It receives the Bifrost context and the error, and returns the streaming error value
// expected by the specific integration. Its signature is identical to ErrorConverter.
type StreamErrorConverter func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{}
// RequestParser handles custom request body parsing.
// It replaces the default JSON parsing when configured (e.g., for multipart/form-data).
// The parser should populate the provided request object (req) from the fasthttp context.
// If it returns an error, the request processing stops.
type RequestParser func(ctx *fasthttp.RequestCtx, req interface{}) error
// PreRequestCallback is called after parsing the request but before processing through Bifrost.
// It can be used to modify the request object (e.g., extract model from URL parameters)
// or perform validation. If it returns an error, the request processing stops.
// It can also modify the Bifrost context (bifrostCtx) based on the HTTP request context
// before it is given to Bifrost.
type PreRequestCallback func(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error
// PostRequestCallback is called after processing the request but before sending the response.
// It receives the HTTP request context, the parsed request, and the response, and can be
// used to modify the response or perform additional logging/metrics.
// If it returns an error, an error response is sent instead of the success response.
type PostRequestCallback func(ctx *fasthttp.RequestCtx, req interface{}, resp interface{}) error
// HTTPRequestTypeGetter derives the request type from an incoming HTTP request.
// It accepts only a *fasthttp.RequestCtx and returns the schemas.RequestType
// determined from that context.
type HTTPRequestTypeGetter func(ctx *fasthttp.RequestCtx) schemas.RequestType
// ShortCircuit determines if the request should be short-circuited.
// It receives the HTTP request context, the Bifrost context, and the parsed request.
// The boolean result reports whether to short-circuit; the error reports a failure
// while deciding.
// NOTE(review): what the router does when the result is true (and who writes the
// response in that case) is not visible in this declaration — confirm at the call site.
type ShortCircuit func(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) (bool, error)
// StreamConfig defines streaming-specific configuration for an integration.
// Each field converts one kind of Bifrost stream response (or a stream error)
// into the integration's streaming format. Per their type declarations, the
// *StreamResponseConverter functions return an event type, a payload, and an
// error, while ErrorConverter returns only a payload.
//
// SSE FORMAT BEHAVIOR:
//
// The payload returned by the stream response converters and by ErrorConverter can be either:
//
// 1. OBJECTS (interface{} that's not a string):
// - Will be JSON marshaled and sent as standard SSE: data: {json}\n\n
// - Use this for most providers (OpenAI, Google, etc.)
// - Example: return map[string]interface{}{"delta": {"content": "hello"}}
// - Result: data: {"delta":{"content":"hello"}}\n\n
//
// 2. STRINGS:
// - Will be sent directly as-is without any modification
// - Use this for providers requiring custom SSE event types (Anthropic, etc.)
// - Example: return "event: content_block_delta\ndata: {\"type\":\"text\"}\n\n"
// - Result: event: content_block_delta
// data: {"type":"text"}
//
// Choose the appropriate return type based on your provider's SSE specification.
type StreamConfig struct {
TextStreamResponseConverter TextStreamResponseConverter // Function to convert BifrostTextCompletionResponse to streaming format
ChatStreamResponseConverter ChatStreamResponseConverter // Function to convert BifrostChatResponse to streaming format
ResponsesStreamResponseConverter ResponsesStreamResponseConverter // Function to convert BifrostResponsesStreamResponse to streaming format
SpeechStreamResponseConverter SpeechStreamResponseConverter // Function to convert BifrostSpeechStreamResponse to streaming format
TranscriptionStreamResponseConverter TranscriptionStreamResponseConverter // Function to convert BifrostTranscriptionStreamResponse to streaming format
ImageGenerationStreamResponseConverter ImageGenerationStreamResponseConverter // Function to convert BifrostImageGenerationStreamResponse to streaming format
ErrorConverter StreamErrorConverter // Function to convert BifrostError to streaming error format
}
// RouteConfigType is the string tag stored in RouteConfig.Type to identify the
// kind of route.
type RouteConfigType string
// Known RouteConfigType values.
// NOTE(review): these appear to correspond to the supported integration APIs
// (OpenAI, Anthropic, GenAI, Bedrock, Cohere) — confirm against the routers that set them.
const (
RouteConfigTypeOpenAI RouteConfigType = "openai"
RouteConfigTypeAnthropic RouteConfigType = "anthropic"
RouteConfigTypeGenAI RouteConfigType = "genai"
RouteConfigTypeBedrock RouteConfigType = "bedrock"
RouteConfigTypeCohere RouteConfigType = "cohere"
)
// RouteConfig defines the configuration for a single route in an integration.
// It specifies the path, method, and handlers for request/response conversion.
//
// The fields fall into these groups: route identity (Type, Path, Method),
// request construction and parsing (GetHTTPRequestType, GetRequestTypeInstance,
// RequestParser), request converters, response converters, error and streaming
// configuration, and optional callbacks.
//
// NOTE(review): many response converters are annotated "SHOULD NOT BE NIL" even
// though a single route presumably needs only the converters for its own request
// type — confirm which fields the router actually requires per route.
type RouteConfig struct {
Type RouteConfigType // Type of the route
Path string // HTTP path pattern (e.g., "/openai/v1/chat/completions")
Method string // HTTP method (POST, GET, PUT, DELETE)
GetHTTPRequestType HTTPRequestTypeGetter // Function to get the HTTP request type from the context (SHOULD NOT BE NIL)
GetRequestTypeInstance func(ctx context.Context) interface{} // Factory function to create request instance (SHOULD NOT BE NIL)
RequestParser RequestParser // Optional: custom request parsing (e.g., multipart/form-data)
RequestConverter RequestConverter // Function to convert request to BifrostRequest (for inference requests)
BatchRequestConverter BatchRequestConverter // Function to convert request to BatchRequest (for batch operations)
FileRequestConverter FileRequestConverter // Function to convert request to FileRequest (for file operations)
ContainerRequestConverter ContainerRequestConverter // Function to convert request to ContainerRequest (for container operations)
ContainerFileRequestConverter ContainerFileRequestConverter // Function to convert request to ContainerFileRequest (for container file operations)
ListModelsResponseConverter ListModelsResponseConverter // Function to convert BifrostListModelsResponse to integration format (SHOULD NOT BE NIL)
TextResponseConverter TextResponseConverter // Function to convert BifrostTextCompletionResponse to integration format (SHOULD NOT BE NIL)
ChatResponseConverter ChatResponseConverter // Function to convert BifrostChatResponse to integration format (SHOULD NOT BE NIL)
AsyncChatResponseConverter AsyncChatResponseConverter // Function to convert AsyncJobResponse to integration format (SHOULD NOT BE NIL)
ResponsesResponseConverter ResponsesResponseConverter // Function to convert BifrostResponsesResponse to integration format (SHOULD NOT BE NIL)
AsyncResponsesResponseConverter AsyncResponsesResponseConverter // Function to convert AsyncJobResponse to integration format (SHOULD NOT BE NIL)
EmbeddingResponseConverter EmbeddingResponseConverter // Function to convert BifrostEmbeddingResponse to integration format (SHOULD NOT BE NIL)
RerankResponseConverter RerankResponseConverter // Function to convert BifrostRerankResponse to integration format
OCRResponseConverter OCRResponseConverter // Function to convert BifrostOCRResponse to integration format
SpeechResponseConverter SpeechResponseConverter // Function to convert BifrostSpeechResponse to integration format (SHOULD NOT BE NIL)
TranscriptionResponseConverter TranscriptionResponseConverter // Function to convert BifrostTranscriptionResponse to integration format (SHOULD NOT BE NIL)
ImageGenerationResponseConverter ImageGenerationResponseConverter // Function to convert BifrostImageGenerationResponse to integration format (SHOULD NOT BE NIL)
VideoGenerationResponseConverter VideoGenerationResponseConverter // Function to convert BifrostVideoGenerationResponse to integration format (SHOULD NOT BE NIL)
VideoDownloadResponseConverter VideoDownloadResponseConverter // Function to convert BifrostVideoDownloadResponse to integration format (SHOULD NOT BE NIL)
VideoDeleteResponseConverter VideoDeleteResponseConverter // Function to convert BifrostVideoDeleteResponse to integration format (SHOULD NOT BE NIL)
VideoListResponseConverter VideoListResponseConverter // Function to convert BifrostVideoListResponse to integration format (SHOULD NOT BE NIL)
BatchCreateResponseConverter BatchCreateResponseConverter // Function to convert BifrostBatchCreateResponse to integration format
BatchListResponseConverter BatchListResponseConverter // Function to convert BifrostBatchListResponse to integration format
BatchRetrieveResponseConverter BatchRetrieveResponseConverter // Function to convert BifrostBatchRetrieveResponse to integration format
BatchCancelResponseConverter BatchCancelResponseConverter // Function to convert BifrostBatchCancelResponse to integration format
BatchDeleteResponseConverter BatchDeleteResponseConverter // Function to convert BifrostBatchDeleteResponse to integration format
BatchResultsResponseConverter BatchResultsResponseConverter // Function to convert BifrostBatchResultsResponse to integration format
FileUploadResponseConverter FileUploadResponseConverter // Function to convert BifrostFileUploadResponse to integration format
FileListResponseConverter FileListResponseConverter // Function to convert BifrostFileListResponse to integration format
FileRetrieveResponseConverter FileRetrieveResponseConverter // Function to convert BifrostFileRetrieveResponse to integration format
FileDeleteResponseConverter FileDeleteResponseConverter // Function to convert BifrostFileDeleteResponse to integration format
FileContentResponseConverter FileContentResponseConverter // Function to convert BifrostFileContentResponse to integration format
ContainerCreateResponseConverter ContainerCreateResponseConverter // Function to convert BifrostContainerCreateResponse to integration format
ContainerListResponseConverter ContainerListResponseConverter // Function to convert BifrostContainerListResponse to integration format
ContainerRetrieveResponseConverter ContainerRetrieveResponseConverter // Function to convert BifrostContainerRetrieveResponse to integration format
ContainerDeleteResponseConverter ContainerDeleteResponseConverter // Function to convert BifrostContainerDeleteResponse to integration format
ContainerFileCreateResponseConverter ContainerFileCreateResponseConverter // Function to convert BifrostContainerFileCreateResponse to integration format
ContainerFileListResponseConverter ContainerFileListResponseConverter // Function to convert BifrostContainerFileListResponse to integration format
ContainerFileRetrieveResponseConverter ContainerFileRetrieveResponseConverter // Function to convert BifrostContainerFileRetrieveResponse to integration format
ContainerFileContentResponseConverter ContainerFileContentResponseConverter // Function to convert BifrostContainerFileContentResponse to integration format
ContainerFileDeleteResponseConverter ContainerFileDeleteResponseConverter // Function to convert BifrostContainerFileDeleteResponse to integration format
CountTokensResponseConverter CountTokensResponseConverter // Function to convert BifrostCountTokensResponse to integration format
ErrorConverter ErrorConverter // Function to convert BifrostError to integration format (SHOULD NOT BE NIL)
StreamConfig *StreamConfig // Optional: Streaming configuration (if nil, streaming not supported)
PreCallback PreRequestCallback // Optional: called after parsing but before Bifrost processing
PostCallback PostRequestCallback // Optional: called after request processing
// ShortCircuit determines if the request should be short-circuited.
// NOTE(review): presumably optional like the callbacks above — confirm nil handling in the router.
ShortCircuit ShortCircuit
}
// PassthroughConfig configures the catch-all passthrough routes of a GenericRouter.
// When a router is built with a non-nil PassthroughConfig, RegisterRoutes installs
// one "<prefix>/{path:*}" catch-all per StripPrefix entry for GET, POST, PUT,
// DELETE, PATCH and HEAD.
type PassthroughConfig struct {
Provider schemas.ModelProvider // which provider's key pool to draw from
ProviderDetector func(ctx *fasthttp.RequestCtx, model string) schemas.ModelProvider // optional: dynamic provider detection
StripPrefix []string // e.g. "/openai" — stripped before forwarding; each entry also becomes a registered catch-all prefix
}
// LargePayloadHook is called before body parsing to detect and set up large payload streaming.
// The router invokes it only for request methods other than GET and HEAD.
// If it returns skipBodyParse=true, the router skips all parsing of the request body
// (custom RequestParser, default JSON parsing, and extra_params extraction).
// If it returns a non-nil err, the router aborts the request and replies with a
// "large payload detection failed" error through the route's ErrorConverter.
// The hook is responsible for setting all relevant context keys (BifrostContextKeyLargePayloadMode,
// BifrostContextKeyLargePayloadReader, BifrostContextKeyLargePayloadContentLength,
// BifrostContextKeyLargePayloadMetadata) when activating large payload mode.
type LargePayloadHook func(
ctx *fasthttp.RequestCtx,
bifrostCtx *schemas.BifrostContext,
routeType RouteConfigType,
) (skipBodyParse bool, err error)
// LargeResponseHook is called before streaming a large response body to the client.
// Enterprise uses this to wrap the response reader with Phase B scanning (e.g., usage extraction
// from the full response stream when usage is beyond the Phase A prefetch window).
// The hook receives the bifrost context with BifrostContextKeyLargeResponseReader already set
// and may replace the reader on context with a wrapped version.
// The hook has no return value, so any result must be communicated through bifrostCtx.
// It is optional and is installed on a router with SetLargeResponseHook.
type LargeResponseHook func(
ctx *fasthttp.RequestCtx,
bifrostCtx *schemas.BifrostContext,
)
// GenericRouter provides a reusable router implementation for all integrations.
// It handles the common flow of: parse request → convert to Bifrost → execute → convert response.
// Integration-specific logic is handled through the RouteConfig callbacks and converters.
// Instances are created with NewGenericRouter; the two large-payload hooks start out nil
// and are installed afterwards with SetLargePayloadHook / SetLargeResponseHook.
type GenericRouter struct {
client *bifrost.Bifrost // Bifrost client for executing requests
handlerStore lib.HandlerStore // Config provider for the router
routes []RouteConfig // List of route configurations
passthroughCfg *PassthroughConfig // Optional: when non-nil, RegisterRoutes also installs catch-all passthrough routes
logger schemas.Logger // Logger for the router
largePayloadHook LargePayloadHook // Optional: enterprise hook for large payload detection
largeResponseHook LargeResponseHook // Optional: enterprise hook for large response scanning
}
// SetLargePayloadHook installs hook as the router's large payload detector.
// Request handlers consult it before parsing a request body; a nil hook
// leaves large payload handling disabled. Enterprise builds use this to plug
// in their optimisation without that logic living in the OSS router.
func (g *GenericRouter) SetLargePayloadHook(hook LargePayloadHook) {
	g.largePayloadHook = hook
}
// SetLargeResponseHook installs hook as the router's large response scanner.
// Enterprise builds use this to add Phase B usage extraction to the response
// stream while keeping the scanning logic out of the OSS router.
func (g *GenericRouter) SetLargeResponseHook(hook LargeResponseHook) {
	g.largeResponseHook = hook
}
// NewGenericRouter assembles a GenericRouter from the supplied Bifrost client,
// handler store, route table, optional passthrough configuration and logger.
// Integrations build their own []RouteConfig and hand it to this constructor.
// Both large-payload hooks are left unset (nil) on the returned router.
func NewGenericRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, routes []RouteConfig, passthroughCfg *PassthroughConfig, logger schemas.Logger) *GenericRouter {
	gr := new(GenericRouter)
	gr.client = client
	gr.handlerStore = handlerStore
	gr.routes = routes
	gr.passthroughCfg = passthroughCfg
	gr.logger = logger
	return gr
}
// RegisterRoutes attaches every valid entry of g.routes to r and, when a
// passthrough configuration is present, adds catch-all passthrough routes.
// It implements the ExtensionRouter interface.
//
// A route is skipped with a warning when its GetRequestTypeInstance is nil or
// returns nil, when it is an inference route without a RequestConverter, or
// when its ErrorConverter is nil. Routes whose method is not POST, GET, PUT,
// DELETE or HEAD are registered as POST.
func (g *GenericRouter) RegisterRoutes(r *router.Router, middlewares ...schemas.BifrostHTTPMiddleware) {
	for _, route := range g.routes {
		verb := strings.ToUpper(route.Method)

		// Validate the route now so misconfiguration surfaces at startup.
		if route.GetRequestTypeInstance == nil {
			g.logger.Warn("route configuration is invalid: GetRequestTypeInstance cannot be nil for route " + route.Path)
			continue
		}
		// The factory must actually yield an instance to parse into.
		if route.GetRequestTypeInstance(context.Background()) == nil {
			g.logger.Warn("route configuration is invalid: GetRequestTypeInstance returned nil for route " + route.Path)
			continue
		}

		// A route with none of the batch/file/container/container-file
		// converters is an inference route and needs a RequestConverter.
		hasDedicatedConverter := route.BatchRequestConverter != nil ||
			route.FileRequestConverter != nil ||
			route.ContainerRequestConverter != nil ||
			route.ContainerFileRequestConverter != nil
		if !hasDedicatedConverter && route.RequestConverter == nil {
			g.logger.Warn("route configuration is invalid: RequestConverter cannot be nil for inference route " + route.Path)
			continue
		}
		if route.ErrorConverter == nil {
			g.logger.Warn("route configuration is invalid: ErrorConverter cannot be nil for route " + route.Path)
			continue
		}

		// Per-route middleware that records the HTTP request type (when the
		// route can report one) before passing control down the chain.
		tagRequestType := func(next fasthttp.RequestHandler) fasthttp.RequestHandler {
			return func(ctx *fasthttp.RequestCtx) {
				if route.GetHTTPRequestType != nil {
					ctx.SetUserValue(schemas.BifrostContextKeyHTTPRequestType, route.GetHTTPRequestType(ctx))
				}
				next(ctx)
			}
		}

		// Build a brand-new slice for this route so the caller's middlewares
		// slice is never mutated and routes do not share a backing array.
		chain := make([]schemas.BifrostHTTPMiddleware, 0, len(middlewares)+1)
		chain = append(chain, tagRequestType)
		chain = append(chain, middlewares...)
		wrapped := lib.ChainMiddlewares(g.createHandler(route), chain...)

		// Choose the registrar for the method; POST doubles as the fallback.
		register := r.POST
		switch verb {
		case fasthttp.MethodGet:
			register = r.GET
		case fasthttp.MethodPut:
			register = r.PUT
		case fasthttp.MethodDelete:
			register = r.DELETE
		case fasthttp.MethodHead:
			register = r.HEAD
		}
		register(route.Path, wrapped)
	}

	if g.passthroughCfg == nil {
		return
	}

	// Passthrough: one catch-all per forwarded method and configured prefix.
	passthrough := lib.ChainMiddlewares(g.handlePassthrough, middlewares...)
	forwarded := []string{
		fasthttp.MethodGet,
		fasthttp.MethodPost,
		fasthttp.MethodPut,
		fasthttp.MethodDelete,
		fasthttp.MethodPatch,
		fasthttp.MethodHead,
	}
	for _, m := range forwarded {
		for _, prefix := range g.passthroughCfg.StripPrefix {
			r.Handle(m, prefix+"/{path:*}", passthrough)
		}
	}
}
// createHandler creates a fasthttp handler for the given route configuration.
// The handler follows this flow:
//  1. Build the cancellable Bifrost context and, if the async-retrieve header is
//     present, serve the async result and stop
//  2. Parse the request body into the configured request type (skipped for GET/HEAD
//     and when the large payload hook asks to skip parsing)
//  3. Execute pre-callback (if configured) for request modification/validation
//  4. Execute short-circuit handler (if configured), which may fully answer the request
//  5. Dispatch batch / file / container / container-file routes to their handlers
//  6. Otherwise convert the request to a BifrostRequest and execute it through
//     Bifrost (async create, streaming, or non-streaming)
//
// Context ownership: every exit path of the handler calls cancel, except the
// streaming path, where cancel is handed to handleStreamingRequest, which is
// expected to call it once the stream is finished.
func (g *GenericRouter) createHandler(config RouteConfig) fasthttp.RequestHandler {
	return func(ctx *fasthttp.RequestCtx) {
		method := string(ctx.Method())
		// Instantiate the integration-specific request type to parse into.
		// Note: config validation is performed at startup in RegisterRoutes
		req := config.GetRequestTypeInstance(ctx)
		var rawBody []byte
		// Build the cancellable Bifrost context used for the whole request.
		bifrostCtx, cancel := lib.ConvertToBifrostContext(ctx, g.handlerStore.ShouldAllowDirectKeys(), g.handlerStore.GetHeaderMatcher(), g.handlerStore.GetMCPHeaderCombinedAllowlist())
		// Set integration type to context
		bifrostCtx.SetValue(schemas.BifrostContextKeyIntegrationType, string(config.Type))
		// Set available providers to context
		availableProviders := g.handlerStore.GetAvailableProviders()
		bifrostCtx.SetValue(schemas.BifrostContextKeyAvailableProviders, availableProviders)
		// Async retrieve: check x-bf-async-id header early (before body parsing)
		if len(ctx.Request.Header.Peek(schemas.AsyncHeaderGetID)) > 0 {
			defer cancel()
			g.handleAsyncRetrieve(ctx, config, bifrostCtx)
			return
		}
		// Parse request body based on configuration
		if method != fasthttp.MethodGet && method != fasthttp.MethodHead {
			// Hook executes before JSON parsing so large requests can remain streaming.
			isLargePayload := false
			if g.largePayloadHook != nil {
				var err error
				isLargePayload, err = g.largePayloadHook(ctx, bifrostCtx, config.Type)
				if err != nil {
					cancel()
					g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "large payload detection failed"))
					return
				}
			}
			switch {
			case isLargePayload:
				// Large payload mode: body streams directly to provider via
				// BifrostContextKeyLargePayloadReader. Skip all body parsing
				// (JSON and multipart) — metadata was already extracted by the hook.
			case config.RequestParser != nil:
				// Use custom parser (e.g., for multipart/form-data)
				if err := config.RequestParser(ctx, req); err != nil {
					cancel()
					g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to parse request"))
					return
				}
			default:
				// Use default JSON parsing; an empty body leaves req untouched.
				rawBody = ctx.Request.Body()
				if len(rawBody) > 0 {
					if err := sonic.Unmarshal(rawBody, req); err != nil {
						cancel()
						g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "Invalid JSON"))
						return
					}
				}
			}
			// Extract the "extra_params" JSON key when passthrough is
			// explicitly enabled via x-bf-passthrough-extra-params: true.
			// Provider-specific fields (e.g. Bedrock guardrailConfig)
			// must be nested under "extra_params" in the request body.
			// Runs after both RequestParser and default JSON paths.
			if !isLargePayload && bifrostCtx.Value(schemas.BifrostContextKeyPassthroughExtraParams) == true {
				if rws, ok := req.(RequestWithSettableExtraParams); ok {
					if rawBody == nil {
						rawBody = ctx.Request.Body()
					}
					if len(rawBody) > 0 {
						var wrapper struct {
							ExtraParams map[string]interface{} `json:"extra_params"`
						}
						// Best effort: a body that is not JSON (e.g. multipart) simply
						// yields no extra params rather than failing the request.
						if err := sonic.Unmarshal(rawBody, &wrapper); err == nil && len(wrapper.ExtraParams) > 0 {
							rws.SetExtraParams(wrapper.ExtraParams)
						}
					}
				}
			}
		}
		// Execute pre-request callback if configured
		// This is typically used for extracting data from URL parameters
		// or performing request validation after parsing
		if config.PreCallback != nil {
			if err := config.PreCallback(ctx, bifrostCtx, req); err != nil {
				// Release the request context on this early exit as well.
				defer cancel()
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute pre-request callback: "+err.Error()))
				return
			}
		}
		// Execute short-circuit handler if configured.
		// If it returns handled=true the callback has already written a response
		// to ctx and we return immediately, bypassing the Bifrost flow entirely.
		if config.ShortCircuit != nil {
			handled, err := config.ShortCircuit(ctx, bifrostCtx, req)
			if err != nil {
				defer cancel()
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "short-circuit handler error: "+err.Error()))
				return
			}
			if handled {
				defer cancel()
				return
			}
		}
		// Set direct key from context if available. The type assertion yields
		// ok=false both when the user value is absent and when it has another type.
		if key, ok := ctx.UserValue(string(schemas.BifrostContextKeyDirectKey)).(schemas.Key); ok {
			bifrostCtx.SetValue(schemas.BifrostContextKeyDirectKey, key)
		}
		// Handle batch requests if BatchRequestConverter is set
		// GenAI has two cases: (1) Dedicated batch routes (list/retrieve) have only BatchRequestConverter — always use batch path.
		// (2) The models path has both BatchRequestConverter and RequestConverter — use batch path only for batch create.
		isGenAIBatchCreate := config.Type == RouteConfigTypeGenAI && bifrostCtx.Value(isGeminiBatchCreateRequestContextKey) != nil
		useBatchPath := config.BatchRequestConverter != nil && (config.RequestConverter == nil || config.Type != RouteConfigTypeGenAI || isGenAIBatchCreate)
		if useBatchPath {
			defer cancel()
			batchReq, err := config.BatchRequestConverter(bifrostCtx, req)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert batch request"))
				return
			}
			if batchReq == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch request"))
				return
			}
			g.handleBatchRequest(ctx, config, req, batchReq, bifrostCtx)
			return
		}
		// Handle file requests if FileRequestConverter is set
		if config.FileRequestConverter != nil {
			defer cancel()
			fileReq, err := config.FileRequestConverter(bifrostCtx, req)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file request"))
				return
			}
			if fileReq == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file request"))
				return
			}
			g.handleFileRequest(ctx, config, req, fileReq, bifrostCtx)
			return
		}
		// Handle container requests if ContainerRequestConverter is set
		if config.ContainerRequestConverter != nil {
			defer cancel()
			containerReq, err := config.ContainerRequestConverter(bifrostCtx, req)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container request"))
				return
			}
			if containerReq == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container request"))
				return
			}
			g.handleContainerRequest(ctx, config, req, containerReq, bifrostCtx)
			return
		}
		// Handle container file requests if ContainerFileRequestConverter is set
		if config.ContainerFileRequestConverter != nil {
			defer cancel()
			containerFileReq, err := config.ContainerFileRequestConverter(bifrostCtx, req)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container file request"))
				return
			}
			if containerFileReq == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file request"))
				return
			}
			g.handleContainerFileRequest(ctx, config, req, containerFileReq, bifrostCtx)
			return
		}
		// Convert the integration-specific request to Bifrost format (inference requests)
		bifrostReq, err := config.RequestConverter(bifrostCtx, req)
		if err != nil {
			defer cancel()
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert request to Bifrost format"))
			return
		}
		if bifrostReq == nil {
			defer cancel()
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid request"))
			return
		}
		// Forward the original request bytes when the context asks for it.
		// rawBody is only populated by the default JSON path or the
		// extra_params extraction above; otherwise it is nil here.
		if sendRawRequestBody, ok := bifrostCtx.Value(schemas.BifrostContextKeyUseRawRequestBody).(bool); ok && sendRawRequestBody {
			bifrostReq.SetRawRequestBody(rawBody)
		}
		// Extract and parse fallbacks from the request if present
		if err := g.extractAndParseFallbacks(req, bifrostReq); err != nil {
			defer cancel()
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to parse fallbacks: "+err.Error()))
			return
		}
		// Async create: check x-bf-async header (needs parsed bifrostReq)
		if len(ctx.Request.Header.Peek(schemas.AsyncHeaderCreate)) > 0 {
			defer cancel()
			g.handleAsyncCreate(ctx, config, req, bifrostReq, bifrostCtx)
			return
		}
		// Check if streaming is requested
		isStreaming := false
		if streamingReq, ok := req.(StreamingRequest); ok {
			isStreaming = streamingReq.IsStreamingRequested()
		}
		if isStreaming {
			// Ownership of cancel moves to the streaming handler; it must not
			// be deferred here.
			g.handleStreamingRequest(ctx, config, bifrostReq, bifrostCtx, cancel)
			return
		}
		defer cancel() // Ensure cleanup on function exit
		g.handleNonStreamingRequest(ctx, config, req, bifrostReq, bifrostCtx)
	}
}
// handleNonStreamingRequest handles regular (non-streaming) requests
func (g *GenericRouter) handleNonStreamingRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, bifrostReq *schemas.BifrostRequest, bifrostCtx *schemas.BifrostContext) {
// Use the cancellable context from ConvertToBifrostContext
// While we can't detect client disconnects until we try to write, having a cancellable context
// allows providers that check ctx.Done() to cancel early if needed. This is less critical than
// streaming requests (where we actively detect write errors), but still provides a mechanism
// for providers to respect cancellation.
var response interface{}
var err error
var providerResponseHeaders map[string]string
switch {
case bifrostReq.ListModelsRequest != nil:
// Determine provider: explicit header overrides request field; otherwise
// fall back to the request field and finally to list-all behavior.
listModelsProvider := strings.ToLower(string(ctx.Request.Header.Peek("x-bf-model-provider")))
switch listModelsProvider {
case "":
// keep any provider already set on the request
case "all":
bifrostReq.ListModelsRequest.Provider = ""
default:
bifrostReq.ListModelsRequest.Provider = schemas.ModelProvider(listModelsProvider)
}
var listModelsResponse *schemas.BifrostListModelsResponse
var bifrostErr *schemas.BifrostError
if bifrostReq.ListModelsRequest.Provider != "" {
listModelsResponse, bifrostErr = g.client.ListModelsRequest(bifrostCtx, bifrostReq.ListModelsRequest)
} else {
listModelsResponse, bifrostErr = g.client.ListAllModels(bifrostCtx, bifrostReq.ListModelsRequest)
}
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, listModelsResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if listModelsResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
response, err = config.ListModelsResponseConverter(bifrostCtx, listModelsResponse)
providerResponseHeaders = listModelsResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.TextCompletionRequest != nil:
textCompletionResponse, bifrostErr := g.client.TextCompletionRequest(bifrostCtx, bifrostReq.TextCompletionRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, textCompletionResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if textCompletionResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.TextResponseConverter(bifrostCtx, textCompletionResponse)
providerResponseHeaders = textCompletionResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.ChatRequest != nil:
chatResponse, bifrostErr := g.client.ChatCompletionRequest(bifrostCtx, bifrostReq.ChatRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, chatResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if chatResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.ChatResponseConverter(bifrostCtx, chatResponse)
providerResponseHeaders = chatResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.ResponsesRequest != nil:
responsesResponse, bifrostErr := g.client.ResponsesRequest(bifrostCtx, bifrostReq.ResponsesRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, responsesResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if responsesResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.ResponsesResponseConverter(bifrostCtx, responsesResponse)
providerResponseHeaders = responsesResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.EmbeddingRequest != nil:
embeddingResponse, bifrostErr := g.client.EmbeddingRequest(bifrostCtx, bifrostReq.EmbeddingRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, embeddingResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if embeddingResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
providerResponseHeaders = embeddingResponse.ExtraFields.ProviderResponseHeaders
// Convert Bifrost response to integration-specific format and send
response, err = config.EmbeddingResponseConverter(bifrostCtx, embeddingResponse)
case bifrostReq.RerankRequest != nil:
rerankResponse, bifrostErr := g.client.RerankRequest(bifrostCtx, bifrostReq.RerankRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, rerankResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if rerankResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
providerResponseHeaders = rerankResponse.ExtraFields.ProviderResponseHeaders
if config.RerankResponseConverter != nil {
response, err = config.RerankResponseConverter(bifrostCtx, rerankResponse)
} else {
response = rerankResponse
}
case bifrostReq.OCRRequest != nil:
ocrResponse, bifrostErr := g.client.OCRRequest(bifrostCtx, bifrostReq.OCRRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, ocrResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if ocrResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "bifrost response is nil after post-request callback"))
return
}
providerResponseHeaders = ocrResponse.ExtraFields.ProviderResponseHeaders
if config.OCRResponseConverter != nil {
response, err = config.OCRResponseConverter(bifrostCtx, ocrResponse)
} else {
response = ocrResponse
}
case bifrostReq.SpeechRequest != nil:
speechResponse, bifrostErr := g.client.SpeechRequest(bifrostCtx, bifrostReq.SpeechRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, speechResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if speechResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
providerResponseHeaders = speechResponse.ExtraFields.ProviderResponseHeaders
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
if config.SpeechResponseConverter != nil {
response, err = config.SpeechResponseConverter(bifrostCtx, speechResponse)
if err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert speech response"))
return
}
g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
return
} else {
ctx.Response.Header.Set("Content-Type", "audio/mpeg")
ctx.Response.Header.Set("Content-Disposition", "attachment; filename=speech.mp3")
ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(speechResponse.Audio)))
ctx.Response.SetBody(speechResponse.Audio)
return
}
case bifrostReq.TranscriptionRequest != nil:
transcriptionResponse, bifrostErr := g.client.TranscriptionRequest(bifrostCtx, bifrostReq.TranscriptionRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, transcriptionResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if transcriptionResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.TranscriptionResponseConverter(bifrostCtx, transcriptionResponse)
providerResponseHeaders = transcriptionResponse.ExtraFields.ProviderResponseHeaders
// If converter returns raw bytes, write directly with provider headers.
// Used for plain-text transcription formats (text, srt, vtt).
if err == nil {
if rawBytes, ok := response.([]byte); ok {
for key, value := range providerResponseHeaders {
ctx.Response.Header.Set(key, value)
}
ctx.SetStatusCode(fasthttp.StatusOK)
ctx.SetBody(rawBytes)
return
}
}
case bifrostReq.ImageGenerationRequest != nil:
imageGenerationResponse, bifrostErr := g.client.ImageGenerationRequest(bifrostCtx, bifrostReq.ImageGenerationRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, imageGenerationResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if imageGenerationResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.ImageGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration"))
return
}
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageGenerationResponse)
providerResponseHeaders = imageGenerationResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.ImageEditRequest != nil:
imageEditResponse, bifrostErr := g.client.ImageEditRequest(bifrostCtx, bifrostReq.ImageEditRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, imageEditResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if imageEditResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.ImageGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration"))
return
}
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageEditResponse)
providerResponseHeaders = imageEditResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.ImageVariationRequest != nil:
imageVariationResponse, bifrostErr := g.client.ImageVariationRequest(bifrostCtx, bifrostReq.ImageVariationRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, imageVariationResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if imageVariationResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.ImageGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration"))
return
}
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
// Convert Bifrost response to integration-specific format and send
response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageVariationResponse)
providerResponseHeaders = imageVariationResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.VideoGenerationRequest != nil:
videoGenerationResponse, bifrostErr := g.client.VideoGenerationRequest(bifrostCtx, bifrostReq.VideoGenerationRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoGenerationResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoGenerationResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration"))
return
}
response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoGenerationResponse)
providerResponseHeaders = videoGenerationResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.VideoRetrieveRequest != nil:
videoRetrieveResponse, bifrostErr := g.client.VideoRetrieveRequest(bifrostCtx, bifrostReq.VideoRetrieveRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoRetrieveResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoRetrieveResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration"))
return
}
response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoRetrieveResponse)
providerResponseHeaders = videoRetrieveResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.VideoDownloadRequest != nil:
videoDownloadResponse, bifrostErr := g.client.VideoDownloadRequest(bifrostCtx, bifrostReq.VideoDownloadRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoDownloadResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoDownloadResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoDownloadResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoDownloadResponseConverter for integration"))
return
}
response, err = config.VideoDownloadResponseConverter(bifrostCtx, videoDownloadResponse)
providerResponseHeaders = videoDownloadResponse.ExtraFields.ProviderResponseHeaders
// If converter returns binary content, write directly with content-type.
if err == nil {
if rawBytes, ok := response.([]byte); ok {
contentType := videoDownloadResponse.ContentType
if contentType == "" {
contentType = "application/octet-stream"
}
ctx.Response.Header.Set("Content-Type", contentType)
ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(rawBytes)))
ctx.Response.SetBody(rawBytes)
return
}
}
case bifrostReq.VideoDeleteRequest != nil:
videoDeleteResponse, bifrostErr := g.client.VideoDeleteRequest(bifrostCtx, bifrostReq.VideoDeleteRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoDeleteResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoDeleteResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoDeleteResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoDeleteResponseConverter for integration"))
return
}
response, err = config.VideoDeleteResponseConverter(bifrostCtx, videoDeleteResponse)
providerResponseHeaders = videoDeleteResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.VideoRemixRequest != nil:
videoRemixResponse, bifrostErr := g.client.VideoRemixRequest(bifrostCtx, bifrostReq.VideoRemixRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoRemixResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoRemixResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoGenerationResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration"))
return
}
response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoRemixResponse)
providerResponseHeaders = videoRemixResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.VideoListRequest != nil:
// extract provider from header
providerHeader := strings.ToLower(string(ctx.Request.Header.Peek("x-bf-video-list-provider")))
if providerHeader != "" {
bifrostReq.VideoListRequest.Provider = schemas.ModelProvider(providerHeader)
} else if bifrostReq.VideoListRequest.Provider == "" {
bifrostReq.VideoListRequest.Provider = schemas.OpenAI
}
videoListResponse, bifrostErr := g.client.VideoListRequest(bifrostCtx, bifrostReq.VideoListRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, videoListResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if videoListResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
if config.VideoListResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoListResponseConverter for integration"))
return
}
response, err = config.VideoListResponseConverter(bifrostCtx, videoListResponse)
providerResponseHeaders = videoListResponse.ExtraFields.ProviderResponseHeaders
case bifrostReq.CountTokensRequest != nil:
countTokensResponse, bifrostErr := g.client.CountTokensRequest(bifrostCtx, bifrostReq.CountTokensRequest)
if bifrostErr != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
return
}
// Execute post-request callback if configured
// This is typically used for response modification or additional processing
if config.PostCallback != nil {
if err := config.PostCallback(ctx, req, countTokensResponse); err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback"))
return
}
}
if countTokensResponse == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback"))
return
}
// Convert Bifrost response to integration-specific format and send
if config.CountTokensResponseConverter == nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "CountTokensResponseConverter not configured"))
return
}
response, err = config.CountTokensResponseConverter(bifrostCtx, countTokensResponse)
providerResponseHeaders = countTokensResponse.ExtraFields.ProviderResponseHeaders
default:
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Invalid request type"))
return
}
if err != nil {
g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to encode response"))
return
}
// Forward provider response headers only after conversion succeeds
for key, value := range providerResponseHeaders {
ctx.Response.Header.Set(key, value)
}
if g.tryStreamLargeResponse(ctx, bifrostCtx) {
return
}
g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
}
// --- Async integration handlers ---
// handleAsyncCreate queues the current inference request as a background job
// and answers with the job's state via handleAsyncJobResponse.
//
// The request is rejected before submission when no async job executor is
// available, when the request asks for streaming, when the route is a batch,
// file or container route, or when the route's request type has no async
// response converter configured.
//
// The background operation returns the raw Bifrost response (not the
// provider-converted one); conversion happens when the job response is rendered.
func (g *GenericRouter) handleAsyncCreate(
	ctx *fasthttp.RequestCtx,
	config RouteConfig,
	req interface{},
	bifrostReq *schemas.BifrostRequest,
	bifrostCtx *schemas.BifrostContext,
) {
	executor := g.handlerStore.GetAsyncJobExecutor()
	if executor == nil {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostError(nil, "async operations not available: logs store not configured"))
		return
	}

	// Streaming and async are mutually exclusive.
	if sr, isStreamable := req.(StreamingRequest); isStreamable && sr.IsStreamingRequested() {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostErrorWithCode(nil, "streaming is not supported for async requests", fasthttp.StatusBadRequest))
		return
	}

	// Only inference routes may run asynchronously: any batch, file or
	// container request converter marks the route as non-inference.
	isInferenceRoute := config.BatchRequestConverter == nil &&
		config.FileRequestConverter == nil &&
		config.ContainerRequestConverter == nil &&
		config.ContainerFileRequestConverter == nil
	if !isInferenceRoute {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostError(nil, "async is not supported for batch, file, or container operations"))
		return
	}

	// The route must carry the async converter that matches its request type;
	// every other request type is unsupported.
	asyncSupported := false
	switch config.GetHTTPRequestType(ctx) {
	case schemas.ChatCompletionRequest:
		asyncSupported = config.AsyncChatResponseConverter != nil
	case schemas.ResponsesRequest:
		asyncSupported = config.AsyncResponsesResponseConverter != nil
	}
	if !asyncSupported {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route"))
		return
	}

	jobType := config.GetHTTPRequestType(ctx)
	ttl := getResultTTLFromHeaderWithDefault(ctx, g.handlerStore.GetAsyncJobResultTTL())

	// runInBackground performs the Bifrost client call for the job and hands
	// back the raw typed response.
	runInBackground := func(bgCtx *schemas.BifrostContext) (interface{}, *schemas.BifrostError) {
		if bifrostReq.ChatRequest != nil {
			return g.client.ChatCompletionRequest(bgCtx, bifrostReq.ChatRequest)
		}
		if bifrostReq.ResponsesRequest != nil {
			return g.client.ResponsesRequest(bgCtx, bifrostReq.ResponsesRequest)
		}
		return nil, newBifrostError(nil, "unsupported request type for async execution")
	}

	job, submitErr := executor.SubmitJob(bifrostCtx, ttl, runInBackground, jobType)
	if submitErr != nil {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostError(submitErr, "failed to create async job"))
		return
	}

	g.handleAsyncJobResponse(ctx, bifrostCtx, config, job)
}
// handleAsyncRetrieve looks up a previously submitted async job and writes its
// current state via handleAsyncJobResponse.
//
// The job ID is taken from the schemas.AsyncHeaderGetID request header and is
// handed to the executor together with the virtual key found in bifrostCtx and
// the route's request type. A lookup error matching logstore.ErrJobInternal is
// reported as 500; any other lookup error is reported as 404.
func (g *GenericRouter) handleAsyncRetrieve(
	ctx *fasthttp.RequestCtx,
	config RouteConfig,
	bifrostCtx *schemas.BifrostContext,
) {
	executor := g.handlerStore.GetAsyncJobExecutor()
	if executor == nil {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostError(nil, "async operations not available: logs store not configured"))
		return
	}

	jobID := string(ctx.Request.Header.Peek(schemas.AsyncHeaderGetID))
	if len(jobID) == 0 {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostError(nil, "x-bf-async-id header value is empty"))
		return
	}

	virtualKey := getVirtualKeyFromBifrostContext(bifrostCtx)
	requestType := config.GetHTTPRequestType(ctx)

	job, lookupErr := executor.RetrieveJob(bifrostCtx, jobID, virtualKey, requestType)
	if lookupErr != nil {
		if !errors.Is(lookupErr, logstore.ErrJobInternal) {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter,
				newBifrostErrorWithCode(lookupErr, "job not found or expired", fasthttp.StatusNotFound))
			return
		}
		g.sendError(ctx, bifrostCtx, config.ErrorConverter,
			newBifrostErrorWithCode(lookupErr, "failed to retrieve async job", fasthttp.StatusInternalServerError))
		return
	}

	g.handleAsyncJobResponse(ctx, bifrostCtx, config, job)
}
// handleAsyncJobResponse writes the HTTP response for an async job according
// to its status.
//
// Pending, processing and completed jobs are rendered through the route's
// async response converter for the job's request type (chat completions or
// responses); that converter is handed the route's regular response converter
// for the same type. Failed jobs have their stored error decoded and sent
// through the route's error converter.
//
// Every path writes a response: a nil job, an unrecognized status, and a failed
// job without a usable stored error each produce an explicit error rather than
// an empty success body or a BifrostError whose Error field is nil.
func (g *GenericRouter) handleAsyncJobResponse(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, config RouteConfig, job *logstore.AsyncJob) {
	ctx.SetContentType("application/json")

	// Reading job.Status on a nil job would panic the request handler, so
	// report the problem instead.
	if job == nil {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async job is nil"))
		return
	}

	resp := job.ToResponse()

	switch job.Status {
	case schemas.AsyncJobStatusPending, schemas.AsyncJobStatusProcessing, schemas.AsyncJobStatusCompleted:
		switch job.RequestType {
		case schemas.ChatCompletionRequest:
			if config.AsyncChatResponseConverter == nil || config.ChatResponseConverter == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route"))
				return
			}
			response, extraHeaders, err := config.AsyncChatResponseConverter(bifrostCtx, resp, config.ChatResponseConverter)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert async chat response"))
				return
			}
			g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, extraHeaders)
		case schemas.ResponsesRequest:
			if config.AsyncResponsesResponseConverter == nil || config.ResponsesResponseConverter == nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "either async responses response converter or responses response converter not configured"))
				return
			}
			response, extraHeaders, err := config.AsyncResponsesResponseConverter(bifrostCtx, resp, config.ResponsesResponseConverter)
			if err != nil {
				g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert async responses response"))
				return
			}
			g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, extraHeaders)
		default:
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "unknown request type"))
		}

	case schemas.AsyncJobStatusFailed:
		// Decode the stored BifrostError so it can go through the provider
		// error converter.
		jobErr := &schemas.BifrostError{}
		if job.Error != "" {
			if unmarshalErr := sonic.Unmarshal([]byte(job.Error), jobErr); unmarshalErr != nil {
				// Not a serialized BifrostError: surface the raw text as the message.
				jobErr = &schemas.BifrostError{
					Error: &schemas.ErrorField{
						Message: job.Error,
					},
				}
			}
		}
		// An empty stored error, or one that decoded without an error field,
		// would otherwise reach the converter with a nil Error.
		if jobErr.Error == nil {
			jobErr.Error = &schemas.ErrorField{
				Message: "async job failed",
			}
		}
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, jobErr)

	default:
		// Without this branch an unrecognized status wrote nothing at all,
		// leaving the client with an empty success response.
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "unknown async job status"))
	}
}
// handleBatchRequest dispatches a batch API request (create, list, retrieve,
// cancel, delete or results) to the Bifrost client and writes the outcome.
//
// Each operation follows the same sequence: reject a missing typed sub-request,
// call the client, run the optional PostCallback, then pass the result through
// the operation's response converter when one is configured. Without a
// converter the Bifrost response itself is sent.
func (g *GenericRouter) handleBatchRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, batchReq *BatchRequest, bifrostCtx *schemas.BifrostContext) {
	// fail reports a router-generated error through the route's error converter.
	fail := func(cause error, message string) {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(cause, message))
	}

	var (
		payload    interface{}
		convertErr error
	)

	switch batchReq.Type {
	case schemas.BatchCreateRequest:
		if batchReq.CreateRequest == nil {
			fail(nil, "invalid batch create request")
			return
		}
		created, callErr := g.client.BatchCreateRequest(bifrostCtx, batchReq.CreateRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, created); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = created
		if convert := config.BatchCreateResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, created)
		}

	case schemas.BatchListRequest:
		if batchReq.ListRequest == nil {
			fail(nil, "invalid batch list request")
			return
		}
		listed, callErr := g.client.BatchListRequest(bifrostCtx, batchReq.ListRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, listed); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = listed
		if convert := config.BatchListResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, listed)
		}

	case schemas.BatchRetrieveRequest:
		if batchReq.RetrieveRequest == nil {
			fail(nil, "invalid batch retrieve request")
			return
		}
		retrieved, callErr := g.client.BatchRetrieveRequest(bifrostCtx, batchReq.RetrieveRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, retrieved); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = retrieved
		if convert := config.BatchRetrieveResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, retrieved)
		}

	case schemas.BatchCancelRequest:
		if batchReq.CancelRequest == nil {
			fail(nil, "invalid batch cancel request")
			return
		}
		cancelled, callErr := g.client.BatchCancelRequest(bifrostCtx, batchReq.CancelRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, cancelled); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = cancelled
		if convert := config.BatchCancelResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, cancelled)
		}

	case schemas.BatchDeleteRequest:
		if batchReq.DeleteRequest == nil {
			fail(nil, "invalid batch delete request")
			return
		}
		deleted, callErr := g.client.BatchDeleteRequest(bifrostCtx, batchReq.DeleteRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, deleted); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = deleted
		if convert := config.BatchDeleteResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, deleted)
		}

	case schemas.BatchResultsRequest:
		if batchReq.ResultsRequest == nil {
			fail(nil, "invalid batch results request")
			return
		}
		results, callErr := g.client.BatchResultsRequest(bifrostCtx, batchReq.ResultsRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, results); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = results
		if convert := config.BatchResultsResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, results)
		}

	default:
		fail(nil, "Unknown batch request type")
		return
	}

	if convertErr != nil {
		fail(convertErr, "failed to convert batch response")
		return
	}

	g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, payload, nil)
}
// handleFileRequest dispatches a file API request (upload, list, retrieve,
// delete or content) to the Bifrost client and writes the outcome.
//
// Upload, list, retrieve and delete reject a missing typed sub-request, call
// the client, run the optional PostCallback and then apply the operation's
// response converter when one is configured; without a converter the Bifrost
// response itself is sent. If the resulting response is nil, nothing further is
// written. A list converter may return []byte, which is written as the body
// verbatim.
//
// Content requests are written directly: raw bytes (from the converter or from
// the Bifrost response) are sent with Content-Type and Content-Length headers.
// When the provider reported no content type, "application/octet-stream" is
// used so binary content is never sent with an empty Content-Type value; this
// matches the fallback the video download path applies.
func (g *GenericRouter) handleFileRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, fileReq *FileRequest, bifrostCtx *schemas.BifrostContext) {
	// fail reports a router-generated error through the route's error converter.
	fail := func(cause error, message string) {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(cause, message))
	}

	// writeBinary sends body verbatim, labelled with contentType or, when that
	// is empty, with the generic binary content type.
	writeBinary := func(contentType string, body []byte) {
		if contentType == "" {
			contentType = "application/octet-stream"
		}
		ctx.Response.Header.Set("Content-Type", contentType)
		ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(body)))
		ctx.Response.SetBody(body)
	}

	var response interface{}
	var err error

	switch fileReq.Type {
	case schemas.FileUploadRequest:
		if fileReq.UploadRequest == nil {
			fail(nil, "invalid file upload request")
			return
		}
		fileResponse, bifrostErr := g.client.FileUploadRequest(bifrostCtx, fileReq.UploadRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, fileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.FileUploadResponseConverter != nil {
			response, err = config.FileUploadResponseConverter(bifrostCtx, fileResponse)
		} else {
			response = fileResponse
		}

	case schemas.FileListRequest:
		if fileReq.ListRequest == nil {
			fail(nil, "invalid file list request")
			return
		}
		fileResponse, bifrostErr := g.client.FileListRequest(bifrostCtx, fileReq.ListRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, fileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.FileListResponseConverter != nil {
			response, err = config.FileListResponseConverter(bifrostCtx, fileResponse)
			if err != nil {
				fail(err, "failed to convert file list response")
				return
			}
			// Raw byte responses (e.g., XML for S3 APIs) are written as-is;
			// no Content-Type is set here.
			if rawBytes, ok := response.([]byte); ok {
				ctx.SetBody(rawBytes)
				return
			}
		} else {
			response = fileResponse
		}

	case schemas.FileRetrieveRequest:
		if fileReq.RetrieveRequest == nil {
			fail(nil, "invalid file retrieve request")
			return
		}
		fileResponse, bifrostErr := g.client.FileRetrieveRequest(bifrostCtx, fileReq.RetrieveRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, fileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.FileRetrieveResponseConverter != nil {
			response, err = config.FileRetrieveResponseConverter(bifrostCtx, fileResponse)
		} else {
			response = fileResponse
		}

	case schemas.FileDeleteRequest:
		if fileReq.DeleteRequest == nil {
			fail(nil, "invalid file delete request")
			return
		}
		fileResponse, bifrostErr := g.client.FileDeleteRequest(bifrostCtx, fileReq.DeleteRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, fileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.FileDeleteResponseConverter != nil {
			response, err = config.FileDeleteResponseConverter(bifrostCtx, fileResponse)
		} else {
			response = fileResponse
		}

	case schemas.FileContentRequest:
		if fileReq.ContentRequest == nil {
			fail(nil, "invalid file content request")
			return
		}
		fileResponse, bifrostErr := g.client.FileContentRequest(bifrostCtx, fileReq.ContentRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, fileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		// The content type and body are read from the response below, so a nil
		// response is rejected here instead of being dereferenced.
		if fileResponse == nil {
			fail(nil, "Bifrost response is nil after post-request callback")
			return
		}
		if config.FileContentResponseConverter == nil {
			// No converter: return the raw file content.
			writeBinary(fileResponse.ContentType, fileResponse.Content)
			return
		}
		response, err = config.FileContentResponseConverter(bifrostCtx, fileResponse)
		if err != nil {
			fail(err, "failed to convert file content response")
			return
		}
		// Raw bytes are written directly without JSON encoding.
		if rawBytes, ok := response.([]byte); ok {
			writeBinary(fileResponse.ContentType, rawBytes)
			return
		}
		g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
		return

	default:
		fail(nil, "Unknown file request type")
		return
	}

	if err != nil {
		fail(err, "failed to convert file response")
		return
	}

	// A nil response means there is no body to send; presumably PostCallback
	// has already set headers/status in that case.
	if response == nil {
		return
	}

	g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
}
// handleContainerRequest dispatches a container API request (create, list,
// retrieve or delete) to the Bifrost client and writes the outcome.
//
// Each operation rejects a missing typed sub-request, calls the client, runs
// the optional PostCallback and then applies the operation's response converter
// when one is configured. Without a converter the Bifrost response itself is
// sent.
func (g *GenericRouter) handleContainerRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, containerReq *ContainerRequest, bifrostCtx *schemas.BifrostContext) {
	// fail reports a router-generated error through the route's error converter.
	fail := func(cause error, message string) {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(cause, message))
	}

	var (
		payload    interface{}
		convertErr error
	)

	switch containerReq.Type {
	case schemas.ContainerCreateRequest:
		if containerReq.CreateRequest == nil {
			fail(nil, "invalid container create request")
			return
		}
		created, callErr := g.client.ContainerCreateRequest(bifrostCtx, containerReq.CreateRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, created); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = created
		if convert := config.ContainerCreateResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, created)
		}

	case schemas.ContainerListRequest:
		if containerReq.ListRequest == nil {
			fail(nil, "invalid container list request")
			return
		}
		listed, callErr := g.client.ContainerListRequest(bifrostCtx, containerReq.ListRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, listed); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = listed
		if convert := config.ContainerListResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, listed)
		}

	case schemas.ContainerRetrieveRequest:
		if containerReq.RetrieveRequest == nil {
			fail(nil, "invalid container retrieve request")
			return
		}
		retrieved, callErr := g.client.ContainerRetrieveRequest(bifrostCtx, containerReq.RetrieveRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, retrieved); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = retrieved
		if convert := config.ContainerRetrieveResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, retrieved)
		}

	case schemas.ContainerDeleteRequest:
		if containerReq.DeleteRequest == nil {
			fail(nil, "invalid container delete request")
			return
		}
		deleted, callErr := g.client.ContainerDeleteRequest(bifrostCtx, containerReq.DeleteRequest)
		if callErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, callErr)
			return
		}
		if hook := config.PostCallback; hook != nil {
			if hookErr := hook(ctx, req, deleted); hookErr != nil {
				fail(hookErr, "failed to execute post-request callback")
				return
			}
		}
		payload = deleted
		if convert := config.ContainerDeleteResponseConverter; convert != nil {
			payload, convertErr = convert(bifrostCtx, deleted)
		}

	default:
		fail(nil, "Unknown container request type")
		return
	}

	if convertErr != nil {
		fail(convertErr, "failed to convert container response")
		return
	}

	g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, payload, nil)
}
// handleContainerFileRequest dispatches a container file API request (create,
// list, retrieve, content or delete) to the Bifrost client and writes the
// outcome.
//
// Create, list, retrieve and delete reject a missing typed sub-request, call
// the client, run the optional PostCallback and then apply the operation's
// response converter when one is configured; without a converter the Bifrost
// response itself is sent.
//
// Content requests are written directly: raw bytes (from the converter or from
// the Bifrost response) are sent with Content-Type and Content-Length headers.
// When the provider reported no content type, "application/octet-stream" is
// used so binary content is never sent with an empty Content-Type value; this
// matches the fallback the video download path applies.
func (g *GenericRouter) handleContainerFileRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, containerFileReq *ContainerFileRequest, bifrostCtx *schemas.BifrostContext) {
	// fail reports a router-generated error through the route's error converter.
	fail := func(cause error, message string) {
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(cause, message))
	}

	// writeBinary sends body verbatim, labelled with contentType or, when that
	// is empty, with the generic binary content type.
	writeBinary := func(contentType string, body []byte) {
		if contentType == "" {
			contentType = "application/octet-stream"
		}
		ctx.Response.Header.Set("Content-Type", contentType)
		ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(body)))
		ctx.Response.SetBody(body)
	}

	var response interface{}
	var err error

	switch containerFileReq.Type {
	case schemas.ContainerFileCreateRequest:
		if containerFileReq.CreateRequest == nil {
			fail(nil, "invalid container file create request")
			return
		}
		containerFileResponse, bifrostErr := g.client.ContainerFileCreateRequest(bifrostCtx, containerFileReq.CreateRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, containerFileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.ContainerFileCreateResponseConverter != nil {
			response, err = config.ContainerFileCreateResponseConverter(bifrostCtx, containerFileResponse)
		} else {
			response = containerFileResponse
		}

	case schemas.ContainerFileListRequest:
		if containerFileReq.ListRequest == nil {
			fail(nil, "invalid container file list request")
			return
		}
		containerFileResponse, bifrostErr := g.client.ContainerFileListRequest(bifrostCtx, containerFileReq.ListRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, containerFileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.ContainerFileListResponseConverter != nil {
			response, err = config.ContainerFileListResponseConverter(bifrostCtx, containerFileResponse)
		} else {
			response = containerFileResponse
		}

	case schemas.ContainerFileRetrieveRequest:
		if containerFileReq.RetrieveRequest == nil {
			fail(nil, "invalid container file retrieve request")
			return
		}
		containerFileResponse, bifrostErr := g.client.ContainerFileRetrieveRequest(bifrostCtx, containerFileReq.RetrieveRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, containerFileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.ContainerFileRetrieveResponseConverter != nil {
			response, err = config.ContainerFileRetrieveResponseConverter(bifrostCtx, containerFileResponse)
		} else {
			response = containerFileResponse
		}

	case schemas.ContainerFileContentRequest:
		if containerFileReq.ContentRequest == nil {
			fail(nil, "invalid container file content request")
			return
		}
		containerFileResponse, bifrostErr := g.client.ContainerFileContentRequest(bifrostCtx, containerFileReq.ContentRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, containerFileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		// The content type and body are read from the response below, so a nil
		// response is rejected here instead of being dereferenced.
		if containerFileResponse == nil {
			fail(nil, "Bifrost response is nil after post-request callback")
			return
		}
		if config.ContainerFileContentResponseConverter == nil {
			// No converter: return the raw binary content.
			writeBinary(containerFileResponse.ContentType, containerFileResponse.Content)
			return
		}
		response, err = config.ContainerFileContentResponseConverter(bifrostCtx, containerFileResponse)
		if err != nil {
			fail(err, "failed to convert container file content response")
			return
		}
		// Raw bytes are written directly without JSON encoding.
		if rawBytes, ok := response.([]byte); ok {
			writeBinary(containerFileResponse.ContentType, rawBytes)
			return
		}
		g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
		return

	case schemas.ContainerFileDeleteRequest:
		if containerFileReq.DeleteRequest == nil {
			fail(nil, "invalid container file delete request")
			return
		}
		containerFileResponse, bifrostErr := g.client.ContainerFileDeleteRequest(bifrostCtx, containerFileReq.DeleteRequest)
		if bifrostErr != nil {
			g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
			return
		}
		if config.PostCallback != nil {
			if cbErr := config.PostCallback(ctx, req, containerFileResponse); cbErr != nil {
				fail(cbErr, "failed to execute post-request callback")
				return
			}
		}
		if config.ContainerFileDeleteResponseConverter != nil {
			response, err = config.ContainerFileDeleteResponseConverter(bifrostCtx, containerFileResponse)
		} else {
			response = containerFileResponse
		}

	default:
		fail(nil, "Unknown container file request type")
		return
	}

	if err != nil {
		fail(err, "failed to convert container file response")
		return
	}

	g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil)
}
// handleStreamingRequest opens the provider stream that matches the populated
// request field of bifrostReq and hands it to handleStreaming for delivery.
//
// Every failure that is detected before the streaming headers are written
// (provider error, no matching request type, route without a StreamConfig) is
// answered through g.sendError, so the client still receives a regular HTTP
// error. cancel is invoked on each of those early exits; on the normal path it
// is passed on to handleStreaming.
func (g *GenericRouter) handleStreamingRequest(ctx *fasthttp.RequestCtx, config RouteConfig, bifrostReq *schemas.BifrostRequest, bifrostCtx *schemas.BifrostContext, cancel context.CancelFunc) {
	// bifrostCtx is the cancellable context obtained from ConvertToBifrostContext.
	// fasthttp.RequestCtx.Done only closes when the whole server shuts down, not
	// when a single connection drops, so it cannot be used to notice a client
	// that went away. Without an explicit cancel the provider stream would keep
	// running (goroutines and upstream tokens included) until it finished on its
	// own, long after the SSE writer had exited.
	var (
		stream     chan *schemas.BifrostStreamChunk
		bifrostErr *schemas.BifrostError
	)

	// Dispatch on the first populated request field.
	switch {
	case bifrostReq.TextCompletionRequest != nil:
		stream, bifrostErr = g.client.TextCompletionStreamRequest(bifrostCtx, bifrostReq.TextCompletionRequest)
	case bifrostReq.ChatRequest != nil:
		stream, bifrostErr = g.client.ChatCompletionStreamRequest(bifrostCtx, bifrostReq.ChatRequest)
	case bifrostReq.ResponsesRequest != nil:
		stream, bifrostErr = g.client.ResponsesStreamRequest(bifrostCtx, bifrostReq.ResponsesRequest)
	case bifrostReq.SpeechRequest != nil:
		stream, bifrostErr = g.client.SpeechStreamRequest(bifrostCtx, bifrostReq.SpeechRequest)
	case bifrostReq.TranscriptionRequest != nil:
		stream, bifrostErr = g.client.TranscriptionStreamRequest(bifrostCtx, bifrostReq.TranscriptionRequest)
	case bifrostReq.ImageGenerationRequest != nil:
		stream, bifrostErr = g.client.ImageGenerationStreamRequest(bifrostCtx, bifrostReq.ImageGenerationRequest)
	case bifrostReq.ImageEditRequest != nil:
		stream, bifrostErr = g.client.ImageEditStreamRequest(bifrostCtx, bifrostReq.ImageEditRequest)
	}

	// The provider failed before streaming started. SSE headers are not yet
	// committed, so a proper status code and JSON body can still be returned.
	if bifrostErr != nil {
		cancel()
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr)
		return
	}

	// No request field matched, so there is no channel at all. Bail out before
	// any drain goroutine is started: ranging over a nil channel blocks forever.
	if stream == nil {
		cancel()
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "streaming is not supported for this request type"))
		return
	}

	// discardRemaining consumes the stream in the background so that it is
	// always read to the end, preventing goroutine leaks.
	discardRemaining := func() {
		go func() {
			for range stream {
			}
		}()
	}

	// Forward provider response headers that the streaming handlers stored in the context.
	providerHeaders, found := bifrostCtx.Value(schemas.BifrostContextKeyProviderResponseHeaders).(map[string]string)
	if found {
		for name, value := range providerHeaders {
			ctx.Response.Header.Set(name, value)
		}
	}

	// Large payload passthrough: the raw upstream response is piped to the
	// client, bypassing SSE event processing entirely.
	if g.tryStreamLargeResponse(ctx, bifrostCtx) {
		ctx.Response.Header.Set("Cache-Control", "no-cache")
		ctx.Response.Header.Set("Connection", "keep-alive")
		ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
		cancel()
		discardRemaining()
		return
	}

	// The route has no streaming configuration.
	if config.StreamConfig == nil {
		cancel()
		discardRemaining()
		g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "streaming is not supported for this integration"))
		return
	}

	// Streaming headers are only set once stream setup has succeeded, which is
	// what lets every error path above report a proper HTTP status code.
	if config.Type != RouteConfigTypeBedrock {
		ctx.SetContentType("text/event-stream")
	} else {
		ctx.SetContentType("application/vnd.amazon.eventstream")
		ctx.Response.Header.Set("x-amzn-bedrock-content-type", "application/json")
	}
	ctx.Response.Header.Set("Cache-Control", "no-cache")
	ctx.Response.Header.Set("Connection", "keep-alive")
	ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")

	// Centralized stream processing; cancel is handed over so the writer can
	// stop the upstream stream when it has to give up.
	g.handleStreaming(ctx, bifrostCtx, config, stream, cancel)
}
// handleStreaming consumes streamChan in a producer goroutine, converts every chunk with the
// route's StreamConfig and writes the result to the client through an SSEStreamReader that is
// installed as the response body stream.
//
// WIRE FORMATS:
//
// By default, responses and errors are sent in the standard SSE format:
//
//	data: {"response": "content"}\n\n
//
// Some providers (like Anthropic) require custom SSE event formats with explicit event types:
//
//	event: content_block_delta
//	data: {"type": "content_block_delta", "delta": {...}}
//
//	event: message_stop
//	data: {"type": "message_stop"}
//
// Bedrock routes (RouteConfigTypeBedrock) do not use SSE at all: each converted
// *bedrock.BedrockStreamEvent is expanded with ToEncodedEvents and every resulting event is
// written as an AWS event stream message.
//
// STREAMCONFIG CONVERTER BEHAVIOR:
//
// The per-type response converters (TextStreamResponseConverter, ChatStreamResponseConverter,
// ResponsesStreamResponseConverter, SpeechStreamResponseConverter,
// TranscriptionStreamResponseConverter, ImageGenerationStreamResponseConverter) return
// (eventType, response, err). A nil response with a nil error skips the chunk; a non-nil error
// is logged and the chunk is skipped. A chunk whose converter is not configured is treated the
// same way (logged and skipped) instead of calling a nil function. The response can be:
//
//  1. AN OBJECT (default behavior):
//     - Any Go struct/map/interface{}
//     - JSON marshaled and sent with reader.SendEvent(eventType, json)
//     - Example: return map[string]interface{}{"content": "hello"}
//
//  2. A STRING starting with "data: " or "event: " (custom SSE format):
//     - Treated as a complete, pre-formatted SSE event and written as-is
//     - If eventType is non-empty, an "event: <eventType>\n" line is prepended
//     - Example: return "event: content_block_delta\ndata: {\"type\":\"text\"}\n\n"
//
//  3. ANY OTHER STRING:
//     - Used as the data payload of reader.SendEvent(eventType, ...)
//
// StreamConfig.ErrorConverter (falling back to config.ErrorConverter, then to a generic
// internal_error object) follows the same object/string split, except that every string is
// written verbatim. The first error chunk ends the stream.
//
// IMPLEMENTATION GUIDELINES:
//
// For standard providers (OpenAI, etc.): Return objects from converters
// For custom SSE providers (Anthropic, etc.): Return pre-formatted SSE strings
//
// When returning pre-formatted strings, ensure they:
//   - Include proper event: lines (if needed)
//   - Include data: lines with JSON content
//   - End with \n\n for proper SSE formatting
//   - Follow the provider's specific SSE event specification
//
// DONE MARKER:
//
// reader.SendDone() is called after the channel closes, except for Anthropic, GenAI and Bedrock
// routes and for routes whose path contains "/responses" or "/images/generations".
//
// CONTEXT CANCELLATION:
//
// cancel is called when the writer has to abandon the stream: a write for a response chunk or
// for the done marker reports failure (client disconnected), a plugin interceptor rejects a
// chunk with an error, the fallback error payload cannot be marshaled, or a Bedrock message
// cannot be encoded. It is NOT called on normal completion or after an error chunk; Bifrost
// handles cleanup internally in those cases.
func (g *GenericRouter) handleStreaming(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, config RouteConfig, streamChan chan *schemas.BifrostStreamChunk, cancel context.CancelFunc) {
	// Signal to tracing middleware that trace completion should be deferred.
	// The producer goroutine below completes the trace after the stream ends.
	ctx.SetUserValue(schemas.BifrostContextKeyDeferTraceCompletion, true)

	// Get the trace completer function for use in the producer goroutine.
	// Signature is func([]schemas.PluginLogEntry) so the goroutine never reads from
	// ctx.UserValue (ctx may be recycled by fasthttp by the time this fires).
	// Router path has no transport post-hook phase, so we always pass nil.
	traceCompleter, _ := ctx.UserValue(schemas.BifrostContextKeyTraceCompleter).(func([]schemas.PluginLogEntry))

	// Get stream chunk interceptor for plugin hooks. The HTTP request snapshot is
	// only built when an interceptor is present to consume it.
	interceptor := g.handlerStore.GetStreamChunkInterceptor()
	var httpReq *schemas.HTTPRequest
	if interceptor != nil {
		httpReq = lib.BuildHTTPRequestFromFastHTTP(ctx)
	}

	// Use SSEStreamReader to bypass fasthttp's internal pipe (fasthttputil.PipeConns)
	// which batches multiple SSE events into single TCP segments.
	reader := lib.NewSSEStreamReader()
	ctx.Response.SetBodyStream(reader, -1)

	// genericError is the payload used when no error converter is configured or
	// when the converted error cannot be marshaled.
	genericError := func() map[string]interface{} {
		return map[string]interface{}{
			"error": map[string]interface{}{
				"type":    "internal_error",
				"message": "An error occurred while processing your request",
			},
		}
	}

	// Producer goroutine: processes the stream channel, formats events, sends to reader
	go func() {
		// Separate defers ensure each cleanup runs even if an earlier one panics (LIFO order)
		defer reader.Done()
		defer schemas.ReleaseHTTPRequest(httpReq)
		defer func() {
			// Complete the trace after streaming finishes
			// This ensures all spans (including llm.call) are properly ended before the trace is sent to OTEL
			if traceCompleter != nil {
				traceCompleter(nil)
			}
		}()

		// Create encoder for AWS Event Stream if needed
		var eventStreamEncoder *eventstream.Encoder
		if config.Type == RouteConfigTypeBedrock {
			eventStreamEncoder = eventstream.NewEncoder()
		}

		shouldSendDoneMarker := true
		if config.Type == RouteConfigTypeAnthropic || strings.Contains(config.Path, "/responses") || strings.Contains(config.Path, "/images/generations") {
			shouldSendDoneMarker = false
		}

		// Process streaming responses
		for chunk := range streamChan {
			if chunk == nil {
				continue
			}

			// Note: ctx.Done() is not checked here because fasthttp.RequestCtx.Done()
			// only closes when the whole server shuts down, not when an individual client disconnects.
			// Client disconnects are detected via write errors on reader.Send(), which returns false.

			// Handle errors
			if chunk.BifrostError != nil {
				var errorResponse interface{}
				// Use stream error converter if available, otherwise fallback to regular error converter
				switch {
				case config.StreamConfig != nil && config.StreamConfig.ErrorConverter != nil:
					errorResponse = config.StreamConfig.ErrorConverter(bifrostCtx, chunk.BifrostError)
				case config.ErrorConverter != nil:
					errorResponse = config.ErrorConverter(bifrostCtx, chunk.BifrostError)
				default:
					errorResponse = genericError()
				}

				// Check if the error converter returned a raw SSE string or JSON object
				if sseErrorString, ok := errorResponse.(string); ok {
					// CUSTOM SSE FORMAT: The converter returned a complete SSE string
					// This is used by providers like Anthropic that need custom event types
					reader.Send([]byte(sseErrorString))
				} else {
					// STANDARD SSE FORMAT: The converter returned an object
					errorJSON, err := sonic.Marshal(errorResponse)
					if err != nil {
						// Fallback to basic error if marshaling fails
						if errorJSON, err = sonic.Marshal(genericError()); err != nil {
							cancel()
							return
						}
					}
					// Send error as SSE data
					reader.SendEvent("", errorJSON)
				}
				return // End stream on error, Bifrost handles cleanup internally
			}

			// Allow plugins to modify/filter the chunk via StreamChunkInterceptor
			if interceptor != nil {
				var err error
				chunk, err = interceptor.InterceptChunk(bifrostCtx, httpReq, chunk)
				if err != nil {
					if chunk == nil {
						errorJSON, marshalErr := sonic.Marshal(map[string]string{"error": err.Error()})
						if marshalErr != nil {
							cancel()
							return
						}
						// Return error event and stop streaming
						reader.SendError(errorJSON)
						cancel()
						return
					}
					// The plugin reported an error but still returned a chunk: warn and continue
					g.logger.Warn("%v", err)
				}
				if chunk == nil {
					// Skip chunk if plugin wants to skip it
					continue
				}
			}

			// Handle successful responses
			// Convert response to integration-specific streaming format.
			// Every case also requires its converter to be configured: calling a nil
			// function value would panic inside this goroutine, which has no recover.
			// Chunks without a usable converter take the default branch instead.
			var eventType string
			var convertedResponse interface{}
			var err error
			switch streamCfg := config.StreamConfig; {
			case streamCfg == nil:
				err = fmt.Errorf("no response converter found for request type: %s", safeGetRequestType(chunk))
			case chunk.BifrostTextCompletionResponse != nil && streamCfg.TextStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.TextStreamResponseConverter(bifrostCtx, chunk.BifrostTextCompletionResponse)
			case chunk.BifrostChatResponse != nil && streamCfg.ChatStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.ChatStreamResponseConverter(bifrostCtx, chunk.BifrostChatResponse)
			case chunk.BifrostResponsesStreamResponse != nil && streamCfg.ResponsesStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.ResponsesStreamResponseConverter(bifrostCtx, chunk.BifrostResponsesStreamResponse)
			case chunk.BifrostSpeechStreamResponse != nil && streamCfg.SpeechStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.SpeechStreamResponseConverter(bifrostCtx, chunk.BifrostSpeechStreamResponse)
			case chunk.BifrostTranscriptionStreamResponse != nil && streamCfg.TranscriptionStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.TranscriptionStreamResponseConverter(bifrostCtx, chunk.BifrostTranscriptionStreamResponse)
			case chunk.BifrostImageGenerationStreamResponse != nil && streamCfg.ImageGenerationStreamResponseConverter != nil:
				eventType, convertedResponse, err = streamCfg.ImageGenerationStreamResponseConverter(bifrostCtx, chunk.BifrostImageGenerationStreamResponse)
			default:
				requestType := safeGetRequestType(chunk)
				convertedResponse, err = nil, fmt.Errorf("no response converter found for request type: %s", requestType)
			}

			if convertedResponse == nil && err == nil {
				// Skip streaming chunk if no response is available and no error is returned
				continue
			}

			if err != nil {
				// Log conversion error but continue processing
				g.logger.Warn("Failed to convert streaming response: %v", err)
				continue
			}

			// Handle Bedrock Event Stream format
			if config.Type == RouteConfigTypeBedrock && eventStreamEncoder != nil {
				// We need to cast to BedrockStreamEvent to determine event type and structure
				if bedrockEvent, ok := convertedResponse.(*bedrock.BedrockStreamEvent); ok {
					// Convert to sequence of specific Bedrock events
					events := bedrockEvent.ToEncodedEvents()

					// Send all collected events
					for _, evt := range events {
						jsonData, err := sonic.Marshal(evt.Payload)
						if err != nil {
							g.logger.Warn("Failed to marshal bedrock payload: %v", err)
							continue
						}

						headers := eventstream.Headers{
							{
								Name:  ":content-type",
								Value: eventstream.StringValue("application/json"),
							},
							{
								Name:  ":event-type",
								Value: eventstream.StringValue(evt.EventType),
							},
							{
								Name:  ":message-type",
								Value: eventstream.StringValue("event"),
							},
						}

						message := eventstream.Message{
							Headers: headers,
							Payload: jsonData,
						}

						var msgBuf bytes.Buffer
						if err := eventStreamEncoder.Encode(&msgBuf, message); err != nil {
							g.logger.Warn("[Bedrock Stream] Failed to encode message: %v", err)
							cancel()
							return
						}

						if !reader.Send(msgBuf.Bytes()) {
							g.logger.Warn("[Bedrock Stream] Client disconnected")
							cancel()
							return
						}
					}
				}
				// Continue to next chunk (we handled sending internally)
				continue
			}

			// Build and send SSE event
			var buf []byte
			var sent bool

			if sseString, ok := convertedResponse.(string); ok {
				if strings.HasPrefix(sseString, "data: ") || strings.HasPrefix(sseString, "event: ") {
					// Pre-formatted SSE string (e.g. Anthropic custom event types)
					if eventType != "" {
						// Prepend event type line to pre-formatted data
						buf = make([]byte, 0, 7+len(eventType)+1+len(sseString))
						buf = append(buf, "event: "...)
						buf = append(buf, eventType...)
						buf = append(buf, '\n')
						buf = append(buf, sseString...)
						sent = reader.Send(buf)
					} else {
						sent = reader.Send([]byte(sseString))
					}
				} else {
					// Plain string payload: let the reader wrap it as an SSE event
					sent = reader.SendEvent(eventType, []byte(sseString))
				}
			} else {
				responseJSON, err := sonic.Marshal(convertedResponse)
				if err != nil {
					g.logger.Warn("Failed to marshal streaming response: %v", err)
					continue
				}
				sent = reader.SendEvent(eventType, responseJSON)
			}

			if !sent {
				cancel() // Client disconnected, cancel upstream stream
				return
			}
		}

		// Only send the [DONE] marker for plain SSE APIs that expect it.
		// Do NOT send [DONE] for the following cases:
		//   - OpenAI "responses" API and Anthropic messages API: they signal completion by simply closing the stream, not sending [DONE].
		//   - Bedrock: uses AWS Event Stream format rather than SSE with [DONE].
		// Bifrost handles any additional cleanup internally on normal stream completion.
		if shouldSendDoneMarker && config.Type != RouteConfigTypeGenAI && config.Type != RouteConfigTypeBedrock {
			if !reader.SendDone() {
				g.logger.Warn("Failed to write SSE done marker: client disconnected")
				cancel()
				return
			}
		}
	}()
}
// extractPassthroughModel resolves the model for a passthrough request.
// A model found in the request path wins; bodyModel is only used when the path
// yields nothing.
// Path patterns: models/{model}, models/{model}:suffix (GenAI), .../models/{model} (Vertex), tunedModels/{model}.
// bodyModel is expected to come from parsePassthroughBody, so the body is not unmarshaled again here.
func extractPassthroughModel(path string, bodyModel string) string {
	fromPath := extractModelFromPath(path)
	if fromPath == "" {
		return bodyModel
	}
	return fromPath
}
// extractModelFromPath returns the path segment that follows the first "models"
// or "tunedModels" segment, with surrounding whitespace trimmed. Everything from
// the first ':' onwards is dropped (e.g. ":generateContent"), unless the ':' is
// the very first character of the segment. It returns "" when no such segment
// exists or when the marker is the last segment of the path.
func extractModelFromPath(path string) string {
	segments := strings.Split(strings.TrimPrefix(path, "/"), "/")
	last := len(segments) - 1
	for idx, segment := range segments {
		if segment != "models" && segment != "tunedModels" {
			continue
		}
		// Only the first marker is considered; with nothing after it there is no model.
		if idx == last {
			return ""
		}
		name := segments[idx+1]
		// Strip the GenAI action suffix (e.g. :generateContent, :streamGenerateContent).
		if colon := strings.IndexByte(name, ':'); colon > 0 {
			name = name[:colon]
		}
		return strings.TrimSpace(name)
	}
	return ""
}
// parsePassthroughBody reads the model name and the streaming flag out of a
// request body in one pass.
//
// contentType must be the raw Content-Type header value: for multipart bodies
// the boundary is taken from that header rather than scraped from the body
// bytes. Any other body is decoded as JSON; if decoding fails, or the body is
// empty, the zero values ("", false) are returned.
func parsePassthroughBody(contentType string, body []byte) (model string, isStream bool) {
	if len(body) == 0 {
		return "", false
	}

	// Multipart bodies are only handled here when the header carries a boundary.
	if mediaType, params, err := mime.ParseMediaType(contentType); err == nil && strings.HasPrefix(mediaType, "multipart/") {
		if boundary := params["boundary"]; boundary != "" {
			return parseMultipartPassthroughBody(body, boundary)
		}
	}

	// JSON (or unknown) body: a single unmarshal yields both fields.
	var payload struct {
		Model  string `json:"model"`
		Stream bool   `json:"stream"`
	}
	if err := sonic.Unmarshal(body, &payload); err != nil {
		return "", false
	}
	return strings.TrimSpace(payload.Model), payload.Stream
}
// parseMultipartPassthroughBody walks the parts of a multipart body and picks up
// the "model" and "stream" form fields, both read as plain text.
//
// The model value is whitespace-trimmed; stream counts as true when its trimmed,
// lower-cased value is "true" or "1". Scanning stops at the first part that
// cannot be read, or as soon as a model has been found and stream is true.
func parseMultipartPassthroughBody(body []byte, boundary string) (model string, isStream bool) {
	parts := multipart.NewReader(bytes.NewReader(body), boundary)
	for !(model != "" && isStream) {
		part, err := parts.NextPart()
		if err != nil {
			// End of body or malformed input: keep whatever was collected so far.
			return model, isStream
		}

		field := part.FormName()
		if field != "model" && field != "stream" {
			part.Close()
			continue
		}

		// Best effort: a read error simply leaves the (possibly partial) value.
		raw, _ := io.ReadAll(part)
		part.Close()

		if field == "model" {
			model = strings.TrimSpace(string(raw))
			continue
		}
		flag := strings.TrimSpace(strings.ToLower(string(raw)))
		isStream = flag == "true" || flag == "1"
	}
	return model, isStream
}
// handlePassthrough forwards a request to the upstream provider without
// converting it: it filters the request headers, strips the configured path
// prefix, resolves model and provider, and then dispatches to the streaming or
// non-streaming passthrough handler.
func (g *GenericRouter) handlePassthrough(ctx *fasthttp.RequestCtx) {
	cfg := g.passthroughCfg

	// Copy request headers (keys lower-cased), leaving out credentials,
	// connection-level headers, cookies, accept-encoding and internal x-bf-* headers.
	safeHeaders := make(map[string]string)
	for rawKey, rawValue := range ctx.Request.Header.All() {
		name := strings.ToLower(string(rawKey))
		switch name {
		case "authorization", "api-key", "x-api-key", "x-goog-api-key",
			"host", "connection", "transfer-encoding", "cookie", "set-cookie", "proxy-authorization", "accept-encoding":
			continue
		}
		if strings.HasPrefix(name, "x-bf-") {
			continue // drop internal gateway headers
		}
		safeHeaders[name] = string(rawValue)
	}

	bifrostCtx, cancel := lib.ConvertToBifrostContext(ctx, g.handlerStore.ShouldAllowDirectKeys(), g.handlerStore.GetHeaderMatcher(), g.handlerStore.GetMCPHeaderCombinedAllowlist())

	// Carry a direct key over into the Bifrost context when one is attached to the request.
	if key, ok := ctx.UserValue(string(schemas.BifrostContextKeyDirectKey)).(schemas.Key); ok {
		bifrostCtx.SetValue(schemas.BifrostContextKeyDirectKey, key)
	}

	// Remove the first configured prefix that matches.
	path := string(ctx.Path())
	for _, prefix := range cfg.StripPrefix {
		if trimmed, found := strings.CutPrefix(path, prefix); found {
			path = trimmed
			break
		}
	}

	// Parse the body once to get both the model and the stream flag.
	body := ctx.Request.Body()
	bodyModel, bodyStream := parsePassthroughBody(string(ctx.Request.Header.ContentType()), body)
	resolvedModel := extractPassthroughModel(path, bodyModel)

	// Provider: route default, then detector, then header-based resolution.
	provider := cfg.Provider
	if cfg.ProviderDetector != nil {
		provider = cfg.ProviderDetector(ctx, resolvedModel)
	}
	provider = getProviderFromHeader(ctx, provider)

	passthroughReq := &schemas.BifrostPassthroughRequest{
		Method:      string(ctx.Method()),
		Path:        path,
		RawQuery:    string(ctx.URI().QueryString()),
		Body:        body,
		SafeHeaders: safeHeaders,
		Provider:    provider,
		Model:       resolvedModel,
	}

	// A request streams when the body asks for it or the path mentions "stream".
	if bodyStream || strings.Contains(strings.ToLower(path), "stream") {
		g.handlePassthroughStream(ctx, bifrostCtx, cancel, provider, passthroughReq)
		return
	}
	g.handlePassthroughNonStream(ctx, bifrostCtx, cancel, provider, passthroughReq)
}
// handlePassthroughNonStream performs a buffered passthrough call and copies the
// upstream status code, headers and body onto the response. A provider error is
// returned to the client unchanged via g.sendError. cancel is always invoked on
// return.
func (g *GenericRouter) handlePassthroughNonStream(
	ctx *fasthttp.RequestCtx,
	bifrostCtx *schemas.BifrostContext,
	cancel context.CancelFunc,
	provider schemas.ModelProvider,
	req *schemas.BifrostPassthroughRequest,
) {
	defer cancel()

	resp, bifrostErr := g.client.Passthrough(bifrostCtx, provider, req)
	if bifrostErr != nil {
		// Identity converter: the Bifrost error is sent as-is.
		asIs := func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
			return err
		}
		g.sendError(ctx, bifrostCtx, asIs, bifrostErr)
		return
	}

	ctx.SetStatusCode(resp.StatusCode)
	for name, value := range resp.Headers {
		lowered := strings.ToLower(name)
		// Connection-level, cookie and authentication-challenge headers are not forwarded.
		if lowered == "connection" || lowered == "transfer-encoding" || lowered == "set-cookie" ||
			lowered == "proxy-authenticate" || lowered == "www-authenticate" {
			continue
		}
		ctx.Response.Header.Set(name, value)
	}
	ctx.Response.SetBody(resp.Body)
}
// handlePassthroughStream performs a streaming passthrough call. The first chunk
// of the provider stream supplies the status code and headers; its body and the
// bodies of all following chunks are then written to the client from a
// background goroutine. Any problem found before that goroutine starts is
// reported as a regular error response, with the Bifrost error sent unchanged.
func (g *GenericRouter) handlePassthroughStream(
	ctx *fasthttp.RequestCtx,
	bifrostCtx *schemas.BifrostContext,
	cancel context.CancelFunc,
	provider schemas.ModelProvider,
	req *schemas.BifrostPassthroughRequest,
) {
	// Deferred trace completion must be explicitly finalized after streaming ends.
	// Without this, observability injectors (including logging) never flush final state.
	traceCompleter, _ := ctx.UserValue(schemas.BifrostContextKeyTraceCompleter).(func([]schemas.PluginLogEntry))

	// respondWith sends a Bifrost error to the client as-is.
	respondWith := func(failure *schemas.BifrostError) {
		g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
			return err
		}, failure)
	}

	stream, bifrostErr := g.client.PassthroughStream(bifrostCtx, provider, req)
	if bifrostErr != nil {
		cancel()
		respondWith(bifrostErr)
		return
	}

	// Read the first chunk to extract status code and headers before streaming begins.
	firstChunk, open := <-stream
	switch {
	case !open:
		cancel()
		respondWith(newBifrostError(nil, "passthrough stream ended before headers were received"))
		return
	case firstChunk == nil:
		cancel()
		respondWith(newBifrostError(nil, "passthrough stream returned nil first chunk"))
		return
	case firstChunk.BifrostError != nil:
		cancel()
		respondWith(firstChunk.BifrostError)
		return
	case firstChunk.BifrostPassthroughResponse == nil:
		cancel()
		respondWith(newBifrostError(nil, "passthrough stream returned empty first chunk"))
		return
	}
	passthroughResp := firstChunk.BifrostPassthroughResponse

	// Skip post-hook body materialization — ctx.Response.Body() would buffer the entire stream.
	ctx.SetUserValue(schemas.BifrostContextKeyDeferTraceCompletion, true)
	ctx.SetStatusCode(passthroughResp.StatusCode)
	ctx.SetConnectionClose()
	for name, value := range passthroughResp.Headers {
		lowered := strings.ToLower(name)
		// content-length is dropped as well: it is invalid for a streaming response.
		if lowered == "connection" || lowered == "transfer-encoding" || lowered == "content-length" ||
			lowered == "set-cookie" || lowered == "proxy-authenticate" || lowered == "www-authenticate" {
			continue
		}
		ctx.Response.Header.Set(name, value)
	}

	// Use SSEStreamReader to bypass fasthttp's internal pipe batching
	reader := lib.NewSSEStreamReader()
	ctx.Response.SetBodyStream(reader, -1)

	go func() {
		defer func() {
			if traceCompleter != nil {
				traceCompleter(nil)
			}
			reader.Done()
			cancel()
		}()

		// forward writes a non-empty payload and reports false once a write fails;
		// empty payloads are skipped and count as success.
		forward := func(payload []byte) bool {
			return len(payload) == 0 || reader.Send(payload)
		}

		// Write the first chunk's data.
		if !forward(passthroughResp.Body) {
			cancel()
			return
		}

		for chunk := range stream {
			switch {
			case chunk == nil:
				continue
			case chunk.BifrostError != nil:
				// An error chunk ends the stream without writing anything more.
				return
			}
			if resp := chunk.BifrostPassthroughResponse; resp != nil && !forward(resp.Body) {
				cancel()
				return
			}
		}
	}()
}