// Package integrations provides a generic router framework for handling different LLM provider APIs. // // CENTRALIZED STREAMING ARCHITECTURE: // // This package implements a centralized streaming approach where all stream handling logic // is consolidated in the GenericRouter, eliminating the need for provider-specific StreamHandler // implementations. The key components are: // // 1. StreamConfig: Defines streaming configuration for each route, including: // - ResponseConverter: Converts BifrostResponse to provider-specific streaming format // - ErrorConverter: Converts BifrostError to provider-specific streaming error format // // 2. Centralized Stream Processing: The GenericRouter handles all streaming logic: // - SSE header management // - Stream channel processing // - Error handling and conversion // - Response formatting and flushing // - Stream closure (handled automatically by provider implementation) // // 3. Provider-Specific Type Conversion: Integration types.go files only handle type conversion: // - Derive{Provider}StreamFromBifrostResponse: Convert responses to streaming format // - Derive{Provider}StreamFromBifrostError: Convert errors to streaming error format // // BENEFITS: // - Eliminates code duplication across provider-specific stream handlers // - Centralizes streaming logic for consistency and maintainability // - Separates concerns: routing logic vs type conversion // - Automatic stream closure management by provider implementations // - Consistent error handling across all providers // // USAGE EXAMPLE: // // routes := []RouteConfig{ // { // Path: "/openai/chat/completions", // Method: "POST", // // ... other configs ... // StreamConfig: &StreamConfig{ // ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) { // return DeriveOpenAIStreamFromBifrostResponse(resp), nil // }, // ErrorConverter: func(err *schemas.BifrostError) interface{} { // return DeriveOpenAIStreamFromBifrostError(err) // }, // }, // }, // } package integrations import ( "bytes" "context" "fmt" "io" "errors" "mime" "mime/multipart" "strconv" "strings" "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream" "github.com/bytedance/sonic" "github.com/fasthttp/router" bifrost "github.com/maximhq/bifrost/core" "github.com/maximhq/bifrost/core/providers/bedrock" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/logstore" "github.com/maximhq/bifrost/transports/bifrost-http/lib" "github.com/valyala/fasthttp" ) // ExtensionRouter defines the interface that all integration routers must implement // to register their routes with the main HTTP router. type ExtensionRouter interface { RegisterRoutes(r *router.Router, middlewares ...schemas.BifrostHTTPMiddleware) } // StreamingRequest interface for requests that support streaming type StreamingRequest interface { IsStreamingRequested() bool } // RequestWithSettableExtraParams is implemented by request types that accept // provider-specific extra parameters via the extra_params JSON key. The // integration router extracts extra_params from the raw request body and // passes them through so they propagate to the downstream provider. type RequestWithSettableExtraParams interface { SetExtraParams(params map[string]interface{}) } // BatchRequest wraps a Bifrost batch request with its type information. type BatchRequest struct { Type schemas.RequestType CreateRequest *schemas.BifrostBatchCreateRequest ListRequest *schemas.BifrostBatchListRequest RetrieveRequest *schemas.BifrostBatchRetrieveRequest CancelRequest *schemas.BifrostBatchCancelRequest DeleteRequest *schemas.BifrostBatchDeleteRequest ResultsRequest *schemas.BifrostBatchResultsRequest } // FileRequest wraps a Bifrost file request with its type information. type FileRequest struct { Type schemas.RequestType UploadRequest *schemas.BifrostFileUploadRequest ListRequest *schemas.BifrostFileListRequest RetrieveRequest *schemas.BifrostFileRetrieveRequest DeleteRequest *schemas.BifrostFileDeleteRequest ContentRequest *schemas.BifrostFileContentRequest } // ContainerRequest wraps a Bifrost container request with its type information. type ContainerRequest struct { Type schemas.RequestType CreateRequest *schemas.BifrostContainerCreateRequest ListRequest *schemas.BifrostContainerListRequest RetrieveRequest *schemas.BifrostContainerRetrieveRequest DeleteRequest *schemas.BifrostContainerDeleteRequest } // ContainerFileRequest is a wrapper for Bifrost container file requests. type ContainerFileRequest struct { Type schemas.RequestType CreateRequest *schemas.BifrostContainerFileCreateRequest ListRequest *schemas.BifrostContainerFileListRequest RetrieveRequest *schemas.BifrostContainerFileRetrieveRequest ContentRequest *schemas.BifrostContainerFileContentRequest DeleteRequest *schemas.BifrostContainerFileDeleteRequest } // BatchRequestConverter is a function that converts integration-specific batch requests to Bifrost format. type BatchRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error) // FileRequestConverter is a function that converts integration-specific file requests to Bifrost format. type FileRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*FileRequest, error) // ContainerRequestConverter is a function that converts integration-specific container requests to Bifrost format. type ContainerRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*ContainerRequest, error) // ContainerFileRequestConverter is a function that converts integration-specific container file requests to Bifrost format. type ContainerFileRequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*ContainerFileRequest, error) // RequestConverter is a function that converts integration-specific requests to Bifrost format. // It takes the parsed request object and returns a BifrostRequest ready for processing. type RequestConverter func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error) // ListModelsResponseConverter is a function that converts BifrostListModelsResponse to integration-specific format. // It takes a BifrostListModelsResponse and returns the format expected by the specific integration. type ListModelsResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostListModelsResponse) (interface{}, error) // TextResponseConverter is a function that converts BifrostTextCompletionResponse to integration-specific format. // It takes a BifrostTextCompletionResponse and returns the format expected by the specific integration. type TextResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTextCompletionResponse) (interface{}, error) // ChatResponseConverter is a function that converts BifrostChatResponse to integration-specific format. // It takes a BifrostChatResponse and returns the format expected by the specific integration. type ChatResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostChatResponse) (interface{}, error) // AsyncChatResponseConverter is a function that converts an async job response to an integration-specific format. // It takes an async job response and a method to convert the chat response, and returns the integration-specific format, extra headers, and an error. type AsyncChatResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.AsyncJobResponse, chatResponseConverter ChatResponseConverter) (interface{}, map[string]string, error) // ResponsesResponseConverter is a function that converts BifrostResponsesResponse to integration-specific format. // It takes a BifrostResponsesResponse and returns the format expected by the specific integration. type ResponsesResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesResponse) (interface{}, error) // AsyncResponsesResponseConverter is a function that converts an async job response to an integration-specific format. // It takes an async job response and a method to convert the responses response, and returns the integration-specific format, extra headers, and an error. type AsyncResponsesResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.AsyncJobResponse, responsesResponseConverter ResponsesResponseConverter) (interface{}, map[string]string, error) // EmbeddingResponseConverter is a function that converts BifrostEmbeddingResponse to integration-specific format. // It takes a BifrostEmbeddingResponse and returns the format expected by the specific integration. type EmbeddingResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostEmbeddingResponse) (interface{}, error) // RerankResponseConverter is a function that converts BifrostRerankResponse to integration-specific format. // It takes a BifrostRerankResponse and returns the format expected by the specific integration. type RerankResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostRerankResponse) (interface{}, error) // OCRResponseConverter is a function that converts BifrostOCRResponse to integration-specific format. // It takes a BifrostOCRResponse and returns the format expected by the specific integration. type OCRResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostOCRResponse) (interface{}, error) // SpeechResponseConverter is a function that converts BifrostSpeechResponse to integration-specific format. // It takes a BifrostSpeechResponse and returns the format expected by the specific integration. type SpeechResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostSpeechResponse) (interface{}, error) // TranscriptionResponseConverter is a function that converts BifrostTranscriptionResponse to integration-specific format. // It takes a BifrostTranscriptionResponse and returns the format expected by the specific integration. type TranscriptionResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTranscriptionResponse) (interface{}, error) // BatchCreateResponseConverter is a function that converts BifrostBatchCreateResponse to integration-specific format. // It takes a BifrostBatchCreateResponse and returns the format expected by the specific integration. type BatchCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCreateResponse) (interface{}, error) // BatchListResponseConverter is a function that converts BifrostBatchListResponse to integration-specific format. // It takes a BifrostBatchListResponse and returns the format expected by the specific integration. type BatchListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchListResponse) (interface{}, error) // BatchRetrieveResponseConverter is a function that converts BifrostBatchRetrieveResponse to integration-specific format. // It takes a BifrostBatchRetrieveResponse and returns the format expected by the specific integration. type BatchRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchRetrieveResponse) (interface{}, error) // BatchCancelResponseConverter is a function that converts BifrostBatchCancelResponse to integration-specific format. // It takes a BifrostBatchCancelResponse and returns the format expected by the specific integration. type BatchCancelResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCancelResponse) (interface{}, error) // BatchResultsResponseConverter is a function that converts BifrostBatchResultsResponse to integration-specific format. // It takes a BifrostBatchResultsResponse and returns the format expected by the specific integration. type BatchResultsResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchResultsResponse) (interface{}, error) // BatchDeleteResponseConverter is a function that converts BifrostBatchDeleteResponse to integration-specific format. // It takes a BifrostBatchDeleteResponse and returns the format expected by the specific integration. type BatchDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchDeleteResponse) (interface{}, error) // FileUploadResponseConverter is a function that converts BifrostFileUploadResponse to integration-specific format. // It takes a BifrostFileUploadResponse and returns the format expected by the specific integration. type FileUploadResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileUploadResponse) (interface{}, error) // FileListResponseConverter is a function that converts BifrostFileListResponse to integration-specific format. // It takes a BifrostFileListResponse and returns the format expected by the specific integration. type FileListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileListResponse) (interface{}, error) // FileRetrieveResponseConverter is a function that converts BifrostFileRetrieveResponse to integration-specific format. // It takes a BifrostFileRetrieveResponse and returns the format expected by the specific integration. type FileRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileRetrieveResponse) (interface{}, error) // FileDeleteResponseConverter is a function that converts BifrostFileDeleteResponse to integration-specific format. // It takes a BifrostFileDeleteResponse and returns the format expected by the specific integration. type FileDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileDeleteResponse) (interface{}, error) // FileContentResponseConverter is a function that converts BifrostFileContentResponse to integration-specific format. // It takes a BifrostFileContentResponse and returns the format expected by the specific integration. // Note: This may return binary data or a wrapper object depending on the integration. type FileContentResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileContentResponse) (interface{}, error) // ContainerCreateResponseConverter is a function that converts BifrostContainerCreateResponse to integration-specific format. // It takes a BifrostContainerCreateResponse and returns the format expected by the specific integration. type ContainerCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerCreateResponse) (interface{}, error) // ContainerListResponseConverter is a function that converts BifrostContainerListResponse to integration-specific format. // It takes a BifrostContainerListResponse and returns the format expected by the specific integration. type ContainerListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerListResponse) (interface{}, error) // ContainerRetrieveResponseConverter is a function that converts BifrostContainerRetrieveResponse to integration-specific format. // It takes a BifrostContainerRetrieveResponse and returns the format expected by the specific integration. type ContainerRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerRetrieveResponse) (interface{}, error) // ContainerDeleteResponseConverter is a function that converts BifrostContainerDeleteResponse to integration-specific format. // It takes a BifrostContainerDeleteResponse and returns the format expected by the specific integration. type ContainerDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerDeleteResponse) (interface{}, error) // ContainerFileCreateResponseConverter is a function that converts BifrostContainerFileCreateResponse to integration-specific format. // It takes a BifrostContainerFileCreateResponse and returns the format expected by the specific integration. type ContainerFileCreateResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileCreateResponse) (interface{}, error) // ContainerFileListResponseConverter is a function that converts BifrostContainerFileListResponse to integration-specific format. // It takes a BifrostContainerFileListResponse and returns the format expected by the specific integration. type ContainerFileListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileListResponse) (interface{}, error) // ContainerFileRetrieveResponseConverter is a function that converts BifrostContainerFileRetrieveResponse to integration-specific format. // It takes a BifrostContainerFileRetrieveResponse and returns the format expected by the specific integration. type ContainerFileRetrieveResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileRetrieveResponse) (interface{}, error) // ContainerFileContentResponseConverter is a function that converts BifrostContainerFileContentResponse to integration-specific format. // It takes a BifrostContainerFileContentResponse and returns the format expected by the specific integration. type ContainerFileContentResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileContentResponse) (interface{}, error) // ContainerFileDeleteResponseConverter is a function that converts BifrostContainerFileDeleteResponse to integration-specific format. // It takes a BifrostContainerFileDeleteResponse and returns the format expected by the specific integration. type ContainerFileDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostContainerFileDeleteResponse) (interface{}, error) // CountTokensResponseConverter is a function that converts BifrostCountTokensResponse to integration-specific format. // It takes a BifrostCountTokensResponse and returns the format expected by the specific integration. type CountTokensResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostCountTokensResponse) (interface{}, error) // TextStreamResponseConverter is a function that converts BifrostTextCompletionResponse to integration-specific streaming format. // It takes a BifrostTextCompletionResponse and returns the event type and the streaming format expected by the specific integration. type TextStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTextCompletionResponse) (string, interface{}, error) // ChatStreamResponseConverter is a function that converts BifrostChatResponse to integration-specific streaming format. // It takes a BifrostChatResponse and returns the event type and the streaming format expected by the specific integration. type ChatStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostChatResponse) (string, interface{}, error) // ResponsesStreamResponseConverter is a function that converts BifrostResponsesStreamResponse to integration-specific streaming format. // It takes a BifrostResponsesStreamResponse and returns a single event type and payload, which can itself encode one or more SSE events if needed by the integration. type ResponsesStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesStreamResponse) (string, interface{}, error) // SpeechStreamResponseConverter is a function that converts BifrostSpeechStreamResponse to integration-specific streaming format. // It takes a BifrostSpeechStreamResponse and returns the event type and the streaming format expected by the specific integration. type SpeechStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostSpeechStreamResponse) (string, interface{}, error) // TranscriptionStreamResponseConverter is a function that converts BifrostTranscriptionStreamResponse to integration-specific streaming format. // It takes a BifrostTranscriptionStreamResponse and returns the event type and the streaming format expected by the specific integration. type TranscriptionStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostTranscriptionStreamResponse) (string, interface{}, error) // ImageGenerationResponseConverter is a function that converts BifrostImageGenerationResponse to integration-specific format. // It takes a BifrostImageGenerationResponse and returns the format expected by the specific integration. type ImageGenerationResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationResponse) (interface{}, error) // ImageGenerationStreamResponseConverter is a function that converts BifrostImageGenerationStreamResponse to integration-specific streaming format. // It takes a BifrostImageGenerationStreamResponse and returns the event type and the streaming format expected by the specific integration. type ImageGenerationStreamResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationStreamResponse) (string, interface{}, error) // ImageEditResponseConverter is a function that converts BifrostImageGenerationResponse to integration-specific format. // It takes a BifrostImageGenerationResponse and returns the format expected by the specific integration. type ImageEditResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostImageGenerationResponse) (interface{}, error) // VideoGenerationResponseConverter is a function that converts BifrostVideoGenerationResponse to integration-specific format. // It takes a BifrostVideoGenerationResponse and returns the format expected by the specific integration. type VideoGenerationResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoGenerationResponse) (interface{}, error) // VideoDownloadResponseConverter is a function that converts BifrostVideoDownloadResponse to integration-specific format. // It takes a BifrostVideoDownloadResponse and returns the format expected by the specific integration. type VideoDownloadResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoDownloadResponse) (interface{}, error) // VideoRetrieveAsDownloadConverter is a function that converts BifrostVideoGenerationResponse to integration-specific format. // It takes a BifrostVideoGenerationResponse and returns the format expected by the specific integration. type VideoRetrieveAsDownloadConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoGenerationResponse) (interface{}, error) // VideoDeleteResponseConverter is a function that converts BifrostVideoDeleteResponse to integration-specific format. // It takes a BifrostVideoDeleteResponse and returns the format expected by the specific integration. type VideoDeleteResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoDeleteResponse) (interface{}, error) // VideoListResponseConverter is a function that converts BifrostVideoListResponse to integration-specific format. // It takes a BifrostVideoListResponse and returns the format expected by the specific integration. type VideoListResponseConverter func(ctx *schemas.BifrostContext, resp *schemas.BifrostVideoListResponse) (interface{}, error) // ErrorConverter is a function that converts BifrostError to integration-specific format. // It takes a BifrostError and returns the format expected by the specific integration. type ErrorConverter func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} // StreamErrorConverter is a function that converts BifrostError to integration-specific streaming error format. // It takes a BifrostError and returns the streaming error format expected by the specific integration. type StreamErrorConverter func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} // RequestParser is a function that handles custom request body parsing. // It replaces the default JSON parsing when configured (e.g., for multipart/form-data). // The parser should populate the provided request object from the fasthttp context. // If it returns an error, the request processing stops. type RequestParser func(ctx *fasthttp.RequestCtx, req interface{}) error // PreRequestCallback is called after parsing the request but before processing through Bifrost. // It can be used to modify the request object (e.g., extract model from URL parameters) // or perform validation. If it returns an error, the request processing stops. // It can also modify the bifrost context based on the request context before it is given to Bifrost. type PreRequestCallback func(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error // PostRequestCallback is called after processing the request but before sending the response. // It can be used to modify the response or perform additional logging/metrics. // If it returns an error, an error response is sent instead of the success response. type PostRequestCallback func(ctx *fasthttp.RequestCtx, req interface{}, resp interface{}) error // HTTPRequestTypeGetter is a function type that accepts only a *fasthttp.RequestCtx and // returns a schemas.RequestType indicating the HTTP request type derived from the context. type HTTPRequestTypeGetter func(ctx *fasthttp.RequestCtx) schemas.RequestType // ShortCircuit is a function that determines if the request should be short-circuited. type ShortCircuit func(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) (bool, error) // StreamConfig defines streaming-specific configuration for an integration // // SSE FORMAT BEHAVIOR: // // The ResponseConverter and ErrorConverter functions in StreamConfig can return either: // // 1. OBJECTS (interface{} that's not a string): // - Will be JSON marshaled and sent as standard SSE: data: {json}\n\n // - Use this for most providers (OpenAI, Google, etc.) // - Example: return map[string]interface{}{"delta": {"content": "hello"}} // - Result: data: {"delta":{"content":"hello"}}\n\n // // 2. STRINGS: // - Will be sent directly as-is without any modification // - Use this for providers requiring custom SSE event types (Anthropic, etc.) // - Example: return "event: content_block_delta\ndata: {\"type\":\"text\"}\n\n" // - Result: event: content_block_delta // data: {"type":"text"} // // Choose the appropriate return type based on your provider's SSE specification. type StreamConfig struct { TextStreamResponseConverter TextStreamResponseConverter // Function to convert BifrostTextCompletionResponse to streaming format ChatStreamResponseConverter ChatStreamResponseConverter // Function to convert BifrostChatResponse to streaming format ResponsesStreamResponseConverter ResponsesStreamResponseConverter // Function to convert BifrostResponsesResponse to streaming format SpeechStreamResponseConverter SpeechStreamResponseConverter // Function to convert BifrostSpeechResponse to streaming format TranscriptionStreamResponseConverter TranscriptionStreamResponseConverter // Function to convert BifrostTranscriptionResponse to streaming format ImageGenerationStreamResponseConverter ImageGenerationStreamResponseConverter // Function to convert BifrostImageGenerationStreamResponse to streaming format ErrorConverter StreamErrorConverter // Function to convert BifrostError to streaming error format } type RouteConfigType string const ( RouteConfigTypeOpenAI RouteConfigType = "openai" RouteConfigTypeAnthropic RouteConfigType = "anthropic" RouteConfigTypeGenAI RouteConfigType = "genai" RouteConfigTypeBedrock RouteConfigType = "bedrock" RouteConfigTypeCohere RouteConfigType = "cohere" ) // RouteConfig defines the configuration for a single route in an integration. // It specifies the path, method, and handlers for request/response conversion. type RouteConfig struct { Type RouteConfigType // Type of the route Path string // HTTP path pattern (e.g., "/openai/v1/chat/completions") Method string // HTTP method (POST, GET, PUT, DELETE) GetHTTPRequestType HTTPRequestTypeGetter // Function to get the HTTP request type from the context (SHOULD NOT BE NIL) GetRequestTypeInstance func(ctx context.Context) interface{} // Factory function to create request instance (SHOULD NOT BE NIL) RequestParser RequestParser // Optional: custom request parsing (e.g., multipart/form-data) RequestConverter RequestConverter // Function to convert request to BifrostRequest (for inference requests) BatchRequestConverter BatchRequestConverter // Function to convert request to BatchRequest (for batch operations) FileRequestConverter FileRequestConverter // Function to convert request to FileRequest (for file operations) ContainerRequestConverter ContainerRequestConverter // Function to convert request to ContainerRequest (for container operations) ContainerFileRequestConverter ContainerFileRequestConverter // Function to convert request to ContainerFileRequest (for container file operations) ListModelsResponseConverter ListModelsResponseConverter // Function to convert BifrostListModelsResponse to integration format (SHOULD NOT BE NIL) TextResponseConverter TextResponseConverter // Function to convert BifrostTextCompletionResponse to integration format (SHOULD NOT BE NIL) ChatResponseConverter ChatResponseConverter // Function to convert BifrostChatResponse to integration format (SHOULD NOT BE NIL) AsyncChatResponseConverter AsyncChatResponseConverter // Function to convert AsyncJobResponse to integration format (SHOULD NOT BE NIL) ResponsesResponseConverter ResponsesResponseConverter // Function to convert BifrostResponsesResponse to integration format (SHOULD NOT BE NIL) AsyncResponsesResponseConverter AsyncResponsesResponseConverter // Function to convert AsyncJobResponse to integration format (SHOULD NOT BE NIL) EmbeddingResponseConverter EmbeddingResponseConverter // Function to convert BifrostEmbeddingResponse to integration format (SHOULD NOT BE NIL) RerankResponseConverter RerankResponseConverter // Function to convert BifrostRerankResponse to integration format OCRResponseConverter OCRResponseConverter // Function to convert BifrostOCRResponse to integration format SpeechResponseConverter SpeechResponseConverter // Function to convert BifrostSpeechResponse to integration format (SHOULD NOT BE NIL) TranscriptionResponseConverter TranscriptionResponseConverter // Function to convert BifrostTranscriptionResponse to integration format (SHOULD NOT BE NIL) ImageGenerationResponseConverter ImageGenerationResponseConverter // Function to convert BifrostImageGenerationResponse to integration format (SHOULD NOT BE NIL) VideoGenerationResponseConverter VideoGenerationResponseConverter // Function to convert BifrostVideoGenerationResponse to integration format (SHOULD NOT BE NIL) VideoDownloadResponseConverter VideoDownloadResponseConverter // Function to convert BifrostVideoDownloadResponse to integration format (SHOULD NOT BE NIL) VideoDeleteResponseConverter VideoDeleteResponseConverter // Function to convert BifrostVideoDeleteResponse to integration format (SHOULD NOT BE NIL) VideoListResponseConverter VideoListResponseConverter // Function to convert BifrostVideoListResponse to integration format (SHOULD NOT BE NIL) BatchCreateResponseConverter BatchCreateResponseConverter // Function to convert BifrostBatchCreateResponse to integration format BatchListResponseConverter BatchListResponseConverter // Function to convert BifrostBatchListResponse to integration format BatchRetrieveResponseConverter BatchRetrieveResponseConverter // Function to convert BifrostBatchRetrieveResponse to integration format BatchCancelResponseConverter BatchCancelResponseConverter // Function to convert BifrostBatchCancelResponse to integration format BatchDeleteResponseConverter BatchDeleteResponseConverter // Function to convert BifrostBatchDeleteResponse to integration format BatchResultsResponseConverter BatchResultsResponseConverter // Function to convert BifrostBatchResultsResponse to integration format FileUploadResponseConverter FileUploadResponseConverter // Function to convert BifrostFileUploadResponse to integration format FileListResponseConverter FileListResponseConverter // Function to convert BifrostFileListResponse to integration format FileRetrieveResponseConverter FileRetrieveResponseConverter // Function to convert BifrostFileRetrieveResponse to integration format FileDeleteResponseConverter FileDeleteResponseConverter // Function to convert BifrostFileDeleteResponse to integration format FileContentResponseConverter FileContentResponseConverter // Function to convert BifrostFileContentResponse to integration format ContainerCreateResponseConverter ContainerCreateResponseConverter // Function to convert BifrostContainerCreateResponse to integration format ContainerListResponseConverter ContainerListResponseConverter // Function to convert BifrostContainerListResponse to integration format ContainerRetrieveResponseConverter ContainerRetrieveResponseConverter // Function to convert BifrostContainerRetrieveResponse to integration format ContainerDeleteResponseConverter ContainerDeleteResponseConverter // Function to convert BifrostContainerDeleteResponse to integration format ContainerFileCreateResponseConverter ContainerFileCreateResponseConverter // Function to convert BifrostContainerFileCreateResponse to integration format ContainerFileListResponseConverter ContainerFileListResponseConverter // Function to convert BifrostContainerFileListResponse to integration format ContainerFileRetrieveResponseConverter ContainerFileRetrieveResponseConverter // Function to convert BifrostContainerFileRetrieveResponse to integration format ContainerFileContentResponseConverter ContainerFileContentResponseConverter // Function to convert BifrostContainerFileContentResponse to integration format ContainerFileDeleteResponseConverter ContainerFileDeleteResponseConverter // Function to convert BifrostContainerFileDeleteResponse to integration format CountTokensResponseConverter CountTokensResponseConverter // Function to convert BifrostCountTokensResponse to integration format ErrorConverter ErrorConverter // Function to convert BifrostError to integration format (SHOULD NOT BE NIL) StreamConfig *StreamConfig // Optional: Streaming configuration (if nil, streaming not supported) PreCallback PreRequestCallback // Optional: called after parsing but before Bifrost processing PostCallback PostRequestCallback // Optional: called after request processing ShortCircuit ShortCircuit } type PassthroughConfig struct { Provider schemas.ModelProvider // which provider's key pool to draw from ProviderDetector func(ctx *fasthttp.RequestCtx, model string) schemas.ModelProvider // optional: dynamic provider detection StripPrefix []string // e.g. "/openai" — stripped before forwarding } // LargePayloadHook is called before body parsing to detect and set up large payload streaming. // If it returns skipBodyParse=true, the router skips JSON parsing of the request body. // The hook is responsible for setting all relevant context keys (BifrostContextKeyLargePayloadMode, // BifrostContextKeyLargePayloadReader, BifrostContextKeyLargePayloadContentLength, // BifrostContextKeyLargePayloadMetadata) when activating large payload mode. type LargePayloadHook func( ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, routeType RouteConfigType, ) (skipBodyParse bool, err error) // LargeResponseHook is called before streaming a large response body to the client. // Enterprise uses this to wrap the response reader with Phase B scanning (e.g., usage extraction // from the full response stream when usage is beyond the Phase A prefetch window). // The hook receives the bifrost context with BifrostContextKeyLargeResponseReader already set // and may replace the reader on context with a wrapped version. type LargeResponseHook func( ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, ) // GenericRouter provides a reusable router implementation for all integrations. // It handles the common flow of: parse request → convert to Bifrost → execute → convert response. // Integration-specific logic is handled through the RouteConfig callbacks and converters. type GenericRouter struct { client *bifrost.Bifrost // Bifrost client for executing requests handlerStore lib.HandlerStore // Config provider for the router routes []RouteConfig // List of route configurations passthroughCfg *PassthroughConfig logger schemas.Logger // Logger for the router largePayloadHook LargePayloadHook // Optional: enterprise hook for large payload detection largeResponseHook LargeResponseHook // Optional: enterprise hook for large response scanning } // SetLargePayloadHook sets the hook for large payload detection and streaming. // This is used by enterprise to inject large payload optimization without // embedding the logic in the OSS router. func (g *GenericRouter) SetLargePayloadHook(hook LargePayloadHook) { g.largePayloadHook = hook } // SetLargeResponseHook sets the hook for large response scanning. // Enterprise uses this to inject Phase B usage extraction into the response stream // without embedding scanning logic in the OSS router. func (g *GenericRouter) SetLargeResponseHook(hook LargeResponseHook) { g.largeResponseHook = hook } // NewGenericRouter creates a new generic router with the given bifrost client and route configurations. // Each integration should create their own routes and pass them to this constructor. func NewGenericRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, routes []RouteConfig, passthroughCfg *PassthroughConfig, logger schemas.Logger) *GenericRouter { return &GenericRouter{ client: client, handlerStore: handlerStore, routes: routes, passthroughCfg: passthroughCfg, logger: logger, } } // RegisterRoutes registers all configured routes on the given fasthttp router. // This method implements the ExtensionRouter interface. func (g *GenericRouter) RegisterRoutes(r *router.Router, middlewares ...schemas.BifrostHTTPMiddleware) { for _, route := range g.routes { // Validate route configuration at startup to fail fast method := strings.ToUpper(route.Method) if route.GetRequestTypeInstance == nil { g.logger.Warn("route configuration is invalid: GetRequestTypeInstance cannot be nil for route " + route.Path) continue } // Test that GetRequestTypeInstance returns a valid instance if testInstance := route.GetRequestTypeInstance(context.Background()); testInstance == nil { g.logger.Warn("route configuration is invalid: GetRequestTypeInstance returned nil for route " + route.Path) continue } // Determine route type: inference, batch, file, container, or container file isBatchRoute := route.BatchRequestConverter != nil isFileRoute := route.FileRequestConverter != nil isContainerRoute := route.ContainerRequestConverter != nil isContainerFileRoute := route.ContainerFileRequestConverter != nil isInferenceRoute := !isBatchRoute && !isFileRoute && !isContainerRoute && !isContainerFileRoute // For inference routes, require RequestConverter if isInferenceRoute && route.RequestConverter == nil { g.logger.Warn("route configuration is invalid: RequestConverter cannot be nil for inference route " + route.Path) continue } if route.ErrorConverter == nil { g.logger.Warn("route configuration is invalid: ErrorConverter cannot be nil for route " + route.Path) continue } registerRequestTypeMiddleware := func(next fasthttp.RequestHandler) fasthttp.RequestHandler { return func(ctx *fasthttp.RequestCtx) { if route.GetHTTPRequestType != nil { ctx.SetUserValue(schemas.BifrostContextKeyHTTPRequestType, route.GetHTTPRequestType(ctx)) } next(ctx) } } // Create a fresh middlewares list for this route (don't mutate the original) // This ensures each route only has its own middleware plus the originally passed middlewares routeMiddlewares := append([]schemas.BifrostHTTPMiddleware{registerRequestTypeMiddleware}, middlewares...) handler := g.createHandler(route) switch method { case fasthttp.MethodPost: r.POST(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) case fasthttp.MethodGet: r.GET(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) case fasthttp.MethodPut: r.PUT(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) case fasthttp.MethodDelete: r.DELETE(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) case fasthttp.MethodHead: r.HEAD(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) default: r.POST(route.Path, lib.ChainMiddlewares(handler, routeMiddlewares...)) // Default to POST } } if g.passthroughCfg != nil { catchAll := lib.ChainMiddlewares(g.handlePassthrough, middlewares...) // Register for all methods that need forwarding for _, method := range []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodPut, fasthttp.MethodDelete, fasthttp.MethodPatch, fasthttp.MethodHead} { for _, prefix := range g.passthroughCfg.StripPrefix { r.Handle(method, prefix+"/{path:*}", catchAll) } } } } // createHandler creates a fasthttp handler for the given route configuration. // The handler follows this flow: // 1. Parse JSON request body into the configured request type (for methods that expect bodies) // 2. Execute pre-callback (if configured) for request modification/validation // 3. Convert request to BifrostRequest using the configured converter // 4. Execute the request through Bifrost (streaming or non-streaming) // 5. Execute post-callback (if configured) for response modification // 6. Convert and send the response using the configured response converter func (g *GenericRouter) createHandler(config RouteConfig) fasthttp.RequestHandler { return func(ctx *fasthttp.RequestCtx) { method := string(ctx.Method()) // Parse request body into the integration-specific request type // Note: config validation is performed at startup in RegisterRoutes req := config.GetRequestTypeInstance(ctx) var rawBody []byte // Execute the request through Bifrost bifrostCtx, cancel := lib.ConvertToBifrostContext(ctx, g.handlerStore.ShouldAllowDirectKeys(), g.handlerStore.GetHeaderMatcher(), g.handlerStore.GetMCPHeaderCombinedAllowlist()) // Set integration type to context bifrostCtx.SetValue(schemas.BifrostContextKeyIntegrationType, string(config.Type)) // Set available providers to context availableProviders := g.handlerStore.GetAvailableProviders() bifrostCtx.SetValue(schemas.BifrostContextKeyAvailableProviders, availableProviders) // Async retrieve: check x-bf-async-id header early (before body parsing) if asyncID := string(ctx.Request.Header.Peek(schemas.AsyncHeaderGetID)); asyncID != "" { defer cancel() g.handleAsyncRetrieve(ctx, config, bifrostCtx) return } // Parse request body based on configuration if method != fasthttp.MethodGet && method != fasthttp.MethodHead { // Hook executes before JSON parsing so large requests can remain streaming. isLargePayload := false if g.largePayloadHook != nil { var err error isLargePayload, err = g.largePayloadHook(ctx, bifrostCtx, config.Type) if err != nil { cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "large payload detection failed")) return } } if isLargePayload { // Large payload mode: body streams directly to provider via // BifrostContextKeyLargePayloadReader. Skip all body parsing // (JSON and multipart) — metadata was already extracted by the hook. } else if config.RequestParser != nil { // Use custom parser (e.g., for multipart/form-data) if err := config.RequestParser(ctx, req); err != nil { cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to parse request")) return } } else { // Use default JSON parsing rawBody = ctx.Request.Body() if len(rawBody) > 0 { if err := sonic.Unmarshal(rawBody, req); err != nil { cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "Invalid JSON")) return } } } // Extract the "extra_params" JSON key when passthrough is // explicitly enabled via x-bf-passthrough-extra-params: true. // Provider-specific fields (e.g. Bedrock guardrailConfig) // must be nested under "extra_params" in the request body. // Runs after both RequestParser and default JSON paths. if !isLargePayload && bifrostCtx.Value(schemas.BifrostContextKeyPassthroughExtraParams) == true { if rws, ok := req.(RequestWithSettableExtraParams); ok { if rawBody == nil { rawBody = ctx.Request.Body() } if len(rawBody) > 0 { var wrapper struct { ExtraParams map[string]interface{} `json:"extra_params"` } if err := sonic.Unmarshal(rawBody, &wrapper); err == nil && len(wrapper.ExtraParams) > 0 { rws.SetExtraParams(wrapper.ExtraParams) } } } } } // Execute pre-request callback if configured // This is typically used for extracting data from URL parameters // or performing request validation after parsing if config.PreCallback != nil { if err := config.PreCallback(ctx, bifrostCtx, req); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute pre-request callback: "+err.Error())) return } } // Execute short-circuit handler if configured. // If it returns handled=true the callback has already written a response // to ctx and we return immediately, bypassing the Bifrost flow entirely. if config.ShortCircuit != nil { handled, err := config.ShortCircuit(ctx, bifrostCtx, req) if err != nil { defer cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "short-circuit handler error: "+err.Error())) return } if handled { defer cancel() return } } // Set direct key from context if available if ctx.UserValue(string(schemas.BifrostContextKeyDirectKey)) != nil { key, ok := ctx.UserValue(string(schemas.BifrostContextKeyDirectKey)).(schemas.Key) if ok { bifrostCtx.SetValue(schemas.BifrostContextKeyDirectKey, key) } } // Handle batch requests if BatchRequestConverter is set // GenAI has two cases: (1) Dedicated batch routes (list/retrieve) have only BatchRequestConverter — always use batch path. // (2) The models path has both BatchRequestConverter and RequestConverter — use batch path only for batch create. isGenAIBatchCreate := config.Type == RouteConfigTypeGenAI && bifrostCtx.Value(isGeminiBatchCreateRequestContextKey) != nil useBatchPath := config.BatchRequestConverter != nil && (config.RequestConverter == nil || config.Type != RouteConfigTypeGenAI || isGenAIBatchCreate) if useBatchPath { defer cancel() batchReq, err := config.BatchRequestConverter(bifrostCtx, req) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert batch request")) return } if batchReq == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch request")) return } g.handleBatchRequest(ctx, config, req, batchReq, bifrostCtx) return } // Handle file requests if FileRequestConverter is set if config.FileRequestConverter != nil { defer cancel() fileReq, err := config.FileRequestConverter(bifrostCtx, req) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file request")) return } if fileReq == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file request")) return } g.handleFileRequest(ctx, config, req, fileReq, bifrostCtx) return } // Handle container requests if ContainerRequestConverter is set if config.ContainerRequestConverter != nil { defer cancel() containerReq, err := config.ContainerRequestConverter(bifrostCtx, req) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container request")) return } if containerReq == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container request")) return } g.handleContainerRequest(ctx, config, req, containerReq, bifrostCtx) return } // Handle container file requests if ContainerFileRequestConverter is set if config.ContainerFileRequestConverter != nil { defer cancel() containerFileReq, err := config.ContainerFileRequestConverter(bifrostCtx, req) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container file request")) return } if containerFileReq == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file request")) return } g.handleContainerFileRequest(ctx, config, req, containerFileReq, bifrostCtx) return } // Convert the integration-specific request to Bifrost format (inference requests) bifrostReq, err := config.RequestConverter(bifrostCtx, req) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert request to Bifrost format")) return } if bifrostReq == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid request")) return } if sendRawRequestBody, ok := (*bifrostCtx).Value(schemas.BifrostContextKeyUseRawRequestBody).(bool); ok && sendRawRequestBody { bifrostReq.SetRawRequestBody(rawBody) } // Extract and parse fallbacks from the request if present if err := g.extractAndParseFallbacks(req, bifrostReq); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to parse fallbacks: "+err.Error())) return } // Async create: check x-bf-async header (needs parsed bifrostReq) if string(ctx.Request.Header.Peek(schemas.AsyncHeaderCreate)) != "" { defer cancel() g.handleAsyncCreate(ctx, config, req, bifrostReq, bifrostCtx) return } // Check if streaming is requested isStreaming := false if streamingReq, ok := req.(StreamingRequest); ok { isStreaming = streamingReq.IsStreamingRequested() } if isStreaming { g.handleStreamingRequest(ctx, config, bifrostReq, bifrostCtx, cancel) } else { defer cancel() // Ensure cleanup on function exit g.handleNonStreamingRequest(ctx, config, req, bifrostReq, bifrostCtx) } } } // handleNonStreamingRequest handles regular (non-streaming) requests func (g *GenericRouter) handleNonStreamingRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, bifrostReq *schemas.BifrostRequest, bifrostCtx *schemas.BifrostContext) { // Use the cancellable context from ConvertToBifrostContext // While we can't detect client disconnects until we try to write, having a cancellable context // allows providers that check ctx.Done() to cancel early if needed. This is less critical than // streaming requests (where we actively detect write errors), but still provides a mechanism // for providers to respect cancellation. var response interface{} var err error var providerResponseHeaders map[string]string switch { case bifrostReq.ListModelsRequest != nil: // Determine provider: explicit header overrides request field; otherwise // fall back to the request field and finally to list-all behavior. listModelsProvider := strings.ToLower(string(ctx.Request.Header.Peek("x-bf-model-provider"))) switch listModelsProvider { case "": // keep any provider already set on the request case "all": bifrostReq.ListModelsRequest.Provider = "" default: bifrostReq.ListModelsRequest.Provider = schemas.ModelProvider(listModelsProvider) } var listModelsResponse *schemas.BifrostListModelsResponse var bifrostErr *schemas.BifrostError if bifrostReq.ListModelsRequest.Provider != "" { listModelsResponse, bifrostErr = g.client.ListModelsRequest(bifrostCtx, bifrostReq.ListModelsRequest) } else { listModelsResponse, bifrostErr = g.client.ListAllModels(bifrostCtx, bifrostReq.ListModelsRequest) } if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, listModelsResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if listModelsResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } response, err = config.ListModelsResponseConverter(bifrostCtx, listModelsResponse) providerResponseHeaders = listModelsResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.TextCompletionRequest != nil: textCompletionResponse, bifrostErr := g.client.TextCompletionRequest(bifrostCtx, bifrostReq.TextCompletionRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, textCompletionResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if textCompletionResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } // Convert Bifrost response to integration-specific format and send response, err = config.TextResponseConverter(bifrostCtx, textCompletionResponse) providerResponseHeaders = textCompletionResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.ChatRequest != nil: chatResponse, bifrostErr := g.client.ChatCompletionRequest(bifrostCtx, bifrostReq.ChatRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, chatResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if chatResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } // Convert Bifrost response to integration-specific format and send response, err = config.ChatResponseConverter(bifrostCtx, chatResponse) providerResponseHeaders = chatResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.ResponsesRequest != nil: responsesResponse, bifrostErr := g.client.ResponsesRequest(bifrostCtx, bifrostReq.ResponsesRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, responsesResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if responsesResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } // Convert Bifrost response to integration-specific format and send response, err = config.ResponsesResponseConverter(bifrostCtx, responsesResponse) providerResponseHeaders = responsesResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.EmbeddingRequest != nil: embeddingResponse, bifrostErr := g.client.EmbeddingRequest(bifrostCtx, bifrostReq.EmbeddingRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, embeddingResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if embeddingResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } providerResponseHeaders = embeddingResponse.ExtraFields.ProviderResponseHeaders // Convert Bifrost response to integration-specific format and send response, err = config.EmbeddingResponseConverter(bifrostCtx, embeddingResponse) case bifrostReq.RerankRequest != nil: rerankResponse, bifrostErr := g.client.RerankRequest(bifrostCtx, bifrostReq.RerankRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, rerankResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if rerankResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } providerResponseHeaders = rerankResponse.ExtraFields.ProviderResponseHeaders if config.RerankResponseConverter != nil { response, err = config.RerankResponseConverter(bifrostCtx, rerankResponse) } else { response = rerankResponse } case bifrostReq.OCRRequest != nil: ocrResponse, bifrostErr := g.client.OCRRequest(bifrostCtx, bifrostReq.OCRRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, ocrResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if ocrResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "bifrost response is nil after post-request callback")) return } providerResponseHeaders = ocrResponse.ExtraFields.ProviderResponseHeaders if config.OCRResponseConverter != nil { response, err = config.OCRResponseConverter(bifrostCtx, ocrResponse) } else { response = ocrResponse } case bifrostReq.SpeechRequest != nil: speechResponse, bifrostErr := g.client.SpeechRequest(bifrostCtx, bifrostReq.SpeechRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, speechResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if speechResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } providerResponseHeaders = speechResponse.ExtraFields.ProviderResponseHeaders if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } if config.SpeechResponseConverter != nil { response, err = config.SpeechResponseConverter(bifrostCtx, speechResponse) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert speech response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) return } else { ctx.Response.Header.Set("Content-Type", "audio/mpeg") ctx.Response.Header.Set("Content-Disposition", "attachment; filename=speech.mp3") ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(speechResponse.Audio))) ctx.Response.SetBody(speechResponse.Audio) return } case bifrostReq.TranscriptionRequest != nil: transcriptionResponse, bifrostErr := g.client.TranscriptionRequest(bifrostCtx, bifrostReq.TranscriptionRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, transcriptionResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if transcriptionResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } // Convert Bifrost response to integration-specific format and send response, err = config.TranscriptionResponseConverter(bifrostCtx, transcriptionResponse) providerResponseHeaders = transcriptionResponse.ExtraFields.ProviderResponseHeaders // If converter returns raw bytes, write directly with provider headers. // Used for plain-text transcription formats (text, srt, vtt). if err == nil { if rawBytes, ok := response.([]byte); ok { for key, value := range providerResponseHeaders { ctx.Response.Header.Set(key, value) } ctx.SetStatusCode(fasthttp.StatusOK) ctx.SetBody(rawBytes) return } } case bifrostReq.ImageGenerationRequest != nil: imageGenerationResponse, bifrostErr := g.client.ImageGenerationRequest(bifrostCtx, bifrostReq.ImageGenerationRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, imageGenerationResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if imageGenerationResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.ImageGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration")) return } if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } // Convert Bifrost response to integration-specific format and send response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageGenerationResponse) providerResponseHeaders = imageGenerationResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.ImageEditRequest != nil: imageEditResponse, bifrostErr := g.client.ImageEditRequest(bifrostCtx, bifrostReq.ImageEditRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, imageEditResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if imageEditResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.ImageGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration")) return } if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } // Convert Bifrost response to integration-specific format and send response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageEditResponse) providerResponseHeaders = imageEditResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.ImageVariationRequest != nil: imageVariationResponse, bifrostErr := g.client.ImageVariationRequest(bifrostCtx, bifrostReq.ImageVariationRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, imageVariationResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if imageVariationResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.ImageGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing ImageGenerationResponseConverter for integration")) return } if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } // Convert Bifrost response to integration-specific format and send response, err = config.ImageGenerationResponseConverter(bifrostCtx, imageVariationResponse) providerResponseHeaders = imageVariationResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.VideoGenerationRequest != nil: videoGenerationResponse, bifrostErr := g.client.VideoGenerationRequest(bifrostCtx, bifrostReq.VideoGenerationRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoGenerationResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoGenerationResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration")) return } response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoGenerationResponse) providerResponseHeaders = videoGenerationResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.VideoRetrieveRequest != nil: videoRetrieveResponse, bifrostErr := g.client.VideoRetrieveRequest(bifrostCtx, bifrostReq.VideoRetrieveRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoRetrieveResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoRetrieveResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration")) return } response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoRetrieveResponse) providerResponseHeaders = videoRetrieveResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.VideoDownloadRequest != nil: videoDownloadResponse, bifrostErr := g.client.VideoDownloadRequest(bifrostCtx, bifrostReq.VideoDownloadRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoDownloadResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoDownloadResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoDownloadResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoDownloadResponseConverter for integration")) return } response, err = config.VideoDownloadResponseConverter(bifrostCtx, videoDownloadResponse) providerResponseHeaders = videoDownloadResponse.ExtraFields.ProviderResponseHeaders // If converter returns binary content, write directly with content-type. if err == nil { if rawBytes, ok := response.([]byte); ok { contentType := videoDownloadResponse.ContentType if contentType == "" { contentType = "application/octet-stream" } ctx.Response.Header.Set("Content-Type", contentType) ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(rawBytes))) ctx.Response.SetBody(rawBytes) return } } case bifrostReq.VideoDeleteRequest != nil: videoDeleteResponse, bifrostErr := g.client.VideoDeleteRequest(bifrostCtx, bifrostReq.VideoDeleteRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoDeleteResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoDeleteResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoDeleteResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoDeleteResponseConverter for integration")) return } response, err = config.VideoDeleteResponseConverter(bifrostCtx, videoDeleteResponse) providerResponseHeaders = videoDeleteResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.VideoRemixRequest != nil: videoRemixResponse, bifrostErr := g.client.VideoRemixRequest(bifrostCtx, bifrostReq.VideoRemixRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoRemixResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoRemixResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoGenerationResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoGenerationResponseConverter for integration")) return } response, err = config.VideoGenerationResponseConverter(bifrostCtx, videoRemixResponse) providerResponseHeaders = videoRemixResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.VideoListRequest != nil: // extract provider from header providerHeader := strings.ToLower(string(ctx.Request.Header.Peek("x-bf-video-list-provider"))) if providerHeader != "" { bifrostReq.VideoListRequest.Provider = schemas.ModelProvider(providerHeader) } else if bifrostReq.VideoListRequest.Provider == "" { bifrostReq.VideoListRequest.Provider = schemas.OpenAI } videoListResponse, bifrostErr := g.client.VideoListRequest(bifrostCtx, bifrostReq.VideoListRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, videoListResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if videoListResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } if config.VideoListResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "missing VideoListResponseConverter for integration")) return } response, err = config.VideoListResponseConverter(bifrostCtx, videoListResponse) providerResponseHeaders = videoListResponse.ExtraFields.ProviderResponseHeaders case bifrostReq.CountTokensRequest != nil: countTokensResponse, bifrostErr := g.client.CountTokensRequest(bifrostCtx, bifrostReq.CountTokensRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // Execute post-request callback if configured // This is typically used for response modification or additional processing if config.PostCallback != nil { if err := config.PostCallback(ctx, req, countTokensResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if countTokensResponse == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Bifrost response is nil after post-request callback")) return } // Convert Bifrost response to integration-specific format and send if config.CountTokensResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "CountTokensResponseConverter not configured")) return } response, err = config.CountTokensResponseConverter(bifrostCtx, countTokensResponse) providerResponseHeaders = countTokensResponse.ExtraFields.ProviderResponseHeaders default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Invalid request type")) return } if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to encode response")) return } // Forward provider response headers only after conversion succeeds for key, value := range providerResponseHeaders { ctx.Response.Header.Set(key, value) } if g.tryStreamLargeResponse(ctx, bifrostCtx) { return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } // --- Async integration handlers --- // handleAsyncCreate submits an async job for the current inference request. // It stores the raw Bifrost response in the DB; the response converter is applied at retrieval time. func (g *GenericRouter) handleAsyncCreate( ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, bifrostReq *schemas.BifrostRequest, bifrostCtx *schemas.BifrostContext, ) { executor := g.handlerStore.GetAsyncJobExecutor() if executor == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operations not available: logs store not configured")) return } // Reject streaming + async if streamingReq, ok := req.(StreamingRequest); ok && streamingReq.IsStreamingRequested() { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostErrorWithCode(nil, "streaming is not supported for async requests", fasthttp.StatusBadRequest)) return } // Reject non-inference routes (batch, file, container) if config.BatchRequestConverter != nil || config.FileRequestConverter != nil || config.ContainerRequestConverter != nil || config.ContainerFileRequestConverter != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async is not supported for batch, file, or container operations")) return } switch config.GetHTTPRequestType(ctx) { case schemas.ChatCompletionRequest: if config.AsyncChatResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route")) return } case schemas.ResponsesRequest: if config.AsyncResponsesResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route")) return } default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route")) return } operationType := config.GetHTTPRequestType(ctx) resultTTL := getResultTTLFromHeaderWithDefault(ctx, g.handlerStore.GetAsyncJobResultTTL()) // The operation closure runs the Bifrost client call in the background. // It returns the raw typed Bifrost response (NOT provider-converted). // The response converter is applied at retrieval time via handleAsyncRetrieve. operation := func(bgCtx *schemas.BifrostContext) (interface{}, *schemas.BifrostError) { switch { case bifrostReq.ChatRequest != nil: return g.client.ChatCompletionRequest(bgCtx, bifrostReq.ChatRequest) case bifrostReq.ResponsesRequest != nil: return g.client.ResponsesRequest(bgCtx, bifrostReq.ResponsesRequest) default: return nil, newBifrostError(nil, "unsupported request type for async execution") } } job, err := executor.SubmitJob(bifrostCtx, resultTTL, operation, operationType) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to create async job")) return } g.handleAsyncJobResponse(ctx, bifrostCtx, config, job) } // handleAsyncRetrieve retrieves an async job by ID and returns the response // using the route's response converter for completed jobs. func (g *GenericRouter) handleAsyncRetrieve( ctx *fasthttp.RequestCtx, config RouteConfig, bifrostCtx *schemas.BifrostContext, ) { executor := g.handlerStore.GetAsyncJobExecutor() if executor == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operations not available: logs store not configured")) return } jobID := string(ctx.Request.Header.Peek(schemas.AsyncHeaderGetID)) if jobID == "" { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "x-bf-async-id header value is empty")) return } vkValue := getVirtualKeyFromBifrostContext(bifrostCtx) job, err := executor.RetrieveJob(bifrostCtx, jobID, vkValue, config.GetHTTPRequestType(ctx)) if err != nil { if errors.Is(err, logstore.ErrJobInternal) { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostErrorWithCode(err, "failed to retrieve async job", fasthttp.StatusInternalServerError)) } else { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostErrorWithCode(err, "job not found or expired", fasthttp.StatusNotFound)) } return } g.handleAsyncJobResponse(ctx, bifrostCtx, config, job) return } func (g *GenericRouter) handleAsyncJobResponse(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, config RouteConfig, job *logstore.AsyncJob) { ctx.SetContentType("application/json") resp := job.ToResponse() switch job.Status { case schemas.AsyncJobStatusPending, schemas.AsyncJobStatusProcessing, schemas.AsyncJobStatusCompleted: switch job.RequestType { case schemas.ChatCompletionRequest: if config.AsyncChatResponseConverter == nil || config.ChatResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "async operation is not supported on this route")) return } response, extraHeaders, err := config.AsyncChatResponseConverter(bifrostCtx, resp, config.ChatResponseConverter) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert async chat response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, extraHeaders) return case schemas.ResponsesRequest: if config.AsyncResponsesResponseConverter == nil || config.ResponsesResponseConverter == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "either async responses response converter or responses response converter not configured")) return } response, extraHeaders, err := config.AsyncResponsesResponseConverter(bifrostCtx, resp, config.ResponsesResponseConverter) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert async responses response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, extraHeaders) return default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "unknown request type")) return } case schemas.AsyncJobStatusFailed: var err schemas.BifrostError // Deserialize the stored BifrostError and send through provider error converter if job.Error != "" { if unmarshalErr := sonic.Unmarshal([]byte(job.Error), &err); unmarshalErr != nil { // If unmarshal fails, create a basic error with the raw error string err = schemas.BifrostError{ Error: &schemas.ErrorField{ Message: job.Error, }, } } } g.sendError(ctx, bifrostCtx, config.ErrorConverter, &err) } } // handleBatchRequest handles batch API requests (create, list, retrieve, cancel, results) func (g *GenericRouter) handleBatchRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, batchReq *BatchRequest, bifrostCtx *schemas.BifrostContext) { var response interface{} var err error switch batchReq.Type { case schemas.BatchCreateRequest: if batchReq.CreateRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch create request")) return } batchResponse, bifrostErr := g.client.BatchCreateRequest(bifrostCtx, batchReq.CreateRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchCreateResponseConverter != nil { response, err = config.BatchCreateResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } case schemas.BatchListRequest: if batchReq.ListRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch list request")) return } batchResponse, bifrostErr := g.client.BatchListRequest(bifrostCtx, batchReq.ListRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchListResponseConverter != nil { response, err = config.BatchListResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } case schemas.BatchRetrieveRequest: if batchReq.RetrieveRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch retrieve request")) return } batchResponse, bifrostErr := g.client.BatchRetrieveRequest(bifrostCtx, batchReq.RetrieveRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchRetrieveResponseConverter != nil { response, err = config.BatchRetrieveResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } case schemas.BatchCancelRequest: if batchReq.CancelRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch cancel request")) return } batchResponse, bifrostErr := g.client.BatchCancelRequest(bifrostCtx, batchReq.CancelRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchCancelResponseConverter != nil { response, err = config.BatchCancelResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } case schemas.BatchDeleteRequest: if batchReq.DeleteRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch delete request")) return } batchResponse, bifrostErr := g.client.BatchDeleteRequest(bifrostCtx, batchReq.DeleteRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchDeleteResponseConverter != nil { response, err = config.BatchDeleteResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } case schemas.BatchResultsRequest: if batchReq.ResultsRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid batch results request")) return } batchResponse, bifrostErr := g.client.BatchResultsRequest(bifrostCtx, batchReq.ResultsRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, batchResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.BatchResultsResponseConverter != nil { response, err = config.BatchResultsResponseConverter(bifrostCtx, batchResponse) } else { response = batchResponse } default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Unknown batch request type")) return } if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert batch response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } // handleFileRequest handles file API requests (upload, list, retrieve, delete, content) func (g *GenericRouter) handleFileRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, fileReq *FileRequest, bifrostCtx *schemas.BifrostContext) { var response interface{} var err error switch fileReq.Type { case schemas.FileUploadRequest: if fileReq.UploadRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file upload request")) return } fileResponse, bifrostErr := g.client.FileUploadRequest(bifrostCtx, fileReq.UploadRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, fileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.FileUploadResponseConverter != nil { response, err = config.FileUploadResponseConverter(bifrostCtx, fileResponse) } else { response = fileResponse } case schemas.FileListRequest: if fileReq.ListRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file list request")) return } fileResponse, bifrostErr := g.client.FileListRequest(bifrostCtx, fileReq.ListRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, fileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.FileListResponseConverter != nil { response, err = config.FileListResponseConverter(bifrostCtx, fileResponse) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file list response")) return } // Handle raw byte responses (e.g., XML for S3 APIs) if rawBytes, ok := response.([]byte); ok { ctx.SetBody(rawBytes) return } } else { response = fileResponse } case schemas.FileRetrieveRequest: if fileReq.RetrieveRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file retrieve request")) return } fileResponse, bifrostErr := g.client.FileRetrieveRequest(bifrostCtx, fileReq.RetrieveRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, fileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.FileRetrieveResponseConverter != nil { response, err = config.FileRetrieveResponseConverter(bifrostCtx, fileResponse) } else { response = fileResponse } case schemas.FileDeleteRequest: if fileReq.DeleteRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file delete request")) return } fileResponse, bifrostErr := g.client.FileDeleteRequest(bifrostCtx, fileReq.DeleteRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, fileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.FileDeleteResponseConverter != nil { response, err = config.FileDeleteResponseConverter(bifrostCtx, fileResponse) } else { response = fileResponse } case schemas.FileContentRequest: if fileReq.ContentRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid file content request")) return } fileResponse, bifrostErr := g.client.FileContentRequest(bifrostCtx, fileReq.ContentRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, fileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } // For file content, handle binary response specially if no converter is set if config.FileContentResponseConverter != nil { response, err = config.FileContentResponseConverter(bifrostCtx, fileResponse) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file content response")) return } // Check if response is raw bytes - write directly without JSON encoding if rawBytes, ok := response.([]byte); ok { ctx.Response.Header.Set("Content-Type", fileResponse.ContentType) ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(rawBytes))) ctx.Response.SetBody(rawBytes) } else { g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } } else { // Return raw file content ctx.Response.Header.Set("Content-Type", fileResponse.ContentType) ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(fileResponse.Content))) ctx.Response.SetBody(fileResponse.Content) } return default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Unknown file request type")) return } if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert file response")) return } // If response is nil, PostCallback has set headers/status - return without body if response == nil { return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } // handleContainerRequest handles container API requests (create, list, retrieve, delete) func (g *GenericRouter) handleContainerRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, containerReq *ContainerRequest, bifrostCtx *schemas.BifrostContext) { var response interface{} var err error switch containerReq.Type { case schemas.ContainerCreateRequest: if containerReq.CreateRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container create request")) return } containerResponse, bifrostErr := g.client.ContainerCreateRequest(bifrostCtx, containerReq.CreateRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerCreateResponseConverter != nil { response, err = config.ContainerCreateResponseConverter(bifrostCtx, containerResponse) } else { response = containerResponse } case schemas.ContainerListRequest: if containerReq.ListRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container list request")) return } containerResponse, bifrostErr := g.client.ContainerListRequest(bifrostCtx, containerReq.ListRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerListResponseConverter != nil { response, err = config.ContainerListResponseConverter(bifrostCtx, containerResponse) } else { response = containerResponse } case schemas.ContainerRetrieveRequest: if containerReq.RetrieveRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container retrieve request")) return } containerResponse, bifrostErr := g.client.ContainerRetrieveRequest(bifrostCtx, containerReq.RetrieveRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerRetrieveResponseConverter != nil { response, err = config.ContainerRetrieveResponseConverter(bifrostCtx, containerResponse) } else { response = containerResponse } case schemas.ContainerDeleteRequest: if containerReq.DeleteRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container delete request")) return } containerResponse, bifrostErr := g.client.ContainerDeleteRequest(bifrostCtx, containerReq.DeleteRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerDeleteResponseConverter != nil { response, err = config.ContainerDeleteResponseConverter(bifrostCtx, containerResponse) } else { response = containerResponse } default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Unknown container request type")) return } if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } // handleContainerFileRequest handles container file API requests (create, list, retrieve, content, delete) func (g *GenericRouter) handleContainerFileRequest(ctx *fasthttp.RequestCtx, config RouteConfig, req interface{}, containerFileReq *ContainerFileRequest, bifrostCtx *schemas.BifrostContext) { var response interface{} var err error switch containerFileReq.Type { case schemas.ContainerFileCreateRequest: if containerFileReq.CreateRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file create request")) return } containerFileResponse, bifrostErr := g.client.ContainerFileCreateRequest(bifrostCtx, containerFileReq.CreateRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerFileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerFileCreateResponseConverter != nil { response, err = config.ContainerFileCreateResponseConverter(bifrostCtx, containerFileResponse) } else { response = containerFileResponse } case schemas.ContainerFileListRequest: if containerFileReq.ListRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file list request")) return } containerFileResponse, bifrostErr := g.client.ContainerFileListRequest(bifrostCtx, containerFileReq.ListRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerFileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerFileListResponseConverter != nil { response, err = config.ContainerFileListResponseConverter(bifrostCtx, containerFileResponse) } else { response = containerFileResponse } case schemas.ContainerFileRetrieveRequest: if containerFileReq.RetrieveRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file retrieve request")) return } containerFileResponse, bifrostErr := g.client.ContainerFileRetrieveRequest(bifrostCtx, containerFileReq.RetrieveRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerFileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerFileRetrieveResponseConverter != nil { response, err = config.ContainerFileRetrieveResponseConverter(bifrostCtx, containerFileResponse) } else { response = containerFileResponse } case schemas.ContainerFileContentRequest: if containerFileReq.ContentRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file content request")) return } containerFileResponse, bifrostErr := g.client.ContainerFileContentRequest(bifrostCtx, containerFileReq.ContentRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerFileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } // For content requests, handle binary response specially if converter is set if config.ContainerFileContentResponseConverter != nil { response, err = config.ContainerFileContentResponseConverter(bifrostCtx, containerFileResponse) if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container file content response")) return } // Check if response is raw bytes - write directly without JSON encoding if rawBytes, ok := response.([]byte); ok { ctx.Response.Header.Set("Content-Type", containerFileResponse.ContentType) ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(rawBytes))) ctx.Response.SetBody(rawBytes) } else { g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } } else { // Return raw binary content ctx.Response.Header.Set("Content-Type", containerFileResponse.ContentType) ctx.Response.Header.Set("Content-Length", strconv.Itoa(len(containerFileResponse.Content))) ctx.Response.SetBody(containerFileResponse.Content) } return case schemas.ContainerFileDeleteRequest: if containerFileReq.DeleteRequest == nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "invalid container file delete request")) return } containerFileResponse, bifrostErr := g.client.ContainerFileDeleteRequest(bifrostCtx, containerFileReq.DeleteRequest) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } if config.PostCallback != nil { if err := config.PostCallback(ctx, req, containerFileResponse); err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to execute post-request callback")) return } } if config.ContainerFileDeleteResponseConverter != nil { response, err = config.ContainerFileDeleteResponseConverter(bifrostCtx, containerFileResponse) } else { response = containerFileResponse } default: g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "Unknown container file request type")) return } if err != nil { g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(err, "failed to convert container file response")) return } g.sendSuccess(ctx, bifrostCtx, config.ErrorConverter, response, nil) } // handleStreamingRequest handles streaming requests using Server-Sent Events (SSE) func (g *GenericRouter) handleStreamingRequest(ctx *fasthttp.RequestCtx, config RouteConfig, bifrostReq *schemas.BifrostRequest, bifrostCtx *schemas.BifrostContext, cancel context.CancelFunc) { // Use the cancellable context from ConvertToBifrostContext // ctx.Done() never fires here in practice: fasthttp.RequestCtx.Done only closes when the whole server shuts down, not when an individual connection drops. // As a result we'll leave the provider stream running until it naturally completes, even if the client went away (write error, network drop, etc.). // That keeps goroutines and upstream tokens alive long after the SSE writer has exited. // // We now get a cancellable context from ConvertToBifrostContext so we can cancel the upstream stream immediately when the client disconnects. var stream chan *schemas.BifrostStreamChunk var bifrostErr *schemas.BifrostError // Handle different request types if bifrostReq.TextCompletionRequest != nil { stream, bifrostErr = g.client.TextCompletionStreamRequest(bifrostCtx, bifrostReq.TextCompletionRequest) } else if bifrostReq.ChatRequest != nil { stream, bifrostErr = g.client.ChatCompletionStreamRequest(bifrostCtx, bifrostReq.ChatRequest) } else if bifrostReq.ResponsesRequest != nil { stream, bifrostErr = g.client.ResponsesStreamRequest(bifrostCtx, bifrostReq.ResponsesRequest) } else if bifrostReq.SpeechRequest != nil { stream, bifrostErr = g.client.SpeechStreamRequest(bifrostCtx, bifrostReq.SpeechRequest) } else if bifrostReq.TranscriptionRequest != nil { stream, bifrostErr = g.client.TranscriptionStreamRequest(bifrostCtx, bifrostReq.TranscriptionRequest) } else if bifrostReq.ImageGenerationRequest != nil { stream, bifrostErr = g.client.ImageGenerationStreamRequest(bifrostCtx, bifrostReq.ImageGenerationRequest) } else if bifrostReq.ImageEditRequest != nil { stream, bifrostErr = g.client.ImageEditStreamRequest(bifrostCtx, bifrostReq.ImageEditRequest) } // Provider error before streaming started — return proper HTTP error status // (SSE headers not yet committed, so we can still set status code + JSON body) if bifrostErr != nil { cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, bifrostErr) return } // No request type matched — stream is nil. Return error without spawning // a drain goroutine (for-range on nil channel blocks forever). if stream == nil { cancel() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "streaming is not supported for this request type")) return } // Forward provider response headers stored in context by streaming handlers if headers, ok := bifrostCtx.Value(schemas.BifrostContextKeyProviderResponseHeaders).(map[string]string); ok { for key, value := range headers { ctx.Response.Header.Set(key, value) } } // Large payload streaming passthrough — bypass SSE event processing, pipe raw upstream if g.tryStreamLargeResponse(ctx, bifrostCtx) { ctx.Response.Header.Set("Cache-Control", "no-cache") ctx.Response.Header.Set("Connection", "keep-alive") ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") cancel() go func() { for range stream { } }() return } // Check if streaming is configured for this route if config.StreamConfig == nil { cancel() // Drain the stream channel to prevent goroutine leaks go func() { for range stream { } }() g.sendError(ctx, bifrostCtx, config.ErrorConverter, newBifrostError(nil, "streaming is not supported for this integration")) return } // SSE headers set only after successful stream setup — errors above get proper HTTP status codes if config.Type == RouteConfigTypeBedrock { ctx.SetContentType("application/vnd.amazon.eventstream") ctx.Response.Header.Set("x-amzn-bedrock-content-type", "application/json") } else { ctx.SetContentType("text/event-stream") } ctx.Response.Header.Set("Cache-Control", "no-cache") ctx.Response.Header.Set("Connection", "keep-alive") ctx.Response.Header.Set("Access-Control-Allow-Origin", "*") // Handle streaming using the centralized approach // Pass cancel function so it can be called when the writer exits (errors, completion, etc.) g.handleStreaming(ctx, bifrostCtx, config, stream, cancel) } // handleStreaming processes a stream of BifrostResponse objects and sends them as Server-Sent Events (SSE). // It handles both successful responses and errors in the streaming format. // // SSE FORMAT HANDLING: // // By default, all responses and errors are sent in the standard SSE format: // // data: {"response": "content"}\n\n // // However, some providers (like Anthropic) require custom SSE event formats with explicit event types: // // event: content_block_delta // data: {"type": "content_block_delta", "delta": {...}} // // event: message_stop // data: {"type": "message_stop"} // // STREAMCONFIG CONVERTER BEHAVIOR: // // The StreamConfig.ResponseConverter and StreamConfig.ErrorConverter functions can return: // // 1. OBJECTS (default behavior): // - Return any Go struct/map/interface{} // - Will be JSON marshaled and wrapped as: data: {json}\n\n // - Example: return map[string]interface{}{"content": "hello"} // - Result: data: {"content":"hello"}\n\n // // 2. STRINGS (custom SSE format): // - Return a complete SSE string with custom event types and formatting // - Will be sent directly without any wrapping or modification // - Example: return "event: content_block_delta\ndata: {\"type\":\"text\"}\n\n" // - Result: event: content_block_delta // data: {"type":"text"} // // IMPLEMENTATION GUIDELINES: // // For standard providers (OpenAI, etc.): Return objects from converters // For custom SSE providers (Anthropic, etc.): Return pre-formatted SSE strings // // When returning strings, ensure they: // - Include proper event: lines (if needed) // - Include data: lines with JSON content // - End with \n\n for proper SSE formatting // - Follow the provider's specific SSE event specification // // CONTEXT CANCELLATION: // // The cancel function is called ONLY when client disconnects are detected via write errors. // Bifrost handles cleanup internally for normal completion and errors, so we only cancel // upstream streams when write errors indicate the client has disconnected. func (g *GenericRouter) handleStreaming(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, config RouteConfig, streamChan chan *schemas.BifrostStreamChunk, cancel context.CancelFunc) { // Signal to tracing middleware that trace completion should be deferred // The streaming callback will complete the trace after the stream ends ctx.SetUserValue(schemas.BifrostContextKeyDeferTraceCompletion, true) // Get the trace completer function for use in the streaming callback. // Signature is func([]schemas.PluginLogEntry) so the callback never reads from // ctx.UserValue (ctx may be recycled by fasthttp by the time this fires). // Router path has no transport post-hook phase, so we always pass nil. traceCompleter, _ := ctx.UserValue(schemas.BifrostContextKeyTraceCompleter).(func([]schemas.PluginLogEntry)) // Get stream chunk interceptor for plugin hooks interceptor := g.handlerStore.GetStreamChunkInterceptor() var httpReq *schemas.HTTPRequest if interceptor != nil { httpReq = lib.BuildHTTPRequestFromFastHTTP(ctx) } // Use SSEStreamReader to bypass fasthttp's internal pipe (fasthttputil.PipeConns) // which batches multiple SSE events into single TCP segments. reader := lib.NewSSEStreamReader() ctx.Response.SetBodyStream(reader, -1) // Producer goroutine: processes the stream channel, formats events, sends to reader go func() { // Separate defers ensure each cleanup runs even if an earlier one panics (LIFO order) defer reader.Done() defer schemas.ReleaseHTTPRequest(httpReq) defer func() { // Complete the trace after streaming finishes // This ensures all spans (including llm.call) are properly ended before the trace is sent to OTEL if traceCompleter != nil { traceCompleter(nil) } }() // Create encoder for AWS Event Stream if needed var eventStreamEncoder *eventstream.Encoder if config.Type == RouteConfigTypeBedrock { eventStreamEncoder = eventstream.NewEncoder() } shouldSendDoneMarker := true if config.Type == RouteConfigTypeAnthropic || strings.Contains(config.Path, "/responses") || strings.Contains(config.Path, "/images/generations") { shouldSendDoneMarker = false } // Process streaming responses for chunk := range streamChan { if chunk == nil { continue } // Note: We no longer check ctx.Done() here because fasthttp.RequestCtx.Done() // only closes when the whole server shuts down, not when an individual client disconnects. // Client disconnects are detected via write errors on reader.Send(), which returns false. // Handle errors if chunk.BifrostError != nil { var errorResponse interface{} // Use stream error converter if available, otherwise fallback to regular error converter if config.StreamConfig != nil && config.StreamConfig.ErrorConverter != nil { errorResponse = config.StreamConfig.ErrorConverter(bifrostCtx, chunk.BifrostError) } else if config.ErrorConverter != nil { errorResponse = config.ErrorConverter(bifrostCtx, chunk.BifrostError) } else { // Default error response errorResponse = map[string]interface{}{ "error": map[string]interface{}{ "type": "internal_error", "message": "An error occurred while processing your request", }, } } // Check if the error converter returned a raw SSE string or JSON object if sseErrorString, ok := errorResponse.(string); ok { // CUSTOM SSE FORMAT: The converter returned a complete SSE string // This is used by providers like Anthropic that need custom event types reader.Send([]byte(sseErrorString)) } else { // STANDARD SSE FORMAT: The converter returned an object errorJSON, err := sonic.Marshal(errorResponse) if err != nil { // Fallback to basic error if marshaling fails basicError := map[string]interface{}{ "error": map[string]interface{}{ "type": "internal_error", "message": "An error occurred while processing your request", }, } if errorJSON, err = sonic.Marshal(basicError); err != nil { cancel() return } } // Send error as SSE data reader.SendEvent("", errorJSON) } return // End stream on error, Bifrost handles cleanup internally } else { // Allow plugins to modify/filter the chunk via StreamChunkInterceptor if interceptor != nil { var err error chunk, err = interceptor.InterceptChunk(bifrostCtx, httpReq, chunk) if err != nil { if chunk == nil { errorJSON, marshalErr := sonic.Marshal(map[string]string{"error": err.Error()}) if marshalErr != nil { cancel() return } // Return error event and stop streaming reader.SendError(errorJSON) cancel() return } // Else add warn log and continue g.logger.Warn("%v", err) } if chunk == nil { // Skip chunk if plugin wants to skip it continue } } // Handle successful responses // Convert response to integration-specific streaming format var eventType string var convertedResponse interface{} var err error switch { case chunk.BifrostTextCompletionResponse != nil: eventType, convertedResponse, err = config.StreamConfig.TextStreamResponseConverter(bifrostCtx, chunk.BifrostTextCompletionResponse) case chunk.BifrostChatResponse != nil: eventType, convertedResponse, err = config.StreamConfig.ChatStreamResponseConverter(bifrostCtx, chunk.BifrostChatResponse) case chunk.BifrostResponsesStreamResponse != nil: eventType, convertedResponse, err = config.StreamConfig.ResponsesStreamResponseConverter(bifrostCtx, chunk.BifrostResponsesStreamResponse) case chunk.BifrostSpeechStreamResponse != nil: eventType, convertedResponse, err = config.StreamConfig.SpeechStreamResponseConverter(bifrostCtx, chunk.BifrostSpeechStreamResponse) case chunk.BifrostTranscriptionStreamResponse != nil: eventType, convertedResponse, err = config.StreamConfig.TranscriptionStreamResponseConverter(bifrostCtx, chunk.BifrostTranscriptionStreamResponse) case chunk.BifrostImageGenerationStreamResponse != nil: eventType, convertedResponse, err = config.StreamConfig.ImageGenerationStreamResponseConverter(bifrostCtx, chunk.BifrostImageGenerationStreamResponse) default: requestType := safeGetRequestType(chunk) convertedResponse, err = nil, fmt.Errorf("no response converter found for request type: %s", requestType) } if convertedResponse == nil && err == nil { // Skip streaming chunk if no response is available and no error is returned continue } if err != nil { // Log conversion error but continue processing g.logger.Warn("Failed to convert streaming response: %v", err) continue } // Handle Bedrock Event Stream format if config.Type == RouteConfigTypeBedrock && eventStreamEncoder != nil { // We need to cast to BedrockStreamEvent to determine event type and structure if bedrockEvent, ok := convertedResponse.(*bedrock.BedrockStreamEvent); ok { // Convert to sequence of specific Bedrock events events := bedrockEvent.ToEncodedEvents() // Send all collected events for _, evt := range events { jsonData, err := sonic.Marshal(evt.Payload) if err != nil { g.logger.Warn("Failed to marshal bedrock payload: %v", err) continue } headers := eventstream.Headers{ { Name: ":content-type", Value: eventstream.StringValue("application/json"), }, { Name: ":event-type", Value: eventstream.StringValue(evt.EventType), }, { Name: ":message-type", Value: eventstream.StringValue("event"), }, } message := eventstream.Message{ Headers: headers, Payload: jsonData, } var msgBuf bytes.Buffer if err := eventStreamEncoder.Encode(&msgBuf, message); err != nil { g.logger.Warn("[Bedrock Stream] Failed to encode message: %v", err) cancel() return } if !reader.Send(msgBuf.Bytes()) { g.logger.Warn("[Bedrock Stream] Client disconnected") cancel() return } } } // Continue to next chunk (we handled sending internally) continue } // Build and send SSE event var buf []byte var sent bool if sseString, ok := convertedResponse.(string); ok { if strings.HasPrefix(sseString, "data: ") || strings.HasPrefix(sseString, "event: ") { // Pre-formatted SSE string (e.g. Anthropic custom event types) if eventType != "" { // Prepend event type line to pre-formatted data buf = make([]byte, 0, 7+len(eventType)+1+len(sseString)) buf = append(buf, "event: "...) buf = append(buf, eventType...) buf = append(buf, '\n') buf = append(buf, sseString...) sent = reader.Send(buf) } else { sent = reader.Send([]byte(sseString)) } } else { sent = reader.SendEvent(eventType, []byte(sseString)) } } else { responseJSON, err := sonic.Marshal(convertedResponse) if err != nil { g.logger.Warn("Failed to marshal streaming response: %v", err) continue } sent = reader.SendEvent(eventType, responseJSON) } if !sent { cancel() // Client disconnected, cancel upstream stream return } } } // Only send the [DONE] marker for plain SSE APIs that expect it. // Do NOT send [DONE] for the following cases: // - OpenAI "responses" API and Anthropic messages API: they signal completion by simply closing the stream, not sending [DONE]. // - Bedrock: uses AWS Event Stream format rather than SSE with [DONE]. // Bifrost handles any additional cleanup internally on normal stream completion. if shouldSendDoneMarker && config.Type != RouteConfigTypeGenAI && config.Type != RouteConfigTypeBedrock { if !reader.SendDone() { g.logger.Warn("Failed to write SSE done marker: client disconnected") cancel() return } } }() } // extractPassthroughModel extracts the model from the passthrough request path and/or body. // Path patterns: models/{model}, models/{model}:suffix (GenAI), .../models/{model} (Vertex), tunedModels/{model}. // Body is pre-parsed by parsePassthroughBody to avoid redundant unmarshaling. func extractPassthroughModel(path string, bodyModel string) string { if model := extractModelFromPath(path); model != "" { return model } return bodyModel } func extractModelFromPath(path string) string { path = strings.TrimPrefix(path, "/") parts := strings.Split(path, "/") for i, p := range parts { if p == "models" || p == "tunedModels" { if i+1 < len(parts) { model := parts[i+1] // Strip :suffix for GenAI (e.g. :generateContent, :streamGenerateContent) if idx := strings.Index(model, ":"); idx > 0 { model = model[:idx] } return strings.TrimSpace(model) } break } } return "" } // parsePassthroughBody extracts model and streaming flag from the request body in a // single unmarshal pass. Pass the raw Content-Type header value so multipart boundaries // are resolved from the header rather than scraped from the body bytes. func parsePassthroughBody(contentType string, body []byte) (model string, isStream bool) { if len(body) == 0 { return } mediaType, params, err := mime.ParseMediaType(contentType) if err == nil && strings.HasPrefix(mediaType, "multipart/") { if boundary := params["boundary"]; boundary != "" { return parseMultipartPassthroughBody(body, boundary) } } // JSON (or unknown) body — one unmarshal for both fields. var parsed struct { Model string `json:"model"` Stream bool `json:"stream"` } if err := sonic.Unmarshal(body, &parsed); err == nil { model = strings.TrimSpace(parsed.Model) isStream = parsed.Stream } return } // parseMultipartPassthroughBody scans multipart parts and extracts model and stream. // - Form fields (Content-Disposition name="model"/"stream"): read as plain text. func parseMultipartPassthroughBody(body []byte, boundary string) (model string, isStream bool) { mr := multipart.NewReader(bytes.NewReader(body), boundary) for { part, err := mr.NextPart() if err != nil { break } // Plain form field — check by name. switch part.FormName() { case "model": val, _ := io.ReadAll(part) part.Close() model = strings.TrimSpace(string(val)) case "stream": val, _ := io.ReadAll(part) part.Close() s := strings.TrimSpace(strings.ToLower(string(val))) isStream = s == "true" || s == "1" default: part.Close() } if model != "" && isStream { break } } return } func (g *GenericRouter) handlePassthrough(ctx *fasthttp.RequestCtx) { cfg := g.passthroughCfg safeHeaders := make(map[string]string) ctx.Request.Header.All()(func(key, value []byte) bool { keyStr := strings.ToLower(string(key)) switch keyStr { case "authorization", "api-key", "x-api-key", "x-goog-api-key", "host", "connection", "transfer-encoding", "cookie", "set-cookie", "proxy-authorization", "accept-encoding": default: if strings.HasPrefix(keyStr, "x-bf-") { return true // drop internal gateway headers } safeHeaders[keyStr] = string(value) } return true }) bifrostCtx, cancel := lib.ConvertToBifrostContext(ctx, g.handlerStore.ShouldAllowDirectKeys(), g.handlerStore.GetHeaderMatcher(), g.handlerStore.GetMCPHeaderCombinedAllowlist()) if directKey := ctx.UserValue(string(schemas.BifrostContextKeyDirectKey)); directKey != nil { if key, ok := directKey.(schemas.Key); ok { bifrostCtx.SetValue(schemas.BifrostContextKeyDirectKey, key) } } path := string(ctx.Path()) for _, prefix := range g.passthroughCfg.StripPrefix { if strings.HasPrefix(path, prefix) { path = path[len(prefix):] break } } body := ctx.Request.Body() // Parse body once to get both model and stream flag. contentType := string(ctx.Request.Header.ContentType()) bodyModel, bodyStream := parsePassthroughBody(contentType, body) resolvedModel := extractPassthroughModel(path, bodyModel) provider := cfg.Provider if cfg.ProviderDetector != nil { provider = cfg.ProviderDetector(ctx, resolvedModel) } provider = getProviderFromHeader(ctx, provider) isStreaming := strings.Contains(strings.ToLower(path), "stream") || bodyStream passthroughReq := &schemas.BifrostPassthroughRequest{ Method: string(ctx.Method()), Path: path, RawQuery: string(ctx.URI().QueryString()), Body: body, SafeHeaders: safeHeaders, Provider: provider, Model: resolvedModel, } if isStreaming { g.handlePassthroughStream(ctx, bifrostCtx, cancel, provider, passthroughReq) } else { g.handlePassthroughNonStream(ctx, bifrostCtx, cancel, provider, passthroughReq) } } func (g *GenericRouter) handlePassthroughNonStream( ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, cancel context.CancelFunc, provider schemas.ModelProvider, req *schemas.BifrostPassthroughRequest, ) { defer cancel() resp, bifrostErr := g.client.Passthrough(bifrostCtx, provider, req) if bifrostErr != nil { g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, bifrostErr) return } ctx.SetStatusCode(resp.StatusCode) for k, v := range resp.Headers { switch strings.ToLower(k) { case "connection", "transfer-encoding", "set-cookie", "proxy-authenticate", "www-authenticate": // drop default: ctx.Response.Header.Set(k, v) } } ctx.Response.SetBody(resp.Body) } func (g *GenericRouter) handlePassthroughStream( ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, cancel context.CancelFunc, provider schemas.ModelProvider, req *schemas.BifrostPassthroughRequest, ) { // Deferred trace completion must be explicitly finalized after streaming ends. // Without this, observability injectors (including logging) never flush final state. traceCompleter, _ := ctx.UserValue(schemas.BifrostContextKeyTraceCompleter).(func([]schemas.PluginLogEntry)) stream, bifrostErr := g.client.PassthroughStream(bifrostCtx, provider, req) if bifrostErr != nil { cancel() g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, bifrostErr) return } // Read the first chunk to extract status code and headers before streaming begins. firstChunk, ok := <-stream if !ok { cancel() g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, newBifrostError(nil, "passthrough stream ended before headers were received")) return } if firstChunk == nil { cancel() g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, newBifrostError(nil, "passthrough stream returned nil first chunk")) return } if firstChunk.BifrostError != nil { cancel() g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, firstChunk.BifrostError) return } passthroughResp := firstChunk.BifrostPassthroughResponse if passthroughResp == nil { cancel() g.sendError(ctx, bifrostCtx, func(_ *schemas.BifrostContext, err *schemas.BifrostError) interface{} { return err }, newBifrostError(nil, "passthrough stream returned empty first chunk")) return } // Skip post-hook body materialization — ctx.Response.Body() would buffer the entire stream. ctx.SetUserValue(schemas.BifrostContextKeyDeferTraceCompletion, true) ctx.SetStatusCode(passthroughResp.StatusCode) ctx.SetConnectionClose() for k, v := range passthroughResp.Headers { switch strings.ToLower(k) { case "connection", "transfer-encoding", "content-length", "set-cookie", "proxy-authenticate", "www-authenticate": // drop — content-length is invalid for a streaming response default: ctx.Response.Header.Set(k, v) } } // Use SSEStreamReader to bypass fasthttp's internal pipe batching reader := lib.NewSSEStreamReader() ctx.Response.SetBodyStream(reader, -1) go func() { defer func() { if traceCompleter != nil { traceCompleter(nil) } reader.Done() cancel() }() // Write the first chunk's data. if len(passthroughResp.Body) > 0 { if !reader.Send(passthroughResp.Body) { cancel() return } } for chunk := range stream { if chunk == nil { continue } if chunk.BifrostError != nil { break } if chunk.BifrostPassthroughResponse != nil && len(chunk.BifrostPassthroughResponse.Body) > 0 { if !reader.Send(chunk.BifrostPassthroughResponse.Body) { cancel() return } } } }() }