122 lines
3.4 KiB
Go
122 lines
3.4 KiB
Go
// ingestion/internal/pipeline/pipeline.go
|
|
package pipeline
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
|
|
)
|
|
|
|
// CompleteFunc is the function signature for LLM calls.
|
|
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
|
|
|
|
// Config holds pipeline configuration.
|
|
type Config struct {
|
|
Complete CompleteFunc
|
|
ChunkSize int // 0 = no chunking
|
|
Schema string // overrides brain/schema.md when set (useful in tests)
|
|
}
|
|
|
|
// Result reports what a pipeline run produced.
type Result struct {
	// Pages holds the relative paths of the pages that were written, or that
	// would have been written when the run was a dry-run.
	Pages []string

	// Warnings holds the non-fatal problems collected during the run.
	Warnings []string
}
|
|
|
|
// Run ingests content and writes structured wiki pages to brainDir/wiki/.
|
|
// In dry-run mode, pages are returned but not written to disk.
|
|
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
|
|
inventory, err := wiki.LoadInventory(brainDir)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("load inventory: %w", err)
|
|
}
|
|
|
|
schema := cfg.Schema
|
|
if schema == "" {
|
|
schema = loadSchema(brainDir)
|
|
}
|
|
|
|
chunks := Chunk(content, cfg.ChunkSize)
|
|
|
|
var allPages []wiki.Page
|
|
var allWarnings []string
|
|
|
|
for _, chunk := range chunks {
|
|
userPrompt := BuildPrompt(schema, source, chunk, inventory)
|
|
output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("LLM call: %w", err)
|
|
}
|
|
pages, warnings := ParsePages(output)
|
|
allPages = append(allPages, pages...)
|
|
allWarnings = append(allWarnings, warnings...)
|
|
}
|
|
|
|
resolved := Resolve(allPages, inventory)
|
|
merged := mergeAll(resolved)
|
|
|
|
date := time.Now().UTC().Format("2006-01-02")
|
|
var written []string
|
|
|
|
for _, page := range merged {
|
|
if !dryRun {
|
|
dest := filepath.Join(brainDir, filepath.FromSlash(page.Path))
|
|
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
|
|
return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err)
|
|
}
|
|
if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil {
|
|
return Result{}, fmt.Errorf("write %s: %w", page.Path, err)
|
|
}
|
|
}
|
|
written = append(written, page.Path)
|
|
}
|
|
|
|
if !dryRun {
|
|
if err := wiki.RebuildIndex(brainDir, date); err != nil {
|
|
allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err))
|
|
}
|
|
if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil {
|
|
allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err))
|
|
}
|
|
}
|
|
|
|
return Result{Pages: written, Warnings: allWarnings}, nil
|
|
}
|
|
|
|
// mergeAll deduplicates pages by path, merging content from later occurrences.
|
|
func mergeAll(pages []wiki.Page) []wiki.Page {
|
|
order := make([]string, 0, len(pages))
|
|
byPath := make(map[string]wiki.Page, len(pages))
|
|
for _, p := range pages {
|
|
if _, seen := byPath[p.Path]; !seen {
|
|
order = append(order, p.Path)
|
|
byPath[p.Path] = p
|
|
} else {
|
|
byPath[p.Path] = wiki.Merge(byPath[p.Path], p)
|
|
}
|
|
}
|
|
result := make([]wiki.Page, 0, len(order))
|
|
for _, path := range order {
|
|
result = append(result, byPath[path])
|
|
}
|
|
return result
|
|
}
|
|
|
|
// defaultSchema is the fallback schema text used by loadSchema.
// NOTE(review): the bare `|` lines inside the literal look like extraction
// artifacts rather than intended schema text — confirm against the checked-in
// file before relying on the exact contents.
const defaultSchema = `# Brain Wiki Schema
|
|
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
|
|
See brain/schema.md for the full schema.
|
|
`
|
|
|
|
func loadSchema(brainDir string) string {
|
|
b, err := os.ReadFile(filepath.Join(brainDir, "schema.md"))
|
|
if err != nil {
|
|
return defaultSchema
|
|
}
|
|
return strings.TrimSpace(string(b))
|
|
}
|