diff --git a/ingestion/internal/vectorstore/pg.go b/ingestion/internal/vectorstore/pg.go index 48488b2..394b162 100644 --- a/ingestion/internal/vectorstore/pg.go +++ b/ingestion/internal/vectorstore/pg.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -120,21 +121,26 @@ func (s *PGStore) Search(ctx context.Context, query []float32, limit int) ([]Hit return hits, nil } -// KnownPaths returns the path set already present in the store. Used by -// the watcher to diff against the wiki/ tree and decide what to upsert. -func (s *PGStore) KnownPaths(ctx context.Context) (map[string]struct{}, error) { - rows, err := s.pool.Query(ctx, `SELECT path FROM brain_embeddings`) +// KnownPathsWithTime returns every embedded chunk path paired with the +// row's updated_at. Sync uses the timestamps to decide whether a file +// has been edited since its chunks were last embedded — when the file's +// mtime exceeds the oldest chunk's updated_at, the file is re-embedded. +func (s *PGStore) KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error) { + rows, err := s.pool.Query(ctx, `SELECT path, updated_at FROM brain_embeddings`) if err != nil { return nil, fmt.Errorf("query paths: %w", err) } defer rows.Close() - out := make(map[string]struct{}) + out := make(map[string]time.Time) for rows.Next() { - var p string - if err := rows.Scan(&p); err != nil { + var ( + p string + t time.Time + ) + if err := rows.Scan(&p, &t); err != nil { return nil, err } - out[p] = struct{}{} + out[p] = t } return out, rows.Err() } diff --git a/ingestion/internal/vectorstore/pg_test.go b/ingestion/internal/vectorstore/pg_test.go index 34c256f..30e3bca 100644 --- a/ingestion/internal/vectorstore/pg_test.go +++ b/ingestion/internal/vectorstore/pg_test.go @@ -36,7 +36,7 @@ func freshStore(t *testing.T) (*vectorstore.PGStore, context.Context) { t.Cleanup(s.Close) require.NoError(t, s.Init(ctx)) // Clean slate per test. 
- _, _ = s.KnownPaths(ctx) + _, _ = s.KnownPathsWithTime(ctx) require.NoError(t, s.Delete(ctx, "%test-fixture%")) return s, ctx } @@ -67,15 +67,18 @@ func TestIntegration_UpsertAndSearch(t *testing.T) { }) } -func TestIntegration_KnownPaths(t *testing.T) { +func TestIntegration_KnownPathsWithTime(t *testing.T) { s, ctx := freshStore(t) + before := time.Now() require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5))) t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") }) - paths, err := s.KnownPaths(ctx) + paths, err := s.KnownPathsWithTime(ctx) require.NoError(t, err) - _, ok := paths["wiki/k.md"] - assert.True(t, ok) + at, ok := paths["wiki/k.md"] + require.True(t, ok) + assert.False(t, at.IsZero(), "updated_at must not be zero") + assert.WithinDuration(t, before, at, 5*time.Second, "updated_at must be recent") } func TestUpsert_RejectsWrongDimension(t *testing.T) { diff --git a/ingestion/internal/vectorstore/sync.go b/ingestion/internal/vectorstore/sync.go index d1591fa..1ecf05f 100644 --- a/ingestion/internal/vectorstore/sync.go +++ b/ingestion/internal/vectorstore/sync.go @@ -18,7 +18,11 @@ type Embedder interface { // Store is the subset of PGStore that Sync needs. Lets tests stub it. type Store interface { - KnownPaths(ctx context.Context) (map[string]struct{}, error) + // KnownPathsWithTime returns every embedded chunk path paired with the + // row's updated_at. Sync uses the timestamp to detect edits — a file + // whose mtime is newer than ANY of its chunks' updated_at is re-embedded + // from scratch (old chunks deleted, fresh chunks upserted). 
+ KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error) Upsert(ctx context.Context, path string, embedding []float32) error Delete(ctx context.Context, path string) error } @@ -58,15 +62,31 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) return res, nil } - known, err := store.KnownPaths(ctx) + known, err := store.KnownPathsWithTime(ctx) if err != nil { return res, fmt.Errorf("known paths: %w", err) } - // Build a parent → "any chunk known?" set so we can skip files that - // already have at least one chunk row in the store. - knownParents := make(map[string]struct{}, len(known)) - for p := range known { - knownParents[ParentPath(p)] = struct{}{} + // Group known chunks by parent path and remember the EARLIEST + // updated_at per parent. A file is considered stale if its mtime is + // after the oldest of its chunk rows — i.e. at least one chunk hasn't + // been refreshed since the last edit. Also keep the full chunk-path + // list per parent so we can delete every old chunk before re-embedding + // (handles "file shrunk → fewer chunks → orphan rows" cleanly). + type parentState struct { + minUpdatedAt time.Time + chunkPaths []string + } + parents := make(map[string]*parentState, len(known)) + for p, t := range known { + parent := ParentPath(p) + ps, ok := parents[parent] + if !ok { + ps = &parentState{minUpdatedAt: t} + parents[parent] = ps + } else if t.Before(ps.minUpdatedAt) { + ps.minUpdatedAt = t + } + ps.chunkPaths = append(ps.chunkPaths, p) } seenParents := make(map[string]struct{}) @@ -90,11 +110,26 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) relSlash := filepath.ToSlash(rel) seenParents[relSlash] = struct{}{} - if _, ok := knownParents[relSlash]; ok { - // File has at least one chunk in the store already. - // TODO: compare mtime once Store exposes updated_at so we - // re-embed on edit. For now, skip. 
- return nil + if ps, ok := parents[relSlash]; ok { + // File already has chunks in the store. Re-embed only when + // the file's mtime is strictly after the oldest chunk's + // updated_at; no clock-skew grace is applied. + info, statErr := d.Info() + if statErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("stat %s: %w", relSlash, statErr)) + return nil + } + if !info.ModTime().After(ps.minUpdatedAt) { + return nil + } + // Stale: delete old chunks before re-embedding so a shrunk + // file doesn't leave orphan rows at higher #NNNN indexes. + for _, oldPath := range ps.chunkPaths { + if delErr := store.Delete(ctx, oldPath); delErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("delete %s for re-embed: %w", oldPath, delErr)) + return nil + } + } } content, readErr := os.ReadFile(path) diff --git a/ingestion/internal/vectorstore/sync_test.go b/ingestion/internal/vectorstore/sync_test.go index 54e67a6..0a800f4 100644 --- a/ingestion/internal/vectorstore/sync_test.go +++ b/ingestion/internal/vectorstore/sync_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" "github.com/stretchr/testify/assert" @@ -14,16 +15,27 @@ import ( ) type stubStore struct { - known map[string]struct{} + // known maps chunk-path → updated_at. Tests that don't care about + // re-embed-on-mtime leave the value zero; KnownPathsWithTime then + // reports farFuture so the Sync skip path always wins. Tests that + // exercise the mtime path set updated_at explicitly. + known map[string]time.Time upserts map[string][]float32 deletes []string failNext error } -func (s *stubStore) KnownPaths(_ context.Context) (map[string]struct{}, error) { - out := make(map[string]struct{}, len(s.known)) - for k := range s.known { - out[k] = struct{}{} +// farFuture is "newer than any file mtime", used as the default +// updated_at in stubs that don't care about re-embed behavior. 
+var farFuture = time.Now().Add(24 * time.Hour) + +func (s *stubStore) KnownPathsWithTime(_ context.Context) (map[string]time.Time, error) { + out := make(map[string]time.Time, len(s.known)) + for k, t := range s.known { + if t.IsZero() { + t = farFuture + } + out[k] = t } return out, nil } @@ -67,7 +79,7 @@ func TestSync_AddsNewFiles(t *testing.T) { writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x") writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y") - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -81,7 +93,7 @@ func TestSync_SkipsAlreadyKnown(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/a/facts/x.md", "x") - store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md#0001": {}}} + store := &stubStore{known: map[string]time.Time{"wiki/a/facts/x.md#0001": {}}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -93,7 +105,7 @@ func TestSync_DeletesDisappearedFiles(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) // store has a path that doesn't exist on disk anymore - store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md#0001": {}}} + store := &stubStore{known: map[string]time.Time{"wiki/old/facts/ghost.md#0001": {}}} res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)}) require.NoError(t, err) assert.Equal(t, 1, res.Deleted) @@ -111,7 +123,7 @@ func TestSync_SkipsIndexFiles(t *testing.T) { writeNote(t, dir, "wiki/a/_index.md", "moc") writeNote(t, dir, "wiki/a/facts/real.md", "body") - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} res, err := 
vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)}) require.NoError(t, err) assert.Equal(t, 1, res.Added) @@ -123,7 +135,7 @@ func TestSync_ScansKnowledgeDir(t *testing.T) { writeNote(t, dir, "wiki/a/facts/x.md", "x") writeNote(t, dir, "knowledge/2026-05-19-koala-gpu-setup.md", "knowledge body") - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -143,7 +155,7 @@ func TestSync_ChunksLongFiles(t *testing.T) { } writeNote(t, dir, "knowledge/long.md", body) - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -164,7 +176,7 @@ func TestSync_ShortFileGetsSingleChunkRow(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/short.md", "tiny body\n") - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -176,7 +188,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/foo.md", "body\n") - store := &stubStore{known: map[string]struct{}{ + store := &stubStore{known: map[string]time.Time{ "wiki/foo.md#0001": {}, }} emb := stubEmbedder{vec: make([]float32, 768)} @@ -189,7 +201,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) { func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) - store := &stubStore{known: map[string]struct{}{ + store := &stubStore{known: map[string]time.Time{ 
"wiki/ghost.md#0001": {}, "wiki/ghost.md#0002": {}, "wiki/ghost.md#0003": {}, @@ -199,6 +211,49 @@ func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) { assert.Equal(t, 3, res.Deleted) } +func TestSync_ReembedsFileWhenMtimeNewer(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/edited.md", "original body\n") + // Force the file's mtime ahead of any plausible store updated_at. + future := time.Now().Add(1 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/edited.md"), future, future)) + + store := &stubStore{ + known: map[string]time.Time{ + // Existing chunk row pre-dates the file's mtime. + "wiki/edited.md#0001": time.Now().Add(-1 * time.Hour), + }, + } + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 1, res.Added, "file with newer mtime should be re-embedded") + assert.Contains(t, store.upserts, "wiki/edited.md#0001") + // Old chunks of the same parent must be deleted before re-embed so + // shrunk files don't leave orphan rows at higher #NNNN indexes. + assert.Contains(t, store.deletes, "wiki/edited.md#0001") +} + +func TestSync_SkipsFileWhenMtimeOlder(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/stable.md", "body\n") + // Backdate mtime to before the store's recorded updated_at. 
+ past := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/stable.md"), past, past)) + + store := &stubStore{ + known: map[string]time.Time{ + "wiki/stable.md#0001": time.Now(), + }, + } + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 0, res.Added) + assert.Empty(t, store.upserts) + assert.Empty(t, store.deletes) +} + func TestSync_NoOpWhenComponentsNil(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/a/facts/x.md", "x") @@ -210,7 +265,7 @@ func TestSync_NoOpWhenComponentsNil(t *testing.T) { func TestSync_CollectsEmbedderErrors(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/a/facts/x.md", "x") - store := &stubStore{known: map[string]struct{}{}} + store := &stubStore{known: map[string]time.Time{}} emb := stubEmbedder{err: errors.New("upstream down")} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err)