fix(ingestion): address code review issues in api and watcher packages
- Strip internal error detail from 500 responses (leak prevention) - Add path containment assertion in /write handler - Use Go 1.22 method-prefixed mux routes for automatic 405 responses - Clarify watch_interval log when watcher not yet wired - Consolidate validation tests into table-driven TestIngest_Validation - Watcher: return nil after successful quarantine to avoid double-logging - Watcher: append timestamp suffix to processed dest if file already exists Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@@ -55,19 +56,23 @@ func main() {
|
|||||||
_ = watchInterval
|
_ = watchInterval
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/query", h.Query)
|
mux.HandleFunc("POST /query", h.Query)
|
||||||
mux.HandleFunc("/write", h.Write)
|
mux.HandleFunc("POST /write", h.Write)
|
||||||
mux.HandleFunc("/ingest", h.Ingest)
|
mux.HandleFunc("POST /ingest", h.Ingest)
|
||||||
mux.HandleFunc("/ingest-path", h.IngestPath)
|
mux.HandleFunc("POST /ingest-path", h.IngestPath)
|
||||||
|
|
||||||
addr := ":" + port
|
addr := ":" + port
|
||||||
|
watchIntervalLog := "disabled"
|
||||||
|
if watchInterval > 0 {
|
||||||
|
watchIntervalLog = fmt.Sprintf("%ds (pending task11 wiring)", watchInterval)
|
||||||
|
}
|
||||||
logger.Info("ingestion server starting",
|
logger.Info("ingestion server starting",
|
||||||
"addr", addr,
|
"addr", addr,
|
||||||
"brain_dir", brainDir,
|
"brain_dir", brainDir,
|
||||||
"llm_url", llmURL,
|
"llm_url", llmURL,
|
||||||
"llm_model", llmModel,
|
"llm_model", llmModel,
|
||||||
"chunk_size", chunkSize,
|
"chunk_size", chunkSize,
|
||||||
"watch_interval_s", watchInterval,
|
"watch_interval", watchIntervalLog,
|
||||||
)
|
)
|
||||||
if err := http.ListenAndServe(addr, mux); err != nil {
|
if err := http.ListenAndServe(addr, mux); err != nil {
|
||||||
logger.Error("server stopped", "err", err)
|
logger.Error("server stopped", "err", err)
|
||||||
|
|||||||
@@ -126,6 +126,10 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
|
|||||||
base += ".md"
|
base += ".md"
|
||||||
}
|
}
|
||||||
dest := filepath.Join(rawDir, base)
|
dest := filepath.Join(rawDir, base)
|
||||||
|
if !strings.HasPrefix(filepath.Clean(dest)+string(os.PathSeparator), filepath.Clean(rawDir)+string(os.PathSeparator)) {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid filename")
|
||||||
|
return
|
||||||
|
}
|
||||||
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
|
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
|
||||||
h.logger.Error("write failed", "err", err)
|
h.logger.Error("write failed", "err", err)
|
||||||
writeError(w, http.StatusInternalServerError, "write error")
|
writeError(w, http.StatusInternalServerError, "write error")
|
||||||
@@ -155,7 +159,7 @@ func (h *Handler) Ingest(w http.ResponseWriter, r *http.Request) {
|
|||||||
result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, req.Content, req.Source, req.DryRun)
|
result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, req.Content, req.Source, req.DryRun)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Error("ingest failed", "source", req.Source, "err", err)
|
h.logger.Error("ingest failed", "source", req.Source, "err", err)
|
||||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("ingest error: %v", err))
|
writeError(w, http.StatusInternalServerError, "ingest error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -251,7 +255,7 @@ func (h *Handler) IngestPath(w http.ResponseWriter, r *http.Request) {
|
|||||||
result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun)
|
result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun)
|
||||||
if runErr != nil {
|
if runErr != nil {
|
||||||
h.logger.Error("ingest-path failed", "path", req.Path, "err", runErr)
|
h.logger.Error("ingest-path failed", "path", req.Path, "err", runErr)
|
||||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("ingest error: %v", runErr))
|
writeError(w, http.StatusInternalServerError, "ingest error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
allPages = result.Pages
|
allPages = result.Pages
|
||||||
|
|||||||
@@ -137,26 +137,28 @@ func TestWrite_GeneratesFilenameIfAbsent(t *testing.T) {
|
|||||||
// POST /ingest
|
// POST /ingest
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
func TestIngest_MissingContent(t *testing.T) {
|
func TestIngest_Validation(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
body map[string]any
|
||||||
|
}{
|
||||||
|
{"missing content", map[string]any{"source": "test-source"}},
|
||||||
|
{"missing source", map[string]any{"content": "some content"}},
|
||||||
|
{"whitespace content", map[string]any{"content": " ", "source": "test-source"}},
|
||||||
|
{"whitespace source", map[string]any{"content": "some content", "source": " "}},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
_, h := setup(t)
|
_, h := setup(t)
|
||||||
body, _ := json.Marshal(map[string]any{"source": "test-source"})
|
body, _ := json.Marshal(tc.body)
|
||||||
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
|
|
||||||
rec := httptest.NewRecorder()
|
|
||||||
|
|
||||||
h.Ingest(rec, req)
|
|
||||||
|
|
||||||
assert.Equal(t, http.StatusBadRequest, rec.Code)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIngest_MissingSource(t *testing.T) {
|
|
||||||
_, h := setup(t)
|
|
||||||
body, _ := json.Marshal(map[string]any{"content": "some content"})
|
|
||||||
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
|
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
|
||||||
rec := httptest.NewRecorder()
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
h.Ingest(rec, req)
|
h.Ingest(rec, req)
|
||||||
|
|
||||||
assert.Equal(t, http.StatusBadRequest, rec.Code)
|
assert.Equal(t, http.StatusBadRequest, rec.Code)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIngest_Success(t *testing.T) {
|
func TestIngest_Success(t *testing.T) {
|
||||||
|
|||||||
@@ -110,7 +110,9 @@ func processFile(ctx context.Context, cfg Config, path, date string) error {
|
|||||||
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
|
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
|
||||||
slog.Error("watcher: failed to write log entry", "error", logErr)
|
slog.Error("watcher: failed to write log entry", "error", logErr)
|
||||||
}
|
}
|
||||||
return runErr
|
// Return nil: the file was quarantined successfully; the error was already
|
||||||
|
// logged. Returning runErr would cause processDir to log it again at Error level.
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to processed/YYYY-MM-DD/.
|
// Move to processed/YYYY-MM-DD/.
|
||||||
@@ -119,6 +121,12 @@ func processFile(ctx context.Context, cfg Config, path, date string) error {
|
|||||||
return fmt.Errorf("mkdir processed dir: %w", err)
|
return fmt.Errorf("mkdir processed dir: %w", err)
|
||||||
}
|
}
|
||||||
dest := filepath.Join(processedDir, filename)
|
dest := filepath.Join(processedDir, filename)
|
||||||
|
if _, err := os.Stat(dest); err == nil {
|
||||||
|
// File already exists in processed; append timestamp to avoid overwriting the archive.
|
||||||
|
ext := filepath.Ext(filename)
|
||||||
|
base := strings.TrimSuffix(filename, ext)
|
||||||
|
dest = filepath.Join(processedDir, base+"-"+time.Now().UTC().Format("150405")+ext)
|
||||||
|
}
|
||||||
if err := os.Rename(path, dest); err != nil {
|
if err := os.Rename(path, dest); err != nil {
|
||||||
return fmt.Errorf("move to processed: %w", err)
|
return fmt.Errorf("move to processed: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user