package vectorstore import ( "context" "fmt" "log/slog" "os" "path/filepath" "strings" "time" ) // Embedder produces dense vectors. The embed package's Client satisfies // this; it's declared locally so vectorstore doesn't depend on embed. type Embedder interface { Embed(ctx context.Context, text string) ([]float32, error) } // Store is the subset of PGStore that Sync needs. Lets tests stub it. type Store interface { KnownPaths(ctx context.Context) (map[string]struct{}, error) Upsert(ctx context.Context, path string, embedding []float32) error Delete(ctx context.Context, path string) error } // SyncResult tallies what Sync did. Returned for logs / metrics; callers // generally don't act on the fields directly. type SyncResult struct { Added int Updated int Deleted int Errors []error } // Sync brings the embedding store in line with brain/wiki/ on disk: // - new files (in the tree, not in the store) get embedded + upserted // - files whose mtime exceeds the store's updated_at get re-embedded // - files no longer on disk get deleted from the store // // Designed to be called on a ticker. Best-effort: per-file errors are // collected into SyncResult.Errors and do not abort the run. func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) (SyncResult, error) { var res SyncResult if store == nil || embedder == nil { return res, nil } known, err := store.KnownPaths(ctx) if err != nil { return res, fmt.Errorf("known paths: %w", err) } seen := make(map[string]struct{}) wikiDir := filepath.Join(brainDir, "wiki") if _, err := os.Stat(wikiDir); os.IsNotExist(err) { return res, nil } err = filepath.WalkDir(wikiDir, func(path string, d os.DirEntry, err error) error { if err != nil { return err } if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" { return nil } rel, err := filepath.Rel(brainDir, path) if err != nil { return err } relSlash := filepath.ToSlash(rel) seen[relSlash] = struct{}{} if _, ok := known[relSlash]; ok { // Already embedded — TODO: compare mtime once Store exposes // updated_at so we re-embed on edit. For now, skip. return nil } content, readErr := os.ReadFile(path) if readErr != nil { res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr)) return nil } vec, embErr := embedder.Embed(ctx, string(content)) if embErr != nil { res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr)) return nil } if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil { res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr)) return nil } res.Added++ return nil }) if err != nil { return res, fmt.Errorf("walk wiki: %w", err) } // Drop rows whose file is gone. for path := range known { if _, ok := seen[path]; ok { continue } if err := store.Delete(ctx, path); err != nil { res.Errors = append(res.Errors, fmt.Errorf("delete %s: %w", path, err)) continue } res.Deleted++ } return res, nil } // StartSync launches Sync on a ticker in a background goroutine. The // goroutine exits when ctx is cancelled. Failures are logged via slog. func StartSync(ctx context.Context, brainDir string, store Store, embedder Embedder, interval time.Duration) { if interval <= 0 { interval = 5 * time.Minute } go func() { t := time.NewTicker(interval) defer t.Stop() // Run once immediately so first-boot doesn't wait a full tick. if r, err := Sync(ctx, brainDir, store, embedder); err != nil { slog.Error("embed sync failed", "err", err) } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) } for { select { case <-ctx.Done(): return case <-t.C: if r, err := Sync(ctx, brainDir, store, embedder); err != nil { slog.Error("embed sync failed", "err", err) } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) } } } }() }