From bf8a3fc11c84049c89d6607d477284075b041000 Mon Sep 17 00:00:00 2001 From: Mathias Bergqvist Date: Wed, 22 Apr 2026 22:29:24 +0200 Subject: [PATCH] feat(ingestion): add OpenAI-compatible LLM HTTP client with 429 retry --- ingestion/internal/llm/client.go | 119 ++++++++++++++++++++++++++ ingestion/internal/llm/client_test.go | 86 +++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 ingestion/internal/llm/client.go create mode 100644 ingestion/internal/llm/client_test.go diff --git a/ingestion/internal/llm/client.go b/ingestion/internal/llm/client.go new file mode 100644 index 0000000..4fac2d5 --- /dev/null +++ b/ingestion/internal/llm/client.go @@ -0,0 +1,119 @@ +package llm + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" +) + +// Client calls an OpenAI-compatible chat completions endpoint. +type Client struct { + baseURL string + apiKey string + model string + httpClient *http.Client +} + +// New constructs a Client. +func New(baseURL, apiKey, model string, timeout time.Duration) *Client { + return &Client{ + baseURL: strings.TrimRight(baseURL, "/"), + apiKey: apiKey, + model: model, + httpClient: &http.Client{Timeout: timeout}, + } +} + +type chatRequest struct { + Model string `json:"model"` + Messages []message `json:"messages"` + Temperature float64 `json:"temperature"` +} + +type message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type chatResponse struct { + Choices []struct { + Message message `json:"message"` + } `json:"choices"` +} + +// Complete sends a system + user message and returns the assistant's reply. +// Retries once on HTTP 429 using Retry-After header or 5s backoff. +func (c *Client) Complete(ctx context.Context, system, user string) (string, error) { + body := chatRequest{ + Model: c.model, + Messages: []message{ + {Role: "system", Content: system}, + {Role: "user", Content: user}, + }, + Temperature: 0.2, + } + b, err := json.Marshal(body) + if err != nil { + return "", fmt.Errorf("marshal request: %w", err) + } + + do := func() (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if c.apiKey != "" { + req.Header.Set("Authorization", "Bearer "+c.apiKey) + } + return c.httpClient.Do(req) + } + + resp, err := do() + if err != nil { + return "", fmt.Errorf("call LLM: %w", err) + } + + if resp.StatusCode == http.StatusTooManyRequests { + resp.Body.Close() + wait := 5 * time.Second + if ra := resp.Header.Get("Retry-After"); ra != "" { + if secs, err := strconv.Atoi(ra); err == nil { + wait = time.Duration(secs) * time.Second + } + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(wait): + } + resp, err = do() + if err != nil { + return "", fmt.Errorf("retry LLM call: %w", err) + } + } + defer resp.Body.Close() + + out, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("LLM returned %d: %s", resp.StatusCode, out) + } + + var cr chatResponse + if err := json.Unmarshal(out, &cr); err != nil { + return "", fmt.Errorf("parse response: %w", err) + } + if len(cr.Choices) == 0 { + return "", fmt.Errorf("LLM returned no choices") + } + return cr.Choices[0].Message.Content, nil +} diff --git a/ingestion/internal/llm/client_test.go b/ingestion/internal/llm/client_test.go new file mode 100644 index 0000000..7101f96 --- /dev/null +++ b/ingestion/internal/llm/client_test.go @@ -0,0 +1,86 @@ +package llm + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func mockServer(t *testing.T, response string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/chat/completions", r.URL.Path) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{ + {"message": map[string]any{"role": "assistant", "content": response}}, + }, + }) + })) +} + +func TestClient_Complete(t *testing.T) { + srv := mockServer(t, "hello world") + defer srv.Close() + + c := New(srv.URL, "", "test-model", 10*time.Second) + got, err := c.Complete(context.Background(), "you are helpful", "say hello") + require.NoError(t, err) + assert.Equal(t, "hello world", got) +} + +func TestClient_ReturnsErrorOnNon200(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "overloaded", http.StatusServiceUnavailable) + })) + defer srv.Close() + + c := New(srv.URL, "", "test-model", 10*time.Second) + _, err := c.Complete(context.Background(), "sys", "user") + assert.Error(t, err) +} + +func TestClient_SendsAuthHeader(t *testing.T) { + var gotAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{{"message": map[string]any{"content": "ok"}}}, + }) + })) + defer srv.Close() + + c := New(srv.URL, "my-key", "test-model", 10*time.Second) + _, err := c.Complete(context.Background(), "sys", "user") + require.NoError(t, err) + assert.Equal(t, "Bearer my-key", gotAuth) +} + +func TestClient_Retries429(t *testing.T) { + calls := 0 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls++ + if calls == 1 { + w.Header().Set("Retry-After", "0") + w.WriteHeader(http.StatusTooManyRequests) + return + } + json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{{"message": map[string]any{"content": "retried"}}}, + }) + })) + defer srv.Close() + + c := New(srv.URL, "", "test-model", 10*time.Second) + got, err := c.Complete(context.Background(), "sys", "user") + require.NoError(t, err) + assert.Equal(t, "retried", got) + assert.Equal(t, 2, calls) +}