10 Commits

Author SHA1 Message Date
Mathias Bergqvist
43e016e8fa feat(tools): workflow_run_status 2026-05-04 22:25:23 +02:00
Mathias Bergqvist
ba172e3db8 feat(tools): workflow_run_trigger 2026-05-04 22:25:10 +02:00
Mathias Bergqvist
c4874ae8d1 feat(tools): pr_get 2026-05-04 22:21:20 +02:00
Mathias Bergqvist
9972dcd94e feat(tools): pr_create with identity footer 2026-05-04 22:20:33 +02:00
Mathias Bergqvist
5af8addc26 feat(tools): file_write_branch
Add BranchExists/CreateBranch/UpsertFile gitea client methods and the
file_write_branch MCP tool. Branch is auto-created from base (or repo
default_branch) when it doesn't exist; file is upserted via PUT contents.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 22:15:39 +02:00
Mathias Bergqvist
044086b067 feat(tools): file_read with default-branch resolution
Adds GetFileContents to the gitea client and a file_read MCP tool.
When ref is omitted, the tool resolves the repo default_branch via
GetRepo before fetching contents. Decoded content capped at 1 MiB.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 22:11:50 +02:00
Mathias Bergqvist
f10cc9ac4b feat(tools): repo_get 2026-05-04 22:08:24 +02:00
Mathias Bergqvist
33ad02d369 feat(tools): repo_list 2026-05-04 22:07:44 +02:00
Mathias Bergqvist
18eadc0ae9 feat(tools): tool interface + helpers 2026-05-04 21:27:50 +02:00
Mathias Bergqvist
4ebb1eef6d feat(allowlist): owner allowlist enforcement 2026-05-04 21:26:50 +02:00
29 changed files with 2001 additions and 1 deletions

View File

