// ingestion/cmd/server/main.go package main import ( "context" "fmt" "log/slog" "net/http" "net/url" "os" "strconv" "strings" "time" chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth" "github.com/mathiasbq/hyperguild/ingestion/internal/api" "github.com/mathiasbq/hyperguild/ingestion/internal/claudewatcher" "github.com/mathiasbq/hyperguild/ingestion/internal/embed" "github.com/mathiasbq/hyperguild/ingestion/internal/graphstore" "github.com/mathiasbq/hyperguild/ingestion/internal/graphsync" "github.com/mathiasbq/hyperguild/ingestion/internal/llm" "github.com/mathiasbq/hyperguild/ingestion/internal/mcp" "github.com/mathiasbq/hyperguild/ingestion/internal/metrics" "github.com/mathiasbq/hyperguild/ingestion/internal/oauth" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" "github.com/mathiasbq/hyperguild/ingestion/internal/search" "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" "github.com/mathiasbq/hyperguild/ingestion/internal/watcher" ) // claudeSink converts each claudewatcher.Batch into one wiki note under // brain/wiki/claude-sessions/facts/. v1 emits one note per session // keyed by host + session id; classifier-driven hall routing is a // follow-up (hyperguild#27 v2). type claudeSink struct { brainDir string logger *slog.Logger } func (s *claudeSink) Ingest(ctx context.Context, b claudewatcher.Batch) error { if len(b.Turns) == 0 { return nil } var sb strings.Builder fmt.Fprintf(&sb, "# Claude session %s (%s)\n\n", b.SessionID, b.Host) fmt.Fprintf(&sb, "_Project: `%s`. File: `%s`. Turns: %d._\n\n", b.ProjectID, b.FilePath, len(b.Turns)) for _, t := range b.Turns { fmt.Fprintf(&sb, "## %s — %s\n\n", t.Type, t.Timestamp.UTC().Format(time.RFC3339)) if t.ToolName != "" { fmt.Fprintf(&sb, "_tool: `%s`_\n\n", t.ToolName) } // Cap per-turn excerpt to keep page size bounded; the full // transcript lives on disk under ~/.claude/projects/ already. content := t.Content if len(content) > 2000 { content = content[:2000] + "…" } sb.WriteString(content) sb.WriteString("\n\n") } slug := "session-" + b.Host + "-" + b.SessionID if _, err := api.WriteNote(s.brainDir, api.WriteNoteOptions{ Filename: slug, Wing: "claude-sessions", Hall: "facts", Type: "source", Domain: b.ProjectID, Content: sb.String(), }); err != nil { return fmt.Errorf("write claude session note: %w", err) } return nil } // redactDSN parses a Postgres URL and replaces its password with `***` // for safe inclusion in logs. Falls back to a non-leaking placeholder // if parsing fails — we never log a raw DSN. func redactDSN(dsn string) string { u, err := url.Parse(dsn) if err != nil || u.User == nil { return "postgres://***" } return u.Redacted() } // vectorAdapter bridges *vectorstore.PGStore (returns []vectorstore.Hit) // to the search.VectorSearcher interface (which uses []search.VectorHit). // Kept here, not in either package, so neither has to import the other. type vectorAdapter struct{ s *vectorstore.PGStore } func (a vectorAdapter) Search(ctx context.Context, q []float32, limit int) ([]search.VectorHit, error) { hits, err := a.s.Search(ctx, q, limit) if err != nil { return nil, err } out := make([]search.VectorHit, len(hits)) for i, h := range hits { out[i] = search.VectorHit{Path: h.Path, Distance: h.Distance} } return out, nil } func envOr(key, fallback string) string { if v := os.Getenv(key); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if v := os.Getenv(key); v != "" { if n, err := strconv.Atoi(v); err == nil { return n } } return fallback } // systemHostname returns os.Hostname() with a "unknown" fallback so the // caller never has to handle the rare error path. func systemHostname() string { h, err := os.Hostname() if err != nil || h == "" { return "unknown" } return h } func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) brainDir := envOr("INGEST_BRAIN_DIR", "../brain") port := envOr("INGEST_PORT", "3300") llmURL := envOr("INGEST_LLM_URL", "http://iguana:4000/v1") llmKey := os.Getenv("INGEST_LLM_KEY") llmModel := envOr("INGEST_LLM_MODEL", "koala/qwen35-9b-fast") llmTimeoutMins := envInt("INGEST_LLM_TIMEOUT", 15) chunkSize := envInt("INGEST_CHUNK_SIZE", 6000) watchInterval := envInt("INGEST_WATCH_INTERVAL", 30) llmClient := llm.New(llmURL, llmKey, llmModel, time.Duration(llmTimeoutMins)*time.Minute) pipelineCfg := pipeline.Config{ Complete: llmClient.Complete, ChunkSize: chunkSize, } h := api.NewHandler(brainDir, logger, pipelineCfg) var answerComplete pipeline.CompleteFunc if primaryURL := os.Getenv("BRAIN_LLM_PRIMARY_URL"); primaryURL != "" { primaryModel := envOr("BRAIN_LLM_PRIMARY_MODEL", "gemma4:31b") primaryKey := os.Getenv("BERGET_API_KEY") timeoutMS := envInt("BRAIN_LLM_TIMEOUT_MS", 10000) timeout := time.Duration(timeoutMS) * time.Millisecond primary := llm.New(primaryURL, primaryKey, primaryModel, timeout) router := &llm.Router{Primary: primary} if fallbackURL := os.Getenv("BRAIN_LLM_FALLBACK_URL"); fallbackURL != "" { fallbackModel := envOr("BRAIN_LLM_FALLBACK_MODEL", "gemma4:31b") router.Fallback = llm.New(fallbackURL, "", fallbackModel, timeout) } answerComplete = router.Complete logger.Info("brain answer LLM configured", "primary", primaryURL, "model", primaryModel) } mcpSrv := mcp.NewServer(brainDir, &pipelineCfg, llmClient.Complete, answerComplete) if rerankURL := os.Getenv("BRAIN_RERANKER_URL"); rerankURL != "" { rerankModel := envOr("BRAIN_RERANKER_MODEL", "dengcao/Qwen3-Reranker-0.6B:F16") mcpSrv = mcpSrv.WithReranker(reranker.New(rerankURL, rerankModel)) logger.Info("brain reranker configured", "url", rerankURL, "model", rerankModel) } // Hybrid retrieval (pgvector + nomic-embed-text). Both env vars must // be set together for the path to wire on; otherwise BM25-only. var vectorStore *vectorstore.PGStore pgDSN := os.Getenv("BRAIN_PG_DSN") embedURL := os.Getenv("BRAIN_EMBED_URL") switch { case pgDSN != "" && embedURL != "": embedModel := envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest") store, err := vectorstore.New(context.Background(), pgDSN) if err != nil { logger.Error("vector store init", "err", err) os.Exit(1) } if err := store.Init(context.Background()); err != nil { logger.Error("vector store migrate", "err", err) os.Exit(1) } vectorStore = store embedder := embed.New(embedURL, embedModel) mcpSrv = mcpSrv.WithHybridRetrieval(vectorAdapter{s: store}, embedder) h.WithEmbedSync(store, embedder) logger.Info("brain hybrid retrieval enabled", "pg", redactDSN(pgDSN), "embed_url", embedURL, "embed_model", embedModel) // Graph store shares the same postgres18 DSN as the vector // store and is opt-in via BRAIN_GRAPH_ENABLED=true. Defaults // to off so first rollout doesn't surprise — flip on after // the migration completes and the backfill finishes. if envOr("BRAIN_GRAPH_ENABLED", "false") == "true" { gstore, gerr := graphstore.New(context.Background(), pgDSN) if gerr != nil { logger.Error("graph store init", "err", gerr) os.Exit(1) } if gerr := gstore.Init(context.Background()); gerr != nil { logger.Error("graph store migrate", "err", gerr) os.Exit(1) } mcpSrv = mcpSrv.WithGraph(gstore) if envOr("BRAIN_GRAPH_BACKFILL", "false") == "true" { n, berr := graphsync.BackfillFromBrainDir(context.Background(), gstore, brainDir) if berr != nil { logger.Warn("graph backfill incomplete", "indexed", n, "err", berr) } else { logger.Info("graph backfill complete", "indexed", n) } } logger.Info("brain graph enabled", "pg", redactDSN(pgDSN)) } case pgDSN == "" && embedURL == "": // disabled — fine default: logger.Error("BRAIN_PG_DSN and BRAIN_EMBED_URL must be set together") os.Exit(1) } mcpToken := os.Getenv("BRAIN_MCP_TOKEN") if mcpToken == "" { logger.Error("BRAIN_MCP_TOKEN not set") os.Exit(1) } ctx := context.Background() if watchInterval > 0 { watcher.Start(ctx, watcher.Config{ BrainDir: brainDir, Interval: time.Duration(watchInterval) * time.Second, Pipeline: pipelineCfg, }) } // Claude Code session ingestion (hyperguild#27 / infra#73 Track E.1). // Off by default — explicitly opt in by setting CLAUDE_SESSIONS_DIR // to the ~/.claude/projects path. Requires BRAIN_PG_DSN for the // cursor table (resumable offsets across restarts). if claudeDir := os.Getenv("CLAUDE_SESSIONS_DIR"); claudeDir != "" { if pgDSN == "" { logger.Error("CLAUDE_SESSIONS_DIR set but BRAIN_PG_DSN missing — claudewatcher needs the cursor table") os.Exit(1) } cursorStore, cerr := claudewatcher.NewCursorStore(ctx, pgDSN) if cerr != nil { logger.Error("claudewatcher cursor init", "err", cerr) os.Exit(1) } if cerr := cursorStore.Init(ctx); cerr != nil { logger.Error("claudewatcher cursor migrate", "err", cerr) os.Exit(1) } host := envOr("CLAUDE_INGEST_HOST", systemHostname()) interval := time.Duration(envInt("CLAUDE_INGEST_INTERVAL", 60)) * time.Second sink := &claudeSink{brainDir: brainDir, logger: logger} go func() { if err := claudewatcher.Watch(ctx, claudewatcher.Config{ SessionsDir: claudeDir, Host: host, Interval: interval, Sink: sink, Cursors: cursorStore, Logger: logger, }); err != nil && err != context.Canceled { logger.Error("claudewatcher exited", "err", err) } }() logger.Info("claudewatcher started", "sessions_dir", claudeDir, "host", host, "interval", interval) } if vectorStore != nil { embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300) vectorstore.StartSync(ctx, brainDir, vectorStore, embed.New(os.Getenv("BRAIN_EMBED_URL"), envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest")), time.Duration(embedSyncInterval)*time.Second) logger.Info("embed sync started", "interval_s", embedSyncInterval) } mux := http.NewServeMux() mux.HandleFunc("POST /query", h.Query) mux.HandleFunc("POST /write", h.Write) mux.HandleFunc("POST /index", h.Index) mux.HandleFunc("POST /ingest", h.Ingest) mux.HandleFunc("POST /ingest-path", h.IngestPath) mux.HandleFunc("POST /ingest-raw", h.IngestRaw) mux.HandleFunc("POST /backfill-refs", h.BackfillRefs) mux.HandleFunc("POST /backfill-embeddings", h.BackfillEmbeddings) mux.HandleFunc("GET /pass-rate", h.PassRate) jwtValidator, err := chassisauth.NewJWTValidator(ctx, os.Getenv("DEX_ISSUER_URL"), os.Getenv("MCP_AUDIENCE")) if err != nil { logger.Error("build jwt validator", "err", err) os.Exit(1) } if jwtValidator != nil { logger.Info("jwt auth enabled", "issuer", os.Getenv("DEX_ISSUER_URL")) } // Resource-metadata URL is only emitted on 401 when Dex OAuth is // configured. Static-Bearer-only deployments leave this empty so // clients never see an OAuth challenge. var resourceMetadataURL string if dexURL := os.Getenv("DEX_ISSUER_URL"); dexURL != "" { resourceURL := os.Getenv("MCP_RESOURCE_URL") mux.HandleFunc("GET /.well-known/oauth-protected-resource", chassisauth.ProtectedResourceHandler(resourceURL, dexURL)) if resourceURL != "" { resourceMetadataURL = strings.TrimRight(resourceURL, "/") + "/.well-known/oauth-protected-resource" } } mux.Handle("/mcp", chassisauth.BearerMiddleware(mcpToken, jwtValidator, "brain", resourceMetadataURL, mcpSrv)) // Opt-in OAuth 2.0 client_credentials flow for claude.ai's custom-MCP // integration UI, which has no static-Bearer field. Setting both // OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET enables the token exchange; // setting only one is misconfiguration → fail fast. oauthID := os.Getenv("OAUTH_CLIENT_ID") oauthSecret := os.Getenv("OAUTH_CLIENT_SECRET") switch { case oauthID != "" && oauthSecret != "": issuer := os.Getenv("MCP_RESOURCE_URL") if issuer == "" { logger.Error("OAUTH_CLIENT_ID/SECRET set but MCP_RESOURCE_URL is empty; cannot derive issuer") os.Exit(1) } mux.HandleFunc("GET /.well-known/oauth-authorization-server", oauth.MetadataHandler(issuer)) mux.HandleFunc("POST /oauth/token", oauth.TokenHandler(oauth.TokenConfig{ ClientID: oauthID, ClientSecret: oauthSecret, AccessToken: mcpToken, })) logger.Info("oauth client_credentials enabled", "issuer", strings.TrimRight(issuer, "/")) case oauthID == "" && oauthSecret == "": // disabled — that's fine default: logger.Error("OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET must be set together") os.Exit(1) } // /metrics — unauthenticated Prometheus endpoint. kube-prometheus-stack // scrapes it via the ServiceMonitor in k3s/apps/supervisor/. The metrics // middleware below wraps every other registered handler so it observes // real request latency. /metrics itself is excluded from its own // observation by registering it on the outer mux (post-wrap). reg := metrics.New() mux.HandleFunc("GET /metrics", reg.Handler()) logger.Info("metrics endpoint registered", "path", "/metrics") addr := ":" + port watchIntervalLog := "disabled" if watchInterval > 0 { watchIntervalLog = fmt.Sprintf("%ds", watchInterval) } logger.Info("ingestion server starting", "addr", addr, "brain_dir", brainDir, "llm_url", llmURL, "llm_model", llmModel, "chunk_size", chunkSize, "watch_interval", watchIntervalLog, "mcp_enabled", true, ) if err := http.ListenAndServe(addr, reg.Middleware(mux)); err != nil { logger.Error("server stopped", "err", err) os.Exit(1) } }