Closes infra#50.
Adds an internal/metrics package with a hand-rolled Prometheus
exposition layer (stdlib + sync/atomic only — no new dep) and wraps the
HTTP mux with a timing middleware. Every request emits one observation
on the `brain_query_duration_seconds` histogram labeled by
`path` (request Pattern, low cardinality) and `status` (2xx/3xx/4xx/5xx).
Dependency choice: hand-rolled rather than github.com/prometheus/client_golang
because the surface needed is small (one histogram + bucket constants)
and the repo CLAUDE.md keeps deps stdlib + jwx + testify only. ~150 LOC
of code + tests is cheaper than the chart of transitive prometheus deps.
Endpoints:
- GET /metrics — Prometheus text exposition format (version 0.0.4), no auth (cluster-internal)
Wire format pinned by tests in internal/metrics/metrics_test.go. The
ServiceMonitor that drives the kube-prometheus-stack scrape lives in
infra/k3s/apps/supervisor/ (separate commit on mathias/infra).
After this image deploys, the canary alert from
docs/superpowers/specs/2026-05-homelab-architecture-review.md becomes
wireable:
histogram_quantile(0.95,
sum(rate(brain_query_duration_seconds_bucket[5m])) by (le))
> 1.5 * histogram_quantile(0.95,
sum(rate(brain_query_duration_seconds_bucket[5m] offset 7d)) by (le))
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
195 lines
5.3 KiB
Go
195 lines
5.3 KiB
Go
// Package metrics is a tiny Prometheus exposition layer.
|
|
//
|
|
// Hand-rolled rather than pulling in github.com/prometheus/client_golang
|
|
// to keep ingestion's dependency surface minimal (stdlib + jwx + testify
|
|
// per the repo CLAUDE.md). The single histogram it emits (whose _count
// series doubles as the request counter) covers the canary alert wired in
// k3s/apps/monitoring/ — see infra#50.
|
|
//
|
|
// Wire format follows the Prometheus text exposition format (version 0.0.4)
// that kube-prometheus-stack scrapes by default.
|
|
package metrics
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// defaultBuckets lists the finite histogram upper bounds, in seconds, in
// ascending order. The range targets in-cluster HTTP API latencies: a BM25
// query finishes in under 10ms, while hybrid retrieval + LLM synthesis can
// take several seconds. The +Inf catch-all bucket is implied and therefore
// not listed here.
var defaultBuckets = []float64{
	0.005, 0.01, 0.025, 0.05,
	0.1, 0.25, 0.5,
	1, 2.5, 5, 10,
}
|
|
|
|
// Registry holds one histogram (request latency) labeled by path + status
|
|
// and one counter (request total) with the same labels. Concurrent-safe.
|
|
type Registry struct {
|
|
mu sync.RWMutex
|
|
series map[labelKey]*series
|
|
buckets []float64
|
|
}
|
|
|
|
// labelKey identifies one series by its two label values. It consists only
// of strings, so it is comparable and can be used directly as a map key.
type labelKey struct {
	path   string
	status string
}
|
|
|
|
// series holds the counters belonging to one (path, status) label pair.
type series struct {
	// sumNs is the running total of observed durations, in nanoseconds.
	sumNs atomic.Uint64

	// counts holds one cumulative counter per finite bucket (the number of
	// observations <= that bucket's upper bound) followed by one extra
	// element for +Inf: counts[len(buckets)] equals the total number of
	// observations.
	counts []atomic.Uint64
}
|
|
|
|
// New returns a Registry pre-populated with no series; the first
|
|
// observation per (path, status) lazy-creates one.
|
|
func New() *Registry {
|
|
return &Registry{
|
|
series: make(map[labelKey]*series),
|
|
buckets: defaultBuckets,
|
|
}
|
|
}
|
|
|
|
// Observe records a single request duration for the given path + status.
|
|
func (r *Registry) Observe(path, status string, d time.Duration) {
|
|
key := labelKey{path: path, status: status}
|
|
|
|
r.mu.RLock()
|
|
s := r.series[key]
|
|
r.mu.RUnlock()
|
|
|
|
if s == nil {
|
|
r.mu.Lock()
|
|
s = r.series[key]
|
|
if s == nil {
|
|
s = &series{counts: make([]atomic.Uint64, len(r.buckets)+1)}
|
|
r.series[key] = s
|
|
}
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
secs := d.Seconds()
|
|
for i, b := range r.buckets {
|
|
if secs <= b {
|
|
s.counts[i].Add(1)
|
|
}
|
|
}
|
|
// +Inf bucket always increments.
|
|
s.counts[len(r.buckets)].Add(1)
|
|
s.sumNs.Add(uint64(d.Nanoseconds()))
|
|
}
|
|
|
|
// Middleware wraps next, observing every request's duration + status.
|
|
// The metric label `path` uses the request's Pattern (Go 1.22+ ServeMux),
|
|
// falling back to the URL path if no Pattern is set. Pattern keeps
|
|
// cardinality bounded (one series per route, not one per unique URL).
|
|
func (r *Registry) Middleware(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
rec := &statusRecorder{ResponseWriter: w, code: http.StatusOK}
|
|
start := time.Now()
|
|
next.ServeHTTP(rec, req)
|
|
path := req.Pattern
|
|
if path == "" {
|
|
path = req.URL.Path
|
|
}
|
|
r.Observe(path, statusClass(rec.code), time.Since(start))
|
|
})
|
|
}
|
|
|
|
// Handler exposes /metrics in OpenMetrics text format.
|
|
func (r *Registry) Handler() http.HandlerFunc {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
|
r.write(w)
|
|
}
|
|
}
|
|
|
|
func (r *Registry) write(w http.ResponseWriter) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
_, _ = fmt.Fprintln(w, "# HELP brain_query_duration_seconds Brain HTTP API request latency in seconds.")
|
|
_, _ = fmt.Fprintln(w, "# TYPE brain_query_duration_seconds histogram")
|
|
|
|
// Sort keys for stable output (helps diffing scrape responses).
|
|
keys := make([]labelKey, 0, len(r.series))
|
|
for k := range r.series {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Slice(keys, func(i, j int) bool {
|
|
if keys[i].path != keys[j].path {
|
|
return keys[i].path < keys[j].path
|
|
}
|
|
return keys[i].status < keys[j].status
|
|
})
|
|
|
|
for _, k := range keys {
|
|
s := r.series[k]
|
|
labels := fmt.Sprintf(`path=%q,status=%q`, k.path, k.status)
|
|
for i, b := range r.buckets {
|
|
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=%q} %d\n",
|
|
labels, formatBucket(b), s.counts[i].Load())
|
|
}
|
|
// +Inf bucket
|
|
inf := s.counts[len(r.buckets)].Load()
|
|
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=\"+Inf\"} %d\n", labels, inf)
|
|
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_sum{%s} %s\n",
|
|
labels, formatSeconds(s.sumNs.Load()))
|
|
_, _ = fmt.Fprintf(w, "brain_query_duration_seconds_count{%s} %d\n", labels, inf)
|
|
}
|
|
}
|
|
|
|
// formatBucket renders a bucket upper bound for the `le` label. The value
// is printed with %g (shortest representation); a whole number additionally
// gets a ".0" suffix so that it always reads as a float, e.g. 1 -> "1.0",
// 2.5 -> "2.5", 1e6 -> "1e+06".
//
// Non-finite values are returned exactly as %g prints them ("+Inf", "-Inf",
// "NaN") rather than being turned into "+Inf.0" and the like.
func formatBucket(b float64) string {
	s := fmt.Sprintf("%g", b)
	// '.' or 'e': already has a fraction or an exponent.
	// 'I' or 'N': the value is ±Inf or NaN.
	if strings.ContainsAny(s, ".eIN") {
		return s
	}
	return s + ".0"
}
|
|
|
|
// formatSeconds converts a nanosecond total into seconds and prints it with
// %g, i.e. the shortest representation, switching to exponent notation for
// very small or very large values.
func formatSeconds(ns uint64) string {
	const nanosPerSecond = 1e9
	seconds := float64(ns) / nanosPerSecond
	return fmt.Sprintf("%g", seconds)
}
|
|
|
|
// statusClass maps an HTTP status code to its class label: "2xx", "3xx",
// "4xx" or "5xx". Anything else — including 1xx codes and values outside
// the HTTP range — is reported as "xxx".
func statusClass(code int) string {
	// Integer division keeps only the hundreds digit; codes below 200,
	// negative codes and codes of 600 or more all land in default.
	switch code / 100 {
	case 2:
		return "2xx"
	case 3:
		return "3xx"
	case 4:
		return "4xx"
	case 5:
		return "5xx"
	default:
		return "xxx"
	}
}
|
|
|
|
// statusRecorder wraps an http.ResponseWriter and remembers the final
// response status code, so the middleware can label the histogram by status
// class without buffering the body.
type statusRecorder struct {
	http.ResponseWriter
	// code is the recorded final status. The creator is expected to
	// initialize it to the status that applies when the handler never writes
	// a header.
	code int
	// wroteHeader reports whether a final (non-informational) header has
	// been recorded and forwarded.
	wroteHeader bool
}

// WriteHeader records code as the response status and forwards it to the
// wrapped writer.
//
// Informational 1xx headers other than 101 Switching Protocols (for example
// 103 Early Hints) may precede the final header. They are forwarded without
// being recorded, so the final status that follows is neither swallowed nor
// mislabeled. Once a final header has been recorded, further calls are
// ignored and are not forwarded.
func (r *statusRecorder) WriteHeader(code int) {
	if r.wroteHeader {
		return
	}
	if code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols {
		r.ResponseWriter.WriteHeader(code)
		return
	}
	r.code = code
	r.wroteHeader = true
	r.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. If no final header has been
// written yet, it first records and sends 200 OK, making explicit what the
// wrapped writer would otherwise do implicitly.
func (r *statusRecorder) Write(b []byte) (int, error) {
	if !r.wroteHeader {
		r.WriteHeader(http.StatusOK)
	}
	return r.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped writer. http.ResponseController uses this
// method to reach optional capabilities of the original writer (flushing,
// hijacking, deadlines) that the recorder itself does not implement.
func (r *statusRecorder) Unwrap() http.ResponseWriter {
	return r.ResponseWriter
}
|