package vectorstore import ( "context" "fmt" "log/slog" "os" "path/filepath" "strings" "time" ) // Embedder produces dense vectors. The embed package's Client satisfies // this; it's declared locally so vectorstore doesn't depend on embed. type Embedder interface { Embed(ctx context.Context, text string) ([]float32, error) } // Store is the subset of PGStore that Sync needs. Lets tests stub it. type Store interface { KnownPaths(ctx context.Context) (map[string]struct{}, error) Upsert(ctx context.Context, path string, embedding []float32) error Delete(ctx context.Context, path string) error } // SyncResult tallies what Sync did. Returned for logs / metrics; callers // generally don't act on the fields directly. type SyncResult struct { Added int Updated int Deleted int Errors []error } // scanDirs is the set of brainDir subdirectories whose .md files are // embedded for vector retrieval. wiki/ holds LLM-extracted entity and // source pages; knowledge/ holds curated hand-written entries. var scanDirs = []string{"wiki", "knowledge"} // maxChunkBytes is the per-chunk byte budget passed to ChunkMarkdown. // Sized to fit comfortably under nomic-embed-text's 2048-token default // context (~4 chars/token for English markdown → ~8 KB ceiling; we sit // at 4 KB to leave headroom for unicode, code blocks, and tokenizer // variance). const maxChunkBytes = 4000 // Sync brings the embedding store in line with brain/{wiki,knowledge}/ // on disk: // - new files (in the tree, not in the store) get embedded + upserted // - files whose mtime exceeds the store's updated_at get re-embedded // - files no longer on disk get deleted from the store // // Designed to be called on a ticker. Best-effort: per-file errors are // collected into SyncResult.Errors and do not abort the run. func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) (SyncResult, error) { var res SyncResult if store == nil || embedder == nil { return res, nil } known, err := store.KnownPaths(ctx) if err != nil { return res, fmt.Errorf("known paths: %w", err) } // Build a parent → "any chunk known?" set so we can skip files that // already have at least one chunk row in the store. knownParents := make(map[string]struct{}, len(known)) for p := range known { knownParents[ParentPath(p)] = struct{}{} } seenParents := make(map[string]struct{}) for _, sub := range scanDirs { root := filepath.Join(brainDir, sub) if _, err := os.Stat(root); os.IsNotExist(err) { continue } err = filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error { if err != nil { return err } if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" { return nil } rel, err := filepath.Rel(brainDir, path) if err != nil { return err } relSlash := filepath.ToSlash(rel) seenParents[relSlash] = struct{}{} if _, ok := knownParents[relSlash]; ok { // File has at least one chunk in the store already. // TODO: compare mtime once Store exposes updated_at so we // re-embed on edit. For now, skip. return nil } content, readErr := os.ReadFile(path) if readErr != nil { res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr)) return nil } chunks := NumberChunks(relSlash, ChunkMarkdown(string(content), maxChunkBytes)) for _, ch := range chunks { vec, embErr := embedder.Embed(ctx, ch.Content) if embErr != nil { res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", ch.Path, embErr)) continue } if upErr := store.Upsert(ctx, ch.Path, vec); upErr != nil { res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", ch.Path, upErr)) continue } res.Added++ } return nil }) if err != nil { return res, fmt.Errorf("walk %s: %w", sub, err) } } // Drop chunk rows whose parent file is gone. for path := range known { if _, ok := seenParents[ParentPath(path)]; ok { continue } if err := store.Delete(ctx, path); err != nil { res.Errors = append(res.Errors, fmt.Errorf("delete %s: %w", path, err)) continue } res.Deleted++ } return res, nil } // StartSync launches Sync on a ticker in a background goroutine. The // goroutine exits when ctx is cancelled. Failures are logged via slog. func StartSync(ctx context.Context, brainDir string, store Store, embedder Embedder, interval time.Duration) { if interval <= 0 { interval = 5 * time.Minute } go func() { t := time.NewTicker(interval) defer t.Stop() // Run once immediately so first-boot doesn't wait a full tick. if r, err := Sync(ctx, brainDir, store, embedder); err != nil { slog.Error("embed sync failed", "err", err) } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) for _, e := range r.Errors { slog.Warn("embed sync item failed", "err", e) } } for { select { case <-ctx.Done(): return case <-t.C: if r, err := Sync(ctx, brainDir, store, embedder); err != nil { slog.Error("embed sync failed", "err", err) } else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 { slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors)) } } } }() }