9 Commits

Author SHA1 Message Date
Mathias Bergqvist
370d30e376 feat(ingestion): mount MCP handler at POST /mcp
The ingestion server now exposes both REST and MCP on the same port
(3300). MCP shares brainDir, pipeline config, and LLM client with the
REST handlers — single source of process state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 19:41:05 +02:00
Mathias Bergqvist
bd0c1d75fd feat(ingestion): implement session_log MCP tool
Appends a JSON line to brainDir/sessions/<session_id>.jsonl using the
session package copied in Task 2. Required for upcoming pass-rate
logging.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 13:04:40 +02:00
Mathias Bergqvist
8c87460bff feat(ingestion): implement brain_ingest MCP tool
Wraps pipeline.Run with the existing LLM client. Mirrors the HTTP
/ingest and /ingest-path semantics — accepts either path or
content+source, validates mutual exclusion, surfaces an explicit error
when the LLM client is not configured (test-mode).

ctx is threaded through to pipeline.Run for cancellation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 13:02:02 +02:00
Mathias Bergqvist
809d435480 feat(ingestion): implement brain_ingest_raw MCP tool
Wraps pipeline.RunRaw directly. Same dry-run semantics as the HTTP
/ingest-raw endpoint. Test exercises a single concept page; asserts
returned path and that no file is written under dry_run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 10:28:05 +02:00
Mathias Bergqvist
e4a94df4fc feat(ingestion): extract WriteNote helper and add brain_write MCP tool
api.WriteNote captures the file-write logic that was previously inline
in Handler.Write. The existing HTTP endpoint now delegates to it; the
new MCP brain_write tool reuses the same function. Path-traversal
guard is strengthened to explicitly reject filenames containing path
separators or "..", so the rejection is surfaced before filepath.Base
strips the suspicious component (the previous defense-in-depth prefix
check became unreachable for these inputs after Base normalisation).
HTTP error code for caller-input errors shifts from 500 to 400, which
is semantically correct and not exercised by any existing test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 10:25:38 +02:00
Mathias Bergqvist
7dcb5610fe feat(ingestion): implement brain_query MCP tool
Wraps the existing search.Query function. Same BM25 over
brain/knowledge/ and brain/wiki/ that the HTTP /query endpoint serves.
Plan note: handleCall switch replaces the single-line stub from Task 1
— no unknownToolError type to remove since Task 1 inlined the error.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:56:40 +02:00
Mathias Bergqvist
63c8d114e8 feat(ingestion): add session package for JSONL log persistence
Copy of internal/session from the supervisor module — the ingestion
service needs it for the upcoming session_log MCP tool. The supervisor
copy will be removed in the supervisor-retirement plan; until then
the two packages are intentionally identical and pinned (no edits).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:54:24 +02:00
Mathias Bergqvist
54f7d373bd feat(ingestion): add MCP server skeleton with tools/list
Adds an MCP HTTP handler under ingestion/internal/mcp. Implements
initialize, tools/list, and the JSON-RPC notification skip from prior
work. Tool dispatch is stubbed (returns unknown-tool error) and will be
filled in by subsequent tasks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:43:23 +02:00
Mathias Bergqvist
a412eee427 docs: add brain MCP migration plan
13 TDD-disciplined tasks moving brain_* and session_log out of the
supervisor pod and into the ingestion pod's MCP handler. Slice 1 of
the larger SKILL.md + routing-MCP architecture migration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 09:27:28 +02:00
10 changed files with 2744 additions and 44 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,7 @@ import (
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/watcher"
)
@@ -54,6 +55,8 @@ func main() {
h := api.NewHandler(brainDir, logger, pipelineCfg)
mcpSrv := mcp.NewServer(brainDir, &pipelineCfg, llmClient.Complete)
ctx := context.Background()
if watchInterval > 0 {
watcher.Start(ctx, watcher.Config{
@@ -70,6 +73,7 @@ func main() {
mux.HandleFunc("POST /ingest-path", h.IngestPath)
mux.HandleFunc("POST /ingest-raw", h.IngestRaw)
mux.HandleFunc("POST /backfill-refs", h.BackfillRefs)
mux.Handle("POST /mcp", mcpSrv)
addr := ":" + port
watchIntervalLog := "disabled"
@@ -83,6 +87,7 @@ func main() {
"llm_model", llmModel,
"chunk_size", chunkSize,
"watch_interval", watchIntervalLog,
"mcp_enabled", true,
)
if err := http.ListenAndServe(addr, mux); err != nil {
logger.Error("server stopped", "err", err)

View File

@@ -85,6 +85,57 @@ func (h *Handler) Query(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]any{"results": results})
}
// WriteNote writes a markdown file to brainDir/knowledge/<filename>, optionally
// prefixed with YAML frontmatter built from typ and domain. Returns the path
// relative to brainDir (forward-slashed). Filename traversal is rejected.
func WriteNote(brainDir, content, filename, typ, domain string) (string, error) {
if content == "" {
return "", fmt.Errorf("content is required")
}
if filename == "" {
filename = fmt.Sprintf("%s-auto.md", time.Now().UTC().Format("2006-01-02-150405"))
}
rawDir := filepath.Join(brainDir, "knowledge")
if err := os.MkdirAll(rawDir, 0o755); err != nil {
return "", fmt.Errorf("create raw dir: %w", err)
}
finalContent := content
if typ != "" || domain != "" {
var fm strings.Builder
fm.WriteString("---\n")
if typ != "" {
fmt.Fprintf(&fm, "type: %s\n", typ)
}
if domain != "" {
fmt.Fprintf(&fm, "domain: %s\n", domain)
}
fm.WriteString("---\n")
finalContent = fm.String() + content
}
// Reject path separators outright; any non-flat filename is misuse.
if strings.ContainsAny(filename, `/\`) {
return "", fmt.Errorf("invalid filename")
}
base := filepath.Base(filename)
// After Base, "." and ".." remain. Reject those before adding .md.
if base == "." || base == ".." || base == "" {
return "", fmt.Errorf("invalid filename")
}
if !strings.HasSuffix(base, ".md") {
base += ".md"
}
dest := filepath.Join(rawDir, base)
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
return "", fmt.Errorf("write: %w", err)
}
rel, _ := filepath.Rel(brainDir, dest)
return filepath.ToSlash(rel), nil
}
// Write handles POST /write — write raw content to brain/knowledge/.
func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
var req writeRequest
@@ -92,53 +143,13 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "invalid JSON")
return
}
if req.Content == "" {
writeError(w, http.StatusBadRequest, "content is required")
return
}
filename := req.Filename
if filename == "" {
filename = fmt.Sprintf("%s-auto.md", time.Now().UTC().Format("2006-01-02-150405"))
}
rawDir := filepath.Join(h.brainDir, "knowledge")
if err := os.MkdirAll(rawDir, 0o755); err != nil {
writeError(w, http.StatusInternalServerError, "failed to create raw dir")
return
}
finalContent := req.Content
if req.Type != "" || req.Domain != "" {
var fm strings.Builder
fm.WriteString("---\n")
if req.Type != "" {
fmt.Fprintf(&fm, "type: %s\n", req.Type)
}
if req.Domain != "" {
fmt.Fprintf(&fm, "domain: %s\n", req.Domain)
}
fm.WriteString("---\n")
finalContent = fm.String() + req.Content
}
base := filepath.Base(filename)
if !strings.HasSuffix(base, ".md") {
base += ".md"
}
dest := filepath.Join(rawDir, base)
if !strings.HasPrefix(filepath.Clean(dest)+string(os.PathSeparator), filepath.Clean(rawDir)+string(os.PathSeparator)) {
writeError(w, http.StatusBadRequest, "invalid filename")
return
}
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
relPath, err := WriteNote(h.brainDir, req.Content, req.Filename, req.Type, req.Domain)
if err != nil {
h.logger.Error("write failed", "err", err)
writeError(w, http.StatusInternalServerError, "write error")
writeError(w, http.StatusBadRequest, err.Error())
return
}
rel, _ := filepath.Rel(h.brainDir, dest)
writeJSON(w, map[string]string{"path": filepath.ToSlash(rel)})
writeJSON(w, map[string]string{"path": relPath})
}
// Ingest handles POST /ingest — run the pipeline on provided content.

View File

@@ -0,0 +1,256 @@
package mcp
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/mathiasbq/hyperguild/ingestion/internal/extract"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
"github.com/mathiasbq/hyperguild/ingestion/internal/session"
)
// tools returns the tool descriptors. Handler bodies for each tool are filled
// in subsequent tasks; this file currently only provides the descriptors.
func (s *Server) tools() []map[string]any {
str := func(desc string) map[string]any {
return map[string]any{"type": "string", "description": desc}
}
int_ := func(desc string) map[string]any {
return map[string]any{"type": "integer", "description": desc}
}
schema := func(required []string, props map[string]any) json.RawMessage {
b, _ := json.Marshal(map[string]any{
"type": "object", "required": required, "properties": props,
})
return b
}
return []map[string]any{
{
"name": "brain_query",
"description": "BM25 full-text search across brain/knowledge/ and brain/wiki/ markdown files.",
"inputSchema": schema([]string{"query"}, map[string]any{
"query": str("search terms"),
"limit": int_("max results, default 5"),
}),
},
{
"name": "brain_write",
"description": "Write a raw knowledge note to brain/knowledge/.",
"inputSchema": schema([]string{"content"}, map[string]any{
"content": str("markdown content"),
"filename": str("optional filename"),
"type": str("optional frontmatter type"),
"domain": str("optional frontmatter domain"),
}),
},
{
"name": "brain_ingest_raw",
"description": "Ingest pre-structured pages into the brain wiki, bypassing the LLM extraction step.",
"inputSchema": schema([]string{"source", "pages"}, map[string]any{
"source": str("source name"),
"pages": map[string]any{"type": "array"},
"dry_run": map[string]any{"type": "boolean"},
}),
},
{
"name": "brain_ingest",
"description": "Ingest content into the brain wiki via the LLM extraction pipeline.",
"inputSchema": schema([]string{}, map[string]any{
"content": str("raw content; required when path is empty"),
"source": str("source name; required when path is empty"),
"path": str("file path; mutually exclusive with content+source"),
"dry_run": map[string]any{"type": "boolean"},
}),
},
{
"name": "session_log",
"description": "Append a structured entry to brain/sessions/<session_id>.jsonl.",
"inputSchema": schema([]string{"session_id"}, map[string]any{
"session_id": str("session identifier"),
"skill": str("skill name"),
"phase": str("phase within the skill"),
"project_root": str("absolute project root"),
"final_status": str("ok | error | skipped"),
"file_path": str("optional file produced"),
"model_used": str("optional model identifier"),
"duration_ms": int_("optional duration in ms"),
"message": str("optional free-text"),
}),
},
}
}
type brainQueryArgs struct {
Query string `json:"query"`
Limit int `json:"limit,omitempty"`
}
func (s *Server) brainQuery(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a brainQueryArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if a.Query == "" {
return nil, fmt.Errorf("query is required")
}
if a.Limit == 0 {
a.Limit = 5
}
results, err := search.Query(s.brainDir, a.Query, a.Limit)
if err != nil {
return nil, fmt.Errorf("search: %w", err)
}
return json.Marshal(map[string]any{"results": results})
}
type brainWriteArgs struct {
Content string `json:"content"`
Filename string `json:"filename,omitempty"`
Type string `json:"type,omitempty"`
Domain string `json:"domain,omitempty"`
}
func (s *Server) brainWrite(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a brainWriteArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
relPath, err := api.WriteNote(s.brainDir, a.Content, a.Filename, a.Type, a.Domain)
if err != nil {
return nil, err
}
return json.Marshal(map[string]string{"path": relPath})
}
type brainIngestRawArgs struct {
Source string `json:"source"`
Pages []pipeline.RawPage `json:"pages"`
DryRun bool `json:"dry_run,omitempty"`
}
func (s *Server) brainIngestRaw(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a brainIngestRawArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if a.Source == "" {
return nil, fmt.Errorf("source is required")
}
if len(a.Pages) == 0 {
return nil, fmt.Errorf("pages must be non-empty")
}
result, err := pipeline.RunRaw(s.brainDir, a.Source, a.Pages, a.DryRun)
if err != nil {
return nil, fmt.Errorf("ingest: %w", err)
}
pages := result.Pages
if pages == nil {
pages = []string{}
}
warnings := result.Warnings
if warnings == nil {
warnings = []string{}
}
return json.Marshal(map[string]any{"pages": pages, "warnings": warnings})
}
type brainIngestArgs struct {
Content string `json:"content,omitempty"`
Source string `json:"source,omitempty"`
Path string `json:"path,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
func (s *Server) brainIngest(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a brainIngestArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if a.Path != "" && a.Content != "" {
return nil, fmt.Errorf("path and content+source are mutually exclusive")
}
if a.Path == "" && a.Content == "" {
return nil, fmt.Errorf("either path or content+source is required")
}
if s.pipeline.Complete == nil {
return nil, fmt.Errorf("LLM not configured: set INGEST_LLM_URL")
}
if a.Path != "" {
text, err := extract.Text(a.Path)
if err != nil {
return nil, fmt.Errorf("extract: %w", err)
}
source := a.Source
if source == "" {
source = filepath.Base(strings.TrimSuffix(a.Path, filepath.Ext(a.Path)))
}
return s.runIngest(ctx, text, source, a.DryRun)
}
if a.Source == "" {
return nil, fmt.Errorf("source is required when content is provided")
}
return s.runIngest(ctx, a.Content, a.Source, a.DryRun)
}
type sessionLogArgs struct {
SessionID string `json:"session_id"`
Skill string `json:"skill,omitempty"`
Phase string `json:"phase,omitempty"`
ProjectRoot string `json:"project_root,omitempty"`
FinalStatus string `json:"final_status,omitempty"`
FilePath string `json:"file_path,omitempty"`
ModelUsed string `json:"model_used,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
Message string `json:"message,omitempty"`
}
func (s *Server) sessionLog(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a sessionLogArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if a.SessionID == "" {
return nil, fmt.Errorf("session_id is required")
}
entry := session.Entry{
SessionID: a.SessionID,
Timestamp: time.Now().UTC(),
Skill: a.Skill,
Phase: a.Phase,
ProjectRoot: a.ProjectRoot,
FinalStatus: a.FinalStatus,
FilePath: a.FilePath,
ModelUsed: a.ModelUsed,
DurationMs: a.DurationMs,
Message: a.Message,
}
dir := filepath.Join(s.brainDir, "sessions")
if err := session.Append(dir, a.SessionID, entry); err != nil {
return nil, fmt.Errorf("append: %w", err)
}
return json.Marshal(map[string]string{"status": "ok", "session_id": a.SessionID})
}
func (s *Server) runIngest(ctx context.Context, content, source string, dryRun bool) (json.RawMessage, error) {
result, err := pipeline.Run(ctx, s.pipeline, s.brainDir, content, source, dryRun)
if err != nil {
return nil, fmt.Errorf("ingest: %w", err)
}
pages := result.Pages
if pages == nil {
pages = []string{}
}
warnings := result.Warnings
if warnings == nil {
warnings = []string{}
}
return json.Marshal(map[string]any{"pages": pages, "warnings": warnings})
}

View File

@@ -0,0 +1,196 @@
package mcp_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func toolCall(t *testing.T, srv http.Handler, name string, args map[string]any) map[string]any {
t.Helper()
bodyBytes, err := json.Marshal(map[string]any{
"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": map[string]any{"name": name, "arguments": args},
})
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/mcp", bytes.NewReader(bodyBytes))
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
return resp
}
func TestBrainQueryReturnsResults(t *testing.T) {
brainDir := t.TempDir()
knowledge := filepath.Join(brainDir, "knowledge")
require.NoError(t, os.MkdirAll(knowledge, 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(knowledge, "tdd.md"),
[]byte("# TDD\n\nTest-driven development is iterative.\n"),
0o644,
))
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_query", map[string]any{"query": "tdd"})
require.Nil(t, resp["error"])
result := resp["result"].(map[string]any)
content := result["content"].([]any)
require.NotEmpty(t, content)
text := content[0].(map[string]any)["text"].(string)
assert.Contains(t, text, "tdd.md")
}
func TestBrainWriteCreatesFile(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_write", map[string]any{
"content": "# Test\n\nbody",
"filename": "test.md",
"type": "note",
"domain": "personal",
})
require.Nil(t, resp["error"])
got, err := os.ReadFile(filepath.Join(brainDir, "knowledge", "test.md"))
require.NoError(t, err)
assert.Contains(t, string(got), "type: note")
assert.Contains(t, string(got), "domain: personal")
assert.Contains(t, string(got), "# Test")
}
func TestBrainWriteRejectsTraversal(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_write", map[string]any{
"content": "x",
"filename": "../escape.md",
})
require.NotNil(t, resp["error"])
}
func TestBrainWriteAcceptsDoubleDotInName(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_write", map[string]any{
"content": "x",
"filename": "notes..draft.md",
})
require.Nil(t, resp["error"])
_, err := os.Stat(filepath.Join(brainDir, "knowledge", "notes..draft.md"))
require.NoError(t, err, "filename with embedded .. should be allowed")
}
func TestBrainIngestRawDryRun(t *testing.T) {
brainDir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, "wiki", "concepts"), 0o755))
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_ingest_raw", map[string]any{
"source": "test-source",
"dry_run": true,
"pages": []map[string]any{
{
"title": "Test Concept",
"type": "concept",
"content": "## Definition\nA test concept.",
},
},
})
require.Nil(t, resp["error"])
result := resp["result"].(map[string]any)
content := result["content"].([]any)
text := content[0].(map[string]any)["text"].(string)
var parsed struct {
Pages []string `json:"pages"`
}
require.NoError(t, json.Unmarshal([]byte(text), &parsed))
require.NotEmpty(t, parsed.Pages, "expected at least one page path")
assert.Contains(t, parsed.Pages[0], "wiki/concepts/test-concept.md")
// dry_run: no file should exist
_, err := os.Stat(filepath.Join(brainDir, "wiki", "concepts", "test-concept.md"))
assert.True(t, os.IsNotExist(err))
}
func TestBrainIngestRejectsBoth(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_ingest", map[string]any{
"content": "x",
"source": "y",
"path": "/tmp/foo.md",
})
require.NotNil(t, resp["error"])
}
func TestBrainIngestRequiresOne(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_ingest", map[string]any{})
require.NotNil(t, resp["error"])
}
func TestBrainIngestRejectsContentWithoutSource(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "brain_ingest", map[string]any{
"content": "x",
})
require.NotNil(t, resp["error"])
}
func TestBrainIngestRequiresLLMConfigured(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil) // nil pipelineCfg → no LLM
resp := toolCall(t, srv, "brain_ingest", map[string]any{
"content": "some content",
"source": "test",
})
require.NotNil(t, resp["error"])
errObj := resp["error"].(map[string]any)
assert.Contains(t, errObj["message"].(string), "LLM not configured")
}
func TestSessionLogAppends(t *testing.T) {
brainDir := t.TempDir()
srv := mcp.NewServer(brainDir, nil, nil)
resp := toolCall(t, srv, "session_log", map[string]any{
"session_id": "session-x",
"skill": "tdd",
"phase": "red",
"final_status": "ok",
})
require.Nil(t, resp["error"])
got, err := os.ReadFile(filepath.Join(brainDir, "sessions", "session-x.jsonl"))
require.NoError(t, err)
assert.Contains(t, string(got), `"skill":"tdd"`)
assert.Contains(t, string(got), `"phase":"red"`)
}
func TestSessionLogRequiresSessionID(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
resp := toolCall(t, srv, "session_log", map[string]any{"skill": "tdd"})
require.NotNil(t, resp["error"])
}

View File

@@ -0,0 +1,35 @@
package mcp_test
import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMCPMountedHandler(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
mux := http.NewServeMux()
mux.Handle("POST /mcp", srv)
ts := httptest.NewServer(mux)
defer ts.Close()
body, err := json.Marshal(map[string]any{
"jsonrpc": "2.0", "id": 1, "method": "tools/list",
})
require.NoError(t, err)
resp, err := http.Post(ts.URL+"/mcp", "application/json", bytes.NewReader(body))
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
out, _ := io.ReadAll(resp.Body)
assert.Contains(t, string(out), `"brain_query"`)
}

View File

@@ -0,0 +1,132 @@
// Package mcp implements an MCP HTTP handler for the ingestion service.
// Exposed tools: brain_query, brain_write, brain_ingest, brain_ingest_raw, session_log.
package mcp
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
)
type request struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}
type response struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id,omitempty"`
Result any `json:"result,omitempty"`
Error *rpcError `json:"error,omitempty"`
}
type rpcError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// Server handles MCP JSON-RPC over HTTP for the ingestion service.
type Server struct {
brainDir string
pipeline pipeline.Config
llm pipeline.CompleteFunc
}
// NewServer constructs a Server bound to brainDir. pipelineCfg supplies the
// LLM-backed pipeline; llm may be nil for non-LLM tools only.
func NewServer(brainDir string, pipelineCfg *pipeline.Config, llm pipeline.CompleteFunc) *Server {
cfg := pipeline.Config{}
if pipelineCfg != nil {
cfg = *pipelineCfg
}
return &Server{brainDir: brainDir, pipeline: cfg, llm: llm}
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, nil, -32700, "parse error")
return
}
// JSON-RPC 2.0 notifications (no id) must not receive a response.
if req.ID == nil {
return
}
var result any
var rpcErr *rpcError
switch req.Method {
case "initialize":
result = map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{"tools": map[string]any{}},
"serverInfo": map[string]any{"name": "ingestion-brain", "version": "0.1.0"},
}
case "tools/list":
result = map[string]any{"tools": s.tools()}
case "tools/call":
var p struct {
Name string `json:"name"`
Arguments json.RawMessage `json:"arguments"`
}
if err := json.Unmarshal(req.Params, &p); err != nil {
rpcErr = &rpcError{Code: -32602, Message: "invalid params"}
break
}
out, err := s.handleCall(r.Context(), p.Name, p.Arguments)
if err != nil {
rpcErr = &rpcError{Code: -32000, Message: err.Error()}
break
}
result = map[string]any{
"content": []map[string]any{{"type": "text", "text": string(out)}},
}
default:
rpcErr = &rpcError{Code: -32601, Message: "method not found: " + req.Method}
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(response{
JSONRPC: "2.0",
ID: req.ID,
Result: result,
Error: rpcErr,
})
}
func writeError(w http.ResponseWriter, id any, code int, msg string) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(response{
JSONRPC: "2.0",
ID: id,
Error: &rpcError{Code: code, Message: msg},
})
}
// handleCall dispatches a tools/call to the appropriate tool handler.
func (s *Server) handleCall(ctx context.Context, name string, args json.RawMessage) (json.RawMessage, error) {
switch name {
case "brain_query":
return s.brainQuery(ctx, args)
case "brain_write":
return s.brainWrite(ctx, args)
case "brain_ingest_raw":
return s.brainIngestRaw(ctx, args)
case "brain_ingest":
return s.brainIngest(ctx, args)
case "session_log":
return s.sessionLog(ctx, args)
default:
return nil, fmt.Errorf("unknown tool: %s", name)
}
}

View File

@@ -0,0 +1,91 @@
package mcp_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func body(t *testing.T, v any) *bytes.Buffer {
t.Helper()
b, err := json.Marshal(v)
require.NoError(t, err)
return bytes.NewBuffer(b)
}
func TestServerInitialize(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
req := httptest.NewRequest(http.MethodPost, "/mcp", body(t, map[string]any{
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": map[string]any{},
}))
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
result := resp["result"].(map[string]any)
assert.Equal(t, "2024-11-05", result["protocolVersion"])
}
func TestServerToolsList(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
req := httptest.NewRequest(http.MethodPost, "/mcp", body(t, map[string]any{
"jsonrpc": "2.0", "id": 2, "method": "tools/list",
}))
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
tools := resp["result"].(map[string]any)["tools"].([]any)
names := make([]string, 0, len(tools))
for _, t := range tools {
names = append(names, t.(map[string]any)["name"].(string))
}
assert.ElementsMatch(t, []string{
"brain_query", "brain_write", "brain_ingest_raw", "brain_ingest", "session_log",
}, names)
}
func TestServerNotificationGetsNoBody(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
req := httptest.NewRequest(http.MethodPost, "/mcp", body(t, map[string]any{
"jsonrpc": "2.0", "method": "notifications/initialized",
}))
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
assert.Empty(t, strings.TrimSpace(rr.Body.String()))
}
func TestServerUnknownMethodReturnsError(t *testing.T) {
srv := mcp.NewServer(t.TempDir(), nil, nil)
req := httptest.NewRequest(http.MethodPost, "/mcp", body(t, map[string]any{
"jsonrpc": "2.0", "id": 3, "method": "unknown/method",
}))
rr := httptest.NewRecorder()
srv.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp))
require.NotNil(t, resp["error"])
errObj := resp["error"].(map[string]any)
assert.Equal(t, float64(-32601), errObj["code"])
assert.Contains(t, errObj["message"].(string), "unknown/method")
}

View File

@@ -0,0 +1,98 @@
// ingestion/internal/session/session.go
package session
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"time"
)
// Entry is one skill invocation record, appended to the session JSONL log.
type Entry struct {
SessionID string `json:"session_id"`
Timestamp time.Time `json:"timestamp"`
Skill string `json:"skill"`
Phase string `json:"phase,omitempty"`
ProjectRoot string `json:"project_root,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Attempts []Attempt `json:"attempts,omitempty"`
FinalStatus string `json:"final_status"`
FilePath string `json:"file_path,omitempty"`
ModelUsed string `json:"model_used,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
Message string `json:"message,omitempty"`
}
// Attempt represents one subprocess invocation within a skill call.
type Attempt struct {
Attempt int `json:"attempt"`
Model string `json:"model"`
Tier string `json:"tier"` // local | subagent | managed
DurationMs int64 `json:"duration_ms"`
WarmStart bool `json:"warm_start"` // model already loaded in llama-swap
Verified bool `json:"verified"`
Verdict string `json:"verdict,omitempty"` // accept | escalate | error
Feedback string `json:"feedback,omitempty"` // verifier feedback on escalation
OutputSummary string `json:"output_summary,omitempty"`
RunnerOutput string `json:"runner_output,omitempty"`
}
// Append writes entry as a single JSON line to sessionsDir/{sessionID}.jsonl.
func Append(sessionsDir, sessionID string, entry Entry) error {
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
return fmt.Errorf("create sessions dir: %w", err)
}
path := filepath.Join(sessionsDir, sessionID+".jsonl")
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open session log: %w", err)
}
line, err := json.Marshal(entry)
if err != nil {
_ = f.Close()
return fmt.Errorf("marshal entry: %w", err)
}
if _, err = fmt.Fprintf(f, "%s\n", line); err != nil {
_ = f.Close()
return fmt.Errorf("write entry: %w", err)
}
if err = f.Close(); err != nil {
return fmt.Errorf("close session log: %w", err)
}
return nil
}
// Read returns all entries for sessionID. Returns empty slice if no log exists.
func Read(sessionsDir, sessionID string) ([]Entry, error) {
path := filepath.Join(sessionsDir, sessionID+".jsonl")
f, err := os.Open(path)
if errors.Is(err, fs.ErrNotExist) {
return []Entry{}, nil
}
if err != nil {
return nil, fmt.Errorf("open session log: %w", err)
}
defer f.Close() //nolint:errcheck
var entries []Entry
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1<<20) // up to 1 MB per line
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var e Entry
if err := json.Unmarshal(line, &e); err != nil {
return nil, fmt.Errorf("parse entry: %w", err)
}
entries = append(entries, e)
}
return entries, scanner.Err()
}

View File

@@ -0,0 +1,50 @@
package session_test
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/session"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAppendAndRead(t *testing.T) {
dir := t.TempDir()
sid := "test-session"
e1 := session.Entry{
SessionID: sid,
Timestamp: time.Now().UTC().Truncate(time.Second),
Skill: "tdd",
Phase: "red",
FinalStatus: "ok",
}
e2 := session.Entry{
SessionID: sid,
Timestamp: time.Now().UTC().Truncate(time.Second),
Skill: "tdd",
Phase: "green",
FinalStatus: "ok",
}
require.NoError(t, session.Append(dir, sid, e1))
require.NoError(t, session.Append(dir, sid, e2))
got, err := session.Read(dir, sid)
require.NoError(t, err)
require.Len(t, got, 2)
assert.Equal(t, "red", got[0].Phase)
assert.Equal(t, "green", got[1].Phase)
_, statErr := os.Stat(filepath.Join(dir, sid+".jsonl"))
require.NoError(t, statErr, "session file should exist on disk")
}
func TestReadMissingReturnsEmpty(t *testing.T) {
got, err := session.Read(t.TempDir(), "nope")
require.NoError(t, err)
assert.Empty(t, got)
}