diff --git a/ingestion/internal/claudewatcher/cursor.go b/ingestion/internal/claudewatcher/cursor.go new file mode 100644 index 0000000..f53575b --- /dev/null +++ b/ingestion/internal/claudewatcher/cursor.go @@ -0,0 +1,110 @@ +package claudewatcher + +import ( + "context" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// CursorStore tracks how far the watcher has ingested into each +// session JSONL file. Keyed by (host, file_path) so the same `~/.claude` +// path on different hosts doesn't collide and resumability survives +// pod restarts. Idempotent Init lives alongside the rest of the +// claudewatcher schema; no separate migration framework. +type CursorStore struct { + pool *pgxpool.Pool +} + +// NewCursorStore opens a pool against dsn. Caller closes the store. +func NewCursorStore(ctx context.Context, dsn string) (*CursorStore, error) { + pool, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("pgxpool: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping: %w", err) + } + return &CursorStore{pool: pool}, nil +} + +// NewCursorStoreFromPool wraps an existing pool (so the watcher can +// share the brain DSN pool with vectorstore/graphstore without a +// second connection set). Caller must NOT close the wrapped pool via +// the store — close the pool directly. +func NewCursorStoreFromPool(pool *pgxpool.Pool) *CursorStore { + return &CursorStore{pool: pool} +} + +// Close releases the underlying connection pool when this store owns +// it. No-op when the pool was injected via NewCursorStoreFromPool — +// pgxpool.Close is idempotent so we lean on that. +func (s *CursorStore) Close() { + if s.pool != nil { + s.pool.Close() + } +} + +// Init creates the claude_session_cursors table when missing. +func (s *CursorStore) Init(ctx context.Context) error { + const ddl = ` +CREATE TABLE IF NOT EXISTS claude_session_cursors ( + host TEXT NOT NULL, + file_path TEXT NOT NULL, + byte_offset BIGINT NOT NULL DEFAULT 0, + last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (host, file_path) +); +CREATE INDEX IF NOT EXISTS claude_session_cursors_host_idx + ON claude_session_cursors (host); +` + _, err := s.pool.Exec(ctx, ddl) + return err +} + +// GetOffset returns the last recorded byte offset for (host, filePath). +// Missing rows are reported as offset=0, ok=false so the caller can +// distinguish "never ingested" from "ingested at the start of the +// file" (both produce identical behaviour but the metric is useful). +func (s *CursorStore) GetOffset(ctx context.Context, host, filePath string) (int64, bool, error) { + if host == "" || filePath == "" { + return 0, false, errors.New("host and file_path are required") + } + var offset int64 + err := s.pool.QueryRow(ctx, ` + SELECT byte_offset FROM claude_session_cursors WHERE host = $1 AND file_path = $2 + `, host, filePath).Scan(&offset) + if errors.Is(err, pgx.ErrNoRows) { + return 0, false, nil + } + if err != nil { + return 0, false, fmt.Errorf("query: %w", err) + } + return offset, true, nil +} + +// SetOffset writes the new offset for (host, filePath). Used after +// every successful parse + ingest batch so a crash mid-file rewinds +// only to the last committed checkpoint. +func (s *CursorStore) SetOffset(ctx context.Context, host, filePath string, offset int64) error { + if host == "" || filePath == "" { + return errors.New("host and file_path are required") + } + if offset < 0 { + return errors.New("offset must be >= 0") + } + _, err := s.pool.Exec(ctx, ` + INSERT INTO claude_session_cursors (host, file_path, byte_offset, last_seen_at) + VALUES ($1, $2, $3, now()) + ON CONFLICT (host, file_path) DO UPDATE + SET byte_offset = EXCLUDED.byte_offset, + last_seen_at = now() + `, host, filePath, offset) + if err != nil { + return fmt.Errorf("upsert offset: %w", err) + } + return nil +} diff --git a/ingestion/internal/claudewatcher/parser.go b/ingestion/internal/claudewatcher/parser.go new file mode 100644 index 0000000..7b39d82 --- /dev/null +++ b/ingestion/internal/claudewatcher/parser.go @@ -0,0 +1,305 @@ +// Package claudewatcher ingests Claude Code session transcripts +// (`~/.claude/projects/*/.jsonl`) into the brain corpus. +// +// Schema (observed 2026-05-25 across ~30 session files on koala): +// +// type=user — user prompts + tool results +// type=assistant — model turns; tool_use blocks live in message.content +// type=attachment — hook outputs, ingested files +// type=system — turn-boundary metadata +// type=file-history-snapshot — git-style snapshot of edited files +// type=queue-operation, last-prompt, permission-mode, ai-title, +// bridge-session — internal bookkeeping, ignored +// +// The parser is intentionally tolerant: malformed lines are skipped +// (caller logs and advances), missing optional fields default to "", +// and unknown `type` values are returned as Turn entries with +// `Skip=true` so callers can filter cheaply. +package claudewatcher + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "strings" + "time" +) + +// Turn is one parsed JSONL entry from a Claude Code session log. +// +// Skip is true for entry types we never want to ingest (queue +// bookkeeping, snapshots, etc.). Callers fast-path these without +// running the scrubber or classifier. +type Turn struct { + SessionID string + Type string + ParentUUID string + Timestamp time.Time + Cwd string + GitBranch string + Content string // plain-text projection of the entry, ready for the scrubber/classifier + ToolName string // populated when an assistant turn invokes a tool + OffsetAfter int64 // byte offset in the file just past this entry + Skip bool + ParseWarning string // non-empty when the entry parsed but had a sub-field we couldn't normalise +} + +// ParseStream reads JSONL lines from r starting at startOffset and +// invokes emit for each parsed entry. emit may return ErrStop to +// terminate the scan cleanly. Other emit errors propagate. +// +// startOffset is informational — the caller is expected to have already +// seeked the underlying reader to that offset. ParseStream adds the +// number of bytes consumed per line to it to compute Turn.OffsetAfter. +// +// Lines that fail to unmarshal are logged via warnf and skipped; they +// do NOT advance OffsetAfter past the malformed line by themselves, +// but the next valid line resumes correctly because bufio.Scanner +// preserves stream position. +func ParseStream( + r io.Reader, + startOffset int64, + warnf func(format string, args ...any), + emit func(Turn) error, +) (int64, error) { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) // some lines are big (tool outputs) + + offset := startOffset + for scanner.Scan() { + raw := scanner.Bytes() + lineLen := int64(len(raw)) + 1 // +1 for the newline + t, err := parseTurn(raw) + if err != nil { + if warnf != nil { + warnf("parse: %v (%d bytes)", err, len(raw)) + } + offset += lineLen + continue + } + t.OffsetAfter = offset + lineLen + if err := emit(t); err != nil { + if errors.Is(err, ErrStop) { + return t.OffsetAfter, nil + } + return offset, fmt.Errorf("emit: %w", err) + } + offset = t.OffsetAfter + } + if err := scanner.Err(); err != nil { + return offset, fmt.Errorf("scan: %w", err) + } + return offset, nil +} + +// ErrStop terminates a ParseStream loop without surfacing an error. +var ErrStop = errors.New("claudewatcher: stop") + +// rawEntry is a permissive shape that covers every type observed in +// the JSONL files. Fields we don't care about are intentionally +// omitted to keep the unmarshal cheap. +type rawEntry struct { + Type string `json:"type"` + SessionID string `json:"sessionId"` + ParentUUID string `json:"parentUuid"` + Timestamp string `json:"timestamp"` + Cwd string `json:"cwd"` + GitBranch string `json:"gitBranch"` + Message json.RawMessage `json:"message"` + Attachment json.RawMessage `json:"attachment"` + Content string `json:"content"` // queue-operation + LastPrompt string `json:"lastPrompt"` // last-prompt + Subtype string `json:"subtype"` // system +} + +// skipTypes lists every entry type we want to never ingest. Marked Skip +// at parse time so the caller's filter is a single boolean check. +var skipTypes = map[string]struct{}{ + "queue-operation": {}, + "last-prompt": {}, + "permission-mode": {}, + "ai-title": {}, + "bridge-session": {}, + "file-history-snapshot": {}, +} + +func parseTurn(raw []byte) (Turn, error) { + var e rawEntry + if err := json.Unmarshal(raw, &e); err != nil { + return Turn{}, fmt.Errorf("unmarshal: %w", err) + } + t := Turn{ + Type: e.Type, + SessionID: e.SessionID, + ParentUUID: e.ParentUUID, + Cwd: e.Cwd, + GitBranch: e.GitBranch, + } + if _, skip := skipTypes[e.Type]; skip { + t.Skip = true + return t, nil + } + if e.Timestamp != "" { + if ts, err := time.Parse(time.RFC3339Nano, e.Timestamp); err == nil { + t.Timestamp = ts + } else { + t.ParseWarning = "timestamp" + } + } + + switch e.Type { + case "user": + t.Content = extractMessageText(e.Message) + case "assistant": + t.Content, t.ToolName = extractAssistantTurn(e.Message) + case "attachment": + t.Content = extractAttachmentText(e.Attachment) + case "system": + t.Content = "[system " + e.Subtype + "]" + default: + // Unknown type — keep the row but mark Skip so callers ignore. + t.Skip = true + } + return t, nil +} + +// extractMessageText pulls the textual projection out of a user/assistant +// message field. The shape is the Anthropic Messages API content-block +// array (an array of {type, text|tool_use|tool_result, ...}). We +// concatenate every text-bearing block and ignore the rest. +func extractMessageText(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + var msg struct { + Role string `json:"role"` + Content json.RawMessage `json:"content"` + Stop string `json:"stop_reason"` + Model string `json:"model"` + Usage map[string]any `json:"usage"` + Meta map[string]string `json:"meta"` + } + if err := json.Unmarshal(raw, &msg); err != nil { + // Some user turns have message as plain string. + var s string + if err2 := json.Unmarshal(raw, &s); err2 == nil { + return s + } + return "" + } + // Content can be a string OR an array. + var asString string + if err := json.Unmarshal(msg.Content, &asString); err == nil { + return asString + } + var blocks []struct { + Type string `json:"type"` + Text string `json:"text"` + Content json.RawMessage `json:"content"` + } + if err := json.Unmarshal(msg.Content, &blocks); err != nil { + return "" + } + var sb strings.Builder + for _, b := range blocks { + switch b.Type { + case "text": + sb.WriteString(b.Text) + sb.WriteByte('\n') + case "tool_result": + // Tool result content may itself be a string or array of blocks. + var s string + if err := json.Unmarshal(b.Content, &s); err == nil { + sb.WriteString("[tool_result] ") + sb.WriteString(s) + sb.WriteByte('\n') + continue + } + var sub []struct { + Type string `json:"type"` + Text string `json:"text"` + } + if err := json.Unmarshal(b.Content, &sub); err == nil { + for _, s := range sub { + if s.Type == "text" { + sb.WriteString("[tool_result] ") + sb.WriteString(s.Text) + sb.WriteByte('\n') + } + } + } + } + } + return strings.TrimRight(sb.String(), "\n") +} + +// extractAssistantTurn pulls text + the first tool name (if any) from +// an assistant content-block array. Multi-tool turns lose the second +// name; the goal is signal for classification, not perfect fidelity. +func extractAssistantTurn(raw json.RawMessage) (string, string) { + if len(raw) == 0 { + return "", "" + } + var msg struct { + Content json.RawMessage `json:"content"` + } + if err := json.Unmarshal(raw, &msg); err != nil { + return "", "" + } + var blocks []struct { + Type string `json:"type"` + Text string `json:"text"` + Name string `json:"name"` + Tool json.RawMessage `json:"input"` + } + if err := json.Unmarshal(msg.Content, &blocks); err != nil { + return "", "" + } + var sb strings.Builder + var firstTool string + for _, b := range blocks { + switch b.Type { + case "text": + sb.WriteString(b.Text) + sb.WriteByte('\n') + case "tool_use": + if firstTool == "" { + firstTool = b.Name + } + sb.WriteString("[tool_use:") + sb.WriteString(b.Name) + sb.WriteString("]\n") + } + } + return strings.TrimRight(sb.String(), "\n"), firstTool +} + +// extractAttachmentText pulls text content from an attachment payload, +// or returns a short tag when the attachment is a hook event. +func extractAttachmentText(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + var a struct { + Type string `json:"type"` + HookName string `json:"hookName"` + HookEvent string `json:"hookEvent"` + Content string `json:"content"` + Text string `json:"text"` + } + if err := json.Unmarshal(raw, &a); err != nil { + return "" + } + if a.Content != "" { + return a.Content + } + if a.Text != "" { + return a.Text + } + if a.HookName != "" { + return "[hook " + a.HookEvent + ":" + a.HookName + "]" + } + return "" +} diff --git a/ingestion/internal/claudewatcher/parser_test.go b/ingestion/internal/claudewatcher/parser_test.go new file mode 100644 index 0000000..f43f440 --- /dev/null +++ b/ingestion/internal/claudewatcher/parser_test.go @@ -0,0 +1,157 @@ +package claudewatcher + +import ( + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func collect(t *testing.T, body string) ([]Turn, int64, error) { + t.Helper() + var out []Turn + end, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error { + out = append(out, tr) + return nil + }) + return out, end, err +} + +func TestParseStream_UserTurnStringContent(t *testing.T) { + body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":"hello world"} +` + turns, end, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.Equal(t, "user", turns[0].Type) + assert.Equal(t, "S", turns[0].SessionID) + assert.Equal(t, "hello world", turns[0].Content) + assert.False(t, turns[0].Skip) + assert.Equal(t, int64(len(body)), end) +} + +func TestParseStream_UserTurnContentBlocks(t *testing.T) { + body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"role":"user","content":[{"type":"text","text":"line 1"},{"type":"text","text":"line 2"}]}} +` + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.Equal(t, "line 1\nline 2", turns[0].Content) +} + +func TestParseStream_AssistantToolUse(t *testing.T) { + body := `{"type":"assistant","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"content":[{"type":"text","text":"calling now"},{"type":"tool_use","name":"Edit","input":{}}]}} +` + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.Equal(t, "Edit", turns[0].ToolName) + assert.Contains(t, turns[0].Content, "calling now") + assert.Contains(t, turns[0].Content, "[tool_use:Edit]") +} + +func TestParseStream_AssistantToolResult(t *testing.T) { + body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"content":[{"type":"tool_result","content":"output of cmd"}]}} +` + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.Contains(t, turns[0].Content, "[tool_result] output of cmd") +} + +func TestParseStream_SkipsBookkeepingTypes(t *testing.T) { + body := strings.Join([]string{ + `{"type":"queue-operation","sessionId":"S","content":"x"}`, + `{"type":"last-prompt","sessionId":"S","lastPrompt":"y"}`, + `{"type":"permission-mode","sessionId":"S","permissionMode":"auto"}`, + `{"type":"ai-title","sessionId":"S","aiTitle":"My session"}`, + `{"type":"file-history-snapshot","messageId":"abc"}`, + }, "\n") + "\n" + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 5) + for _, tr := range turns { + assert.True(t, tr.Skip, "expected Skip=true for %q", tr.Type) + } +} + +func TestParseStream_UnknownTypeIsSkip(t *testing.T) { + body := `{"type":"future-thing","sessionId":"S"}` + "\n" + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.True(t, turns[0].Skip) +} + +func TestParseStream_MalformedLineIsSkippedNotFatal(t *testing.T) { + body := strings.Join([]string{ + `{"type":"user","sessionId":"S","message":"first"}`, + `{not valid json`, + `{"type":"user","sessionId":"S","message":"third"}`, + }, "\n") + "\n" + var warnings int + var turns []Turn + _, err := ParseStream(strings.NewReader(body), 0, func(format string, args ...any) { + warnings++ + }, func(tr Turn) error { + turns = append(turns, tr) + return nil + }) + require.NoError(t, err) + require.Len(t, turns, 2, "first + third should make it through") + assert.Equal(t, 1, warnings) +} + +func TestParseStream_EmitErrStopHaltsCleanly(t *testing.T) { + body := strings.Join([]string{ + `{"type":"user","sessionId":"S","message":"a"}`, + `{"type":"user","sessionId":"S","message":"b"}`, + `{"type":"user","sessionId":"S","message":"c"}`, + }, "\n") + "\n" + count := 0 + end, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error { + count++ + if count == 2 { + return ErrStop + } + return nil + }) + require.NoError(t, err) + assert.Equal(t, 2, count) + assert.Greater(t, end, int64(0)) +} + +func TestParseStream_EmitOtherErrorPropagates(t *testing.T) { + body := `{"type":"user","sessionId":"S","message":"a"}` + "\n" + want := errors.New("boom") + _, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error { + return want + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "boom") +} + +func TestParseStream_AttachmentHookEvent(t *testing.T) { + body := `{"type":"attachment","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","attachment":{"type":"hook_success","hookName":"SessionStart:startup","hookEvent":"SessionStart","content":"hook body"}} +` + turns, _, err := collect(t, body) + require.NoError(t, err) + require.Len(t, turns, 1) + assert.Equal(t, "hook body", turns[0].Content) +} + +func TestParseStream_OffsetAdvances(t *testing.T) { + body := `{"type":"user","sessionId":"S","message":"a"}` + "\n" + + `{"type":"user","sessionId":"S","message":"b"}` + "\n" + var offsets []int64 + _, err := ParseStream(strings.NewReader(body), 100, nil, func(tr Turn) error { + offsets = append(offsets, tr.OffsetAfter) + return nil + }) + require.NoError(t, err) + require.Len(t, offsets, 2) + assert.Greater(t, offsets[0], int64(100)) + assert.Greater(t, offsets[1], offsets[0]) +} diff --git a/ingestion/internal/claudewatcher/scrubber.go b/ingestion/internal/claudewatcher/scrubber.go new file mode 100644 index 0000000..a9063f7 --- /dev/null +++ b/ingestion/internal/claudewatcher/scrubber.go @@ -0,0 +1,62 @@ +package claudewatcher + +import "regexp" + +// Scrubber drops any turn whose content matches a known-bad pattern. +// Fail-closed by design: we'd rather lose signal than ingest credentials +// into a public-readable brain. The caller logs the drop reason. +// +// Rules cover the credential shapes most common to leak through Claude +// Code sessions: bearer tokens, postgres URIs with embedded auth, OAuth +// secret values, SOPS-encrypted secret blobs (we don't want the +// ciphertext either — it's a marker that the original message contained +// secret state), PEM-encoded private keys, and the explicit env-var +// naming conventions used in the homelab. +// +// Pattern philosophy: match by shape, not by content. A 40-char hex +// string in isolation is fine; the same string after `Authorization: +// Bearer ` is not. Tuned to catch known leak vectors from prior +// secret-hygiene incidents (POSTGRES_PASSWORD via kubectl exec env, +// INFRA_MCP_TOKEN via sops -d output) without dropping every Edit on a +// config file. + +// Rule is a single named regex with a redact hint shown in the warn log. +type Rule struct { + Name string + RE *regexp.Regexp +} + +// DefaultRules is the regex set applied by Scrub. Mutable for tests but +// callers should treat it as read-only at runtime. +var DefaultRules = []Rule{ + // authorization-header is checked before the bare bearer rule so + // contextual hits ("Authorization: Bearer X") report the more + // specific match name in logs. + {Name: "authorization-header", RE: regexp.MustCompile(`(?i)Authorization\s*:\s*[A-Za-z]+\s+\S{8,}`)}, + {Name: "bearer-token", RE: regexp.MustCompile(`(?i)Bearer\s+[A-Za-z0-9._\-]{16,}`)}, + {Name: "postgres-uri-with-password", RE: regexp.MustCompile(`postgres(?:ql)?://[^:\s/]+:[^@\s/]+@`)}, + {Name: "private-key", RE: regexp.MustCompile(`-----BEGIN[^-]*PRIVATE KEY-----`)}, + {Name: "ssh-key", RE: regexp.MustCompile(`ssh-(?:rsa|ed25519|ecdsa)\s+[A-Za-z0-9+/=]{40,}`)}, + {Name: "github-pat", RE: regexp.MustCompile(`\b(?:ghp|gho|ghu|ghr|gha)_[A-Za-z0-9]{30,}\b`)}, + {Name: "openai-sk", RE: regexp.MustCompile(`\bsk-(?:proj-)?[A-Za-z0-9]{32,}\b`)}, + {Name: "anthropic-sk", RE: regexp.MustCompile(`\bsk-ant-[A-Za-z0-9_\-]{32,}\b`)}, + {Name: "aws-access-key", RE: regexp.MustCompile(`\bAKIA[0-9A-Z]{16}\b`)}, + {Name: "homelab-env-token", RE: regexp.MustCompile(`(?i)(?:_TOKEN|_PASSWORD|_API_KEY|_SECRET)\s*[:=]\s*['"]?[A-Za-z0-9._/+\-]{12,}`)}, + {Name: "sops-encrypted-marker", RE: regexp.MustCompile(`ENC\[AES256_GCM,data:[A-Za-z0-9+/=]{8,}`)}, +} + +// Scrub reports the first matching rule, or empty when content is clean. +// Empty string is treated as clean. Caller decides what to do on a hit; +// the convention in claudewatcher is to drop the turn entirely and emit +// a slog.Warn naming the rule. +func Scrub(content string) string { + if content == "" { + return "" + } + for _, r := range DefaultRules { + if r.RE.MatchString(content) { + return r.Name + } + } + return "" +} diff --git a/ingestion/internal/claudewatcher/scrubber_test.go b/ingestion/internal/claudewatcher/scrubber_test.go new file mode 100644 index 0000000..7bf9b16 --- /dev/null +++ b/ingestion/internal/claudewatcher/scrubber_test.go @@ -0,0 +1,57 @@ +package claudewatcher + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestScrub_PoisonedFixtures(t *testing.T) { + // One representative bad-string per rule. If a rule fires for the + // wrong content shape later, this table localises the regression. + cases := []struct { + name string + content string + want string + }{ + {"bearer-token", "curl -H 'Authorization: Bearer abcdef1234567890ghijklmnop'", "authorization-header"}, + {"bearer-no-header", "header = Bearer eyJhbGciOiJIUzI1NiJ9.payload.sig", "bearer-token"}, + {"postgres-uri", "DATABASE_URL=postgres://user:s3cret@10.0.1.20:5432/brain", "postgres-uri-with-password"}, + {"private-key", "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAA", "private-key"}, + {"ssh-public", "deploy: ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIK1234567890abcdefghij user@host", "ssh-key"}, + {"github-pat-classic", "GH_TOKEN=ghp_aBcD1234EfGh5678IjKl9012MnOp3456QrSt", "github-pat"}, + {"openai-key", "OPENAI_API_KEY=sk-proj-AAAABBBBCCCCDDDDEEEEFFFFGGGGHHHHIIII", "openai-sk"}, + {"anthropic-key", "ANTHROPIC_API_KEY=sk-ant-api03-aaaaBBBBccccDDDDeeeeFFFFggggHHHHiiiiJJJJkkkk", "anthropic-sk"}, + {"aws-access-key", "AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE", "aws-access-key"}, + {"homelab-env", "POSTGRES_PASSWORD=hunter2supersecretvalue", "homelab-env-token"}, + {"sops-marker", "value: ENC[AES256_GCM,data:abc123def456,iv:zzz]", "sops-encrypted-marker"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := Scrub(tc.content) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestScrub_CleanContentPassesThrough(t *testing.T) { + cases := []string{ + "", + "plain text with no credentials", + "a 40 char hex string aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa is fine in isolation", + "`Bearer` token mentioned in docs without an actual value", + "file at ~/.ssh/id_ed25519", + "the function Authorization() takes no args", + "comment: see API key in 1Password", + } + for _, c := range cases { + assert.Empty(t, Scrub(c), "expected clean for %q", c) + } +} + +func TestScrub_FirstMatchWins(t *testing.T) { + // Content matching multiple rules: report the first rule order in + // DefaultRules. Stability matters for log triage. + content := "Authorization: Bearer ghp_aBcD1234EfGh5678IjKl9012MnOp3456QrSt" + assert.Equal(t, "authorization-header", Scrub(content)) +} diff --git a/ingestion/internal/claudewatcher/watcher.go b/ingestion/internal/claudewatcher/watcher.go new file mode 100644 index 0000000..26b5873 --- /dev/null +++ b/ingestion/internal/claudewatcher/watcher.go @@ -0,0 +1,234 @@ +package claudewatcher + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" +) + +// Sink consumes batches of ingest-ready turns from the watcher. The +// production implementation builds wiki pages and calls pipeline.RunRaw +// against the brain. Tests substitute a counter. +// +// A Batch represents the turns ingested from one session file between +// two cursor checkpoints. Implementations must be idempotent — the +// watcher only advances the cursor on a nil return. +type Sink interface { + Ingest(ctx context.Context, b Batch) error +} + +// Batch is a per-file slice of turns plus identifying metadata. +type Batch struct { + Host string // origin host, e.g. "koala" + FilePath string // absolute path to the source .jsonl file + SessionID string // first session_id seen in the batch + ProjectID string // basename of the parent dir, e.g. "-home-mathias-dev" + Turns []Turn // never empty; caller filters Skip + scrubber matches +} + +// Config drives one Watch loop. SessionsDir is the absolute path to the +// Claude Code projects directory (~/.claude/projects). Host is the +// label written into cursors and ingested page frontmatter. Interval +// is the poll cadence; a zero or negative value disables the loop. +// +// Sink is required. Cursors is optional — when nil the watcher +// re-reads from byte 0 on every tick (useful for first-run testing +// without a postgres dependency). +type Config struct { + SessionsDir string + Host string + Interval time.Duration + Sink Sink + Cursors *CursorStore + Logger *slog.Logger +} + +// Watch runs the polling loop until ctx is cancelled. Returns ctx.Err() +// on shutdown. Each tick walks SessionsDir for *.jsonl files, advances +// each file's cursor, and emits one Batch per file with new turns. +// Errors during a single file's parse or ingest are logged but do not +// abort the loop — a single bad file shouldn't block the others. +func Watch(ctx context.Context, cfg Config) error { + if cfg.SessionsDir == "" { + return fmt.Errorf("sessions dir is required") + } + if cfg.Sink == nil { + return fmt.Errorf("sink is required") + } + if cfg.Interval <= 0 { + return fmt.Errorf("interval must be positive") + } + if cfg.Host == "" { + cfg.Host = "unknown" + } + if cfg.Logger == nil { + cfg.Logger = slog.Default() + } + cfg.Logger.Info("claudewatcher: started", + "sessions_dir", cfg.SessionsDir, + "host", cfg.Host, + "interval", cfg.Interval) + + ticker := time.NewTicker(cfg.Interval) + defer ticker.Stop() + // Run an immediate first sweep so first-launch users don't wait one + // tick before anything happens. + runTick(ctx, cfg) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + runTick(ctx, cfg) + } + } +} + +// runTick is one polling pass. Exposed (lowercase) for tests via +// TickOnce. +func runTick(ctx context.Context, cfg Config) { + files, err := listSessionFiles(cfg.SessionsDir) + if err != nil { + cfg.Logger.Warn("claudewatcher: list session files", "err", err) + return + } + for _, f := range files { + if ctx.Err() != nil { + return + } + if err := processFile(ctx, cfg, f); err != nil { + cfg.Logger.Warn("claudewatcher: file failed", + "path", f, "err", err) + } + } +} + +// TickOnce runs one sweep synchronously and returns. Used by tests + +// by ad-hoc CLI invocations. +func TickOnce(ctx context.Context, cfg Config) error { + if cfg.SessionsDir == "" || cfg.Sink == nil { + return fmt.Errorf("config invalid") + } + if cfg.Host == "" { + cfg.Host = "unknown" + } + if cfg.Logger == nil { + cfg.Logger = slog.Default() + } + runTick(ctx, cfg) + return nil +} + +func listSessionFiles(root string) ([]string, error) { + var out []string + err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + if !strings.HasSuffix(path, ".jsonl") { + return nil + } + out = append(out, path) + return nil + }) + if err != nil { + return nil, fmt.Errorf("walk %s: %w", root, err) + } + return out, nil +} + +func processFile(ctx context.Context, cfg Config, path string) error { + startOffset := int64(0) + if cfg.Cursors != nil { + off, _, err := cfg.Cursors.GetOffset(ctx, cfg.Host, path) + if err != nil { + return fmt.Errorf("get cursor: %w", err) + } + startOffset = off + } + + stat, err := os.Stat(path) + if err != nil { + return fmt.Errorf("stat: %w", err) + } + if stat.Size() <= startOffset { + return nil // nothing new + } + + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("open: %w", err) + } + defer func() { _ = f.Close() }() + if _, err := f.Seek(startOffset, 0); err != nil { + return fmt.Errorf("seek: %w", err) + } + + var keep []Turn + var sessionID string + var droppedScrub int + endOffset, err := ParseStream(f, startOffset, + func(format string, args ...any) { + cfg.Logger.Warn(fmt.Sprintf("claudewatcher: parse: "+format, args...)) + }, + func(t Turn) error { + if t.Skip || t.Content == "" { + return nil + } + if rule := Scrub(t.Content); rule != "" { + droppedScrub++ + cfg.Logger.Warn("claudewatcher: turn dropped by scrubber", + "rule", rule, "path", path, "session_id", t.SessionID) + return nil + } + if sessionID == "" { + sessionID = t.SessionID + } + keep = append(keep, t) + return nil + }) + if err != nil { + return fmt.Errorf("parse stream: %w", err) + } + + if len(keep) == 0 { + if cfg.Cursors != nil { + if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil { + return fmt.Errorf("advance cursor (no-turns): %w", err) + } + } + if droppedScrub > 0 { + cfg.Logger.Info("claudewatcher: only scrubbed turns this tick", + "path", path, "dropped", droppedScrub) + } + return nil + } + + batch := Batch{ + Host: cfg.Host, + FilePath: path, + SessionID: sessionID, + ProjectID: filepath.Base(filepath.Dir(path)), + Turns: keep, + } + if err := cfg.Sink.Ingest(ctx, batch); err != nil { + return fmt.Errorf("sink ingest: %w", err) + } + if cfg.Cursors != nil { + if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil { + return fmt.Errorf("advance cursor: %w", err) + } + } + cfg.Logger.Info("claudewatcher: ingested batch", + "path", path, "session_id", sessionID, + "turns_kept", len(keep), "dropped_scrub", droppedScrub, + "new_offset", endOffset) + return nil +} diff --git a/ingestion/internal/claudewatcher/watcher_test.go b/ingestion/internal/claudewatcher/watcher_test.go new file mode 100644 index 0000000..4a33df1 --- /dev/null +++ b/ingestion/internal/claudewatcher/watcher_test.go @@ -0,0 +1,174 @@ +package claudewatcher + +import ( + "context" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// memSink captures batches without touching postgres. Thread-safe so +// TickOnce can run from any goroutine in concurrent tests. +type memSink struct { + mu sync.Mutex + batches []Batch + failOn string // file basename to error on +} + +func (m *memSink) Ingest(_ context.Context, b Batch) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.failOn != "" && strings.Contains(b.FilePath, m.failOn) { + return assert.AnError + } + m.batches = append(m.batches, b) + return nil +} + +func writeSession(t *testing.T, dir, sessionID string, lines []string) string { + t.Helper() + path := filepath.Join(dir, sessionID+".jsonl") + body := strings.Join(lines, "\n") + "\n" + require.NoError(t, os.WriteFile(path, []byte(body), 0o644)) + return path +} + +func TestTickOnce_NoCursorReingestsEverythingEveryTick(t *testing.T) { + tmp := t.TempDir() + projectDir := filepath.Join(tmp, "-home-mathias-dev") + require.NoError(t, os.MkdirAll(projectDir, 0o755)) + writeSession(t, projectDir, "sess1", []string{ + `{"type":"user","sessionId":"sess1","message":"first prompt"}`, + `{"type":"assistant","sessionId":"sess1","message":{"content":[{"type":"text","text":"first answer"}]}}`, + }) + + sink := &memSink{} + cfg := Config{ + SessionsDir: tmp, + Host: "koala", + Sink: sink, + } + require.NoError(t, TickOnce(context.Background(), cfg)) + require.NoError(t, TickOnce(context.Background(), cfg)) + + require.Len(t, sink.batches, 2, "no cursor => re-emits same batch every tick") + assert.Equal(t, "sess1", sink.batches[0].SessionID) + assert.Equal(t, "koala", sink.batches[0].Host) + assert.Equal(t, "-home-mathias-dev", sink.batches[0].ProjectID) + assert.Len(t, sink.batches[0].Turns, 2) +} + +func TestTickOnce_FiltersSkipTurnsAndScrubberMatches(t *testing.T) { + tmp := t.TempDir() + proj := filepath.Join(tmp, "-home-mathias-dev") + require.NoError(t, os.MkdirAll(proj, 0o755)) + writeSession(t, proj, "sess-scrub", []string{ + `{"type":"queue-operation","sessionId":"sess-scrub","content":"x"}`, // Skip + `{"type":"user","sessionId":"sess-scrub","message":"normal prompt"}`, + `{"type":"assistant","sessionId":"sess-scrub","message":{"content":[{"type":"text","text":"value POSTGRES_PASSWORD=hunter2supersecretvalue"}]}}`, // scrubbed + }) + sink := &memSink{} + require.NoError(t, TickOnce(context.Background(), Config{ + SessionsDir: tmp, Host: "koala", Sink: sink, + })) + require.Len(t, sink.batches, 1) + turns := sink.batches[0].Turns + require.Len(t, turns, 1, "skip + scrubbed turns must not reach the sink") + assert.Equal(t, "user", turns[0].Type) +} + +func TestTickOnce_AllScrubbedNoBatchEmitted(t *testing.T) { + tmp := t.TempDir() + proj := filepath.Join(tmp, "-home-mathias-dev") + require.NoError(t, os.MkdirAll(proj, 0o755)) + writeSession(t, proj, "all-bad", []string{ + `{"type":"user","sessionId":"all-bad","message":"Authorization: Bearer abcdef1234567890ghijklmnop"}`, + }) + sink := &memSink{} + require.NoError(t, TickOnce(context.Background(), Config{ + SessionsDir: tmp, Host: "koala", Sink: sink, + })) + assert.Empty(t, sink.batches, "no usable turns => no batch") +} + +func TestTickOnce_IgnoresNonJsonlFiles(t *testing.T) { + tmp := t.TempDir() + proj := filepath.Join(tmp, "-home-mathias-dev") + require.NoError(t, os.MkdirAll(proj, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(proj, "README.md"), []byte("ignore me"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(proj, "config.json"), []byte("{}"), 0o644)) + sink := &memSink{} + require.NoError(t, TickOnce(context.Background(), Config{ + SessionsDir: tmp, Host: "koala", Sink: sink, + })) + assert.Empty(t, sink.batches) +} + +func TestTickOnce_HandlesMultipleProjectsAndSessions(t *testing.T) { + tmp := t.TempDir() + projA := filepath.Join(tmp, "-home-mathias-dev") + projB := filepath.Join(tmp, "-home-mathias-AI-infra") + require.NoError(t, os.MkdirAll(projA, 0o755)) + require.NoError(t, os.MkdirAll(projB, 0o755)) + writeSession(t, projA, "a1", []string{`{"type":"user","sessionId":"a1","message":"q1"}`}) + writeSession(t, projA, "a2", []string{`{"type":"user","sessionId":"a2","message":"q2"}`}) + writeSession(t, projB, "b1", []string{`{"type":"user","sessionId":"b1","message":"q3"}`}) + + sink := &memSink{} + require.NoError(t, TickOnce(context.Background(), Config{ + SessionsDir: tmp, Host: "koala", Sink: sink, + })) + require.Len(t, sink.batches, 3) + + projects := map[string]int{} + for _, b := range sink.batches { + projects[b.ProjectID]++ + } + assert.Equal(t, 2, projects["-home-mathias-dev"]) + assert.Equal(t, 1, projects["-home-mathias-AI-infra"]) +} + +func TestTickOnce_SinkErrorDoesNotKillOtherFiles(t *testing.T) { + tmp := t.TempDir() + proj := filepath.Join(tmp, "-home-mathias-dev") + require.NoError(t, os.MkdirAll(proj, 0o755)) + writeSession(t, proj, "good", []string{`{"type":"user","sessionId":"good","message":"q"}`}) + writeSession(t, proj, "bad-session", []string{`{"type":"user","sessionId":"bad-session","message":"q"}`}) + + sink := &memSink{failOn: "bad-session"} + require.NoError(t, TickOnce(context.Background(), Config{ + SessionsDir: tmp, Host: "koala", Sink: sink, + })) + require.Len(t, sink.batches, 1, "good session still ingested") + assert.Equal(t, "good", sink.batches[0].SessionID) +} + +func TestWatch_RespectsContextCancel(t *testing.T) { + tmp := t.TempDir() + require.NoError(t, os.MkdirAll(filepath.Join(tmp, "-home-mathias-dev"), 0o755)) + sink := &memSink{} + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- Watch(ctx, Config{ + SessionsDir: tmp, + Host: "koala", + Interval: 10 * time.Millisecond, + Sink: sink, + }) + }() + time.Sleep(50 * time.Millisecond) + cancel() + select { + case err := <-done: + assert.ErrorIs(t, err, context.Canceled) + case <-time.After(2 * time.Second): + t.Fatal("Watch did not return after cancel") + } +}