// ingestion/internal/watcher/watcher.go package watcher import ( "context" "fmt" "log/slog" "os" "path/filepath" "strings" "time" "unicode" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" ) // Config holds watcher configuration. type Config struct { BrainDir string Interval time.Duration Pipeline pipeline.Config } // Start launches the watcher in a background goroutine. // It returns immediately. The watcher stops when ctx is cancelled. func Start(ctx context.Context, cfg Config) { go func() { ticker := time.NewTicker(cfg.Interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: date := time.Now().UTC().Format("2006-01-02") errs := processDir(ctx, cfg, date) for _, err := range errs { slog.Error("watcher: error processing file", "error", err) } } } }() } // processDir walks brain/raw/, processes each eligible file, returns any errors encountered. func processDir(ctx context.Context, cfg Config, date string) []error { rawDir := filepath.Join(cfg.BrainDir, "raw") var errs []error err := filepath.WalkDir(rawDir, func(path string, d os.DirEntry, err error) error { if err != nil { return err } // Skip the root itself. if path == rawDir { return nil } // Skip processed/ and failed/ subdirectories entirely. if d.IsDir() { name := d.Name() if name == "processed" || name == "failed" { return filepath.SkipDir } return nil } // Only process supported extensions. ext := strings.ToLower(filepath.Ext(path)) if ext != ".md" && ext != ".txt" && ext != ".pdf" { return nil } if err := processFile(ctx, cfg, path, date); err != nil { errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err)) } return nil }) if err != nil { errs = append(errs, fmt.Errorf("walk raw dir: %w", err)) } return errs } // processFile reads a file, calls pipeline.Run, moves it to processed/ or failed/. func processFile(ctx context.Context, cfg Config, path, date string) error { filename := filepath.Base(path) source := deriveSource(filename) content, err := os.ReadFile(path) if err != nil { return fmt.Errorf("read file: %w", err) } _, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false) if runErr != nil { // Move to failed/. failedDir := filepath.Join(cfg.BrainDir, "raw", "failed") if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil { return fmt.Errorf("mkdir failed dir: %w", mkErr) } dest := filepath.Join(failedDir, filename) if mvErr := os.Rename(path, dest); mvErr != nil { return fmt.Errorf("move to failed: %w", mvErr) } slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr) if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil { slog.Error("watcher: failed to write log entry", "error", logErr) } return runErr } // Move to processed/YYYY-MM-DD/. processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date) if err := os.MkdirAll(processedDir, 0o755); err != nil { return fmt.Errorf("mkdir processed dir: %w", err) } dest := filepath.Join(processedDir, filename) if err := os.Rename(path, dest); err != nil { return fmt.Errorf("move to processed: %w", err) } slog.Info("watcher: file processed", "file", filename, "source", source) return nil } // deriveSource turns a filename into a human-readable source name. // "shape-up-book.md" → "Shape Up Book" func deriveSource(filename string) string { // Strip extension. name := strings.TrimSuffix(filename, filepath.Ext(filename)) // Split on hyphens. words := strings.Split(name, "-") // Title-case each word. for i, w := range words { if w == "" { continue } runes := []rune(w) runes[0] = unicode.ToUpper(runes[0]) words[i] = string(runes) } return strings.Join(words, " ") } // appendWatcherLog appends a watcher error entry to brain/log.md. func appendWatcherLog(brainDir, filename string, runErr error, date string) error { entry := fmt.Sprintf("## %s — watcher error\n\n- **File:** %s\n- **Error:** %s\n\n", date, filename, runErr.Error()) logPath := filepath.Join(brainDir, "log.md") f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return fmt.Errorf("open log: %w", err) } defer f.Close() if _, err = f.WriteString(entry); err != nil { return fmt.Errorf("write log: %w", err) } return nil }