From 63c8d114e83ef116d4f0f0bbe38e602091d63180 Mon Sep 17 00:00:00 2001 From: Mathias Bergqvist Date: Fri, 1 May 2026 09:47:34 +0200 Subject: [PATCH] feat(ingestion): add session package for JSONL log persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Copy of internal/session from the supervisor module — the ingestion service needs it for the upcoming session_log MCP tool. The supervisor copy will be removed in the supervisor-retirement plan; until then the two packages are intentionally identical and pinned (no edits). Co-Authored-By: Claude Opus 4.7 (1M context) --- ingestion/internal/session/session.go | 98 ++++++++++++++++++++++ ingestion/internal/session/session_test.go | 50 +++++++++++ 2 files changed, 148 insertions(+) create mode 100644 ingestion/internal/session/session.go create mode 100644 ingestion/internal/session/session_test.go diff --git a/ingestion/internal/session/session.go b/ingestion/internal/session/session.go new file mode 100644 index 0000000..161e5f0 --- /dev/null +++ b/ingestion/internal/session/session.go @@ -0,0 +1,98 @@ +// ingestion/internal/session/session.go +package session + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "time" +) + +// Entry is one skill invocation record, appended to the session JSONL log. +type Entry struct { + SessionID string `json:"session_id"` + Timestamp time.Time `json:"timestamp"` + Skill string `json:"skill"` + Phase string `json:"phase,omitempty"` + ProjectRoot string `json:"project_root,omitempty"` + Input json.RawMessage `json:"input,omitempty"` + Attempts []Attempt `json:"attempts,omitempty"` + FinalStatus string `json:"final_status"` + FilePath string `json:"file_path,omitempty"` + ModelUsed string `json:"model_used,omitempty"` + DurationMs int64 `json:"duration_ms,omitempty"` + Message string `json:"message,omitempty"` +} + +// Attempt represents one subprocess invocation within a skill call. +type Attempt struct { + Attempt int `json:"attempt"` + Model string `json:"model"` + Tier string `json:"tier"` // local | subagent | managed + DurationMs int64 `json:"duration_ms"` + WarmStart bool `json:"warm_start"` // model already loaded in llama-swap + Verified bool `json:"verified"` + Verdict string `json:"verdict,omitempty"` // accept | escalate | error + Feedback string `json:"feedback,omitempty"` // verifier feedback on escalation + OutputSummary string `json:"output_summary,omitempty"` + RunnerOutput string `json:"runner_output,omitempty"` +} + +// Append writes entry as a single JSON line to sessionsDir/{sessionID}.jsonl. +func Append(sessionsDir, sessionID string, entry Entry) error { + if err := os.MkdirAll(sessionsDir, 0o755); err != nil { + return fmt.Errorf("create sessions dir: %w", err) + } + path := filepath.Join(sessionsDir, sessionID+".jsonl") + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("open session log: %w", err) + } + + line, err := json.Marshal(entry) + if err != nil { + _ = f.Close() + return fmt.Errorf("marshal entry: %w", err) + } + if _, err = fmt.Fprintf(f, "%s\n", line); err != nil { + _ = f.Close() + return fmt.Errorf("write entry: %w", err) + } + if err = f.Close(); err != nil { + return fmt.Errorf("close session log: %w", err) + } + return nil +} + +// Read returns all entries for sessionID. Returns empty slice if no log exists. +func Read(sessionsDir, sessionID string) ([]Entry, error) { + path := filepath.Join(sessionsDir, sessionID+".jsonl") + f, err := os.Open(path) + if errors.Is(err, fs.ErrNotExist) { + return []Entry{}, nil + } + if err != nil { + return nil, fmt.Errorf("open session log: %w", err) + } + defer f.Close() //nolint:errcheck + + var entries []Entry + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 256*1024), 1<<20) // up to 1 MB per line + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var e Entry + if err := json.Unmarshal(line, &e); err != nil { + return nil, fmt.Errorf("parse entry: %w", err) + } + entries = append(entries, e) + } + return entries, scanner.Err() +} diff --git a/ingestion/internal/session/session_test.go b/ingestion/internal/session/session_test.go new file mode 100644 index 0000000..3d9456a --- /dev/null +++ b/ingestion/internal/session/session_test.go @@ -0,0 +1,50 @@ +package session_test + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/mathiasbq/hyperguild/ingestion/internal/session" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAppendAndRead(t *testing.T) { + dir := t.TempDir() + sid := "test-session" + + e1 := session.Entry{ + SessionID: sid, + Timestamp: time.Now().UTC().Truncate(time.Second), + Skill: "tdd", + Phase: "red", + FinalStatus: "ok", + } + e2 := session.Entry{ + SessionID: sid, + Timestamp: time.Now().UTC().Truncate(time.Second), + Skill: "tdd", + Phase: "green", + FinalStatus: "ok", + } + + require.NoError(t, session.Append(dir, sid, e1)) + require.NoError(t, session.Append(dir, sid, e2)) + + got, err := session.Read(dir, sid) + require.NoError(t, err) + require.Len(t, got, 2) + assert.Equal(t, "red", got[0].Phase) + assert.Equal(t, "green", got[1].Phase) + + _, statErr := os.Stat(filepath.Join(dir, sid+".jsonl")) + require.NoError(t, statErr, "session file should exist on disk") +} + +func TestReadMissingReturnsEmpty(t *testing.T) { + got, err := session.Read(t.TempDir(), "nope") + require.NoError(t, err) + assert.Empty(t, got) +}