Files
Mathias Bergqvist 0a70d9e972
All checks were successful
CI / Lint / Test / Vet (push) Successful in 9s
CI / Mirror to GitHub (push) Has been skipped
feat(pipeline): add POST /ingest-raw for direct batch ingestion without LLM
Allows callers to provide pre-structured RawPage data directly, bypassing the
LLM extraction step. The pipeline still handles slug computation, frontmatter,
link canonicalization, source back-references, and dedup — only the extraction
is skipped. Useful when a more capable model or manual curation produces the
structured data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 11:15:59 +02:00

147 lines
4.6 KiB
Go

// ingestion/internal/pipeline/pipeline.go
package pipeline
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// CompleteFunc is the function signature for LLM calls.
// Run invokes it once per content chunk, passing the system prompt as system
// and the prompt built for that chunk as user. The returned string is handed
// to ParseRawPages; a non-nil error aborts the whole run.
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
// Config holds pipeline configuration.
// It is consumed by Run; RunRaw does not take a Config.
type Config struct {
// Complete performs the LLM call for each chunk of the input content.
Complete CompleteFunc
// ChunkSize is forwarded to Chunk when the input content is split.
ChunkSize int // 0 = no chunking
// Schema, when non-empty, is used as-is instead of the schema loaded from brainDir.
Schema string // overrides brain/schema.md when set (useful in tests)
}
// Result is the outcome of a pipeline run.
type Result struct {
// Pages holds the Path of every merged page, in the order the pages were processed.
Pages []string // relative paths written (or would-be written in dry-run)
// Warnings collects non-fatal problems: parse, build and link-canonicalization
// warnings, plus index-rebuild and log-append failures, which do not fail the run.
Warnings []string
}
// Run ingests content and writes structured wiki pages to brainDir/wiki/.
// In dry-run mode, pages are returned but not written to disk.
//
// The content is split with Chunk(content, cfg.ChunkSize). For every chunk a
// prompt is built from the schema, the source name and the current inventory,
// cfg.Complete is called, and its output is parsed into RawPages. The pages
// and parse warnings from all chunks are then handed to buildAndWrite.
//
// The schema is cfg.Schema when it is non-empty, otherwise the result of
// loadSchema(brainDir).
//
// Run returns an error when the inventory cannot be loaded, when there is at
// least one chunk to process but cfg.Complete is nil, when any LLM call fails,
// or when buildAndWrite fails. No partial Result is returned on error.
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
	inventory, err := wiki.LoadInventory(brainDir)
	if err != nil {
		return Result{}, fmt.Errorf("load inventory: %w", err)
	}

	schema := cfg.Schema
	if schema == "" {
		schema = loadSchema(brainDir)
	}

	sourceSlug := wiki.Slug(source)
	date := time.Now().UTC().Format("2006-01-02")

	chunks := Chunk(content, cfg.ChunkSize)

	// Calling a nil func value panics. Report a misconfigured pipeline as an
	// ordinary error instead, but only when a call would actually be made so
	// that runs with nothing to extract behave exactly as before.
	if len(chunks) > 0 && cfg.Complete == nil {
		return Result{}, fmt.Errorf("LLM call: Config.Complete is nil")
	}

	var allRaw []RawPage
	var allWarnings []string
	for _, chunk := range chunks {
		userPrompt := BuildPrompt(schema, source, chunk, inventory)
		output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
		if err != nil {
			// Fail the whole run: pages from earlier chunks are discarded
			// rather than written as a partial ingestion.
			return Result{}, fmt.Errorf("LLM call: %w", err)
		}
		raw, warnings := ParseRawPages(output)
		allRaw = append(allRaw, raw...)
		allWarnings = append(allWarnings, warnings...)
	}

	return buildAndWrite(allRaw, sourceSlug, date, brainDir, source, inventory, allWarnings, dryRun)
}
// RunRaw feeds caller-supplied RawPages straight into the build-and-write
// stage, so no LLM call is made. It is meant for callers that already hold
// structured RawPage data (for example from a more capable model or from
// manual curation).
//
// The inventory is loaded from brainDir, the source slug and today's UTC date
// are computed, and everything is passed to buildAndWrite with no initial
// warnings. An error is returned when the inventory cannot be loaded or when
// buildAndWrite fails.
func RunRaw(brainDir, source string, rawPages []RawPage, dryRun bool) (Result, error) {
	existing, loadErr := wiki.LoadInventory(brainDir)
	if loadErr != nil {
		return Result{}, fmt.Errorf("load inventory: %w", loadErr)
	}

	var (
		slug  = wiki.Slug(source)
		today = time.Now().UTC().Format("2006-01-02")
	)
	return buildAndWrite(rawPages, slug, today, brainDir, source, existing, nil, dryRun)
}
// buildAndWrite is the shared tail of Run and RunRaw. It turns RawPages into
// final wiki pages and, unless dryRun is set, writes them below brainDir.
//
// Stages, in order: BuildPages, Resolve, CanonicalizeLinks, injectSourceRefs,
// mergeAll. Warnings from BuildPages and CanonicalizeLinks are appended to the
// warnings passed in by the caller.
//
// When dryRun is false each merged page is written to brainDir joined with its
// Path (parent directories are created as needed); afterwards the index is
// rebuilt and a log entry is appended. A mkdir or write failure aborts with an
// error, whereas index and log failures are only recorded as warnings.
//
// The returned Result lists the Path of every merged page, written or not.
func buildAndWrite(rawPages []RawPage, sourceSlug, date, brainDir, source string, inventory map[wiki.PageType][]wiki.Entry, warnings []string, dryRun bool) (Result, error) {
	built, buildWarnings := BuildPages(rawPages, sourceSlug, date)
	linked, linkWarnings := CanonicalizeLinks(Resolve(built, inventory), inventory)
	warnings = append(warnings, buildWarnings...)
	warnings = append(warnings, linkWarnings...)

	final := mergeAll(injectSourceRefs(linked, inventory, brainDir))

	var paths []string
	for _, pg := range final {
		if dryRun {
			// Dry run: report the page without touching the filesystem.
			paths = append(paths, pg.Path)
			continue
		}

		// Page paths use forward slashes; convert to the OS separator.
		target := filepath.Join(brainDir, filepath.FromSlash(pg.Path))
		if mkErr := os.MkdirAll(filepath.Dir(target), 0o755); mkErr != nil {
			return Result{}, fmt.Errorf("mkdir for %s: %w", pg.Path, mkErr)
		}
		if wrErr := os.WriteFile(target, []byte(pg.Content), 0o644); wrErr != nil {
			return Result{}, fmt.Errorf("write %s: %w", pg.Path, wrErr)
		}
		paths = append(paths, pg.Path)
	}

	if dryRun {
		return Result{Pages: paths, Warnings: warnings}, nil
	}

	// Index and log maintenance is best-effort: the pages are already on disk,
	// so failures here are downgraded to warnings.
	if idxErr := wiki.RebuildIndex(brainDir, date); idxErr != nil {
		warnings = append(warnings, fmt.Sprintf("rebuild index: %v", idxErr))
	}
	if logErr := wiki.AppendLog(brainDir, source, paths, warnings, date); logErr != nil {
		warnings = append(warnings, fmt.Sprintf("append log: %v", logErr))
	}

	return Result{Pages: paths, Warnings: warnings}, nil
}
// mergeAll collapses pages that share a Path into a single page. The first
// page seen for a path fixes its position in the output; each later page with
// the same path is folded into the accumulated one via wiki.Merge.
// The result is always a non-nil slice with one entry per distinct path.
func mergeAll(pages []wiki.Page) []wiki.Page {
	merged := make(map[string]wiki.Page, len(pages))
	firstSeen := make([]string, 0, len(pages))

	for _, pg := range pages {
		prev, dup := merged[pg.Path]
		if dup {
			merged[pg.Path] = wiki.Merge(prev, pg)
			continue
		}
		firstSeen = append(firstSeen, pg.Path)
		merged[pg.Path] = pg
	}

	// Emit in first-occurrence order; map iteration order would be random.
	out := make([]wiki.Page, len(firstSeen))
	for i, key := range firstSeen {
		out[i] = merged[key]
	}
	return out
}
// defaultSchema is the fallback schema text returned by loadSchema when
// schema.md cannot be read or holds no usable content.
const defaultSchema = `# Brain Wiki Schema
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
See brain/schema.md for the full schema.
`

// loadSchema returns the schema text stored in brainDir/schema.md with
// surrounding whitespace trimmed.
//
// defaultSchema is returned instead when the file cannot be read for any
// reason, or when it is empty or contains only whitespace. An empty string is
// never returned: Run treats an empty schema as "not set", so handing one to
// the prompt builder would silently drop the schema altogether.
func loadSchema(brainDir string) string {
	raw, err := os.ReadFile(filepath.Join(brainDir, "schema.md"))
	if err != nil {
		// Best-effort: a missing or unreadable schema file is not an error.
		return defaultSchema
	}
	if schema := strings.TrimSpace(string(raw)); schema != "" {
		return schema
	}
	return defaultSchema
}