Files
hyperguild/ingestion/internal/claudewatcher/watcher.go
Mathias bc011cc1f0 feat(claudewatcher): ingest Claude Code session transcripts into brain
New package internal/claudewatcher. The volume gate (24 turns/week of
agentsquad logs vs 500/week gate) exposed that the real signal lives
in daily Claude Code usage at ~/.claude/projects/*/<uuid>.jsonl, not
in agentsquad output. This package captures that signal. See infra#73
Track E + hyperguild#27 for the full reframe.

Components:
- parser: tolerant JSONL parser over the observed Claude Code session
  schema (user / assistant / attachment / system + bookkeeping types).
  Skip-flag fast-paths queue-operation, last-prompt, permission-mode,
  ai-title, bridge-session, file-history-snapshot.
- scrubber: 11-rule fail-closed regex set for credential shapes
  (bearer, postgres URIs, PEM, ssh-key, ghp_/sk-/sk-ant-/AKIA, homelab
  env tokens, SOPS markers). Drop turn + log on match.
- cursor: postgres-backed claude_session_cursors table, keyed by
  (host, file_path) with byte_offset. Resumable across pod restarts.
- watcher: poll loop. Walks SessionsDir, processes each .jsonl from
  its cursor offset, runs scrubber, emits a Batch per file to a
  Sink interface, advances cursor on successful Ingest.

No classifier integration in this commit — every kept turn is emitted
in a per-session batch. The cmd/server wiring (next commit) routes
batches to brain/wiki/claude-sessions/facts/. Classifier-driven hall
routing (decisions / failures / hypotheses) is a follow-up.

19 unit tests across parser + scrubber + watcher. task check green.

Refs: infra#73, hyperguild#27
2026-05-25 19:58:58 +02:00

235 lines
6.1 KiB
Go

package claudewatcher
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
)
// Sink consumes batches of ingest-ready turns from the watcher. The
// production implementation builds wiki pages and calls pipeline.RunRaw
// against the brain. Tests substitute a counter.
//
// A Batch represents the turns ingested from one session file between
// two cursor checkpoints. Implementations must be idempotent: the
// watcher advances a file's cursor only after Ingest returns nil, so a
// failed Ingest leaves the cursor in place and the same turns are
// offered again on a later tick. When Config.Cursors is nil every tick
// re-reads each file from byte 0, so the same turns are delivered
// repeatedly.
type Sink interface {
// Ingest handles one batch. A non-nil error is logged by the watcher
// and leaves the cursor for b.FilePath unchanged.
Ingest(ctx context.Context, b Batch) error
}
// Batch is the set of kept turns read from one session file in a single
// pass, plus the metadata identifying where those turns came from.
type Batch struct {
Host string // origin host label copied from Config.Host, e.g. "koala"
FilePath string // path to the source .jsonl file (absolute when SessionsDir is)
SessionID string // first non-empty session ID among the kept turns
ProjectID string // basename of the parent dir, e.g. "-home-mathias-dev"
Turns []Turn // never empty; Skip, empty-content and scrubber-matched turns are removed first
}
// Config drives one Watch loop. SessionsDir is the absolute path to the
// Claude Code projects directory (~/.claude/projects). Host is the
// label written into cursors and ingested page frontmatter. Interval
// is the poll cadence; Watch rejects a zero or negative value with an
// error, and TickOnce does not read it.
//
// Sink is required. Cursors is optional — when nil the watcher
// re-reads from byte 0 on every tick (useful for first-run testing
// without a postgres dependency).
type Config struct {
// SessionsDir is the root directory walked for *.jsonl files. Required.
SessionsDir string
// Host labels cursors and batches; an empty value is replaced by "unknown".
Host string
// Interval is the time between sweeps in Watch; it must be positive there.
Interval time.Duration
// Sink receives one Batch per file that has new kept turns. Required.
Sink Sink
// Cursors persists per-file byte offsets; nil means always start at byte 0.
Cursors *CursorStore
// Logger receives progress and failure logs; nil falls back to slog.Default().
Logger *slog.Logger
}
// Watch polls cfg.SessionsDir until ctx is cancelled and then returns
// ctx.Err(). One sweep runs immediately on entry and another on every
// cfg.Interval tick. Each sweep is delegated to runTick, which logs
// per-file failures instead of returning them, so a single bad file
// never stops the loop.
//
// Configuration problems are reported before any work is done: an
// empty SessionsDir, a nil Sink, or a non-positive Interval each yield
// an error. An empty Host falls back to "unknown" and a nil Logger to
// slog.Default().
func Watch(ctx context.Context, cfg Config) error {
	switch {
	case cfg.SessionsDir == "":
		return fmt.Errorf("sessions dir is required")
	case cfg.Sink == nil:
		return fmt.Errorf("sink is required")
	case cfg.Interval <= 0:
		return fmt.Errorf("interval must be positive")
	}

	// Fill in optional fields on the local copy that is handed to runTick.
	if cfg.Host == "" {
		cfg.Host = "unknown"
	}
	if cfg.Logger == nil {
		cfg.Logger = slog.Default()
	}

	cfg.Logger.Info("claudewatcher: started",
		"sessions_dir", cfg.SessionsDir,
		"host", cfg.Host,
		"interval", cfg.Interval)

	ticker := time.NewTicker(cfg.Interval)
	defer ticker.Stop()

	// Sweep first, then wait: the first pass happens right away so a
	// fresh launch does not sit idle for a whole interval.
	for {
		runTick(ctx, cfg)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Fall through to the next sweep.
		}
	}
}
// runTick performs a single polling pass: it lists every session file
// under cfg.SessionsDir and processes them one at a time. It is
// unexported; TickOnce is the exported way to run one pass.
//
// Nothing is returned. A listing failure is logged and ends the pass; a
// failure on one file is logged and the pass moves on to the next. The
// pass stops early once ctx has been cancelled.
func runTick(ctx context.Context, cfg Config) {
	paths, listErr := listSessionFiles(cfg.SessionsDir)
	if listErr != nil {
		cfg.Logger.Warn("claudewatcher: list session files", "err", listErr)
		return
	}

	for _, p := range paths {
		// Check cancellation between files so shutdown is not delayed
		// by the remainder of a long listing.
		if ctx.Err() != nil {
			break
		}

		fileErr := processFile(ctx, cfg, p)
		if fileErr == nil {
			continue
		}
		cfg.Logger.Warn("claudewatcher: file failed",
			"path", p, "err", fileErr)
	}
}
// TickOnce performs exactly one sweep synchronously and then returns.
// It is intended for tests and ad-hoc CLI invocations.
//
// It returns an error only when cfg lacks a SessionsDir or a Sink;
// failures inside the sweep are logged by runTick and not returned.
// cfg.Interval is not consulted. An empty Host becomes "unknown" and a
// nil Logger becomes slog.Default().
func TickOnce(ctx context.Context, cfg Config) error {
	if usable := cfg.SessionsDir != "" && cfg.Sink != nil; !usable {
		return fmt.Errorf("config invalid")
	}

	// Apply defaults to the local copy before handing it to the sweep.
	if cfg.Logger == nil {
		cfg.Logger = slog.Default()
	}
	if cfg.Host == "" {
		cfg.Host = "unknown"
	}

	runTick(ctx, cfg)
	return nil
}
// listSessionFiles walks root recursively and returns the path of every
// non-directory entry whose name ends in ".jsonl", in the order
// filepath.WalkDir visits them. Directories are never returned, even
// when their own name ends in ".jsonl".
//
// Any error reported during the walk aborts it; the error is returned
// wrapped with the root path and the result slice is nil.
func listSessionFiles(root string) ([]string, error) {
	var sessions []string

	visit := func(p string, entry os.DirEntry, walkErr error) error {
		switch {
		case walkErr != nil:
			// Must be checked first: entry may be nil when the walk
			// reports an error for the root itself.
			return walkErr
		case entry.IsDir(), !strings.HasSuffix(p, ".jsonl"):
			return nil
		}
		sessions = append(sessions, p)
		return nil
	}

	if err := filepath.WalkDir(root, visit); err != nil {
		return nil, fmt.Errorf("walk %s: %w", root, err)
	}
	return sessions, nil
}
// processFile ingests whatever has been appended to one session file
// since its stored cursor offset.
//
// It loads the cursor (byte 0 when cfg.Cursors is nil), parses from
// that offset with ParseStream, drops turns that are flagged Skip, have
// empty content, or match a scrubber rule, and hands the remaining
// turns to cfg.Sink as a single Batch. The cursor is moved to the
// offset reported by ParseStream only after Ingest succeeds, or
// directly when no turn survived filtering, so a failed Ingest is
// retried from the same offset on a later tick.
//
// A file that is now shorter than its stored offset has been truncated
// or replaced. It is re-read from byte 0 instead of being treated as
// unchanged; turns that were already ingested may be offered again,
// which the Sink's idempotency contract is expected to absorb.
//
// Returns nil when there is nothing new. Otherwise returns the error
// from the cursor store, the filesystem, the parser, or the sink,
// wrapped with the step that failed.
func processFile(ctx context.Context, cfg Config, path string) error {
	startOffset := int64(0)
	if cfg.Cursors != nil {
		off, _, err := cfg.Cursors.GetOffset(ctx, cfg.Host, path)
		if err != nil {
			return fmt.Errorf("get cursor: %w", err)
		}
		startOffset = off
	}

	stat, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("stat: %w", err)
	}
	switch size := stat.Size(); {
	case size == startOffset:
		return nil // nothing new
	case size < startOffset:
		// The stored offset points past the end of the file, so the
		// file was truncated or replaced. Seeking to the stale offset
		// would skip the new content (or later land mid-record), so
		// start over from the beginning.
		cfg.Logger.Warn("claudewatcher: file shrank below cursor; re-reading from start",
			"path", path, "cursor_offset", startOffset, "size", size)
		startOffset = 0
	}

	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("open: %w", err)
	}
	// Read-only handle: a Close error carries no information worth surfacing.
	defer func() { _ = f.Close() }()

	// Whence 0 is "relative to the start of the file".
	if _, err := f.Seek(startOffset, 0); err != nil {
		return fmt.Errorf("seek: %w", err)
	}

	var keep []Turn
	var sessionID string
	var droppedScrub int
	endOffset, err := ParseStream(f, startOffset,
		func(format string, args ...any) {
			cfg.Logger.Warn(fmt.Sprintf("claudewatcher: parse: "+format, args...))
		},
		func(t Turn) error {
			if t.Skip || t.Content == "" {
				return nil
			}
			// Fail closed: a turn matching any scrubber rule is dropped
			// entirely and only the rule name is logged, never the content.
			if rule := Scrub(t.Content); rule != "" {
				droppedScrub++
				cfg.Logger.Warn("claudewatcher: turn dropped by scrubber",
					"rule", rule, "path", path, "session_id", t.SessionID)
				return nil
			}
			if sessionID == "" {
				sessionID = t.SessionID
			}
			keep = append(keep, t)
			return nil
		})
	if err != nil {
		return fmt.Errorf("parse stream: %w", err)
	}

	if len(keep) == 0 {
		// Nothing to hand to the sink, but the bytes were still consumed:
		// move the cursor so they are not parsed again next tick.
		if cfg.Cursors != nil {
			if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
				return fmt.Errorf("advance cursor (no-turns): %w", err)
			}
		}
		if droppedScrub > 0 {
			cfg.Logger.Info("claudewatcher: only scrubbed turns this tick",
				"path", path, "dropped", droppedScrub)
		}
		return nil
	}

	batch := Batch{
		Host:      cfg.Host,
		FilePath:  path,
		SessionID: sessionID,
		ProjectID: filepath.Base(filepath.Dir(path)),
		Turns:     keep,
	}
	// Ingest before advancing the cursor so a sink failure is retried.
	if err := cfg.Sink.Ingest(ctx, batch); err != nil {
		return fmt.Errorf("sink ingest: %w", err)
	}
	if cfg.Cursors != nil {
		if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
			return fmt.Errorf("advance cursor: %w", err)
		}
	}

	cfg.Logger.Info("claudewatcher: ingested batch",
		"path", path, "session_id", sessionID,
		"turns_kept", len(keep), "dropped_scrub", droppedScrub,
		"new_offset", endOffset)
	return nil
}