Files
hyperguild/ingestion/internal/watcher/watcher.go
Mathias Bergqvist 3607920601
All checks were successful
cd / Build and deploy (push) Successful in 10s
CI / Lint / Test / Vet (push) Successful in 10s
CI / Mirror to GitHub (push) Successful in 3s
fix(lint): resolve all errcheck violations in ingestion module
2026-04-23 16:20:59 +02:00

211 lines
6.0 KiB
Go

// ingestion/internal/watcher/watcher.go
package watcher
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
"unicode"
"github.com/mathiasbq/hyperguild/ingestion/internal/extract"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
)
// Config holds watcher configuration.
type Config struct {
	// BrainDir is the root of the brain directory. The watcher scans BrainDir/raw,
	// archives copies under BrainDir/raw/processed/<date>/ and BrainDir/raw/failed/,
	// and appends failure entries to BrainDir/log.md. It is also passed to pipeline.Run.
	BrainDir string
	// Interval is the time between scans of BrainDir/raw; it must be positive.
	Interval time.Duration
	// Pipeline is passed unchanged to pipeline.Run for every ingested file.
	Pipeline pipeline.Config
}
// Start launches the watcher in a background goroutine and returns immediately.
//
// Every cfg.Interval the goroutine scans cfg.BrainDir/raw via processDir, using the
// current UTC date (YYYY-MM-DD) to name that scan's archive folder, and logs every
// error the scan reports. The goroutine exits when ctx is cancelled. Start offers no
// way to wait for that exit, so a scan already in progress at cancellation may still
// be finishing when the caller moves on.
//
// A non-positive cfg.Interval is rejected with an error log and no goroutine is
// started: time.NewTicker panics on such values, and a panic inside the goroutine
// would take down the whole process.
func Start(ctx context.Context, cfg Config) {
	if cfg.Interval <= 0 {
		slog.Error("watcher: not started, interval must be positive", "interval", cfg.Interval)
		return
	}
	go func() {
		ticker := time.NewTicker(cfg.Interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// select chooses randomly among ready cases, so a pending tick can win
				// over an already-cancelled ctx; never start a scan after shutdown.
				if ctx.Err() != nil {
					return
				}
				date := time.Now().UTC().Format("2006-01-02")
				for _, err := range processDir(ctx, cfg, date) {
					slog.Error("watcher: error processing file", "error", err)
				}
			}
		}
	}()
}
// processDir walks cfg.BrainDir/raw, hands every eligible file to processFile, and
// returns all errors encountered (nil when the scan was clean).
//
// A file is eligible when its extension is .md, .txt or .pdf (case-insensitive) and
// neither a "<path>.processed" nor a "<path>.failed" marker exists next to it. Any
// directory named "processed" or "failed" is skipped, at every depth below raw/, not
// only the archive directories directly under it.
//
// An unreadable entry below raw/ is recorded and the walk carries on, so one bad
// subdirectory cannot hide the rest of the tree. A failure on raw/ itself (for
// example, it does not exist) is reported as a "walk raw dir" error. Once ctx is
// cancelled, the walk stops before the next entry without adding an error.
func processDir(ctx context.Context, cfg Config, date string) []error {
	rawDir := filepath.Join(cfg.BrainDir, "raw")
	var errs []error
	walkErr := filepath.WalkDir(rawDir, func(path string, d os.DirEntry, err error) error {
		// Stop promptly on shutdown; SkipAll ends the walk and WalkDir returns nil.
		if ctx.Err() != nil {
			return filepath.SkipAll
		}
		if err != nil {
			if path == rawDir {
				// raw/ itself is unusable: abort; reported as the walk error below.
				return err
			}
			// The error already names the offending path.
			errs = append(errs, fmt.Errorf("walk: %w", err))
			return nil
		}
		// Skip the root itself.
		if path == rawDir {
			return nil
		}
		// Skip processed/ and failed/ subdirectories entirely.
		if d.IsDir() {
			if name := d.Name(); name == "processed" || name == "failed" {
				return filepath.SkipDir
			}
			return nil
		}
		switch strings.ToLower(filepath.Ext(path)) {
		case ".md", ".txt", ".pdf":
			// Supported extension; continue below.
		default:
			return nil
		}
		// Skip files that have already been processed or permanently failed.
		for _, marker := range []string{".processed", ".failed"} {
			if _, err := os.Stat(path + marker); err == nil {
				return nil
			}
		}
		if err := processFile(ctx, cfg, path, date); err != nil {
			errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
		}
		return nil
	})
	if walkErr != nil {
		errs = append(errs, fmt.Errorf("walk raw dir: %w", walkErr))
	}
	return errs
}
// processFile ingests one raw file and archives a copy so the watcher skips it on
// later polls. The original is never deleted, keeping Syncthing-connected vaults
// (e.g. Obsidian) intact.
//
// The text is extracted with extract.Text and passed to pipeline.Run together with a
// source name derived from the file name. Then:
//   - on pipeline success, a copy goes to raw/processed/<date>/ and a
//     "<path>.processed" marker containing the date is written next to the original;
//   - on pipeline failure, a copy goes to raw/failed/, a "<path>.failed" marker
//     containing the pipeline error is written next to the original, and an entry is
//     appended to <BrainDir>/log.md. The failure is logged, not returned.
//
// Returned errors: extraction failure, failure to create or copy into an archive
// directory, and a pipeline failure seen after ctx has been cancelled. In that last
// case nothing is archived and no marker is written, so the file is retried on the
// next poll instead of being permanently quarantined because of a shutdown.
//
// Existing archive copies are never overwritten (see archiveDest). Marker write
// failures are only logged; without a marker the file is picked up again next poll.
func processFile(ctx context.Context, cfg Config, path, date string) error {
	filename := filepath.Base(path)
	source := deriveSource(filename)
	content, err := extract.Text(path)
	if err != nil {
		return fmt.Errorf("extract text: %w", err)
	}
	_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, content, source, false)
	if runErr != nil {
		if ctx.Err() != nil {
			// Cancellation is not the file's fault: leave it unmarked so it is retried.
			return fmt.Errorf("pipeline interrupted: %w", runErr)
		}
		// Copy to failed/ and leave a .failed marker so we don't retry.
		failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
		if err := os.MkdirAll(failedDir, 0o755); err != nil {
			return fmt.Errorf("mkdir failed dir: %w", err)
		}
		if err := copyFile(path, archiveDest(failedDir, filename)); err != nil {
			return fmt.Errorf("copy to failed: %w", err)
		}
		if err := os.WriteFile(path+".failed", []byte(runErr.Error()), 0o644); err != nil {
			slog.Error("watcher: failed to write .failed marker", "file", filename, "error", err)
		}
		slog.Warn("watcher: file failed", "file", filename, "error", runErr)
		if err := appendWatcherLog(cfg.BrainDir, filename, runErr, date); err != nil {
			slog.Error("watcher: failed to write log entry", "file", filename, "error", err)
		}
		// Return nil: quarantine succeeded; error already logged.
		return nil
	}
	// Copy to processed/YYYY-MM-DD/ and leave a .processed marker so we don't re-ingest.
	processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
	if err := os.MkdirAll(processedDir, 0o755); err != nil {
		return fmt.Errorf("mkdir processed dir: %w", err)
	}
	if err := copyFile(path, archiveDest(processedDir, filename)); err != nil {
		return fmt.Errorf("copy to processed: %w", err)
	}
	if err := os.WriteFile(path+".processed", []byte(date), 0o644); err != nil {
		slog.Error("watcher: failed to write .processed marker", "file", filename, "error", err)
	}
	slog.Info("watcher: file processed", "file", filename, "source", source)
	return nil
}

