From a25bb18c5463ab05a8678a8ae41c126d860cd065 Mon Sep 17 00:00:00 2001 From: Mathias Bergqvist Date: Wed, 22 Apr 2026 22:54:28 +0200 Subject: [PATCH] feat(ingestion): add /ingest and /ingest-path HTTP handlers Wires pipeline.Run into the HTTP layer so callers can ingest raw text or files/directories without touching the filesystem directly. Rewrites main.go to parse LLM and watcher env vars and build pipeline.Config. Co-Authored-By: Claude Sonnet 4.6 --- ingestion/cmd/server/main.go | 57 ++++++-- ingestion/internal/api/handler.go | 179 +++++++++++++++++++++++-- ingestion/internal/api/handler_test.go | 141 ++++++++++++++++++- 3 files changed, 356 insertions(+), 21 deletions(-) diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index 98e7e8d..abe09b3 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -5,31 +5,70 @@ import ( "log/slog" "net/http" "os" + "strconv" + "time" "github.com/mathiasbq/hyperguild/ingestion/internal/api" + "github.com/mathiasbq/hyperguild/ingestion/internal/llm" + "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" ) +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return fallback +} + func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) - brainDir := os.Getenv("INGEST_BRAIN_DIR") - if brainDir == "" { - brainDir = "../brain" + brainDir := envOr("INGEST_BRAIN_DIR", "../brain") + port := envOr("INGEST_PORT", "3300") + + llmURL := envOr("INGEST_LLM_URL", "http://iguana:4000/v1") + llmKey := os.Getenv("INGEST_LLM_KEY") + llmModel := envOr("INGEST_LLM_MODEL", "koala/qwen35-9b-fast") + llmTimeoutMins := envInt("INGEST_LLM_TIMEOUT", 15) + chunkSize := envInt("INGEST_CHUNK_SIZE", 6000) + watchInterval := envInt("INGEST_WATCH_INTERVAL", 30) + + llmClient := llm.New(llmURL, llmKey, llmModel, time.Duration(llmTimeoutMins)*time.Minute) + + pipelineCfg := pipeline.Config{ + Complete: llmClient.Complete, + ChunkSize: chunkSize, } - port := os.Getenv("INGEST_PORT") - if port == "" { - port = "3300" - } + h := api.NewHandler(brainDir, logger, pipelineCfg) - h := api.NewHandler(brainDir, logger) + // TODO(task11): start watcher when INGEST_WATCH_INTERVAL > 0 + _ = watchInterval mux := http.NewServeMux() mux.HandleFunc("/query", h.Query) mux.HandleFunc("/write", h.Write) + mux.HandleFunc("/ingest", h.Ingest) + mux.HandleFunc("/ingest-path", h.IngestPath) addr := ":" + port - logger.Info("ingestion server starting", "addr", addr, "brain_dir", brainDir) + logger.Info("ingestion server starting", + "addr", addr, + "brain_dir", brainDir, + "llm_url", llmURL, + "llm_model", llmModel, + "chunk_size", chunkSize, + "watch_interval_s", watchInterval, + ) if err := http.ListenAndServe(addr, mux); err != nil { logger.Error("server stopped", "err", err) os.Exit(1) diff --git a/ingestion/internal/api/handler.go b/ingestion/internal/api/handler.go index 98cc784..53364ff 100644 --- a/ingestion/internal/api/handler.go +++ b/ingestion/internal/api/handler.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/search" ) @@ -18,11 +19,15 @@ import ( type Handler struct { brainDir string logger *slog.Logger + pipeline pipeline.Config } // NewHandler constructs a Handler. brainDir is the absolute path to brain/. -func NewHandler(brainDir string, logger *slog.Logger) *Handler { - return &Handler{brainDir: brainDir, logger: logger} +func NewHandler(brainDir string, logger *slog.Logger, pipelineCfg pipeline.Config) *Handler { + if logger == nil { + logger = slog.Default() + } + return &Handler{brainDir: brainDir, logger: logger, pipeline: pipelineCfg} } type queryRequest struct { @@ -37,15 +42,32 @@ type writeRequest struct { Domain string `json:"domain,omitempty"` } +type ingestRequest struct { + Content string `json:"content"` + Source string `json:"source"` + DryRun bool `json:"dry_run"` +} + +type ingestPathRequest struct { + Path string `json:"path"` + Source string `json:"source"` + DryRun bool `json:"dry_run"` +} + +type ingestResponse struct { + Pages []string `json:"pages"` + Warnings []string `json:"warnings"` +} + // Query handles POST /query — full-text search across the brain wiki. func (h *Handler) Query(w http.ResponseWriter, r *http.Request) { var req queryRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid JSON", http.StatusBadRequest) + writeError(w, http.StatusBadRequest, "invalid JSON") return } if strings.TrimSpace(req.Query) == "" { - http.Error(w, "query is required", http.StatusBadRequest) + writeError(w, http.StatusBadRequest, "query is required") return } if req.Limit == 0 { @@ -55,22 +77,22 @@ func (h *Handler) Query(w http.ResponseWriter, r *http.Request) { results, err := search.Query(h.brainDir, req.Query, req.Limit) if err != nil { h.logger.Error("query failed", "err", err) - http.Error(w, "search error", http.StatusInternalServerError) + writeError(w, http.StatusInternalServerError, "search error") return } writeJSON(w, map[string]any{"results": results}) } -// Write handles POST /write — write raw content to brain/raw/. +// Write handles POST /write — write raw content to brain/knowledge/. func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { var req writeRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "invalid JSON", http.StatusBadRequest) + writeError(w, http.StatusBadRequest, "invalid JSON") return } if req.Content == "" { - http.Error(w, "content is required", http.StatusBadRequest) + writeError(w, http.StatusBadRequest, "content is required") return } @@ -81,7 +103,7 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { rawDir := filepath.Join(h.brainDir, "knowledge") if err := os.MkdirAll(rawDir, 0o755); err != nil { - http.Error(w, "failed to create raw dir", http.StatusInternalServerError) + writeError(w, http.StatusInternalServerError, "failed to create raw dir") return } @@ -106,7 +128,7 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { dest := filepath.Join(rawDir, base) if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil { h.logger.Error("write failed", "err", err) - http.Error(w, "write error", http.StatusInternalServerError) + writeError(w, http.StatusInternalServerError, "write error") return } @@ -114,7 +136,144 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { writeJSON(w, map[string]string{"path": filepath.ToSlash(rel)}) } +// Ingest handles POST /ingest — run the pipeline on provided content. +func (h *Handler) Ingest(w http.ResponseWriter, r *http.Request) { + var req ingestRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid JSON") + return + } + if strings.TrimSpace(req.Content) == "" { + writeError(w, http.StatusBadRequest, "content is required") + return + } + if strings.TrimSpace(req.Source) == "" { + writeError(w, http.StatusBadRequest, "source is required") + return + } + + result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, req.Content, req.Source, req.DryRun) + if err != nil { + h.logger.Error("ingest failed", "source", req.Source, "err", err) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("ingest error: %v", err)) + return + } + + pages := result.Pages + if pages == nil { + pages = []string{} + } + warnings := result.Warnings + if warnings == nil { + warnings = []string{} + } + writeJSON(w, ingestResponse{Pages: pages, Warnings: warnings}) +} + +// supportedExtensions lists file extensions that IngestPath will process. +var supportedExtensions = map[string]bool{ + ".md": true, + ".txt": true, + ".pdf": true, +} + +// IngestPath handles POST /ingest-path — ingest a file or directory. +func (h *Handler) IngestPath(w http.ResponseWriter, r *http.Request) { + var req ingestPathRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid JSON") + return + } + if strings.TrimSpace(req.Path) == "" { + writeError(w, http.StatusBadRequest, "path is required") + return + } + + info, err := os.Stat(req.Path) + if err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("path not accessible: %v", err)) + return + } + + var allPages []string + var allWarnings []string + + if info.IsDir() { + err = filepath.WalkDir(req.Path, func(path string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + ext := strings.ToLower(filepath.Ext(path)) + if !supportedExtensions[ext] { + return nil + } + content, readErr := os.ReadFile(path) + if readErr != nil { + allWarnings = append(allWarnings, fmt.Sprintf("read %s: %v", path, readErr)) + return nil + } + source := req.Source + if source == "" { + source = filepath.Base(path) + } + result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun) + if runErr != nil { + allWarnings = append(allWarnings, fmt.Sprintf("ingest %s: %v", path, runErr)) + return nil + } + allPages = append(allPages, result.Pages...) + allWarnings = append(allWarnings, result.Warnings...) + return nil + }) + if err != nil { + h.logger.Error("walk dir failed", "path", req.Path, "err", err) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("walk error: %v", err)) + return + } + } else { + ext := strings.ToLower(filepath.Ext(req.Path)) + if !supportedExtensions[ext] { + writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported file extension: %s", ext)) + return + } + content, readErr := os.ReadFile(req.Path) + if readErr != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("read file: %v", readErr)) + return + } + source := req.Source + if source == "" { + source = filepath.Base(req.Path) + } + result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun) + if runErr != nil { + h.logger.Error("ingest-path failed", "path", req.Path, "err", runErr) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("ingest error: %v", runErr)) + return + } + allPages = result.Pages + allWarnings = result.Warnings + } + + if allPages == nil { + allPages = []string{} + } + if allWarnings == nil { + allWarnings = []string{} + } + writeJSON(w, ingestResponse{Pages: allPages, Warnings: allWarnings}) +} + func writeJSON(w http.ResponseWriter, v any) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(v) //nolint:errcheck } + +func writeError(w http.ResponseWriter, code int, msg string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + json.NewEncoder(w).Encode(map[string]string{"error": msg}) //nolint:errcheck +} diff --git a/ingestion/internal/api/handler_test.go b/ingestion/internal/api/handler_test.go index bc9b59c..5080168 100644 --- a/ingestion/internal/api/handler_test.go +++ b/ingestion/internal/api/handler_test.go @@ -3,6 +3,7 @@ package api_test import ( "bytes" + "context" "encoding/json" "log/slog" "net/http" @@ -12,11 +13,26 @@ import ( "strings" "testing" - "github.com/mathiasbq/hyperguild/ingestion/internal/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/mathiasbq/hyperguild/ingestion/internal/api" + "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" ) +// stubComplete returns a fixed JSON page so tests never call a real LLM. +func stubComplete(_ context.Context, _, _ string) (string, error) { + return `[{"path":"wiki/sources/test-source.md","content":"# Test Source\n\nSome content here.\n"}]`, nil +} + +func stubPipelineCfg() pipeline.Config { + return pipeline.Config{ + Complete: stubComplete, + ChunkSize: 0, + Schema: "# Test Schema\nwiki/sources/, wiki/concepts/, wiki/entities/", + } +} + func setup(t *testing.T) (string, *api.Handler) { t.Helper() dir := t.TempDir() @@ -27,9 +43,13 @@ func setup(t *testing.T) (string, *api.Handler) { 0o644, )) logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - return dir, api.NewHandler(dir, logger) + return dir, api.NewHandler(dir, logger, stubPipelineCfg()) } +// --------------------------------------------------------------------------- +// Existing tests (Write / Query) +// --------------------------------------------------------------------------- + func TestQuery_ReturnsResults(t *testing.T) { _, h := setup(t) body, _ := json.Marshal(map[string]any{"query": "test driven", "limit": 5}) @@ -112,3 +132,120 @@ func TestWrite_GeneratesFilenameIfAbsent(t *testing.T) { assert.Len(t, entries, 2) assert.True(t, strings.HasSuffix(entries[1].Name(), ".md")) } + +// --------------------------------------------------------------------------- +// POST /ingest +// --------------------------------------------------------------------------- + +func TestIngest_MissingContent(t *testing.T) { + _, h := setup(t) + body, _ := json.Marshal(map[string]any{"source": "test-source"}) + req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.Ingest(rec, req) + + assert.Equal(t, http.StatusBadRequest, rec.Code) +} + +func TestIngest_MissingSource(t *testing.T) { + _, h := setup(t) + body, _ := json.Marshal(map[string]any{"content": "some content"}) + req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.Ingest(rec, req) + + assert.Equal(t, http.StatusBadRequest, rec.Code) +} + +func TestIngest_Success(t *testing.T) { + _, h := setup(t) + body, _ := json.Marshal(map[string]any{ + "content": "some content about shape-up methodology", + "source": "shape-up-book", + "dry_run": true, + }) + req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.Ingest(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + pages, ok := resp["pages"] + require.True(t, ok, "response must have pages field") + pagesSlice, ok := pages.([]any) + require.True(t, ok, "pages must be an array") + assert.NotEmpty(t, pagesSlice) +} + +// --------------------------------------------------------------------------- +// POST /ingest-path +// --------------------------------------------------------------------------- + +func TestIngestPath_MissingPath(t *testing.T) { + _, h := setup(t) + body, _ := json.Marshal(map[string]any{"source": "test-source"}) + req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestPath(rec, req) + + assert.Equal(t, http.StatusBadRequest, rec.Code) +} + +func TestIngestPath_File(t *testing.T) { + _, h := setup(t) + + // Create a temp file with content + dir := t.TempDir() + f := filepath.Join(dir, "doc.md") + require.NoError(t, os.WriteFile(f, []byte("# Hello\nThis is markdown content."), 0o644)) + + body, _ := json.Marshal(map[string]any{ + "path": f, + "source": "test-doc", + "dry_run": true, + }) + req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestPath(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + pages, ok := resp["pages"] + require.True(t, ok, "response must have pages field") + pagesSlice, ok := pages.([]any) + require.True(t, ok, "pages must be an array") + assert.NotEmpty(t, pagesSlice) +} + +func TestIngestPath_Directory(t *testing.T) { + _, h := setup(t) + + // Create a temp dir with one .md file + dir := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Join(dir, "notes.md"), []byte("# Notes\nSome notes."), 0o644)) + + body, _ := json.Marshal(map[string]any{ + "path": dir, + "dry_run": true, + }) + req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + h.IngestPath(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + pages, ok := resp["pages"] + require.True(t, ok, "response must have pages field") + pagesSlice, ok := pages.([]any) + require.True(t, ok, "pages must be an array") + assert.NotEmpty(t, pagesSlice) +}