// ingestion/cmd/server/main.go package main import ( "context" "fmt" "log/slog" "net/http" "net/url" "os" "strconv" "strings" "time" chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth" "github.com/mathiasbq/hyperguild/ingestion/internal/api" "github.com/mathiasbq/hyperguild/ingestion/internal/llm" "github.com/mathiasbq/hyperguild/ingestion/internal/mcp" "github.com/mathiasbq/hyperguild/ingestion/internal/embed" "github.com/mathiasbq/hyperguild/ingestion/internal/metrics" "github.com/mathiasbq/hyperguild/ingestion/internal/oauth" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" "github.com/mathiasbq/hyperguild/ingestion/internal/search" "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" "github.com/mathiasbq/hyperguild/ingestion/internal/watcher" ) // redactDSN parses a Postgres URL and replaces its password with `***` // for safe inclusion in logs. Falls back to a non-leaking placeholder // if parsing fails — we never log a raw DSN. func redactDSN(dsn string) string { u, err := url.Parse(dsn) if err != nil || u.User == nil { return "postgres://***" } return u.Redacted() } // vectorAdapter bridges *vectorstore.PGStore (returns []vectorstore.Hit) // to the search.VectorSearcher interface (which uses []search.VectorHit). // Kept here, not in either package, so neither has to import the other. type vectorAdapter struct{ s *vectorstore.PGStore } func (a vectorAdapter) Search(ctx context.Context, q []float32, limit int) ([]search.VectorHit, error) { hits, err := a.s.Search(ctx, q, limit) if err != nil { return nil, err } out := make([]search.VectorHit, len(hits)) for i, h := range hits { out[i] = search.VectorHit{Path: h.Path, Distance: h.Distance} } return out, nil } func envOr(key, fallback string) string { if v := os.Getenv(key); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if v := os.Getenv(key); v != "" { if n, err := strconv.Atoi(v); err == nil { return n } } return fallback } func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) brainDir := envOr("INGEST_BRAIN_DIR", "../brain") port := envOr("INGEST_PORT", "3300") llmURL := envOr("INGEST_LLM_URL", "http://iguana:4000/v1") llmKey := os.Getenv("INGEST_LLM_KEY") llmModel := envOr("INGEST_LLM_MODEL", "koala/qwen35-9b-fast") llmTimeoutMins := envInt("INGEST_LLM_TIMEOUT", 15) chunkSize := envInt("INGEST_CHUNK_SIZE", 6000) watchInterval := envInt("INGEST_WATCH_INTERVAL", 30) llmClient := llm.New(llmURL, llmKey, llmModel, time.Duration(llmTimeoutMins)*time.Minute) pipelineCfg := pipeline.Config{ Complete: llmClient.Complete, ChunkSize: chunkSize, } h := api.NewHandler(brainDir, logger, pipelineCfg) var answerComplete pipeline.CompleteFunc if primaryURL := os.Getenv("BRAIN_LLM_PRIMARY_URL"); primaryURL != "" { primaryModel := envOr("BRAIN_LLM_PRIMARY_MODEL", "gemma4:31b") primaryKey := os.Getenv("BERGET_API_KEY") timeoutMS := envInt("BRAIN_LLM_TIMEOUT_MS", 10000) timeout := time.Duration(timeoutMS) * time.Millisecond primary := llm.New(primaryURL, primaryKey, primaryModel, timeout) router := &llm.Router{Primary: primary} if fallbackURL := os.Getenv("BRAIN_LLM_FALLBACK_URL"); fallbackURL != "" { fallbackModel := envOr("BRAIN_LLM_FALLBACK_MODEL", "gemma4:31b") router.Fallback = llm.New(fallbackURL, "", fallbackModel, timeout) } answerComplete = router.Complete logger.Info("brain answer LLM configured", "primary", primaryURL, "model", primaryModel) } mcpSrv := mcp.NewServer(brainDir, &pipelineCfg, llmClient.Complete, answerComplete) if rerankURL := os.Getenv("BRAIN_RERANKER_URL"); rerankURL != "" { rerankModel := envOr("BRAIN_RERANKER_MODEL", "dengcao/Qwen3-Reranker-0.6B:F16") mcpSrv = mcpSrv.WithReranker(reranker.New(rerankURL, rerankModel)) logger.Info("brain reranker configured", "url", rerankURL, "model", rerankModel) } // Hybrid retrieval (pgvector + nomic-embed-text). Both env vars must // be set together for the path to wire on; otherwise BM25-only. var vectorStore *vectorstore.PGStore pgDSN := os.Getenv("BRAIN_PG_DSN") embedURL := os.Getenv("BRAIN_EMBED_URL") switch { case pgDSN != "" && embedURL != "": embedModel := envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest") store, err := vectorstore.New(context.Background(), pgDSN) if err != nil { logger.Error("vector store init", "err", err) os.Exit(1) } if err := store.Init(context.Background()); err != nil { logger.Error("vector store migrate", "err", err) os.Exit(1) } vectorStore = store embedder := embed.New(embedURL, embedModel) mcpSrv = mcpSrv.WithHybridRetrieval(vectorAdapter{s: store}, embedder) h.WithEmbedSync(store, embedder) logger.Info("brain hybrid retrieval enabled", "pg", redactDSN(pgDSN), "embed_url", embedURL, "embed_model", embedModel) case pgDSN == "" && embedURL == "": // disabled — fine default: logger.Error("BRAIN_PG_DSN and BRAIN_EMBED_URL must be set together") os.Exit(1) } mcpToken := os.Getenv("BRAIN_MCP_TOKEN") if mcpToken == "" { logger.Error("BRAIN_MCP_TOKEN not set") os.Exit(1) } ctx := context.Background() if watchInterval > 0 { watcher.Start(ctx, watcher.Config{ BrainDir: brainDir, Interval: time.Duration(watchInterval) * time.Second, Pipeline: pipelineCfg, }) } if vectorStore != nil { embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300) vectorstore.StartSync(ctx, brainDir, vectorStore, embed.New(os.Getenv("BRAIN_EMBED_URL"), envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest")), time.Duration(embedSyncInterval)*time.Second) logger.Info("embed sync started", "interval_s", embedSyncInterval) } mux := http.NewServeMux() mux.HandleFunc("POST /query", h.Query) mux.HandleFunc("POST /write", h.Write) mux.HandleFunc("POST /index", h.Index) mux.HandleFunc("POST /ingest", h.Ingest) mux.HandleFunc("POST /ingest-path", h.IngestPath) mux.HandleFunc("POST /ingest-raw", h.IngestRaw) mux.HandleFunc("POST /backfill-refs", h.BackfillRefs) mux.HandleFunc("POST /backfill-embeddings", h.BackfillEmbeddings) mux.HandleFunc("GET /pass-rate", h.PassRate) jwtValidator, err := chassisauth.NewJWTValidator(ctx, os.Getenv("DEX_ISSUER_URL"), os.Getenv("MCP_AUDIENCE")) if err != nil { logger.Error("build jwt validator", "err", err) os.Exit(1) } if jwtValidator != nil { logger.Info("jwt auth enabled", "issuer", os.Getenv("DEX_ISSUER_URL")) } // Resource-metadata URL is only emitted on 401 when Dex OAuth is // configured. Static-Bearer-only deployments leave this empty so // clients never see an OAuth challenge. var resourceMetadataURL string if dexURL := os.Getenv("DEX_ISSUER_URL"); dexURL != "" { resourceURL := os.Getenv("MCP_RESOURCE_URL") mux.HandleFunc("GET /.well-known/oauth-protected-resource", chassisauth.ProtectedResourceHandler(resourceURL, dexURL)) if resourceURL != "" { resourceMetadataURL = strings.TrimRight(resourceURL, "/") + "/.well-known/oauth-protected-resource" } } mux.Handle("/mcp", chassisauth.BearerMiddleware(mcpToken, jwtValidator, "brain", resourceMetadataURL, mcpSrv)) // Opt-in OAuth 2.0 client_credentials flow for claude.ai's custom-MCP // integration UI, which has no static-Bearer field. Setting both // OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET enables the token exchange; // setting only one is misconfiguration → fail fast. oauthID := os.Getenv("OAUTH_CLIENT_ID") oauthSecret := os.Getenv("OAUTH_CLIENT_SECRET") switch { case oauthID != "" && oauthSecret != "": issuer := os.Getenv("MCP_RESOURCE_URL") if issuer == "" { logger.Error("OAUTH_CLIENT_ID/SECRET set but MCP_RESOURCE_URL is empty; cannot derive issuer") os.Exit(1) } mux.HandleFunc("GET /.well-known/oauth-authorization-server", oauth.MetadataHandler(issuer)) mux.HandleFunc("POST /oauth/token", oauth.TokenHandler(oauth.TokenConfig{ ClientID: oauthID, ClientSecret: oauthSecret, AccessToken: mcpToken, })) logger.Info("oauth client_credentials enabled", "issuer", strings.TrimRight(issuer, "/")) case oauthID == "" && oauthSecret == "": // disabled — that's fine default: logger.Error("OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET must be set together") os.Exit(1) } // /metrics — unauthenticated Prometheus endpoint. kube-prometheus-stack // scrapes it via the ServiceMonitor in k3s/apps/supervisor/. The metrics // middleware below wraps every other registered handler so it observes // real request latency. /metrics itself is excluded from its own // observation by registering it on the outer mux (post-wrap). reg := metrics.New() mux.HandleFunc("GET /metrics", reg.Handler()) logger.Info("metrics endpoint registered", "path", "/metrics") addr := ":" + port watchIntervalLog := "disabled" if watchInterval > 0 { watchIntervalLog = fmt.Sprintf("%ds", watchInterval) } logger.Info("ingestion server starting", "addr", addr, "brain_dir", brainDir, "llm_url", llmURL, "llm_model", llmModel, "chunk_size", chunkSize, "watch_interval", watchIntervalLog, "mcp_enabled", true, ) if err := http.ListenAndServe(addr, reg.Middleware(mux)); err != nil { logger.Error("server stopped", "err", err) os.Exit(1) } }