package tools_test import ( "context" "encoding/json" "io" "net/http" "net/http/httptest" "sync/atomic" "testing" "gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist" "gitea.d-ma.be/mathias/gitea-mcp/internal/gitea" "gitea.d-ma.be/mathias/gitea-mcp/internal/tools" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) const branchCheckExistsResp = `{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}` const createBranchResp = `{"name":"feat/new","commit":{"id":"abc","url":"http://example.com"}}` const upsertFileResp = `{"content":{"path":"doc.md","sha":"filsha","html_url":"http://example.com/doc.md"},"commit":{"sha":"cmt1","html_url":"http://example.com/commit/cmt1"}}` const getRepoResp = `{"name":"myrepo","full_name":"owner/myrepo","default_branch":"main"}` func TestFileWriteBranchCreatesBranchAndFile(t *testing.T) { mux := http.NewServeMux() // Branch check → 404 (branch doesn't exist) mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodGet { w.WriteHeader(http.StatusNotFound) _, _ = w.Write([]byte(`{"message":"branch not found"}`)) } }) // Create branch → 201 mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPost, r.Method) w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(createBranchResp)) }) // New file (no sha) → POST to /contents/{path} mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPost, r.Method) w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(upsertFileResp)) }) srv := httptest.NewServer(mux) defer srv.Close() tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"})) out, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"owner","name":"myrepo","path":"doc.md", "content":"hello","branch":"feat/new","base":"main", "message":"add doc.md" }`)) require.NoError(t, err) var result map[string]any require.NoError(t, json.Unmarshal(out, &result)) assert.Equal(t, "feat/new", result["branch"]) assert.Equal(t, "doc.md", result["path"]) assert.Equal(t, "cmt1", result["commit_sha"]) } func TestFileWriteBranchUsesPutWhenShaProvided(t *testing.T) { mux := http.NewServeMux() // Branch exists mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/existing", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(branchCheckExistsResp)) }) // Existing file (sha provided) → PUT mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPut, r.Method) w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(upsertFileResp)) }) srv := httptest.NewServer(mux) defer srv.Close() tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"})) out, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"owner","name":"myrepo","path":"doc.md", "content":"hello","branch":"feat/existing", "sha":"oldsha","message":"update doc.md" }`)) require.NoError(t, err) require.NotNil(t, out) } func TestFileWriteBranchUsesDefaultBaseWhenBaseEmpty(t *testing.T) { var createBody []byte mux := http.NewServeMux() // Branch check → 404 mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) _, _ = w.Write([]byte(`{"message":"not found"}`)) }) // GET repo (to resolve default_branch) mux.HandleFunc("/api/v1/repos/owner/myrepo", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodGet, r.Method) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(getRepoResp)) }) // Create branch → capture body to assert old_branch_name mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPost, r.Method) var err error createBody, err = io.ReadAll(r.Body) require.NoError(t, err) w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(createBranchResp)) }) // Upsert file mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(upsertFileResp)) }) srv := httptest.NewServer(mux) defer srv.Close() tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"})) out, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"owner","name":"myrepo","path":"doc.md", "content":"hello","branch":"feat/new", "message":"add doc.md" }`)) require.NoError(t, err) require.NotNil(t, out) var payload map[string]string require.NoError(t, json.Unmarshal(createBody, &payload)) assert.Equal(t, "main", payload["old_branch_name"]) assert.Equal(t, "feat/new", payload["new_branch_name"]) } func TestFileWriteBranchSkipsCreateWhenBranchExists(t *testing.T) { var createCallCount atomic.Int32 mux := http.NewServeMux() // Branch check → 200 (branch exists) mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/existing", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(branchCheckExistsResp)) }) // Create branch — should NOT be called mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) { createCallCount.Add(1) w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(createBranchResp)) }) // Upsert file mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusCreated) _, _ = w.Write([]byte(upsertFileResp)) }) srv := httptest.NewServer(mux) defer srv.Close() tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"})) out, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"owner","name":"myrepo","path":"doc.md", "content":"hello","branch":"feat/existing", "message":"update doc.md" }`)) require.NoError(t, err) require.NotNil(t, out) assert.Equal(t, int32(0), createCallCount.Load(), "POST /branches should not be called when branch exists") } func TestFileWriteBranchAllowlistRejects(t *testing.T) { tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"})) _, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"evil","name":"repo","path":"f.md", "content":"x","branch":"feat/x","message":"msg" }`)) require.Error(t, err) } func TestFileWriteBranchRequiresMessage(t *testing.T) { tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"owner"})) _, err := tool.Call(context.Background(), json.RawMessage(`{ "owner":"owner","name":"repo","path":"f.md", "content":"x","branch":"feat/x" }`)) require.Error(t, err) assert.ErrorIs(t, err, gitea.ErrValidation) }