diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index df75079..79f4927 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -15,12 +15,32 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/auth" "github.com/mathiasbq/hyperguild/ingestion/internal/llm" "github.com/mathiasbq/hyperguild/ingestion/internal/mcp" + "github.com/mathiasbq/hyperguild/ingestion/internal/embed" "github.com/mathiasbq/hyperguild/ingestion/internal/oauth" - "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" + "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" + "github.com/mathiasbq/hyperguild/ingestion/internal/search" + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" "github.com/mathiasbq/hyperguild/ingestion/internal/watcher" ) +// vectorAdapter bridges *vectorstore.PGStore (returns []vectorstore.Hit) +// to the search.VectorSearcher interface (which uses []search.VectorHit). +// Kept here, not in either package, so neither has to import the other. +type vectorAdapter struct{ s *vectorstore.PGStore } + +func (a vectorAdapter) Search(ctx context.Context, q []float32, limit int) ([]search.VectorHit, error) { + hits, err := a.s.Search(ctx, q, limit) + if err != nil { + return nil, err + } + out := make([]search.VectorHit, len(hits)) + for i, h := range hits { + out[i] = search.VectorHit{Path: h.Path, Distance: h.Distance} + } + return out, nil +} + func envOr(key, fallback string) string { if v := os.Getenv(key); v != "" { return v @@ -84,6 +104,37 @@ func main() { logger.Info("brain reranker configured", "url", rerankURL, "model", rerankModel) } + // Hybrid retrieval (pgvector + nomic-embed-text). Both env vars must + // be set together for the path to wire on; otherwise BM25-only. + var vectorStore *vectorstore.PGStore + pgDSN := os.Getenv("BRAIN_PG_DSN") + embedURL := os.Getenv("BRAIN_EMBED_URL") + switch { + case pgDSN != "" && embedURL != "": + embedModel := envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest") + store, err := vectorstore.New(context.Background(), pgDSN) + if err != nil { + logger.Error("vector store init", "err", err) + os.Exit(1) + } + if err := store.Init(context.Background()); err != nil { + logger.Error("vector store migrate", "err", err) + os.Exit(1) + } + vectorStore = store + embedder := embed.New(embedURL, embedModel) + mcpSrv = mcpSrv.WithHybridRetrieval(vectorAdapter{s: store}, embedder) + h.WithEmbedSync(store, embedder) + logger.Info("brain hybrid retrieval enabled", + "pg", pgDSN[:strings.IndexByte(pgDSN+"@", '@')], // crude redaction + "embed_url", embedURL, "embed_model", embedModel) + case pgDSN == "" && embedURL == "": + // disabled — fine + default: + logger.Error("BRAIN_PG_DSN and BRAIN_EMBED_URL must be set together") + os.Exit(1) + } + mcpToken := os.Getenv("BRAIN_MCP_TOKEN") if mcpToken == "" { logger.Error("BRAIN_MCP_TOKEN not set") @@ -98,6 +149,14 @@ func main() { Pipeline: pipelineCfg, }) } + if vectorStore != nil { + embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300) + vectorstore.StartSync(ctx, brainDir, vectorStore, + embed.New(os.Getenv("BRAIN_EMBED_URL"), + envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest")), + time.Duration(embedSyncInterval)*time.Second) + logger.Info("embed sync started", "interval_s", embedSyncInterval) + } mux := http.NewServeMux() mux.HandleFunc("POST /query", h.Query) @@ -107,6 +166,7 @@ func main() { mux.HandleFunc("POST /ingest-path", h.IngestPath) mux.HandleFunc("POST /ingest-raw", h.IngestRaw) mux.HandleFunc("POST /backfill-refs", h.BackfillRefs) + mux.HandleFunc("POST /backfill-embeddings", h.BackfillEmbeddings) mux.HandleFunc("GET /pass-rate", h.PassRate) var jwtValidator *auth.Validator if dexURL := os.Getenv("DEX_ISSUER_URL"); dexURL != "" { diff --git a/ingestion/go.mod b/ingestion/go.mod index 3ddb2a1..faaf4ad 100644 --- a/ingestion/go.mod +++ b/ingestion/go.mod @@ -11,6 +11,10 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/goccy/go-json v0.10.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.9.2 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/lestrrat-go/blackmagic v1.0.3 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.6 // indirect @@ -19,6 +23,8 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect golang.org/x/crypto v0.32.0 // indirect + golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.29.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/ingestion/go.sum b/ingestion/go.sum index d8ec42d..2ff65f3 100644 --- a/ingestion/go.sum +++ b/ingestion/go.sum @@ -5,6 +5,14 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvw github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/lestrrat-go/blackmagic v1.0.3 h1:94HXkVLxkZO9vJI/w2u1T0DAoprShFd13xtnSINtDWs= github.com/lestrrat-go/blackmagic v1.0.3/go.mod h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= @@ -22,16 +30,23 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/ingestion/internal/api/handler.go b/ingestion/internal/api/handler.go index 1789e5e..b5d1554 100644 --- a/ingestion/internal/api/handler.go +++ b/ingestion/internal/api/handler.go @@ -15,13 +15,16 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/extract" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/search" + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" ) // Handler serves the ingestion HTTP API. type Handler struct { - brainDir string - logger *slog.Logger - pipeline pipeline.Config + brainDir string + logger *slog.Logger + pipeline pipeline.Config + embedStore vectorstore.Store + embedClient vectorstore.Embedder } // NewHandler constructs a Handler. brainDir is the absolute path to brain/. @@ -32,6 +35,14 @@ func NewHandler(brainDir string, logger *slog.Logger, pipelineCfg pipeline.Confi return &Handler{brainDir: brainDir, logger: logger, pipeline: pipelineCfg} } +// WithEmbedSync wires the optional vector store + embedder used by the +// POST /backfill-embeddings endpoint. Calling with either nil is a no-op. +func (h *Handler) WithEmbedSync(store vectorstore.Store, embedder vectorstore.Embedder) *Handler { + h.embedStore = store + h.embedClient = embedder + return h +} + type queryRequest struct { Query string `json:"query"` Limit int `json:"limit,omitempty"` @@ -230,6 +241,32 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { writeJSON(w, map[string]string{"path": relPath}) } +// BackfillEmbeddings handles POST /backfill-embeddings — synchronously +// embeds every note under brain/wiki/ that's not yet in the vector +// store, and deletes rows for files no longer on disk. +func (h *Handler) BackfillEmbeddings(w http.ResponseWriter, r *http.Request) { + if h.embedStore == nil || h.embedClient == nil { + writeError(w, http.StatusServiceUnavailable, + "embeddings not configured (set BRAIN_PG_DSN and BRAIN_EMBED_URL)") + return + } + res, err := vectorstore.Sync(r.Context(), h.brainDir, h.embedStore, h.embedClient) + if err != nil { + h.logger.Error("backfill failed", "err", err) + writeError(w, http.StatusInternalServerError, "backfill error") + return + } + errStrs := make([]string, 0, len(res.Errors)) + for _, e := range res.Errors { + errStrs = append(errStrs, e.Error()) + } + writeJSON(w, map[string]any{ + "added": res.Added, + "deleted": res.Deleted, + "errors": errStrs, + }) +} + type indexRequest struct { Wing string `json:"wing,omitempty"` } diff --git a/ingestion/internal/embed/embed.go b/ingestion/internal/embed/embed.go new file mode 100644 index 0000000..c322408 --- /dev/null +++ b/ingestion/internal/embed/embed.go @@ -0,0 +1,76 @@ +// Package embed produces dense vector embeddings for brain content. +// +// Wire format is Ollama's `/api/embed`, with the canonical request shape +// `{"model": "...", "input": "..."}` and a 2-D `embeddings` response. +// Default deployment runs `nomic-embed-text` on iguana, which returns +// 768-dim vectors compatible with the brain_embeddings table schema. +package embed + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// Client posts embedding requests to an Ollama-compatible endpoint. +type Client struct { + URL string + Model string + HTTP *http.Client +} + +// New constructs a Client. Returns nil when url is empty so callers can +// treat a missing BRAIN_EMBED_URL as "feature disabled" via a single nil +// check. +func New(url, model string) *Client { + if url == "" { + return nil + } + return &Client{ + URL: strings.TrimRight(url, "/"), + Model: model, + HTTP: &http.Client{Timeout: 30 * time.Second}, + } +} + +// Embed returns the embedding vector for text. Empty text is rejected +// up-front to keep upstream errors from masking caller mistakes. +func (c *Client) Embed(ctx context.Context, text string) ([]float32, error) { + if strings.TrimSpace(text) == "" { + return nil, fmt.Errorf("embed: empty text") + } + reqBody, _ := json.Marshal(map[string]any{ + "model": c.Model, + "input": text, + }) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.URL+"/api/embed", bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + resp, err := c.HTTP.Do(req) + if err != nil { + return nil, err + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode/100 != 2 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("embed: status %d: %s", resp.StatusCode, string(body)) + } + var out struct { + Embeddings [][]float32 `json:"embeddings"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("embed: decode: %w", err) + } + if len(out.Embeddings) == 0 || len(out.Embeddings[0]) == 0 { + return nil, fmt.Errorf("embed: empty embeddings in response") + } + return out.Embeddings[0], nil +} diff --git a/ingestion/internal/embed/embed_test.go b/ingestion/internal/embed/embed_test.go new file mode 100644 index 0000000..e0d80df --- /dev/null +++ b/ingestion/internal/embed/embed_test.go @@ -0,0 +1,74 @@ +package embed_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/mathiasbq/hyperguild/ingestion/internal/embed" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNew_EmptyURLReturnsNil(t *testing.T) { + assert.Nil(t, embed.New("", "model")) +} + +func TestEmbed_ReturnsVector(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/embed", r.URL.Path) + var req map[string]any + require.NoError(t, json.NewDecoder(r.Body).Decode(&req)) + assert.Equal(t, "nomic", req["model"]) + assert.Equal(t, "hello", req["input"]) + _ = json.NewEncoder(w).Encode(map[string]any{ + "embeddings": [][]float32{{0.1, 0.2, 0.3}}, + }) + })) + defer srv.Close() + + c := embed.New(srv.URL, "nomic") + require.NotNil(t, c) + v, err := c.Embed(context.Background(), "hello") + require.NoError(t, err) + assert.Equal(t, []float32{0.1, 0.2, 0.3}, v) +} + +func TestEmbed_StripsTrailingSlashFromURL(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/embed", r.URL.Path) + _ = json.NewEncoder(w).Encode(map[string]any{"embeddings": [][]float32{{1.0}}}) + })) + defer srv.Close() + c := embed.New(srv.URL+"/", "nomic") + _, err := c.Embed(context.Background(), "x") + require.NoError(t, err) +} + +func TestEmbed_PropagatesUpstreamError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer srv.Close() + c := embed.New(srv.URL, "m") + _, err := c.Embed(context.Background(), "x") + require.Error(t, err) +} + +func TestEmbed_RejectsEmptyEmbeddingsArray(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{"embeddings": [][]float32{}}) + })) + defer srv.Close() + c := embed.New(srv.URL, "m") + _, err := c.Embed(context.Background(), "x") + require.Error(t, err) +} + +func TestEmbed_RejectsEmptyText(t *testing.T) { + c := embed.New("http://127.0.0.1:1", "m") + _, err := c.Embed(context.Background(), "") + require.Error(t, err) +} diff --git a/ingestion/internal/mcp/handlers.go b/ingestion/internal/mcp/handlers.go index 9630827..f6dac68 100644 --- a/ingestion/internal/mcp/handlers.go +++ b/ingestion/internal/mcp/handlers.go @@ -144,11 +144,13 @@ func (s *Server) brainQuery(ctx context.Context, args json.RawMessage) (json.Raw if a.Limit == 0 { a.Limit = 5 } - results, err := search.Query(s.brainDir, search.QueryOptions{ - Query: a.Query, - Limit: a.Limit, - Wing: a.Wing, - Hall: a.Hall, + results, err := search.QueryContext(ctx, s.brainDir, search.QueryOptions{ + Query: a.Query, + Limit: a.Limit, + Wing: a.Wing, + Hall: a.Hall, + Vector: s.vector, + Embedder: s.embedder, }) if err != nil { return nil, fmt.Errorf("search: %w", err) diff --git a/ingestion/internal/mcp/server.go b/ingestion/internal/mcp/server.go index 3f4fc64..53da485 100644 --- a/ingestion/internal/mcp/server.go +++ b/ingestion/internal/mcp/server.go @@ -11,6 +11,7 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" + "github.com/mathiasbq/hyperguild/ingestion/internal/search" ) type request struct { @@ -39,6 +40,8 @@ type Server struct { llm pipeline.CompleteFunc answerLLM pipeline.CompleteFunc // nil = brain_answer and brain_classify unavailable reranker *reranker.Client // nil = no rerank, BM25 top-10 → LLM + vector search.VectorSearcher // nil = BM25-only retrieval + embedder search.Embedder // nil = BM25-only retrieval } // NewServer constructs a Server bound to brainDir. pipelineCfg supplies the @@ -61,6 +64,15 @@ func (s *Server) WithReranker(r *reranker.Client) *Server { return s } +// WithHybridRetrieval wires the embedding store and embedder so +// brain_query and brain_answer run BM25 + pgvector merged via RRF +// instead of BM25 alone. Either nil disables hybrid mode. +func (s *Server) WithHybridRetrieval(v search.VectorSearcher, e search.Embedder) *Server { + s.vector = v + s.embedder = e + return s +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // MCP streamable HTTP: GET establishes the SSE stream for server-to-client events. if r.Method == http.MethodGet { diff --git a/ingestion/internal/mcp/tools_answer.go b/ingestion/internal/mcp/tools_answer.go index 8fdef68..e8d15cc 100644 --- a/ingestion/internal/mcp/tools_answer.go +++ b/ingestion/internal/mcp/tools_answer.go @@ -67,7 +67,12 @@ func (s *Server) brainAnswer(ctx context.Context, args json.RawMessage) (json.Ra if s.reranker != nil { bm25Limit = 20 } - results, err := search.Query(s.brainDir, search.QueryOptions{Query: a.Query, Limit: bm25Limit}) + results, err := search.QueryContext(ctx, s.brainDir, search.QueryOptions{ + Query: a.Query, + Limit: bm25Limit, + Vector: s.vector, + Embedder: s.embedder, + }) if err != nil { return nil, fmt.Errorf("search: %w", err) } diff --git a/ingestion/internal/search/search.go b/ingestion/internal/search/search.go index e43321f..6a008a4 100644 --- a/ingestion/internal/search/search.go +++ b/ingestion/internal/search/search.go @@ -3,6 +3,7 @@ package search import ( "bufio" + "context" "fmt" "log/slog" "os" @@ -13,6 +14,26 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/brain" ) +// VectorSearcher returns the top-limit nearest paths by cosine +// distance. The vectorstore package implements this against pgvector. +type VectorSearcher interface { + Search(ctx context.Context, query []float32, limit int) ([]VectorHit, error) +} + +// VectorHit is a single path + distance pair from a vector search. +// Re-declared here (rather than imported) to keep search package +// free of vectorstore/embed deps and to make stubbing trivial in tests. +type VectorHit struct { + Path string + Distance float64 +} + +// Embedder turns a query string into a dense vector. The embed package +// implements this against Ollama's /api/embed. +type Embedder interface { + Embed(ctx context.Context, text string) ([]float32, error) +} + // Result is a single search hit from the brain wiki. type Result struct { Path string `json:"path"` @@ -29,16 +50,30 @@ type Result struct { // When Hall is additionally set, the walk is restricted to // brain/wiki///. Without either, the legacy walk over // brain/knowledge/ and brain/wiki/ is used. +// +// When both Vector and Embedder are non-nil, results are computed +// hybridly: BM25 and vector candidate lists are merged via Reciprocal +// Rank Fusion. With either nil the function falls back to BM25 only, +// keeping behaviour unchanged for callers that have not opted in. type QueryOptions struct { - Query string - Limit int - Wing string - Hall string + Query string + Limit int + Wing string + Hall string + Vector VectorSearcher + Embedder Embedder } // Query searches the brain. Returns up to opts.Limit results sorted by // score descending. Empty query returns nil. func Query(brainDir string, opts QueryOptions) ([]Result, error) { + return QueryContext(context.Background(), brainDir, opts) +} + +// QueryContext is the cancellable variant of Query. Hybrid retrieval +// requires a context because both the embedder and the vector store are +// network calls. +func QueryContext(ctx context.Context, brainDir string, opts QueryOptions) ([]Result, error) { if opts.Limit <= 0 { opts.Limit = 5 } @@ -102,12 +137,108 @@ func Query(brainDir string, opts QueryOptions) ([]Result, error) { sort.Slice(results, func(i, j int) bool { return results[i].Score > results[j].Score }) + + // Hybrid scoring kicks in only when both the embedder and the + // vector store are wired and BM25 actually returned candidates. + if opts.Vector != nil && opts.Embedder != nil && len(results) > 0 { + merged, err := hybridMerge(ctx, brainDir, opts, results) + if err != nil { + slog.Warn("search: hybrid merge failed, falling back to BM25", "err", err) + } else { + results = merged + } + } + if len(results) > opts.Limit { results = results[:opts.Limit] } return results, nil } +// rrfK is the constant in the Reciprocal Rank Fusion formula. 60 is +// standard (Cormack et al. 2009) and parameter-free in practice. +const rrfK = 60.0 + +// hybridMerge embeds the query, runs a vector search, and merges its +// candidates with the BM25 list via Reciprocal Rank Fusion. Results +// that came only from the vector side are hydrated by reading the +// note's frontmatter for title/wing/hall and excerpting the body. +// +// rrf(d) = sum_r 1 / (k + rank_r(d)) over rankers r ∈ {BM25, vector}. +func hybridMerge(ctx context.Context, brainDir string, opts QueryOptions, bm25 []Result) ([]Result, error) { + q, err := opts.Embedder.Embed(ctx, opts.Query) + if err != nil { + return nil, fmt.Errorf("embed query: %w", err) + } + vectorLimit := opts.Limit * 4 + if vectorLimit < 20 { + vectorLimit = 20 + } + hits, err := opts.Vector.Search(ctx, q, vectorLimit) + if err != nil { + return nil, fmt.Errorf("vector search: %w", err) + } + + rrf := make(map[string]float64) + byPath := make(map[string]Result) + for rank, r := range bm25 { + rrf[r.Path] += 1.0 / (rrfK + float64(rank+1)) + byPath[r.Path] = r + } + for rank, h := range hits { + if opts.Wing != "" && !pathInScope(h.Path, opts.Wing, opts.Hall) { + continue + } + rrf[h.Path] += 1.0 / (rrfK + float64(rank+1)) + if _, seen := byPath[h.Path]; !seen { + r, err := hydrate(brainDir, h.Path) + if err != nil { + slog.Warn("search: hydrate failed for vector hit", "path", h.Path, "err", err) + continue + } + byPath[h.Path] = r + } + } + + merged := make([]Result, 0, len(byPath)) + for p, r := range byPath { + r.Score = int(rrf[p] * 1e6) // scale to int for stable JSON; relative order is what matters + merged = append(merged, r) + } + sort.Slice(merged, func(i, j int) bool { + return merged[i].Score > merged[j].Score + }) + return merged, nil +} + +// pathInScope reports whether a wiki path satisfies the wing/hall filter. +func pathInScope(relPath, wing, hall string) bool { + prefix := "wiki/" + brain.Sanitise(wing) + "/" + if hall != "" { + prefix += hall + "/" + } + return strings.HasPrefix(relPath, prefix) +} + +// hydrate reads a single note from disk and returns a Result with title, +// excerpt, wing, and hall populated. Used for paths that surface only +// via vector search. +func hydrate(brainDir, relPath string) (Result, error) { + full := filepath.Join(brainDir, filepath.FromSlash(relPath)) + content, err := os.ReadFile(full) + if err != nil { + return Result{}, err + } + wing, hall := extractWingHall(string(content), relPath) + return Result{ + Path: relPath, + Title: extractTitle(string(content), filepath.Base(relPath)), + Excerpt: excerpt(string(content), 300), + Wing: wing, + Hall: hall, + }, nil +} + // resolveRoots returns the directories to walk for the given wing/hall // filters. Validates hall against the closed vocabulary when set. func resolveRoots(brainDir, wing, hall string) ([]string, error) { diff --git a/ingestion/internal/search/search_test.go b/ingestion/internal/search/search_test.go index 1ef4e4c..87af707 100644 --- a/ingestion/internal/search/search_test.go +++ b/ingestion/internal/search/search_test.go @@ -2,6 +2,7 @@ package search_test import ( + "context" "fmt" "os" "path/filepath" @@ -12,6 +13,69 @@ import ( "github.com/stretchr/testify/require" ) +type stubEmbedder struct{ vec []float32 } + +func (s stubEmbedder) Embed(_ context.Context, _ string) ([]float32, error) { return s.vec, nil } + +type stubVector struct{ hits []search.VectorHit } + +func (s stubVector) Search(_ context.Context, _ []float32, _ int) ([]search.VectorHit, error) { + return s.hits, nil +} + +func TestSearch_HybridRRFPromotesVectorOnlyHit(t *testing.T) { + dir := t.TempDir() + for _, p := range []struct{ rel, body string }{ + // BM25-keyword note (matches "lejpa" once) + {"wiki/jepa-fx/facts/foo.md", "---\ntitle: Foo\n---\nlejpa keyword\n"}, + // Semantically related note that does NOT contain the keyword. + {"wiki/jepa-fx/facts/semantic.md", "---\ntitle: Semantic\n---\nNo keyword in body.\n"}, + } { + full := filepath.Join(dir, p.rel) + require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755)) + require.NoError(t, os.WriteFile(full, []byte(p.body), 0o644)) + } + + embedder := stubEmbedder{vec: []float32{0.1}} + vector := stubVector{hits: []search.VectorHit{ + {Path: "wiki/jepa-fx/facts/semantic.md", Distance: 0.05}, // best vector match + {Path: "wiki/jepa-fx/facts/foo.md", Distance: 0.10}, + }} + + got, err := search.Query(dir, search.QueryOptions{ + Query: "lejpa", + Limit: 5, + Vector: vector, + Embedder: embedder, + }) + require.NoError(t, err) + require.Len(t, got, 2, "vector-only hit should be hydrated into results") + paths := []string{got[0].Path, got[1].Path} + assert.Contains(t, paths, "wiki/jepa-fx/facts/foo.md") + assert.Contains(t, paths, "wiki/jepa-fx/facts/semantic.md") +} + +func TestSearch_HybridFallsBackOnEmbedderError(t *testing.T) { + dir := t.TempDir() + require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "wiki", "x.md"), []byte("keyword foo"), 0o644)) + + embedder := errorEmbedder{} + vector := stubVector{} + got, err := search.Query(dir, search.QueryOptions{ + Query: "keyword", Limit: 5, Vector: vector, Embedder: embedder, + }) + require.NoError(t, err) + require.Len(t, got, 1, "BM25 result should still come back when embedder fails") + assert.Equal(t, "wiki/x.md", got[0].Path) +} + +type errorEmbedder struct{} + +func (errorEmbedder) Embed(_ context.Context, _ string) ([]float32, error) { + return nil, assert.AnError +} + func TestSearch_ReturnsMatchingPages(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(filepath.Join(dir, "knowledge"), 0o755)) diff --git a/ingestion/internal/vectorstore/pg.go b/ingestion/internal/vectorstore/pg.go new file mode 100644 index 0000000..48488b2 --- /dev/null +++ b/ingestion/internal/vectorstore/pg.go @@ -0,0 +1,155 @@ +// Package vectorstore stores brain note embeddings in pgvector on the +// shared postgres18 instance. One row per markdown path, cosine-distance +// indexed via HNSW for sub-millisecond top-k retrieval. +package vectorstore + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// Hit is a single result from a cosine-distance search. +type Hit struct { + Path string + Distance float64 // 0 = identical, 2 = opposite +} + +// PGStore is a pgvector-backed embeddings store. Construct with New and +// call Init once to create the table + HNSW index. Use Close to release +// the underlying pool. +type PGStore struct { + pool *pgxpool.Pool +} + +// New opens a connection pool against dsn (a libpq-style URL). Caller +// owns the resulting *PGStore and must invoke Close. +func New(ctx context.Context, dsn string) (*PGStore, error) { + pool, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("pgxpool: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping: %w", err) + } + return &PGStore{pool: pool}, nil +} + +// Close releases the underlying connection pool. +func (s *PGStore) Close() { + if s.pool != nil { + s.pool.Close() + } +} + +// Init creates the brain_embeddings table and its HNSW index if they +// don't already exist. Safe to call on every startup. Assumes the +// `vector` extension is already installed (one-time DBA setup; see +// scripts/brain-embeddings-init.sql). +func (s *PGStore) Init(ctx context.Context) error { + const ddl = ` +CREATE TABLE IF NOT EXISTS brain_embeddings ( + path TEXT PRIMARY KEY, + embedding vector(768) NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS brain_embeddings_embedding_idx + ON brain_embeddings USING hnsw (embedding vector_cosine_ops); +` + _, err := s.pool.Exec(ctx, ddl) + return err +} + +// Upsert inserts or replaces the embedding for path. Embedding must be +// 768-dim (nomic-embed-text). Caller is responsible for normalising +// paths to forward-slash form. +func (s *PGStore) Upsert(ctx context.Context, path string, embedding []float32) error { + if len(embedding) != 768 { + return fmt.Errorf("expected 768-dim embedding, got %d", len(embedding)) + } + _, err := s.pool.Exec(ctx, ` + INSERT INTO brain_embeddings (path, embedding, updated_at) + VALUES ($1, $2, now()) + ON CONFLICT (path) DO UPDATE + SET embedding = EXCLUDED.embedding, updated_at = now() + `, path, vectorLiteral(embedding)) + return err +} + +// Delete removes the row at path. No-op when the row doesn't exist. +func (s *PGStore) Delete(ctx context.Context, path string) error { + _, err := s.pool.Exec(ctx, `DELETE FROM brain_embeddings WHERE path = $1`, path) + return err +} + +// Search returns the top-limit nearest paths by cosine distance. +func (s *PGStore) Search(ctx context.Context, query []float32, limit int) ([]Hit, error) { + if len(query) != 768 { + return nil, fmt.Errorf("expected 768-dim query, got %d", len(query)) + } + if limit <= 0 { + limit = 10 + } + rows, err := s.pool.Query(ctx, ` + SELECT path, embedding <=> $1 AS distance + FROM brain_embeddings + ORDER BY embedding <=> $1 + LIMIT $2 + `, vectorLiteral(query), limit) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var hits []Hit + for rows.Next() { + var h Hit + if err := rows.Scan(&h.Path, &h.Distance); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + hits = append(hits, h) + } + if err := rows.Err(); err != nil && !errors.Is(err, pgx.ErrNoRows) { + return nil, err + } + return hits, nil +} + +// KnownPaths returns the path set already present in the store. Used by +// the watcher to diff against the wiki/ tree and decide what to upsert. +func (s *PGStore) KnownPaths(ctx context.Context) (map[string]struct{}, error) { + rows, err := s.pool.Query(ctx, `SELECT path FROM brain_embeddings`) + if err != nil { + return nil, fmt.Errorf("query paths: %w", err) + } + defer rows.Close() + out := make(map[string]struct{}) + for rows.Next() { + var p string + if err := rows.Scan(&p); err != nil { + return nil, err + } + out[p] = struct{}{} + } + return out, rows.Err() +} + +// vectorLiteral renders a Go float32 slice as the literal representation +// pgvector accepts as a parametric input: `[v1,v2,...,vN]`. +func vectorLiteral(v []float32) string { + var b strings.Builder + b.WriteByte('[') + for i, x := range v { + if i > 0 { + b.WriteByte(',') + } + fmt.Fprintf(&b, "%g", x) + } + b.WriteByte(']') + return b.String() +} diff --git a/ingestion/internal/vectorstore/pg_test.go b/ingestion/internal/vectorstore/pg_test.go new file mode 100644 index 0000000..34c256f --- /dev/null +++ b/ingestion/internal/vectorstore/pg_test.go @@ -0,0 +1,91 @@ +package vectorstore_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// integration tests run against a real postgres18 + pgvector. Gated by +// BRAIN_PG_TEST_DSN so `task check` stays hermetic on hosts without a +// reachable database. +// +// To run: +// BRAIN_PG_TEST_DSN='postgres://brain_app:pwd@127.0.0.1:5432/brain' \ +// go test ./internal/vectorstore/... -run Integration +func dsn(t *testing.T) string { + t.Helper() + v := os.Getenv("BRAIN_PG_TEST_DSN") + if v == "" { + t.Skip("BRAIN_PG_TEST_DSN not set; skipping pgvector integration tests") + } + return v +} + +func freshStore(t *testing.T) (*vectorstore.PGStore, context.Context) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + s, err := vectorstore.New(ctx, dsn(t)) + require.NoError(t, err) + t.Cleanup(s.Close) + require.NoError(t, s.Init(ctx)) + // Clean slate per test. + _, _ = s.KnownPaths(ctx) + require.NoError(t, s.Delete(ctx, "%test-fixture%")) + return s, ctx +} + +func vec(dim int, fill float32) []float32 { + v := make([]float32, dim) + for i := range v { + v[i] = fill + } + return v +} + +func TestIntegration_UpsertAndSearch(t *testing.T) { + s, ctx := freshStore(t) + + require.NoError(t, s.Upsert(ctx, "wiki/a.md", vec(768, 1.0))) + require.NoError(t, s.Upsert(ctx, "wiki/b.md", vec(768, -1.0))) + + hits, err := s.Search(ctx, vec(768, 1.0), 2) + require.NoError(t, err) + require.GreaterOrEqual(t, len(hits), 1) + assert.Equal(t, "wiki/a.md", hits[0].Path) + assert.InDelta(t, 0.0, hits[0].Distance, 1e-5) + + t.Cleanup(func() { + _ = s.Delete(ctx, "wiki/a.md") + _ = s.Delete(ctx, "wiki/b.md") + }) +} + +func TestIntegration_KnownPaths(t *testing.T) { + s, ctx := freshStore(t) + require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5))) + t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") }) + + paths, err := s.KnownPaths(ctx) + require.NoError(t, err) + _, ok := paths["wiki/k.md"] + assert.True(t, ok) +} + +func TestUpsert_RejectsWrongDimension(t *testing.T) { + s := &vectorstore.PGStore{} + err := s.Upsert(context.Background(), "x", vec(100, 0)) + require.Error(t, err) +} + +func TestSearch_RejectsWrongDimension(t *testing.T) { + s := &vectorstore.PGStore{} + _, err := s.Search(context.Background(), vec(100, 0), 5) + require.Error(t, err) +} diff --git a/ingestion/internal/vectorstore/sync.go b/ingestion/internal/vectorstore/sync.go new file mode 100644 index 0000000..45b6d86 --- /dev/null +++ b/ingestion/internal/vectorstore/sync.go @@ -0,0 +1,142 @@ +package vectorstore + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" +) + +// Embedder produces dense vectors. The embed package's Client satisfies +// this; it's declared locally so vectorstore doesn't depend on embed. +type Embedder interface { + Embed(ctx context.Context, text string) ([]float32, error) +} + +// Store is the subset of PGStore that Sync needs. Lets tests stub it. +type Store interface { + KnownPaths(ctx context.Context) (map[string]struct{}, error) + Upsert(ctx context.Context, path string, embedding []float32) error + Delete(ctx context.Context, path string) error +} + +// SyncResult tallies what Sync did. Returned for logs / metrics; callers +// generally don't act on the fields directly. +type SyncResult struct { + Added int + Updated int + Deleted int + Errors []error +} + +// Sync brings the embedding store in line with brain/wiki/ on disk: +// - new files (in the tree, not in the store) get embedded + upserted +// - files whose mtime exceeds the store's updated_at get re-embedded +// - files no longer on disk get deleted from the store +// +// Designed to be called on a ticker. Best-effort: per-file errors are +// collected into SyncResult.Errors and do not abort the run. +func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) (SyncResult, error) { + var res SyncResult + if store == nil || embedder == nil { + return res, nil + } + + known, err := store.KnownPaths(ctx) + if err != nil { + return res, fmt.Errorf("known paths: %w", err) + } + seen := make(map[string]struct{}) + + wikiDir := filepath.Join(brainDir, "wiki") + if _, err := os.Stat(wikiDir); os.IsNotExist(err) { + return res, nil + } + + err = filepath.WalkDir(wikiDir, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" { + return nil + } + rel, err := filepath.Rel(brainDir, path) + if err != nil { + return err + } + relSlash := filepath.ToSlash(rel) + seen[relSlash] = struct{}{} + + if _, ok := known[relSlash]; ok { + // Already embedded — TODO: compare mtime once Store exposes + // updated_at so we re-embed on edit. For now, skip. + return nil + } + + content, readErr := os.ReadFile(path) + if readErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr)) + return nil + } + vec, embErr := embedder.Embed(ctx, string(content)) + if embErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr)) + return nil + } + if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr)) + return nil + } + res.Added++ + return nil + }) + if err != nil { + return res, fmt.Errorf("walk wiki: %w", err) + } + + // Drop rows whose file is gone. + for path := range known { + if _, ok := seen[path]; ok { + continue + } + if err := store.Delete(ctx, path); err != nil { + res.Errors = append(res.Errors, fmt.Errorf("delete %s: %w", path, err)) + continue + } + res.Deleted++ + } + return res, nil +} + +// StartSync launches Sync on a ticker in a background goroutine. The +// goroutine exits when ctx is cancelled. Failures are logged via slog. +func StartSync(ctx context.Context, brainDir string, store Store, embedder Embedder, interval time.Duration) { + if interval <= 0 { + interval = 5 * time.Minute + } + go func() { + t := time.NewTicker(interval) + defer t.Stop() + // Run once immediately so first-boot doesn't wait a full tick. + if r, err := Sync(ctx, brainDir, store, embedder); err != nil { + slog.Error("embed sync failed", "err", err) + } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { + slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) + } + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if r, err := Sync(ctx, brainDir, store, embedder); err != nil { + slog.Error("embed sync failed", "err", err) + } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { + slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) + } + } + } + }() +} diff --git a/ingestion/internal/vectorstore/sync_test.go b/ingestion/internal/vectorstore/sync_test.go new file mode 100644 index 0000000..872aaf0 --- /dev/null +++ b/ingestion/internal/vectorstore/sync_test.go @@ -0,0 +1,137 @@ +package vectorstore_test + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type stubStore struct { + known map[string]struct{} + upserts map[string][]float32 + deletes []string + failNext error +} + +func (s *stubStore) KnownPaths(_ context.Context) (map[string]struct{}, error) { + out := make(map[string]struct{}, len(s.known)) + for k := range s.known { + out[k] = struct{}{} + } + return out, nil +} + +func (s *stubStore) Upsert(_ context.Context, path string, v []float32) error { + if s.failNext != nil { + err := s.failNext + s.failNext = nil + return err + } + if s.upserts == nil { + s.upserts = make(map[string][]float32) + } + s.upserts[path] = v + return nil +} + +func (s *stubStore) Delete(_ context.Context, path string) error { + s.deletes = append(s.deletes, path) + return nil +} + +type stubEmbedder struct { + vec []float32 + err error +} + +func (e stubEmbedder) Embed(_ context.Context, _ string) ([]float32, error) { + return e.vec, e.err +} + +func writeNote(t *testing.T, dir, rel, body string) { + t.Helper() + full := filepath.Join(dir, rel) + require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755)) + require.NoError(t, os.WriteFile(full, []byte(body), 0o644)) +} + +func TestSync_AddsNewFiles(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x") + writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y") + + store := &stubStore{known: map[string]struct{}{}} + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 2, res.Added) + assert.Empty(t, res.Deleted) + assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/x.md") + assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/y.md") +} + +func TestSync_SkipsAlreadyKnown(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/a/facts/x.md", "x") + + store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md": {}}} + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 0, res.Added) + assert.Empty(t, store.upserts) +} + +func TestSync_DeletesDisappearedFiles(t *testing.T) { + dir := t.TempDir() + require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) + // store has a path that doesn't exist on disk anymore + store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md": {}}} + res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)}) + require.NoError(t, err) + assert.Equal(t, 1, res.Deleted) +} + +// stubStoreWithDelete is a thin wrapper to capture Delete calls; +// stubStore already implements Delete but we need the wrapper to mix +// store interfaces with sync-specific expectations. +type stubStoreWithDelete struct { + *stubStore +} + +func TestSync_SkipsIndexFiles(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/a/_index.md", "moc") + writeNote(t, dir, "wiki/a/facts/real.md", "body") + + store := &stubStore{known: map[string]struct{}{}} + res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)}) + require.NoError(t, err) + assert.Equal(t, 1, res.Added) + assert.NotContains(t, store.upserts, "wiki/a/_index.md") +} + +func TestSync_NoOpWhenComponentsNil(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/a/facts/x.md", "x") + res, err := vectorstore.Sync(context.Background(), dir, nil, nil) + require.NoError(t, err) + assert.Equal(t, 0, res.Added) +} + +func TestSync_CollectsEmbedderErrors(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/a/facts/x.md", "x") + store := &stubStore{known: map[string]struct{}{}} + emb := stubEmbedder{err: errors.New("upstream down")} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 0, res.Added) + assert.Len(t, res.Errors, 1) +} diff --git a/scripts/brain-embeddings-init.sql b/scripts/brain-embeddings-init.sql new file mode 100644 index 0000000..de74b50 --- /dev/null +++ b/scripts/brain-embeddings-init.sql @@ -0,0 +1,47 @@ +-- One-time DBA setup for the brain vector store on postgres18. +-- +-- Creates the `brain` database, the `brain_app` role, and the pgvector +-- extension. The ingestion service connects as brain_app and creates +-- the table + HNSW index idempotently at startup (see +-- internal/vectorstore.PGStore.Init). +-- +-- Run from koala as the postgres superuser: +-- +-- kubectl exec -n databases postgres18-0 -- \ +-- psql -U postgres -f /tmp/brain-embeddings-init.sql +-- +-- Or apply with: +-- +-- PASSWORD='' \ +-- kubectl exec -i -n databases postgres18-0 -- \ +-- psql -U postgres -v password="'$PASSWORD'" \ +-- < scripts/brain-embeddings-init.sql +-- +-- Idempotent: rerunning is safe. + +\set ON_ERROR_STOP on + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = 'brain') THEN + CREATE DATABASE brain; + END IF; +END +$$; + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'brain_app') THEN + EXECUTE format('CREATE ROLE brain_app LOGIN PASSWORD %L', :'password'); + END IF; +END +$$; + +GRANT ALL PRIVILEGES ON DATABASE brain TO brain_app; + +\c brain + +CREATE EXTENSION IF NOT EXISTS vector; +GRANT ALL ON SCHEMA public TO brain_app; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO brain_app; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO brain_app;