feat(ingestion): add OpenAI-compatible LLM HTTP client with 429 retry
This commit is contained in:
119
ingestion/internal/llm/client.go
Normal file
119
ingestion/internal/llm/client.go
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
package llm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client calls an OpenAI-compatible chat completions endpoint.
|
||||||
|
type Client struct {
|
||||||
|
baseURL string
|
||||||
|
apiKey string
|
||||||
|
model string
|
||||||
|
httpClient *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a Client.
|
||||||
|
func New(baseURL, apiKey, model string, timeout time.Duration) *Client {
|
||||||
|
return &Client{
|
||||||
|
baseURL: strings.TrimRight(baseURL, "/"),
|
||||||
|
apiKey: apiKey,
|
||||||
|
model: model,
|
||||||
|
httpClient: &http.Client{Timeout: timeout},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type chatRequest struct {
|
||||||
|
Model string `json:"model"`
|
||||||
|
Messages []message `json:"messages"`
|
||||||
|
Temperature float64 `json:"temperature"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type message struct {
|
||||||
|
Role string `json:"role"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type chatResponse struct {
|
||||||
|
Choices []struct {
|
||||||
|
Message message `json:"message"`
|
||||||
|
} `json:"choices"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Complete sends a system + user message and returns the assistant's reply.
|
||||||
|
// Retries once on HTTP 429 using Retry-After header or 5s backoff.
|
||||||
|
func (c *Client) Complete(ctx context.Context, system, user string) (string, error) {
|
||||||
|
body := chatRequest{
|
||||||
|
Model: c.model,
|
||||||
|
Messages: []message{
|
||||||
|
{Role: "system", Content: system},
|
||||||
|
{Role: "user", Content: user},
|
||||||
|
},
|
||||||
|
Temperature: 0.2,
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("marshal request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
do := func() (*http.Response, error) {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(b))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("build request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
if c.apiKey != "" {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||||
|
}
|
||||||
|
return c.httpClient.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := do()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("call LLM: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
resp.Body.Close()
|
||||||
|
wait := 5 * time.Second
|
||||||
|
if ra := resp.Header.Get("Retry-After"); ra != "" {
|
||||||
|
if secs, err := strconv.Atoi(ra); err == nil {
|
||||||
|
wait = time.Duration(secs) * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return "", ctx.Err()
|
||||||
|
case <-time.After(wait):
|
||||||
|
}
|
||||||
|
resp, err = do()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("retry LLM call: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
out, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("read response: %w", err)
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return "", fmt.Errorf("LLM returned %d: %s", resp.StatusCode, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
var cr chatResponse
|
||||||
|
if err := json.Unmarshal(out, &cr); err != nil {
|
||||||
|
return "", fmt.Errorf("parse response: %w", err)
|
||||||
|
}
|
||||||
|
if len(cr.Choices) == 0 {
|
||||||
|
return "", fmt.Errorf("LLM returned no choices")
|
||||||
|
}
|
||||||
|
return cr.Choices[0].Message.Content, nil
|
||||||
|
}
|
||||||
86
ingestion/internal/llm/client_test.go
Normal file
86
ingestion/internal/llm/client_test.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package llm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mockServer(t *testing.T, response string) *httptest.Server {
|
||||||
|
t.Helper()
|
||||||
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
assert.Equal(t, "/chat/completions", r.URL.Path)
|
||||||
|
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"choices": []map[string]any{
|
||||||
|
{"message": map[string]any{"role": "assistant", "content": response}},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_Complete(t *testing.T) {
|
||||||
|
srv := mockServer(t, "hello world")
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL, "", "test-model", 10*time.Second)
|
||||||
|
got, err := c.Complete(context.Background(), "you are helpful", "say hello")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "hello world", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_ReturnsErrorOnNon200(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.Error(w, "overloaded", http.StatusServiceUnavailable)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL, "", "test-model", 10*time.Second)
|
||||||
|
_, err := c.Complete(context.Background(), "sys", "user")
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_SendsAuthHeader(t *testing.T) {
|
||||||
|
var gotAuth string
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
gotAuth = r.Header.Get("Authorization")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"choices": []map[string]any{{"message": map[string]any{"content": "ok"}}},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL, "my-key", "test-model", 10*time.Second)
|
||||||
|
_, err := c.Complete(context.Background(), "sys", "user")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "Bearer my-key", gotAuth)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_Retries429(t *testing.T) {
|
||||||
|
calls := 0
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
calls++
|
||||||
|
if calls == 1 {
|
||||||
|
w.Header().Set("Retry-After", "0")
|
||||||
|
w.WriteHeader(http.StatusTooManyRequests)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"choices": []map[string]any{{"message": map[string]any{"content": "retried"}}},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL, "", "test-model", 10*time.Second)
|
||||||
|
got, err := c.Complete(context.Background(), "sys", "user")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "retried", got)
|
||||||
|
assert.Equal(t, 2, calls)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user