@@ -5,10 +5,13 @@ import (
"net/http"
"os"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/auth"
"gitea.d-ma.be/mathias/gitea-mcp/internal/config"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/mcp"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
)
func main() {
@@ -20,8 +23,18 @@ func main() {
os.Exit(1)
}
giteaClient := gitea.NewClient(cfg.GiteaBaseURL, cfg.GiteaAPIToken)
ownerAllow := allowlist.New(cfg.AllowedOwners)
reg := registry.New()
// Tool registration happens in Phase 6+; for now, registry is empty.
reg.Register(tools.NewRepoList(giteaClient, ownerAllow))
reg.Register(tools.NewRepoGet(giteaClient, ownerAllow))
reg.Register(tools.NewFileRead(giteaClient, ownerAllow))
reg.Register(tools.NewFileWriteBranch(giteaClient, ownerAllow))
reg.Register(tools.NewPRCreate(giteaClient, ownerAllow))
reg.Register(tools.NewPRGet(giteaClient, ownerAllow))
reg.Register(tools.NewWorkflowRunTrigger(giteaClient, ownerAllow, cfg.GiteaBaseURL))
reg.Register(tools.NewWorkflowRunStatus(giteaClient, ownerAllow))
mcpSrv := mcp.NewServer(mcp.ServerOptions{
Registry: reg,

View File

@@ -0,0 +1,25 @@
package allowlist
import "fmt"
type Allowlist struct {
owners map[string]struct{}
}
func New(owners []string) *Allowlist {
m := make(map[string]struct{}, len(owners))
for _, o := range owners {
m[o] = struct{}{}
}
return &Allowlist{owners: m}
}
func (a *Allowlist) Check(owner string) error {
if owner == "" {
return fmt.Errorf("owner required")
}
if _, ok := a.owners[owner]; !ok {
return fmt.Errorf("owner %q not in allowlist", owner)
}
return nil
}

View File

@@ -0,0 +1,16 @@
package allowlist_test
import (
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"github.com/stretchr/testify/assert"
)
func TestAllowlistCheck(t *testing.T) {
a := allowlist.New([]string{"mathias", "acme"})
assert.NoError(t, a.Check("mathias"))
assert.NoError(t, a.Check("acme"))
assert.Error(t, a.Check("evil"))
assert.Error(t, a.Check(""))
}

View File

@@ -67,3 +67,35 @@ func (c *Client) PutJSON(ctx context.Context, path string, body []byte) ([]byte,
func (c *Client) DeleteJSON(ctx context.Context, path string) ([]byte, int, error) {
return c.do(ctx, http.MethodDelete, path, nil)
}
type rawResponse struct {
Body []byte
Status int
Headers http.Header
}
func (c *Client) doRaw(ctx context.Context, method, path string, body []byte) (*rawResponse, error) {
var reader io.Reader
if body != nil {
reader = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, reader)
if err != nil {
return nil, err
}
if c.token != "" {
req.Header.Set("Authorization", "token "+c.token)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
req.Header.Set("Accept", "application/json")
resp, err := c.hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
return &rawResponse{Body: b, Status: resp.StatusCode, Headers: resp.Header}, err
}

113
internal/gitea/files.go Normal file
View File

@@ -0,0 +1,113 @@
package gitea
import (
"context"
"encoding/json"
"fmt"
)
type FileContents struct {
Path string `json:"path"`
Sha string `json:"sha"`
Size int64 `json:"size"`
Content string `json:"content"`
Encoding string `json:"encoding"`
}
func (c *Client) GetFileContents(ctx context.Context, owner, repo, path, ref string) (*FileContents, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/contents/%s", owner, repo, path)
if ref != "" {
p += "?ref=" + ref
}
body, status, err := c.GetJSON(ctx, p)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var fc FileContents
if err := json.Unmarshal(body, &fc); err != nil {
return nil, err
}
return &fc, nil
}
type Branch struct {
Name string `json:"name"`
Commit struct {
ID string `json:"id"`
URL string `json:"url"`
} `json:"commit"`
}
// BranchExists returns (true, nil) if the branch exists, (false, nil) on 404, (false, err) otherwise.
func (c *Client) BranchExists(ctx context.Context, owner, repo, branch string) (bool, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/branches/%s", owner, repo, branch)
body, status, err := c.GetJSON(ctx, p)
if err != nil {
return false, err
}
if status == 404 {
return false, nil
}
if err := MapStatus(status, body); err != nil {
return false, err
}
return true, nil
}
func (c *Client) CreateBranch(ctx context.Context, owner, repo, newBranch, oldBranch string) error {
p := fmt.Sprintf("/api/v1/repos/%s/%s/branches", owner, repo)
payload, err := json.Marshal(map[string]string{
"new_branch_name": newBranch,
"old_branch_name": oldBranch,
})
if err != nil {
return err
}
body, status, err := c.PostJSON(ctx, p, payload)
if err != nil {
return err
}
return MapStatus(status, body)
}
type UpsertFileArgs struct {
Branch string `json:"branch"`
Content string `json:"content"` // already base64-encoded
Message string `json:"message"`
Sha string `json:"sha,omitempty"`
}
type FileWriteResult struct {
Content struct {
Path string `json:"path"`
Sha string `json:"sha"`
HTMLURL string `json:"html_url"`
} `json:"content"`
Commit struct {
Sha string `json:"sha"`
HTMLURL string `json:"html_url"`
} `json:"commit"`
}
func (c *Client) UpsertFile(ctx context.Context, owner, repo, path string, args UpsertFileArgs) (*FileWriteResult, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/contents/%s", owner, repo, path)
payload, err := json.Marshal(args)
if err != nil {
return nil, err
}
body, status, err := c.PutJSON(ctx, p, payload)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var out FileWriteResult
if err := json.Unmarshal(body, &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -0,0 +1,118 @@
package gitea_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetFileContents(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/mathias/infra/contents/README.md", r.URL.Path)
assert.Equal(t, "main", r.URL.Query().Get("ref"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"path":"README.md","sha":"deadbeef","size":13,"content":"SGVsbG8sIHdvcmxkIQ==","encoding":"base64"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
fc, err := c.GetFileContents(context.Background(), "mathias", "infra", "README.md", "main")
require.NoError(t, err)
assert.Equal(t, "README.md", fc.Path)
assert.Equal(t, "deadbeef", fc.Sha)
assert.Equal(t, int64(13), fc.Size)
assert.Equal(t, "SGVsbG8sIHdvcmxkIQ==", fc.Content)
}
func TestBranchExistsTrue(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/main", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"main","commit":{"id":"abc123","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "main")
require.NoError(t, err)
assert.True(t, exists)
}
func TestBranchExistsFalseOn404(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/nonexistent", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "nonexistent")
require.NoError(t, err)
assert.False(t, exists)
}
func TestCreateBranchSendsPayload(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
err := c.CreateBranch(context.Background(), "o", "r", "feat/x", "main")
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["new_branch_name"])
assert.Equal(t, "main", payload["old_branch_name"])
}
func TestUpsertFileSendsPayloadAndDecodesResult(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/contents/p.md", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"content":{"path":"p.md","sha":"newsha","html_url":"http://example.com/p.md"},"commit":{"sha":"abc","html_url":"http://example.com/commit/abc"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
result, err := c.UpsertFile(context.Background(), "o", "r", "p.md", gitea.UpsertFileArgs{
Branch: "feat/x",
Content: "aGVsbG8=",
Message: "add p.md",
Sha: "oldsha",
})
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["branch"])
assert.Equal(t, "aGVsbG8=", payload["content"])
assert.Equal(t, "add p.md", payload["message"])
assert.Equal(t, "oldsha", payload["sha"])
assert.Equal(t, "p.md", result.Content.Path)
assert.Equal(t, "newsha", result.Content.Sha)
assert.Equal(t, "http://example.com/p.md", result.Content.HTMLURL)
assert.Equal(t, "abc", result.Commit.Sha)
}

66
internal/gitea/pulls.go Normal file
View File

@@ -0,0 +1,66 @@
package gitea
import (
"context"
"encoding/json"
"fmt"
)
type PullRequest struct {
Number int `json:"number"`
Title string `json:"title"`
Body string `json:"body"`
HTMLURL string `json:"html_url"`
State string `json:"state"`
Draft bool `json:"draft"`
Head struct {
Ref string `json:"ref"`
} `json:"head"`
Base struct {
Ref string `json:"ref"`
} `json:"base"`
}
type CreatePullRequestArgs struct {
Title string `json:"title"`
Body string `json:"body"`
Head string `json:"head"`
Base string `json:"base"`
Draft bool `json:"draft"`
}
func (c *Client) CreatePullRequest(ctx context.Context, owner, repo string, args CreatePullRequestArgs) (*PullRequest, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/pulls", owner, repo)
payload, err := json.Marshal(args)
if err != nil {
return nil, err
}
body, status, err := c.PostJSON(ctx, p, payload)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var pr PullRequest
if err := json.Unmarshal(body, &pr); err != nil {
return nil, err
}
return &pr, nil
}
func (c *Client) GetPullRequest(ctx context.Context, owner, repo string, index int) (*PullRequest, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/pulls/%d", owner, repo, index)
body, status, err := c.GetJSON(ctx, p)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var pr PullRequest
if err := json.Unmarshal(body, &pr); err != nil {
return nil, err
}
return &pr, nil
}

View File

@@ -0,0 +1,95 @@
package gitea_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const pullFixture = `{
"number": 7,
"title": "Add feature X",
"body": "This PR adds feature X",
"html_url": "http://example.com/pulls/7",
"state": "open",
"draft": false,
"head": {"ref": "feat/x"},
"base": {"ref": "main"}
}`
func TestCreatePullRequestSendsPayload(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/pulls", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(pullFixture))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
pr, err := c.CreatePullRequest(context.Background(), "o", "r", gitea.CreatePullRequestArgs{
Title: "Add feature X",
Body: "This PR adds feature X",
Head: "feat/x",
Base: "main",
Draft: false,
})
require.NoError(t, err)
var payload map[string]any
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "Add feature X", payload["title"])
assert.Equal(t, "This PR adds feature X", payload["body"])
assert.Equal(t, "feat/x", payload["head"])
assert.Equal(t, "main", payload["base"])
assert.Equal(t, false, payload["draft"])
assert.Equal(t, 7, pr.Number)
assert.Equal(t, "Add feature X", pr.Title)
assert.Equal(t, "http://example.com/pulls/7", pr.HTMLURL)
assert.Equal(t, "feat/x", pr.Head.Ref)
assert.Equal(t, "main", pr.Base.Ref)
assert.Equal(t, "open", pr.State)
assert.False(t, pr.Draft)
}
func TestGetPullRequest(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/pulls/42", r.URL.Path)
assert.Equal(t, http.MethodGet, r.Method)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"number": 42,
"title": "Fix bug Y",
"body": "Fixes Y",
"html_url": "http://example.com/pulls/42",
"state": "open",
"draft": true,
"head": {"ref": "fix/y"},
"base": {"ref": "main"}
}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
pr, err := c.GetPullRequest(context.Background(), "o", "r", 42)
require.NoError(t, err)
assert.Equal(t, 42, pr.Number)
assert.Equal(t, "Fix bug Y", pr.Title)
assert.Equal(t, "http://example.com/pulls/42", pr.HTMLURL)
assert.Equal(t, "fix/y", pr.Head.Ref)
assert.Equal(t, "main", pr.Base.Ref)
assert.Equal(t, "open", pr.State)
assert.True(t, pr.Draft)
}

55
internal/gitea/repos.go Normal file
View File

@@ -0,0 +1,55 @@
package gitea
import (
"context"
"encoding/json"
"fmt"
)
type Repo struct {
Name string `json:"name"`
FullName string `json:"full_name"`
DefaultBranch string `json:"default_branch"`
Description string `json:"description"`
Private bool `json:"private"`
CloneURL string `json:"clone_url"`
HTMLURL string `json:"html_url"`
}
func (c *Client) ListRepos(ctx context.Context, owner string, page, limit int) ([]Repo, error) {
if page < 1 {
page = 1
}
if limit < 1 {
limit = 30
}
path := fmt.Sprintf("/api/v1/users/%s/repos?page=%d&limit=%d", owner, page, limit)
body, status, err := c.GetJSON(ctx, path)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var repos []Repo
if err := json.Unmarshal(body, &repos); err != nil {
return nil, err
}
return repos, nil
}
func (c *Client) GetRepo(ctx context.Context, owner, name string) (*Repo, error) {
path := fmt.Sprintf("/api/v1/repos/%s/%s", owner, name)
body, status, err := c.GetJSON(ctx, path)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var r Repo
if err := json.Unmarshal(body, &r); err != nil {
return nil, err
}
return &r, nil
}

View File

@@ -0,0 +1,30 @@
package gitea_test
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestListRepos(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/users/mathias/repos", r.URL.Path)
assert.Equal(t, "1", r.URL.Query().Get("page"))
assert.Equal(t, "10", r.URL.Query().Get("limit"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`[{"name":"infra","full_name":"mathias/infra","default_branch":"main","description":"d","private":true}]`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
repos, err := c.ListRepos(context.Background(), "mathias", 1, 10)
require.NoError(t, err)
require.Len(t, repos, 1)
assert.Equal(t, "mathias/infra", repos[0].FullName)
assert.Equal(t, "main", repos[0].DefaultBranch)
}

View File

@@ -0,0 +1,79 @@
package gitea
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
)
// DispatchWorkflowArgs is the request body for a workflow_dispatch trigger.
type DispatchWorkflowArgs struct {
Ref string `json:"ref"`
Inputs map[string]any `json:"inputs,omitempty"`
}
// WorkflowRunTrigger holds the run ID extracted from the Location header.
type WorkflowRunTrigger struct {
RunID int64
}
// DispatchWorkflow triggers a workflow_dispatch event and returns the new run ID.
func (c *Client) DispatchWorkflow(ctx context.Context, owner, repo, workflow string, args DispatchWorkflowArgs) (*WorkflowRunTrigger, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/actions/workflows/%s/dispatches", owner, repo, workflow)
payload, err := json.Marshal(args)
if err != nil {
return nil, err
}
resp, err := c.doRaw(ctx, "POST", p, payload)
if err != nil {
return nil, err
}
if resp.Status != 204 {
if mapErr := MapStatus(resp.Status, resp.Body); mapErr != nil {
return nil, mapErr
}
return nil, fmt.Errorf("unexpected status %d", resp.Status)
}
location := resp.Headers.Get("Location")
if location == "" {
return nil, fmt.Errorf("missing Location header in dispatch response")
}
// Location is e.g. "/api/v1/repos/o/r/actions/runs/123" — take the last segment.
parts := strings.Split(strings.TrimRight(location, "/"), "/")
if len(parts) == 0 {
return nil, fmt.Errorf("malformed Location: %s", location)
}
runID, err := strconv.ParseInt(parts[len(parts)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("parse run id from %q: %w", location, err)
}
return &WorkflowRunTrigger{RunID: runID}, nil
}
// WorkflowRun represents a Gitea Actions run.
type WorkflowRun struct {
ID int64 `json:"id"`
Status string `json:"status"` // queued | in_progress | completed
Conclusion string `json:"conclusion"` // success | failure | cancelled | skipped (only when completed)
StartedAt string `json:"started_at"`
HTMLURL string `json:"html_url"`
}
// GetWorkflowRun fetches the status of a specific Actions run.
func (c *Client) GetWorkflowRun(ctx context.Context, owner, repo string, runID int64) (*WorkflowRun, error) {
p := fmt.Sprintf("/api/v1/repos/%s/%s/actions/runs/%d", owner, repo, runID)
body, status, err := c.GetJSON(ctx, p)
if err != nil {
return nil, err
}
if err := MapStatus(status, body); err != nil {
return nil, err
}
var run WorkflowRun
if err := json.Unmarshal(body, &run); err != nil {
return nil, err
}
return &run, nil
}

View File

@@ -0,0 +1,93 @@
package gitea_test
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDispatchWorkflow(t *testing.T) {
var gotBody []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, "/api/v1/repos/o/r/actions/workflows/ci.yml/dispatches", r.URL.Path)
var err error
gotBody, err = io.ReadAll(r.Body)
assert.NoError(t, err)
w.Header().Set("Location", "/api/v1/repos/o/r/actions/runs/789")
w.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
result, err := c.DispatchWorkflow(context.Background(), "o", "r", "ci.yml", gitea.DispatchWorkflowArgs{
Ref: "main",
Inputs: map[string]any{"env": "prod"},
})
require.NoError(t, err)
assert.Equal(t, int64(789), result.RunID)
var body map[string]any
require.NoError(t, json.Unmarshal(gotBody, &body))
assert.Equal(t, "main", body["ref"])
inputs, ok := body["inputs"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "prod", inputs["env"])
}
func TestDispatchWorkflowMissingLocation(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 204 but no Location header
w.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
_, err := c.DispatchWorkflow(context.Background(), "o", "r", "ci.yml", gitea.DispatchWorkflowArgs{Ref: "main"})
require.Error(t, err)
assert.Contains(t, err.Error(), "Location")
}
func TestDispatchWorkflowError404(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
_, err := c.DispatchWorkflow(context.Background(), "o", "r", "ci.yml", gitea.DispatchWorkflowArgs{Ref: "main"})
require.Error(t, err)
assert.True(t, errors.Is(err, gitea.ErrNotFound))
}
func TestGetWorkflowRun(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/actions/runs/789", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"id":789,
"status":"completed",
"conclusion":"success",
"started_at":"2026-05-04T10:00:00Z",
"html_url":"http://gitea.example/o/r/actions/runs/789"
}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
run, err := c.GetWorkflowRun(context.Background(), "o", "r", 789)
require.NoError(t, err)
assert.Equal(t, int64(789), run.ID)
assert.Equal(t, "completed", run.Status)
assert.Equal(t, "success", run.Conclusion)
assert.Equal(t, "2026-05-04T10:00:00Z", run.StartedAt)
assert.Equal(t, "http://gitea.example/o/r/actions/runs/789", run.HTMLURL)
}

View File

@@ -0,0 +1,88 @@
package tools
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
const fileReadMaxBytes = 1 << 20 // 1 MiB
type FileRead struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewFileRead(c *gitea.Client, a *allowlist.Allowlist) *FileRead {
return &FileRead{c: c, a: a}
}
func (t *FileRead) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "file_read",
Description: "Read a file from a repo at a given ref. Defaults to the repo's default branch.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"path":{"type":"string"},
"ref":{"type":"string"}
},
"required":["owner","name","path"]
}`),
}
}
type fileReadArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Path string `json:"path"`
Ref string `json:"ref"`
}
func (t *FileRead) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args fileReadArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
ref := args.Ref
if ref == "" {
repo, err := t.c.GetRepo(ctx, args.Owner, args.Name)
if err != nil {
return nil, err
}
ref = repo.DefaultBranch
}
fc, err := t.c.GetFileContents(ctx, args.Owner, args.Name, args.Path, ref)
if err != nil {
return nil, err
}
if fc.Size > fileReadMaxBytes {
return nil, fmt.Errorf("file %q size %d exceeds 1MiB cap: %w", args.Path, fc.Size, gitea.ErrValidation)
}
decoded, err := base64.StdEncoding.DecodeString(fc.Content)
if err != nil {
return nil, fmt.Errorf("decode base64 content: %w", err)
}
return textOK(map[string]any{
"path": fc.Path,
"ref": ref,
"sha": fc.Sha,
"size": fc.Size,
"content": string(decoded),
})
}

View File

@@ -0,0 +1,64 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFileReadToolWithExplicitRef(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/mathias/infra/contents/README.md", r.URL.Path)
assert.Equal(t, "main", r.URL.Query().Get("ref"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"path":"README.md","sha":"deadbeef","size":13,"content":"SGVsbG8sIHdvcmxkIQ==","encoding":"base64"}`))
}))
defer srv.Close()
tool := tools.NewFileRead(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"infra","path":"README.md","ref":"main"}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, "README.md", result["path"])
assert.Equal(t, "main", result["ref"])
assert.Equal(t, "Hello, world!", result["content"])
}
func TestFileReadToolDefaultBranchResolution(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/repos/mathias/infra", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"infra","full_name":"mathias/infra","default_branch":"main"}`))
})
mux.HandleFunc("/api/v1/repos/mathias/infra/contents/README.md", func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "main", r.URL.Query().Get("ref"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"path":"README.md","sha":"deadbeef","size":13,"content":"SGVsbG8sIHdvcmxkIQ==","encoding":"base64"}`))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileRead(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"infra","path":"README.md"}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, "main", result["ref"])
}
func TestFileReadAllowlistRejects(t *testing.T) {
tool := tools.NewFileRead(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil","name":"infra","path":"README.md"}`))
require.Error(t, err)
}

