-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
159 lines (135 loc) · 4.26 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package main
import (
"context"
"flag"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/joho/godotenv"
)
// ServerConfig describes a single MCP server entry of the configuration.
// The struct tags give the YAML and JSON key each field is bound to.
type ServerConfig struct {
	// Command is bound to the "command" key.
	Command string `yaml:"command" json:"command"`

	// Args is bound to the "args" key.
	Args []string `yaml:"args" json:"args"`

	// Env is bound to the "env" key and maps variable names to values.
	Env map[string]string `yaml:"env" json:"env"`

	// Extensions is bound to the "_extensions" key. It is a pointer, so it
	// may be nil.
	Extensions *Extensions `yaml:"_extensions" json:"_extensions"`
}
// ToolsExtensions carries the allow and deny lists that apply to tools.
type ToolsExtensions struct {
	// Allow is bound to the "allow" key.
	Allow []string `yaml:"allow" json:"allow"`

	// Deny is bound to the "deny" key.
	Deny []string `yaml:"deny" json:"deny"`
}
// Extensions groups the extension settings attached to a server entry.
type Extensions struct {
	// Tools is bound to the "tools" key and holds the tool allow/deny lists.
	Tools ToolsExtensions `yaml:"tools" json:"tools"`
}
// Config is the top-level, application-wide configuration document.
type Config struct {
	// MCPServers is bound to the "mcpServers" key and maps a server name to
	// that server's configuration.
	MCPServers map[string]ServerConfig `yaml:"mcpServers" json:"mcpServers"`
}
// main is the process entry point. It parses the command-line flags, sets up
// logging, loads the configuration file, starts the HTTP server immediately
// with an empty MCP client map, initializes the configured MCP clients in a
// background goroutine, and then blocks until either a shutdown signal
// (interrupt or SIGTERM) arrives or the server stops.
//
// The process exits with status 1 when the -config flag is missing, when the
// configuration cannot be loaded, or when the server fails with an error
// other than http.ErrServerClosed.
func main() {
	// Handle command line arguments
	configPath := flag.String("config", "", "path to config file (required)")
	port := flag.String("port", "8080", "port to listen on")
	debug := flag.Bool("debug", false, "enable debug mode")
	initTimeoutSec := flag.Int("init-timeout", 60, "timeout in seconds for each MCP client initialization")
	flag.Parse()

	// Initialize logger with level
	logLevel := slog.LevelInfo
	if *debug {
		logLevel = slog.LevelDebug
	}
	InitLogger(logLevel)
	logger := WithComponent("main")

	// Load dotenv if it exists. A missing file is expected and only noted;
	// any other failure is reported with its cause instead of being
	// mislabeled as "not found". Either way startup continues.
	if err := godotenv.Load(); err != nil {
		if os.IsNotExist(err) {
			logger.Info("Notice: .env file not found, continuing without it")
		} else {
			logger.Error("Failed to load .env file, continuing without it", "error", err)
		}
	}

	// Check if config file path is provided
	if *configPath == "" {
		logger.Error("Config file path is required", "error", "Use -config flag to specify it")
		os.Exit(1)
	}

	// Load config file
	cfg, err := LoadConfig(*configPath)
	if err != nil {
		logger.Error("Failed to load config", "error", err)
		os.Exit(1)
	}

	// Create empty MCP clients map and start server immediately
	server := NewServer(make(map[string]*MCPClient))

	// Create context for graceful shutdown
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Start server in a goroutine. The channel is buffered so the goroutine
	// can finish even if main has already left the select below.
	errCh := make(chan error, 1)
	go func() {
		errCh <- server.Start(*port)
	}()

	initTimeout := time.Duration(*initTimeoutSec) * time.Second

	// Initialize MCP clients asynchronously
	go func() {
		logger.Info("Starting MCP client initialization")
		tempMcpClients := make(map[string]*MCPClient, len(cfg.MCPServers))

		for name, serverCfg := range cfg.MCPServers {
			// Do not create further clients once shutdown was requested.
			if ctx.Err() != nil {
				break
			}

			mcpCfg := ConvertToMCPClientConfig(serverCfg)
			client, err := NewMCPClient(mcpCfg)
			if err != nil {
				logger.Error("Failed to create MCP client",
					"server_name", name,
					"error", err)
				continue
			}

			err = withTimeout(ctx, initTimeout, func(ctx context.Context) error {
				_, err := client.Initialize(ctx)
				return err
			})
			if err != nil {
				logger.Error("Failed to initialize MCP client", "server_name", name, "error", err)
				// The client was created but is never published, so release
				// it here; nothing else will close it.
				// NOTE(review): assumes Close is safe on a client whose
				// Initialize failed - confirm against MCPClient.
				client.Close()
				continue
			}

			tempMcpClients[name] = client
			logger.Info("MCP Server initialized successfully", "server_name", name)

			// Log environment variables checksums if in debug mode
			if *debug {
				LogConfig(logger, name, serverCfg.Env)
			}
		}

		// Publish the map under the initMu write lock. The shutdown check is
		// made while holding the lock: the cleanup in main cancels ctx
		// before taking the read lock, so either the clients are published
		// in time for that cleanup to close them, or shutdown is observed
		// here and this goroutine closes them itself.
		server.initMu.Lock()
		if ctx.Err() != nil {
			server.initMu.Unlock()
			for _, client := range tempMcpClients {
				client.Close()
			}
			logger.Info("MCP client initialization aborted by shutdown")
			return
		}
		server.mcpClients = tempMcpClients
		server.initMu.Unlock()
		logger.Info("MCP client initialization completed; Now its ready")
	}()

	// Cleanup for MCP clients on shutdown. It is a named closure because the
	// fatal-error path below leaves through os.Exit, which does not run
	// deferred functions, and therefore has to call it explicitly.
	closeClients := func() {
		// Cancel ctx first so the initialization goroutine stops and does
		// not publish clients after this cleanup has run. Calling stop more
		// than once is harmless.
		stop()

		server.initMu.RLock()
		defer server.initMu.RUnlock()
		for _, client := range server.mcpClients {
			client.Close()
		}
	}
	defer closeClients()

	// Wait for interrupt signal or server error
	select {
	case <-ctx.Done():
		logger.Info("Shutdown signal received")
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			logger.Error("Server shutdown error", "error", err)
		}
	case err := <-errCh:
		if err != nil && err != http.ErrServerClosed {
			logger.Error("Fatal server error", "error", err)
			// os.Exit skips deferred calls, so release the clients first.
			closeClients()
			os.Exit(1)
		}
	}
}
// withTimeout invokes fn with a context derived from parentCtx that expires
// once timeout has elapsed, and returns whatever fn returns. The derived
// context is cancelled as soon as fn returns, so fn must not keep using it
// afterwards.
func withTimeout(parentCtx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error {
	expiry := time.Now().Add(timeout)

	boundedCtx, release := context.WithDeadline(parentCtx, expiry)
	defer release()

	result := fn(boundedCtx)
	return result
}