diff --git a/models/actions/runner.go b/models/actions/runner.go index 97db0ca7ea..9ddf346aa6 100644 --- a/models/actions/runner.go +++ b/models/actions/runner.go @@ -57,6 +57,8 @@ type ActionRunner struct { // Store labels defined in state file (default: .runner file) of `act_runner` AgentLabels []string `xorm:"TEXT"` + // Store if this is a runner that only ever gets a single job assigned + Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"` Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated"` diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 5f79a873f1..f9ab993abf 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -375,6 +375,7 @@ func prepareMigrationTasks() []*migration { newMigration(312, "Add DeleteBranchAfterMerge to AutoMerge", v1_24.AddDeleteBranchAfterMergeForAutoMerge), newMigration(313, "Move PinOrder from issue table to a new table issue_pin", v1_24.MovePinOrderToTableIssuePin), newMigration(314, "Update OwnerID as zero for repository level action tables", v1_24.UpdateOwnerIDOfRepoLevelActionsTables), + newMigration(315, "Add Ephemeral to ActionRunner", v1_24.AddEphemeralToActionRunner), } return preparedMigrations } diff --git a/models/migrations/v1_24/v315.go b/models/migrations/v1_24/v315.go new file mode 100644 index 0000000000..aefb872d0f --- /dev/null +++ b/models/migrations/v1_24/v315.go @@ -0,0 +1,16 @@ +// Copyright 2025 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT + +package v1_24 //nolint + +import ( + "xorm.io/xorm" +) + +func AddEphemeralToActionRunner(x *xorm.Engine) error { + type ActionRunner struct { + Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"` + } + + return x.Sync(new(ActionRunner)) +} diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 27a0317942..ce8137592d 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -78,6 +78,7 @@ func (s *Service) Register( RepoID: runnerToken.RepoID, Version: req.Msg.Version, AgentLabels: labels, + Ephemeral: req.Msg.Ephemeral, } if err := runner.GenerateToken(); err != nil { return nil, errors.New("can't generate token") @@ -96,12 +97,13 @@ func (s *Service) Register( res := connect.NewResponse(&runnerv1.RegisterResponse{ Runner: &runnerv1.Runner{ - Id: runner.ID, - Uuid: runner.UUID, - Token: runner.Token, - Name: runner.Name, - Version: runner.Version, - Labels: runner.AgentLabels, + Id: runner.ID, + Uuid: runner.UUID, + Token: runner.Token, + Name: runner.Name, + Version: runner.Version, + Labels: runner.AgentLabels, + Ephemeral: runner.Ephemeral, }, }) diff --git a/services/actions/cleanup.go b/services/actions/cleanup.go index ee1d167713..23d6e3a49d 100644 --- a/services/actions/cleanup.go +++ b/services/actions/cleanup.go @@ -9,14 +9,17 @@ import ( "time" actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" actions_module "code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/builder" ) -// Cleanup removes expired actions logs, data and artifacts +// Cleanup removes expired actions logs, data, artifacts and used ephemeral runners func Cleanup(ctx context.Context) error { // clean up expired artifacts if err := CleanupArtifacts(ctx); err != nil { @@ -28,6 +31,11 @@ func 
Cleanup(ctx context.Context) error { return fmt.Errorf("cleanup logs: %w", err) } + // clean up old ephemeral runners + if err := CleanupEphemeralRunners(ctx); err != nil { + return fmt.Errorf("cleanup old ephemeral runners: %w", err) + } + return nil } @@ -123,3 +131,20 @@ func CleanupLogs(ctx context.Context) error { log.Info("Removed %d logs", count) return nil } + +// CleanupEphemeralRunners removes used ephemeral runners which are no longer able to process jobs +func CleanupEphemeralRunners(ctx context.Context) error { + subQuery := builder.Select("`action_runner`.id"). + From(builder.Select("*").From("`action_runner`"), "`action_runner`"). // mysql needs this redundant subquery + Join("INNER", "`action_task`", "`action_task`.`runner_id` = `action_runner`.`id`"). + Where(builder.Eq{"`action_runner`.`ephemeral`": true}). + And(builder.NotIn("`action_task`.`status`", actions_model.StatusWaiting, actions_model.StatusRunning, actions_model.StatusBlocked)) + b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`") + res, err := db.GetEngine(ctx).Exec(b) + if err != nil { + return fmt.Errorf("find runners: %w", err) + } + affected, _ := res.RowsAffected() + log.Info("Removed %d runners", affected) + return nil +} diff --git a/services/actions/task.go b/services/actions/task.go index 1feeb67a80..9c8198206a 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -23,6 +23,26 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv actionTask *actions_model.ActionTask ) + if runner.Ephemeral { + var task actions_model.ActionTask + has, err := db.GetEngine(ctx).Where("runner_id = ?", runner.ID).Get(&task) + // Let the runner retry the request, do not allow to proceed + if err != nil { + return nil, false, err + } + if has { + if task.Status == actions_model.StatusWaiting || task.Status == actions_model.StatusRunning || task.Status == actions_model.StatusBlocked { + return nil, false, nil + } + // task has been 
finished, remove it + _, err = db.DeleteByID[actions_model.ActionRunner](ctx, runner.ID) + if err != nil { + return nil, false, err + } + return nil, false, fmt.Errorf("runner has been removed") + } + } + if err := db.WithTx(ctx, func(ctx context.Context) error { t, ok, err := actions_model.CreateTaskForRunner(ctx, runner) if err != nil { diff --git a/tests/integration/actions_job_test.go b/tests/integration/actions_job_test.go index a967adb417..caab215cee 100644 --- a/tests/integration/actions_job_test.go +++ b/tests/integration/actions_job_test.go @@ -21,8 +21,10 @@ import ( "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" + actions_service "code.gitea.io/gitea/services/actions" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" ) @@ -132,7 +134,7 @@ jobs: apiRepo := createActionsTestRepo(t, token, "actions-jobs-with-needs", false) runner := newMockRunner() - runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) for _, tc := range testCases { t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { @@ -318,7 +320,7 @@ jobs: apiRepo := createActionsTestRepo(t, token, "actions-jobs-outputs-with-matrix", false) runner := newMockRunner() - runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) for _, tc := range testCases { t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { @@ -363,7 +365,7 @@ func TestActionsGiteaContext(t *testing.T) { user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository) runner := newMockRunner() - runner.registerAsRepoRunner(t, 
baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) // init the workflow wfTreePath := ".gitea/workflows/pull.yml" @@ -437,6 +439,156 @@ jobs: }) } +// Ephemeral +func TestActionsGiteaContextEphemeral(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + user2Session := loginUser(t, user2.Name) + user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiBaseRepo := createActionsTestRepo(t, user2Token, "actions-gitea-context", false) + baseRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiBaseRepo.ID}) + user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}, true) + + // verify CleanupEphemeralRunners does not remove this runner + err := actions_service.CleanupEphemeralRunners(t.Context()) + assert.NoError(t, err) + + // init the workflow + wfTreePath := ".gitea/workflows/pull.yml" + wfFileContent := `name: Pull Request +on: pull_request +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'test the pull' + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'test the pull' +` + opts := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, fmt.Sprintf("create %s", wfTreePath), wfFileContent) + createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts) + // user2 creates a pull request + doAPICreateFile(user2APICtx, "user2-patch.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "user2/patch-1", + Message: "create user2-patch.txt", + Author: api.Identity{ + 
Name: user2.Name, + Email: user2.Email, + }, + Committer: api.Identity{ + Name: user2.Name, + Email: user2.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user2-fix")), + })(t) + apiPull, err := doAPICreatePullRequest(user2APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, "user2/patch-1")(t) + assert.NoError(t, err) + task := runner.fetchTask(t) + gtCtx := task.Context.GetFields() + actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: task.Id}) + actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID}) + actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: actionRunJob.RunID}) + assert.NoError(t, actionRun.LoadAttributes(t.Context())) + + assert.Equal(t, user2.Name, gtCtx["actor"].GetStringValue()) + assert.Equal(t, setting.AppURL+"api/v1", gtCtx["api_url"].GetStringValue()) + assert.Equal(t, apiPull.Base.Ref, gtCtx["base_ref"].GetStringValue()) + runEvent := map[string]any{} + assert.NoError(t, json.Unmarshal([]byte(actionRun.EventPayload), &runEvent)) + assert.True(t, reflect.DeepEqual(gtCtx["event"].GetStructValue().AsMap(), runEvent)) + assert.Equal(t, actionRun.TriggerEvent, gtCtx["event_name"].GetStringValue()) + assert.Equal(t, apiPull.Head.Ref, gtCtx["head_ref"].GetStringValue()) + assert.Equal(t, actionRunJob.JobID, gtCtx["job"].GetStringValue()) + assert.Equal(t, actionRun.Ref, gtCtx["ref"].GetStringValue()) + assert.Equal(t, (git.RefName(actionRun.Ref)).ShortName(), gtCtx["ref_name"].GetStringValue()) + assert.False(t, gtCtx["ref_protected"].GetBoolValue()) + assert.Equal(t, string((git.RefName(actionRun.Ref)).RefType()), gtCtx["ref_type"].GetStringValue()) + assert.Equal(t, actionRun.Repo.OwnerName+"/"+actionRun.Repo.Name, gtCtx["repository"].GetStringValue()) + assert.Equal(t, actionRun.Repo.OwnerName, 
gtCtx["repository_owner"].GetStringValue()) + assert.Equal(t, actionRun.Repo.HTMLURL(), gtCtx["repositoryUrl"].GetStringValue()) + assert.Equal(t, fmt.Sprint(actionRunJob.RunID), gtCtx["run_id"].GetStringValue()) + assert.Equal(t, fmt.Sprint(actionRun.Index), gtCtx["run_number"].GetStringValue()) + assert.Equal(t, fmt.Sprint(actionRunJob.Attempt), gtCtx["run_attempt"].GetStringValue()) + assert.Equal(t, "Actions", gtCtx["secret_source"].GetStringValue()) + assert.Equal(t, setting.AppURL, gtCtx["server_url"].GetStringValue()) + assert.Equal(t, actionRun.CommitSHA, gtCtx["sha"].GetStringValue()) + assert.Equal(t, actionRun.WorkflowID, gtCtx["workflow"].GetStringValue()) + assert.Equal(t, setting.Actions.DefaultActionsURL.URL(), gtCtx["gitea_default_actions_url"].GetStringValue()) + token := gtCtx["token"].GetStringValue() + assert.Equal(t, actionTask.TokenLastEight, token[len(token)-8:]) + + // verify CleanupEphemeralRunners does not remove this runner + err = actions_service.CleanupEphemeralRunners(t.Context()) + assert.NoError(t, err) + + resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: 0, + })) + assert.NoError(t, err) + assert.Nil(t, resp.Msg.Task) + + // verify CleanupEphemeralRunners does not remove this runner + err = actions_service.CleanupEphemeralRunners(t.Context()) + assert.NoError(t, err) + + runner.client.runnerServiceClient.UpdateTask(t.Context(), connect.NewRequest(&runnerv1.UpdateTaskRequest{ + State: &runnerv1.TaskState{ + Id: actionTask.ID, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + })) + resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: 0, + })) + assert.Error(t, err) + assert.Nil(t, resp) + + resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: 0, + })) + assert.Error(t, err) + assert.Nil(t, resp) + 
+ // create a runner that picks a job and gets force cancelled + runnerToBeRemoved := newMockRunner() + runnerToBeRemoved.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner-to-be-removed", []string{"ubuntu-latest"}, true) + + taskToStopAPIObj := runnerToBeRemoved.fetchTask(t) + + taskToStop := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskToStopAPIObj.Id}) + + // verify CleanupEphemeralRunners does not remove the custom crafted runner + err = actions_service.CleanupEphemeralRunners(t.Context()) + assert.NoError(t, err) + + runnerToRemove := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{ID: taskToStop.RunnerID}) + + err = actions_model.StopTask(t.Context(), taskToStop.ID, actions_model.StatusFailure) + assert.NoError(t, err) + + // verify CleanupEphemeralRunners does remove the custom crafted runner + err = actions_service.CleanupEphemeralRunners(t.Context()) + assert.NoError(t, err) + + unittest.AssertNotExistsBean(t, &actions_model.ActionRunner{ID: runnerToRemove.ID}) + + // this cleanup is required to allow further tests to pass + doAPIDeleteRepository(user2APICtx)(t) + }) +} + func createActionsTestRepo(t *testing.T, authToken, repoName string, isPrivate bool) *api.Repository { req := NewRequestWithJSON(t, "POST", "/api/v1/user/repos", &api.CreateRepoOption{ Name: repoName, diff --git a/tests/integration/actions_log_test.go b/tests/integration/actions_log_test.go index fd055fc4c4..8f10226721 100644 --- a/tests/integration/actions_log_test.go +++ b/tests/integration/actions_log_test.go @@ -105,7 +105,7 @@ jobs: apiRepo := createActionsTestRepo(t, token, "actions-download-task-logs", false) repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) runner := newMockRunner() - runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) for
_, tc := range testCases { t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index da51e1c04e..ee92032e9f 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -67,19 +67,20 @@ func (r *mockRunner) doPing(t *testing.T) { assert.Equal(t, "Hello, mock-runner!", resp.Msg.Data) } -func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []string) { +func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []string, ephemeral bool) { r.doPing(t) resp, err := r.client.runnerServiceClient.Register(t.Context(), connect.NewRequest(&runnerv1.RegisterRequest{ - Name: name, - Token: token, - Version: "mock-runner-version", - Labels: labels, + Name: name, + Token: token, + Version: "mock-runner-version", + Labels: labels, + Ephemeral: ephemeral, })) assert.NoError(t, err) r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token) } -func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) { +func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string, ephemeral bool) { session := loginUser(t, ownerName) token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository) req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/repos/%s/%s/actions/runners/registration-token", ownerName, repoName)).AddTokenAuth(token) @@ -88,7 +89,7 @@ func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, run Token string `json:"token"` } DecodeJSON(t, resp, &registrationToken) - r.doRegister(t, runnerName, registrationToken.Token, labels) + r.doRegister(t, runnerName, registrationToken.Token, labels, ephemeral) } func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { diff --git a/tests/integration/repo_webhook_test.go
b/tests/integration/repo_webhook_test.go index effeff111d..7e85c10d4b 100644 --- a/tests/integration/repo_webhook_test.go +++ b/tests/integration/repo_webhook_test.go @@ -639,7 +639,7 @@ func Test_WebhookWorkflowJob(t *testing.T) { assert.NoError(t, err) runner := newMockRunner() - runner.registerAsRepoRunner(t, "user2", "repo1", "mock-runner", []string{"ubuntu-latest"}) + runner.registerAsRepoRunner(t, "user2", "repo1", "mock-runner", []string{"ubuntu-latest"}, false) // 2. trigger the webhooks