View File

@@ -0,0 +1,107 @@
package tools
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type FileWriteBranch struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewFileWriteBranch(c *gitea.Client, a *allowlist.Allowlist) *FileWriteBranch {
return &FileWriteBranch{c: c, a: a}
}
func (t *FileWriteBranch) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "file_write_branch",
Description: "Create or update a file on a feature branch. Branch is created from base if it doesn't exist.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"path":{"type":"string"},
"content":{"type":"string"},
"branch":{"type":"string"},
"base":{"type":"string"},
"message":{"type":"string"},
"sha":{"type":"string"}
},
"required":["owner","name","path","content","branch","message"]
}`),
}
}
type fileWriteBranchArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Path string `json:"path"`
Content string `json:"content"`
Branch string `json:"branch"`
Base string `json:"base"`
Message string `json:"message"`
Sha string `json:"sha"`
}
func (t *FileWriteBranch) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args fileWriteBranchArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Branch == "" {
return nil, fmt.Errorf("branch is required: %w", gitea.ErrValidation)
}
if args.Message == "" {
return nil, fmt.Errorf("message is required: %w", gitea.ErrValidation)
}
// Resolve base default if branch needs to be created
exists, err := t.c.BranchExists(ctx, args.Owner, args.Name, args.Branch)
if err != nil {
return nil, err
}
if !exists {
base := args.Base
if base == "" {
repo, err := t.c.GetRepo(ctx, args.Owner, args.Name)
if err != nil {
return nil, err
}
base = repo.DefaultBranch
}
if err := t.c.CreateBranch(ctx, args.Owner, args.Name, args.Branch, base); err != nil {
return nil, err
}
}
encoded := base64.StdEncoding.EncodeToString([]byte(args.Content))
result, err := t.c.UpsertFile(ctx, args.Owner, args.Name, args.Path, gitea.UpsertFileArgs{
Branch: args.Branch,
Content: encoded,
Message: args.Message,
Sha: args.Sha,
})
if err != nil {
return nil, err
}
return textOK(map[string]any{
"path": result.Content.Path,
"sha": result.Content.Sha,
"branch": args.Branch,
"commit_sha": result.Commit.Sha,
"html_url": result.Content.HTMLURL,
})
}

View File

@@ -0,0 +1,173 @@
package tools_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const branchCheckExistsResp = `{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`
const createBranchResp = `{"name":"feat/new","commit":{"id":"abc","url":"http://example.com"}}`
const upsertFileResp = `{"content":{"path":"doc.md","sha":"filsha","html_url":"http://example.com/doc.md"},"commit":{"sha":"cmt1","html_url":"http://example.com/commit/cmt1"}}`
const getRepoResp = `{"name":"myrepo","full_name":"owner/myrepo","default_branch":"main"}`
func TestFileWriteBranchCreatesBranchAndFile(t *testing.T) {
mux := http.NewServeMux()
// Branch check → 404 (branch doesn't exist)
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
}
})
// Create branch → 201
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file → 201
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPut, r.Method)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/new","base":"main",
"message":"add doc.md"
}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, "feat/new", result["branch"])
assert.Equal(t, "doc.md", result["path"])
assert.Equal(t, "cmt1", result["commit_sha"])
}
func TestFileWriteBranchUsesDefaultBaseWhenBaseEmpty(t *testing.T) {
var createBody []byte
mux := http.NewServeMux()
// Branch check → 404
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"not found"}`))
})
// GET repo (to resolve default_branch)
mux.HandleFunc("/api/v1/repos/owner/myrepo", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(getRepoResp))
})
// Create branch → capture body to assert old_branch_name
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
var err error
createBody, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/new",
"message":"add doc.md"
}`))
require.NoError(t, err)
require.NotNil(t, out)
var payload map[string]string
require.NoError(t, json.Unmarshal(createBody, &payload))
assert.Equal(t, "main", payload["old_branch_name"])
assert.Equal(t, "feat/new", payload["new_branch_name"])
}
func TestFileWriteBranchSkipsCreateWhenBranchExists(t *testing.T) {
var createCallCount atomic.Int32
mux := http.NewServeMux()
// Branch check → 200 (branch exists)
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/existing", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(branchCheckExistsResp))
})
// Create branch — should NOT be called
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
createCallCount.Add(1)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(createBranchResp))
})
// Upsert file
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(upsertFileResp))
})
srv := httptest.NewServer(mux)
defer srv.Close()
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"myrepo","path":"doc.md",
"content":"hello","branch":"feat/existing",
"message":"update doc.md"
}`))
require.NoError(t, err)
require.NotNil(t, out)
assert.Equal(t, int32(0), createCallCount.Load(), "POST /branches should not be called when branch exists")
}
func TestFileWriteBranchAllowlistRejects(t *testing.T) {
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"evil","name":"repo","path":"f.md",
"content":"x","branch":"feat/x","message":"msg"
}`))
require.Error(t, err)
}
func TestFileWriteBranchRequiresMessage(t *testing.T) {
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"owner"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"owner","name":"repo","path":"f.md",
"content":"x","branch":"feat/x"
}`))
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrValidation)
}

