File size: 5,267 Bytes
ec4aa90 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
"""
MCP Manager - Central orchestrator for multiple MCP server connections.
"""
import os
import logging
from typing import Dict, Optional
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
logger = logging.getLogger(__name__)
class MCPManager:
"""
Manages multiple MCP server connections and sessions.
Provides centralized connection pooling and session management.
"""
def __init__(self):
"""Initialize MCP Manager."""
self.servers: Dict[str, StdioServerParameters] = {}
self.sessions: Dict[str, ClientSession] = {}
self.active_connections: Dict[str, bool] = {}
logger.info("MCPManager initialized")
def register_server(self, name: str, command: str, args: list, env: Optional[Dict] = None):
"""
Register an MCP server configuration.
Args:
name: Unique name for the server
command: Command to start the server
args: Arguments for the command
env: Optional environment variables
"""
server_params = StdioServerParameters(
command=command,
args=args,
env=env or {}
)
self.servers[name] = server_params
self.active_connections[name] = False
logger.info(f"Registered MCP server: {name}")
def register_github_server(self):
"""Register GitHub MCP server."""
github_token = os.getenv("GITHUB_TOKEN")
if not github_token:
logger.warning("GITHUB_TOKEN not set, GitHub MCP will not be available")
return
self.register_server(
name="github",
command="npx",
args=["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_PERSONAL_ACCESS_TOKEN": github_token}
)
def register_tavily_server(self):
"""Register Tavily Search MCP server."""
tavily_key = os.getenv("TAVILY_API_KEY")
if not tavily_key:
logger.warning("TAVILY_API_KEY not set, Tavily MCP will not be available")
return
self.register_server(
name="tavily",
command="npx",
args=["-y", "@modelcontextprotocol/server-tavily"],
env={"TAVILY_API_KEY": tavily_key}
)
def register_memory_server(self):
"""Register Memory MCP server."""
self.register_server(
name="memory",
command="npx",
args=["-y", "@modelcontextprotocol/server-memory"]
)
def register_filesystem_server(self, allowed_directories: Optional[list] = None):
"""
Register Filesystem MCP server.
Args:
allowed_directories: List of allowed directories for file access
"""
args = ["-y", "@modelcontextprotocol/server-filesystem"]
if allowed_directories:
args.extend(allowed_directories)
self.register_server(
name="filesystem",
command="npx",
args=args
)
def get_server_params(self, name: str) -> Optional[StdioServerParameters]:
"""
Get server parameters by name.
Args:
name: Server name
Returns:
Server parameters or None if not found
"""
return self.servers.get(name)
def is_server_registered(self, name: str) -> bool:
"""
Check if a server is registered.
Args:
name: Server name
Returns:
True if registered, False otherwise
"""
return name in self.servers
def list_servers(self) -> list:
"""
List all registered servers.
Returns:
List of server names
"""
return list(self.servers.keys())
async def initialize_all_servers(self):
"""Initialize all registered MCP servers."""
logger.info("Initializing all MCP servers...")
for name in self.servers:
try:
logger.info(f"Initializing {name} MCP server...")
# Note: Actual initialization happens when clients connect
self.active_connections[name] = True
except Exception as e:
logger.error(f"Failed to initialize {name}: {e}")
self.active_connections[name] = False
logger.info("MCP server initialization complete")
def get_active_servers(self) -> list:
"""
Get list of active server connections.
Returns:
List of active server names
"""
return [name for name, active in self.active_connections.items() if active]
def register_all_standard_servers(self):
"""Register all standard MCP servers."""
logger.info("Registering all standard MCP servers...")
self.register_github_server()
self.register_tavily_server()
self.register_memory_server()
self.register_filesystem_server()
logger.info(f"Registered {len(self.servers)} MCP servers")
|