diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index a44a02d..4ba6cc4 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -68,6 +68,7 @@ func main() { mux.HandleFunc("POST /write", h.Write) mux.HandleFunc("POST /ingest", h.Ingest) mux.HandleFunc("POST /ingest-path", h.IngestPath) + mux.HandleFunc("POST /ingest-raw", h.IngestRaw) mux.HandleFunc("POST /backfill-refs", h.BackfillRefs) addr := ":" + port diff --git a/ingestion/internal/api/handler.go b/ingestion/internal/api/handler.go index 961007d..b9bd28e 100644 --- a/ingestion/internal/api/handler.go +++ b/ingestion/internal/api/handler.go @@ -272,6 +272,48 @@ func (h *Handler) IngestPath(w http.ResponseWriter, r *http.Request) { writeJSON(w, ingestResponse{Pages: allPages, Warnings: allWarnings}) } +type ingestRawRequest struct { + Source string `json:"source"` + Pages []pipeline.RawPage `json:"pages"` + DryRun bool `json:"dry_run"` +} + +// IngestRaw handles POST /ingest-raw — run the pipeline on pre-parsed RawPages, +// skipping the LLM extraction step. Use when the caller has already produced +// structured page data (e.g. from a more capable model or manual curation). +func (h *Handler) IngestRaw(w http.ResponseWriter, r *http.Request) { + var req ingestRawRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid JSON") + return + } + if strings.TrimSpace(req.Source) == "" { + writeError(w, http.StatusBadRequest, "source is required") + return + } + if len(req.Pages) == 0 { + writeError(w, http.StatusBadRequest, "pages is required and must be non-empty") + return + } + + result, err := pipeline.RunRaw(h.brainDir, req.Source, req.Pages, req.DryRun) + if err != nil { + h.logger.Error("ingest-raw failed", "source", req.Source, "err", err) + writeError(w, http.StatusInternalServerError, "ingest error") + return + } + + pages := result.Pages + if pages == nil { + pages = []string{} + } + warnings := result.Warnings + if warnings == nil { + warnings = []string{} + } + writeJSON(w, ingestResponse{Pages: pages, Warnings: warnings}) +} + // BackfillRefs handles POST /backfill-refs — injects source back-references // into all concept and entity pages based on existing wiki/sources/ pages. func (h *Handler) BackfillRefs(w http.ResponseWriter, r *http.Request) { diff --git a/ingestion/internal/api/handler_test.go b/ingestion/internal/api/handler_test.go index 7ac9e8a..8a3a701 100644 --- a/ingestion/internal/api/handler_test.go +++ b/ingestion/internal/api/handler_test.go @@ -226,6 +226,85 @@ func TestIngestPath_File(t *testing.T) { assert.NotEmpty(t, pagesSlice) } +// --------------------------------------------------------------------------- +// POST /ingest-raw +// --------------------------------------------------------------------------- + +func TestIngestRaw_Validation(t *testing.T) { + cases := []struct { + name string + body map[string]any + }{ + {"missing source", map[string]any{"pages": []any{map[string]any{"title": "X", "type": "concept", "content": "x"}}}}, + {"missing pages", map[string]any{"source": "test-source"}}, + {"empty pages", map[string]any{"source": "test-source", "pages": []any{}}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, h := setup(t) + body, _ := json.Marshal(tc.body) + req := httptest.NewRequest(http.MethodPost, "/ingest-raw", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestRaw(rec, req) + + assert.Equal(t, http.StatusBadRequest, rec.Code) + }) + } +} + +func TestIngestRaw_Success(t *testing.T) { + dir, h := setup(t) + body, _ := json.Marshal(map[string]any{ + "source": "test-article", + "pages": []any{ + map[string]any{"title": "Test Article", "type": "source", "subtype": "article", "domain": "Testing", "content": "## Summary\n\nThis is a test article about [[Test Concept]].\n"}, + map[string]any{"title": "Test Concept", "type": "concept", "domain": "Testing", "content": "A concept for testing.\n"}, + }, + }) + req := httptest.NewRequest(http.MethodPost, "/ingest-raw", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestRaw(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + pages := resp["pages"].([]any) + assert.Len(t, pages, 2) + + // Verify files were written + sourcePath := filepath.Join(dir, "wiki", "sources", "test-article.md") + assert.FileExists(t, sourcePath) + conceptPath := filepath.Join(dir, "wiki", "concepts", "test-concept.md") + assert.FileExists(t, conceptPath) +} + +func TestIngestRaw_DryRun(t *testing.T) { + dir, h := setup(t) + body, _ := json.Marshal(map[string]any{ + "source": "dry-run-test", + "pages": []any{ + map[string]any{"title": "Dry Run Source", "type": "source", "subtype": "article", "content": "Content."}, + }, + "dry_run": true, + }) + req := httptest.NewRequest(http.MethodPost, "/ingest-raw", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestRaw(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + pages := resp["pages"].([]any) + assert.NotEmpty(t, pages) + + // Verify no files were written + sourcePath := filepath.Join(dir, "wiki", "sources", "dry-run-test.md") + assert.NoFileExists(t, sourcePath) +} + func TestIngestPath_Directory(t *testing.T) { _, h := setup(t) diff --git a/ingestion/internal/pipeline/pipeline.go b/ingestion/internal/pipeline/pipeline.go index 19b650a..30d1cce 100644 --- a/ingestion/internal/pipeline/pipeline.go +++ b/ingestion/internal/pipeline/pipeline.go @@ -59,11 +59,31 @@ func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryR allWarnings = append(allWarnings, warnings...) } - pages, buildWarnings := BuildPages(allRaw, sourceSlug, date) - allWarnings = append(allWarnings, buildWarnings...) + return buildAndWrite(allRaw, sourceSlug, date, brainDir, source, inventory, allWarnings, dryRun) +} + +// RunRaw runs the pipeline on pre-parsed RawPages, skipping the LLM extraction +// step. Use this when the caller has already produced the structured RawPage data +// (e.g. from a more capable model or manual curation). +func RunRaw(brainDir, source string, rawPages []RawPage, dryRun bool) (Result, error) { + inventory, err := wiki.LoadInventory(brainDir) + if err != nil { + return Result{}, fmt.Errorf("load inventory: %w", err) + } + + sourceSlug := wiki.Slug(source) + date := time.Now().UTC().Format("2006-01-02") + + return buildAndWrite(rawPages, sourceSlug, date, brainDir, source, inventory, nil, dryRun) +} + +// buildAndWrite runs BuildPages through write for both Run and RunRaw. +func buildAndWrite(rawPages []RawPage, sourceSlug, date, brainDir, source string, inventory map[wiki.PageType][]wiki.Entry, warnings []string, dryRun bool) (Result, error) { + pages, buildWarnings := BuildPages(rawPages, sourceSlug, date) + warnings = append(warnings, buildWarnings...) resolved := Resolve(pages, inventory) canonicalized, linkWarnings := CanonicalizeLinks(resolved, inventory) - allWarnings = append(allWarnings, linkWarnings...) + warnings = append(warnings, linkWarnings...) withRefs := injectSourceRefs(canonicalized, inventory, brainDir) merged := mergeAll(withRefs) @@ -83,14 +103,14 @@ func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryR if !dryRun { if err := wiki.RebuildIndex(brainDir, date); err != nil { - allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err)) + warnings = append(warnings, fmt.Sprintf("rebuild index: %v", err)) } - if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil { - allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err)) + if err := wiki.AppendLog(brainDir, source, written, warnings, date); err != nil { + warnings = append(warnings, fmt.Sprintf("append log: %v", err)) } } - return Result{Pages: written, Warnings: allWarnings}, nil + return Result{Pages: written, Warnings: warnings}, nil } // mergeAll deduplicates pages by path, merging content from later occurrences. diff --git a/internal/skills/brain/handlers.go b/internal/skills/brain/handlers.go index a79ead9..063916c 100644 --- a/internal/skills/brain/handlers.go +++ b/internal/skills/brain/handlers.go @@ -17,6 +17,8 @@ func (s *Skill) Handle(ctx context.Context, tool string, args json.RawMessage) ( return s.query(ctx, args) case "brain_write": return s.write(ctx, args) + case "brain_ingest_raw": + return s.ingestRaw(ctx, args) case "brain_ingest": return s.ingest(ctx, args) case "brain_search": @@ -98,6 +100,33 @@ func (s *Skill) ingest(ctx context.Context, args json.RawMessage) (json.RawMessa return nil, fmt.Errorf("either content+source or path is required") } +type ingestRawArgs struct { + Source string `json:"source"` + Pages []any `json:"pages"` + DryRun bool `json:"dry_run,omitempty"` +} + +func (s *Skill) ingestRaw(ctx context.Context, args json.RawMessage) (json.RawMessage, error) { + var a ingestRawArgs + if err := json.Unmarshal(args, &a); err != nil { + return nil, fmt.Errorf("parse args: %w", err) + } + if s.cfg.IngestSvcURL == "" { + return nil, fmt.Errorf("brain_ingest_raw: INGEST_SVC_URL not configured") + } + if a.Source == "" { + return nil, fmt.Errorf("source is required") + } + if len(a.Pages) == 0 { + return nil, fmt.Errorf("pages is required and must be non-empty") + } + return s.postTo(ctx, s.cfg.IngestSvcURL+"/ingest-raw", map[string]any{ + "source": a.Source, + "pages": a.Pages, + "dry_run": a.DryRun, + }) +} + type searchArgs struct { Query string `json:"query"` Collection string `json:"collection,omitempty"` diff --git a/internal/skills/brain/skill.go b/internal/skills/brain/skill.go index 260dc5a..3f136e8 100644 --- a/internal/skills/brain/skill.go +++ b/internal/skills/brain/skill.go @@ -55,6 +55,32 @@ func (s *Skill) Tools() []registry.ToolDef { }, } if s.cfg.IngestSvcURL != "" { + tools = append(tools, registry.ToolDef{ + Name: "brain_ingest_raw", + Description: "Ingest pre-structured pages into the brain wiki, bypassing the LLM extraction step. " + + "Use when you (the calling agent) have already extracted entities, concepts, and content from a source. " + + "Provide source (human-readable name) and pages (array of {title, type, subtype, domain, content} objects). " + + "The pipeline computes slugs, paths, frontmatter, wikilink canonicalization, and source back-references. " + + "Returns the list of wiki pages written.", + InputSchema: schema([]string{"source", "pages"}, map[string]any{ + "source": map[string]any{"type": "string", "description": "human-readable name for the source, e.g. 'shape-up-book'"}, + "pages": map[string]any{ + "type": "array", + "items": map[string]any{ + "type": "object", + "required": []string{"title", "type", "content"}, + "properties": map[string]any{ + "title": map[string]any{"type": "string", "description": "page title, e.g. 'Hash Encoding'"}, + "type": map[string]any{"type": "string", "enum": []string{"source", "concept", "entity"}, "description": "page type"}, + "subtype": map[string]any{"type": "string", "description": "entity: person|company|tool|model|framework|technology; source: article|pdf|book|video|note|project"}, + "domain": map[string]any{"type": "string", "description": "knowledge domain, e.g. 'Machine Learning'"}, + "content": map[string]any{"type": "string", "description": "markdown body — no frontmatter, use [[Display Name]] for wikilinks"}, + }, + }, + }, + "dry_run": map[string]any{"type": "boolean"}, + }), + }) tools = append(tools, registry.ToolDef{ Name: "brain_ingest", Description: "Ingest content into the brain wiki (brain/wiki/). Calls an LLM to produce structured wiki pages. " +