package claudewatcher

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// CursorStore tracks how far the watcher has ingested into each
// session JSONL file. Rows are keyed by (host, file_path) so the same
// `~/.claude` path on different hosts doesn't collide and resumability
// survives pod restarts. The idempotent Init lives alongside the rest
// of the claudewatcher schema; there is no separate migration framework.
type CursorStore struct {
	pool *pgxpool.Pool

	// ownsPool is true only when the store created pool itself (via
	// NewCursorStore). Close releases the pool only in that case, so a
	// pool injected with NewCursorStoreFromPool is never closed here.
	ownsPool bool
}

// NewCursorStore opens a new pool against dsn and verifies it with a
// ping. The returned store owns the pool; the caller must call Close on
// the store to release it. If the ping fails the pool is closed before
// the error is returned.
func NewCursorStore(ctx context.Context, dsn string) (*CursorStore, error) {
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		return nil, fmt.Errorf("pgxpool: %w", err)
	}
	if err := pool.Ping(ctx); err != nil {
		// Don't leak the freshly created pool when the database is unreachable.
		pool.Close()
		return nil, fmt.Errorf("ping: %w", err)
	}
	return &CursorStore{pool: pool, ownsPool: true}, nil
}

// NewCursorStoreFromPool wraps an existing pool (so the watcher can
// share the brain DSN pool with vectorstore/graphstore without a
// second connection set). The store does not take ownership: Close on
// the returned store is a no-op and the caller remains responsible for
// closing the pool directly.
func NewCursorStoreFromPool(pool *pgxpool.Pool) *CursorStore {
	return &CursorStore{pool: pool, ownsPool: false}
}

// Close releases the underlying connection pool when this store owns
// it, i.e. when it was built by NewCursorStore. It is a no-op when the
// pool was injected via NewCursorStoreFromPool, so a pool shared with
// other components is never closed out from under them.
func (s *CursorStore) Close() {
	if s.pool == nil || !s.ownsPool {
		return
	}
	s.pool.Close()
}

// Init creates the claude_session_cursors table when missing.
func (s *CursorStore) Init(ctx context.Context) error { const ddl = ` CREATE TABLE IF NOT EXISTS claude_session_cursors ( host TEXT NOT NULL, file_path TEXT NOT NULL, byte_offset BIGINT NOT NULL DEFAULT 0, last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), PRIMARY KEY (host, file_path) ); CREATE INDEX IF NOT EXISTS claude_session_cursors_host_idx ON claude_session_cursors (host); ` _, err := s.pool.Exec(ctx, ddl) return err } // GetOffset returns the last recorded byte offset for (host, filePath). // Missing rows are reported as offset=0, ok=false so the caller can // distinguish "never ingested" from "ingested at the start of the // file" (both produce identical behaviour but the metric is useful). func (s *CursorStore) GetOffset(ctx context.Context, host, filePath string) (int64, bool, error) { if host == "" || filePath == "" { return 0, false, errors.New("host and file_path are required") } var offset int64 err := s.pool.QueryRow(ctx, ` SELECT byte_offset FROM claude_session_cursors WHERE host = $1 AND file_path = $2 `, host, filePath).Scan(&offset) if errors.Is(err, pgx.ErrNoRows) { return 0, false, nil } if err != nil { return 0, false, fmt.Errorf("query: %w", err) } return offset, true, nil } // SetOffset writes the new offset for (host, filePath). Used after // every successful parse + ingest batch so a crash mid-file rewinds // only to the last committed checkpoint. func (s *CursorStore) SetOffset(ctx context.Context, host, filePath string, offset int64) error { if host == "" || filePath == "" { return errors.New("host and file_path are required") } if offset < 0 { return errors.New("offset must be >= 0") } _, err := s.pool.Exec(ctx, ` INSERT INTO claude_session_cursors (host, file_path, byte_offset, last_seen_at) VALUES ($1, $2, $3, now()) ON CONFLICT (host, file_path) DO UPDATE SET byte_offset = EXCLUDED.byte_offset, last_seen_at = now() `, host, filePath, offset) if err != nil { return fmt.Errorf("upsert offset: %w", err) } return nil }