feat(graph): wire graphsync into MCP write/ingest/tunnel handlers
Commit 2 of Track A. Service stays a no-op until BRAIN_GRAPH_ENABLED= true; flipping it on creates the schema (idempotent), starts indexing every successful write, and optionally backfills the existing brain dir. - internal/graphsync: best-effort wrapper around graph.Extract + graphstore. IndexDoc reads docPath under brainDir, parses, upserts entity + replaces edges. BackfillFromBrainDir walks wiki/ + knowledge/. Both are no-ops on nil store so callers wire unconditionally. - mcp.Server gains WithGraph builder + graphsync.Store field. brain_write, brain_ingest, brain_ingest_raw, brain_tunnel call indexInGraph after success — failures slog.Warn but never propagate (graph is augmentation, not correctness). - cmd/server gates the wiring on BRAIN_GRAPH_ENABLED=true (default off so first rollout doesn't surprise). BRAIN_GRAPH_BACKFILL=true triggers a one-shot walk of the brain dir on boot. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,9 +15,11 @@ import (
|
||||
chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphstore"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphsync"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/metrics"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/oauth"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
@@ -142,6 +144,32 @@ func main() {
|
||||
logger.Info("brain hybrid retrieval enabled",
|
||||
"pg", redactDSN(pgDSN),
|
||||
"embed_url", embedURL, "embed_model", embedModel)
|
||||
|
||||
// Graph store shares the same postgres18 DSN as the vector
|
||||
// store and is opt-in via BRAIN_GRAPH_ENABLED=true. Defaults
|
||||
// to off so first rollout doesn't surprise — flip on after
|
||||
// the migration completes and the backfill finishes.
|
||||
if envOr("BRAIN_GRAPH_ENABLED", "false") == "true" {
|
||||
gstore, gerr := graphstore.New(context.Background(), pgDSN)
|
||||
if gerr != nil {
|
||||
logger.Error("graph store init", "err", gerr)
|
||||
os.Exit(1)
|
||||
}
|
||||
if gerr := gstore.Init(context.Background()); gerr != nil {
|
||||
logger.Error("graph store migrate", "err", gerr)
|
||||
os.Exit(1)
|
||||
}
|
||||
mcpSrv = mcpSrv.WithGraph(gstore)
|
||||
if envOr("BRAIN_GRAPH_BACKFILL", "false") == "true" {
|
||||
n, berr := graphsync.BackfillFromBrainDir(context.Background(), gstore, brainDir)
|
||||
if berr != nil {
|
||||
logger.Warn("graph backfill incomplete", "indexed", n, "err", berr)
|
||||
} else {
|
||||
logger.Info("graph backfill complete", "indexed", n)
|
||||
}
|
||||
}
|
||||
logger.Info("brain graph enabled", "pg", redactDSN(pgDSN))
|
||||
}
|
||||
case pgDSN == "" && embedURL == "":
|
||||
// disabled — fine
|
||||
default:
|
||||
|
||||
112
ingestion/internal/graphsync/graphsync.go
Normal file
112
ingestion/internal/graphsync/graphsync.go
Normal file
@@ -0,0 +1,112 @@
|
||||
// Package graphsync glues the disk-resident brain markdown documents to
|
||||
// the relational graph in [graphstore]. It is a tiny seam so that the
|
||||
// MCP handlers can call one function after every successful write or
|
||||
// ingest without having to know either the parser or the postgres
|
||||
// schema.
|
||||
//
|
||||
// Every operation is best-effort from the caller's perspective: if the
|
||||
// graph store is unconfigured or the doc parses to nothing usable, the
|
||||
// helpers return nil. Real database errors are surfaced so the caller
|
||||
// can log them.
|
||||
package graphsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphstore"
|
||||
)
|
||||
|
||||
// Store is the subset of graphstore.PGStore that graphsync requires.
|
||||
// Tests can substitute a fake by satisfying this interface.
|
||||
type Store interface {
|
||||
UpsertEntity(ctx context.Context, e graph.Entity) error
|
||||
ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error
|
||||
DeleteByDoc(ctx context.Context, docPath string) error
|
||||
}
|
||||
|
||||
// Compile-time assertion that *graphstore.PGStore satisfies Store.
|
||||
var _ Store = (*graphstore.PGStore)(nil)
|
||||
|
||||
// IndexDoc reads docPath under brainDir and pushes one Entity + its
|
||||
// outgoing wikilink Edges into store. relPath must be the
|
||||
// forward-slash path relative to brainDir (the same shape returned by
|
||||
// api.WriteNote).
|
||||
//
|
||||
// nil store is a valid no-op so callers can wire the helper
|
||||
// unconditionally and let configuration decide whether the graph is
|
||||
// populated.
|
||||
func IndexDoc(ctx context.Context, store Store, brainDir, relPath string) error {
|
||||
if store == nil {
|
||||
return nil
|
||||
}
|
||||
if relPath == "" {
|
||||
return nil
|
||||
}
|
||||
abs := filepath.Join(brainDir, filepath.FromSlash(relPath))
|
||||
content, err := os.ReadFile(abs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read %q: %w", relPath, err)
|
||||
}
|
||||
ent, edges, ok := graph.Extract(relPath, content)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if err := store.UpsertEntity(ctx, ent); err != nil {
|
||||
return fmt.Errorf("upsert entity: %w", err)
|
||||
}
|
||||
if err := store.ReplaceEdgesForDoc(ctx, relPath, edges); err != nil {
|
||||
return fmt.Errorf("replace edges: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BackfillFromBrainDir walks every markdown file under brainDir/wiki/
|
||||
// and brainDir/knowledge/, parses each, and upserts the resulting
|
||||
// Entity + Edges. Existing rows are overwritten; orphan rows for
|
||||
// already-deleted files are NOT cleaned up — call this only on a
|
||||
// fresh store, or follow with a separate prune pass.
|
||||
//
|
||||
// Intended for one-shot startup runs against a populated brain dir.
|
||||
// Cost scales linearly with corpus size; ~30 wiki pages plus the
|
||||
// knowledge corpus is a few hundred ms.
|
||||
func BackfillFromBrainDir(ctx context.Context, store Store, brainDir string) (indexed int, _ error) {
|
||||
if store == nil {
|
||||
return 0, nil
|
||||
}
|
||||
roots := []string{"wiki", "knowledge"}
|
||||
for _, root := range roots {
|
||||
base := filepath.Join(brainDir, root)
|
||||
if _, err := os.Stat(base); os.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
err := filepath.WalkDir(base, func(path string, d os.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
if filepath.Ext(path) != ".md" {
|
||||
return nil
|
||||
}
|
||||
rel, relErr := filepath.Rel(brainDir, path)
|
||||
if relErr != nil {
|
||||
return fmt.Errorf("rel %q: %w", path, relErr)
|
||||
}
|
||||
rel = filepath.ToSlash(rel)
|
||||
if err := IndexDoc(ctx, store, brainDir, rel); err != nil {
|
||||
return fmt.Errorf("index %q: %w", rel, err)
|
||||
}
|
||||
indexed++
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return indexed, fmt.Errorf("walk %s: %w", root, err)
|
||||
}
|
||||
}
|
||||
return indexed, nil
|
||||
}
|
||||
134
ingestion/internal/graphsync/graphsync_test.go
Normal file
134
ingestion/internal/graphsync/graphsync_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package graphsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
|
||||
)
|
||||
|
||||
// fakeStore captures the calls IndexDoc / BackfillFromBrainDir made.
|
||||
type fakeStore struct {
|
||||
mu sync.Mutex
|
||||
upserts []graph.Entity
|
||||
replaces map[string][]graph.Edge
|
||||
deletes []string
|
||||
failOn string // upsert fails when entity slug == failOn
|
||||
}
|
||||
|
||||
func newFakeStore() *fakeStore {
|
||||
return &fakeStore{replaces: make(map[string][]graph.Edge)}
|
||||
}
|
||||
|
||||
func (f *fakeStore) UpsertEntity(_ context.Context, e graph.Entity) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if f.failOn != "" && e.Slug == f.failOn {
|
||||
return errors.New("synthetic failure")
|
||||
}
|
||||
f.upserts = append(f.upserts, e)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) ReplaceEdgesForDoc(_ context.Context, docPath string, edges []graph.Edge) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.replaces[docPath] = append([]graph.Edge(nil), edges...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) DeleteByDoc(_ context.Context, docPath string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.deletes = append(f.deletes, docPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeBrain(t *testing.T, brainDir, relPath, body string) {
|
||||
t.Helper()
|
||||
full := filepath.Join(brainDir, filepath.FromSlash(relPath))
|
||||
require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755))
|
||||
require.NoError(t, os.WriteFile(full, []byte(body), 0o644))
|
||||
}
|
||||
|
||||
func TestIndexDoc_UpsertsEntityAndEdges(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
writeBrain(t, tmp, "wiki/concepts/foo.md", `---
|
||||
title: Foo
|
||||
---
|
||||
# Foo
|
||||
Linking to [[bar]] and [[baz|Baz]].
|
||||
`)
|
||||
fs := newFakeStore()
|
||||
require.NoError(t, IndexDoc(context.Background(), fs, tmp, "wiki/concepts/foo.md"))
|
||||
|
||||
require.Len(t, fs.upserts, 1)
|
||||
assert.Equal(t, "foo", fs.upserts[0].Slug)
|
||||
assert.Equal(t, "concept", fs.upserts[0].Type)
|
||||
|
||||
edges := fs.replaces["wiki/concepts/foo.md"]
|
||||
require.Len(t, edges, 2)
|
||||
assert.Equal(t, "bar", edges[0].DstSlug)
|
||||
assert.Equal(t, "baz", edges[1].DstSlug)
|
||||
}
|
||||
|
||||
func TestIndexDoc_NoopOnNilStore(t *testing.T) {
|
||||
require.NoError(t, IndexDoc(context.Background(), nil, "anywhere", "foo.md"))
|
||||
}
|
||||
|
||||
func TestIndexDoc_NoopOnEmptyRelPath(t *testing.T) {
|
||||
fs := newFakeStore()
|
||||
require.NoError(t, IndexDoc(context.Background(), fs, "anywhere", ""))
|
||||
assert.Empty(t, fs.upserts)
|
||||
}
|
||||
|
||||
func TestIndexDoc_ErrorsOnMissingFile(t *testing.T) {
|
||||
fs := newFakeStore()
|
||||
err := IndexDoc(context.Background(), fs, t.TempDir(), "wiki/nope.md")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestIndexDoc_SurfacesStoreFailure(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
writeBrain(t, tmp, "wiki/concepts/boom.md", "# Boom\n")
|
||||
fs := newFakeStore()
|
||||
fs.failOn = "boom"
|
||||
err := IndexDoc(context.Background(), fs, tmp, "wiki/concepts/boom.md")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestBackfillFromBrainDir_WalksWikiAndKnowledge(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
writeBrain(t, tmp, "wiki/concepts/foo.md", "# Foo\n[[bar]]\n")
|
||||
writeBrain(t, tmp, "wiki/entities/bar.md", "# Bar\n")
|
||||
writeBrain(t, tmp, "knowledge/legacy.md", "# Legacy [[foo]]\n")
|
||||
// non-markdown file should be skipped
|
||||
writeBrain(t, tmp, "wiki/concepts/skip.txt", "ignore me")
|
||||
|
||||
fs := newFakeStore()
|
||||
n, err := BackfillFromBrainDir(context.Background(), fs, tmp)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, n)
|
||||
assert.Len(t, fs.upserts, 3)
|
||||
}
|
||||
|
||||
func TestBackfillFromBrainDir_TolerantOfMissingDirs(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
fs := newFakeStore()
|
||||
n, err := BackfillFromBrainDir(context.Background(), fs, tmp)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, n)
|
||||
}
|
||||
|
||||
func TestBackfillFromBrainDir_NilStoreNoop(t *testing.T) {
|
||||
n, err := BackfillFromBrainDir(context.Background(), nil, t.TempDir())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, n)
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/brain"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/extract"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphsync"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/session"
|
||||
@@ -194,9 +195,23 @@ func (s *Server) brainWrite(ctx context.Context, args json.RawMessage) (json.Raw
|
||||
slog.Warn("brain_write: auto-tunnel failed", "src", relPath, "err", err)
|
||||
}
|
||||
}
|
||||
s.indexInGraph(ctx, "brain_write", relPath)
|
||||
return json.Marshal(map[string]string{"path": relPath})
|
||||
}
|
||||
|
||||
// indexInGraph is a best-effort wrapper around graphsync.IndexDoc that
|
||||
// logs failures but never propagates them — the underlying write/ingest
|
||||
// has already succeeded and the graph is an augmentation, not a
|
||||
// correctness invariant.
|
||||
func (s *Server) indexInGraph(ctx context.Context, op, relPath string) {
|
||||
if s.graph == nil || relPath == "" {
|
||||
return
|
||||
}
|
||||
if err := graphsync.IndexDoc(ctx, s.graph, s.brainDir, relPath); err != nil {
|
||||
slog.Warn(op+": graph index failed", "path", relPath, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
type brainTunnelArgs struct {
|
||||
Source string `json:"source"`
|
||||
Target string `json:"target"`
|
||||
@@ -213,6 +228,8 @@ func (s *Server) brainTunnel(ctx context.Context, args json.RawMessage) (json.Ra
|
||||
if err := brain.WriteTunnel(s.brainDir, a.Source, a.Target); err != nil {
|
||||
return nil, fmt.Errorf("tunnel: %w", err)
|
||||
}
|
||||
s.indexInGraph(ctx, "brain_tunnel", a.Source)
|
||||
s.indexInGraph(ctx, "brain_tunnel", a.Target)
|
||||
return json.Marshal(map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
@@ -268,6 +285,11 @@ func (s *Server) brainIngestRaw(ctx context.Context, args json.RawMessage) (json
|
||||
if warnings == nil {
|
||||
warnings = []string{}
|
||||
}
|
||||
if !a.DryRun {
|
||||
for _, p := range pages {
|
||||
s.indexInGraph(ctx, "brain_ingest_raw", p)
|
||||
}
|
||||
}
|
||||
return json.Marshal(map[string]any{"pages": pages, "warnings": warnings})
|
||||
}
|
||||
|
||||
@@ -358,6 +380,11 @@ func (s *Server) runIngest(ctx context.Context, content, source string, dryRun b
|
||||
if pages == nil {
|
||||
pages = []string{}
|
||||
}
|
||||
if !dryRun {
|
||||
for _, p := range pages {
|
||||
s.indexInGraph(ctx, "brain_ingest", p)
|
||||
}
|
||||
}
|
||||
warnings := result.Warnings
|
||||
if warnings == nil {
|
||||
warnings = []string{}
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphstore"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphsync"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/reranker"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
|
||||
@@ -42,6 +44,7 @@ type Server struct {
|
||||
reranker *reranker.Client // nil = no rerank, BM25 top-10 → LLM
|
||||
vector search.VectorSearcher // nil = BM25-only retrieval
|
||||
embedder search.Embedder // nil = BM25-only retrieval
|
||||
graph graphsync.Store // nil = brain_graph and GraphRAG augmentation disabled
|
||||
}
|
||||
|
||||
// NewServer constructs a Server bound to brainDir. pipelineCfg supplies the
|
||||
@@ -73,6 +76,19 @@ func (s *Server) WithHybridRetrieval(v search.VectorSearcher, e search.Embedder)
|
||||
return s
|
||||
}
|
||||
|
||||
// WithGraph wires the brain entities + edges store so every successful
|
||||
// brain_write / brain_ingest / brain_tunnel re-indexes its written docs
|
||||
// into the graph, and so brain_graph + GraphRAG-augmented brain_answer
|
||||
// are available. nil disables graph features and is the legacy default.
|
||||
func (s *Server) WithGraph(g *graphstore.PGStore) *Server {
|
||||
if g == nil {
|
||||
s.graph = nil
|
||||
return s
|
||||
}
|
||||
s.graph = g
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// MCP streamable HTTP: GET establishes the SSE stream for server-to-client events.
|
||||
if r.Method == http.MethodGet {
|
||||
|
||||
Reference in New Issue
Block a user