hyperguild/ingestion/internal/mcp/handlers.go
Mathias 57462b52ff
feat(brain): hybrid BM25 + pgvector retrieval (opt-in)
Wires nomic-embed-text (iguana ollama) + pgvector on the shared
postgres18 into brain_query / brain_answer via Reciprocal Rank Fusion.
Pure BM25 stays the default; setting BRAIN_PG_DSN and BRAIN_EMBED_URL
together opts in. Setting one without the other is misconfiguration →
exit 1.

New packages:

- internal/embed
  Client.Embed(ctx, text) → []float32 via POST {URL}/api/embed.
  Defaults to nomic-embed-text:latest (768 dimensions). A nil client is
  returned for an empty URL, so callers gate on a single nil check.

- internal/vectorstore
  PGStore wraps a pgxpool against postgres18. Init creates
  brain_embeddings(path PK, vector(768), updated_at) + HNSW cosine
  index idempotently. Upsert / Delete / Search / KnownPaths.
  Sync(brainDir, store, embedder) diffs brain/wiki/ against the store
  and upserts new files / deletes removed ones; StartSync runs it on
  a ticker (default 300s). Integration tests gated by BRAIN_PG_TEST_DSN.

- scripts/brain-embeddings-init.sql
  One-time DBA setup: brain DB, brain_app role, vector extension,
  GRANTs. Idempotent.

Search layer:

- search.QueryOptions gains Vector + Embedder fields.
- QueryContext is the cancellable variant; Query remains for existing callers.
- When both are set, BM25 (top-N) and pgvector (top-4N) candidates
  merge via Reciprocal Rank Fusion (k=60, Cormack et al. 2009 — no
  tuning knob, robust to scale differences between rankers).
- Vector-only hits are hydrated from disk so callers see uniform
  Result records (path, title, excerpt, wing, hall, score).
- Wing/hall filters still apply to vector candidates via path-prefix.
- On embedder or vector-store errors the search falls back to BM25: an
  embedding outage degrades quality but doesn't take the brain offline.

MCP wiring:

- mcp.Server.WithHybridRetrieval(v, e) opt-in setter, same shape as
  WithReranker.
- brainQuery and brainAnswer pass the wired vector/embedder through
  to search.QueryContext.
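The setter shape can be sketched as follows. `Embedder`, `VectorSearcher` and their method sets are assumptions standing in for the search package's interfaces. `hybridEnabled` and the fake types are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the search package's interfaces.
type Embedder interface {
	Embed(ctx context.Context, text string) ([]float32, error)
}

type VectorSearcher interface {
	Search(ctx context.Context, vec []float32, limit int) ([]string, error)
}

type Server struct {
	vector   VectorSearcher
	embedder Embedder
}

// WithHybridRetrieval stores both dependencies and returns the server so the
// call can be chained at startup. Leaving either nil keeps pure BM25.
func (s *Server) WithHybridRetrieval(v VectorSearcher, e Embedder) *Server {
	s.vector, s.embedder = v, e
	return s
}

// hybridEnabled reports whether both dependencies are wired.
func (s *Server) hybridEnabled() bool { return s.vector != nil && s.embedder != nil }

type fakeEmbedder struct{}

func (*fakeEmbedder) Embed(context.Context, string) ([]float32, error) { return []float32{1}, nil }

type fakeStore struct{}

func (*fakeStore) Search(context.Context, []float32, int) ([]string, error) { return nil, nil }

func main() {
	fmt.Println(new(Server).hybridEnabled()) // false
	fmt.Println(new(Server).WithHybridRetrieval(&fakeStore{}, &fakeEmbedder{}).hybridEnabled()) // true

	var e *fakeEmbedder // typed nil pointer
	// Prints true: an interface holding a nil pointer is itself non-nil.
	fmt.Println(new(Server).WithHybridRetrieval(&fakeStore{}, e).hybridEnabled())
}
```

The last line of `main` shows a Go pitfall that matters for the nil-on-empty-URL convention: a nil `*Client` stored in an interface field does not compare `== nil`. The nil check therefore belongs on the concrete pointer, before it is passed to `WithHybridRetrieval`.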

REST:

- POST /backfill-embeddings drives Sync synchronously and returns
  {added, deleted, errors[]}, or 503 when the feature is unconfigured.

cmd/server/main.go:

- BRAIN_PG_DSN + BRAIN_EMBED_URL together enable hybrid; one alone
  → exit 1.
- vectorAdapter bridges *PGStore (returns []Hit) to
  search.VectorSearcher (which takes []VectorHit) without either
  package importing the other.
- BRAIN_EMBED_SYNC_INTERVAL (default 300s) controls the background
  Sync ticker.

Backend pivot from Qdrant to pgvector recorded in DECISIONS.md
2026-05-18 (supersedes 2026-04-08): postgres18 already runs in the
databases/ namespace, Qdrant was never deployed, and one engine beats two.

Dependency: github.com/jackc/pgx/v5 — modern, native pgvector via
parametric vector literals.
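pgvector accepts vectors in a text form such as `[1,2.5,-3]`. An embedding can therefore travel as an ordinary string parameter cast with `$1::vector`. Below is a sketch of that encoding, with a hypothetical cosine-distance query against `brain_embeddings`; the `embedding` column name is an assumption. `<=>` is pgvector's cosine distance operator, the one an HNSW index built with `vector_cosine_ops` serves.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// vectorLiteral renders a []float32 in pgvector's text form, e.g. "[1,2.5,-3]".
func vectorLiteral(v []float32) string {
	var b strings.Builder
	b.WriteByte('[')
	for i, x := range v {
		if i > 0 {
			b.WriteByte(',')
		}
		// Precision -1 with bitSize 32 prints the shortest text that
		// round-trips to the same float32.
		b.WriteString(strconv.FormatFloat(float64(x), 'g', -1, 32))
	}
	b.WriteByte(']')
	return b.String()
}

// searchSQL is a hypothetical query; 1 - cosine distance = cosine similarity.
const searchSQL = `SELECT path, 1 - (embedding <=> $1::vector) AS score
FROM brain_embeddings
ORDER BY embedding <=> $1::vector
LIMIT $2`

func main() {
	fmt.Println(vectorLiteral([]float32{1, 2.5, -3})) // [1,2.5,-3]
	fmt.Println(searchSQL)
}
```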

Tests:
- embed.Client: empty-URL nil, request shape, dimension, upstream
  error propagation, empty-text rejection.
- vectorstore.PGStore: dimension validation (unit); upsert/search/
  KnownPaths (integration, BRAIN_PG_TEST_DSN-gated).
- vectorstore.Sync: adds new files, skips known, deletes
  disappeared, skips _index.md, no-op when nil, collects embedder
  errors.
- search.Query: hybrid promotes vector-only hits via RRF; falls
  back to BM25 on embedder error.

Closes hyperguild#8.
2026-05-18 23:11:25 +02:00

package mcp

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"path/filepath"
	"strings"
	"time"

	"github.com/mathiasbq/hyperguild/ingestion/internal/api"
	"github.com/mathiasbq/hyperguild/ingestion/internal/brain"
	"github.com/mathiasbq/hyperguild/ingestion/internal/extract"
	"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
	"github.com/mathiasbq/hyperguild/ingestion/internal/search"
	"github.com/mathiasbq/hyperguild/ingestion/internal/session"
)

// tools returns the MCP tool descriptors (name, description and JSON Schema
// for the input) advertised to clients. Each tool is served by a Server
// method, such as brainQuery below.
func (s *Server) tools() []map[string]any {
	str := func(desc string) map[string]any {
		return map[string]any{"type": "string", "description": desc}
	}
	int_ := func(desc string) map[string]any {
		return map[string]any{"type": "integer", "description": desc}
	}
	enum := func(desc string, vals ...string) map[string]any {
		return map[string]any{"type": "string", "description": desc, "enum": vals}
	}
	halls := []string{"facts", "decisions", "failures", "hypotheses", "sources"}
	schema := func(required []string, props map[string]any) json.RawMessage {
		// Marshal cannot fail for these static maps of strings, slices and maps.
		b, _ := json.Marshal(map[string]any{
			"type": "object", "required": required, "properties": props,
		})
		return b
	}
	return []map[string]any{
		{
			"name":        "brain_query",
			"description": "BM25 full-text search across brain/knowledge/ and brain/wiki/ markdown files. When hybrid retrieval is configured, BM25 results are fused with pgvector semantic matches via Reciprocal Rank Fusion. Optionally scope by wing (topic domain) and hall (memory type).",
			"inputSchema": schema([]string{"query"}, map[string]any{
				"query": str("search terms"),
				"limit": int_("max results, default 5"),
				"wing":  str("optional wing to scope to, e.g. jepa-fx"),
				"hall":  enum("optional hall to scope to (requires wing)", halls...),
			}),
		},
		{
			"name":        "brain_write",
			"description": "Write a markdown note to the brain. With wing+hall set, routes to brain/wiki/<wing>/<hall>/ with wing/hall/created_at frontmatter; otherwise writes to brain/knowledge/ (legacy).",
			"inputSchema": schema([]string{"content"}, map[string]any{
				"content":  str("markdown content"),
				"filename": str("optional filename or slug"),
				"type":     str("optional frontmatter type (legacy)"),
				"domain":   str("optional frontmatter domain (legacy)"),
				"wing":     str("optional topic domain, e.g. jepa-fx"),
				"hall":     enum("optional memory type (requires wing)", halls...),
			}),
		},
		{
			"name":        "brain_tunnel",
			"description": "Create an explicit bidirectional [[wikilink]] between two notes in different wings. Idempotent.",
			"inputSchema": schema([]string{"source", "target"}, map[string]any{
				"source": str("path of source note relative to brain dir, e.g. wiki/jepa-fx/decisions/val-vol.md"),
				"target": str("path of target note (must be in a different wing)"),
			}),
		},
		{
			"name":        "brain_index",
			"description": "Regenerate _index.md (Map of Content) for one or all wings under brain/wiki/. Auto-called after brain_write with wing+hall.",
			"inputSchema": schema([]string{}, map[string]any{
				"wing": str("optional wing to index; if absent, rebuilds every wing"),
			}),
		},
		{
			"name":        "brain_ingest_raw",
			"description": "Ingest pre-structured pages into the brain wiki, bypassing the LLM extraction step.",
			"inputSchema": schema([]string{"source", "pages"}, map[string]any{
				"source":  str("source name"),
				"pages":   map[string]any{"type": "array"},
				"dry_run": map[string]any{"type": "boolean"},
			}),
		},
		{
			"name":        "brain_ingest",
			"description": "Ingest content into the brain wiki via the LLM extraction pipeline.",
			"inputSchema": schema([]string{}, map[string]any{
				"content": str("raw content; required when path is empty"),
				"source":  str("source name; required when path is empty"),
				"path":    str("file path; mutually exclusive with content+source"),
				"dry_run": map[string]any{"type": "boolean"},
			}),
		},
		{
			"name":        "brain_answer",
			"description": "Retrieve relevant brain content via BM25 (fused with pgvector semantic search when hybrid retrieval is configured) and synthesize a coherent answer using an LLM.",
			"inputSchema": schema([]string{"query"}, map[string]any{
				"query": str("question to answer"),
			}),
		},
		{
			"name":        "brain_classify",
			"description": "Classify raw text into doc type, title, and tags using an LLM.",
			"inputSchema": schema([]string{"text"}, map[string]any{
				"text": str("raw document text to classify (first 3000 chars used)"),
			}),
		},
		{
			"name":        "session_log",
			"description": "Append a structured entry to brain/sessions/<session_id>.jsonl.",
			"inputSchema": schema([]string{"session_id"}, map[string]any{
				"session_id":   str("session identifier"),
				"skill":        str("skill name"),
				"phase":        str("phase within the skill"),
				"project_root": str("absolute project root"),
				"final_status": str("pass | fail | skip (legacy: ok | error | skipped also accepted)"),
				"file_path":    str("optional file produced"),
				"model_used":   str("optional model identifier"),
				"duration_ms":  int_("optional duration in ms"),
				"message":      str("optional free-text"),
			}),
		},
	}
}

type brainQueryArgs struct {
	Query string `json:"query"`
	Limit int    `json:"limit,omitempty"`
	Wing  string `json:"wing,omitempty"`
	Hall  string `json:"hall,omitempty"`
}

func (s *Server) brainQuery(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainQueryArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.Query == "" {
		return nil, fmt.Errorf("query is required")
	}
	if a.Limit <= 0 {
		a.Limit = 5 // default when omitted; also guards against negative limits
	}
	// Vector and Embedder stay nil unless hybrid retrieval was wired via
	// WithHybridRetrieval; search.QueryContext fuses BM25 and pgvector
	// results only when both are set.
	results, err := search.QueryContext(ctx, s.brainDir, search.QueryOptions{
		Query:    a.Query,
		Limit:    a.Limit,
		Wing:     a.Wing,
		Hall:     a.Hall,
		Vector:   s.vector,
		Embedder: s.embedder,
	})
	if err != nil {
		return nil, fmt.Errorf("search: %w", err)
	}
	return json.Marshal(map[string]any{"results": results})
}

type brainWriteArgs struct {
	Content  string `json:"content"`
	Filename string `json:"filename,omitempty"`
	Type     string `json:"type,omitempty"`
	Domain   string `json:"domain,omitempty"`
	Wing     string `json:"wing,omitempty"`
	Hall     string `json:"hall,omitempty"`
}

func (s *Server) brainWrite(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainWriteArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	relPath, err := api.WriteNote(s.brainDir, api.WriteNoteOptions{
		Content:  a.Content,
		Filename: a.Filename,
		Type:     a.Type,
		Domain:   a.Domain,
		Wing:     a.Wing,
		Hall:     a.Hall,
	})
	if err != nil {
		return nil, err
	}
	// Auto-regenerate the wing _index.md when the write landed in the
	// structured wiki, and auto-tunnel cross-wing matches. Both are
	// best-effort: the note is already written.
	if a.Wing != "" && a.Hall != "" {
		if err := brain.BuildWingIndex(s.brainDir, a.Wing); err != nil {
			slog.Warn("brain_write: auto-index failed", "wing", a.Wing, "err", err)
		}
		if err := brain.AutoTunnel(s.brainDir, relPath, a.Content); err != nil {
			slog.Warn("brain_write: auto-tunnel failed", "src", relPath, "err", err)
		}
	}
	return json.Marshal(map[string]string{"path": relPath})
}

type brainTunnelArgs struct {
	Source string `json:"source"`
	Target string `json:"target"`
}

func (s *Server) brainTunnel(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainTunnelArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.Source == "" || a.Target == "" {
		return nil, fmt.Errorf("source and target are required")
	}
	if err := brain.WriteTunnel(s.brainDir, a.Source, a.Target); err != nil {
		return nil, fmt.Errorf("tunnel: %w", err)
	}
	return json.Marshal(map[string]string{"status": "ok"})
}

type brainIndexArgs struct {
	Wing string `json:"wing,omitempty"`
}

func (s *Server) brainIndex(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainIndexArgs
	if len(args) > 0 {
		if err := json.Unmarshal(args, &a); err != nil {
			return nil, fmt.Errorf("parse args: %w", err)
		}
	}
	if a.Wing == "" {
		if err := brain.BuildAllWingIndexes(s.brainDir); err != nil {
			return nil, fmt.Errorf("index: %w", err)
		}
		return json.Marshal(map[string]any{"status": "ok", "scope": "all"})
	}
	if err := brain.BuildWingIndex(s.brainDir, a.Wing); err != nil {
		return nil, fmt.Errorf("index: %w", err)
	}
	return json.Marshal(map[string]any{"status": "ok", "scope": a.Wing})
}

type brainIngestRawArgs struct {
	Source string             `json:"source"`
	Pages  []pipeline.RawPage `json:"pages"`
	DryRun bool               `json:"dry_run,omitempty"`
}

func (s *Server) brainIngestRaw(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainIngestRawArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.Source == "" {
		return nil, fmt.Errorf("source is required")
	}
	if len(a.Pages) == 0 {
		return nil, fmt.Errorf("pages must be non-empty")
	}
	result, err := pipeline.RunRaw(s.brainDir, a.Source, a.Pages, a.DryRun)
	if err != nil {
		return nil, fmt.Errorf("ingest: %w", err)
	}
	// Normalize nil slices so clients always receive JSON arrays, never null.
	pages := result.Pages
	if pages == nil {
		pages = []string{}
	}
	warnings := result.Warnings
	if warnings == nil {
		warnings = []string{}
	}
	return json.Marshal(map[string]any{"pages": pages, "warnings": warnings})
}

type brainIngestArgs struct {
	Content string `json:"content,omitempty"`
	Source  string `json:"source,omitempty"`
	Path    string `json:"path,omitempty"`
	DryRun  bool   `json:"dry_run,omitempty"`
}

func (s *Server) brainIngest(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a brainIngestArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.Path != "" && a.Content != "" {
		return nil, fmt.Errorf("path and content+source are mutually exclusive")
	}
	if a.Path == "" && a.Content == "" {
		return nil, fmt.Errorf("either path or content+source is required")
	}
	if s.pipeline.Complete == nil {
		return nil, fmt.Errorf("LLM not configured: set INGEST_LLM_URL")
	}
	if a.Path != "" {
		text, err := extract.Text(a.Path)
		if err != nil {
			return nil, fmt.Errorf("extract: %w", err)
		}
		source := a.Source
		if source == "" {
			// Default the source name to the file's base name without extension.
			source = filepath.Base(strings.TrimSuffix(a.Path, filepath.Ext(a.Path)))
		}
		return s.runIngest(ctx, text, source, a.DryRun)
	}
	if a.Source == "" {
		return nil, fmt.Errorf("source is required when content is provided")
	}
	return s.runIngest(ctx, a.Content, a.Source, a.DryRun)
}

type sessionLogArgs struct {
	SessionID   string `json:"session_id"`
	Skill       string `json:"skill,omitempty"`
	Phase       string `json:"phase,omitempty"`
	ProjectRoot string `json:"project_root,omitempty"`
	FinalStatus string `json:"final_status,omitempty"`
	FilePath    string `json:"file_path,omitempty"`
	ModelUsed   string `json:"model_used,omitempty"`
	DurationMs  int64  `json:"duration_ms,omitempty"`
	Message     string `json:"message,omitempty"`
}

func (s *Server) sessionLog(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a sessionLogArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.SessionID == "" {
		return nil, fmt.Errorf("session_id is required")
	}
	entry := session.Entry{
		SessionID:   a.SessionID,
		Timestamp:   time.Now().UTC(),
		Skill:       a.Skill,
		Phase:       a.Phase,
		ProjectRoot: a.ProjectRoot,
		FinalStatus: a.FinalStatus,
		FilePath:    a.FilePath,
		ModelUsed:   a.ModelUsed,
		DurationMs:  a.DurationMs,
		Message:     a.Message,
	}
	dir := filepath.Join(s.brainDir, "sessions")
	if err := session.Append(dir, a.SessionID, entry); err != nil {
		return nil, fmt.Errorf("append: %w", err)
	}
	return json.Marshal(map[string]string{"status": "ok", "session_id": a.SessionID})
}

// runIngest runs the LLM extraction pipeline and returns the written pages and
// warnings, normalizing nil slices so clients always receive JSON arrays.
func (s *Server) runIngest(ctx context.Context, content, source string, dryRun bool) (json.RawMessage, error) {
	result, err := pipeline.Run(ctx, s.pipeline, s.brainDir, content, source, dryRun)
	if err != nil {
		return nil, fmt.Errorf("ingest: %w", err)
	}
	pages := result.Pages
	if pages == nil {
		pages = []string{}
	}
	warnings := result.Warnings
	if warnings == nil {
		warnings = []string{}
	}
	return json.Marshal(map[string]any{"pages": pages, "warnings": warnings})
}