hyperguild/docs/superpowers/plans/2026-04-22-brain-ingestion-pipeline.md
2026-04-22 22:14:59 +02:00


# Brain Ingestion Pipeline Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a structured LLM-powered ingestion pipeline to the hyperguild brain: two new HTTP endpoints (`/ingest`, `/ingest-path`) and a background file watcher on `brain/raw/`, producing concepts/entities/sources wiki pages.
**Architecture:** Raw content (passed directly or read from files) is chunked, sent to an OpenAI-compatible LLM with the brain schema and existing wiki inventory as context, and the returned JSON array of `{path, content}` objects is merged and written to `brain/wiki/`. A background goroutine polls `brain/raw/` and processes dropped files automatically. The supervisor's `brain_ingest` skill connects to the ingestion server's `/ingest` endpoint.
**Tech Stack:** Go stdlib, `testify` (already in go.mod), OpenAI-compatible chat completions API via LiteLLM.
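The `{path, content}` contract in the architecture summary can be pictured with a small decoding sketch. The `page` type and `decodePages` helper are illustrative names only (the plan's own `ParsePages` in `pipeline/parse.go` is the real entry point), and the sketch assumes the model returns a clean JSON array with no fences or truncation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// page mirrors one {path, content} object in the LLM response.
// Hypothetical stand-in for wiki.Page, used only in this example.
type page struct {
	Path    string `json:"path"`
	Content string `json:"content"`
}

// decodePages parses a JSON array of page objects and wraps any decode error.
func decodePages(raw string) ([]page, error) {
	var pages []page
	if err := json.Unmarshal([]byte(raw), &pages); err != nil {
		return nil, fmt.Errorf("decode pages: %w", err)
	}
	return pages, nil
}

func main() {
	raw := `[{"path":"wiki/concepts/tdd.md","content":"---\ntitle: TDD\n---\n"}]`
	pages, err := decodePages(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, p := range pages {
		fmt.Printf("%s (%d bytes)\n", p.Path, len(p.Content))
	}
}
```

Each decoded page would then be merged with any existing file at the same path before being written under `brain/wiki/`.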
---
## File Map
### New files — `ingestion` module
| File | Responsibility |
|---|---|
| `ingestion/internal/wiki/types.go` | `Page` struct, `PageType` constants |
| `ingestion/internal/wiki/slug.go` | `Slug(title) string` |
| `ingestion/internal/wiki/slug_test.go` | Slug edge case tests |
| `ingestion/internal/wiki/merge.go` | `Merge(a, b Page) Page`, section parsing |
| `ingestion/internal/wiki/merge_test.go` | Merge strategy tests |
| `ingestion/internal/wiki/inventory.go` | `LoadInventory(brainDir) (map[PageType][]Entry, error)` |
| `ingestion/internal/wiki/inventory_test.go` | Inventory loading tests |
| `ingestion/internal/wiki/index.go` | `RebuildIndex(brainDir, date string) error` |
| `ingestion/internal/wiki/log.go` | `AppendLog(brainDir, source string, pages, warnings []string, date string) error` |
| `ingestion/internal/wiki/index_test.go` | Index + log tests |
| `ingestion/internal/llm/client.go` | `Client`, `Complete(ctx, system, user string) (string, error)` |
| `ingestion/internal/llm/client_test.go` | Mock server tests |
| `ingestion/internal/pipeline/chunk.go` | `Chunk(content string, maxSize int) []string` |
| `ingestion/internal/pipeline/chunk_test.go` | Chunking tests |
| `ingestion/internal/pipeline/parse.go` | `ParsePages(output string) ([]wiki.Page, []string)` |
| `ingestion/internal/pipeline/parse_test.go` | Parse + truncation recovery tests |
| `ingestion/internal/pipeline/prompt.go` | `BuildPrompt(...)` |
| `ingestion/internal/pipeline/pipeline.go` | `Config`, `Run(...)` |
| `ingestion/internal/pipeline/pipeline_test.go` | Integration test with mock LLM |
| `ingestion/internal/watcher/watcher.go` | `Config`, `Watcher`, `Start`, `processFile` |
| `ingestion/internal/watcher/watcher_test.go` | File move behaviour tests |
| `brain/CLAUDE.md` | Schema injected into every ingest prompt |
### Modified files
| File | Change |
|---|---|
| `ingestion/internal/api/handler.go` | Add `Ingest`, `IngestPath` methods; update `NewHandler` signature |
| `ingestion/internal/api/handler_test.go` | Update `NewHandler` call; add ingest handler tests |
| `ingestion/cmd/server/main.go` | New env vars, wire pipeline config, start watcher |
| `internal/config/config.go` | Add `IngestSvcURL`, `KBRetrievalURL` fields |
| `cmd/supervisor/main.go` | Pass new fields to `brain.New()` |
---
## Task 1: Supervisor config wiring
Wire `IngestSvcURL` and `KBRetrievalURL` into the supervisor so `brain_ingest` and `brain_search` become active when configured.
**Files:**
- Modify: `internal/config/config.go`
- Modify: `cmd/supervisor/main.go:106-108`
- [ ] **Step 1: Add fields to Config**
In `internal/config/config.go`, add two fields and their env-var loading:
```go
type Config struct {
Port string
LiteLLMBaseURL string
LiteLLMAPIKey string
ConfigDir string
ModelsFile string
IngestBaseURL string
IngestSvcURL string // INGEST_SVC_URL — brain_ingest endpoint
KBRetrievalURL string // KB_RETRIEVAL_URL — brain_search endpoint
SessionsDir string
BrainDir string
}
func Load() (Config, error) {
cfg := Config{
Port: envOr("SUPERVISOR_PORT", "3200"),
LiteLLMBaseURL: envOr("LITELLM_BASE_URL", "http://iguana:4000"),
LiteLLMAPIKey: os.Getenv("LITELLM_API_KEY"),
ConfigDir: envOr("SUPERVISOR_CONFIG_DIR", "./config/supervisor"),
}
cfg.ModelsFile = envOr("SUPERVISOR_MODELS_FILE", cfg.ConfigDir+"/../models.yaml")
cfg.IngestBaseURL = envOr("INGEST_BASE_URL", "http://localhost:3300")
cfg.IngestSvcURL = os.Getenv("INGEST_SVC_URL")
cfg.KBRetrievalURL = os.Getenv("KB_RETRIEVAL_URL")
cfg.SessionsDir = envOr("SUPERVISOR_SESSIONS_DIR", "./brain/sessions")
cfg.BrainDir = envOr("SUPERVISOR_BRAIN_DIR", "./brain")
return cfg, nil
}
```
- [ ] **Step 2: Pass to brain.New() in main.go**
Find the `brain.New(...)` call in `cmd/supervisor/main.go` (around line 106) and update it:
```go
reg.Register(brain.New(brain.Config{
IngestBaseURL: cfg.IngestBaseURL,
IngestSvcURL: cfg.IngestSvcURL,
KBRetrievalURL: cfg.KBRetrievalURL,
}))
```
- [ ] **Step 3: Build and verify**
```bash
cd /Users/mathias/dev/AI/hyperguild && go build ./...
```
Expected: no output (clean build).
- [ ] **Step 4: Commit**
```bash
git add internal/config/config.go cmd/supervisor/main.go
git commit -m "feat(config): add IngestSvcURL and KBRetrievalURL to supervisor config"
```
---
## Task 2: brain/CLAUDE.md schema document
Create the schema document injected into every ingest prompt. This teaches the LLM the wiki structure and rules.
**Files:**
- Create: `brain/CLAUDE.md`
- [ ] **Step 1: Write the schema**
Create `brain/CLAUDE.md`:
```markdown
# Brain Wiki Schema
This document defines the three page types in the brain wiki.
The LLM must follow this schema exactly when generating wiki pages.
## Wikilink Format
All cross-references use `[[slug|Display Text]]`.
Rules:
- slug = lowercase filename without .md, spaces → hyphens, strip all non-alphanumeric except hyphens
- The `|` separator is REQUIRED — never use `[[Title]]` without a slug
- Examples: `[[domain-driven-design|Domain Driven Design]]`, `[[ryan-singer|Ryan Singer]]`
- Slugs must resolve to an existing file in the inventory, or a file you are creating in this response
Slug generation examples:
- "Domain Driven Design" → `domain-driven-design`
- "It's Complicated" → `its-complicated`
- "gRPC" → `grpc`
- "GPT-4o" → `gpt-4o`
## Domains
Use one of: `ai-llm`, `software-engineering`, `product-strategy`, `finance-markets`,
`personal`, `consulting`, `climate`, `infrastructure`, `security`
---
## Source Pages — wiki/sources/<slug>.md
One page per ingested source. Books are NEVER split across multiple source pages — update the existing one.
Required frontmatter:
```yaml
title: <exact title>
type: article | pdf | book | video | note | project
domain: <domain>
date_ingested: YYYY-MM-DD
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Summary
2-3 sentences. Core argument or finding.
### Key Claims
Bulleted list. Paraphrase — no verbatim quotes or code.
### Concepts Introduced or Reinforced
Wikilinks to wiki/concepts/ ONLY. One per line.
### Entities Mentioned
Wikilinks to wiki/entities/ ONLY. One per line.
### Open Questions Raised
Gaps or follow-up questions from this source.
For books only, also add:
### Chapters
One bullet per chapter with a 1-2 sentence summary.
### Argument Arc
Overall narrative as it becomes clear across chapters.
### Updates
Dated entries appended on re-ingestion. NEVER rewrite — only append.
---
## Concept Pages — wiki/concepts/<slug>.md
One page per idea, framework, methodology, or pattern.
Required frontmatter:
```yaml
title: <concept name>
domain: <domain>
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Definition
One-paragraph plain-language explanation.
### Why It Matters
Practical significance. Why should anyone care?
### Related Concepts
Wikilinks to wiki/concepts/ ONLY.
### Related Entities
Wikilinks to wiki/entities/ ONLY.
### Sources
Wikilinks to wiki/sources/ ONLY.
### Evolving Notes
Updated as new sources arrive. Append, do not rewrite.
---
## Entity Pages — wiki/entities/<slug>.md
One page per person, tool, organisation, technology, or product.
Required frontmatter:
```yaml
title: <name>
type: person | company | tool | model | framework | technology
domain: <domain>
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Description
One-line description.
### Relevance
Why this entity matters to this knowledge base.
### Key Positions, Products, or Claims
With dates where known.
### Related Concepts
Wikilinks to wiki/concepts/ ONLY.
### Related Entities
Wikilinks to wiki/entities/ ONLY.
### Sources
Wikilinks to wiki/sources/ ONLY.
---
## Non-Negotiable Rules
1. Output ONLY a valid JSON array — no markdown fences, no prose before or after
2. Each element: `{"path": "wiki/<type>/<slug>.md", "content": "...full markdown..."}`
3. Slugs are kebab-case: lowercase, spaces→hyphens, strip special characters
4. Every wikilink must be `[[slug|Display Text]]` — the pipe separator is required
5. Dates always YYYY-MM-DD
6. Never reproduce verbatim code — describe the pattern or technique
7. Section links must match their section type (Related Concepts → concepts/ only, etc.)
8. One source page per book — if inventory shows it exists, include it as an UPDATE
```
- [ ] **Step 2: Commit**
```bash
git add brain/CLAUDE.md
git commit -m "docs(brain): add wiki schema document for ingest prompt"
```
---
## Task 3: wiki/slug — slug generation
**Files:**
- Create: `ingestion/internal/wiki/types.go`
- Create: `ingestion/internal/wiki/slug.go`
- Create: `ingestion/internal/wiki/slug_test.go`
- [ ] **Step 1: Write types.go**
```go
// ingestion/internal/wiki/types.go
package wiki
// PageType identifies the wiki subdirectory for a page.
type PageType string
const (
PageTypeConcept PageType = "concepts"
PageTypeEntity PageType = "entities"
PageTypeSource PageType = "sources"
)
// Page is a wiki page to be written to disk.
type Page struct {
Path string // relative to brainDir, e.g. "wiki/sources/foo.md"
Content string // full markdown including YAML frontmatter
}
// Entry is a summary of an existing wiki page used to build the inventory.
type Entry struct {
Slug string
Title string
Type PageType
}
```
- [ ] **Step 2: Write the failing test**
```go
// ingestion/internal/wiki/slug_test.go
package wiki
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSlug(t *testing.T) {
tests := []struct {
input string
want string
}{
{"Domain Driven Design", "domain-driven-design"},
{"It's Complicated", "its-complicated"},
{"gRPC", "grpc"},
{"GPT-4o", "gpt-4o"},
{"Property 1: It's Rough", "property-1-its-rough"},
{" leading spaces ", "leading-spaces"},
{"multiple spaces", "multiple-spaces"},
{"already-kebab", "already-kebab"},
}
for _, tc := range tests {
t.Run(tc.input, func(t *testing.T) {
assert.Equal(t, tc.want, Slug(tc.input))
})
}
}
```
- [ ] **Step 3: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... 2>&1
```
Expected: `undefined: Slug`
- [ ] **Step 4: Write slug.go**
```go
// ingestion/internal/wiki/slug.go
package wiki
import (
"strings"
"unicode"
)
// Slug converts a title to a kebab-case slug suitable for wiki filenames.
// Rules: lowercase, spaces/hyphens/underscores → hyphens, strip everything else.
func Slug(title string) string {
var b strings.Builder
prevHyphen := true // start true to trim leading hyphens
for _, r := range strings.ToLower(title) {
switch {
case r == ' ' || r == '-' || r == '_':
if !prevHyphen {
b.WriteRune('-')
prevHyphen = true
}
case unicode.IsLetter(r) || unicode.IsDigit(r):
b.WriteRune(r)
prevHyphen = false
// all other characters (apostrophes, colons, dots, etc.) are dropped
}
}
return strings.TrimRight(b.String(), "-")
}
```
- [ ] **Step 5: Run test to verify it passes**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -run TestSlug -v
```
Expected: all cases PASS.
- [ ] **Step 6: Commit**
```bash
git add ingestion/internal/wiki/
git commit -m "feat(ingestion): add wiki package with Page types and slug generation"
```
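To connect slug generation with the `[[slug|Display Text]]` rule from the schema, here is a small sketch of a link formatter. `wikilink` is a hypothetical helper that is not part of the plan's file map, and `slug` simply repeats the `Slug` logic above so the example runs on its own.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// slug repeats the Slug logic from slug.go so this example is self-contained.
func slug(title string) string {
	var b strings.Builder
	prevHyphen := true // start true so leading separators are dropped
	for _, r := range strings.ToLower(title) {
		switch {
		case r == ' ' || r == '-' || r == '_':
			if !prevHyphen {
				b.WriteRune('-')
				prevHyphen = true
			}
		case unicode.IsLetter(r) || unicode.IsDigit(r):
			b.WriteRune(r)
			prevHyphen = false
		}
	}
	return strings.TrimRight(b.String(), "-")
}

// wikilink formats a title as [[slug|Display Text]]. Hypothetical helper.
func wikilink(title string) string {
	return fmt.Sprintf("[[%s|%s]]", slug(title), title)
}

func main() {
	fmt.Println(wikilink("Domain Driven Design"))
	fmt.Println(wikilink("Ryan Singer"))
}
```

Because `unicode.IsLetter` accepts any Unicode letter, non-ASCII titles keep their letters in the slug. If filenames must be strictly ASCII, that case would need extra handling.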
---
## Task 4: wiki/merge — page merge logic
**Files:**
- Create: `ingestion/internal/wiki/merge.go`
- Create: `ingestion/internal/wiki/merge_test.go`
- [ ] **Step 1: Write the failing tests**
```go
// ingestion/internal/wiki/merge_test.go
package wiki
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMerge_BulletSectionsUnion(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Related Concepts\n\n- [[bar|Bar]]\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Related Concepts\n\n- [[bar|Bar]]\n- [[baz|Baz]]\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "[[bar|Bar]]")
assert.Contains(t, got.Content, "[[baz|Baz]]")
// bar appears only once
assert.Equal(t, 1, strings.Count(got.Content, "[[bar|Bar]]"))
}
func TestMerge_AppendSections(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Evolving Notes\n\nFirst note.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Evolving Notes\n\nSecond note.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "First note.")
assert.Contains(t, got.Content, "Second note.")
}
func TestMerge_KeepFirstForOtherSections(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nFirst definition.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nSecond definition.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "First definition.")
assert.NotContains(t, got.Content, "Second definition.")
}
func TestMerge_NewSectionFromB(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nA thing.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Why It Matters\n\nBecause reasons.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "A thing.")
assert.Contains(t, got.Content, "Because reasons.")
}
func TestMerge_KeepsFrontmatterFromA(t *testing.T) {
a := Page{Path: "p.md", Content: "---\ntitle: A\nlast_updated: 2026-01-01\n---\n\n## Definition\n\nA.\n"}
b := Page{Path: "p.md", Content: "---\ntitle: B\nlast_updated: 2026-06-01\n---\n\n## Definition\n\nB.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "title: A")
assert.NotContains(t, got.Content, "title: B")
}
```
Make sure the test file imports `"strings"`: `TestMerge_BulletSectionsUnion` calls `strings.Count`.
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -run TestMerge 2>&1
```
Expected: `undefined: Merge`
- [ ] **Step 3: Write merge.go**
```go
// ingestion/internal/wiki/merge.go
package wiki
import (
"fmt"
"strings"
)
var bulletSections = map[string]bool{
"Related Concepts": true,
"Related Entities": true,
"Sources": true,
"Key Claims": true,
"Entities Mentioned": true,
"Concepts Introduced or Reinforced": true,
"Chapters": true,
}
var appendSections = map[string]bool{
"Evolving Notes": true,
"Updates": true,
"Open Questions Raised": true,
"Open Questions": true,
}
type section struct {
heading string
content string
}
// Merge combines two Page values with the same path.
// Frontmatter is taken from a. Sections are merged by strategy.
func Merge(a, b Page) Page {
fmA, secsA := parseSections(a.Content)
_, secsB := parseSections(b.Content)
// Build heading → index map for A's sections.
idx := make(map[string]int, len(secsA))
for i, s := range secsA {
idx[s.heading] = i
}
for _, sB := range secsB {
i, exists := idx[sB.heading]
if !exists {
// New section not in A — append it.
idx[sB.heading] = len(secsA)
secsA = append(secsA, sB)
continue
}
sA := secsA[i]
switch {
case bulletSections[sB.heading]:
secsA[i].content = mergeBullets(sA.content, sB.content)
case appendSections[sB.heading]:
secsA[i].content = strings.TrimRight(sA.content, "\n") + "\n\n" + strings.TrimLeft(sB.content, "\n")
// default: keep first — do nothing
}
}
return Page{Path: a.Path, Content: rebuildContent(fmA, secsA)}
}
func parseSections(markdown string) (frontmatter string, sections []section) {
lines := strings.Split(markdown, "\n")
i := 0
// Parse YAML frontmatter between --- markers.
if i < len(lines) && strings.TrimSpace(lines[i]) == "---" {
i++
var fmLines []string
for i < len(lines) {
if strings.TrimSpace(lines[i]) == "---" {
i++
break
}
fmLines = append(fmLines, lines[i])
i++
}
frontmatter = fmt.Sprintf("---\n%s\n---\n", strings.Join(fmLines, "\n"))
}
// Parse ## sections.
var cur *section
for ; i < len(lines); i++ {
line := lines[i]
if strings.HasPrefix(line, "## ") {
if cur != nil {
sections = append(sections, *cur)
}
cur = &section{heading: strings.TrimPrefix(line, "## ")}
} else if cur != nil {
cur.content += line + "\n"
}
}
if cur != nil {
sections = append(sections, *cur)
}
return
}
func rebuildContent(frontmatter string, sections []section) string {
	var sb strings.Builder
	sb.WriteString(frontmatter)
	for _, sec := range sections {
		// Trim surrounding blank lines so repeated merges do not accumulate whitespace.
		body := strings.Trim(sec.content, "\n")
		fmt.Fprintf(&sb, "\n## %s\n\n%s\n", sec.heading, body)
	}
	return sb.String()
}
func mergeBullets(a, b string) string {
seen := make(map[string]bool)
var lines []string
for _, line := range strings.Split(a+b, "\n") {
trimmed := strings.TrimSpace(line)
if trimmed == "" || seen[trimmed] {
continue
}
seen[trimmed] = true
lines = append(lines, line)
}
return strings.Join(lines, "\n") + "\n"
}
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -v
```
Expected: all PASS.
- [ ] **Step 5: Commit**
```bash
git add ingestion/internal/wiki/merge.go ingestion/internal/wiki/merge_test.go
git commit -m "feat(ingestion): add wiki page merge logic"
```
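The tests in this task exercise three merge strategies: union for bullet sections, append for note-style sections, and keep-first for everything else. The sketch below isolates those strategies on plain strings. `mergeSection` and its string `strategy` argument are illustrative only; `merge.go` selects the strategy from the heading via `bulletSections` and `appendSections`.

```go
package main

import (
	"fmt"
	"strings"
)

// mergeSection applies one strategy to two section bodies.
// strategy is "union" or "append"; any other value means keep-first.
func mergeSection(strategy, a, b string) string {
	switch strategy {
	case "union":
		// Keep each distinct non-empty line once, in first-seen order.
		seen := map[string]bool{}
		var out []string
		for _, line := range strings.Split(a+"\n"+b, "\n") {
			t := strings.TrimSpace(line)
			if t == "" || seen[t] {
				continue
			}
			seen[t] = true
			out = append(out, t)
		}
		return strings.Join(out, "\n")
	case "append":
		// Existing text first, new text after a blank line.
		return strings.TrimSpace(a) + "\n\n" + strings.TrimSpace(b)
	default:
		// Keep-first: the existing body wins.
		return a
	}
}

func main() {
	fmt.Println(mergeSection("union", "- [[bar|Bar]]", "- [[bar|Bar]]\n- [[baz|Baz]]"))
	fmt.Println(mergeSection("append", "First note.", "Second note."))
	fmt.Println(mergeSection("keep-first", "First definition.", "Second definition."))
}
```

Keep-first is the conservative default: a re-ingested source cannot silently overwrite a curated `Definition`, while links and notes still accumulate.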
---
## Task 5: wiki/inventory — load existing wiki pages
Reads `brain/wiki/` and builds a grouped slug index for injection into the LLM prompt.
**Files:**
- Create: `ingestion/internal/wiki/inventory.go`
- Create: `ingestion/internal/wiki/inventory_test.go`
- [ ] **Step 1: Write the failing test**
```go
// ingestion/internal/wiki/inventory_test.go
package wiki
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLoadInventory(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "concepts", "domain-driven-design.md"),
[]byte("---\ntitle: Domain Driven Design\n---\n\n## Definition\n\nA thing.\n"),
0o644,
))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "entities", "ryan-singer.md"),
[]byte("---\ntitle: Ryan Singer\n---\n\n## Description\n\nDesigner.\n"),
0o644,
))
inv, err := LoadInventory(dir)
require.NoError(t, err)
assert.Len(t, inv[PageTypeConcept], 1)
assert.Equal(t, "domain-driven-design", inv[PageTypeConcept][0].Slug)
assert.Equal(t, "Domain Driven Design", inv[PageTypeConcept][0].Title)
assert.Len(t, inv[PageTypeEntity], 1)
assert.Equal(t, "ryan-singer", inv[PageTypeEntity][0].Slug)
assert.Empty(t, inv[PageTypeSource])
}
```
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -run TestLoadInventory 2>&1
```
Expected: `undefined: LoadInventory`
- [ ] **Step 3: Write inventory.go**
```go
// ingestion/internal/wiki/inventory.go
package wiki
import (
"bufio"
"os"
"path/filepath"
"strings"
)
// LoadInventory walks brain/wiki/ and returns all pages grouped by type.
func LoadInventory(brainDir string) (map[PageType][]Entry, error) {
result := map[PageType][]Entry{
PageTypeConcept: {},
PageTypeEntity: {},
PageTypeSource: {},
}
for pt := range result {
dir := filepath.Join(brainDir, "wiki", string(pt))
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) {
continue
}
if err != nil {
return nil, err
}
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".md") {
continue
}
slug := strings.TrimSuffix(e.Name(), ".md")
path := filepath.Join(dir, e.Name())
title := readTitle(path, slug)
result[pt] = append(result[pt], Entry{Slug: slug, Title: title, Type: pt})
}
}
return result, nil
}
// readTitle extracts the title from YAML frontmatter, falling back to slug.
func readTitle(path, fallback string) string {
f, err := os.Open(path)
if err != nil {
return fallback
}
defer f.Close()
scanner := bufio.NewScanner(f)
inFM := false
for scanner.Scan() {
line := scanner.Text()
if strings.TrimSpace(line) == "---" {
if !inFM {
inFM = true
continue
}
break
}
if inFM {
key, val, ok := strings.Cut(line, ":")
if ok && strings.TrimSpace(key) == "title" {
return strings.Trim(strings.TrimSpace(val), `"'`)
}
}
}
return fallback
}
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -v
```
Expected: all PASS.
- [ ] **Step 5: Commit**
```bash
git add ingestion/internal/wiki/inventory.go ingestion/internal/wiki/inventory_test.go
git commit -m "feat(ingestion): add wiki inventory loader"
```
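Since the inventory exists to be injected into the LLM prompt, it may help to see one way it could be rendered as text. This is only a sketch: `formatInventory` and the trimmed-down `entry` type are hypothetical, and the actual prompt layout belongs to `BuildPrompt` in `pipeline/prompt.go`.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// entry is a trimmed-down stand-in for wiki.Entry.
type entry struct {
	Slug  string
	Title string
}

// formatInventory renders the grouped inventory as plain text for a prompt.
// Groups appear in a fixed order and entries are sorted by slug so the
// output is stable between runs. Empty groups are skipped.
func formatInventory(inv map[string][]entry) string {
	var sb strings.Builder
	for _, group := range []string{"concepts", "entities", "sources"} {
		entries := append([]entry(nil), inv[group]...) // copy before sorting
		if len(entries) == 0 {
			continue
		}
		sort.Slice(entries, func(i, j int) bool { return entries[i].Slug < entries[j].Slug })
		fmt.Fprintf(&sb, "%s:\n", group)
		for _, e := range entries {
			fmt.Fprintf(&sb, "- %s (%s)\n", e.Slug, e.Title)
		}
	}
	return sb.String()
}

func main() {
	inv := map[string][]entry{
		"concepts": {{Slug: "tdd", Title: "TDD"}, {Slug: "domain-driven-design", Title: "Domain Driven Design"}},
		"entities": {{Slug: "ryan-singer", Title: "Ryan Singer"}},
	}
	fmt.Print(formatInventory(inv))
}
```

Listing slugs next to titles gives the model what it needs to satisfy the schema rule that every wikilink slug resolves to an existing or newly created file.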
---
## Task 6: wiki/index + wiki/log — index rebuilder and audit log
**Files:**
- Create: `ingestion/internal/wiki/index.go`
- Create: `ingestion/internal/wiki/log.go`
- Create: `ingestion/internal/wiki/index_test.go`
- [ ] **Step 1: Write failing tests**
```go
// ingestion/internal/wiki/index_test.go
package wiki
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func setupWikiDir(t *testing.T) string {
t.Helper()
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "concepts", "tdd.md"),
[]byte("---\ntitle: TDD\n---\n\n## Definition\n\nTest-driven development is a discipline.\n"),
0o644,
))
return dir
}
func TestRebuildIndex(t *testing.T) {
dir := setupWikiDir(t)
require.NoError(t, RebuildIndex(dir, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "wiki", "index.md"))
require.NoError(t, err)
s := string(content)
assert.Contains(t, s, "# Wiki Index")
assert.Contains(t, s, "2026-04-22")
assert.Contains(t, s, "[[tdd|TDD]]")
assert.Contains(t, s, "## Concepts")
}
func TestAppendLog(t *testing.T) {
dir := t.TempDir()
require.NoError(t, AppendLog(dir, "shape-up-book",
[]string{"wiki/sources/shape-up.md", "wiki/concepts/betting-table.md"},
nil, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "log.md"))
require.NoError(t, err)
s := string(content)
assert.Contains(t, s, "shape-up-book")
assert.Contains(t, s, "wiki/sources/shape-up.md")
assert.True(t, strings.HasPrefix(s, "## 2026-04-22"))
}
func TestAppendLog_AppendsOnSecondCall(t *testing.T) {
dir := t.TempDir()
require.NoError(t, AppendLog(dir, "source-a", []string{"wiki/sources/a.md"}, nil, "2026-04-22"))
require.NoError(t, AppendLog(dir, "source-b", []string{"wiki/sources/b.md"}, nil, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "log.md"))
require.NoError(t, err)
assert.Contains(t, string(content), "source-a")
assert.Contains(t, string(content), "source-b")
}
```
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -run "TestRebuildIndex|TestAppendLog" 2>&1
```
Expected: `undefined: RebuildIndex`
- [ ] **Step 3: Write index.go**
```go
// ingestion/internal/wiki/index.go
package wiki
import (
"fmt"
"os"
"path/filepath"
"strings"
)
// RebuildIndex writes brain/wiki/index.md from the current wiki contents.
func RebuildIndex(brainDir, date string) error {
inv, err := LoadInventory(brainDir)
if err != nil {
return fmt.Errorf("load inventory: %w", err)
}
total := len(inv[PageTypeConcept]) + len(inv[PageTypeEntity]) + len(inv[PageTypeSource])
var sb strings.Builder
fmt.Fprintf(&sb, "# Wiki Index\n\n")
fmt.Fprintf(&sb, "_Updated: %s — %d pages (%d concepts, %d entities, %d sources)_\n\n",
date, total,
len(inv[PageTypeConcept]),
len(inv[PageTypeEntity]),
len(inv[PageTypeSource]))
for _, pt := range []PageType{PageTypeConcept, PageTypeEntity, PageTypeSource} {
entries := inv[pt]
if len(entries) == 0 {
continue
}
fmt.Fprintf(&sb, "## %s\n\n", strings.Title(string(pt)))
for _, e := range entries {
summary := pageFirstSentence(brainDir, e)
fmt.Fprintf(&sb, "- [[%s|%s]] — %s\n", e.Slug, e.Title, summary)
}
sb.WriteString("\n")
}
dest := filepath.Join(brainDir, "wiki", "index.md")
return os.WriteFile(dest, []byte(sb.String()), 0o644)
}
func pageFirstSentence(brainDir string, e Entry) string {
path := filepath.Join(brainDir, "wiki", string(e.Type), e.Slug+".md")
content, err := os.ReadFile(path)
if err != nil {
return ""
}
// Skip frontmatter, then return the first non-empty line that is not a heading.
parts := strings.SplitN(string(content), "---", 3)
body := string(content)
if len(parts) == 3 {
body = parts[2]
}
for _, line := range strings.Split(body, "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
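// Truncate by runes, not bytes, so multi-byte UTF-8 characters are never split.
if r := []rune(line); len(r) > 100 {
return string(r[:100]) + "…"
}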
return line
}
return ""
}
```
- [ ] **Step 4: Write log.go**
```go
// ingestion/internal/wiki/log.go
package wiki
import (
"fmt"
"os"
"path/filepath"
"strings"
)
// AppendLog appends one ingestion record to brain/log.md.
func AppendLog(brainDir, source string, pages, warnings []string, date string) error {
var sb strings.Builder
fmt.Fprintf(&sb, "## %s — ingest\n\n", date)
fmt.Fprintf(&sb, "- **Source:** %s\n", source)
if len(pages) > 0 {
sb.WriteString("- **Pages written:**\n")
for _, p := range pages {
fmt.Fprintf(&sb, " - %s\n", p)
}
}
if len(warnings) > 0 {
sb.WriteString("- **Warnings:**\n")
for _, w := range warnings {
fmt.Fprintf(&sb, " - %s\n", w)
}
}
sb.WriteString("\n")
logPath := filepath.Join(brainDir, "log.md")
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open log: %w", err)
}
defer f.Close()
_, err = f.WriteString(sb.String())
return err
}
```
- [ ] **Step 5: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/wiki/... -v
```
Expected: all PASS. (Note: `strings.Title` has been deprecated since Go 1.18, so linters such as staticcheck may flag it. It still behaves correctly for the ASCII directory names used here.)
- [ ] **Step 6: Commit**
```bash
git add ingestion/internal/wiki/index.go ingestion/internal/wiki/log.go ingestion/internal/wiki/index_test.go
git commit -m "feat(ingestion): add wiki index rebuilder and audit log"
```
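If the `strings.Title` deprecation becomes a nuisance, a small stdlib-only replacement is enough for the three section names. The `capitalize` helper below is a suggestion rather than part of the plan, and it assumes ASCII input such as `concepts`, `entities`, and `sources`.

```go
package main

import (
	"fmt"
	"strings"
)

// capitalize upper-cases the first byte of an ASCII word.
// It is deliberately not Unicode-aware: callers are expected to pass
// plain ASCII directory names.
func capitalize(s string) string {
	if s == "" {
		return s
	}
	return strings.ToUpper(s[:1]) + s[1:]
}

func main() {
	for _, name := range []string{"concepts", "entities", "sources"} {
		fmt.Println(capitalize(name))
	}
}
```

Swapping it in would mean replacing `strings.Title(string(pt))` with `capitalize(string(pt))` in `RebuildIndex`.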
---
## Task 7: llm/client — OpenAI-compatible HTTP client
**Files:**
- Create: `ingestion/internal/llm/client.go`
- Create: `ingestion/internal/llm/client_test.go`
- [ ] **Step 1: Write the failing test**
```go
// ingestion/internal/llm/client_test.go
package llm
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func mockLLMServer(t *testing.T, response string) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/chat/completions", r.URL.Path)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"role": "assistant", "content": response}},
},
})
}))
}
func TestClient_Complete(t *testing.T) {
srv := mockLLMServer(t, "hello world")
defer srv.Close()
c := New(srv.URL, "", "test-model", 10*time.Second)
got, err := c.Complete(context.Background(), "you are helpful", "say hello")
require.NoError(t, err)
assert.Equal(t, "hello world", got)
}
func TestClient_ReturnsErrorOnNon200(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "overloaded", http.StatusServiceUnavailable)
}))
defer srv.Close()
c := New(srv.URL, "", "test-model", 10*time.Second)
_, err := c.Complete(context.Background(), "sys", "user")
assert.Error(t, err)
}
func TestClient_SendsAuthHeader(t *testing.T) {
var gotAuth string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotAuth = r.Header.Get("Authorization")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": "ok"}}},
})
}))
defer srv.Close()
c := New(srv.URL, "my-key", "test-model", 10*time.Second)
_, err := c.Complete(context.Background(), "sys", "user")
require.NoError(t, err)
assert.Equal(t, "Bearer my-key", gotAuth)
}
```
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/llm/... 2>&1
```
Expected: `undefined: New`
- [ ] **Step 3: Write client.go**
```go
// ingestion/internal/llm/client.go
package llm
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Client calls an OpenAI-compatible chat completions endpoint.
type Client struct {
baseURL string
apiKey string
model string
httpClient *http.Client
}
// New constructs a Client.
func New(baseURL, apiKey, model string, timeout time.Duration) *Client {
return &Client{
baseURL: strings.TrimRight(baseURL, "/"),
apiKey: apiKey,
model: model,
httpClient: &http.Client{Timeout: timeout},
}
}
type chatRequest struct {
Model string `json:"model"`
Messages []message `json:"messages"`
Temperature float64 `json:"temperature"`
}
type message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type chatResponse struct {
Choices []struct {
Message message `json:"message"`
} `json:"choices"`
}
// Complete sends a system + user message and returns the assistant's reply.
func (c *Client) Complete(ctx context.Context, system, user string) (string, error) {
body := chatRequest{
Model: c.model,
Messages: []message{
{Role: "system", Content: system},
{Role: "user", Content: user},
},
Temperature: 0.2,
}
b, err := json.Marshal(body)
if err != nil {
return "", fmt.Errorf("marshal request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(b))
if err != nil {
return "", fmt.Errorf("build request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if c.apiKey != "" {
req.Header.Set("Authorization", "Bearer "+c.apiKey)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return "", fmt.Errorf("call LLM: %w", err)
}
defer resp.Body.Close()
out, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("LLM returned %d: %s", resp.StatusCode, out)
}
var cr chatResponse
if err := json.Unmarshal(out, &cr); err != nil {
return "", fmt.Errorf("parse response: %w", err)
}
if len(cr.Choices) == 0 {
return "", fmt.Errorf("LLM returned no choices")
}
return cr.Choices[0].Message.Content, nil
}
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/llm/... -v
```
Expected: all PASS.
- [ ] **Step 5: Commit**
```bash
git add ingestion/internal/llm/
git commit -m "feat(ingestion): add OpenAI-compatible LLM HTTP client"
```
---
## Task 8: pipeline/chunk + pipeline/parse
**Files:**
- Create: `ingestion/internal/pipeline/chunk.go`
- Create: `ingestion/internal/pipeline/chunk_test.go`
- Create: `ingestion/internal/pipeline/parse.go`
- Create: `ingestion/internal/pipeline/parse_test.go`
- [ ] **Step 1: Write failing tests**
```go
// ingestion/internal/pipeline/chunk_test.go
package pipeline
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestChunk_NoChunkingWhenZero(t *testing.T) {
content := strings.Repeat("word ", 1000)
chunks := Chunk(content, 0)
assert.Len(t, chunks, 1)
assert.Equal(t, strings.TrimSpace(content), chunks[0])
}
func TestChunk_SplitsAtParagraph(t *testing.T) {
// Two paragraphs of 21 and 22 bytes (45 with the separator); maxSize = 40 forces a split.
content := "First paragraph here.\n\nSecond paragraph here."
chunks := Chunk(content, 40)
assert.Len(t, chunks, 2)
assert.Equal(t, "First paragraph here.", chunks[0])
assert.Equal(t, "Second paragraph here.", chunks[1])
}
func TestChunk_SingleLargeParagraph(t *testing.T) {
// A single paragraph larger than maxSize stays as one chunk.
content := strings.Repeat("x", 100)
chunks := Chunk(content, 50)
assert.Len(t, chunks, 1)
}
```
```go
// ingestion/internal/pipeline/parse_test.go
package pipeline
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
func TestParsePages_ValidJSON(t *testing.T) {
input := `[{"path":"wiki/sources/foo.md","content":"# Foo"},{"path":"wiki/concepts/bar.md","content":"# Bar"}]`
pages, warnings := ParsePages(input)
require.Len(t, pages, 2)
assert.Empty(t, warnings)
assert.Equal(t, "wiki/sources/foo.md", pages[0].Path)
}
func TestParsePages_StripsFences(t *testing.T) {
input := "```json\n[{\"path\":\"wiki/sources/foo.md\",\"content\":\"# Foo\"}]\n```"
pages, warnings := ParsePages(input)
assert.Len(t, pages, 1)
assert.Empty(t, warnings)
}
func TestParsePages_TruncationRecovery(t *testing.T) {
// Valid first object, second truncated mid-way.
input := `[{"path":"wiki/sources/foo.md","content":"# Foo"},{"path":"wiki/concepts/bar.md","content":"trunc`
pages, warnings := ParsePages(input)
require.Len(t, pages, 1)
assert.Equal(t, "wiki/sources/foo.md", pages[0].Path)
assert.NotEmpty(t, warnings)
}
func TestParsePages_EmptyInput(t *testing.T) {
pages, warnings := ParsePages("")
assert.Empty(t, pages)
assert.NotEmpty(t, warnings)
}
// Keep the wiki import referenced: ParsePages returns []wiki.Page.
var _ []wiki.Page = nil
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/pipeline/... 2>&1
```
Expected: `undefined: Chunk` and `undefined: ParsePages`
- [ ] **Step 3: Write chunk.go**
```go
// ingestion/internal/pipeline/chunk.go
package pipeline
import "strings"
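// Chunk splits content at paragraph boundaries (\n\n), packing paragraphs into
// chunks of at most maxSize bytes. A single paragraph longer than maxSize is
// kept whole, so a chunk can exceed maxSize. If maxSize <= 0, content is
// returned as one chunk.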
func Chunk(content string, maxSize int) []string {
content = strings.TrimSpace(content)
if maxSize <= 0 || len(content) <= maxSize {
return []string{content}
}
paragraphs := strings.Split(content, "\n\n")
var chunks []string
var cur strings.Builder
for _, para := range paragraphs {
para = strings.TrimSpace(para)
if para == "" {
continue
}
// If adding this paragraph would exceed maxSize and we already have content,
// flush the current chunk first.
addition := para
if cur.Len() > 0 {
addition = "\n\n" + para
}
if cur.Len() > 0 && cur.Len()+len(addition) > maxSize {
chunks = append(chunks, cur.String())
cur.Reset()
cur.WriteString(para)
} else {
cur.WriteString(addition)
}
}
if cur.Len() > 0 {
chunks = append(chunks, cur.String())
}
return chunks
}
```
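`Chunk` keeps a paragraph that is longer than `maxSize` as a single chunk. If a hard upper bound is ever needed, one option is to cut such paragraphs further. The sketch below is an assumption about how that could look, not part of this plan: `splitOversize` is a hypothetical helper that cuts on byte length while avoiding a split inside a multi-byte UTF-8 character.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// splitOversize is a hypothetical helper, not part of the plan's Chunk.
// It cuts s into pieces of at most maxSize bytes without splitting a
// multi-byte UTF-8 character. If maxSize is smaller than a single
// character, that character is emitted whole.
func splitOversize(s string, maxSize int) []string {
	if maxSize <= 0 || len(s) <= maxSize {
		return []string{s}
	}
	var parts []string
	for len(s) > maxSize {
		cut := maxSize
		// Back up until the cut lands on the first byte of a character.
		for cut > 0 && !utf8.RuneStart(s[cut]) {
			cut--
		}
		if cut == 0 {
			// maxSize is smaller than this character; take it whole.
			_, cut = utf8.DecodeRuneInString(s)
		}
		parts = append(parts, s[:cut])
		s = s[cut:]
	}
	if s != "" {
		parts = append(parts, s)
	}
	return parts
}

func main() {
	for i, p := range splitOversize("xxxxxxxxxx", 4) {
		fmt.Printf("part %d: %q\n", i, p)
	}
}
```

Cutting mid-sentence loses context for the LLM, so this would only make sense as a fallback after paragraph-level packing.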
- [ ] **Step 4: Write parse.go**
```go
// ingestion/internal/pipeline/parse.go
package pipeline
import (
"encoding/json"
"fmt"
"strings"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// ParsePages parses LLM output as a JSON array of {path, content} objects and
// returns the parsed pages plus any warnings. If the array is truncated
// mid-object (token limit), it salvages all complete objects.
func ParsePages(output string) ([]wiki.Page, []string) {
output = strings.TrimSpace(output)
if output == "" {
return nil, []string{"LLM returned empty output"}
}
output = stripFences(output)
var pages []wiki.Page
if err := json.Unmarshal([]byte(output), &pages); err == nil {
return pages, nil
}
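// Truncation recovery: cut at the last `}` and close the array.
// If the truncated tail itself contains a `}` inside string content,
// the candidate is invalid and recovery reports a failure.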
idx := strings.LastIndex(output, "}")
if idx < 0 {
return nil, []string{"LLM output contained no complete JSON objects"}
}
start := strings.Index(output, "[")
if start < 0 {
return nil, []string{"LLM output contained no JSON array opening bracket"}
}
candidate := output[start:idx+1] + "]"
if err := json.Unmarshal([]byte(candidate), &pages); err != nil {
return nil, []string{fmt.Sprintf("truncation recovery failed: %v", err)}
}
return pages, []string{fmt.Sprintf("LLM output was truncated; recovered %d page(s)", len(pages))}
}
func stripFences(s string) string {
for _, prefix := range []string{"```json\n", "```json\r\n", "```\n", "```\r\n"} {
if strings.HasPrefix(s, prefix) {
s = strings.TrimPrefix(s, prefix)
s = strings.TrimSuffix(strings.TrimSpace(s), "```")
return strings.TrimSpace(s)
}
}
return s
}
```
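The `LastIndex`-based recovery in `ParsePages` gives up when the truncated tail contains a `}` inside string content (for example, a code snippet in a page body). One alternative, shown here as a sketch rather than as the plan's method, is to stream array elements with `json.Decoder` and keep everything decoded before the first error. `recoverPages` and the local `page` type are hypothetical stand-ins for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// page is a stand-in for wiki.Page with the two fields the parser needs.
type page struct {
	Path    string `json:"path"`
	Content string `json:"content"`
}

// recoverPages is a hypothetical alternative to the LastIndex-based recovery.
// It streams array elements and returns every object decoded before the
// first error, together with that error.
func recoverPages(output string) ([]page, error) {
	dec := json.NewDecoder(strings.NewReader(output))
	tok, err := dec.Token()
	if err != nil {
		return nil, fmt.Errorf("read opening token: %w", err)
	}
	if d, ok := tok.(json.Delim); !ok || d != '[' {
		return nil, fmt.Errorf("expected JSON array, got %v", tok)
	}
	var pages []page
	for dec.More() {
		var p page
		if err := dec.Decode(&p); err != nil {
			// Truncated or malformed element: keep what was decoded so far.
			return pages, fmt.Errorf("stopped after %d page(s): %w", len(pages), err)
		}
		pages = append(pages, p)
	}
	return pages, nil
}

func main() {
	// The second object is cut off and its content contains a closing brace.
	in := `[{"path":"a.md","content":"ok"},{"path":"b.md","content":"func f() {}`
	pages, err := recoverPages(in)
	fmt.Println(len(pages), err != nil)
}
```

The trade-off is a second code path to test; the simpler `LastIndex` approach may be sufficient if page bodies rarely contain braces.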
- [ ] **Step 5: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/pipeline/... -v
```
Expected: all PASS.
- [ ] **Step 6: Commit**
```bash
git add ingestion/internal/pipeline/chunk.go ingestion/internal/pipeline/chunk_test.go \
ingestion/internal/pipeline/parse.go ingestion/internal/pipeline/parse_test.go
git commit -m "feat(ingestion): add content chunking and LLM JSON output parser"
```
---
## Task 9: pipeline/prompt + pipeline/pipeline — prompt builder and orchestrator
**Files:**
- Create: `ingestion/internal/pipeline/prompt.go`
- Create: `ingestion/internal/pipeline/pipeline.go`
- Create: `ingestion/internal/pipeline/pipeline_test.go`
- [ ] **Step 1: Write the failing integration test**
```go
// ingestion/internal/pipeline/pipeline_test.go
package pipeline
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
func TestRun_WritesPages(t *testing.T) {
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
// Mock LLM returns two pages.
llmResponse := mustJSON([]wiki.Page{
{
Path: "wiki/sources/test-article.md",
Content: "---\ntitle: Test Article\ntype: article\ndomain: software-engineering\ndate_ingested: 2026-04-22\nlast_updated: 2026-04-22\naliases:\n - Test Article\n---\n\n## Summary\n\nA test article.\n\n## Key Claims\n\n- It tests things.\n\n## Concepts Introduced or Reinforced\n\n## Entities Mentioned\n\n## Open Questions Raised\n",
},
{
Path: "wiki/concepts/testing.md",
Content: "---\ntitle: Testing\ndomain: software-engineering\nlast_updated: 2026-04-22\naliases:\n - Testing\n---\n\n## Definition\n\nThe practice of verifying software.\n\n## Why It Matters\n\nCatches bugs.\n\n## Related Concepts\n\n## Related Entities\n\n## Sources\n\n## Evolving Notes\n",
},
})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"role": "assistant", "content": llmResponse}},
},
})
}))
defer srv.Close()
cfg := Config{
Complete: llm.New(srv.URL, "", "test-model", 30*time.Second).Complete,
ChunkSize: 0,
}
result, err := Run(context.Background(), cfg, brainDir, "An article about testing.", "test-article", false)
require.NoError(t, err)
assert.Len(t, result.Pages, 2)
assert.Empty(t, result.Warnings)
// Files must exist on disk.
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "test-article.md"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(brainDir, "wiki", "concepts", "testing.md"))
require.NoError(t, err)
// Index must be rebuilt.
_, err = os.Stat(filepath.Join(brainDir, "wiki", "index.md"))
require.NoError(t, err)
// Log must be appended.
_, err = os.Stat(filepath.Join(brainDir, "log.md"))
require.NoError(t, err)
}
func TestRun_DryRunDoesNotWrite(t *testing.T) {
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
llmResponse := mustJSON([]wiki.Page{{
Path: "wiki/sources/foo.md",
Content: "---\ntitle: Foo\n---\n\n## Summary\n\nFoo.\n",
}})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": llmResponse}}},
})
}))
defer srv.Close()
cfg := Config{Complete: llm.New(srv.URL, "", "m", 30*time.Second).Complete}
result, err := Run(context.Background(), cfg, brainDir, "foo content", "foo", true)
require.NoError(t, err)
assert.Len(t, result.Pages, 1)
// No files written in dry-run.
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "foo.md"))
assert.True(t, os.IsNotExist(err))
}
func mustJSON(v any) string {
b, _ := json.Marshal(v)
return string(b)
}
```
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/pipeline/... -run "TestRun" 2>&1
```
Expected: `undefined: Run` and `undefined: Config`
- [ ] **Step 3: Write prompt.go**
```go
// ingestion/internal/pipeline/prompt.go
package pipeline
import (
"fmt"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
const systemPrompt = `You are a wiki agent. Read the source material and produce structured wiki pages following the schema provided.
Output ONLY a valid JSON array — no markdown fences, no other text before or after.
Each element must have:
"path" — relative path within the wiki, e.g. "wiki/sources/foo.md"
"content" — full markdown content of the page including YAML frontmatter
Follow the schema strictly: correct frontmatter fields, wikilinks as [[slug|Display Text]],
dates in YYYY-MM-DD format, and paraphrase rather than quoting verbatim.`
// BuildPrompt constructs the user prompt for a single chunk.
func BuildPrompt(schema, source, content string, inventory map[wiki.PageType][]wiki.Entry) string {
var sb strings.Builder
fmt.Fprintf(&sb, "Today's date is %s.\n\n", time.Now().UTC().Format("2006-01-02"))
sb.WriteString("## Schema\n\n")
sb.WriteString(schema)
sb.WriteString("\n\n")
sb.WriteString("## Existing wiki pages\n\n")
sb.WriteString("Link ONLY to pages that appear in this inventory or that you are creating in this response.\n\n")
for _, pt := range []wiki.PageType{wiki.PageTypeConcept, wiki.PageTypeEntity, wiki.PageTypeSource} {
entries := inventory[pt]
label := strings.ToTitle(string(pt))
if len(entries) == 0 {
fmt.Fprintf(&sb, "%s — (none yet)\n\n", label)
continue
}
fmt.Fprintf(&sb, "%s — link ONLY under the matching section:\n", label)
for _, e := range entries {
fmt.Fprintf(&sb, " - [[%s|%s]]\n", e.Slug, e.Title)
}
sb.WriteString("\n")
}
sb.WriteString("## Non-negotiable rules\n\n")
sb.WriteString("1. Output ONLY a valid JSON array — no prose, no fences.\n")
sb.WriteString("2. Slugs are kebab-case: lowercase, spaces→hyphens, no special chars.\n")
sb.WriteString("3. Wikilinks: [[slug|Display Text]] — the pipe is required.\n")
sb.WriteString("4. Section links must match their section type.\n")
sb.WriteString("5. One source page per book — update it if inventory shows it exists.\n\n")
fmt.Fprintf(&sb, "## Source: %s\n\n", source)
sb.WriteString(content)
return sb.String()
}
```
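To make the inventory section of the prompt concrete, here is a simplified, self-contained sketch of the loop in `BuildPrompt` for a single page type. The `entry` type stands in for `wiki.Entry` (only `Slug` and `Title` are assumed), the label value `"concepts"` is a guess at what a `PageType` constant contains, and the heading punctuation is simplified. Note that `strings.ToTitle` upper-cases every letter, so the label is rendered in capitals.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a stand-in for wiki.Entry; only Slug and Title are assumed.
type entry struct {
	Slug  string
	Title string
}

// renderInventory is a simplified, hypothetical version of the inventory
// loop in BuildPrompt for one page type.
func renderInventory(label string, entries []entry) string {
	var sb strings.Builder
	heading := strings.ToTitle(label) // upper-cases every letter
	if len(entries) == 0 {
		fmt.Fprintf(&sb, "%s: (none yet)\n\n", heading)
		return sb.String()
	}
	fmt.Fprintf(&sb, "%s:\n", heading)
	for _, e := range entries {
		// Wikilink format required by the prompt rules: [[slug|Display Text]].
		fmt.Fprintf(&sb, "  - [[%s|%s]]\n", e.Slug, e.Title)
	}
	sb.WriteString("\n")
	return sb.String()
}

func main() {
	fmt.Print(renderInventory("concepts", []entry{
		{Slug: "test-driven-development", Title: "Test-Driven Development"},
	}))
	fmt.Print(renderInventory("entities", nil))
}
```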
- [ ] **Step 4: Write pipeline.go**
```go
// ingestion/internal/pipeline/pipeline.go
package pipeline
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// CompleteFunc is the function signature for LLM calls.
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
// Config holds pipeline configuration.
type Config struct {
Complete CompleteFunc
ChunkSize int // 0 = no chunking
Schema string // overrides brain/CLAUDE.md when set (used in tests)
}
// Result is the outcome of a pipeline run.
type Result struct {
Pages []string // relative paths written (or would-be written in dry-run)
Warnings []string
}
// Run ingests content and writes structured wiki pages to brainDir/wiki/.
// In dry-run mode, pages are returned but not written to disk.
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
// 1. Load inventory.
inventory, err := wiki.LoadInventory(brainDir)
if err != nil {
return Result{}, fmt.Errorf("load inventory: %w", err)
}
// 2. Load schema.
schema := cfg.Schema
if schema == "" {
schema = loadSchema(brainDir)
}
// 3. Chunk.
chunks := Chunk(content, cfg.ChunkSize)
// 4 & 5. Call LLM per chunk, parse output.
var allPages []wiki.Page
var allWarnings []string
for _, chunk := range chunks {
userPrompt := BuildPrompt(schema, source, chunk, inventory)
output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
if err != nil {
return Result{}, fmt.Errorf("LLM call: %w", err)
}
pages, warnings := ParsePages(output)
allPages = append(allPages, pages...)
allWarnings = append(allWarnings, warnings...)
}
// 6. Merge pages with the same path.
merged := mergeAll(allPages)
// 7. Write (or preview in dry-run).
date := time.Now().UTC().Format("2006-01-02")
var written []string
for _, page := range merged {
if !dryRun {
dest := filepath.Join(brainDir, filepath.FromSlash(page.Path))
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err)
}
if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil {
return Result{}, fmt.Errorf("write %s: %w", page.Path, err)
}
}
written = append(written, page.Path)
}
if !dryRun {
// 8. Rebuild index.
if err := wiki.RebuildIndex(brainDir, date); err != nil {
allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err))
}
// 9. Append log.
if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil {
allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err))
}
}
return Result{Pages: written, Warnings: allWarnings}, nil
}
// mergeAll deduplicates pages by path, merging content from later occurrences.
func mergeAll(pages []wiki.Page) []wiki.Page {
order := make([]string, 0, len(pages))
byPath := make(map[string]wiki.Page, len(pages))
for _, p := range pages {
if _, seen := byPath[p.Path]; !seen {
order = append(order, p.Path)
byPath[p.Path] = p
} else {
byPath[p.Path] = wiki.Merge(byPath[p.Path], p)
}
}
result := make([]wiki.Page, 0, len(order))
for _, path := range order {
result = append(result, byPath[path])
}
return result
}
// defaultSchema is the fallback used when brain/CLAUDE.md cannot be read.
const defaultSchema = `# Brain Wiki Schema
See brain/CLAUDE.md for the full schema. Using minimal fallback.
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
`
func loadSchema(brainDir string) string {
b, err := os.ReadFile(filepath.Join(brainDir, "CLAUDE.md"))
if err != nil {
return defaultSchema
}
return strings.TrimSpace(string(b))
}
```
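The order-preserving deduplication in `mergeAll` matters when several chunks of one source each emit the same page path. The sketch below isolates that behaviour under stated assumptions: `page` stands in for `wiki.Page`, and `concatMerge` is a placeholder for `wiki.Merge`, whose real strategy is defined in an earlier task and is not reproduced here.

```go
package main

import "fmt"

// page is a stand-in for wiki.Page.
type page struct {
	Path    string
	Content string
}

// concatMerge is a placeholder for wiki.Merge; it simply appends the later
// content, which is NOT the plan's actual merge strategy.
func concatMerge(a, b page) page {
	return page{Path: a.Path, Content: a.Content + "\n" + b.Content}
}

// mergeByPath keeps the first-seen order of paths and folds later pages with
// the same path into the earlier one using merge.
func mergeByPath(pages []page, merge func(a, b page) page) []page {
	order := make([]string, 0, len(pages))
	byPath := make(map[string]page, len(pages))
	for _, p := range pages {
		if prev, seen := byPath[p.Path]; seen {
			byPath[p.Path] = merge(prev, p)
			continue
		}
		order = append(order, p.Path)
		byPath[p.Path] = p
	}
	out := make([]page, 0, len(order))
	for _, path := range order {
		out = append(out, byPath[path])
	}
	return out
}

func main() {
	fromChunks := []page{
		{Path: "wiki/sources/book.md", Content: "chunk 1 notes"},
		{Path: "wiki/concepts/tdd.md", Content: "definition"},
		{Path: "wiki/sources/book.md", Content: "chunk 2 notes"},
	}
	for _, p := range mergeByPath(fromChunks, concatMerge) {
		fmt.Printf("%s: %q\n", p.Path, p.Content)
	}
}
```

Tracking `order` separately is what keeps the output deterministic, since Go map iteration order is not.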
- [ ] **Step 5: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/pipeline/... -v
```
Expected: all PASS.
- [ ] **Step 6: Commit**
```bash
git add ingestion/internal/pipeline/
git commit -m "feat(ingestion): add pipeline orchestrator with prompt builder"
```
---
## Task 10: api — /ingest and /ingest-path handlers + main.go wiring
**Files:**
- Modify: `ingestion/internal/api/handler.go`
- Modify: `ingestion/internal/api/handler_test.go`
- Modify: `ingestion/cmd/server/main.go`
- [ ] **Step 1: Write failing handler tests**
Add these test functions to `ingestion/internal/api/handler_test.go`.
First update the existing `setup` helper so that it passes a `pipeline.Config` to `api.NewHandler`:
```go
func setup(t *testing.T) (string, *api.Handler) {
t.Helper()
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "knowledge"), 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "knowledge", "tdd.md"),
[]byte("---\ntitle: TDD\ndomain: software\n---\n\nTest-driven development is a discipline.\n"),
0o644,
))
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
return dir, api.NewHandler(dir, logger, pipeline.Config{})
}
```
Then add the new test functions at the bottom of the file:
```go
func TestIngest_RequiresContent(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{"source": "test"})
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.Ingest(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
func TestIngest_RequiresSource(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{"content": "something"})
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.Ingest(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
func TestIngestPath_RequiresPath(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{})
req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.IngestPath(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
func TestIngestPath_RejectsNonexistentPath(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{"path": "/does/not/exist.md"})
req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.IngestPath(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
```
Also add this import to the existing import block in `handler_test.go`:
```go
import (
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
)
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/api/... 2>&1
```
Expected: compile error — `NewHandler` wrong number of args, `Ingest`/`IngestPath` undefined.
- [ ] **Step 3: Update handler.go**
Replace the full contents of `ingestion/internal/api/handler.go`:
```go
// ingestion/internal/api/handler.go
package api
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
)
// Handler serves the ingestion HTTP API.
type Handler struct {
brainDir string
logger *slog.Logger
pipeline pipeline.Config
}
// NewHandler constructs a Handler.
func NewHandler(brainDir string, logger *slog.Logger, pc pipeline.Config) *Handler {
return &Handler{brainDir: brainDir, logger: logger, pipeline: pc}
}
// --- /query ---
type queryRequest struct {
Query string `json:"query"`
Limit int `json:"limit,omitempty"`
}
func (h *Handler) Query(w http.ResponseWriter, r *http.Request) {
var req queryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest)
return
}
if strings.TrimSpace(req.Query) == "" {
http.Error(w, "query is required", http.StatusBadRequest)
return
}
if req.Limit == 0 {
req.Limit = 5
}
results, err := search.Query(h.brainDir, req.Query, req.Limit)
if err != nil {
h.logger.Error("query failed", "err", err)
http.Error(w, "search error", http.StatusInternalServerError)
return
}
writeJSON(w, map[string]any{"results": results})
}
// --- /write ---
type writeRequest struct {
Content string `json:"content"`
Filename string `json:"filename,omitempty"`
Type string `json:"type,omitempty"`
Domain string `json:"domain,omitempty"`
}
func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
var req writeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest)
return
}
if req.Content == "" {
http.Error(w, "content is required", http.StatusBadRequest)
return
}
filename := req.Filename
if filename == "" {
filename = fmt.Sprintf("%s-auto.md", time.Now().UTC().Format("2006-01-02-150405"))
}
rawDir := filepath.Join(h.brainDir, "knowledge")
if err := os.MkdirAll(rawDir, 0o755); err != nil {
http.Error(w, "failed to create raw dir", http.StatusInternalServerError)
return
}
finalContent := req.Content
if req.Type != "" || req.Domain != "" {
var fm strings.Builder
fm.WriteString("---\n")
if req.Type != "" {
fmt.Fprintf(&fm, "type: %s\n", req.Type)
}
if req.Domain != "" {
fmt.Fprintf(&fm, "domain: %s\n", req.Domain)
}
fm.WriteString("---\n")
finalContent = fm.String() + req.Content
}
base := filepath.Base(filename)
if !strings.HasSuffix(base, ".md") {
base += ".md"
}
dest := filepath.Join(rawDir, base)
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
h.logger.Error("write failed", "err", err)
http.Error(w, "write error", http.StatusInternalServerError)
return
}
rel, _ := filepath.Rel(h.brainDir, dest)
writeJSON(w, map[string]string{"path": filepath.ToSlash(rel)})
}
// --- /ingest ---
type ingestRequest struct {
Content string `json:"content"`
Source string `json:"source"`
DryRun bool `json:"dry_run,omitempty"`
}
func (h *Handler) Ingest(w http.ResponseWriter, r *http.Request) {
var req ingestRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest)
return
}
if req.Content == "" {
http.Error(w, "content is required", http.StatusBadRequest)
return
}
if req.Source == "" {
http.Error(w, "source is required", http.StatusBadRequest)
return
}
result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, req.Content, req.Source, req.DryRun)
if err != nil {
h.logger.Error("ingest failed", "source", req.Source, "err", err)
http.Error(w, "ingest error: "+err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, map[string]any{"pages": result.Pages, "warnings": result.Warnings})
}
// --- /ingest-path ---
type ingestPathRequest struct {
Path string `json:"path"`
Source string `json:"source,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
func (h *Handler) IngestPath(w http.ResponseWriter, r *http.Request) {
var req ingestPathRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest)
return
}
if req.Path == "" {
http.Error(w, "path is required", http.StatusBadRequest)
return
}
info, err := os.Stat(req.Path)
if err != nil {
http.Error(w, "path not found: "+req.Path, http.StatusBadRequest)
return
}
var files []string
if info.IsDir() {
files, err = collectFiles(req.Path)
if err != nil {
h.logger.Error("collect files failed", "path", req.Path, "err", err)
http.Error(w, "failed to read directory", http.StatusInternalServerError)
return
}
} else {
files = []string{req.Path}
}
var allPages []string
var allWarnings []string
for _, f := range files {
content, err := os.ReadFile(f)
if err != nil {
allWarnings = append(allWarnings, fmt.Sprintf("skip %s: %v", f, err))
continue
}
source := req.Source
if source == "" {
source = sourceFromFilename(filepath.Base(f))
}
result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun)
if err != nil {
h.logger.Error("ingest-path failed", "file", f, "err", err)
allWarnings = append(allWarnings, fmt.Sprintf("ingest %s: %v", f, err))
continue
}
allPages = append(allPages, result.Pages...)
allWarnings = append(allWarnings, result.Warnings...)
}
writeJSON(w, map[string]any{"pages": allPages, "warnings": allWarnings})
}
var supportedExts = map[string]bool{".md": true, ".txt": true, ".pdf": true}
func collectFiles(dir string) ([]string, error) {
var files []string
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil {
return nil
}
if d.IsDir() {
return nil
}
if supportedExts[strings.ToLower(filepath.Ext(path))] {
files = append(files, path)
}
return nil
})
return files, err
}
func sourceFromFilename(filename string) string {
name := strings.TrimSuffix(filename, filepath.Ext(filename))
words := strings.FieldsFunc(name, func(r rune) bool { return r == '-' || r == '_' })
for i, w := range words {
if len(w) > 0 {
words[i] = strings.ToUpper(w[:1]) + w[1:]
}
}
return strings.Join(words, " ")
}
func writeJSON(w http.ResponseWriter, v any) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(v) //nolint:errcheck
}
```
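From a caller's point of view, `/ingest` takes a JSON body with `content`, `source`, and an optional `dry_run` flag. The following sketch builds such a request; `newIngestRequest` and `ingestPayload` are hypothetical client-side names, and the base URL assumes the server runs locally on the default port from `main.go`.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// ingestPayload matches the JSON fields read by the /ingest handler.
type ingestPayload struct {
	Content string `json:"content"`
	Source  string `json:"source"`
	DryRun  bool   `json:"dry_run,omitempty"`
}

// newIngestRequest is a hypothetical client-side helper that builds a
// POST /ingest request for the given server base URL.
func newIngestRequest(baseURL string, p ingestPayload) (*http.Request, error) {
	body, err := json.Marshal(p)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/ingest", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Assumed address: localhost with the default INGEST_PORT.
	req, err := newIngestRequest("http://localhost:3300", ingestPayload{
		Content: "An article about testing.",
		Source:  "test-article",
		DryRun:  true, // preview the pages without writing to disk
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
}
```

Sending it with `http.DefaultClient.Do(req)` should return a JSON object with `pages` and `warnings`, matching what the handler passes to `writeJSON`.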
- [ ] **Step 4: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/api/... -v
```
Expected: all PASS.
- [ ] **Step 5: Update main.go**
Replace `ingestion/cmd/server/main.go`:
```go
// ingestion/cmd/server/main.go
package main
import (
"context"
"log/slog"
"net/http"
"os"
"strconv"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/watcher"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
brainDir := envOr("INGEST_BRAIN_DIR", "../brain")
port := envOr("INGEST_PORT", "3300")
llmClient := llm.New(
envOr("INGEST_LLM_URL", "http://iguana:4000/v1"),
os.Getenv("INGEST_LLM_KEY"),
envOr("INGEST_LLM_MODEL", "koala/qwen35-9b-fast"),
time.Duration(parseInt(envOr("INGEST_LLM_TIMEOUT", "15")))*time.Minute,
)
pc := pipeline.Config{
Complete: llmClient.Complete,
ChunkSize: parseInt(envOr("INGEST_CHUNK_SIZE", "6000")),
}
h := api.NewHandler(brainDir, logger, pc)
mux := http.NewServeMux()
mux.HandleFunc("/query", h.Query)
mux.HandleFunc("/write", h.Write)
mux.HandleFunc("/ingest", h.Ingest)
mux.HandleFunc("/ingest-path", h.IngestPath)
watchInterval := parseInt(envOr("INGEST_WATCH_INTERVAL", "30"))
if watchInterval > 0 {
w := watcher.New(watcher.Config{
BrainDir: brainDir,
Interval: time.Duration(watchInterval) * time.Second,
Pipeline: pc,
Logger: logger,
})
go w.Start(context.Background())
logger.Info("watcher started", "interval_s", watchInterval, "raw_dir", brainDir+"/raw")
}
addr := ":" + port
logger.Info("ingestion server starting", "addr", addr, "brain_dir", brainDir)
if err := http.ListenAndServe(addr, mux); err != nil {
logger.Error("server stopped", "err", err)
os.Exit(1)
}
}
func envOr(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func parseInt(s string) int {
n, _ := strconv.Atoi(s)
return n
}
```
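`parseInt` discards the `strconv.Atoi` error, so a malformed value such as `INGEST_LLM_TIMEOUT=abc` becomes `0`, and a zero `http.Client` timeout means no timeout at all. If that is a concern, a fallback-aware parser is one option. The sketch below is a suggestion only; `intOr` is a hypothetical name and is not used elsewhere in this plan.

```go
package main

import (
	"fmt"
	"strconv"
)

// intOr is a hypothetical replacement for parseInt. It returns def when s is
// empty or not a valid integer. A literal "0" is still honoured, because 0 is
// meaningful for INGEST_CHUNK_SIZE (no chunking) and INGEST_WATCH_INTERVAL
// (watcher disabled).
func intOr(s string, def int) int {
	n, err := strconv.Atoi(s)
	if err != nil {
		return def
	}
	return n
}

func main() {
	fmt.Println(intOr("15", 15))
	fmt.Println(intOr("abc", 15))
	fmt.Println(intOr("0", 30))
}
```

With this shape, a call site would read something like `intOr(os.Getenv("INGEST_LLM_TIMEOUT"), 15)` instead of nesting `envOr` inside `parseInt`.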
- [ ] **Step 6: Build to verify (watcher package doesn't exist yet — expect one error)**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go build ./... 2>&1
```
Expected: one error reporting that the `…/watcher` package cannot be found (the exact wording depends on the Go version). This is fine; Task 11 adds it.
- [ ] **Step 7: Commit what compiles**
```bash
git add ingestion/internal/api/handler.go ingestion/internal/api/handler_test.go
git commit -m "feat(ingestion): add /ingest and /ingest-path HTTP handlers"
```
---
## Task 11: watcher — background file processor
**Files:**
- Create: `ingestion/internal/watcher/watcher.go`
- Create: `ingestion/internal/watcher/watcher_test.go`
- [ ] **Step 1: Write the failing test**
```go
// ingestion/internal/watcher/watcher_test.go
package watcher
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
func TestWatcher_SuccessfulFileMovedToProcessed(t *testing.T) {
brainDir := t.TempDir()
rawDir := filepath.Join(brainDir, "raw")
require.NoError(t, os.MkdirAll(rawDir, 0o755))
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
// Drop a file into raw/.
rawFile := filepath.Join(rawDir, "test-note.md")
require.NoError(t, os.WriteFile(rawFile, []byte("test content"), 0o644))
// Pipeline complete func returns one page.
successComplete := func(_ context.Context, _, _ string) (string, error) {
pages := []wiki.Page{{Path: "wiki/sources/test-note.md", Content: "---\ntitle: Test\n---\n"}}
b, _ := json.Marshal(pages)
return string(b), nil
}
w := New(Config{
BrainDir: brainDir,
Interval: 50 * time.Millisecond,
Pipeline: pipeline.Config{Complete: successComplete},
Logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go w.Start(ctx)
<-ctx.Done()
// File must be gone from raw/.
_, err := os.Stat(rawFile)
assert.True(t, os.IsNotExist(err), "file should have been moved out of raw/")
// File must be in processed/.
entries, err := filepath.Glob(filepath.Join(brainDir, "raw", "processed", "*", "test-note.md"))
require.NoError(t, err)
assert.Len(t, entries, 1)
}
func TestWatcher_FailedFileMovedToFailed(t *testing.T) {
brainDir := t.TempDir()
rawDir := filepath.Join(brainDir, "raw")
require.NoError(t, os.MkdirAll(rawDir, 0o755))
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
rawFile := filepath.Join(rawDir, "bad-note.md")
require.NoError(t, os.WriteFile(rawFile, []byte("bad"), 0o644))
errorComplete := func(_ context.Context, _, _ string) (string, error) {
return "", fmt.Errorf("LLM unavailable")
}
w := New(Config{
BrainDir: brainDir,
Interval: 50 * time.Millisecond,
Pipeline: pipeline.Config{Complete: errorComplete},
Logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go w.Start(ctx)
<-ctx.Done()
_, err := os.Stat(rawFile)
assert.True(t, os.IsNotExist(err))
_, err = os.Stat(filepath.Join(brainDir, "raw", "failed", "bad-note.md"))
require.NoError(t, err)
}
```
The complete import block for the test file (the tests need `encoding/json` and `fmt`):
```go
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
```
- [ ] **Step 2: Run test to verify it fails**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/watcher/... 2>&1
```
Expected: `undefined: New`
- [ ] **Step 3: Write watcher.go**
```go
// ingestion/internal/watcher/watcher.go
package watcher

import (
	"context"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"
	"unicode"

	"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
)

// supportedExts lists the extensions the watcher picks up; other files are ignored.
var supportedExts = map[string]bool{".md": true, ".txt": true, ".pdf": true}

// Config configures the file watcher.
type Config struct {
	BrainDir string
	Interval time.Duration // must be positive; time.NewTicker panics otherwise
	Pipeline pipeline.Config
	Logger   *slog.Logger // must be non-nil
}

// Watcher polls brain/raw/ and triggers ingestion for new files.
type Watcher struct {
	cfg Config
}

// New constructs a Watcher.
func New(cfg Config) *Watcher { return &Watcher{cfg: cfg} }

// Start polls brain/raw/ at cfg.Interval until ctx is cancelled.
func (w *Watcher) Start(ctx context.Context) {
	ticker := time.NewTicker(w.cfg.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.poll(ctx)
		}
	}
}

// poll scans the top level of brain/raw/ once and ingests every supported file.
// Subdirectories (including processed/ and failed/) are skipped.
func (w *Watcher) poll(ctx context.Context) {
	rawDir := filepath.Join(w.cfg.BrainDir, "raw")
	entries, err := os.ReadDir(rawDir)
	if err != nil {
		if !os.IsNotExist(err) {
			w.cfg.Logger.Warn("watcher: read raw dir", "err", err)
		}
		return
	}
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		ext := strings.ToLower(filepath.Ext(e.Name()))
		if !supportedExts[ext] {
			continue
		}
		w.processFile(ctx, filepath.Join(rawDir, e.Name()))
	}
}

// processFile runs the pipeline on one file, then moves it to
// raw/processed/YYYY-MM-DD/ on success or raw/failed/ on error.
func (w *Watcher) processFile(ctx context.Context, path string) {
	content, err := os.ReadFile(path)
	if err != nil {
		w.cfg.Logger.Error("watcher: read file", "path", path, "err", err)
		return
	}
	source := sourceFromFilename(filepath.Base(path))
	_, err = pipeline.Run(ctx, w.cfg.Pipeline, w.cfg.BrainDir, string(content), source, false)
	if err != nil {
		if ctx.Err() != nil {
			// Shutdown interrupted the run: leave the file in raw/ so the
			// next start retries it instead of filing it under failed/.
			return
		}
		w.cfg.Logger.Error("watcher: ingest failed", "path", path, "err", err)
		w.moveFile(path, filepath.Join(w.cfg.BrainDir, "raw", "failed"))
		return
	}
	date := time.Now().UTC().Format("2006-01-02")
	w.moveFile(path, filepath.Join(w.cfg.BrainDir, "raw", "processed", date))
}

// moveFile moves src into destDir, creating destDir if needed.
// An existing file with the same name in destDir is overwritten by os.Rename.
func (w *Watcher) moveFile(src, destDir string) {
	if err := os.MkdirAll(destDir, 0o755); err != nil {
		w.cfg.Logger.Error("watcher: mkdir", "dir", destDir, "err", err)
		return
	}
	dest := filepath.Join(destDir, filepath.Base(src))
	if err := os.Rename(src, dest); err != nil {
		w.cfg.Logger.Error("watcher: move file", "src", src, "dest", dest, "err", err)
	}
}

// sourceFromFilename derives a human-readable source name from a filename,
// e.g. "shape-up_book.md" becomes "Shape Up Book".
func sourceFromFilename(filename string) string {
	name := strings.TrimSuffix(filename, filepath.Ext(filename))
	words := strings.FieldsFunc(name, func(r rune) bool { return r == '-' || r == '_' })
	for i, word := range words {
		// Capitalize by rune, not byte, so non-ASCII names are not corrupted.
		// FieldsFunc never yields empty fields, so runes[0] is safe.
		runes := []rune(word)
		runes[0] = unicode.ToUpper(runes[0])
		words[i] = string(runes)
	}
	return strings.Join(words, " ")
}
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go test ./internal/watcher/... -v
```
Expected: all PASS.
- [ ] **Step 5: Full build and test**
```bash
cd /Users/mathias/dev/AI/hyperguild/ingestion && go build ./... && go test ./... 2>&1
```
Expected: clean build, all tests PASS.
- [ ] **Step 6: Commit**
```bash
git add ingestion/internal/watcher/
git commit -m "feat(ingestion): add background file watcher for brain/raw/"
```
- [ ] **Step 7: Final supervisor build check**
```bash
cd /Users/mathias/dev/AI/hyperguild && go build ./...
```
Expected: no output (clean build).
- [ ] **Step 8: Final commit**
```bash
git add ingestion/cmd/server/main.go
git commit -m "feat(ingestion): wire watcher and new env vars in server main"
```
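As a quick reference for the source names the watcher passes to the pipeline, the sketch below is a standalone copy of the `sourceFromFilename` helper from Step 3 with a few sample filenames. It capitalizes by rune rather than by byte, which is an assumption beyond the ASCII case; the sample filenames are illustrative only.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
	"unicode"
)

// sourceFromFilename is a standalone copy of the watcher helper: it strips
// the extension, splits on '-' and '_', and capitalizes each word.
func sourceFromFilename(filename string) string {
	name := strings.TrimSuffix(filename, filepath.Ext(filename))
	words := strings.FieldsFunc(name, func(r rune) bool { return r == '-' || r == '_' })
	for i, word := range words {
		// FieldsFunc never returns empty fields, so runes[0] exists.
		runes := []rune(word)
		runes[0] = unicode.ToUpper(runes[0])
		words[i] = string(runes)
	}
	return strings.Join(words, " ")
}

func main() {
	// Illustrative filenames; none of them come from the real brain/raw/ directory.
	samples := []string{
		"shape-up_book.md", // "Shape Up Book"
		"bad-note.md",      // "Bad Note"
		"README.txt",       // "README"
		"notes.v2.pdf",     // "Notes.v2" (only the last extension is stripped)
		"éclair-notes.md",  // "Éclair Notes"
		"--.md",            // "" (no words survive the split)
	}
	for _, f := range samples {
		fmt.Printf("%q -> %q\n", f, sourceFromFilename(f))
	}
}
```

Note the last case: a filename made only of separators yields an empty source, so the pipeline may want to reject or default an empty `source` value.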
---
## Self-Review Checklist
**Spec coverage:**
- [x] `/ingest` endpoint → Task 10
- [x] `/ingest-path` endpoint (file + directory) → Task 10
- [x] File watcher on `brain/raw/` → Task 11
- [x] Success → `processed/YYYY-MM-DD/`, failure → `failed/` → Task 11
- [x] LLM client with retry-on-429 — **GAP**: the client handles non-200 errors but the spec says "retry on 429". Add retry to `llm/client.go` in Task 7 Step 3.
- [x] Pipeline: inventory → schema → chunk → LLM → parse → merge → write → index → log → Task 9
- [x] Truncation recovery → Task 8
- [x] `brain/CLAUDE.md` embedded fallback → Task 9 (`defaultSchema` constant)
- [x] Supervisor config wiring → Task 1
- [x] `brain_ingest` optional `path` field — **GAP**: `internal/skills/brain/handlers.go` already has `brain_ingest` calling `/ingest`. The spec says `brain_ingest` should gain a `path` field to call `/ingest-path` instead.
**Fixing gaps:**
For the 429 retry, add to `llm/client.go` after the `httpClient.Do` call in `Complete`:
```go
// Retry once on 429, waiting for Retry-After (in seconds) or a 5s default backoff.
if resp.StatusCode == http.StatusTooManyRequests {
	resp.Body.Close()
	wait := 5 * time.Second
	if ra := resp.Header.Get("Retry-After"); ra != "" {
		if secs, err := strconv.Atoi(ra); err == nil && secs >= 0 {
			wait = time.Duration(secs) * time.Second
		}
	}
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-time.After(wait):
	}
	retry := req.Clone(ctx)
	if req.GetBody != nil {
		// Clone shares the already-consumed body; GetBody returns a fresh reader.
		if retry.Body, err = req.GetBody(); err != nil {
			return "", fmt.Errorf("rewind LLM request body: %w", err)
		}
	}
	resp, err = c.httpClient.Do(retry)
	if err != nil {
		return "", fmt.Errorf("retry LLM call: %w", err)
	}
	defer resp.Body.Close()
	out, err = io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("read retry response: %w", err)
	}
}
```
Add `"strconv"` to the imports in `client.go`. Note that `req.Clone(ctx)` does not rewind the request body: the first attempt has already consumed the `bytes.NewReader`, so the retry needs a fresh reader. Because `http.NewRequestWithContext` sets `req.GetBody` when the body is a `*bytes.Reader`, calling `req.GetBody()` is enough and no separate request-building closure is required. Add this block after the initial `httpClient.Do` in Task 7 Step 3.
For the `brain_ingest` `path` field, add to `internal/skills/brain/handlers.go` in the existing `ingestArgs` struct:
```go
type ingestArgs struct {
	Content string `json:"content"`
	Source  string `json:"source"`
	Path    string `json:"path,omitempty"` // ← add this
	DryRun  bool   `json:"dry_run,omitempty"`
}
```
And in the `ingest` method, choose the endpoint based on whether `Path` is set:
```go
func (s *Skill) ingest(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
	var a ingestArgs
	if err := json.Unmarshal(args, &a); err != nil {
		return nil, fmt.Errorf("parse args: %w", err)
	}
	if a.Path == "" && a.Content == "" {
		return nil, fmt.Errorf("content or path is required")
	}
	if a.Source == "" && a.Path == "" {
		return nil, fmt.Errorf("source is required when content is provided")
	}
	if s.cfg.IngestSvcURL == "" {
		return nil, fmt.Errorf("brain_ingest: INGEST_SVC_URL not configured")
	}
	// A non-empty path takes precedence over inline content.
	if a.Path != "" {
		return s.postTo(ctx, s.cfg.IngestSvcURL+"/ingest-path", a)
	}
	return s.postTo(ctx, s.cfg.IngestSvcURL+"/ingest", a)
}
```
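The validation and routing rules in `ingest` can be checked in isolation. The sketch below pulls them into a pure function; `endpointFor` is a hypothetical helper that is not part of `handlers.go`, and the base URL is a placeholder.

```go
package main

import (
	"errors"
	"fmt"
)

// ingestArgs mirrors the struct shown above.
type ingestArgs struct {
	Content string `json:"content"`
	Source  string `json:"source"`
	Path    string `json:"path,omitempty"`
	DryRun  bool   `json:"dry_run,omitempty"`
}

// endpointFor (hypothetical) applies the same checks as ingest and returns
// the URL the request would be posted to.
func endpointFor(base string, a ingestArgs) (string, error) {
	switch {
	case a.Path == "" && a.Content == "":
		return "", errors.New("content or path is required")
	case a.Path == "" && a.Source == "":
		return "", errors.New("source is required when content is provided")
	case a.Path != "":
		// Path wins even when content is also set.
		return base + "/ingest-path", nil
	default:
		return base + "/ingest", nil
	}
}

func main() {
	base := "http://ingest.example" // placeholder base URL
	cases := []ingestArgs{
		{Content: "text", Source: "notes"},
		{Path: "/tmp/a.md"},
		{Path: "/tmp/a.md", Content: "text"},
		{Content: "text"},
		{},
	}
	for _, c := range cases {
		url, err := endpointFor(base, c)
		fmt.Printf("%+v -> %q, err=%v\n", c, url, err)
	}
}
```

One consequence worth noting: when both `path` and `content` are supplied, the request goes to `/ingest-path` and the inline content is still sent along in the body, so the server side decides whether to ignore it.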
Also update the `brain_ingest` input schema in `skill.go` to declare the `path` field:
```go
InputSchema: schema([]string{}, map[string]any{
	"content": str,
	"path":    map[string]any{"type": "string", "description": "file or directory path to ingest (alternative to content)"},
	"source":  map[string]any{"type": "string", "description": "human-readable source name, e.g. 'shape-up-book'"},
	"dry_run": map[string]any{"type": "boolean"},
}),
```
Incorporate the 429 retry into Task 7 and apply the `brain_ingest` change after Task 11, or land both as a final cleanup commit.