feat(ingestion): add background file watcher for brain/raw/
Polls brain/raw/ on a configurable ticker, derives human-readable source names from filenames, runs the pipeline, and moves files to processed/YYYY-MM-DD/ on success or failed/ on error with a log.md entry. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
165
ingestion/internal/watcher/watcher.go
Normal file
165
ingestion/internal/watcher/watcher.go
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
// ingestion/internal/watcher/watcher.go
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
"unicode"
|
||||||
|
|
||||||
|
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds watcher configuration.
|
||||||
|
type Config struct {
|
||||||
|
BrainDir string
|
||||||
|
Interval time.Duration
|
||||||
|
Pipeline pipeline.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start launches the watcher in a background goroutine.
|
||||||
|
// It returns immediately. The watcher stops when ctx is cancelled.
|
||||||
|
func Start(ctx context.Context, cfg Config) {
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(cfg.Interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
errs := processDir(ctx, cfg, date)
|
||||||
|
for _, err := range errs {
|
||||||
|
slog.Error("watcher: error processing file", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// processDir walks brain/raw/, processes each eligible file, returns any errors encountered.
|
||||||
|
func processDir(ctx context.Context, cfg Config, date string) []error {
|
||||||
|
rawDir := filepath.Join(cfg.BrainDir, "raw")
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
err := filepath.WalkDir(rawDir, func(path string, d os.DirEntry, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip the root itself.
|
||||||
|
if path == rawDir {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip processed/ and failed/ subdirectories entirely.
|
||||||
|
if d.IsDir() {
|
||||||
|
name := d.Name()
|
||||||
|
if name == "processed" || name == "failed" {
|
||||||
|
return filepath.SkipDir
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only process supported extensions.
|
||||||
|
ext := strings.ToLower(filepath.Ext(path))
|
||||||
|
if ext != ".md" && ext != ".txt" && ext != ".pdf" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := processFile(ctx, cfg, path, date); err != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("walk raw dir: %w", err))
|
||||||
|
}
|
||||||
|
return errs
|
||||||
|
}
|
||||||
|
|
||||||
|
// processFile reads a file, calls pipeline.Run, moves it to processed/ or failed/.
|
||||||
|
func processFile(ctx context.Context, cfg Config, path, date string) error {
|
||||||
|
filename := filepath.Base(path)
|
||||||
|
source := deriveSource(filename)
|
||||||
|
|
||||||
|
content, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false)
|
||||||
|
if runErr != nil {
|
||||||
|
// Move to failed/.
|
||||||
|
failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
|
||||||
|
if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil {
|
||||||
|
return fmt.Errorf("mkdir failed dir: %w", mkErr)
|
||||||
|
}
|
||||||
|
dest := filepath.Join(failedDir, filename)
|
||||||
|
if mvErr := os.Rename(path, dest); mvErr != nil {
|
||||||
|
return fmt.Errorf("move to failed: %w", mvErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr)
|
||||||
|
|
||||||
|
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
|
||||||
|
slog.Error("watcher: failed to write log entry", "error", logErr)
|
||||||
|
}
|
||||||
|
return runErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move to processed/YYYY-MM-DD/.
|
||||||
|
processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
|
||||||
|
if err := os.MkdirAll(processedDir, 0o755); err != nil {
|
||||||
|
return fmt.Errorf("mkdir processed dir: %w", err)
|
||||||
|
}
|
||||||
|
dest := filepath.Join(processedDir, filename)
|
||||||
|
if err := os.Rename(path, dest); err != nil {
|
||||||
|
return fmt.Errorf("move to processed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("watcher: file processed", "file", filename, "source", source)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deriveSource turns a filename into a human-readable source name.
|
||||||
|
// "shape-up-book.md" → "Shape Up Book"
|
||||||
|
func deriveSource(filename string) string {
|
||||||
|
// Strip extension.
|
||||||
|
name := strings.TrimSuffix(filename, filepath.Ext(filename))
|
||||||
|
// Split on hyphens.
|
||||||
|
words := strings.Split(name, "-")
|
||||||
|
// Title-case each word.
|
||||||
|
for i, w := range words {
|
||||||
|
if w == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
runes := []rune(w)
|
||||||
|
runes[0] = unicode.ToUpper(runes[0])
|
||||||
|
words[i] = string(runes)
|
||||||
|
}
|
||||||
|
return strings.Join(words, " ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// appendWatcherLog appends a watcher error entry to brain/log.md.
|
||||||
|
func appendWatcherLog(brainDir, filename string, runErr error, date string) error {
|
||||||
|
entry := fmt.Sprintf("## %s — watcher error\n\n- **File:** %s\n- **Error:** %s\n\n",
|
||||||
|
date, filename, runErr.Error())
|
||||||
|
|
||||||
|
logPath := filepath.Join(brainDir, "log.md")
|
||||||
|
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open log: %w", err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
if _, err = f.WriteString(entry); err != nil {
|
||||||
|
return fmt.Errorf("write log: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
219
ingestion/internal/watcher/watcher_test.go
Normal file
219
ingestion/internal/watcher/watcher_test.go
Normal file
@@ -0,0 +1,219 @@
|
|||||||
|
// ingestion/internal/watcher/watcher_test.go
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||||
|
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
|
||||||
|
)
|
||||||
|
|
||||||
|
// successComplete returns a valid JSON-encoded page array for any call.
|
||||||
|
func successComplete(page wiki.Page) pipeline.CompleteFunc {
|
||||||
|
return func(ctx context.Context, system, user string) (string, error) {
|
||||||
|
b, err := json.Marshal([]wiki.Page{page})
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(b), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// errorComplete always returns an error simulating an LLM failure.
|
||||||
|
func errorComplete(_ context.Context, _, _ string) (string, error) {
|
||||||
|
return "", fmt.Errorf("LLM unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupBrainDir(t *testing.T) string {
|
||||||
|
t.Helper()
|
||||||
|
brainDir := t.TempDir()
|
||||||
|
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources", "raw"} {
|
||||||
|
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
|
||||||
|
}
|
||||||
|
return brainDir
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_ProcessesFile(t *testing.T) {
|
||||||
|
brainDir := setupBrainDir(t)
|
||||||
|
|
||||||
|
// Place a .md file in raw/.
|
||||||
|
rawFile := filepath.Join(brainDir, "raw", "shape-up-book.md")
|
||||||
|
require.NoError(t, os.WriteFile(rawFile, []byte("Content about Shape Up."), 0o644))
|
||||||
|
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
wikiPage := wiki.Page{
|
||||||
|
Path: "wiki/sources/shape-up-book.md",
|
||||||
|
Content: "---\ntitle: Shape Up Book\ntype: article\ndomain: product-management\ndate_ingested: " + date + "\nlast_updated: " + date + "\naliases:\n - Shape Up Book\n---\n\n## Summary\n\nA book about Shape Up.\n",
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
BrainDir: brainDir,
|
||||||
|
Interval: 50 * time.Millisecond,
|
||||||
|
Pipeline: pipeline.Config{
|
||||||
|
Complete: successComplete(wikiPage),
|
||||||
|
ChunkSize: 0,
|
||||||
|
Schema: "# Schema\nThree page types.",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
Start(ctx, cfg)
|
||||||
|
|
||||||
|
// Poll until the file is moved to processed/.
|
||||||
|
processedPath := filepath.Join(brainDir, "raw", "processed", date, "shape-up-book.md")
|
||||||
|
var found bool
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if _, err := os.Stat(processedPath); err == nil {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
require.True(t, found, "file should be moved to processed/")
|
||||||
|
|
||||||
|
// Original file should be gone.
|
||||||
|
_, err := os.Stat(rawFile)
|
||||||
|
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
|
||||||
|
|
||||||
|
// Wiki page should exist.
|
||||||
|
wikiPath := filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md")
|
||||||
|
_, err = os.Stat(wikiPath)
|
||||||
|
assert.NoError(t, err, "wiki page should be written")
|
||||||
|
|
||||||
|
// log.md should contain an ingest record.
|
||||||
|
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Contains(t, string(logContent), "— ingest")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_MovesToFailedOnError(t *testing.T) {
|
||||||
|
brainDir := setupBrainDir(t)
|
||||||
|
|
||||||
|
rawFile := filepath.Join(brainDir, "raw", "bad-file.md")
|
||||||
|
require.NoError(t, os.WriteFile(rawFile, []byte("Some content."), 0o644))
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
BrainDir: brainDir,
|
||||||
|
Interval: 50 * time.Millisecond,
|
||||||
|
Pipeline: pipeline.Config{
|
||||||
|
Complete: errorComplete,
|
||||||
|
ChunkSize: 0,
|
||||||
|
Schema: "# Schema\nThree page types.",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
Start(ctx, cfg)
|
||||||
|
|
||||||
|
// Poll until the file is moved to failed/.
|
||||||
|
failedPath := filepath.Join(brainDir, "raw", "failed", "bad-file.md")
|
||||||
|
var found bool
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if _, err := os.Stat(failedPath); err == nil {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
require.True(t, found, "file should be moved to failed/")
|
||||||
|
|
||||||
|
// Original file should be gone from raw/.
|
||||||
|
_, err := os.Stat(rawFile)
|
||||||
|
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
|
||||||
|
|
||||||
|
// log.md should contain a watcher error entry.
|
||||||
|
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Contains(t, string(logContent), "— watcher error")
|
||||||
|
assert.Contains(t, string(logContent), "bad-file.md")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeriveSource(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
filename string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{"shape-up-book.md", "Shape Up Book"},
|
||||||
|
{"raft-consensus.txt", "Raft Consensus"},
|
||||||
|
{"my-note.md", "My Note"},
|
||||||
|
{"single.md", "Single"},
|
||||||
|
{"no-extension", "No Extension"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.filename, func(t *testing.T) {
|
||||||
|
got := deriveSource(tc.filename)
|
||||||
|
assert.Equal(t, tc.want, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessDir_SkipsSubdirs(t *testing.T) {
|
||||||
|
brainDir := setupBrainDir(t)
|
||||||
|
|
||||||
|
// Create processed/ and failed/ subdirs with files inside.
|
||||||
|
for _, sub := range []string{"processed/2026-04-22", "failed"} {
|
||||||
|
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, "raw", sub), 0o755))
|
||||||
|
}
|
||||||
|
|
||||||
|
processedFile := filepath.Join(brainDir, "raw", "processed", "2026-04-22", "old-file.md")
|
||||||
|
failedFile := filepath.Join(brainDir, "raw", "failed", "broken-file.md")
|
||||||
|
require.NoError(t, os.WriteFile(processedFile, []byte("old"), 0o644))
|
||||||
|
require.NoError(t, os.WriteFile(failedFile, []byte("broken"), 0o644))
|
||||||
|
|
||||||
|
// Also place a valid file in raw/ root that should be processed.
|
||||||
|
validFile := filepath.Join(brainDir, "raw", "valid.md")
|
||||||
|
require.NoError(t, os.WriteFile(validFile, []byte("valid content"), 0o644))
|
||||||
|
|
||||||
|
date := time.Now().UTC().Format("2006-01-02")
|
||||||
|
|
||||||
|
// Track which sources were passed to Complete.
|
||||||
|
var processedSources []string
|
||||||
|
completeFn := func(ctx context.Context, system, user string) (string, error) {
|
||||||
|
// Record that this was called; return a minimal valid page.
|
||||||
|
page := wiki.Page{
|
||||||
|
Path: "wiki/sources/valid.md",
|
||||||
|
Content: "---\ntitle: Valid\n---\n\n## Summary\n\nValid.\n",
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal([]wiki.Page{page})
|
||||||
|
processedSources = append(processedSources, "called")
|
||||||
|
return string(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
BrainDir: brainDir,
|
||||||
|
Interval: time.Hour, // not used; we call processDir directly
|
||||||
|
Pipeline: pipeline.Config{
|
||||||
|
Complete: completeFn,
|
||||||
|
ChunkSize: 0,
|
||||||
|
Schema: "# Schema\nThree page types.",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := processDir(context.Background(), cfg, date)
|
||||||
|
assert.Empty(t, errs, "no errors expected")
|
||||||
|
|
||||||
|
// Complete should have been called exactly once (for valid.md, not for files in subdirs).
|
||||||
|
assert.Len(t, processedSources, 1, "only the file in raw/ root should be processed")
|
||||||
|
|
||||||
|
// Files in processed/ and failed/ must remain untouched.
|
||||||
|
_, err := os.Stat(processedFile)
|
||||||
|
assert.NoError(t, err, "processed subdir file should be untouched")
|
||||||
|
_, err = os.Stat(failedFile)
|
||||||
|
assert.NoError(t, err, "failed subdir file should be untouched")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user