// archiveDest returns a path in dir for filename that does not clobber an existing
// archive copy. The candidates are tried in order:
//   - dir/filename;
//   - dir/<base>-<HHMMSS UTC><ext>;
//   - the same name with "-2", "-3", … appended before the extension.
//
// Any os.Stat error on a candidate is treated as "free".
func archiveDest(dir, filename string) string {
	dest := filepath.Join(dir, filename)
	if _, err := os.Stat(dest); err != nil {
		return dest
	}
	ext := filepath.Ext(filename)
	base := strings.TrimSuffix(filename, ext)
	stamp := time.Now().UTC().Format("150405")
	dest = filepath.Join(dir, base+"-"+stamp+ext)
	for n := 2; ; n++ {
		if _, err := os.Stat(dest); err != nil {
			return dest
		}
		dest = filepath.Join(dir, fmt.Sprintf("%s-%s-%d%s", base, stamp, n, ext))
	}
}
// copyFile copies the contents of src to dst, creating dst or truncating it if it
// already exists (os.Create semantics; file mode and timestamps are not preserved).
//
// If the copy or the final close of dst fails, dst is removed so that no truncated
// file masquerades as a complete archive copy. A pre-existing dst is therefore lost
// in that case, but os.Create had already truncated it. Errors are wrapped with the
// step that failed ("open src", "create dst", "copy", "close dst").
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("open src: %w", err)
	}
	defer in.Close() //nolint:errcheck // read-only handle; nothing to flush
	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("create dst: %w", err)
	}
	if _, err := io.Copy(out, in); err != nil {
		out.Close()    //nolint:errcheck // the copy error is the one worth reporting
		os.Remove(dst) //nolint:errcheck // best-effort cleanup of the partial file
		return fmt.Errorf("copy: %w", err)
	}
	// Close can surface a deferred write error, so the copy is only complete after it.
	if err := out.Close(); err != nil {
		os.Remove(dst) //nolint:errcheck // best-effort cleanup of the partial file
		return fmt.Errorf("close dst: %w", err)
	}
	return nil
}
// deriveSource builds a human-readable source name from a file name. It:
//   - drops the final extension;
//   - treats every hyphen as a word break (rendered as a single space);
//   - upper-cases the first letter of each word.
//
// For example, "shape-up-book.md" becomes "Shape Up Book". All other characters,
// including spaces and underscores, are kept as they are. Empty words (from leading,
// trailing or doubled hyphens) still produce their space.
func deriveSource(filename string) string {
	stem := strings.TrimSuffix(filename, filepath.Ext(filename))
	var out strings.Builder
	for idx, word := range strings.Split(stem, "-") {
		if idx > 0 {
			out.WriteByte(' ')
		}
		if word == "" {
			continue
		}
		letters := []rune(word)
		letters[0] = unicode.ToUpper(letters[0])
		out.WriteString(string(letters))
	}
	return out.String()
}
// appendWatcherLog appends a Markdown entry describing a failed file to
// <brainDir>/log.md. The log file is created with mode 0644 if it does not exist.
//
// The entry is a "## <date> — watcher error" heading followed by File and Error list
// items. Errors from opening or writing the log are wrapped; otherwise the result of
// closing the file is returned.
func appendWatcherLog(brainDir, filename string, runErr error, date string) error {
	// Render the message before touching the filesystem.
	reason := runErr.Error()
	logFile, openErr := os.OpenFile(filepath.Join(brainDir, "log.md"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644)
	if openErr != nil {
		return fmt.Errorf("open log: %w", openErr)
	}
	_, writeErr := fmt.Fprintf(logFile, "## %s — watcher error\n\n- **File:** %s\n- **Error:** %s\n\n",
		date, filename, reason)
	if writeErr != nil {
		logFile.Close() //nolint:errcheck // the write error is the one worth reporting
		return fmt.Errorf("write log: %w", writeErr)
	}
	return logFile.Close()
}