View File

@@ -0,0 +1,91 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/auth"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/identity"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type PRCreate struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewPRCreate(c *gitea.Client, a *allowlist.Allowlist) *PRCreate {
return &PRCreate{c: c, a: a}
}
func (t *PRCreate) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "pr_create",
Description: "Create a pull request. Applies an identity footer to the PR body.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"title":{"type":"string"},
"body":{"type":"string"},
"head":{"type":"string"},
"base":{"type":"string"},
"draft":{"type":"boolean"}
},
"required":["owner","name","title","head","base"]
}`),
}
}
type prCreateArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Title string `json:"title"`
Body string `json:"body"`
Head string `json:"head"`
Base string `json:"base"`
Draft bool `json:"draft"`
}
func (t *PRCreate) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args prCreateArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Title == "" {
return nil, fmt.Errorf("title is required: %w", gitea.ErrValidation)
}
if args.Head == "" || args.Base == "" {
return nil, fmt.Errorf("head and base are required: %w", gitea.ErrValidation)
}
body := identity.ApplyFooter(args.Body, auth.Caller(ctx))
pr, err := t.c.CreatePullRequest(ctx, args.Owner, args.Name, gitea.CreatePullRequestArgs{
Title: args.Title,
Body: body,
Head: args.Head,
Base: args.Base,
Draft: args.Draft,
})
if err != nil {
return nil, err
}
return textOK(map[string]any{
"number": pr.Number,
"title": pr.Title,
"html_url": pr.HTMLURL,
"head": pr.Head.Ref,
"base": pr.Base.Ref,
"state": pr.State,
"draft": pr.Draft,
})
}

View File

@@ -0,0 +1,107 @@
package tools_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/auth"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const prFixture = `{
"number": 3,
"title": "My PR",
"body": "description",
"html_url": "http://example.com/pulls/3",
"state": "open",
"draft": false,
"head": {"ref": "feat/new"},
"base": {"ref": "main"}
}`
func callerContext(user string) context.Context {
var capturedCtx context.Context
h := auth.CallerMiddleware(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
capturedCtx = r.Context()
}))
req := httptest.NewRequest("POST", "/", nil)
if user != "" {
req.Header.Set("X-Auth-Request-User", user)
}
h.ServeHTTP(httptest.NewRecorder(), req)
return capturedCtx
}
func TestPRCreateAppliesIdentityFooter(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/pulls", r.URL.Path)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(prFixture))
}))
defer srv.Close()
tool := tools.NewPRCreate(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"o"}))
ctx := callerContext("mathiasbq")
_, err := tool.Call(ctx, json.RawMessage(`{
"owner":"o","name":"r","title":"My PR","body":"description","head":"feat/new","base":"main"
}`))
require.NoError(t, err)
var payload map[string]any
require.NoError(t, json.Unmarshal(captured, &payload))
body, _ := payload["body"].(string)
assert.Contains(t, body, "_Created via git-mcp on behalf of @mathiasbq_")
}
func TestPRCreateNoFooterWhenCallerEmpty(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(prFixture))
}))
defer srv.Close()
tool := tools.NewPRCreate(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"o"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"o","name":"r","title":"My PR","body":"description","head":"feat/new","base":"main"
}`))
require.NoError(t, err)
var payload map[string]any
require.NoError(t, json.Unmarshal(captured, &payload))
body, _ := payload["body"].(string)
assert.False(t, strings.Contains(body, "_Created via git-mcp on behalf of"), "footer should not be present when caller is empty")
}
func TestPRCreateAllowlistRejects(t *testing.T) {
tool := tools.NewPRCreate(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"evil","name":"r","title":"T","head":"feat/x","base":"main"
}`))
require.Error(t, err)
}
func TestPRCreateRequiresTitle(t *testing.T) {
tool := tools.NewPRCreate(gitea.NewClient("http://unused", ""), allowlist.New([]string{"o"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{
"owner":"o","name":"r","title":"","head":"feat/x","base":"main"
}`))
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrValidation)
}

