Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
815739758e |
@@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
@@ -120,21 +121,26 @@ func (s *PGStore) Search(ctx context.Context, query []float32, limit int) ([]Hit
|
||||
return hits, nil
|
||||
}
|
||||
|
||||
// KnownPaths returns the path set already present in the store. Used by
|
||||
// the watcher to diff against the wiki/ tree and decide what to upsert.
|
||||
func (s *PGStore) KnownPaths(ctx context.Context) (map[string]struct{}, error) {
|
||||
rows, err := s.pool.Query(ctx, `SELECT path FROM brain_embeddings`)
|
||||
// KnownPathsWithTime returns every embedded chunk path paired with the
|
||||
// row's updated_at. Sync uses the timestamps to decide whether a file
|
||||
// has been edited since its chunks were last embedded — when the file's
|
||||
// mtime exceeds the oldest chunk's updated_at, the file is re-embedded.
|
||||
func (s *PGStore) KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error) {
|
||||
rows, err := s.pool.Query(ctx, `SELECT path, updated_at FROM brain_embeddings`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query paths: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
out := make(map[string]struct{})
|
||||
out := make(map[string]time.Time)
|
||||
for rows.Next() {
|
||||
var p string
|
||||
if err := rows.Scan(&p); err != nil {
|
||||
var (
|
||||
p string
|
||||
t time.Time
|
||||
)
|
||||
if err := rows.Scan(&p, &t); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[p] = struct{}{}
|
||||
out[p] = t
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func freshStore(t *testing.T) (*vectorstore.PGStore, context.Context) {
|
||||
t.Cleanup(s.Close)
|
||||
require.NoError(t, s.Init(ctx))
|
||||
// Clean slate per test.
|
||||
_, _ = s.KnownPaths(ctx)
|
||||
_, _ = s.KnownPathsWithTime(ctx)
|
||||
require.NoError(t, s.Delete(ctx, "%test-fixture%"))
|
||||
return s, ctx
|
||||
}
|
||||
@@ -67,15 +67,18 @@ func TestIntegration_UpsertAndSearch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestIntegration_KnownPaths(t *testing.T) {
|
||||
func TestIntegration_KnownPathsWithTime(t *testing.T) {
|
||||
s, ctx := freshStore(t)
|
||||
before := time.Now()
|
||||
require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5)))
|
||||
t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") })
|
||||
|
||||
paths, err := s.KnownPaths(ctx)
|
||||
paths, err := s.KnownPathsWithTime(ctx)
|
||||
require.NoError(t, err)
|
||||
_, ok := paths["wiki/k.md"]
|
||||
assert.True(t, ok)
|
||||
at, ok := paths["wiki/k.md"]
|
||||
require.True(t, ok)
|
||||
assert.False(t, at.IsZero(), "updated_at must not be zero")
|
||||
assert.WithinDuration(t, before, at, 5*time.Second, "updated_at must be recent")
|
||||
}
|
||||
|
||||
func TestUpsert_RejectsWrongDimension(t *testing.T) {
|
||||
|
||||
@@ -18,7 +18,11 @@ type Embedder interface {
|
||||
|
||||
// Store is the subset of PGStore that Sync needs. Lets tests stub it.
|
||||
type Store interface {
|
||||
KnownPaths(ctx context.Context) (map[string]struct{}, error)
|
||||
// KnownPathsWithTime returns every embedded chunk path paired with the
|
||||
// row's updated_at. Sync uses the timestamp to detect edits — a file
|
||||
// whose mtime is newer than ANY of its chunks' updated_at is re-embedded
|
||||
// from scratch (old chunks deleted, fresh chunks upserted).
|
||||
KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error)
|
||||
Upsert(ctx context.Context, path string, embedding []float32) error
|
||||
Delete(ctx context.Context, path string) error
|
||||
}
|
||||
@@ -58,15 +62,31 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
known, err := store.KnownPaths(ctx)
|
||||
known, err := store.KnownPathsWithTime(ctx)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("known paths: %w", err)
|
||||
}
|
||||
// Build a parent → "any chunk known?" set so we can skip files that
|
||||
// already have at least one chunk row in the store.
|
||||
knownParents := make(map[string]struct{}, len(known))
|
||||
for p := range known {
|
||||
knownParents[ParentPath(p)] = struct{}{}
|
||||
// Group known chunks by parent path and remember the EARLIEST
|
||||
// updated_at per parent. A file is considered stale if its mtime is
|
||||
// after the oldest of its chunk rows — i.e. at least one chunk hasn't
|
||||
// been refreshed since the last edit. Also keep the full chunk-path
|
||||
// list per parent so we can delete every old chunk before re-embedding
|
||||
// (handles "file shrunk → fewer chunks → orphan rows" cleanly).
|
||||
type parentState struct {
|
||||
minUpdatedAt time.Time
|
||||
chunkPaths []string
|
||||
}
|
||||
parents := make(map[string]*parentState, len(known))
|
||||
for p, t := range known {
|
||||
parent := ParentPath(p)
|
||||
ps, ok := parents[parent]
|
||||
if !ok {
|
||||
ps = &parentState{minUpdatedAt: t}
|
||||
parents[parent] = ps
|
||||
} else if t.Before(ps.minUpdatedAt) {
|
||||
ps.minUpdatedAt = t
|
||||
}
|
||||
ps.chunkPaths = append(ps.chunkPaths, p)
|
||||
}
|
||||
seenParents := make(map[string]struct{})
|
||||
|
||||
@@ -90,12 +110,27 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder)
|
||||
relSlash := filepath.ToSlash(rel)
|
||||
seenParents[relSlash] = struct{}{}
|
||||
|
||||
if _, ok := knownParents[relSlash]; ok {
|
||||
// File has at least one chunk in the store already.
|
||||
// TODO: compare mtime once Store exposes updated_at so we
|
||||
// re-embed on edit. For now, skip.
|
||||
if ps, ok := parents[relSlash]; ok {
|
||||
// File already has chunks in the store. Re-embed only when
|
||||
// the file has been edited since the oldest chunk was
|
||||
// written. Tolerate clock skew with a sub-second grace.
|
||||
info, statErr := d.Info()
|
||||
if statErr != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("stat %s: %w", relSlash, statErr))
|
||||
return nil
|
||||
}
|
||||
if !info.ModTime().After(ps.minUpdatedAt) {
|
||||
return nil
|
||||
}
|
||||
// Stale: delete old chunks before re-embedding so a shrunk
|
||||
// file doesn't leave orphan rows at higher #NNNN indexes.
|
||||
for _, oldPath := range ps.chunkPaths {
|
||||
if delErr := store.Delete(ctx, oldPath); delErr != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("delete %s for re-embed: %w", oldPath, delErr))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
content, readErr := os.ReadFile(path)
|
||||
if readErr != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -14,16 +15,27 @@ import (
|
||||
)
|
||||
|
||||
type stubStore struct {
|
||||
known map[string]struct{}
|
||||
// known maps chunk-path → updated_at. Tests that don't care about
|
||||
// re-embed-on-mtime use a far-future time so the Sync skip path
|
||||
// always wins. Tests that do exercise the mtime path set the
|
||||
// updated_at explicitly.
|
||||
known map[string]time.Time
|
||||
upserts map[string][]float32
|
||||
deletes []string
|
||||
failNext error
|
||||
}
|
||||
|
||||
func (s *stubStore) KnownPaths(_ context.Context) (map[string]struct{}, error) {
|
||||
out := make(map[string]struct{}, len(s.known))
|
||||
for k := range s.known {
|
||||
out[k] = struct{}{}
|
||||
// farFuture is "newer than any file mtime", used as the default
|
||||
// updated_at in stubs that don't care about re-embed behavior.
|
||||
var farFuture = time.Now().Add(24 * time.Hour)
|
||||
|
||||
func (s *stubStore) KnownPathsWithTime(_ context.Context) (map[string]time.Time, error) {
|
||||
out := make(map[string]time.Time, len(s.known))
|
||||
for k, t := range s.known {
|
||||
if t.IsZero() {
|
||||
t = farFuture
|
||||
}
|
||||
out[k] = t
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
@@ -67,7 +79,7 @@ func TestSync_AddsNewFiles(t *testing.T) {
|
||||
writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x")
|
||||
writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
@@ -81,7 +93,7 @@ func TestSync_SkipsAlreadyKnown(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md#0001": {}}}
|
||||
store := &stubStore{known: map[string]time.Time{"wiki/a/facts/x.md#0001": {}}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
@@ -93,7 +105,7 @@ func TestSync_DeletesDisappearedFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
|
||||
// store has a path that doesn't exist on disk anymore
|
||||
store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md#0001": {}}}
|
||||
store := &stubStore{known: map[string]time.Time{"wiki/old/facts/ghost.md#0001": {}}}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, res.Deleted)
|
||||
@@ -111,7 +123,7 @@ func TestSync_SkipsIndexFiles(t *testing.T) {
|
||||
writeNote(t, dir, "wiki/a/_index.md", "moc")
|
||||
writeNote(t, dir, "wiki/a/facts/real.md", "body")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, res.Added)
|
||||
@@ -123,7 +135,7 @@ func TestSync_ScansKnowledgeDir(t *testing.T) {
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
writeNote(t, dir, "knowledge/2026-05-19-koala-gpu-setup.md", "knowledge body")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
@@ -143,7 +155,7 @@ func TestSync_ChunksLongFiles(t *testing.T) {
|
||||
}
|
||||
writeNote(t, dir, "knowledge/long.md", body)
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
@@ -164,7 +176,7 @@ func TestSync_ShortFileGetsSingleChunkRow(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/short.md", "tiny body\n")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
@@ -176,7 +188,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/foo.md", "body\n")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{
|
||||
store := &stubStore{known: map[string]time.Time{
|
||||
"wiki/foo.md#0001": {},
|
||||
}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
@@ -189,7 +201,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) {
|
||||
func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
|
||||
store := &stubStore{known: map[string]struct{}{
|
||||
store := &stubStore{known: map[string]time.Time{
|
||||
"wiki/ghost.md#0001": {},
|
||||
"wiki/ghost.md#0002": {},
|
||||
"wiki/ghost.md#0003": {},
|
||||
@@ -199,6 +211,49 @@ func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) {
|
||||
assert.Equal(t, 3, res.Deleted)
|
||||
}
|
||||
|
||||
func TestSync_ReembedsFileWhenMtimeNewer(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/edited.md", "original body\n")
|
||||
// Force the file's mtime ahead of any plausible store updated_at.
|
||||
future := time.Now().Add(1 * time.Hour)
|
||||
require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/edited.md"), future, future))
|
||||
|
||||
store := &stubStore{
|
||||
known: map[string]time.Time{
|
||||
// Existing chunk row pre-dates the file's mtime.
|
||||
"wiki/edited.md#0001": time.Now().Add(-1 * time.Hour),
|
||||
},
|
||||
}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, res.Added, "file with newer mtime should be re-embedded")
|
||||
assert.Contains(t, store.upserts, "wiki/edited.md#0001")
|
||||
// Old chunks of the same parent must be deleted before re-embed so
|
||||
// shrunk files don't leave orphan rows at higher #NNNN indexes.
|
||||
assert.Contains(t, store.deletes, "wiki/edited.md#0001")
|
||||
}
|
||||
|
||||
func TestSync_SkipsFileWhenMtimeOlder(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/stable.md", "body\n")
|
||||
// Backdate mtime to before the store's recorded updated_at.
|
||||
past := time.Now().Add(-2 * time.Hour)
|
||||
require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/stable.md"), past, past))
|
||||
|
||||
store := &stubStore{
|
||||
known: map[string]time.Time{
|
||||
"wiki/stable.md#0001": time.Now(),
|
||||
},
|
||||
}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, res.Added)
|
||||
assert.Empty(t, store.upserts)
|
||||
assert.Empty(t, store.deletes)
|
||||
}
|
||||
|
||||
func TestSync_NoOpWhenComponentsNil(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
@@ -210,7 +265,7 @@ func TestSync_NoOpWhenComponentsNil(t *testing.T) {
|
||||
func TestSync_CollectsEmbedderErrors(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
store := &stubStore{known: map[string]time.Time{}}
|
||||
emb := stubEmbedder{err: errors.New("upstream down")}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user