feat(mcp): streamable HTTP transport with session, init, and dispatch

Implements the Streamable HTTP transport: POST routing handles initialize
(issues session ID), tools/list, tools/call, and unknown methods; GET SSE
emits a keepalive comment then blocks on context cancellation. A minimal
registry stub is introduced so the server compiles and tools/list returns
an empty array until Phase 6+ registers real tools.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mathias Bergqvist
2026-05-04 20:49:54 +02:00
parent 50a3b27825
commit 36765b8360
4 changed files with 296 additions and 1 deletions

149
internal/mcp/server.go Normal file
View File

@@ -0,0 +1,149 @@
package mcp
import (
"encoding/json"
"errors"
"net/http"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
const ProtocolVersion = "2025-06-18"
type ServerOptions struct {
Registry *registry.Registry
OriginAllowlist []string
Sessions *SessionStore
}
type Server struct {
opts ServerOptions
}
func NewServer(opts ServerOptions) *Server {
if opts.Sessions == nil {
opts.Sessions = NewSessionStore()
}
return &Server{opts: opts}
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Origin allowlist (no-op when allowlist empty or Origin missing)
if len(s.opts.OriginAllowlist) > 0 {
origin := r.Header.Get("Origin")
if origin != "" {
ok := false
for _, a := range s.opts.OriginAllowlist {
if a == origin {
ok = true
break
}
}
if !ok {
http.Error(w, "origin not allowed", http.StatusForbidden)
return
}
}
}
switch r.Method {
case http.MethodGet:
s.handleGET(w, r)
case http.MethodPost:
s.handlePOST(w, r)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) handlePOST(w http.ResponseWriter, r *http.Request) {
var req Request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, NewErrorResponse(nil, -32700, "parse error", nil))
return
}
if req.ID == nil {
// Notification — no response.
w.WriteHeader(http.StatusAccepted)
return
}
// initialize is the only method allowed without a session.
if req.Method == "initialize" {
sid := s.opts.Sessions.Issue()
w.Header().Set("Mcp-Session-Id", sid)
writeJSON(w, http.StatusOK, NewResponse(req.ID, map[string]any{
"protocolVersion": ProtocolVersion,
"capabilities": map[string]any{"tools": map[string]any{}},
"serverInfo": map[string]any{"name": "gitea-mcp", "version": "0.1.0"},
}))
return
}
sid := r.Header.Get("Mcp-Session-Id")
if !s.opts.Sessions.Valid(sid) {
http.Error(w, "missing or invalid Mcp-Session-Id", http.StatusBadRequest)
return
}
switch req.Method {
case "tools/list":
writeJSON(w, http.StatusOK, NewResponse(req.ID, map[string]any{
"tools": s.opts.Registry.Tools(),
}))
case "tools/call":
var p struct {
Name string `json:"name"`
Arguments json.RawMessage `json:"arguments"`
}
if err := json.Unmarshal(req.Params, &p); err != nil {
writeJSON(w, http.StatusOK,
NewErrorResponse(req.ID, -32602, "invalid params", nil))
return
}
out, err := s.opts.Registry.Dispatch(r.Context(), p.Name, p.Arguments)
if err != nil {
code := -32000
if errors.Is(err, ErrToolNotFound) {
code = CodeNotFound
}
writeJSON(w, http.StatusOK,
NewErrorResponse(req.ID, code, err.Error(), nil))
return
}
writeJSON(w, http.StatusOK, NewResponse(req.ID, map[string]any{
"content": []map[string]any{{"type": "text", "text": string(out)}},
}))
default:
writeJSON(w, http.StatusOK,
NewErrorResponse(req.ID, -32601, "method not found: "+req.Method, nil))
}
}
func (s *Server) handleGET(w http.ResponseWriter, r *http.Request) {
sid := r.Header.Get("Mcp-Session-Id")
if !s.opts.Sessions.Valid(sid) {
http.Error(w, "missing or invalid Mcp-Session-Id", http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, _ := w.(http.Flusher)
// Emit a comment as keepalive; real notifications come via a future channel.
_, _ = w.Write([]byte(": stream open\n\n"))
if flusher != nil {
flusher.Flush()
}
<-r.Context().Done()
}
var ErrToolNotFound = errors.New("tool not found")
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}

View File

@@ -0,0 +1,92 @@
package mcp_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/mcp"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newServer(t *testing.T) *mcp.Server {
t.Helper()
reg := registry.New()
return mcp.NewServer(mcp.ServerOptions{
Registry: reg,
OriginAllowlist: nil,
Sessions: mcp.NewSessionStore(),
})
}
func postJSON(t *testing.T, srv http.Handler, body any, sessionID string) *httptest.ResponseRecorder {
t.Helper()
b, _ := json.Marshal(body)
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBuffer(b))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")
if sessionID != "" {
req.Header.Set("Mcp-Session-Id", sessionID)
}
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
return rr
}
func TestInitialize(t *testing.T) {
srv := newServer(t)
rr := postJSON(t, srv, map[string]any{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": map[string]any{"protocolVersion": "2025-06-18"},
}, "")
require.Equal(t, http.StatusOK, rr.Code)
sid := rr.Header().Get("Mcp-Session-Id")
assert.NotEmpty(t, sid)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
result := resp["result"].(map[string]any)
assert.Equal(t, "2025-06-18", result["protocolVersion"])
si := result["serverInfo"].(map[string]any)
assert.Equal(t, "gitea-mcp", si["name"])
}
func TestPostWithoutSessionRejected(t *testing.T) {
srv := newServer(t)
rr := postJSON(t, srv, map[string]any{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
}, "")
require.Equal(t, http.StatusBadRequest, rr.Code)
}
func TestToolsListAfterInitialize(t *testing.T) {
srv := newServer(t)
init := postJSON(t, srv, map[string]any{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": map[string]any{"protocolVersion": "2025-06-18"},
}, "")
sid := init.Header().Get("Mcp-Session-Id")
rr := postJSON(t, srv, map[string]any{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
}, sid)
require.Equal(t, http.StatusOK, rr.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
result := resp["result"].(map[string]any)
assert.Contains(t, result, "tools")
}

View File

@@ -0,0 +1,42 @@
package registry
import (
"context"
"encoding/json"
"errors"
)
type ToolDescriptor struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"inputSchema"`
}
type Tool interface {
Descriptor() ToolDescriptor
Call(ctx context.Context, args json.RawMessage) (json.RawMessage, error)
}
type Registry struct {
tools map[string]Tool
}
func New() *Registry { return &Registry{tools: map[string]Tool{}} }
func (r *Registry) Register(t Tool) { r.tools[t.Descriptor().Name] = t }
func (r *Registry) Tools() []ToolDescriptor {
out := make([]ToolDescriptor, 0, len(r.tools))
for _, t := range r.tools {
out = append(out, t.Descriptor())
}
return out
}
func (r *Registry) Dispatch(ctx context.Context, name string, args json.RawMessage) (json.RawMessage, error) {
t, ok := r.tools[name]
if !ok {
return nil, errors.New("tool not found: " + name)
}
return t.Call(ctx, args)
}