// Package metrics is a tiny Prometheus exposition layer. // // Hand-rolled rather than pulling in github.com/prometheus/client_golang // to keep ingestion's dependency surface minimal (stdlib + jwx + testify // per the repo CLAUDE.md). The single histogram + counter it emits cover // the canary alert wired in k3s/apps/monitoring/ — see infra#50. // // Wire format follows the OpenMetrics text exposition that // kube-prometheus-stack scrapes by default. package metrics import ( "fmt" "net/http" "sort" "strings" "sync" "sync/atomic" "time" ) // histogram buckets in seconds. Tuned for in-cluster HTTP API // latencies: BM25 query is sub-10ms, hybrid retrieval + LLM-synthesis // can run into seconds. +Inf catch-all is implicit. var defaultBuckets = []float64{ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, } // Registry holds one histogram (request latency) labeled by path + status // and one counter (request total) with the same labels. Concurrent-safe. type Registry struct { mu sync.RWMutex series map[labelKey]*series buckets []float64 } type labelKey struct{ path, status string } type series struct { // One atomic counter per bucket (counts of observations ≤ bucket). // counts[len(buckets)] = +Inf bucket (== total observations). counts []atomic.Uint64 sumNs atomic.Uint64 // sum of durations in nanoseconds } // New returns a Registry pre-populated with no series; the first // observation per (path, status) lazy-creates one. func New() *Registry { return &Registry{ series: make(map[labelKey]*series), buckets: defaultBuckets, } } // Observe records a single request duration for the given path + status. func (r *Registry) Observe(path, status string, d time.Duration) { key := labelKey{path: path, status: status} r.mu.RLock() s := r.series[key] r.mu.RUnlock() if s == nil { r.mu.Lock() s = r.series[key] if s == nil { s = &series{counts: make([]atomic.Uint64, len(r.buckets)+1)} r.series[key] = s } r.mu.Unlock() } secs := d.Seconds() for i, b := range r.buckets { if secs <= b { s.counts[i].Add(1) } } // +Inf bucket always increments. s.counts[len(r.buckets)].Add(1) s.sumNs.Add(uint64(d.Nanoseconds())) } // Middleware wraps next, observing every request's duration + status. // The metric label `path` uses the request's Pattern (Go 1.22+ ServeMux), // falling back to the URL path if no Pattern is set. Pattern keeps // cardinality bounded (one series per route, not one per unique URL). func (r *Registry) Middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { rec := &statusRecorder{ResponseWriter: w, code: http.StatusOK} start := time.Now() next.ServeHTTP(rec, req) path := req.Pattern if path == "" { path = req.URL.Path } r.Observe(path, statusClass(rec.code), time.Since(start)) }) } // Handler exposes /metrics in OpenMetrics text format. func (r *Registry) Handler() http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") r.write(w) } } func (r *Registry) write(w http.ResponseWriter) { r.mu.RLock() defer r.mu.RUnlock() _, _ = fmt.Fprintln(w, "# HELP brain_query_duration_seconds Brain HTTP API request latency in seconds.") _, _ = fmt.Fprintln(w, "# TYPE brain_query_duration_seconds histogram") // Sort keys for stable output (helps diffing scrape responses). keys := make([]labelKey, 0, len(r.series)) for k := range r.series { keys = append(keys, k) } sort.Slice(keys, func(i, j int) bool { if keys[i].path != keys[j].path { return keys[i].path < keys[j].path } return keys[i].status < keys[j].status }) for _, k := range keys { s := r.series[k] labels := fmt.Sprintf(`path=%q,status=%q`, k.path, k.status) for i, b := range r.buckets { _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=%q} %d\n", labels, formatBucket(b), s.counts[i].Load()) } // +Inf bucket inf := s.counts[len(r.buckets)].Load() _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=\"+Inf\"} %d\n", labels, inf) _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_sum{%s} %s\n", labels, formatSeconds(s.sumNs.Load())) _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_count{%s} %d\n", labels, inf) } } func formatBucket(b float64) string { // Match Prometheus convention: no trailing zeros. s := fmt.Sprintf("%g", b) if !strings.ContainsAny(s, ".e") { s = s + ".0" } return s } func formatSeconds(ns uint64) string { return fmt.Sprintf("%g", float64(ns)/1e9) } func statusClass(code int) string { switch { case code >= 200 && code < 300: return "2xx" case code >= 300 && code < 400: return "3xx" case code >= 400 && code < 500: return "4xx" case code >= 500 && code < 600: return "5xx" default: return "xxx" } } // statusRecorder captures the response code so middleware can label // the histogram by status class without buffering the body. type statusRecorder struct { http.ResponseWriter code int wroteHeader bool } func (r *statusRecorder) WriteHeader(code int) { if r.wroteHeader { return } r.code = code r.wroteHeader = true r.ResponseWriter.WriteHeader(code) } func (r *statusRecorder) Write(b []byte) (int, error) { if !r.wroteHeader { r.WriteHeader(http.StatusOK) } return r.ResponseWriter.Write(b) }