From 78531bb23851670add84a7aded0e84ceffb6242a Mon Sep 17 00:00:00 2001
From: Mathias Bergqvist
Date: Wed, 22 Apr 2026 22:54:03 +0200
Subject: [PATCH] feat(ingestion): add background file watcher for brain/raw/

Polls brain/raw/ on a configurable ticker, derives human-readable source
names from filenames, runs the pipeline, and moves files to
processed/YYYY-MM-DD/ on success or failed/ on error with a log.md entry.

Co-Authored-By: Claude Sonnet 4.6
---
 ingestion/internal/watcher/watcher.go      | 165 ++++++++++++++++
 ingestion/internal/watcher/watcher_test.go | 219 +++++++++++++++++++++
 2 files changed, 384 insertions(+)
 create mode 100644 ingestion/internal/watcher/watcher.go
 create mode 100644 ingestion/internal/watcher/watcher_test.go

diff --git a/ingestion/internal/watcher/watcher.go b/ingestion/internal/watcher/watcher.go
new file mode 100644
index 0000000..0f346ef
--- /dev/null
+++ b/ingestion/internal/watcher/watcher.go
@@ -0,0 +1,165 @@
+// ingestion/internal/watcher/watcher.go
+package watcher
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+	"unicode"
+
+	"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
+)
+
+// Config holds watcher configuration.
+type Config struct {
+	BrainDir string          // brain root; raw/ and log.md are resolved beneath it
+	Interval time.Duration   // polling period; Start substitutes one minute when <= 0
+	Pipeline pipeline.Config // handed to pipeline.Run for every file
+}
+
+// Start launches the watcher in a background goroutine and returns immediately.
+// The watcher scans brain/raw/ on every tick and stops when ctx is cancelled.
+func Start(ctx context.Context, cfg Config) {
+	if cfg.Interval <= 0 {
+		cfg.Interval = time.Minute // time.NewTicker panics on a non-positive duration
+	}
+	go func() {
+		ticker := time.NewTicker(cfg.Interval)
+		defer ticker.Stop()
+		for ctx.Err() == nil {
+			select {
+			case <-ctx.Done(): // the loop condition ends the goroutine
+			case <-ticker.C:
+				for _, err := range processDir(ctx, cfg, time.Now().UTC().Format("2006-01-02")) {
+					slog.Error("watcher: error processing file", "error", err)
+				}
+			}
+		}
+	}()
+}
+
+// processDir walks brain/raw/, processes each eligible file, returns any errors encountered.
+// It skips the processed/ and failed/ archives and unsupported extensions, and
+// stops early, without reporting an error, once ctx has been cancelled.
+func processDir(ctx context.Context, cfg Config, date string) []error {
+	rawDir := filepath.Join(cfg.BrainDir, "raw")
+
+	var errs []error
+	walkErr := filepath.WalkDir(rawDir, func(path string, d os.DirEntry, err error) error {
+		if err != nil {
+			return err
+		}
+		// On shutdown leave the remaining files in raw/ for the next run.
+		if ctx.Err() != nil {
+			return filepath.SkipAll
+		}
+		if path == rawDir {
+			return nil
+		}
+		if d.IsDir() {
+			// Never descend into the archives this watcher writes itself.
+			if name := d.Name(); name == "processed" || name == "failed" {
+				return filepath.SkipDir
+			}
+			return nil
+		}
+		switch strings.ToLower(filepath.Ext(path)) {
+		case ".md", ".txt", ".pdf":
+			if err := processFile(ctx, cfg, path, date); err != nil {
+				errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
+			}
+		}
+		return nil
+	})
+	if walkErr != nil {
+		errs = append(errs, fmt.Errorf("walk raw dir: %w", walkErr))
+	}
+	return errs
+}
+
+// processFile reads a file, calls pipeline.Run, and moves the file to
+// processed/<date>/ on success or to failed/ on error (also appending an entry
+// to log.md). If the run fails because ctx was cancelled the file stays in raw/.
+func processFile(ctx context.Context, cfg Config, path, date string) error {
+	filename := filepath.Base(path)
+	source := deriveSource(filename)
+
+	content, err := os.ReadFile(path)
+	if err != nil {
+		return fmt.Errorf("read file: %w", err)
+	}
+
+	_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false)
+	if runErr != nil {
+		// A cancelled run says nothing about the file itself; keep it for a retry.
+		if ctx.Err() != nil {
+			return fmt.Errorf("interrupted: %w", runErr)
+		}
+		failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
+		if mvErr := moveInto(path, failedDir, "failed"); mvErr != nil {
+			// Report both failures rather than dropping the pipeline error.
+			return fmt.Errorf("%w (pipeline error: %v)", mvErr, runErr)
+		}
+		slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr)
+		if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
+			slog.Error("watcher: failed to write log entry", "error", logErr)
+		}
+		return runErr
+	}
+
+	processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
+	if err := moveInto(path, processedDir, "processed"); err != nil {
+		return err
+	}
+	slog.Info("watcher: file processed", "file", filename, "source", source)
+	return nil
+}
+
+// moveInto creates dir if needed and renames path into it under its base name.
+// label ("failed" or "processed") is only used in the error messages.
+func moveInto(path, dir, label string) error {
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return fmt.Errorf("mkdir %s dir: %w", label, err)
+	}
+	if err := os.Rename(path, filepath.Join(dir, filepath.Base(path))); err != nil {
+		return fmt.Errorf("move to %s: %w", label, err)
+	}
+	return nil
+}
+
+// deriveSource turns a filename into a human-readable source name.
+// "shape-up-book.md" → "Shape Up Book"
+func deriveSource(filename string) string {
+	name := strings.TrimSuffix(filename, filepath.Ext(filename))
+	// FieldsFunc, unlike Split, yields no empty words for leading, trailing or
+	// doubled hyphens, so the result carries no stray spaces.
+	words := strings.FieldsFunc(name, func(r rune) bool { return r == '-' })
+	for i, w := range words {
+		runes := []rune(w)
+		runes[0] = unicode.ToUpper(runes[0])
+		words[i] = string(runes)
+	}
+	return strings.Join(words, " ")
+}
+
+// appendWatcherLog appends a watcher error entry to brain/log.md.
+func appendWatcherLog(brainDir, filename string, runErr error, date string) error {
+	entry := fmt.Sprintf("## %s — watcher error\n\n- **File:** %s\n- **Error:** %s\n\n", date, filename, runErr.Error())
+
+	f, err := os.OpenFile(filepath.Join(brainDir, "log.md"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+	if err != nil {
+		return fmt.Errorf("open log: %w", err)
+	}
+	if _, err = f.WriteString(entry); err != nil {
+		_ = f.Close() // best effort; the write error is the one worth reporting
+		return fmt.Errorf("write log: %w", err)
+	}
+	if err = f.Close(); err != nil { // Close can surface a delayed write failure
+		return fmt.Errorf("close log: %w", err)
+	}
+	return nil
+}
diff --git a/ingestion/internal/watcher/watcher_test.go b/ingestion/internal/watcher/watcher_test.go
new file mode 100644
index 0000000..4b20d39
--- /dev/null
+++ b/ingestion/internal/watcher/watcher_test.go
@@ -0,0 +1,219 @@
+// ingestion/internal/watcher/watcher_test.go
+package watcher
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
+	"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
+)
+
+// successComplete returns a CompleteFunc that answers every call with page encoded as a one-element JSON array.
+func successComplete(page wiki.Page) pipeline.CompleteFunc {
+	return func(ctx context.Context, system, user string) (string, error) {
+		b, err := json.Marshal([]wiki.Page{page})
+		if err != nil {
+			return "", err
+		}
+		return string(b), nil
+	}
+}
+
+// errorComplete always returns an error simulating an LLM failure.
+func errorComplete(_ context.Context, _, _ string) (string, error) {
+	return "", fmt.Errorf("LLM unavailable")
+}
+
+// setupBrainDir creates a temporary brain directory with wiki/ and raw/ in place and returns its path.
+func setupBrainDir(t *testing.T) string {
+	t.Helper()
+	brainDir := t.TempDir()
+	for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources", "raw"} {
+		require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
+	}
+	return brainDir
+}
+
+// waitForFile polls every 20ms, for up to two seconds, until a non-empty file
+// matching the glob pattern exists; it fails the test with msg otherwise. A
+// pattern lets callers wildcard processed/<date>/, which the watcher names itself.
+func waitForFile(t *testing.T, pattern, msg string) {
+	t.Helper()
+	deadline := time.Now().Add(2 * time.Second)
+	for time.Now().Before(deadline) {
+		matches, err := filepath.Glob(pattern)
+		require.NoError(t, err)
+		for _, m := range matches {
+			if info, err := os.Stat(m); err == nil && info.Size() > 0 {
+				return
+			}
+		}
+		time.Sleep(20 * time.Millisecond)
+	}
+	t.Fatalf("%s (nothing matched %s)", msg, pattern)
+}
+
+// TestStart_ProcessesFile expects the running watcher to archive a raw file, write its wiki page and log the ingest.
+func TestStart_ProcessesFile(t *testing.T) {
+	brainDir := setupBrainDir(t)
+
+	// Place a .md file in raw/.
+	rawFile := filepath.Join(brainDir, "raw", "shape-up-book.md")
+	require.NoError(t, os.WriteFile(rawFile, []byte("Content about Shape Up."), 0o644))
+
+	date := time.Now().UTC().Format("2006-01-02")
+	wikiPage := wiki.Page{
+		Path:    "wiki/sources/shape-up-book.md",
+		Content: "---\ntitle: Shape Up Book\ntype: article\ndomain: product-management\ndate_ingested: " + date + "\nlast_updated: " + date + "\naliases:\n - Shape Up Book\n---\n\n## Summary\n\nA book about Shape Up.\n",
+	}
+
+	cfg := Config{
+		BrainDir: brainDir,
+		Interval: 50 * time.Millisecond,
+		Pipeline: pipeline.Config{
+			Complete:  successComplete(wikiPage),
+			ChunkSize: 0,
+			Schema:    "# Schema\nThree page types.",
+		},
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	Start(ctx, cfg)
+
+	// Wildcard the date directory: the watcher stamps it from its own clock at tick time.
+	processedGlob := filepath.Join(brainDir, "raw", "processed", "*", "shape-up-book.md")
+	waitForFile(t, processedGlob, "file should be moved to processed/")
+
+	// Original file should be gone.
+	_, err := os.Stat(rawFile)
+	assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
+
+	// Wiki page should exist.
+	_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md"))
+	assert.NoError(t, err, "wiki page should be written")
+
+	// log.md should contain an ingest record.
+	logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
+	require.NoError(t, err)
+	assert.Contains(t, string(logContent), "— ingest")
+}
+
+// TestStart_MovesToFailedOnError expects a file whose pipeline run fails to land in failed/ with a log.md entry.
+func TestStart_MovesToFailedOnError(t *testing.T) {
+	brainDir := setupBrainDir(t)
+
+	rawFile := filepath.Join(brainDir, "raw", "bad-file.md")
+	require.NoError(t, os.WriteFile(rawFile, []byte("Some content."), 0o644))
+
+	cfg := Config{
+		BrainDir: brainDir,
+		Interval: 50 * time.Millisecond,
+		Pipeline: pipeline.Config{
+			Complete:  errorComplete,
+			ChunkSize: 0,
+			Schema:    "# Schema\nThree page types.",
+		},
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	Start(ctx, cfg)
+
+	waitForFile(t, filepath.Join(brainDir, "raw", "failed", "bad-file.md"), "file should be moved to failed/")
+
+	// Original file should be gone from raw/.
+	_, err := os.Stat(rawFile)
+	assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
+
+	// The watcher appends the log entry after the move, so wait for it before reading.
+	logPath := filepath.Join(brainDir, "log.md")
+	waitForFile(t, logPath, "log.md should be written")
+	logContent, err := os.ReadFile(logPath)
+	require.NoError(t, err)
+	assert.Contains(t, string(logContent), "— watcher error")
+	assert.Contains(t, string(logContent), "bad-file.md")
+}
+
+// TestDeriveSource covers extension stripping and per-word capitalisation.
+func TestDeriveSource(t *testing.T) {
+	tests := []struct {
+		filename string
+		want     string
+	}{
+		{"shape-up-book.md", "Shape Up Book"},
+		{"raft-consensus.txt", "Raft Consensus"},
+		{"my-note.md", "My Note"},
+		{"single.md", "Single"},
+		{"no-extension", "No Extension"},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.filename, func(t *testing.T) {
+			assert.Equal(t, tc.want, deriveSource(tc.filename))
+		})
+	}
+}
+
+// TestProcessDir_SkipsSubdirs verifies that files under processed/ and failed/ are neither re-ingested nor moved.
+func TestProcessDir_SkipsSubdirs(t *testing.T) {
+	brainDir := setupBrainDir(t)
+
+	// Create processed/ and failed/ subdirs with files inside.
+	for _, sub := range []string{"processed/2026-04-22", "failed"} {
+		require.NoError(t, os.MkdirAll(filepath.Join(brainDir, "raw", sub), 0o755))
+	}
+
+	processedFile := filepath.Join(brainDir, "raw", "processed", "2026-04-22", "old-file.md")
+	failedFile := filepath.Join(brainDir, "raw", "failed", "broken-file.md")
+	require.NoError(t, os.WriteFile(processedFile, []byte("old"), 0o644))
+	require.NoError(t, os.WriteFile(failedFile, []byte("broken"), 0o644))
+
+	// Also place a valid file in raw/ root that should be processed.
+	validFile := filepath.Join(brainDir, "raw", "valid.md")
+	require.NoError(t, os.WriteFile(validFile, []byte("valid content"), 0o644))
+
+	// Count Complete invocations; every call answers with a minimal valid page.
+	var calls int
+	respond := successComplete(wiki.Page{
+		Path:    "wiki/sources/valid.md",
+		Content: "---\ntitle: Valid\n---\n\n## Summary\n\nValid.\n",
+	})
+	completeFn := func(ctx context.Context, system, user string) (string, error) {
+		calls++
+		return respond(ctx, system, user)
+	}
+
+	cfg := Config{
+		BrainDir: brainDir,
+		Interval: time.Hour, // not used; we call processDir directly
+		Pipeline: pipeline.Config{
+			Complete:  completeFn,
+			ChunkSize: 0,
+			Schema:    "# Schema\nThree page types.",
+		},
+	}
+
+	errs := processDir(context.Background(), cfg, time.Now().UTC().Format("2006-01-02"))
+	assert.Empty(t, errs, "no errors expected")
+
+	// Complete should have been called exactly once (for valid.md, not for files in subdirs).
+	assert.Equal(t, 1, calls, "only the file in raw/ root should be processed")
+
+	// Files in processed/ and failed/ must remain untouched.
+	_, err := os.Stat(processedFile)
+	assert.NoError(t, err, "processed subdir file should be untouched")
+	_, err = os.Stat(failedFile)
+	assert.NoError(t, err, "failed subdir file should be untouched")
+}