68
internal/tools/pr_get.go Normal file
View File

@@ -0,0 +1,68 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type PRGet struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewPRGet(c *gitea.Client, a *allowlist.Allowlist) *PRGet { return &PRGet{c: c, a: a} }
func (t *PRGet) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "pr_get",
Description: "Get a pull request by number.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"number":{"type":"integer","minimum":1}
},
"required":["owner","name","number"]
}`),
}
}
type prGetArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Number int `json:"number"`
}
func (t *PRGet) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args prGetArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Number < 1 {
return nil, fmt.Errorf("number must be >= 1: %w", gitea.ErrValidation)
}
pr, err := t.c.GetPullRequest(ctx, args.Owner, args.Name, args.Number)
if err != nil {
return nil, err
}
return textOK(map[string]any{
"number": pr.Number,
"title": pr.Title,
"html_url": pr.HTMLURL,
"head": pr.Head.Ref,
"base": pr.Base.Ref,
"state": pr.State,
"draft": pr.Draft,
})
}

View File

@@ -0,0 +1,61 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPRGetTool(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/pulls/42", r.URL.Path)
assert.Equal(t, http.MethodGet, r.Method)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"number": 42,
"title": "Fix bug Y",
"body": "Fixes Y",
"html_url": "http://example.com/pulls/42",
"state": "open",
"draft": true,
"head": {"ref": "fix/y"},
"base": {"ref": "main"}
}`))
}))
defer srv.Close()
tool := tools.NewPRGet(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"o"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"o","name":"r","number":42}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, float64(42), result["number"])
assert.Equal(t, "Fix bug Y", result["title"])
assert.Equal(t, "http://example.com/pulls/42", result["html_url"])
assert.Equal(t, "fix/y", result["head"])
assert.Equal(t, "main", result["base"])
assert.Equal(t, "open", result["state"])
assert.Equal(t, true, result["draft"])
}
func TestPRGetAllowlistRejects(t *testing.T) {
tool := tools.NewPRGet(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil","name":"r","number":1}`))
require.Error(t, err)
}
func TestPRGetRequiresValidNumber(t *testing.T) {
tool := tools.NewPRGet(gitea.NewClient("http://unused", ""), allowlist.New([]string{"o"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"o","name":"r","number":0}`))
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrValidation)
}

View File

@@ -0,0 +1,49 @@
package tools
import (
"context"
"encoding/json"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type RepoGet struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewRepoGet(c *gitea.Client, a *allowlist.Allowlist) *RepoGet { return &RepoGet{c: c, a: a} }
func (t *RepoGet) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "repo_get",
Description: "Get a repo's metadata.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{"owner":{"type":"string"},"name":{"type":"string"}},
"required":["owner","name"]
}`),
}
}
type repoGetArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
}
func (t *RepoGet) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args repoGetArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
r, err := t.c.GetRepo(ctx, args.Owner, args.Name)
if err != nil {
return nil, err
}
return textOK(r)
}

