feat(ingestion): expose Prometheus /metrics for brain query latency
All checks were successful
CI / Lint / Test / Vet (push) Successful in 11s
CI / Mirror to GitHub (push) Successful in 3s

Closes infra#50.

Adds an internal/metrics package with a hand-rolled Prometheus
exposition layer (stdlib + sync/atomic only — no new dep) and wraps the
HTTP mux with a timing middleware. Every request emits one observation
on the `brain_query_duration_seconds` histogram labeled by
`path` (request Pattern, low cardinality) and `status` (2xx/3xx/4xx/5xx).

Dependency choice: hand-rolled rather than github.com/prometheus/client_golang
because the surface needed is small (one histogram + bucket constants)
and the repo CLAUDE.md keeps deps stdlib + jwx + testify only. ~150 LOC
of code + tests is cheaper than the chart of transitive prometheus deps.

Endpoints:
- GET /metrics  — OpenMetrics text exposition, no auth (cluster-internal)

Wire format pinned by tests in internal/metrics/metrics_test.go. The
ServiceMonitor that drives the kube-prometheus-stack scrape lives in
infra/k3s/apps/supervisor/ (separate commit on mathias/infra).

After this image deploys, the canary alert from
docs/superpowers/specs/2026-05-homelab-architecture-review.md becomes
wireable:

  histogram_quantile(0.95,
    sum(rate(brain_query_duration_seconds_bucket[5m])) by (le))
    > 1.5 * histogram_quantile(0.95,
        sum(rate(brain_query_duration_seconds_bucket[5m] offset 7d)) by (le))

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mathias
2026-05-22 07:13:05 +02:00
parent 815739758e
commit e49b36e463
3 changed files with 324 additions and 1 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
"github.com/mathiasbq/hyperguild/ingestion/internal/metrics"
"github.com/mathiasbq/hyperguild/ingestion/internal/oauth"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/reranker"
@@ -235,6 +236,15 @@ func main() {
os.Exit(1)
}
// /metrics — unauthenticated Prometheus endpoint. kube-prometheus-stack
// scrapes it via the ServiceMonitor in k3s/apps/supervisor/. The metrics
// middleware below wraps every other registered handler so it observes
// real request latency. /metrics itself is excluded from its own
// observation by registering it on the outer mux (post-wrap).
reg := metrics.New()
mux.HandleFunc("GET /metrics", reg.Handler())
logger.Info("metrics endpoint registered", "path", "/metrics")
addr := ":" + port
watchIntervalLog := "disabled"
if watchInterval > 0 {
@@ -249,7 +259,7 @@ func main() {
"watch_interval", watchIntervalLog,
"mcp_enabled", true,
)
if err := http.ListenAndServe(addr, mux); err != nil {
if err := http.ListenAndServe(addr, reg.Middleware(mux)); err != nil {
logger.Error("server stopped", "err", err)
os.Exit(1)
}

View File

@@ -0,0 +1,194 @@
// Package metrics is a tiny Prometheus exposition layer.
//
// Hand-rolled rather than pulling in github.com/prometheus/client_golang
// to keep ingestion's dependency surface minimal (stdlib + jwx + testify
// per the repo CLAUDE.md). The single histogram + counter it emits cover
// the canary alert wired in k3s/apps/monitoring/ — see infra#50.
//
// Wire format follows the OpenMetrics text exposition that
// kube-prometheus-stack scrapes by default.
package metrics
import (
"fmt"
"net/http"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)
// histogram buckets in seconds. Tuned for in-cluster HTTP API
// latencies: BM25 query is sub-10ms, hybrid retrieval + LLM-synthesis
// can run into seconds. +Inf catch-all is implicit.
var defaultBuckets = []float64{
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10,
}
// Registry holds one histogram (request latency) labeled by path + status
// and one counter (request total) with the same labels. Concurrent-safe.
type Registry struct {
mu sync.RWMutex
series map[labelKey]*series
buckets []float64
}
type labelKey struct{ path, status string }
type series struct {
// One atomic counter per bucket (counts of observations ≤ bucket).
// counts[len(buckets)] = +Inf bucket (== total observations).
counts []atomic.Uint64
sumNs atomic.Uint64 // sum of durations in nanoseconds
}
// New returns a Registry pre-populated with no series; the first
// observation per (path, status) lazy-creates one.
func New() *Registry {
return &Registry{
series: make(map[labelKey]*series),
buckets: defaultBuckets,
}
}
// Observe records a single request duration for the given path + status.
func (r *Registry) Observe(path, status string, d time.Duration) {
key := labelKey{path: path, status: status}
r.mu.RLock()
s := r.series[key]
r.mu.RUnlock()
if s == nil {
r.mu.Lock()
s = r.series[key]
if s == nil {
s = &series{counts: make([]atomic.Uint64, len(r.buckets)+1)}
r.series[key] = s
}
r.mu.Unlock()
}
secs := d.Seconds()
for i, b := range r.buckets {
if secs <= b {
s.counts[i].Add(1)
}
}
// +Inf bucket always increments.
s.counts[len(r.buckets)].Add(1)
s.sumNs.Add(uint64(d.Nanoseconds()))
}
// Middleware wraps next, observing every request's duration + status.
// The metric label `path` uses the request's Pattern (Go 1.22+ ServeMux),
// falling back to the URL path if no Pattern is set. Pattern keeps
// cardinality bounded (one series per route, not one per unique URL).
func (r *Registry) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
rec := &statusRecorder{ResponseWriter: w, code: http.StatusOK}
start := time.Now()
next.ServeHTTP(rec, req)
path := req.Pattern
if path == "" {
path = req.URL.Path
}
r.Observe(path, statusClass(rec.code), time.Since(start))
})
}
// Handler exposes /metrics in OpenMetrics text format.
func (r *Registry) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
r.write(w)
}
}
func (r *Registry) write(w http.ResponseWriter) {
r.mu.RLock()
defer r.mu.RUnlock()
_, _ = fmt.Fprintln(w, "# HELP brain_query_duration_seconds Brain HTTP API request latency in seconds.")
_, _ = fmt.Fprintln(w, "# TYPE brain_query_duration_seconds histogram")
// Sort keys for stable output (helps diffing scrape responses).
keys := make([]labelKey, 0, len(r.series))
for k := range r.series {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
if keys[i].path != keys[j].path {
return keys[i].path < keys[j].path
}
return keys[i].status < keys[j].status
})
for _, k := range keys {
s := r.series[k]
labels := fmt.Sprintf(`path=%q,status=%q`, k.path, k.status)
for i, b := range r.buckets {
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=%q} %d\n",
labels, formatBucket(b), s.counts[i].Load())
}
// +Inf bucket
inf := s.counts[len(r.buckets)].Load()
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=\"+Inf\"} %d\n", labels, inf)
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_sum{%s} %s\n",
labels, formatSeconds(s.sumNs.Load()))
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_count{%s} %d\n", labels, inf)
}
}
func formatBucket(b float64) string {
// Match Prometheus convention: no trailing zeros.
s := fmt.Sprintf("%g", b)
if !strings.ContainsAny(s, ".e") {
s = s + ".0"
}
return s
}
func formatSeconds(ns uint64) string {
return fmt.Sprintf("%g", float64(ns)/1e9)
}
func statusClass(code int) string {
switch {
case code >= 200 && code < 300:
return "2xx"
case code >= 300 && code < 400:
return "3xx"
case code >= 400 && code < 500:
return "4xx"
case code >= 500 && code < 600:
return "5xx"
default:
return "xxx"
}
}
// statusRecorder captures the response code so middleware can label
// the histogram by status class without buffering the body.
type statusRecorder struct {
http.ResponseWriter
code int
wroteHeader bool
}
func (r *statusRecorder) WriteHeader(code int) {
if r.wroteHeader {
return
}
r.code = code
r.wroteHeader = true
r.ResponseWriter.WriteHeader(code)
}
func (r *statusRecorder) Write(b []byte) (int, error) {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
return r.ResponseWriter.Write(b)
}

