Files
Beyhan Oğur 880f412e2c first commit
2026-04-26 21:52:23 +03:00

1082 lines
41 KiB
Go

package integrations
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"github.com/bytedance/sonic"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/providers/anthropic"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/transports/bifrost-http/lib"
"github.com/tidwall/gjson"
"github.com/valyala/fasthttp"
)
// AnthropicRouter handles Anthropic-compatible API endpoints
type AnthropicRouter struct {
*GenericRouter
}
// createAnthropicCompleteRouteConfig creates a route configuration for the `/v1/complete` endpoint.
func createAnthropicCompleteRouteConfig(pathPrefix string) RouteConfig {
return RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/complete",
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.TextCompletionRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicTextRequest{}
},
RequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error) {
if anthropicReq, ok := req.(*anthropic.AnthropicTextRequest); ok {
return &schemas.BifrostRequest{
TextCompletionRequest: anthropicReq.ToBifrostTextCompletionRequest(ctx),
}, nil
}
return nil, errors.New("invalid request type")
},
TextResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostTextCompletionResponse) (interface{}, error) {
if shouldUsePassthrough(ctx, resp.ExtraFields.Provider, resp.ExtraFields.OriginalModelRequested, resp.ExtraFields.ResolvedModelUsed) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
}
return anthropic.ToAnthropicTextCompletionResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: checkAnthropicPassthrough,
}
}
// createAnthropicMessagesRouteConfig creates a route configuration for the `/v1/messages` endpoint.
func createAnthropicMessagesRouteConfig(pathPrefix string, logger schemas.Logger) []RouteConfig {
var routes []RouteConfig
for _, path := range []string{
"/v1/messages",
"/v1/messages/{path:*}",
} {
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + path,
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.ResponsesRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicMessageRequest{}
},
RequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error) {
if anthropicReq, ok := req.(*anthropic.AnthropicMessageRequest); ok {
bifrostReq := anthropicReq.ToBifrostResponsesRequest(ctx)
normalizeBifrostInputContentBlocks(bifrostReq)
return &schemas.BifrostRequest{
ResponsesRequest: bifrostReq,
}, nil
}
return nil, errors.New("invalid request type")
},
ResponsesResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesResponse) (interface{}, error) {
if isClaudeModel(resp.ExtraFields.OriginalModelRequested, resp.ExtraFields.ResolvedModelUsed, string(resp.ExtraFields.Provider)) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
}
return anthropic.ToAnthropicResponsesResponse(ctx, resp), nil
},
AsyncResponsesResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.AsyncJobResponse, responsesResponseConverter ResponsesResponseConverter) (interface{}, map[string]string, error) {
if resp.Status == schemas.AsyncJobStatusCompleted {
responsesResp, ok := resp.Result.(*schemas.BifrostResponsesResponse)
if !ok {
return nil, nil, errors.New("invalid responses response type")
}
response, err := responsesResponseConverter(ctx, responsesResp)
if err != nil {
return nil, nil, err
}
return response, nil, nil
}
return &anthropic.AnthropicMessageResponse{
ID: resp.ID,
}, nil, nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
StreamConfig: &StreamConfig{
ResponsesStreamResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostResponsesStreamResponse) (string, interface{}, error) {
if shouldUsePassthrough(ctx, resp.ExtraFields.Provider, resp.ExtraFields.OriginalModelRequested, resp.ExtraFields.ResolvedModelUsed) {
// Skip passthrough for ContentPartAdded: it's a synthetic bifrost event whose
// RawResponse carries the parent content_block_start already emitted by OutputItemAdded.
// Passing through here would produce a duplicate content_block_start that causes
// the Anthropic SDK to error and drop all subsequent content_block_delta events.
if resp.ExtraFields.RawResponse != nil && resp.Type != schemas.ResponsesStreamResponseTypeContentPartAdded {
raw, ok := resp.ExtraFields.RawResponse.(string)
if !ok {
return "", nil, fmt.Errorf("expected RawResponse string, got %T", resp.ExtraFields.RawResponse)
}
if t := gjson.Get(raw, "type"); t.Exists() {
return t.String(), raw, nil
}
}
// Fallback: if RawResponse is not available, use bifrost-to-anthropic conversion
// instead of silently dropping all events
}
anthropicResponse := anthropic.ToAnthropicResponsesStreamResponse(ctx, resp)
// Can happen for openai lifecycle events
if len(anthropicResponse) == 0 {
return "", nil, nil
}
if len(anthropicResponse) > 1 {
var combinedContent strings.Builder
for _, event := range anthropicResponse {
responseJSON, err := sonic.Marshal(event)
if err != nil {
logger.Error("failed to marshal anthropic streaming message: %v", err)
continue
}
fmt.Fprintf(&combinedContent, "event: %s\ndata: %s\n\n", event.Type, responseJSON)
}
return "", combinedContent.String(), nil
}
return string(anthropicResponse[0].Type), anthropicResponse[0], nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicResponsesStreamError(err)
},
},
PreCallback: checkAnthropicPassthrough,
})
}
return routes
}
// CreateAnthropicRouteConfigs creates route configurations for Anthropic endpoints.
func CreateAnthropicRouteConfigs(pathPrefix string, logger schemas.Logger) []RouteConfig {
return append([]RouteConfig{
createAnthropicCompleteRouteConfig(pathPrefix),
}, createAnthropicMessagesRouteConfig(pathPrefix, logger)...)
}
// passthroughSafeHeaders is a whitelist of headers that are safe to pass through
var passthroughSafeHeaders = map[string]bool{
"anthropic-beta": true,
"anthropic-dangerous-direct-browser-access": true,
"anthropic-version": true,
}
func hasPromptCachingScopeBetaHeader(headers map[string][]string) bool {
for k, v := range headers {
if strings.ToLower(k) == anthropic.AnthropicBetaHeader {
for _, headerValue := range v {
if strings.Contains(headerValue, anthropic.AnthropicPromptCachingScopeBetaHeader) {
return true
}
}
}
}
return false
}
func hasFastModeBetaHeader(headers map[string][]string) bool {
for k, v := range headers {
if strings.ToLower(k) != anthropic.AnthropicBetaHeader {
continue
}
for _, headerValue := range v {
for beta := range strings.SplitSeq(headerValue, ",") {
if strings.HasPrefix(strings.TrimSpace(beta), anthropic.AnthropicFastModeBetaHeaderPrefix) {
return true
}
}
}
}
return false
}
// filterVertexUnsupportedBetaHeaders removes beta headers that Vertex AI doesn't support.
// Vertex AI doesn't support: structured-outputs, advanced-tool-use, prompt-caching-scope, mcp-client.
func filterVertexUnsupportedBetaHeaders(headers map[string][]string) map[string][]string {
var betaHeaderKey string
var betaHeaders []string
var found bool
for k, v := range headers {
if strings.ToLower(k) == anthropic.AnthropicBetaHeader {
betaHeaderKey = k
betaHeaders = v
found = true
break
}
}
if found {
var filteredBetas []string
for _, headerValue := range betaHeaders {
// Split comma-separated beta headers
for beta := range strings.SplitSeq(headerValue, ",") {
beta = strings.TrimSpace(beta)
if beta == "" {
continue
}
// Skip unsupported headers for Vertex.
// Use prefix matching so that future date bumps
// (e.g. structured-outputs-2025-12-15) are still caught.
if strings.HasPrefix(beta, anthropic.AnthropicAdvancedToolUseBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicStructuredOutputsBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicPromptCachingScopeBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicMCPClientBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicSkillsBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicFastModeBetaHeaderPrefix) ||
strings.HasPrefix(beta, anthropic.AnthropicRedactThinkingBetaHeaderPrefix) {
continue
}
filteredBetas = append(filteredBetas, beta)
}
}
if len(filteredBetas) > 0 {
headers[betaHeaderKey] = []string{strings.Join(filteredBetas, ",")}
} else {
delete(headers, betaHeaderKey)
}
}
return headers
}
// extractPassthroughHeaders filters headers to only include those in the safe whitelist.
// Header matching is case-insensitive.
func extractPassthroughHeaders(allHeaders map[string][]string, provider schemas.ModelProvider) map[string][]string {
filtered := make(map[string][]string)
for k, v := range allHeaders {
if passthroughSafeHeaders[strings.ToLower(k)] {
filtered[strings.ToLower(k)] = v
}
}
return filtered
}
func CreateAnthropicListModelsRouteConfigs(pathPrefix string, handlerStore lib.HandlerStore) []RouteConfig {
return []RouteConfig{
{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/models",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.ListModelsRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &schemas.BifrostListModelsRequest{}
},
RequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error) {
if listModelsReq, ok := req.(*schemas.BifrostListModelsRequest); ok {
return &schemas.BifrostRequest{
ListModelsRequest: listModelsReq,
}, nil
}
return nil, errors.New("invalid request type")
},
ListModelsResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostListModelsResponse) (interface{}, error) {
return anthropic.ToAnthropicListModelsResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicListModelsParams,
},
}
}
func hydrateAnthropicRequestFromLargePayloadMetadata(bifrostCtx *schemas.BifrostContext, req interface{}) {
if bifrostCtx == nil {
return
}
isLargePayload, _ := bifrostCtx.Value(schemas.BifrostContextKeyLargePayloadMode).(bool)
if !isLargePayload {
return
}
metadata := resolveLargePayloadMetadata(bifrostCtx)
if metadata == nil {
return
}
switch r := req.(type) {
case *anthropic.AnthropicTextRequest:
if r.Model == "" {
r.Model = metadata.Model
}
if metadata.StreamRequested != nil && r.Stream == nil {
r.Stream = schemas.Ptr(*metadata.StreamRequested)
}
case *anthropic.AnthropicMessageRequest:
if r.Model == "" {
r.Model = metadata.Model
}
if metadata.StreamRequested != nil && r.Stream == nil {
r.Stream = schemas.Ptr(*metadata.StreamRequested)
}
}
}
// checkAnthropicPassthrough pre-callback checks if the request is for a claude model.
// If it is, it attaches the raw request body for direct use by the provider.
// It also checks for anthropic oauth headers and sets the bifrost context.
func checkAnthropicPassthrough(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
hydrateAnthropicRequestFromLargePayloadMetadata(bifrostCtx, req)
var provider schemas.ModelProvider
var model string
switch r := req.(type) {
case *anthropic.AnthropicTextRequest:
provider, model = schemas.ParseModelString(r.Model, "")
// Check if model parameter explicitly has `anthropic/` prefix
if provider == schemas.Anthropic {
r.Model = model
}
case *anthropic.AnthropicMessageRequest:
provider, model = schemas.ParseModelString(r.Model, "")
// Check if model parameter explicitly has `anthropic/` prefix
if provider == schemas.Anthropic {
r.Model = model
}
}
headers := extractHeadersFromRequest(ctx)
schemas.ExtractAndSetUserAgentFromHeaders(headers, bifrostCtx)
// Check if anthropic oauth headers are present
if shouldUsePassthrough(bifrostCtx, provider, model, "") {
bifrostCtx.SetValue(schemas.BifrostContextKeyUseRawRequestBody, true)
bifrostCtx.SetValue(schemas.BifrostContextKeySendBackRawResponse, true)
if !isAnthropicAPIKeyAuth(ctx) && (provider == schemas.Anthropic || provider == "") {
url := extractExactPath(ctx)
if !strings.HasPrefix(url, "/") {
url = "/" + url
}
bifrostCtx.SetValue(schemas.BifrostContextKeyExtraHeaders, headers)
bifrostCtx.SetValue(schemas.BifrostContextKeyURLPath, url)
// This key is also used in IsClaudeCodeMaxMode
// So if you are changing the behaviour of this key, make sure to change IsClaudeCodeMaxMode as well
bifrostCtx.SetValue(schemas.BifrostContextKeySkipKeySelection, true)
} else {
// API key flow: pass only whitelisted safe headers (like anthropic-beta for feature detection)
passthroughHeaders := extractPassthroughHeaders(headers, provider)
if len(passthroughHeaders) > 0 {
bifrostCtx.SetValue(schemas.BifrostContextKeyExtraHeaders, passthroughHeaders)
}
}
if provider == schemas.Vertex && (hasPromptCachingScopeBetaHeader(headers) || hasFastModeBetaHeader(headers)) {
bifrostCtx.SetValue(schemas.BifrostContextKeyUseRawRequestBody, false)
return nil
}
}
return nil
}
// shouldUsePassthrough checks if the request should be sent to the passthrough endpoint.
func shouldUsePassthrough(ctx *schemas.BifrostContext, provider schemas.ModelProvider, model string, alias string) bool {
return anthropic.IsClaudeCodeRequest(ctx) && isClaudeModel(model, alias, string(provider))
}
func isClaudeModel(model, alias, provider string) bool {
return (provider == string(schemas.Anthropic) ||
(provider == "" && (schemas.IsAnthropicModel(model) || schemas.IsAnthropicModel(alias)))) ||
(provider == string(schemas.Vertex) && (schemas.IsAnthropicModel(model) || schemas.IsAnthropicModel(alias))) ||
(provider == string(schemas.Azure) && (schemas.IsAnthropicModel(model) || schemas.IsAnthropicModel(alias)))
}
// extractAnthropicListModelsParams extracts query parameters for list models request
func extractAnthropicListModelsParams(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
if listModelsReq, ok := req.(*schemas.BifrostListModelsRequest); ok {
// Extract limit from query parameters
if limitStr := string(ctx.QueryArgs().Peek("limit")); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil {
listModelsReq.PageSize = limit
} else {
return fmt.Errorf("invalid limit parameter: %w", err)
}
}
if beforeID := string(ctx.QueryArgs().Peek("before_id")); beforeID != "" {
if listModelsReq.ExtraParams == nil {
listModelsReq.ExtraParams = make(map[string]interface{})
}
listModelsReq.ExtraParams["before_id"] = beforeID
}
if afterID := string(ctx.QueryArgs().Peek("after_id")); afterID != "" {
if listModelsReq.ExtraParams == nil {
listModelsReq.ExtraParams = make(map[string]interface{})
}
listModelsReq.ExtraParams["after_id"] = afterID
}
return nil
}
return errors.New("invalid request type for Anthropic list models")
}
// CreateAnthropicCountTokensRouteConfigs creates route configurations for Anthropic count tokens endpoint.
func CreateAnthropicCountTokensRouteConfigs(pathPrefix string, handlerStore lib.HandlerStore) []RouteConfig {
return []RouteConfig{
{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/count_tokens",
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.CountTokensRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicMessageRequest{}
},
RequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*schemas.BifrostRequest, error) {
if anthropicReq, ok := req.(*anthropic.AnthropicMessageRequest); ok {
bifrostReq := anthropicReq.ToBifrostResponsesRequest(ctx)
normalizeBifrostInputContentBlocks(bifrostReq)
return &schemas.BifrostRequest{
CountTokensRequest: bifrostReq,
}, nil
}
return nil, errors.New("invalid request type for Anthropic count tokens")
},
CountTokensResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostCountTokensResponse) (interface{}, error) {
return anthropic.ToAnthropicCountTokensResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: checkAnthropicPassthrough,
},
}
}
// CreateAnthropicBatchRouteConfigs creates route configurations for Anthropic Batch API endpoints.
func CreateAnthropicBatchRouteConfigs(pathPrefix string, handlerStore lib.HandlerStore) []RouteConfig {
var routes []RouteConfig
// Create batch endpoint - POST /v1/messages/batches
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/batches",
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.BatchCreateRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicBatchCreateRequest{}
},
BatchRequestConverter: func(ctx *schemas.BifrostContext, req any) (*BatchRequest, error) {
if anthropicReq, ok := req.(*anthropic.AnthropicBatchCreateRequest); ok {
// Convert Anthropic batch request items to Bifrost format
isNonAnthropicProvider := false
var provider schemas.ModelProvider
var ok bool
if provider, ok = ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider); ok && provider != schemas.Anthropic {
isNonAnthropicProvider = true
}
var model *string
requests := make([]schemas.BatchRequestItem, len(anthropicReq.Requests))
for i, r := range anthropicReq.Requests {
if isNonAnthropicProvider {
requestModel, ok := r.Params["model"].(string)
if !ok {
return nil, errors.New("model is required")
}
if model == nil {
model = schemas.Ptr(requestModel)
} else if *model != requestModel {
return nil, errors.New("for non-Anthropic providers, model must be the same for all requests")
}
}
requests[i] = schemas.BatchRequestItem{
CustomID: r.CustomID,
Params: r.Params,
}
}
br := &BatchRequest{
Type: schemas.BatchCreateRequest,
CreateRequest: &schemas.BifrostBatchCreateRequest{
Model: model,
Provider: provider,
Requests: requests,
},
}
// If provider is openai, we need to generate endpoint too
if provider == schemas.OpenAI {
// Confirm if all requests have the same url
var url string
for _, request := range requests {
if urlParam, ok := request.Params["url"].(string); ok {
if url == "" {
url = urlParam
} else if url != urlParam {
return nil, errors.New("for OpenAI batch API, all requests must have the same url")
}
}
}
br.CreateRequest.Endpoint = schemas.BatchEndpoint(url)
}
return br, nil
}
return nil, errors.New("invalid batch create request type")
},
BatchCreateResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCreateResponse) (interface{}, error) {
if resp.ExtraFields.Provider == schemas.Gemini {
resp.ID = strings.Replace(resp.ID, "batches/", "batches-", 1)
}
return anthropic.ToAnthropicBatchCreateResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicBatchCreateParams,
})
// List batches endpoint - GET /v1/messages/batches
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/batches",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.BatchListRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicBatchListRequest{}
},
BatchRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error) {
if listReq, ok := req.(*anthropic.AnthropicBatchListRequest); ok {
provider, ok := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if !ok {
return nil, errors.New("provider not found in context")
}
return &BatchRequest{
Type: schemas.BatchListRequest,
ListRequest: &schemas.BifrostBatchListRequest{
Provider: provider,
PageSize: listReq.PageSize,
PageToken: listReq.PageToken,
},
}, nil
}
return nil, errors.New("invalid batch list request type")
},
BatchListResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchListResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil && resp.ExtraFields.Provider == schemas.Anthropic {
return resp.ExtraFields.RawResponse, nil
}
if resp.ExtraFields.Provider == schemas.Gemini {
for i, batch := range resp.Data {
resp.Data[i].ID = strings.Replace(batch.ID, "batches/", "batches-", 1)
}
}
return anthropic.ToAnthropicBatchListResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicBatchListQueryParams,
})
// Retrieve batch endpoint - GET /v1/messages/batches/{batch_id}
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/batches/{batch_id}",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.BatchRetrieveRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicBatchRetrieveRequest{}
},
BatchRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error) {
if retrieveReq, ok := req.(*anthropic.AnthropicBatchRetrieveRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if provider == schemas.Gemini {
retrieveReq.BatchID = strings.Replace(retrieveReq.BatchID, "batches-", "batches/", 1)
}
return &BatchRequest{
Type: schemas.BatchRetrieveRequest,
RetrieveRequest: &schemas.BifrostBatchRetrieveRequest{
BatchID: retrieveReq.BatchID,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid batch retrieve request type")
},
BatchRetrieveResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchRetrieveResponse) (interface{}, error) {
if resp.ExtraFields.Provider == schemas.Gemini {
resp.ID = strings.Replace(resp.ID, "batches/", "batches-", 1)
}
return anthropic.ToAnthropicBatchRetrieveResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicBatchIDFromPath,
})
// Cancel batch endpoint - POST /v1/messages/batches/{batch_id}/cancel
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/batches/{batch_id}/cancel",
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.BatchCancelRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicBatchCancelRequest{}
},
BatchRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error) {
if cancelReq, ok := req.(*anthropic.AnthropicBatchCancelRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if provider == schemas.Gemini {
cancelReq.BatchID = strings.Replace(cancelReq.BatchID, "batches-", "batches/", 1)
}
return &BatchRequest{
Type: schemas.BatchCancelRequest,
CancelRequest: &schemas.BifrostBatchCancelRequest{
BatchID: cancelReq.BatchID,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid batch cancel request type")
},
BatchCancelResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchCancelResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
return anthropic.ToAnthropicBatchCancelResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicBatchIDFromPath,
})
// Get batch results endpoint - GET /v1/messages/batches/{batch_id}/results
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/messages/batches/{batch_id}/results",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.BatchResultsRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicBatchResultsRequest{}
},
BatchRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*BatchRequest, error) {
if resultsReq, ok := req.(*anthropic.AnthropicBatchResultsRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if provider == schemas.Gemini {
resultsReq.BatchID = strings.Replace(resultsReq.BatchID, "batches-", "batches/", 1)
}
return &BatchRequest{
Type: schemas.BatchResultsRequest,
ResultsRequest: &schemas.BifrostBatchResultsRequest{
BatchID: resultsReq.BatchID,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid batch results request type")
},
BatchResultsResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostBatchResultsResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
return resp, nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicBatchIDFromPath,
})
return routes
}
// extractAnthropicBatchCreateParams extracts provider from header for batch create requests
func extractAnthropicBatchCreateParams(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
// Extract provider from header, default to Anthropic
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
// Store provider in context for batch create converter to use
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
return nil
}
// extractAnthropicBatchListQueryParams extracts provider from header and query parameters for Anthropic batch list requests
func extractAnthropicBatchListQueryParams(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
if listReq, ok := req.(*anthropic.AnthropicBatchListRequest); ok {
// Extract provider from header, default to Anthropic
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
// Printing all query parameters
// Extract limit from query parameters
if limitStr := string(ctx.QueryArgs().Peek("page_size")); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil {
listReq.PageSize = limit
} else {
listReq.PageSize = 30
}
}
// Extract before_id cursor
if pageToken := string(ctx.QueryArgs().Peek("page_token")); pageToken != "" {
listReq.PageToken = &pageToken
}
}
return nil
}
// extractAnthropicBatchIDFromPath extracts provider from header and batch_id from path parameters
func extractAnthropicBatchIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
// Extract provider from header, default to Anthropic
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
batchID := ctx.UserValue("batch_id")
if batchID == nil {
return errors.New("batch_id is required")
}
batchIDStr, ok := batchID.(string)
if !ok || batchIDStr == "" {
return errors.New("batch_id must be a non-empty string")
}
switch r := req.(type) {
case *anthropic.AnthropicBatchRetrieveRequest:
r.BatchID = batchIDStr
case *anthropic.AnthropicBatchCancelRequest:
r.BatchID = batchIDStr
case *anthropic.AnthropicBatchResultsRequest:
r.BatchID = batchIDStr
}
return nil
}
// extractAnthropicFileUploadParams extracts provider from header for file upload requests
func extractAnthropicFileUploadParams(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
return nil
}
// extractAnthropicFileListQueryParams extracts provider from header and query parameters for Anthropic file list requests
func extractAnthropicFileListQueryParams(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
if listReq, ok := req.(*anthropic.AnthropicFileListRequest); ok {
// Extract provider from header, default to Anthropic
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
// Extract limit from query parameters
if limitStr := string(ctx.QueryArgs().Peek("limit")); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil {
listReq.Limit = limit
} else {
// We are keeping default as 30
listReq.Limit = 30
}
}
// Extract after_id cursor
if afterID := string(ctx.QueryArgs().Peek("after_id")); afterID != "" {
listReq.After = &afterID
}
}
return nil
}
// extractAnthropicFileIDFromPath extracts provider from header and file_id from path parameters
func extractAnthropicFileIDFromPath(ctx *fasthttp.RequestCtx, bifrostCtx *schemas.BifrostContext, req interface{}) error {
// Extract provider from header, default to Anthropic
provider := string(ctx.Request.Header.Peek("x-model-provider"))
if provider == "" {
provider = string(schemas.Anthropic)
}
bifrostCtx.SetValue(bifrostContextKeyProvider, schemas.ModelProvider(provider))
fileID := ctx.UserValue("file_id")
if fileID == nil {
return errors.New("file_id is required")
}
fileIDStr, ok := fileID.(string)
if !ok || fileIDStr == "" {
return errors.New("file_id must be a non-empty string")
}
switch r := req.(type) {
case *anthropic.AnthropicFileRetrieveRequest:
r.FileID = fileIDStr
case *anthropic.AnthropicFileDeleteRequest:
r.FileID = fileIDStr
case *anthropic.AnthropicFileContentRequest:
r.FileID = fileIDStr
}
return nil
}
// CreateAnthropicFilesRouteConfigs creates route configurations for Anthropic Files API endpoints.
func CreateAnthropicFilesRouteConfigs(pathPrefix string, handlerStore lib.HandlerStore) []RouteConfig {
var routes []RouteConfig
// Upload file endpoint - POST /v1/files
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/files",
Method: "POST",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.FileUploadRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicFileUploadRequest{}
},
RequestParser: func(ctx *fasthttp.RequestCtx, req interface{}) error {
uploadReq, ok := req.(*anthropic.AnthropicFileUploadRequest)
if !ok {
return errors.New("invalid request type for file upload")
}
providerHeader := string(ctx.Request.Header.Peek("x-model-provider"))
if providerHeader == "" {
providerHeader = string(schemas.Anthropic)
}
provider := schemas.ModelProvider(providerHeader)
// Parse multipart form
form, err := ctx.MultipartForm()
if err != nil {
return err
}
// Extract purpose (required)
purposeValues := form.Value["purpose"]
if len(purposeValues) > 0 && purposeValues[0] != "" {
uploadReq.Purpose = purposeValues[0]
} else if provider == schemas.OpenAI && uploadReq.Purpose == "" {
uploadReq.Purpose = "batch"
}
// Extract file (required)
fileHeaders := form.File["file"]
if len(fileHeaders) == 0 {
return errors.New("file field is required")
}
// Read file content
fileHeader := fileHeaders[0]
file, err := fileHeader.Open()
if err != nil {
return err
}
defer file.Close()
// Read file data
fileData, err := io.ReadAll(file)
if err != nil {
return err
}
uploadReq.File = fileData
uploadReq.Filename = fileHeader.Filename
return nil
},
FileRequestConverter: func(ctx *schemas.BifrostContext, req any) (*FileRequest, error) {
if uploadReq, ok := req.(*anthropic.AnthropicFileUploadRequest); ok {
// Here if provider is OpenAI and purpose is empty then we override it with "batch"
provider, ok := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if !ok {
return nil, errors.New("provider not found in context")
}
if provider == schemas.OpenAI && uploadReq.Purpose == "" {
uploadReq.Purpose = "batch"
}
return &FileRequest{
Type: schemas.FileUploadRequest,
UploadRequest: &schemas.BifrostFileUploadRequest{
File: uploadReq.File,
Filename: uploadReq.Filename,
Purpose: schemas.FilePurpose(uploadReq.Purpose),
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid file upload request type")
},
FileUploadResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileUploadResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
if resp.ExtraFields.Provider == schemas.Gemini {
// Here we will convert fileId to replace files/ with files-
resp.ID = strings.Replace(resp.ID, "files/", "files-", 1)
}
return anthropic.ToAnthropicFileUploadResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicFileUploadParams,
})
// List files endpoint - GET /v1/files
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/files",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.FileListRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicFileListRequest{}
},
FileRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*FileRequest, error) {
if listReq, ok := req.(*anthropic.AnthropicFileListRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
return &FileRequest{
Type: schemas.FileListRequest,
ListRequest: &schemas.BifrostFileListRequest{
Limit: listReq.Limit,
After: listReq.After,
Order: listReq.Order,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid file list request type")
},
FileListResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileListResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
if resp.ExtraFields.Provider == schemas.Gemini {
// Here we will convert fileId to replace files/ with files-
for i, file := range resp.Data {
resp.Data[i].ID = strings.Replace(file.ID, "files/", "files-", 1)
}
}
return anthropic.ToAnthropicFileListResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicFileListQueryParams,
})
// Retrieve file endpoint - GET /v1/files/{file_id}
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/files/{file_id}/content",
Method: "GET",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.FileContentRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicFileRetrieveRequest{}
},
FileRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*FileRequest, error) {
if retrieveReq, ok := req.(*anthropic.AnthropicFileRetrieveRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
// Handle file id conversion for Gemini
if provider == schemas.Gemini {
retrieveReq.FileID = strings.Replace(retrieveReq.FileID, "files-", "files/", 1)
}
return &FileRequest{
Type: schemas.FileRetrieveRequest,
RetrieveRequest: &schemas.BifrostFileRetrieveRequest{
FileID: retrieveReq.FileID,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid file retrieve request type")
},
FileRetrieveResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileRetrieveResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
return anthropic.ToAnthropicFileRetrieveResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicFileIDFromPath,
})
// Delete file endpoint - DELETE /v1/files/{file_id}
routes = append(routes, RouteConfig{
Type: RouteConfigTypeAnthropic,
Path: pathPrefix + "/v1/files/{file_id}",
Method: "DELETE",
GetHTTPRequestType: func(ctx *fasthttp.RequestCtx) schemas.RequestType {
return schemas.FileDeleteRequest
},
GetRequestTypeInstance: func(ctx context.Context) interface{} {
return &anthropic.AnthropicFileDeleteRequest{}
},
FileRequestConverter: func(ctx *schemas.BifrostContext, req interface{}) (*FileRequest, error) {
if deleteReq, ok := req.(*anthropic.AnthropicFileDeleteRequest); ok {
provider := ctx.Value(bifrostContextKeyProvider).(schemas.ModelProvider)
if provider == schemas.Gemini {
// Here we will convert fileId to replace files/ with files-
deleteReq.FileID = strings.Replace(deleteReq.FileID, "files-", "files/", 1)
}
return &FileRequest{
Type: schemas.FileDeleteRequest,
DeleteRequest: &schemas.BifrostFileDeleteRequest{
FileID: deleteReq.FileID,
Provider: provider,
},
}, nil
}
return nil, errors.New("invalid file delete request type")
},
FileDeleteResponseConverter: func(ctx *schemas.BifrostContext, resp *schemas.BifrostFileDeleteResponse) (interface{}, error) {
if resp.ExtraFields.RawResponse != nil {
return resp.ExtraFields.RawResponse, nil
}
return anthropic.ToAnthropicFileDeleteResponse(resp), nil
},
ErrorConverter: func(ctx *schemas.BifrostContext, err *schemas.BifrostError) interface{} {
return anthropic.ToAnthropicChatCompletionError(err)
},
PreCallback: extractAnthropicFileIDFromPath,
})
return routes
}
// NewAnthropicRouter creates a new AnthropicRouter with the given bifrost client.
func NewAnthropicRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, logger schemas.Logger) *AnthropicRouter {
routes := CreateAnthropicRouteConfigs("/anthropic", logger)
routes = append(routes, CreateAnthropicListModelsRouteConfigs("/anthropic", handlerStore)...)
routes = append(routes, CreateAnthropicCountTokensRouteConfigs("/anthropic", handlerStore)...)
routes = append(routes, CreateAnthropicBatchRouteConfigs("/anthropic", handlerStore)...)
routes = append(routes, CreateAnthropicFilesRouteConfigs("/anthropic", handlerStore)...)
return &AnthropicRouter{
GenericRouter: NewGenericRouter(client, handlerStore, routes, nil, logger),
}
}