View File

@@ -0,0 +1,36 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRepoGetTool(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/mathias/infra", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"infra","full_name":"mathias/infra","default_branch":"main","private":true}`))
}))
defer srv.Close()
tool := tools.NewRepoGet(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"infra"}`))
require.NoError(t, err)
assert.Contains(t, string(out), `"full_name":"mathias/infra"`)
assert.Contains(t, string(out), `"default_branch":"main"`)
}
func TestRepoGetAllowlistRejects(t *testing.T) {
tool := tools.NewRepoGet(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil","name":"x"}`))
require.Error(t, err)
}

View File

@@ -0,0 +1,68 @@
package tools
import (
"context"
"encoding/json"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
type RepoList struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewRepoList(c *gitea.Client, a *allowlist.Allowlist) *RepoList {
return &RepoList{c: c, a: a}
}
func (t *RepoList) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "repo_list",
Description: "List repos for a given owner.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"page":{"type":"integer","minimum":1},
"limit":{"type":"integer","minimum":1,"maximum":50}
},
"required":["owner"]
}`),
}
}
type repoListArgs struct {
Owner string `json:"owner"`
Page int `json:"page"`
Limit int `json:"limit"`
}
func (t *RepoList) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args repoListArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Limit == 0 || args.Limit > 50 {
args.Limit = 30
}
if args.Page == 0 {
args.Page = 1
}
repos, err := t.c.ListRepos(ctx, args.Owner, args.Page, args.Limit)
if err != nil {
return nil, err
}
out := map[string]any{
"repos": repos,
}
if len(repos) == args.Limit {
out["next_page"] = args.Page + 1
}
return textOK(out)
}

View File

@@ -0,0 +1,34 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRepoListTool(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`[{"name":"infra","full_name":"mathias/infra","default_branch":"main"}]`))
}))
defer srv.Close()
tool := tools.NewRepoList(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias"}`))
require.NoError(t, err)
assert.Contains(t, string(out), `"full_name":"mathias/infra"`)
}
func TestRepoListAllowlistRejects(t *testing.T) {
tool := tools.NewRepoList(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil"}`))
require.Error(t, err)
}

24
internal/tools/tool.go Normal file
View File

@@ -0,0 +1,24 @@
package tools
import (
"context"
"encoding/json"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
// Tool implements registry.Tool.
type Tool = registry.Tool
func textOK(v any) (json.RawMessage, error) {
return json.Marshal(v)
}
func parseArgs(raw json.RawMessage, dst any) error {
if len(raw) == 0 {
return json.Unmarshal([]byte("{}"), dst)
}
return json.Unmarshal(raw, dst)
}
func _ctx(ctx context.Context) context.Context { return ctx } // stub for future hooks

View File

@@ -0,0 +1,69 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
// WorkflowRunStatus fetches the status of a Gitea Actions run.
type WorkflowRunStatus struct {
c *gitea.Client
a *allowlist.Allowlist
}
func NewWorkflowRunStatus(c *gitea.Client, a *allowlist.Allowlist) *WorkflowRunStatus {
return &WorkflowRunStatus{c: c, a: a}
}
func (t *WorkflowRunStatus) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "workflow_run_status",
Description: "Get the status of a Gitea Actions workflow run.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"run_id":{"type":"integer","minimum":1}
},
"required":["owner","name","run_id"]
}`),
}
}
type workflowRunStatusArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
RunID int64 `json:"run_id"`
}
func (t *WorkflowRunStatus) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args workflowRunStatusArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.RunID < 1 {
return nil, fmt.Errorf("run_id must be >= 1: %w", gitea.ErrValidation)
}
run, err := t.c.GetWorkflowRun(ctx, args.Owner, args.Name, args.RunID)
if err != nil {
return nil, err
}
return textOK(map[string]any{
"run_id": run.ID,
"status": run.Status,
"conclusion": run.Conclusion,
"started_at": run.StartedAt,
"html_url": run.HTMLURL,
})
}

View File

@@ -0,0 +1,56 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWorkflowRunStatusTool(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/mathias/myrepo/actions/runs/789", r.URL.Path)
assert.Equal(t, http.MethodGet, r.Method)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"id":789,
"status":"completed",
"conclusion":"success",
"started_at":"2026-05-04T10:00:00Z",
"html_url":"http://gitea.example/mathias/myrepo/actions/runs/789"
}`))
}))
defer srv.Close()
tool := tools.NewWorkflowRunStatus(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"myrepo","run_id":789}`))
require.NoError(t, err)
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, float64(789), result["run_id"])
assert.Equal(t, "completed", result["status"])
assert.Equal(t, "success", result["conclusion"])
assert.Equal(t, "2026-05-04T10:00:00Z", result["started_at"])
assert.Equal(t, "http://gitea.example/mathias/myrepo/actions/runs/789", result["html_url"])
}
func TestWorkflowRunStatusAllowlistRejects(t *testing.T) {
tool := tools.NewWorkflowRunStatus(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil","name":"repo","run_id":1}`))
require.Error(t, err)
}
func TestWorkflowRunStatusRequiresValidRunID(t *testing.T) {
tool := tools.NewWorkflowRunStatus(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"repo","run_id":0}`))
require.Error(t, err)
assert.Contains(t, err.Error(), "run_id")
}

View File

@@ -0,0 +1,84 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/registry"
)
// WorkflowRunTrigger triggers a Gitea Actions workflow_dispatch run.
type WorkflowRunTrigger struct {
c *gitea.Client
a *allowlist.Allowlist
baseURL string
}
func NewWorkflowRunTrigger(c *gitea.Client, a *allowlist.Allowlist, baseURL string) *WorkflowRunTrigger {
return &WorkflowRunTrigger{c: c, a: a, baseURL: baseURL}
}
func (t *WorkflowRunTrigger) Descriptor() registry.ToolDescriptor {
return registry.ToolDescriptor{
Name: "workflow_run_trigger",
Description: "Trigger a Gitea Actions workflow_dispatch run.",
InputSchema: json.RawMessage(`{
"type":"object",
"properties":{
"owner":{"type":"string"},
"name":{"type":"string"},
"workflow":{"type":"string"},
"ref":{"type":"string"},
"inputs":{"type":"object"}
},
"required":["owner","name","workflow"]
}`),
}
}
type workflowRunTriggerArgs struct {
Owner string `json:"owner"`
Name string `json:"name"`
Workflow string `json:"workflow"`
Ref string `json:"ref"`
Inputs map[string]any `json:"inputs"`
}
func (t *WorkflowRunTrigger) Call(ctx context.Context, raw json.RawMessage) (json.RawMessage, error) {
var args workflowRunTriggerArgs
if err := parseArgs(raw, &args); err != nil {
return nil, err
}
if err := t.a.Check(args.Owner); err != nil {
return nil, err
}
if args.Workflow == "" {
return nil, fmt.Errorf("workflow is required: %w", gitea.ErrValidation)
}
ref := args.Ref
if ref == "" {
repo, err := t.c.GetRepo(ctx, args.Owner, args.Name)
if err != nil {
return nil, err
}
ref = repo.DefaultBranch
}
result, err := t.c.DispatchWorkflow(ctx, args.Owner, args.Name, args.Workflow, gitea.DispatchWorkflowArgs{
Ref: ref,
Inputs: args.Inputs,
})
if err != nil {
return nil, err
}
htmlURL := fmt.Sprintf("%s/%s/%s/actions/runs/%d", t.baseURL, args.Owner, args.Name, result.RunID)
return textOK(map[string]any{
"run_id": result.RunID,
"html_url": htmlURL,
})
}

View File

@@ -0,0 +1,86 @@
package tools_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWorkflowRunTriggerSuccess(t *testing.T) {
// Fake server handles both the repo endpoint (default_branch) and the dispatch endpoint.
repoHit := false
dispatchHit := false
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == "/api/v1/repos/mathias/myrepo" && r.Method == http.MethodGet:
repoHit = true
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"myrepo","full_name":"mathias/myrepo","default_branch":"main"}`))
case r.URL.Path == "/api/v1/repos/mathias/myrepo/actions/workflows/ci.yml/dispatches" && r.Method == http.MethodPost:
dispatchHit = true
w.Header().Set("Location", "/api/v1/repos/mathias/myrepo/actions/runs/42")
w.WriteHeader(http.StatusNoContent)
default:
http.NotFound(w, r)
}
}))
defer srv.Close()
tool := tools.NewWorkflowRunTrigger(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}), srv.URL)
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"myrepo","workflow":"ci.yml"}`))
require.NoError(t, err)
assert.True(t, repoHit, "expected GET /repo for default branch")
assert.True(t, dispatchHit, "expected POST dispatch")
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, float64(42), result["run_id"])
assert.Contains(t, result["html_url"], "/mathias/myrepo/actions/runs/42")
}
func TestWorkflowRunTriggerExplicitRef(t *testing.T) {
repoHit := false
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/v1/repos/mathias/myrepo" {
repoHit = true
}
if r.URL.Path == "/api/v1/repos/mathias/myrepo/actions/workflows/ci.yml/dispatches" {
w.Header().Set("Location", "/api/v1/repos/mathias/myrepo/actions/runs/99")
w.WriteHeader(http.StatusNoContent)
return
}
http.NotFound(w, r)
}))
defer srv.Close()
tool := tools.NewWorkflowRunTrigger(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}), srv.URL)
out, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"myrepo","workflow":"ci.yml","ref":"develop"}`))
require.NoError(t, err)
assert.False(t, repoHit, "should not call GET /repo when ref is provided")
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
assert.Equal(t, float64(99), result["run_id"])
}
func TestWorkflowRunTriggerAllowlistRejects(t *testing.T) {
tool := tools.NewWorkflowRunTrigger(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}), "http://unused")
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"evil","name":"repo","workflow":"ci.yml"}`))
require.Error(t, err)
}
func TestWorkflowRunTriggerRequiresWorkflow(t *testing.T) {
// workflow field is present in required schema but let's test empty string fallback guard
tool := tools.NewWorkflowRunTrigger(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}), "http://unused")
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","name":"repo","workflow":""}`))
require.Error(t, err)
assert.Contains(t, err.Error(), "workflow")
}