View File

@@ -0,0 +1,119 @@
package metrics
import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
func TestRegistry_ObserveAndExpose(t *testing.T) {
t.Parallel()
r := New()
// Three observations on the same series; one falls into each
// representative band.
r.Observe("/query", "2xx", 4*time.Millisecond) // ≤ 5ms
r.Observe("/query", "2xx", 20*time.Millisecond) // ≤ 25ms
r.Observe("/query", "2xx", 600*time.Millisecond) // ≤ 1s
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
rec := httptest.NewRecorder()
r.Handler().ServeHTTP(rec, req)
body := rec.Body.String()
mustContain := []string{
`# TYPE brain_query_duration_seconds histogram`,
`brain_query_duration_seconds_bucket{path="/query",status="2xx",le="0.005"} 1`,
`brain_query_duration_seconds_bucket{path="/query",status="2xx",le="0.025"} 2`,
`brain_query_duration_seconds_bucket{path="/query",status="2xx",le="1.0"} 3`,
`brain_query_duration_seconds_bucket{path="/query",status="2xx",le="+Inf"} 3`,
`brain_query_duration_seconds_count{path="/query",status="2xx"} 3`,
}
for _, want := range mustContain {
if !strings.Contains(body, want) {
t.Errorf("missing line: %q\n--- body ---\n%s", want, body)
}
}
if got := rec.Header().Get("Content-Type"); !strings.HasPrefix(got, "text/plain") {
t.Errorf("content-type = %q, want text/plain prefix", got)
}
}
func TestRegistry_LabelsByStatus(t *testing.T) {
t.Parallel()
r := New()
r.Observe("/query", "2xx", time.Millisecond)
r.Observe("/query", "5xx", time.Millisecond)
r.Observe("/write", "2xx", time.Millisecond)
rec := httptest.NewRecorder()
r.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
body := rec.Body.String()
for _, want := range []string{
`brain_query_duration_seconds_count{path="/query",status="2xx"} 1`,
`brain_query_duration_seconds_count{path="/query",status="5xx"} 1`,
`brain_query_duration_seconds_count{path="/write",status="2xx"} 1`,
} {
if !strings.Contains(body, want) {
t.Errorf("missing %q in body:\n%s", want, body)
}
}
}
func TestMiddleware_RecordsTiming(t *testing.T) {
t.Parallel()
r := New()
handler := r.Middleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(2 * time.Millisecond)
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "ok")
}))
srv := httptest.NewServer(handler)
defer srv.Close()
resp, err := http.Get(srv.URL + "/query")
if err != nil {
t.Fatalf("get: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("status %d, want 200", resp.StatusCode)
}
// Exposition should now include /query.
rec := httptest.NewRecorder()
r.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
body := rec.Body.String()
if !strings.Contains(body, `path="/query"`) {
t.Errorf("expected /query series, got body:\n%s", body)
}
if !strings.Contains(body, `status="2xx"`) {
t.Errorf("expected 2xx status class, got body:\n%s", body)
}
}
func TestStatusRecorder_DefaultsTo200(t *testing.T) {
t.Parallel()
r := New()
handler := r.Middleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("hello"))
}))
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/x", nil))
if rec.Code != http.StatusOK {
t.Errorf("code %d, want 200", rec.Code)
}
}