diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index 226f598..0db641b 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -15,9 +15,11 @@ import ( chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth" "github.com/mathiasbq/hyperguild/ingestion/internal/api" + "github.com/mathiasbq/hyperguild/ingestion/internal/embed" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphstore" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphsync" "github.com/mathiasbq/hyperguild/ingestion/internal/llm" "github.com/mathiasbq/hyperguild/ingestion/internal/mcp" - "github.com/mathiasbq/hyperguild/ingestion/internal/embed" "github.com/mathiasbq/hyperguild/ingestion/internal/metrics" "github.com/mathiasbq/hyperguild/ingestion/internal/oauth" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" @@ -142,6 +144,32 @@ func main() { logger.Info("brain hybrid retrieval enabled", "pg", redactDSN(pgDSN), "embed_url", embedURL, "embed_model", embedModel) + + // Graph store shares the same postgres18 DSN as the vector + // store and is opt-in via BRAIN_GRAPH_ENABLED=true. Defaults + // to off so first rollout doesn't surprise — flip on after + // the migration completes and the backfill finishes. + if envOr("BRAIN_GRAPH_ENABLED", "false") == "true" { + gstore, gerr := graphstore.New(context.Background(), pgDSN) + if gerr != nil { + logger.Error("graph store init", "err", gerr) + os.Exit(1) + } + if gerr := gstore.Init(context.Background()); gerr != nil { + logger.Error("graph store migrate", "err", gerr) + os.Exit(1) + } + mcpSrv = mcpSrv.WithGraph(gstore) + if envOr("BRAIN_GRAPH_BACKFILL", "false") == "true" { + n, berr := graphsync.BackfillFromBrainDir(context.Background(), gstore, brainDir) + if berr != nil { + logger.Warn("graph backfill incomplete", "indexed", n, "err", berr) + } else { + logger.Info("graph backfill complete", "indexed", n) + } + } + logger.Info("brain graph enabled", "pg", redactDSN(pgDSN)) + } case pgDSN == "" && embedURL == "": // disabled — fine default: diff --git a/ingestion/internal/graphsync/graphsync.go b/ingestion/internal/graphsync/graphsync.go new file mode 100644 index 0000000..4677515 --- /dev/null +++ b/ingestion/internal/graphsync/graphsync.go @@ -0,0 +1,112 @@ +// Package graphsync glues the disk-resident brain markdown documents to +// the relational graph in [graphstore]. It is a tiny seam so that the +// MCP handlers can call one function after every successful write or +// ingest without having to know either the parser or the postgres +// schema. +// +// Every operation is best-effort from the caller's perspective: if the +// graph store is unconfigured or the doc parses to nothing usable, the +// helpers return nil. Real database errors are surfaced so the caller +// can log them. +package graphsync + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/mathiasbq/hyperguild/ingestion/internal/graph" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphstore" +) + +// Store is the subset of graphstore.PGStore that graphsync requires. +// Tests can substitute a fake by satisfying this interface. +type Store interface { + UpsertEntity(ctx context.Context, e graph.Entity) error + ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error + DeleteByDoc(ctx context.Context, docPath string) error +} + +// Compile-time assertion that *graphstore.PGStore satisfies Store. +var _ Store = (*graphstore.PGStore)(nil) + +// IndexDoc reads docPath under brainDir and pushes one Entity + its +// outgoing wikilink Edges into store. relPath must be the +// forward-slash path relative to brainDir (the same shape returned by +// api.WriteNote). +// +// nil store is a valid no-op so callers can wire the helper +// unconditionally and let configuration decide whether the graph is +// populated. +func IndexDoc(ctx context.Context, store Store, brainDir, relPath string) error { + if store == nil { + return nil + } + if relPath == "" { + return nil + } + abs := filepath.Join(brainDir, filepath.FromSlash(relPath)) + content, err := os.ReadFile(abs) + if err != nil { + return fmt.Errorf("read %q: %w", relPath, err) + } + ent, edges, ok := graph.Extract(relPath, content) + if !ok { + return nil + } + if err := store.UpsertEntity(ctx, ent); err != nil { + return fmt.Errorf("upsert entity: %w", err) + } + if err := store.ReplaceEdgesForDoc(ctx, relPath, edges); err != nil { + return fmt.Errorf("replace edges: %w", err) + } + return nil +} + +// BackfillFromBrainDir walks every markdown file under brainDir/wiki/ +// and brainDir/knowledge/, parses each, and upserts the resulting +// Entity + Edges. Existing rows are overwritten; orphan rows for +// already-deleted files are NOT cleaned up — call this only on a +// fresh store, or follow with a separate prune pass. +// +// Intended for one-shot startup runs against a populated brain dir. +// Cost scales linearly with corpus size; ~30 wiki pages plus the +// knowledge corpus is a few hundred ms. +func BackfillFromBrainDir(ctx context.Context, store Store, brainDir string) (indexed int, _ error) { + if store == nil { + return 0, nil + } + roots := []string{"wiki", "knowledge"} + for _, root := range roots { + base := filepath.Join(brainDir, root) + if _, err := os.Stat(base); os.IsNotExist(err) { + continue + } + err := filepath.WalkDir(base, func(path string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + if filepath.Ext(path) != ".md" { + return nil + } + rel, relErr := filepath.Rel(brainDir, path) + if relErr != nil { + return fmt.Errorf("rel %q: %w", path, relErr) + } + rel = filepath.ToSlash(rel) + if err := IndexDoc(ctx, store, brainDir, rel); err != nil { + return fmt.Errorf("index %q: %w", rel, err) + } + indexed++ + return nil + }) + if err != nil { + return indexed, fmt.Errorf("walk %s: %w", root, err) + } + } + return indexed, nil +} diff --git a/ingestion/internal/graphsync/graphsync_test.go b/ingestion/internal/graphsync/graphsync_test.go new file mode 100644 index 0000000..99d547e --- /dev/null +++ b/ingestion/internal/graphsync/graphsync_test.go @@ -0,0 +1,134 @@ +package graphsync + +import ( + "context" + "errors" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mathiasbq/hyperguild/ingestion/internal/graph" +) + +// fakeStore captures the calls IndexDoc / BackfillFromBrainDir made. +type fakeStore struct { + mu sync.Mutex + upserts []graph.Entity + replaces map[string][]graph.Edge + deletes []string + failOn string // upsert fails when entity slug == failOn +} + +func newFakeStore() *fakeStore { + return &fakeStore{replaces: make(map[string][]graph.Edge)} +} + +func (f *fakeStore) UpsertEntity(_ context.Context, e graph.Entity) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.failOn != "" && e.Slug == f.failOn { + return errors.New("synthetic failure") + } + f.upserts = append(f.upserts, e) + return nil +} + +func (f *fakeStore) ReplaceEdgesForDoc(_ context.Context, docPath string, edges []graph.Edge) error { + f.mu.Lock() + defer f.mu.Unlock() + f.replaces[docPath] = append([]graph.Edge(nil), edges...) + return nil +} + +func (f *fakeStore) DeleteByDoc(_ context.Context, docPath string) error { + f.mu.Lock() + defer f.mu.Unlock() + f.deletes = append(f.deletes, docPath) + return nil +} + +func writeBrain(t *testing.T, brainDir, relPath, body string) { + t.Helper() + full := filepath.Join(brainDir, filepath.FromSlash(relPath)) + require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755)) + require.NoError(t, os.WriteFile(full, []byte(body), 0o644)) +} + +func TestIndexDoc_UpsertsEntityAndEdges(t *testing.T) { + tmp := t.TempDir() + writeBrain(t, tmp, "wiki/concepts/foo.md", `--- +title: Foo +--- +# Foo +Linking to [[bar]] and [[baz|Baz]]. +`) + fs := newFakeStore() + require.NoError(t, IndexDoc(context.Background(), fs, tmp, "wiki/concepts/foo.md")) + + require.Len(t, fs.upserts, 1) + assert.Equal(t, "foo", fs.upserts[0].Slug) + assert.Equal(t, "concept", fs.upserts[0].Type) + + edges := fs.replaces["wiki/concepts/foo.md"] + require.Len(t, edges, 2) + assert.Equal(t, "bar", edges[0].DstSlug) + assert.Equal(t, "baz", edges[1].DstSlug) +} + +func TestIndexDoc_NoopOnNilStore(t *testing.T) { + require.NoError(t, IndexDoc(context.Background(), nil, "anywhere", "foo.md")) +} + +func TestIndexDoc_NoopOnEmptyRelPath(t *testing.T) { + fs := newFakeStore() + require.NoError(t, IndexDoc(context.Background(), fs, "anywhere", "")) + assert.Empty(t, fs.upserts) +} + +func TestIndexDoc_ErrorsOnMissingFile(t *testing.T) { + fs := newFakeStore() + err := IndexDoc(context.Background(), fs, t.TempDir(), "wiki/nope.md") + require.Error(t, err) +} + +func TestIndexDoc_SurfacesStoreFailure(t *testing.T) { + tmp := t.TempDir() + writeBrain(t, tmp, "wiki/concepts/boom.md", "# Boom\n") + fs := newFakeStore() + fs.failOn = "boom" + err := IndexDoc(context.Background(), fs, tmp, "wiki/concepts/boom.md") + require.Error(t, err) +} + +func TestBackfillFromBrainDir_WalksWikiAndKnowledge(t *testing.T) { + tmp := t.TempDir() + writeBrain(t, tmp, "wiki/concepts/foo.md", "# Foo\n[[bar]]\n") + writeBrain(t, tmp, "wiki/entities/bar.md", "# Bar\n") + writeBrain(t, tmp, "knowledge/legacy.md", "# Legacy [[foo]]\n") + // non-markdown file should be skipped + writeBrain(t, tmp, "wiki/concepts/skip.txt", "ignore me") + + fs := newFakeStore() + n, err := BackfillFromBrainDir(context.Background(), fs, tmp) + require.NoError(t, err) + assert.Equal(t, 3, n) + assert.Len(t, fs.upserts, 3) +} + +func TestBackfillFromBrainDir_TolerantOfMissingDirs(t *testing.T) { + tmp := t.TempDir() + fs := newFakeStore() + n, err := BackfillFromBrainDir(context.Background(), fs, tmp) + require.NoError(t, err) + assert.Equal(t, 0, n) +} + +func TestBackfillFromBrainDir_NilStoreNoop(t *testing.T) { + n, err := BackfillFromBrainDir(context.Background(), nil, t.TempDir()) + require.NoError(t, err) + assert.Equal(t, 0, n) +} diff --git a/ingestion/internal/mcp/handlers.go b/ingestion/internal/mcp/handlers.go index f6dac68..82e1f4e 100644 --- a/ingestion/internal/mcp/handlers.go +++ b/ingestion/internal/mcp/handlers.go @@ -12,6 +12,7 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/api" "github.com/mathiasbq/hyperguild/ingestion/internal/brain" "github.com/mathiasbq/hyperguild/ingestion/internal/extract" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphsync" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/search" "github.com/mathiasbq/hyperguild/ingestion/internal/session" @@ -194,9 +195,23 @@ func (s *Server) brainWrite(ctx context.Context, args json.RawMessage) (json.Raw slog.Warn("brain_write: auto-tunnel failed", "src", relPath, "err", err) } } + s.indexInGraph(ctx, "brain_write", relPath) return json.Marshal(map[string]string{"path": relPath}) } +// indexInGraph is a best-effort wrapper around graphsync.IndexDoc that +// logs failures but never propagates them — the underlying write/ingest +// has already succeeded and the graph is an augmentation, not a +// correctness invariant. +func (s *Server) indexInGraph(ctx context.Context, op, relPath string) { + if s.graph == nil || relPath == "" { + return + } + if err := graphsync.IndexDoc(ctx, s.graph, s.brainDir, relPath); err != nil { + slog.Warn(op+": graph index failed", "path", relPath, "err", err) + } +} + type brainTunnelArgs struct { Source string `json:"source"` Target string `json:"target"` @@ -213,6 +228,8 @@ func (s *Server) brainTunnel(ctx context.Context, args json.RawMessage) (json.Ra if err := brain.WriteTunnel(s.brainDir, a.Source, a.Target); err != nil { return nil, fmt.Errorf("tunnel: %w", err) } + s.indexInGraph(ctx, "brain_tunnel", a.Source) + s.indexInGraph(ctx, "brain_tunnel", a.Target) return json.Marshal(map[string]string{"status": "ok"}) } @@ -268,6 +285,11 @@ func (s *Server) brainIngestRaw(ctx context.Context, args json.RawMessage) (json if warnings == nil { warnings = []string{} } + if !a.DryRun { + for _, p := range pages { + s.indexInGraph(ctx, "brain_ingest_raw", p) + } + } return json.Marshal(map[string]any{"pages": pages, "warnings": warnings}) } @@ -358,6 +380,11 @@ func (s *Server) runIngest(ctx context.Context, content, source string, dryRun b if pages == nil { pages = []string{} } + if !dryRun { + for _, p := range pages { + s.indexInGraph(ctx, "brain_ingest", p) + } + } warnings := result.Warnings if warnings == nil { warnings = []string{} diff --git a/ingestion/internal/mcp/server.go b/ingestion/internal/mcp/server.go index 53da485..51ed470 100644 --- a/ingestion/internal/mcp/server.go +++ b/ingestion/internal/mcp/server.go @@ -9,6 +9,8 @@ import ( "fmt" "net/http" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphstore" + "github.com/mathiasbq/hyperguild/ingestion/internal/graphsync" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" "github.com/mathiasbq/hyperguild/ingestion/internal/search" @@ -42,6 +44,7 @@ type Server struct { reranker *reranker.Client // nil = no rerank, BM25 top-10 → LLM vector search.VectorSearcher // nil = BM25-only retrieval embedder search.Embedder // nil = BM25-only retrieval + graph graphsync.Store // nil = brain_graph and GraphRAG augmentation disabled } // NewServer constructs a Server bound to brainDir. pipelineCfg supplies the @@ -73,6 +76,19 @@ func (s *Server) WithHybridRetrieval(v search.VectorSearcher, e search.Embedder) return s } +// WithGraph wires the brain entities + edges store so every successful +// brain_write / brain_ingest / brain_tunnel re-indexes its written docs +// into the graph, and so brain_graph + GraphRAG-augmented brain_answer +// are available. nil disables graph features and is the legacy default. +func (s *Server) WithGraph(g *graphstore.PGStore) *Server { + if g == nil { + s.graph = nil + return s + } + s.graph = g + return s +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // MCP streamable HTTP: GET establishes the SSE stream for server-to-client events. if r.Method == http.MethodGet {