[Part 1]: 10K, 1M.. or more active SSE streams Building MCPSprut (MCP Hub)¶
チェック¶
- [ ] 本文を確認した
- [ ] 概要を確認した
- [ ] タグを確認した
- [ ]
inbox/直下へ移行した
概要¶
MCP サーバーの tool 変更をリアルタイムに追うため、大量の SSE stream を保持する MCP Hub MCPSprut を Go で作った記事。
10,000 個の仮想 MCP server を simulator で生成し、Hub 側は server ごとに goroutine を立てて SSE stream を維持し、tool change event を受けたら tool list を再取得して storage に保存する。
fan-out with supervision、storage interface、batch writer、observer callback、retry stability window、WaitGroup と mutex による shutdown safety、context propagation による graceful shutdown が実装上の論点。
本文¶
MCP は、LLM が外部サービスを使うための protocol として説明される。 各 MCP server は tools を公開し、LLM client は必要に応じてその tool を呼び出す。 ただし、tool set には固定 version があるわけではなく、server 側で tool が追加、削除、更新される可能性がある。
その変更を client が知るため、MCP server は SSE、Server-Sent Events の stream を維持し、tool set が変わったら event を送る。 client は event を受けたら tool list を再取得する。
ここで multi-tenant な AI platform を考えると、ユーザーごとに Gmail、Slack、Jira などの MCP server を接続する。 ユーザーが増えれば、維持すべき SSE stream は数万、数十万、あるいはそれ以上になる。 記事の問いは、単一マシンで 10K、1M 以上の active SSE stream をどこまで扱えるのか、というもの。
simulator¶
本物の MCP server を 10,000 個用意するのは現実的ではない。 認証も面倒で、tool が頻繁に変わる server を大量に集める必要もある。 そのため、まず Go で simulator を作っている。
simulator は 10,000 個の別プロセスを起動するわけではない。 単一 process 内に仮想 MCP server の pool を作り、1つの HTTP server の path で server を分ける。
この構成により、10,000 port を開けずに、10,000 server 相当の endpoint を作れる。
各仮想 server は goroutine を持ち、5〜60秒のランダム間隔で tool を変える。
変化は add、remove、update の3種類。
tool 名は get_users、create_reports、deploy_clusters のような verb + noun で生成され、ログが現実味を持つようにしている。
Mutation の概念は次のようなもの。
func mutate(tools []Tool) ([]Tool, MutationEvent) {
switch rand.Intn(3) {
case 0:
return append(tools, randomTool()), MutationEvent{Type: "add"}
case 1:
if len(tools) <= 1 {
return tools, MutationEvent{Type: "skip-remove"}
}
return removeRandom(tools), MutationEvent{Type: "remove"}
default:
return updateRandom(tools), MutationEvent{Type: "update"}
}
}
simulator は本物の MCP server ではないが、Hub の load test としては十分。 記事時点では 10,000 server を問題なく動かせており、今後さらに限界を探る予定になっている。
MCPSprut の役割¶
MCPSprut は MCP Hub。 複数の MCP server に接続し、それぞれの SSE stream を開いたまま保持する。 tool change event を受けると、その server から tool list を再取得し、storage に保存する。
要約すると、次の責務を持つ。
- 登録済み MCP server 一覧を読み込む。
- server ごとに connector goroutine を起動する。
- 各 connector が SSE stream を維持する。
- event 受信時に tool list を fetch する。
- tool 更新を batch writer 経由で storage に保存する。
Go を選んだ理由は、goroutine が「1 connection per server」に自然に対応するため。 prototype として実装速度も高い。 最大性能を追うなら Rust も候補だが、記事では将来の話として扱われている。
fan-out with supervision¶
Hub の基本構成は fan-out with supervision。 main process が config と server list を読み込み、各 server に対して connector goroutine を起動する。
func (h *Hub) Start(ctx context.Context) error {
servers, err := h.storage.LoadServers(ctx)
if err != nil {
return err
}
for _, server := range servers {
h.startConnector(ctx, server)
}
return nil
}
func (h *Hub) startConnector(ctx context.Context, server ServerConfig) {
go func() {
conn := NewConnector(server, h.batcher)
conn.Run(ctx)
}()
}
server address はあらかじめ database に入っている。 この service は server 登録の UI ではなく、登録済み server の realtime tool update を追うためのもの。
Storage interface¶
database は最初に決め打ちしない。 Postgres、Redis、ClickHouse、ファイル書き込みなど、用途によって選択肢が変わるため、storage interface を定義して差し替え可能にしている。
type Storage interface {
LoadServers(ctx context.Context) ([]ServerConfig, error)
SaveServer(ctx context.Context, server ServerConfig) error
OnNewServer(func(ServerConfig))
SaveToolsBatch(ctx context.Context, updates []ToolUpdate) error
GetTools(ctx context.Context, serverID string) ([]Tool, error)
Close() error
}
prototype では BoltDB を使っている。 BoltDB は Go 製の embedded database で、依存が少なく試作に向く。
重要なのは、storage に OnNewServer callback があること。
これにより、稼働中に新しい server が保存されたとき、Hub 側がその server の connector をすぐ起動できる。
Batch Writer¶
tool update を event ごとに storage へ書くとコストが高い。 そこで batch writer を用意し、更新を channel で受け取り、一定件数または一定時間でまとめて flush する。
設計は次のようなもの。
func (b *Batcher) Run(ctx context.Context) {
ticker := time.NewTicker(b.flushInterval)
defer ticker.Stop()
buf := make([]ToolUpdate, 0, b.batchSize)
for {
select {
case <-ctx.Done():
b.drain(&buf)
b.flush(buf)
return
case update := <-b.updates:
buf = append(buf, update)
if len(buf) >= b.batchSize {
b.flush(buf)
buf = buf[:0]
}
case <-ticker.C:
if len(buf) > 0 {
b.flush(buf)
buf = buf[:0]
}
}
}
}
batch size だけで flush すると、3件だけ入ったまま次の97件が来ない場合に stale data が残る。 そのため、件数条件と時間条件のどちらかで flush する。
shutdown 時には context cancellation を受け取り、channel 内の残りも drain してから flush する。 これは tool update の取りこぼしを減らすために重要。
記事では、各 server については1つの connector が順番に event を処理するため、同じ server の更新順序は connector 内で保たれると整理している。 そのため、BoltDB では最後の write が勝つ挙動で足りる。
Observer / Callback¶
稼働中に新しい MCP server を追加したら、すぐ監視対象にしたい。 そのために observer/callback pattern を使う。
Hub は storage に callback を登録する。
storage は SaveServer が呼ばれて新しい server を保存したら callback を呼ぶ。
storage は Hub や connector を知らない。
Hub も storage を polling しない。
保存イベントだけが伝わり、Hub が connector を起動する。
この分離により、server 追加の即時反映とコンポーネント間の疎結合を両立している。
Retry with Stability Window¶
connector には retry がある。 単純には3回失敗したら諦める。 しかし、SSE stream は長時間生きる。 一度安定して何時間も動いた connection が、たまに切れたからといって失敗回数を累積し続けるのは厳しすぎる。
そこで stability window を入れる。 connection が一定時間以上、記事では30秒以上生きたなら、その接続は安定していたとみなし、failure counter を reset する。
func (c *Connector) Run(ctx context.Context) {
failures := 0
for {
started := time.Now()
err := c.connect(ctx)
if ctx.Err() != nil {
return
}
if time.Since(started) > stableWindow {
failures = 0
}
failures++
if failures > maxRetries {
return
}
select {
case <-ctx.Done():
return
case <-time.After(c.retryInterval):
}
}
}
短時間に3回連続で落ちるなら server が落ちている可能性が高い。 長時間安定した後の切断なら、一時的な hiccup とみなして新しく数え直す。
WaitGroup + Mutex による shutdown safety¶
observer callback と shutdown の組み合わせで、sync.WaitGroup の race が起こり得る。
shutdown 中に wg.Wait() している、または終わった直後に、新しい server callback が発火して wg.Add(1) すると panic の原因になる。
これを防ぐため、Hub に mutex と shutdown flag を持たせる。
startConnector は mutex を取り、shutdown flag を確認し、shutdown 中でなければ同じ lock の中で wg.Add(1) する。
func (h *Hub) startConnector(ctx context.Context, server ServerConfig) {
h.shutdownMu.Lock()
if h.shutdown {
h.shutdownMu.Unlock()
return
}
h.wg.Add(1)
h.shutdownMu.Unlock()
go func() {
defer h.wg.Done()
NewConnector(server, h.batcher).Run(ctx)
}()
}
func (h *Hub) Wait() {
h.shutdownMu.Lock()
h.shutdown = true
h.shutdownMu.Unlock()
h.wg.Wait()
}
flag だけでは、flag check と wg.Add の間に race が残る。
mutex だけでは、Wait() 後に callback が入る問題を防げない。
両方を組み合わせ、Add と shutdown check を atomic に扱う。
Graceful shutdown¶
shutdown は context propagation で行う。
SIGINT/SIGTERM を受けたら root context を cancel する。
各 connector は ctx.Done() を見て SSE stream を閉じる。
Hub は shutdown flag を立て、新しい connector を拒否し、WaitGroup で既存 connector を待つ。
batcher は context cancellation を受けて残りを drain/flush する。
最後に BoltDB を close する。
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
cancel()
done := make(chan struct{})
go func() {
h.Wait()
batcher.Wait()
close(done)
}()
select {
case <-done:
log.Println("shutdown complete")
case <-time.After(5 * time.Second):
log.Println("shutdown timed out")
}
各 component は互いの内部を知らず、context cancellation だけで cleanup する。 長くかかりすぎる場合は timeout で強制終了する。 少数の update を失うより、永遠に止まらない方が問題という判断。
結果と次の課題¶
記事の MVP では、10,000 の active SSE stream を安定して維持できた。 Active Streams のグラフでは 10,000 まで ramp up し、その後 plateau に入って drop しない。 Tools Fetched rate のグラフでは、tool changed event を受けて tool list を再取得する処理が安定して回っている。
ただし、詳細な profiling、memory、CPU、ログ、OpenTelemetry による解析は次回以降。 次の問いは、どこで壊れるか。 100K、1M、さらに大きい数で、memory、CPU、BoltDB、network、file descriptor のどれが先に限界になるかを見る必要がある。
要点¶
- MCP server は tool set の変更を SSE で client に通知する。
- multi-tenant MCP platform では、server 数に比例して persistent stream が増える。
- simulator は単一 process + path routing で 10,000 仮想 MCP server を作る。
- MCPSprut は server ごとに goroutine を持ち、SSE stream を維持する。
- tool update は batch writer でまとめて storage に保存する。
- 稼働中 server 追加は storage callback で connector を起動する。
- 長時間 stream では失敗回数を単純累積せず、stability window で reset する。
- WaitGroup と callback を併用すると shutdown race が起きるため、mutex + shutdown flag が必要。
- graceful shutdown は context propagation で全 component に伝える。