347 lines
15 KiB
Go
347 lines
15 KiB
Go
package mcp
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/maximhq/bifrost/core/schemas"
|
|
|
|
"github.com/mark3labs/mcp-go/server"
|
|
)
|
|
|
|
// ============================================================================
// CONSTANTS
// ============================================================================
|
|
|
|
// MCP defaults and identifiers.
const (
	// BifrostMCPVersion is the version identifier for Bifrost.
	BifrostMCPVersion = "1.0.0"

	// BifrostMCPClientName is the name for the internal Bifrost MCP client.
	BifrostMCPClientName = "BifrostClient"

	// BifrostMCPClientKey is the key for the internal Bifrost client in clientMap.
	BifrostMCPClientKey = "bifrostInternal"

	// MCPLogPrefix is the consistent prefix used on MCP log messages.
	MCPLogPrefix = "[Bifrost MCP]"

	// MCPClientConnectionEstablishTimeout is the timeout for MCP client
	// connection establishment.
	MCPClientConnectionEstablishTimeout = 30 * time.Second
)
|
|
|
|
// ============================================================================
// TYPE DEFINITIONS
// ============================================================================
|
|
|
|
// MCPManager manages MCP integration for Bifrost core.
|
|
// It provides a bridge between Bifrost and various MCP servers, supporting
|
|
// both local tool hosting and external MCP server connections.
|
|
type MCPManager struct {
|
|
ctx context.Context
|
|
logger schemas.Logger // Logger instance for this manager
|
|
oauth2Provider schemas.OAuth2Provider // Provider for OAuth2 functionality
|
|
toolsManager *ToolsManager // Handler for MCP tools
|
|
server *server.MCPServer // Local MCP server instance for hosting tools (STDIO-based)
|
|
clientMap map[string]*schemas.MCPClientState // Map of MCP client names to their configurations
|
|
mu sync.RWMutex // Read-write mutex for thread-safe operations
|
|
serverRunning bool // Track whether local MCP server is running
|
|
healthMonitorManager *HealthMonitorManager // Manager for client health monitors
|
|
toolSyncManager *ToolSyncManager // Manager for periodic tool synchronization
|
|
reconnectingClients sync.Map // Tracks in-flight reconnect attempts per client ID (map[string]bool)
|
|
}
|
|
|
|
// MCPToolFunction is the generic signature of a tool-call handler that takes
// typed arguments.
//
// T is the argument structure the tool expects. The handler returns the tool's
// textual result, or an error.
type MCPToolFunction[T any] func(args T) (string, error)
|
|
|
|
// ============================================================================
// CONSTRUCTOR AND INITIALIZATION
// ============================================================================
|
|
|
|
// NewMCPManager creates and initializes a new MCP manager instance.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for the MCP manager
|
|
// - config: MCP configuration including server port and client configs
|
|
// - oauth2Provider: OAuth2 provider for authentication
|
|
// - logger: Logger instance for structured logging (uses default if nil)
|
|
// - codeMode: Optional CodeMode implementation for code execution (e.g., Starlark).
|
|
// Pass nil if code mode is not needed. The CodeMode's dependencies will be
|
|
// injected automatically via SetDependencies after the manager is created.
|
|
//
|
|
// Returns:
|
|
// - *MCPManager: Initialized manager instance
|
|
func NewMCPManager(ctx context.Context, config schemas.MCPConfig, oauth2Provider schemas.OAuth2Provider, logger schemas.Logger, codeMode CodeMode) *MCPManager {
|
|
if logger == nil {
|
|
logger = defaultLogger
|
|
}
|
|
// Set default values
|
|
if config.ToolManagerConfig == nil {
|
|
config.ToolManagerConfig = &schemas.MCPToolManagerConfig{
|
|
ToolExecutionTimeout: schemas.DefaultToolExecutionTimeout,
|
|
MaxAgentDepth: schemas.DefaultMaxAgentDepth,
|
|
}
|
|
}
|
|
// Creating new instance
|
|
manager := &MCPManager{
|
|
ctx: ctx,
|
|
logger: logger,
|
|
clientMap: make(map[string]*schemas.MCPClientState),
|
|
healthMonitorManager: NewHealthMonitorManager(),
|
|
toolSyncManager: NewToolSyncManager(config.ToolSyncInterval),
|
|
oauth2Provider: oauth2Provider,
|
|
}
|
|
// Convert plugin pipeline provider functions to the interface expected by ToolsManager
|
|
var pluginPipelineProvider func() PluginPipeline
|
|
var releasePluginPipeline func(pipeline PluginPipeline)
|
|
|
|
if config.PluginPipelineProvider != nil && config.ReleasePluginPipeline != nil {
|
|
pluginPipelineProvider = func() PluginPipeline {
|
|
if pipeline := config.PluginPipelineProvider(); pipeline != nil {
|
|
if pp, ok := pipeline.(PluginPipeline); ok {
|
|
return pp
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
releasePluginPipeline = func(pipeline PluginPipeline) {
|
|
config.ReleasePluginPipeline(pipeline)
|
|
}
|
|
}
|
|
|
|
manager.toolsManager = NewToolsManager(config.ToolManagerConfig, manager, config.FetchNewRequestIDFunc, pluginPipelineProvider, releasePluginPipeline, oauth2Provider, logger)
|
|
|
|
// Set up CodeMode if provided - inject dependencies after manager is created
|
|
if codeMode != nil {
|
|
deps := manager.toolsManager.GetCodeModeDependencies()
|
|
codeMode.SetDependencies(deps)
|
|
manager.toolsManager.SetCodeMode(codeMode)
|
|
}
|
|
|
|
// Process client configs: create client map entries and establish connections
|
|
if len(config.ClientConfigs) > 0 {
|
|
// Add clients in parallel
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(config.ClientConfigs))
|
|
for _, clientConfig := range config.ClientConfigs {
|
|
go func(clientConfig *schemas.MCPClientConfig) {
|
|
defer wg.Done()
|
|
if err := manager.AddClient(clientConfig); err != nil {
|
|
manager.logger.Warn("%s Failed to register MCP client %s: %v", MCPLogPrefix, clientConfig.Name, err)
|
|
// Retain the entry in Disconnected state and start a health monitor to
|
|
// recover it automatically. On startup, a connection failure is likely
|
|
// transient (e.g. autoscaling cold start) — the client was previously
|
|
// configured and should be recovered without user intervention.
|
|
manager.mu.Lock()
|
|
if _, exists := manager.clientMap[clientConfig.ID]; !exists {
|
|
manager.clientMap[clientConfig.ID] = &schemas.MCPClientState{
|
|
Name: clientConfig.Name,
|
|
ExecutionConfig: clientConfig,
|
|
State: schemas.MCPConnectionStateDisconnected,
|
|
ToolMap: make(map[string]schemas.ChatTool),
|
|
ToolNameMapping: make(map[string]string),
|
|
ConnectionInfo: &schemas.MCPClientConnectionInfo{
|
|
Type: clientConfig.ConnectionType,
|
|
},
|
|
}
|
|
} else {
|
|
manager.clientMap[clientConfig.ID].State = schemas.MCPConnectionStateDisconnected
|
|
}
|
|
manager.mu.Unlock()
|
|
isPingAvailable := true
|
|
if clientConfig.IsPingAvailable != nil {
|
|
isPingAvailable = *clientConfig.IsPingAvailable
|
|
}
|
|
monitor := NewClientHealthMonitor(manager, clientConfig.ID, DefaultHealthCheckInterval, isPingAvailable, manager.logger)
|
|
manager.healthMonitorManager.StartMonitoring(monitor)
|
|
}
|
|
}(clientConfig)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
manager.logger.Info(MCPLogPrefix + " MCP Manager initialized")
|
|
return manager
|
|
}
|
|
|
|
// SetPluginPipeline updates the plugin pipeline provider and release function on the manager's
|
|
// ToolsManager and CodeMode. Call this after attaching an externally-created MCPManager to a Bifrost
|
|
// instance so that nested tool calls in code mode can run through Bifrost's plugin hooks.
|
|
func (manager *MCPManager) SetPluginPipeline(provider func() PluginPipeline, release func(PluginPipeline)) {
|
|
manager.toolsManager.SetPluginPipeline(provider, release)
|
|
}
|
|
|
|
// AddToolsToRequest parses available MCP tools from the context and adds them to the request.
|
|
// It respects context-based filtering for clients and tools, and returns the modified request
|
|
// with tools attached.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context containing optional client/tool filtering keys
|
|
// - req: The Bifrost request to add tools to
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostRequest: The request with tools added
|
|
func (m *MCPManager) AddToolsToRequest(ctx *schemas.BifrostContext, req *schemas.BifrostRequest) *schemas.BifrostRequest {
|
|
return m.toolsManager.ParseAndAddToolsToRequest(ctx, req)
|
|
}
|
|
|
|
func (m *MCPManager) GetAvailableTools(ctx *schemas.BifrostContext) []schemas.ChatTool {
|
|
return m.toolsManager.GetAvailableTools(ctx)
|
|
}
|
|
|
|
// ExecuteToolCall executes a single tool call and returns the result.
|
|
// This is the primary tool executor and is used by both Chat Completions and Responses APIs.
|
|
//
|
|
// The method accepts an MCP request containing either a ChatAssistantMessageToolCall or
|
|
// ResponsesToolMessage, and returns the appropriate result format based on the request type.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for the tool execution
|
|
// - request: The MCP request containing the tool call (ChatAssistantMessageToolCall or ResponsesToolMessage)
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostMCPResponse: The result response containing tool execution output (ChatMessage or ResponsesMessage)
|
|
// - error: Any error that occurred during tool execution
|
|
func (m *MCPManager) ExecuteToolCall(ctx *schemas.BifrostContext, request *schemas.BifrostMCPRequest) (*schemas.BifrostMCPResponse, error) {
|
|
return m.toolsManager.ExecuteTool(ctx, request)
|
|
}
|
|
|
|
// UpdateToolManagerConfig updates the configuration for the tool manager.
|
|
// This allows runtime updates to settings like execution timeout and max agent depth.
|
|
//
|
|
// Parameters:
|
|
// - config: The new tool manager configuration to apply
|
|
func (m *MCPManager) UpdateToolManagerConfig(config *schemas.MCPToolManagerConfig) {
|
|
m.toolsManager.UpdateConfig(config)
|
|
}
|
|
|
|
// CheckAndExecuteAgentForChatRequest checks if the chat response contains tool calls,
|
|
// and if so, executes agent mode to handle the tool calls iteratively. If no tool calls
|
|
// are present, it returns the original response unchanged.
|
|
//
|
|
// Agent mode enables autonomous tool execution where:
|
|
// 1. Tool calls are automatically executed
|
|
// 2. Results are fed back to the LLM
|
|
// 3. The loop continues until no more tool calls are made or max depth is reached
|
|
// 4. Non-auto-executable tools are returned to the caller
|
|
//
|
|
// This method is available for both Chat Completions and Responses APIs.
|
|
// For Responses API, use CheckAndExecuteAgentForResponsesRequest().
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for the agent execution
|
|
// - req: The original chat request
|
|
// - response: The initial chat response that may contain tool calls
|
|
// - makeReq: Function to make subsequent chat requests during agent execution
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostChatResponse: The final response after agent execution (or original if no tool calls)
|
|
// - *schemas.BifrostError: Any error that occurred during agent execution
|
|
func (m *MCPManager) CheckAndExecuteAgentForChatRequest(
|
|
ctx *schemas.BifrostContext,
|
|
req *schemas.BifrostChatRequest,
|
|
response *schemas.BifrostChatResponse,
|
|
makeReq func(ctx *schemas.BifrostContext, req *schemas.BifrostChatRequest) (*schemas.BifrostChatResponse, *schemas.BifrostError),
|
|
executeTool func(ctx *schemas.BifrostContext, request *schemas.BifrostMCPRequest) (*schemas.BifrostMCPResponse, error),
|
|
) (*schemas.BifrostChatResponse, *schemas.BifrostError) {
|
|
if makeReq == nil {
|
|
return nil, &schemas.BifrostError{
|
|
IsBifrostError: false,
|
|
Error: &schemas.ErrorField{
|
|
Message: "makeReq is required to execute agent mode",
|
|
},
|
|
}
|
|
}
|
|
// Check if initial response has tool calls
|
|
if !hasToolCallsForChatResponse(response) {
|
|
m.logger.Debug("No tool calls detected, returning response")
|
|
return response, nil
|
|
}
|
|
// Execute agent mode
|
|
return m.toolsManager.ExecuteAgentForChatRequest(ctx, req, response, makeReq, executeTool)
|
|
}
|
|
|
|
// CheckAndExecuteAgentForResponsesRequest checks if the responses response contains tool calls,
|
|
// and if so, executes agent mode to handle the tool calls iteratively. If no tool calls
|
|
// are present, it returns the original response unchanged.
|
|
//
|
|
// Agent mode for Responses API works identically to Chat API:
|
|
// 1. Detects tool calls in the response (function_call messages)
|
|
// 2. Automatically executes tools in parallel when possible
|
|
// 3. Feeds results back to the LLM in Responses API format
|
|
// 4. Continues the loop until no more tool calls or max depth reached
|
|
// 5. Returns non-auto-executable tools to the caller
|
|
//
|
|
// Format Handling:
|
|
// This method automatically handles format conversions:
|
|
// - Responses tool calls (ResponsesToolMessage) are converted to Chat format for execution
|
|
// - Tool execution results are converted back to Responses format (ResponsesMessage)
|
|
// - All conversions use the adapters in agent_adaptors.go and converters in schemas/mux.go
|
|
//
|
|
// This provides full feature parity between Chat Completions and Responses APIs for tool execution.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for the agent execution
|
|
// - req: The original responses request
|
|
// - response: The initial responses response that may contain tool calls
|
|
// - makeReq: Function to make subsequent responses requests during agent execution
|
|
//
|
|
// Returns:
|
|
// - *schemas.BifrostResponsesResponse: The final response after agent execution (or original if no tool calls)
|
|
// - *schemas.BifrostError: Any error that occurred during agent execution
|
|
func (m *MCPManager) CheckAndExecuteAgentForResponsesRequest(
|
|
ctx *schemas.BifrostContext,
|
|
req *schemas.BifrostResponsesRequest,
|
|
response *schemas.BifrostResponsesResponse,
|
|
makeReq func(ctx *schemas.BifrostContext, req *schemas.BifrostResponsesRequest) (*schemas.BifrostResponsesResponse, *schemas.BifrostError),
|
|
executeTool func(ctx *schemas.BifrostContext, request *schemas.BifrostMCPRequest) (*schemas.BifrostMCPResponse, error),
|
|
) (*schemas.BifrostResponsesResponse, *schemas.BifrostError) {
|
|
if makeReq == nil {
|
|
return nil, &schemas.BifrostError{
|
|
IsBifrostError: false,
|
|
Error: &schemas.ErrorField{
|
|
Message: "makeReq is required to execute agent mode",
|
|
},
|
|
}
|
|
}
|
|
// Check if initial response has tool calls
|
|
if !hasToolCallsForResponsesResponse(response) {
|
|
m.logger.Debug("No tool calls detected, returning response")
|
|
return response, nil
|
|
}
|
|
// Execute agent mode
|
|
return m.toolsManager.ExecuteAgentForResponsesRequest(ctx, req, response, makeReq, executeTool)
|
|
}
|
|
|
|
// Cleanup performs cleanup of all MCP resources including clients and local server.
|
|
// This function safely disconnects all MCP clients (HTTP, STDIO, and SSE) and
|
|
// cleans up the local MCP server. It handles proper cancellation of SSE contexts
|
|
// and closes all transport connections.
|
|
//
|
|
// Returns:
|
|
// - error: Always returns nil, but maintains error interface for consistency
|
|
func (m *MCPManager) Cleanup() error {
|
|
// Stop all health monitors first
|
|
m.healthMonitorManager.StopAll()
|
|
|
|
// Stop all tool syncers
|
|
m.toolSyncManager.StopAll()
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Disconnect all external MCP clients
|
|
for id := range m.clientMap {
|
|
if err := m.removeClientUnsafe(id); err != nil {
|
|
m.logger.Error("%s Failed to remove MCP client %s: %v", MCPLogPrefix, id, err)
|
|
}
|
|
}
|
|
|
|
// Clear the client map
|
|
m.clientMap = make(map[string]*schemas.MCPClientState)
|
|
|
|
// Clear local server reference
|
|
// Note: mark3labs/mcp-go STDIO server cleanup is handled automatically
|
|
if m.server != nil {
|
|
m.logger.Info(MCPLogPrefix + " Clearing local MCP server reference")
|
|
m.server = nil
|
|
m.serverRunning = false
|
|
}
|
|
|
|
m.logger.Info(MCPLogPrefix + " MCP cleanup completed")
|
|
return nil
|
|
}
|