merge: claudewatcher (closes hyperguild#27, refs infra#73)
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/claudewatcher"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphstore"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graphsync"
|
||||
@@ -29,6 +30,50 @@ import (
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/watcher"
|
||||
)
|
||||
|
||||
// claudeSink converts each claudewatcher.Batch into one wiki note under
|
||||
// brain/wiki/claude-sessions/facts/. v1 emits one note per session
|
||||
// keyed by host + session id; classifier-driven hall routing is a
|
||||
// follow-up (hyperguild#27 v2).
|
||||
type claudeSink struct {
|
||||
brainDir string
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (s *claudeSink) Ingest(ctx context.Context, b claudewatcher.Batch) error {
|
||||
if len(b.Turns) == 0 {
|
||||
return nil
|
||||
}
|
||||
var sb strings.Builder
|
||||
fmt.Fprintf(&sb, "# Claude session %s (%s)\n\n", b.SessionID, b.Host)
|
||||
fmt.Fprintf(&sb, "_Project: `%s`. File: `%s`. Turns: %d._\n\n", b.ProjectID, b.FilePath, len(b.Turns))
|
||||
for _, t := range b.Turns {
|
||||
fmt.Fprintf(&sb, "## %s — %s\n\n", t.Type, t.Timestamp.UTC().Format(time.RFC3339))
|
||||
if t.ToolName != "" {
|
||||
fmt.Fprintf(&sb, "_tool: `%s`_\n\n", t.ToolName)
|
||||
}
|
||||
// Cap per-turn excerpt to keep page size bounded; the full
|
||||
// transcript lives on disk under ~/.claude/projects/ already.
|
||||
content := t.Content
|
||||
if len(content) > 2000 {
|
||||
content = content[:2000] + "…"
|
||||
}
|
||||
sb.WriteString(content)
|
||||
sb.WriteString("\n\n")
|
||||
}
|
||||
slug := "session-" + b.Host + "-" + b.SessionID
|
||||
if _, err := api.WriteNote(s.brainDir, api.WriteNoteOptions{
|
||||
Filename: slug,
|
||||
Wing: "claude-sessions",
|
||||
Hall: "facts",
|
||||
Type: "source",
|
||||
Domain: b.ProjectID,
|
||||
Content: sb.String(),
|
||||
}); err != nil {
|
||||
return fmt.Errorf("write claude session note: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// redactDSN parses a Postgres URL and replaces its password with `***`
|
||||
// for safe inclusion in logs. Falls back to a non-leaking placeholder
|
||||
// if parsing fails — we never log a raw DSN.
|
||||
@@ -73,6 +118,16 @@ func envInt(key string, fallback int) int {
|
||||
return fallback
|
||||
}
|
||||
|
||||
// systemHostname returns os.Hostname() with a "unknown" fallback so the
|
||||
// caller never has to handle the rare error path.
|
||||
func systemHostname() string {
|
||||
h, err := os.Hostname()
|
||||
if err != nil || h == "" {
|
||||
return "unknown"
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func main() {
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||
|
||||
@@ -191,6 +246,43 @@ func main() {
|
||||
Pipeline: pipelineCfg,
|
||||
})
|
||||
}
|
||||
|
||||
// Claude Code session ingestion (hyperguild#27 / infra#73 Track E.1).
|
||||
// Off by default — explicitly opt in by setting CLAUDE_SESSIONS_DIR
|
||||
// to the ~/.claude/projects path. Requires BRAIN_PG_DSN for the
|
||||
// cursor table (resumable offsets across restarts).
|
||||
if claudeDir := os.Getenv("CLAUDE_SESSIONS_DIR"); claudeDir != "" {
|
||||
if pgDSN == "" {
|
||||
logger.Error("CLAUDE_SESSIONS_DIR set but BRAIN_PG_DSN missing — claudewatcher needs the cursor table")
|
||||
os.Exit(1)
|
||||
}
|
||||
cursorStore, cerr := claudewatcher.NewCursorStore(ctx, pgDSN)
|
||||
if cerr != nil {
|
||||
logger.Error("claudewatcher cursor init", "err", cerr)
|
||||
os.Exit(1)
|
||||
}
|
||||
if cerr := cursorStore.Init(ctx); cerr != nil {
|
||||
logger.Error("claudewatcher cursor migrate", "err", cerr)
|
||||
os.Exit(1)
|
||||
}
|
||||
host := envOr("CLAUDE_INGEST_HOST", systemHostname())
|
||||
interval := time.Duration(envInt("CLAUDE_INGEST_INTERVAL", 60)) * time.Second
|
||||
sink := &claudeSink{brainDir: brainDir, logger: logger}
|
||||
go func() {
|
||||
if err := claudewatcher.Watch(ctx, claudewatcher.Config{
|
||||
SessionsDir: claudeDir,
|
||||
Host: host,
|
||||
Interval: interval,
|
||||
Sink: sink,
|
||||
Cursors: cursorStore,
|
||||
Logger: logger,
|
||||
}); err != nil && err != context.Canceled {
|
||||
logger.Error("claudewatcher exited", "err", err)
|
||||
}
|
||||
}()
|
||||
logger.Info("claudewatcher started",
|
||||
"sessions_dir", claudeDir, "host", host, "interval", interval)
|
||||
}
|
||||
if vectorStore != nil {
|
||||
embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300)
|
||||
vectorstore.StartSync(ctx, brainDir, vectorStore,
|
||||
|
||||
110
ingestion/internal/claudewatcher/cursor.go
Normal file
110
ingestion/internal/claudewatcher/cursor.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// CursorStore tracks how far the watcher has ingested into each
|
||||
// session JSONL file. Keyed by (host, file_path) so the same `~/.claude`
|
||||
// path on different hosts doesn't collide and resumability survives
|
||||
// pod restarts. Idempotent Init lives alongside the rest of the
|
||||
// claudewatcher schema; no separate migration framework.
|
||||
type CursorStore struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// NewCursorStore opens a pool against dsn. Caller closes the store.
|
||||
func NewCursorStore(ctx context.Context, dsn string) (*CursorStore, error) {
|
||||
pool, err := pgxpool.New(ctx, dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgxpool: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("ping: %w", err)
|
||||
}
|
||||
return &CursorStore{pool: pool}, nil
|
||||
}
|
||||
|
||||
// NewCursorStoreFromPool wraps an existing pool (so the watcher can
|
||||
// share the brain DSN pool with vectorstore/graphstore without a
|
||||
// second connection set). Caller must NOT close the wrapped pool via
|
||||
// the store — close the pool directly.
|
||||
func NewCursorStoreFromPool(pool *pgxpool.Pool) *CursorStore {
|
||||
return &CursorStore{pool: pool}
|
||||
}
|
||||
|
||||
// Close releases the underlying connection pool when this store owns
|
||||
// it. No-op when the pool was injected via NewCursorStoreFromPool —
|
||||
// pgxpool.Close is idempotent so we lean on that.
|
||||
func (s *CursorStore) Close() {
|
||||
if s.pool != nil {
|
||||
s.pool.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Init creates the claude_session_cursors table when missing.
|
||||
func (s *CursorStore) Init(ctx context.Context) error {
|
||||
const ddl = `
|
||||
CREATE TABLE IF NOT EXISTS claude_session_cursors (
|
||||
host TEXT NOT NULL,
|
||||
file_path TEXT NOT NULL,
|
||||
byte_offset BIGINT NOT NULL DEFAULT 0,
|
||||
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (host, file_path)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS claude_session_cursors_host_idx
|
||||
ON claude_session_cursors (host);
|
||||
`
|
||||
_, err := s.pool.Exec(ctx, ddl)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetOffset returns the last recorded byte offset for (host, filePath).
|
||||
// Missing rows are reported as offset=0, ok=false so the caller can
|
||||
// distinguish "never ingested" from "ingested at the start of the
|
||||
// file" (both produce identical behaviour but the metric is useful).
|
||||
func (s *CursorStore) GetOffset(ctx context.Context, host, filePath string) (int64, bool, error) {
|
||||
if host == "" || filePath == "" {
|
||||
return 0, false, errors.New("host and file_path are required")
|
||||
}
|
||||
var offset int64
|
||||
err := s.pool.QueryRow(ctx, `
|
||||
SELECT byte_offset FROM claude_session_cursors WHERE host = $1 AND file_path = $2
|
||||
`, host, filePath).Scan(&offset)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return 0, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("query: %w", err)
|
||||
}
|
||||
return offset, true, nil
|
||||
}
|
||||
|
||||
// SetOffset writes the new offset for (host, filePath). Used after
|
||||
// every successful parse + ingest batch so a crash mid-file rewinds
|
||||
// only to the last committed checkpoint.
|
||||
func (s *CursorStore) SetOffset(ctx context.Context, host, filePath string, offset int64) error {
|
||||
if host == "" || filePath == "" {
|
||||
return errors.New("host and file_path are required")
|
||||
}
|
||||
if offset < 0 {
|
||||
return errors.New("offset must be >= 0")
|
||||
}
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
INSERT INTO claude_session_cursors (host, file_path, byte_offset, last_seen_at)
|
||||
VALUES ($1, $2, $3, now())
|
||||
ON CONFLICT (host, file_path) DO UPDATE
|
||||
SET byte_offset = EXCLUDED.byte_offset,
|
||||
last_seen_at = now()
|
||||
`, host, filePath, offset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upsert offset: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
305
ingestion/internal/claudewatcher/parser.go
Normal file
305
ingestion/internal/claudewatcher/parser.go
Normal file
@@ -0,0 +1,305 @@
|
||||
// Package claudewatcher ingests Claude Code session transcripts
|
||||
// (`~/.claude/projects/*/<uuid>.jsonl`) into the brain corpus.
|
||||
//
|
||||
// Schema (observed 2026-05-25 across ~30 session files on koala):
|
||||
//
|
||||
// type=user — user prompts + tool results
|
||||
// type=assistant — model turns; tool_use blocks live in message.content
|
||||
// type=attachment — hook outputs, ingested files
|
||||
// type=system — turn-boundary metadata
|
||||
// type=file-history-snapshot — git-style snapshot of edited files
|
||||
// type=queue-operation, last-prompt, permission-mode, ai-title,
|
||||
// bridge-session — internal bookkeeping, ignored
|
||||
//
|
||||
// The parser is intentionally tolerant: malformed lines are skipped
|
||||
// (caller logs and advances), missing optional fields default to "",
|
||||
// and unknown `type` values are returned as Turn entries with
|
||||
// `Skip=true` so callers can filter cheaply.
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Turn is one parsed JSONL entry from a Claude Code session log.
|
||||
//
|
||||
// Skip is true for entry types we never want to ingest (queue
|
||||
// bookkeeping, snapshots, etc.). Callers fast-path these without
|
||||
// running the scrubber or classifier.
|
||||
type Turn struct {
|
||||
SessionID string
|
||||
Type string
|
||||
ParentUUID string
|
||||
Timestamp time.Time
|
||||
Cwd string
|
||||
GitBranch string
|
||||
Content string // plain-text projection of the entry, ready for the scrubber/classifier
|
||||
ToolName string // populated when an assistant turn invokes a tool
|
||||
OffsetAfter int64 // byte offset in the file just past this entry
|
||||
Skip bool
|
||||
ParseWarning string // non-empty when the entry parsed but had a sub-field we couldn't normalise
|
||||
}
|
||||
|
||||
// ParseStream reads JSONL lines from r starting at startOffset and
|
||||
// invokes emit for each parsed entry. emit may return ErrStop to
|
||||
// terminate the scan cleanly. Other emit errors propagate.
|
||||
//
|
||||
// startOffset is informational — the caller is expected to have already
|
||||
// seeked the underlying reader to that offset. ParseStream adds the
|
||||
// number of bytes consumed per line to it to compute Turn.OffsetAfter.
|
||||
//
|
||||
// Lines that fail to unmarshal are logged via warnf and skipped; they
|
||||
// do NOT advance OffsetAfter past the malformed line by themselves,
|
||||
// but the next valid line resumes correctly because bufio.Scanner
|
||||
// preserves stream position.
|
||||
func ParseStream(
|
||||
r io.Reader,
|
||||
startOffset int64,
|
||||
warnf func(format string, args ...any),
|
||||
emit func(Turn) error,
|
||||
) (int64, error) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) // some lines are big (tool outputs)
|
||||
|
||||
offset := startOffset
|
||||
for scanner.Scan() {
|
||||
raw := scanner.Bytes()
|
||||
lineLen := int64(len(raw)) + 1 // +1 for the newline
|
||||
t, err := parseTurn(raw)
|
||||
if err != nil {
|
||||
if warnf != nil {
|
||||
warnf("parse: %v (%d bytes)", err, len(raw))
|
||||
}
|
||||
offset += lineLen
|
||||
continue
|
||||
}
|
||||
t.OffsetAfter = offset + lineLen
|
||||
if err := emit(t); err != nil {
|
||||
if errors.Is(err, ErrStop) {
|
||||
return t.OffsetAfter, nil
|
||||
}
|
||||
return offset, fmt.Errorf("emit: %w", err)
|
||||
}
|
||||
offset = t.OffsetAfter
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return offset, fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
// ErrStop terminates a ParseStream loop without surfacing an error.
|
||||
var ErrStop = errors.New("claudewatcher: stop")
|
||||
|
||||
// rawEntry is a permissive shape that covers every type observed in
|
||||
// the JSONL files. Fields we don't care about are intentionally
|
||||
// omitted to keep the unmarshal cheap.
|
||||
type rawEntry struct {
|
||||
Type string `json:"type"`
|
||||
SessionID string `json:"sessionId"`
|
||||
ParentUUID string `json:"parentUuid"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Cwd string `json:"cwd"`
|
||||
GitBranch string `json:"gitBranch"`
|
||||
Message json.RawMessage `json:"message"`
|
||||
Attachment json.RawMessage `json:"attachment"`
|
||||
Content string `json:"content"` // queue-operation
|
||||
LastPrompt string `json:"lastPrompt"` // last-prompt
|
||||
Subtype string `json:"subtype"` // system
|
||||
}
|
||||
|
||||
// skipTypes lists every entry type we want to never ingest. Marked Skip
|
||||
// at parse time so the caller's filter is a single boolean check.
|
||||
var skipTypes = map[string]struct{}{
|
||||
"queue-operation": {},
|
||||
"last-prompt": {},
|
||||
"permission-mode": {},
|
||||
"ai-title": {},
|
||||
"bridge-session": {},
|
||||
"file-history-snapshot": {},
|
||||
}
|
||||
|
||||
func parseTurn(raw []byte) (Turn, error) {
|
||||
var e rawEntry
|
||||
if err := json.Unmarshal(raw, &e); err != nil {
|
||||
return Turn{}, fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
t := Turn{
|
||||
Type: e.Type,
|
||||
SessionID: e.SessionID,
|
||||
ParentUUID: e.ParentUUID,
|
||||
Cwd: e.Cwd,
|
||||
GitBranch: e.GitBranch,
|
||||
}
|
||||
if _, skip := skipTypes[e.Type]; skip {
|
||||
t.Skip = true
|
||||
return t, nil
|
||||
}
|
||||
if e.Timestamp != "" {
|
||||
if ts, err := time.Parse(time.RFC3339Nano, e.Timestamp); err == nil {
|
||||
t.Timestamp = ts
|
||||
} else {
|
||||
t.ParseWarning = "timestamp"
|
||||
}
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case "user":
|
||||
t.Content = extractMessageText(e.Message)
|
||||
case "assistant":
|
||||
t.Content, t.ToolName = extractAssistantTurn(e.Message)
|
||||
case "attachment":
|
||||
t.Content = extractAttachmentText(e.Attachment)
|
||||
case "system":
|
||||
t.Content = "[system " + e.Subtype + "]"
|
||||
default:
|
||||
// Unknown type — keep the row but mark Skip so callers ignore.
|
||||
t.Skip = true
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// extractMessageText pulls the textual projection out of a user/assistant
|
||||
// message field. The shape is the Anthropic Messages API content-block
|
||||
// array (an array of {type, text|tool_use|tool_result, ...}). We
|
||||
// concatenate every text-bearing block and ignore the rest.
|
||||
func extractMessageText(raw json.RawMessage) string {
|
||||
if len(raw) == 0 {
|
||||
return ""
|
||||
}
|
||||
var msg struct {
|
||||
Role string `json:"role"`
|
||||
Content json.RawMessage `json:"content"`
|
||||
Stop string `json:"stop_reason"`
|
||||
Model string `json:"model"`
|
||||
Usage map[string]any `json:"usage"`
|
||||
Meta map[string]string `json:"meta"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
// Some user turns have message as plain string.
|
||||
var s string
|
||||
if err2 := json.Unmarshal(raw, &s); err2 == nil {
|
||||
return s
|
||||
}
|
||||
return ""
|
||||
}
|
||||
// Content can be a string OR an array.
|
||||
var asString string
|
||||
if err := json.Unmarshal(msg.Content, &asString); err == nil {
|
||||
return asString
|
||||
}
|
||||
var blocks []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
Content json.RawMessage `json:"content"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Content, &blocks); err != nil {
|
||||
return ""
|
||||
}
|
||||
var sb strings.Builder
|
||||
for _, b := range blocks {
|
||||
switch b.Type {
|
||||
case "text":
|
||||
sb.WriteString(b.Text)
|
||||
sb.WriteByte('\n')
|
||||
case "tool_result":
|
||||
// Tool result content may itself be a string or array of blocks.
|
||||
var s string
|
||||
if err := json.Unmarshal(b.Content, &s); err == nil {
|
||||
sb.WriteString("[tool_result] ")
|
||||
sb.WriteString(s)
|
||||
sb.WriteByte('\n')
|
||||
continue
|
||||
}
|
||||
var sub []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
}
|
||||
if err := json.Unmarshal(b.Content, &sub); err == nil {
|
||||
for _, s := range sub {
|
||||
if s.Type == "text" {
|
||||
sb.WriteString("[tool_result] ")
|
||||
sb.WriteString(s.Text)
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return strings.TrimRight(sb.String(), "\n")
|
||||
}
|
||||
|
||||
// extractAssistantTurn pulls text + the first tool name (if any) from
|
||||
// an assistant content-block array. Multi-tool turns lose the second
|
||||
// name; the goal is signal for classification, not perfect fidelity.
|
||||
func extractAssistantTurn(raw json.RawMessage) (string, string) {
|
||||
if len(raw) == 0 {
|
||||
return "", ""
|
||||
}
|
||||
var msg struct {
|
||||
Content json.RawMessage `json:"content"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
return "", ""
|
||||
}
|
||||
var blocks []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
Name string `json:"name"`
|
||||
Tool json.RawMessage `json:"input"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Content, &blocks); err != nil {
|
||||
return "", ""
|
||||
}
|
||||
var sb strings.Builder
|
||||
var firstTool string
|
||||
for _, b := range blocks {
|
||||
switch b.Type {
|
||||
case "text":
|
||||
sb.WriteString(b.Text)
|
||||
sb.WriteByte('\n')
|
||||
case "tool_use":
|
||||
if firstTool == "" {
|
||||
firstTool = b.Name
|
||||
}
|
||||
sb.WriteString("[tool_use:")
|
||||
sb.WriteString(b.Name)
|
||||
sb.WriteString("]\n")
|
||||
}
|
||||
}
|
||||
return strings.TrimRight(sb.String(), "\n"), firstTool
|
||||
}
|
||||
|
||||
// extractAttachmentText pulls text content from an attachment payload,
|
||||
// or returns a short tag when the attachment is a hook event.
|
||||
func extractAttachmentText(raw json.RawMessage) string {
|
||||
if len(raw) == 0 {
|
||||
return ""
|
||||
}
|
||||
var a struct {
|
||||
Type string `json:"type"`
|
||||
HookName string `json:"hookName"`
|
||||
HookEvent string `json:"hookEvent"`
|
||||
Content string `json:"content"`
|
||||
Text string `json:"text"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &a); err != nil {
|
||||
return ""
|
||||
}
|
||||
if a.Content != "" {
|
||||
return a.Content
|
||||
}
|
||||
if a.Text != "" {
|
||||
return a.Text
|
||||
}
|
||||
if a.HookName != "" {
|
||||
return "[hook " + a.HookEvent + ":" + a.HookName + "]"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
157
ingestion/internal/claudewatcher/parser_test.go
Normal file
157
ingestion/internal/claudewatcher/parser_test.go
Normal file
@@ -0,0 +1,157 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func collect(t *testing.T, body string) ([]Turn, int64, error) {
|
||||
t.Helper()
|
||||
var out []Turn
|
||||
end, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error {
|
||||
out = append(out, tr)
|
||||
return nil
|
||||
})
|
||||
return out, end, err
|
||||
}
|
||||
|
||||
func TestParseStream_UserTurnStringContent(t *testing.T) {
|
||||
body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":"hello world"}
|
||||
`
|
||||
turns, end, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.Equal(t, "user", turns[0].Type)
|
||||
assert.Equal(t, "S", turns[0].SessionID)
|
||||
assert.Equal(t, "hello world", turns[0].Content)
|
||||
assert.False(t, turns[0].Skip)
|
||||
assert.Equal(t, int64(len(body)), end)
|
||||
}
|
||||
|
||||
func TestParseStream_UserTurnContentBlocks(t *testing.T) {
|
||||
body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"role":"user","content":[{"type":"text","text":"line 1"},{"type":"text","text":"line 2"}]}}
|
||||
`
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.Equal(t, "line 1\nline 2", turns[0].Content)
|
||||
}
|
||||
|
||||
func TestParseStream_AssistantToolUse(t *testing.T) {
|
||||
body := `{"type":"assistant","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"content":[{"type":"text","text":"calling now"},{"type":"tool_use","name":"Edit","input":{}}]}}
|
||||
`
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.Equal(t, "Edit", turns[0].ToolName)
|
||||
assert.Contains(t, turns[0].Content, "calling now")
|
||||
assert.Contains(t, turns[0].Content, "[tool_use:Edit]")
|
||||
}
|
||||
|
||||
func TestParseStream_AssistantToolResult(t *testing.T) {
|
||||
body := `{"type":"user","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","message":{"content":[{"type":"tool_result","content":"output of cmd"}]}}
|
||||
`
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.Contains(t, turns[0].Content, "[tool_result] output of cmd")
|
||||
}
|
||||
|
||||
func TestParseStream_SkipsBookkeepingTypes(t *testing.T) {
|
||||
body := strings.Join([]string{
|
||||
`{"type":"queue-operation","sessionId":"S","content":"x"}`,
|
||||
`{"type":"last-prompt","sessionId":"S","lastPrompt":"y"}`,
|
||||
`{"type":"permission-mode","sessionId":"S","permissionMode":"auto"}`,
|
||||
`{"type":"ai-title","sessionId":"S","aiTitle":"My session"}`,
|
||||
`{"type":"file-history-snapshot","messageId":"abc"}`,
|
||||
}, "\n") + "\n"
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 5)
|
||||
for _, tr := range turns {
|
||||
assert.True(t, tr.Skip, "expected Skip=true for %q", tr.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseStream_UnknownTypeIsSkip(t *testing.T) {
|
||||
body := `{"type":"future-thing","sessionId":"S"}` + "\n"
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.True(t, turns[0].Skip)
|
||||
}
|
||||
|
||||
func TestParseStream_MalformedLineIsSkippedNotFatal(t *testing.T) {
|
||||
body := strings.Join([]string{
|
||||
`{"type":"user","sessionId":"S","message":"first"}`,
|
||||
`{not valid json`,
|
||||
`{"type":"user","sessionId":"S","message":"third"}`,
|
||||
}, "\n") + "\n"
|
||||
var warnings int
|
||||
var turns []Turn
|
||||
_, err := ParseStream(strings.NewReader(body), 0, func(format string, args ...any) {
|
||||
warnings++
|
||||
}, func(tr Turn) error {
|
||||
turns = append(turns, tr)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 2, "first + third should make it through")
|
||||
assert.Equal(t, 1, warnings)
|
||||
}
|
||||
|
||||
func TestParseStream_EmitErrStopHaltsCleanly(t *testing.T) {
|
||||
body := strings.Join([]string{
|
||||
`{"type":"user","sessionId":"S","message":"a"}`,
|
||||
`{"type":"user","sessionId":"S","message":"b"}`,
|
||||
`{"type":"user","sessionId":"S","message":"c"}`,
|
||||
}, "\n") + "\n"
|
||||
count := 0
|
||||
end, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error {
|
||||
count++
|
||||
if count == 2 {
|
||||
return ErrStop
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, count)
|
||||
assert.Greater(t, end, int64(0))
|
||||
}
|
||||
|
||||
func TestParseStream_EmitOtherErrorPropagates(t *testing.T) {
|
||||
body := `{"type":"user","sessionId":"S","message":"a"}` + "\n"
|
||||
want := errors.New("boom")
|
||||
_, err := ParseStream(strings.NewReader(body), 0, nil, func(tr Turn) error {
|
||||
return want
|
||||
})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "boom")
|
||||
}
|
||||
|
||||
func TestParseStream_AttachmentHookEvent(t *testing.T) {
|
||||
body := `{"type":"attachment","sessionId":"S","timestamp":"2026-05-25T07:00:00Z","attachment":{"type":"hook_success","hookName":"SessionStart:startup","hookEvent":"SessionStart","content":"hook body"}}
|
||||
`
|
||||
turns, _, err := collect(t, body)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, turns, 1)
|
||||
assert.Equal(t, "hook body", turns[0].Content)
|
||||
}
|
||||
|
||||
func TestParseStream_OffsetAdvances(t *testing.T) {
|
||||
body := `{"type":"user","sessionId":"S","message":"a"}` + "\n" +
|
||||
`{"type":"user","sessionId":"S","message":"b"}` + "\n"
|
||||
var offsets []int64
|
||||
_, err := ParseStream(strings.NewReader(body), 100, nil, func(tr Turn) error {
|
||||
offsets = append(offsets, tr.OffsetAfter)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, offsets, 2)
|
||||
assert.Greater(t, offsets[0], int64(100))
|
||||
assert.Greater(t, offsets[1], offsets[0])
|
||||
}
|
||||
62
ingestion/internal/claudewatcher/scrubber.go
Normal file
62
ingestion/internal/claudewatcher/scrubber.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package claudewatcher
|
||||
|
||||
import "regexp"
|
||||
|
||||
// Scrubber drops any turn whose content matches a known-bad pattern.
|
||||
// Fail-closed by design: we'd rather lose signal than ingest credentials
|
||||
// into a public-readable brain. The caller logs the drop reason.
|
||||
//
|
||||
// Rules cover the credential shapes most common to leak through Claude
|
||||
// Code sessions: bearer tokens, postgres URIs with embedded auth, OAuth
|
||||
// secret values, SOPS-encrypted secret blobs (we don't want the
|
||||
// ciphertext either — it's a marker that the original message contained
|
||||
// secret state), PEM-encoded private keys, and the explicit env-var
|
||||
// naming conventions used in the homelab.
|
||||
//
|
||||
// Pattern philosophy: match by shape, not by content. A 40-char hex
|
||||
// string in isolation is fine; the same string after `Authorization:
|
||||
// Bearer ` is not. Tuned to catch known leak vectors from prior
|
||||
// secret-hygiene incidents (POSTGRES_PASSWORD via kubectl exec env,
|
||||
// INFRA_MCP_TOKEN via sops -d output) without dropping every Edit on a
|
||||
// config file.
|
||||
|
||||
// Rule is a single named regex with a redact hint shown in the warn log.
|
||||
type Rule struct {
|
||||
Name string
|
||||
RE *regexp.Regexp
|
||||
}
|
||||
|
||||
// DefaultRules is the regex set applied by Scrub. Mutable for tests but
|
||||
// callers should treat it as read-only at runtime.
|
||||
var DefaultRules = []Rule{
|
||||
// authorization-header is checked before the bare bearer rule so
|
||||
// contextual hits ("Authorization: Bearer X") report the more
|
||||
// specific match name in logs.
|
||||
{Name: "authorization-header", RE: regexp.MustCompile(`(?i)Authorization\s*:\s*[A-Za-z]+\s+\S{8,}`)},
|
||||
{Name: "bearer-token", RE: regexp.MustCompile(`(?i)Bearer\s+[A-Za-z0-9._\-]{16,}`)},
|
||||
{Name: "postgres-uri-with-password", RE: regexp.MustCompile(`postgres(?:ql)?://[^:\s/]+:[^@\s/]+@`)},
|
||||
{Name: "private-key", RE: regexp.MustCompile(`-----BEGIN[^-]*PRIVATE KEY-----`)},
|
||||
{Name: "ssh-key", RE: regexp.MustCompile(`ssh-(?:rsa|ed25519|ecdsa)\s+[A-Za-z0-9+/=]{40,}`)},
|
||||
{Name: "github-pat", RE: regexp.MustCompile(`\b(?:ghp|gho|ghu|ghr|gha)_[A-Za-z0-9]{30,}\b`)},
|
||||
{Name: "openai-sk", RE: regexp.MustCompile(`\bsk-(?:proj-)?[A-Za-z0-9]{32,}\b`)},
|
||||
{Name: "anthropic-sk", RE: regexp.MustCompile(`\bsk-ant-[A-Za-z0-9_\-]{32,}\b`)},
|
||||
{Name: "aws-access-key", RE: regexp.MustCompile(`\bAKIA[0-9A-Z]{16}\b`)},
|
||||
{Name: "homelab-env-token", RE: regexp.MustCompile(`(?i)(?:_TOKEN|_PASSWORD|_API_KEY|_SECRET)\s*[:=]\s*['"]?[A-Za-z0-9._/+\-]{12,}`)},
|
||||
{Name: "sops-encrypted-marker", RE: regexp.MustCompile(`ENC\[AES256_GCM,data:[A-Za-z0-9+/=]{8,}`)},
|
||||
}
|
||||
|
||||
// Scrub reports the first matching rule, or empty when content is clean.
|
||||
// Empty string is treated as clean. Caller decides what to do on a hit;
|
||||
// the convention in claudewatcher is to drop the turn entirely and emit
|
||||
// a slog.Warn naming the rule.
|
||||
func Scrub(content string) string {
|
||||
if content == "" {
|
||||
return ""
|
||||
}
|
||||
for _, r := range DefaultRules {
|
||||
if r.RE.MatchString(content) {
|
||||
return r.Name
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
57
ingestion/internal/claudewatcher/scrubber_test.go
Normal file
57
ingestion/internal/claudewatcher/scrubber_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestScrub_PoisonedFixtures(t *testing.T) {
|
||||
// One representative bad-string per rule. If a rule fires for the
|
||||
// wrong content shape later, this table localises the regression.
|
||||
cases := []struct {
|
||||
name string
|
||||
content string
|
||||
want string
|
||||
}{
|
||||
{"bearer-token", "curl -H 'Authorization: Bearer abcdef1234567890ghijklmnop'", "authorization-header"},
|
||||
{"bearer-no-header", "header = Bearer eyJhbGciOiJIUzI1NiJ9.payload.sig", "bearer-token"},
|
||||
{"postgres-uri", "DATABASE_URL=postgres://user:s3cret@10.0.1.20:5432/brain", "postgres-uri-with-password"},
|
||||
{"private-key", "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAA", "private-key"},
|
||||
{"ssh-public", "deploy: ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIK1234567890abcdefghij user@host", "ssh-key"},
|
||||
{"github-pat-classic", "GH_TOKEN=ghp_aBcD1234EfGh5678IjKl9012MnOp3456QrSt", "github-pat"},
|
||||
{"openai-key", "OPENAI_API_KEY=sk-proj-AAAABBBBCCCCDDDDEEEEFFFFGGGGHHHHIIII", "openai-sk"},
|
||||
{"anthropic-key", "ANTHROPIC_API_KEY=sk-ant-api03-aaaaBBBBccccDDDDeeeeFFFFggggHHHHiiiiJJJJkkkk", "anthropic-sk"},
|
||||
{"aws-access-key", "AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE", "aws-access-key"},
|
||||
{"homelab-env", "POSTGRES_PASSWORD=hunter2supersecretvalue", "homelab-env-token"},
|
||||
{"sops-marker", "value: ENC[AES256_GCM,data:abc123def456,iv:zzz]", "sops-encrypted-marker"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := Scrub(tc.content)
|
||||
assert.Equal(t, tc.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrub_CleanContentPassesThrough(t *testing.T) {
|
||||
cases := []string{
|
||||
"",
|
||||
"plain text with no credentials",
|
||||
"a 40 char hex string aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa is fine in isolation",
|
||||
"`Bearer` token mentioned in docs without an actual value",
|
||||
"file at ~/.ssh/id_ed25519",
|
||||
"the function Authorization() takes no args",
|
||||
"comment: see API key in 1Password",
|
||||
}
|
||||
for _, c := range cases {
|
||||
assert.Empty(t, Scrub(c), "expected clean for %q", c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrub_FirstMatchWins(t *testing.T) {
|
||||
// Content matching multiple rules: report the first rule order in
|
||||
// DefaultRules. Stability matters for log triage.
|
||||
content := "Authorization: Bearer ghp_aBcD1234EfGh5678IjKl9012MnOp3456QrSt"
|
||||
assert.Equal(t, "authorization-header", Scrub(content))
|
||||
}
|
||||
234
ingestion/internal/claudewatcher/watcher.go
Normal file
234
ingestion/internal/claudewatcher/watcher.go
Normal file
@@ -0,0 +1,234 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sink consumes batches of ingest-ready turns from the watcher. The
|
||||
// production implementation builds wiki pages and calls pipeline.RunRaw
|
||||
// against the brain. Tests substitute a counter.
|
||||
//
|
||||
// A Batch represents the turns ingested from one session file between
|
||||
// two cursor checkpoints. Implementations must be idempotent — the
|
||||
// watcher only advances the cursor on a nil return.
|
||||
type Sink interface {
|
||||
Ingest(ctx context.Context, b Batch) error
|
||||
}
|
||||
|
||||
// Batch is a per-file slice of turns plus identifying metadata.
|
||||
type Batch struct {
|
||||
Host string // origin host, e.g. "koala"
|
||||
FilePath string // absolute path to the source .jsonl file
|
||||
SessionID string // first session_id seen in the batch
|
||||
ProjectID string // basename of the parent dir, e.g. "-home-mathias-dev"
|
||||
Turns []Turn // never empty; caller filters Skip + scrubber matches
|
||||
}
|
||||
|
||||
// Config drives one Watch loop. SessionsDir is the absolute path to the
|
||||
// Claude Code projects directory (~/.claude/projects). Host is the
|
||||
// label written into cursors and ingested page frontmatter. Interval
|
||||
// is the poll cadence; a zero or negative value disables the loop.
|
||||
//
|
||||
// Sink is required. Cursors is optional — when nil the watcher
|
||||
// re-reads from byte 0 on every tick (useful for first-run testing
|
||||
// without a postgres dependency).
|
||||
type Config struct {
|
||||
SessionsDir string
|
||||
Host string
|
||||
Interval time.Duration
|
||||
Sink Sink
|
||||
Cursors *CursorStore
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// Watch runs the polling loop until ctx is cancelled. Returns ctx.Err()
|
||||
// on shutdown. Each tick walks SessionsDir for *.jsonl files, advances
|
||||
// each file's cursor, and emits one Batch per file with new turns.
|
||||
// Errors during a single file's parse or ingest are logged but do not
|
||||
// abort the loop — a single bad file shouldn't block the others.
|
||||
func Watch(ctx context.Context, cfg Config) error {
|
||||
if cfg.SessionsDir == "" {
|
||||
return fmt.Errorf("sessions dir is required")
|
||||
}
|
||||
if cfg.Sink == nil {
|
||||
return fmt.Errorf("sink is required")
|
||||
}
|
||||
if cfg.Interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive")
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "unknown"
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
cfg.Logger.Info("claudewatcher: started",
|
||||
"sessions_dir", cfg.SessionsDir,
|
||||
"host", cfg.Host,
|
||||
"interval", cfg.Interval)
|
||||
|
||||
ticker := time.NewTicker(cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
// Run an immediate first sweep so first-launch users don't wait one
|
||||
// tick before anything happens.
|
||||
runTick(ctx, cfg)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
runTick(ctx, cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runTick is one polling pass. Exposed (lowercase) for tests via
|
||||
// TickOnce.
|
||||
func runTick(ctx context.Context, cfg Config) {
|
||||
files, err := listSessionFiles(cfg.SessionsDir)
|
||||
if err != nil {
|
||||
cfg.Logger.Warn("claudewatcher: list session files", "err", err)
|
||||
return
|
||||
}
|
||||
for _, f := range files {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if err := processFile(ctx, cfg, f); err != nil {
|
||||
cfg.Logger.Warn("claudewatcher: file failed",
|
||||
"path", f, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TickOnce runs one sweep synchronously and returns. Used by tests +
|
||||
// by ad-hoc CLI invocations.
|
||||
func TickOnce(ctx context.Context, cfg Config) error {
|
||||
if cfg.SessionsDir == "" || cfg.Sink == nil {
|
||||
return fmt.Errorf("config invalid")
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "unknown"
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
runTick(ctx, cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func listSessionFiles(root string) ([]string, error) {
|
||||
var out []string
|
||||
err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
if !strings.HasSuffix(path, ".jsonl") {
|
||||
return nil
|
||||
}
|
||||
out = append(out, path)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("walk %s: %w", root, err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func processFile(ctx context.Context, cfg Config, path string) error {
|
||||
startOffset := int64(0)
|
||||
if cfg.Cursors != nil {
|
||||
off, _, err := cfg.Cursors.GetOffset(ctx, cfg.Host, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get cursor: %w", err)
|
||||
}
|
||||
startOffset = off
|
||||
}
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat: %w", err)
|
||||
}
|
||||
if stat.Size() <= startOffset {
|
||||
return nil // nothing new
|
||||
}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open: %w", err)
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
if _, err := f.Seek(startOffset, 0); err != nil {
|
||||
return fmt.Errorf("seek: %w", err)
|
||||
}
|
||||
|
||||
var keep []Turn
|
||||
var sessionID string
|
||||
var droppedScrub int
|
||||
endOffset, err := ParseStream(f, startOffset,
|
||||
func(format string, args ...any) {
|
||||
cfg.Logger.Warn(fmt.Sprintf("claudewatcher: parse: "+format, args...))
|
||||
},
|
||||
func(t Turn) error {
|
||||
if t.Skip || t.Content == "" {
|
||||
return nil
|
||||
}
|
||||
if rule := Scrub(t.Content); rule != "" {
|
||||
droppedScrub++
|
||||
cfg.Logger.Warn("claudewatcher: turn dropped by scrubber",
|
||||
"rule", rule, "path", path, "session_id", t.SessionID)
|
||||
return nil
|
||||
}
|
||||
if sessionID == "" {
|
||||
sessionID = t.SessionID
|
||||
}
|
||||
keep = append(keep, t)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse stream: %w", err)
|
||||
}
|
||||
|
||||
if len(keep) == 0 {
|
||||
if cfg.Cursors != nil {
|
||||
if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
|
||||
return fmt.Errorf("advance cursor (no-turns): %w", err)
|
||||
}
|
||||
}
|
||||
if droppedScrub > 0 {
|
||||
cfg.Logger.Info("claudewatcher: only scrubbed turns this tick",
|
||||
"path", path, "dropped", droppedScrub)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
batch := Batch{
|
||||
Host: cfg.Host,
|
||||
FilePath: path,
|
||||
SessionID: sessionID,
|
||||
ProjectID: filepath.Base(filepath.Dir(path)),
|
||||
Turns: keep,
|
||||
}
|
||||
if err := cfg.Sink.Ingest(ctx, batch); err != nil {
|
||||
return fmt.Errorf("sink ingest: %w", err)
|
||||
}
|
||||
if cfg.Cursors != nil {
|
||||
if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
|
||||
return fmt.Errorf("advance cursor: %w", err)
|
||||
}
|
||||
}
|
||||
cfg.Logger.Info("claudewatcher: ingested batch",
|
||||
"path", path, "session_id", sessionID,
|
||||
"turns_kept", len(keep), "dropped_scrub", droppedScrub,
|
||||
"new_offset", endOffset)
|
||||
return nil
|
||||
}
|
||||
174
ingestion/internal/claudewatcher/watcher_test.go
Normal file
174
ingestion/internal/claudewatcher/watcher_test.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// memSink captures batches without touching postgres. Thread-safe so
|
||||
// TickOnce can run from any goroutine in concurrent tests.
|
||||
type memSink struct {
|
||||
mu sync.Mutex
|
||||
batches []Batch
|
||||
failOn string // file basename to error on
|
||||
}
|
||||
|
||||
func (m *memSink) Ingest(_ context.Context, b Batch) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.failOn != "" && strings.Contains(b.FilePath, m.failOn) {
|
||||
return assert.AnError
|
||||
}
|
||||
m.batches = append(m.batches, b)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeSession(t *testing.T, dir, sessionID string, lines []string) string {
|
||||
t.Helper()
|
||||
path := filepath.Join(dir, sessionID+".jsonl")
|
||||
body := strings.Join(lines, "\n") + "\n"
|
||||
require.NoError(t, os.WriteFile(path, []byte(body), 0o644))
|
||||
return path
|
||||
}
|
||||
|
||||
func TestTickOnce_NoCursorReingestsEverythingEveryTick(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
projectDir := filepath.Join(tmp, "-home-mathias-dev")
|
||||
require.NoError(t, os.MkdirAll(projectDir, 0o755))
|
||||
writeSession(t, projectDir, "sess1", []string{
|
||||
`{"type":"user","sessionId":"sess1","message":"first prompt"}`,
|
||||
`{"type":"assistant","sessionId":"sess1","message":{"content":[{"type":"text","text":"first answer"}]}}`,
|
||||
})
|
||||
|
||||
sink := &memSink{}
|
||||
cfg := Config{
|
||||
SessionsDir: tmp,
|
||||
Host: "koala",
|
||||
Sink: sink,
|
||||
}
|
||||
require.NoError(t, TickOnce(context.Background(), cfg))
|
||||
require.NoError(t, TickOnce(context.Background(), cfg))
|
||||
|
||||
require.Len(t, sink.batches, 2, "no cursor => re-emits same batch every tick")
|
||||
assert.Equal(t, "sess1", sink.batches[0].SessionID)
|
||||
assert.Equal(t, "koala", sink.batches[0].Host)
|
||||
assert.Equal(t, "-home-mathias-dev", sink.batches[0].ProjectID)
|
||||
assert.Len(t, sink.batches[0].Turns, 2)
|
||||
}
|
||||
|
||||
func TestTickOnce_FiltersSkipTurnsAndScrubberMatches(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
proj := filepath.Join(tmp, "-home-mathias-dev")
|
||||
require.NoError(t, os.MkdirAll(proj, 0o755))
|
||||
writeSession(t, proj, "sess-scrub", []string{
|
||||
`{"type":"queue-operation","sessionId":"sess-scrub","content":"x"}`, // Skip
|
||||
`{"type":"user","sessionId":"sess-scrub","message":"normal prompt"}`,
|
||||
`{"type":"assistant","sessionId":"sess-scrub","message":{"content":[{"type":"text","text":"value POSTGRES_PASSWORD=hunter2supersecretvalue"}]}}`, // scrubbed
|
||||
})
|
||||
sink := &memSink{}
|
||||
require.NoError(t, TickOnce(context.Background(), Config{
|
||||
SessionsDir: tmp, Host: "koala", Sink: sink,
|
||||
}))
|
||||
require.Len(t, sink.batches, 1)
|
||||
turns := sink.batches[0].Turns
|
||||
require.Len(t, turns, 1, "skip + scrubbed turns must not reach the sink")
|
||||
assert.Equal(t, "user", turns[0].Type)
|
||||
}
|
||||
|
||||
func TestTickOnce_AllScrubbedNoBatchEmitted(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
proj := filepath.Join(tmp, "-home-mathias-dev")
|
||||
require.NoError(t, os.MkdirAll(proj, 0o755))
|
||||
writeSession(t, proj, "all-bad", []string{
|
||||
`{"type":"user","sessionId":"all-bad","message":"Authorization: Bearer abcdef1234567890ghijklmnop"}`,
|
||||
})
|
||||
sink := &memSink{}
|
||||
require.NoError(t, TickOnce(context.Background(), Config{
|
||||
SessionsDir: tmp, Host: "koala", Sink: sink,
|
||||
}))
|
||||
assert.Empty(t, sink.batches, "no usable turns => no batch")
|
||||
}
|
||||
|
||||
func TestTickOnce_IgnoresNonJsonlFiles(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
proj := filepath.Join(tmp, "-home-mathias-dev")
|
||||
require.NoError(t, os.MkdirAll(proj, 0o755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(proj, "README.md"), []byte("ignore me"), 0o644))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(proj, "config.json"), []byte("{}"), 0o644))
|
||||
sink := &memSink{}
|
||||
require.NoError(t, TickOnce(context.Background(), Config{
|
||||
SessionsDir: tmp, Host: "koala", Sink: sink,
|
||||
}))
|
||||
assert.Empty(t, sink.batches)
|
||||
}
|
||||
|
||||
func TestTickOnce_HandlesMultipleProjectsAndSessions(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
projA := filepath.Join(tmp, "-home-mathias-dev")
|
||||
projB := filepath.Join(tmp, "-home-mathias-AI-infra")
|
||||
require.NoError(t, os.MkdirAll(projA, 0o755))
|
||||
require.NoError(t, os.MkdirAll(projB, 0o755))
|
||||
writeSession(t, projA, "a1", []string{`{"type":"user","sessionId":"a1","message":"q1"}`})
|
||||
writeSession(t, projA, "a2", []string{`{"type":"user","sessionId":"a2","message":"q2"}`})
|
||||
writeSession(t, projB, "b1", []string{`{"type":"user","sessionId":"b1","message":"q3"}`})
|
||||
|
||||
sink := &memSink{}
|
||||
require.NoError(t, TickOnce(context.Background(), Config{
|
||||
SessionsDir: tmp, Host: "koala", Sink: sink,
|
||||
}))
|
||||
require.Len(t, sink.batches, 3)
|
||||
|
||||
projects := map[string]int{}
|
||||
for _, b := range sink.batches {
|
||||
projects[b.ProjectID]++
|
||||
}
|
||||
assert.Equal(t, 2, projects["-home-mathias-dev"])
|
||||
assert.Equal(t, 1, projects["-home-mathias-AI-infra"])
|
||||
}
|
||||
|
||||
func TestTickOnce_SinkErrorDoesNotKillOtherFiles(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
proj := filepath.Join(tmp, "-home-mathias-dev")
|
||||
require.NoError(t, os.MkdirAll(proj, 0o755))
|
||||
writeSession(t, proj, "good", []string{`{"type":"user","sessionId":"good","message":"q"}`})
|
||||
writeSession(t, proj, "bad-session", []string{`{"type":"user","sessionId":"bad-session","message":"q"}`})
|
||||
|
||||
sink := &memSink{failOn: "bad-session"}
|
||||
require.NoError(t, TickOnce(context.Background(), Config{
|
||||
SessionsDir: tmp, Host: "koala", Sink: sink,
|
||||
}))
|
||||
require.Len(t, sink.batches, 1, "good session still ingested")
|
||||
assert.Equal(t, "good", sink.batches[0].SessionID)
|
||||
}
|
||||
|
||||
func TestWatch_RespectsContextCancel(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(tmp, "-home-mathias-dev"), 0o755))
|
||||
sink := &memSink{}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- Watch(ctx, Config{
|
||||
SessionsDir: tmp,
|
||||
Host: "koala",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Sink: sink,
|
||||
})
|
||||
}()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case err := <-done:
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Watch did not return after cancel")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user