feat(tools): file_write_branch

Add BranchExists/CreateBranch/UpsertFile gitea client methods and the
file_write_branch MCP tool. Branch is auto-created from base (or repo
default_branch) when it doesn't exist; file is upserted via PUT contents.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mathias Bergqvist
2026-05-04 22:15:39 +02:00
parent 044086b067
commit 5af8addc26
5 changed files with 448 additions and 0 deletions

View File

@@ -30,6 +30,7 @@ func main() {
reg.Register(tools.NewRepoList(giteaClient, ownerAllow)) reg.Register(tools.NewRepoList(giteaClient, ownerAllow))
reg.Register(tools.NewRepoGet(giteaClient, ownerAllow)) reg.Register(tools.NewRepoGet(giteaClient, ownerAllow))
reg.Register(tools.NewFileRead(giteaClient, ownerAllow)) reg.Register(tools.NewFileRead(giteaClient, ownerAllow))
reg.Register(tools.NewFileWriteBranch(giteaClient, ownerAllow))
mcpSrv := mcp.NewServer(mcp.ServerOptions{ mcpSrv := mcp.NewServer(mcp.ServerOptions{
Registry: reg, Registry: reg,

View File

@@ -32,3 +32,82 @@ func (c *Client) GetFileContents(ctx context.Context, owner, repo, path, ref str
} }
return &fc, nil return &fc, nil
} }
type Branch struct {
Name string `json:"name"`
Commit struct {
ID string `json:"id"`
URL string `json:"url"`
} `json:"commit"`
}
// BranchExists returns (true, nil) if the branch exists, (false, nil) on 404, (false, err) otherwise.
func (c *Client) BranchExists(ctx context.Context, owner, repo, branch string) (bool, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/branches/%s", owner, repo, branch)
body, status, err := c.GetJSON(ctx, p)
if err != nil {
return false, err
}
if status == 404 {
return false, nil
}
if err := MapStatus(status, body); err != nil {
return false, err
}
return true, nil
}
func (c *Client) CreateBranch(ctx context.Context, owner, repo, newBranch, oldBranch string) error {
p := fmt.Sprintf("/api/v1/repos/%s/%s/branches", owner, repo)
payload, err := json.Marshal(map[string]string{
"new_branch_name": newBranch,
"old_branch_name": oldBranch,
})
if err != nil {
return err
}
body, status, err := c.PostJSON(ctx, p, payload)
if err != nil {
return err
}
return MapStatus(status, body)
}
type UpsertFileArgs struct {
Branch string `json:"branch"`
Content string `json:"content"` // already base64-encoded
Message string `json:"message"`
Sha string `json:"sha,omitempty"`
}
type FileWriteResult struct {
Content struct {
Path string `json:"path"`
Sha string `json:"sha"`
HTMLURL string `json:"html_url"`
} `json:"content"`
Commit struct {
Sha string `json:"sha"`
HTMLURL string `json:"html_url"`
} `json:"commit"`
}
func (c *Client) UpsertFile(ctx context.Context, owner, repo, path string, args UpsertFileArgs) (*FileWriteResult, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/contents/%s", owner, repo, path)
payload, err := json.Marshal(args)
if err != nil {
return nil, err
}
body, status, err := c.PutJSON(ctx, p, payload)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var out FileWriteResult
if err := json.Unmarshal(body, &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -2,6 +2,8 @@ package gitea_test
import ( import (
"context" "context"
"encoding/json"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
@@ -28,3 +30,89 @@ func TestGetFileContents(t *testing.T) {
assert.Equal(t, int64(13), fc.Size) assert.Equal(t, int64(13), fc.Size)
assert.Equal(t, "SGVsbG8sIHdvcmxkIQ==", fc.Content) assert.Equal(t, "SGVsbG8sIHdvcmxkIQ==", fc.Content)
} }
func TestBranchExistsTrue(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/main", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"main","commit":{"id":"abc123","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "main")
require.NoError(t, err)
assert.True(t, exists)
}
func TestBranchExistsFalseOn404(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/nonexistent", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "nonexistent")
require.NoError(t, err)
assert.False(t, exists)
}
func TestCreateBranchSendsPayload(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
err := c.CreateBranch(context.Background(), "o", "r", "feat/x", "main")
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["new_branch_name"])
assert.Equal(t, "main", payload["old_branch_name"])
}
func TestUpsertFileSendsPayloadAndDecodesResult(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/contents/p.md", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"content":{"path":"p.md","sha":"newsha","html_url":"http://example.com/p.md"},"commit":{"sha":"abc","html_url":"http://example.com/commit/abc"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
result, err := c.UpsertFile(context.Background(), "o", "r", "p.md", gitea.UpsertFileArgs{
Branch: "feat/x",
Content: "aGVsbG8=",
Message: "add p.md",
Sha: "oldsha",
})
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["branch"])
assert.Equal(t, "aGVsbG8=", payload["content"])
assert.Equal(t, "add p.md", payload["message"])
assert.Equal(t, "oldsha", payload["sha"])
assert.Equal(t, "p.md", result.Content.Path)
assert.Equal(t, "newsha", result.Content.Sha)
assert.Equal(t, "http://example.com/p.md", result.Content.HTMLURL)
assert.Equal(t, "abc", result.Commit.Sha)
}

View File

@@ -0,0 +1,107 @@
package tools
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type FileWriteBranch struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewFileWriteBranch(c *gitea.Client, a *allowlist.Allowlist) *FileWriteBranch {
return &FileWriteBranch{c: c, a: a}
}
func (t *FileWriteBranch) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "file_write_branch",
Description: "Create or update a file on a feature branch. Branch is created from base if it doesn't exist.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"path":{"type":"string"},
"content":{"type":"string"},
"branch":{"type":"string"},
"base":{"type":"string"},
"message":{"type":"string"},
"sha":{"type":"string"}
},
"required":["owner","name","path","content","branch","message"]
}`),
}
}
type fileWriteBranchArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Path string `json:"path"`
Content string `json:"content"`
Branch string `json:"branch"`
Base string `json:"base"`
Message string `json:"message"`
Sha string `json:"sha"`
}
func (t *FileWriteBranch) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args fileWriteBranchArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Branch == "" {
return nil, fmt.Errorf("branch is required: %w", gitea.ErrValidation)
}
if args.Message == "" {
return nil, fmt.Errorf("message is required: %w", gitea.ErrValidation)
}
// Resolve base default if branch needs to be created
exists, err := t.c.BranchExists(ctx, args.Owner, args.Name, args.Branch)
if err != nil {
return nil, err
}
if !exists {
base := args.Base
if base == "" {
repo, err := t.c.GetRepo(ctx, args.Owner, args.Name)
if err != nil {
return nil, err
}
base = repo.DefaultBranch
}
if err := t.c.CreateBranch(ctx, args.Owner, args.Name, args.Branch, base); err != nil {
return nil, err
}
}
encoded := base64.StdEncoding.EncodeToString([]byte(args.Content))
result, err := t.c.UpsertFile(ctx, args.Owner, args.Name, args.Path, gitea.UpsertFileArgs{
Branch: args.Branch,
Content: encoded,
Message: args.Message,
Sha: args.Sha,
})
if err != nil {
return nil, err
}
return textOK(map[string]any{
"path": result.Content.Path,
"sha": result.Content.Sha,
"branch": args.Branch,
"commit_sha": result.Commit.Sha,
"html_url": result.Content.HTMLURL,
})
}

View File

@@ -0,0 +1,173 @@
package tools_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const branchCheckExistsResp = `{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`
const createBranchResp = `{"name":"feat/new","commit":{"id":"abc","url":"http://example.com"}}`
const upsertFileResp = `{"content":{"path":"doc.md","sha":"filsha","html_url":"http://example.com/doc.md"},"commit":{"sha":"cmt1","html_url":"http://example.com/commit/cmt1"}}`
const getRepoResp = `{"name":"myrepo","full_name":"owner/myrepo","default_branch":"main"}`
func TestFileWriteBranchCreatesBranchAndFile(t *testing.T) {
mux := http.NewServeMux()
// Branch check → 404 (branch doesn't exist)
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
}
})
// Create branch → 201
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file → 201
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPut, r.Method)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/new","base":"main",
"message":"add doc.md"
}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, "feat/new", result["branch"])
assert.Equal(t, "doc.md", result["path"])
assert.Equal(t, "cmt1", result["commit_sha"])
}
func TestFileWriteBranchUsesDefaultBaseWhenBaseEmpty(t *testing.T) {
var createBody []byte
mux := http.NewServeMux()
// Branch check → 404
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"not found"}`))
})
// GET repo (to resolve default_branch)
mux.HandleFunc("/api/v1/repos/owner/myrepo", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(getRepoResp))
})
// Create branch → capture body to assert old_branch_name
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
var err error
createBody, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/new",
"message":"add doc.md"
}`))
require.NoError(t, err)
require.NotNil(t, out)
var payload map[string]string
require.NoError(t, json.Unmarshal(createBody, &payload))
assert.Equal(t, "main", payload["old_branch_name"])
assert.Equal(t, "feat/new", payload["new_branch_name"])
}
func TestFileWriteBranchSkipsCreateWhenBranchExists(t *testing.T) {
var createCallCount atomic.Int32
mux := http.NewServeMux()
// Branch check → 200 (branch exists)
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/existing", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(branchCheckExistsResp))
})
// Create branch — should NOT be called
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
createCallCount.Add(1)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/existing",
"message":"update doc.md"
}`))
require.NoError(t, err)
require.NotNil(t, out)
assert.Equal(t, int32(0), createCallCount.Load(), "POST /branches should not be called when branch exists")
}
func TestFileWriteBranchAllowlistRejects(t *testing.T) {
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"evil","name":"repo","path":"f.md",
"content":"x","branch":"feat/x","message":"msg"
}`))
require.Error(t, err)
}
func TestFileWriteBranchRequiresMessage(t *testing.T) {
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"owner"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"repo","path":"f.md",
"content":"x","branch":"feat/x"
}`))
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrValidation)
}