Add optimistic lock to ActionRun table (#26563)
Should fix #26559. How xorm works: https://xorm.io/docs/chapter-06/1.lock/ --------- Co-authored-by: Giteabot <teabot@gitea.io>
This commit is contained in:
parent
42cbe6005a
commit
8cf3b61fb9
5 changed files with 53 additions and 25 deletions
|
@ -43,6 +43,7 @@ type ActionRun struct {
|
||||||
EventPayload string `xorm:"LONGTEXT"`
|
EventPayload string `xorm:"LONGTEXT"`
|
||||||
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
|
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
|
||||||
Status Status `xorm:"index"`
|
Status Status `xorm:"index"`
|
||||||
|
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
|
||||||
Started timeutil.TimeStamp
|
Started timeutil.TimeStamp
|
||||||
Stopped timeutil.TimeStamp
|
Stopped timeutil.TimeStamp
|
||||||
Created timeutil.TimeStamp `xorm:"created"`
|
Created timeutil.TimeStamp `xorm:"created"`
|
||||||
|
@ -332,12 +333,22 @@ func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error)
|
||||||
return run, nil
|
return run, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateRun updates a run.
|
||||||
|
// It requires the inputted run has Version set.
|
||||||
|
// It will return error if the version is not matched (it means the run has been changed after loaded).
|
||||||
func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
|
func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
|
||||||
sess := db.GetEngine(ctx).ID(run.ID)
|
sess := db.GetEngine(ctx).ID(run.ID)
|
||||||
if len(cols) > 0 {
|
if len(cols) > 0 {
|
||||||
sess.Cols(cols...)
|
sess.Cols(cols...)
|
||||||
}
|
}
|
||||||
_, err := sess.Update(run)
|
affected, err := sess.Update(run)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if affected == 0 {
|
||||||
|
return fmt.Errorf("run has changed")
|
||||||
|
// It's impossible that the run is not found, since Gitea never deletes runs.
|
||||||
|
}
|
||||||
|
|
||||||
if run.Status != 0 || util.SliceContains(cols, "status") {
|
if run.Status != 0 || util.SliceContains(cols, "status") {
|
||||||
if run.RepoID == 0 {
|
if run.RepoID == 0 {
|
||||||
|
@ -358,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ActionRunIndex db.ResourceIndex
|
type ActionRunIndex db.ResourceIndex
|
||||||
|
|
|
@ -114,32 +114,41 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
|
||||||
if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
|
if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
|
||||||
// if the status of job changes to waiting again, increase tasks version.
|
// if the status of job changes to waiting again, increase tasks version.
|
||||||
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
|
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
|
||||||
return affected, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if job.RunID == 0 {
|
if job.RunID == 0 {
|
||||||
var err error
|
var err error
|
||||||
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
|
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
|
||||||
return affected, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
|
{
|
||||||
if err != nil {
|
// Other goroutines may aggregate the status of the run and update it too.
|
||||||
return affected, err
|
// So we need load the run and its jobs before updating the run.
|
||||||
|
run, err := GetRunByID(ctx, job.RunID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
run.Status = aggregateJobStatus(jobs)
|
||||||
|
if run.Started.IsZero() && run.Status.IsRunning() {
|
||||||
|
run.Started = timeutil.TimeStampNow()
|
||||||
|
}
|
||||||
|
if run.Stopped.IsZero() && run.Status.IsDone() {
|
||||||
|
run.Stopped = timeutil.TimeStampNow()
|
||||||
|
}
|
||||||
|
if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
|
||||||
|
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
runStatus := aggregateJobStatus(jobs)
|
return affected, nil
|
||||||
|
|
||||||
run := &ActionRun{
|
|
||||||
ID: job.RunID,
|
|
||||||
Status: runStatus,
|
|
||||||
}
|
|
||||||
if runStatus.IsDone() {
|
|
||||||
run.Stopped = timeutil.TimeStampNow()
|
|
||||||
}
|
|
||||||
return affected, UpdateRun(ctx, run)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func aggregateJobStatus(jobs []*ActionRunJob) Status {
|
func aggregateJobStatus(jobs []*ActionRunJob) Status {
|
||||||
|
|
|
@ -317,14 +317,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if job.Run.Status.IsWaiting() {
|
|
||||||
job.Run.Status = StatusRunning
|
|
||||||
job.Run.Started = now
|
|
||||||
if err := UpdateRun(ctx, job.Run, "status", "started"); err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
task.Job = job
|
task.Job = job
|
||||||
|
|
||||||
if err := commiter.Commit(); err != nil {
|
if err := commiter.Commit(); err != nil {
|
||||||
|
|
|
@ -524,6 +524,8 @@ var migrations = []Migration{
|
||||||
NewMigration("Fix PackageProperty typo", v1_21.FixPackagePropertyTypo),
|
NewMigration("Fix PackageProperty typo", v1_21.FixPackagePropertyTypo),
|
||||||
// v271 -> v272
|
// v271 -> v272
|
||||||
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
|
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
|
||||||
|
// v272 -> v273
|
||||||
|
NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable),
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetCurrentDBVersion returns the current db version
|
// GetCurrentDBVersion returns the current db version
|
||||||
|
|
14
models/migrations/v1_21/v272.go
Normal file
14
models/migrations/v1_21/v272.go
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package v1_21 //nolint
|
||||||
|
import (
|
||||||
|
"xorm.io/xorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func AddVersionToActionRunTable(x *xorm.Engine) error {
|
||||||
|
type ActionRun struct {
|
||||||
|
Version int `xorm:"version default 0"`
|
||||||
|
}
|
||||||
|
return x.Sync(new(ActionRun))
|
||||||
|
}
|
Loading…
Reference in a new issue