backport of commit 1f94419
Juanadelacuesta authored Nov 18, 2024
1 parent 20df9fd commit 71345e1
Showing 6 changed files with 156 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/24363.txt
@@ -0,0 +1,3 @@
```release-note:improvement
core: add the ability to scale system jobs between 0 and 1
```
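
For orientation, here is what the change looks like from an API consumer's side. The sketch below is not part of the commit; it uses the official Go client, assumes a reachable Nomad agent, and borrows the job and group names from the e2e fixture added below. With this change a count of 0 or 1 is accepted for system jobs, while anything above 1 is rejected.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to the local Nomad agent using the default configuration.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Scale the system job's group down to 0. Before this change the
	// server rejected any scale request against a system job.
	count := 0
	_, _, err = client.Jobs().Scale("system_job", "system_job_group",
		&count, "scale system job to zero", false, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("system job scaled to 0")
}
```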
25 changes: 25 additions & 0 deletions e2e/scaling/input/namespace_default_system.nomad
@@ -0,0 +1,25 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

job "system_job" {
type = "system"

group "system_job_group" {

task "system_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "sleep 15000"]
}

env {
version = "1"
}
}
}
}

100 changes: 99 additions & 1 deletion e2e/scaling/scaling.go
@@ -11,6 +11,7 @@ import (
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)

type ScalingE2ETest struct {
@@ -27,7 +28,6 @@ func init() {
new(ScalingE2ETest),
},
})
-
}

func (tc *ScalingE2ETest) BeforeAll(f *framework.F) {
@@ -165,3 +165,101 @@ func (tc *ScalingE2ETest) TestScalingNamespaces(f *framework.F) {
"Nomad e2e testing", false, nil, &aWriteOpts)
f.NoError(err)
}

// TestScalingBasicWithSystemSchedule performs basic scaling e2e tests within
// a single namespace using a SystemScheduler.
func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()

// Register a system job with no explicit group count; it should default
// to 1 allocation per node.

jobID := "test-scaling-" + uuid.Generate()[0:8]
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scaling/input/namespace_default_system.nomad", jobID, "")

jobs := nomadClient.Jobs()
initialAllocs, _, err := jobs.Allocations(jobID, true, nil)
f.NoError(err)

nodeStubList, _, err := nomadClient.Nodes().List(&api.QueryOptions{Namespace: "default"})
f.NoError(err)

// A system job spawns one allocation per node, so we need the node count
// to know how many allocations to expect.
numberOfNodes := len(nodeStubList)

f.Equal(numberOfNodes, len(initialAllocs))
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs)

// Wait for allocations to get past initial pending state
e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)

// Try to scale beyond 1
testMeta := map[string]interface{}{"scaling-e2e-test": "value"}
scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3),
"Nomad e2e testing", false, testMeta, nil)

f.Error(err)
f.Nil(scaleResp)

// The same allocs should be running.
jobs = nomadClient.Jobs()
allocs1, _, err := jobs.Allocations(jobID, true, nil)
f.NoError(err)

f.Equal(len(initialAllocs), len(allocs1))

for i, a := range allocs1 {
f.Equal(a.ID, initialAllocs[i].ID)
}

// Scale down to 0
testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(0),
"Nomad e2e testing", false, testMeta, nil)
f.NoError(err)
f.NotEmpty(scaleResp.EvalID)

// Assert the job is still up but no allocs are running
stoppedAllocs, _, err := jobs.Allocations(jobID, false, nil)
f.NoError(err)

f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, stoppedAllocs)))
f.Equal(numberOfNodes, len(stoppedAllocs))

// Scale up to 1 again
testMeta = map[string]interface{}{"scaling-e2e-test": "value"}
scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(1),
"Nomad e2e testing", false, testMeta, nil)
f.NoError(err)
f.NotEmpty(scaleResp.EvalID)

// Wait for the new allocations to get past the initial pending state
e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)

// Assert job is still running and there is a running allocation again
allocs, _, err := jobs.Allocations(jobID, true, nil)
f.NoError(err)
f.Equal(numberOfNodes*2, len(allocs))

f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, allocs)))
f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusRun, allocs)))

// Remove the job.
_, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil)
f.NoError(err)
f.NoError(tc.Nomad().System().GarbageCollect())
}

func filterAllocsByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub {
res := []*api.AllocationListStub{}

for _, a := range allocs {
if a.DesiredStatus == status {
res = append(res, a)
}
}

return res
}
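
A quick sanity check of the arithmetic in the assertions above: after the 1 → 0 → 1 cycle, each node carries one stopped allocation from the scale-down and one running allocation from the scale-up, so `Allocations` returns `numberOfNodes * 2` stubs split evenly between the two desired states. A toy illustration of the filter helper's logic over that shape (hypothetical data, same algorithm):

```go
package main

import "fmt"

// stub stands in for api.AllocationListStub in this sketch.
type stub struct{ DesiredStatus string }

// filterByDesiredStatus mirrors the test helper above over the toy type.
func filterByDesiredStatus(status string, allocs []stub) []stub {
	res := []stub{}
	for _, a := range allocs {
		if a.DesiredStatus == status {
			res = append(res, a)
		}
	}
	return res
}

func main() {
	// Three nodes after a 1 -> 0 -> 1 scale cycle: one stopped and one
	// running allocation per node, six stubs in total.
	allocs := []stub{{"stop"}, {"run"}, {"stop"}, {"run"}, {"stop"}, {"run"}}
	fmt.Println(len(filterByDesiredStatus("stop", allocs))) // 3
	fmt.Println(len(filterByDesiredStatus("run", allocs)))  // 3
}
```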
5 changes: 3 additions & 2 deletions nomad/job_endpoint.go
@@ -1027,8 +1027,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
if job == nil {
return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID))
}
-if job.Type == structs.JobTypeSystem {
-return structs.NewErrRPCCoded(http.StatusBadRequest, `cannot scale jobs of type "system"`)
+
+if job.Type == structs.JobTypeSystem && *args.Count > 1 {
+return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled between 0 and 1`)
}

// Since job is going to be mutated we must copy it since state store methods
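Read together with the pre-existing negative-count validation (exercised in the test changes below), the effective rule for system jobs becomes: counts below 0 or above 1 are rejected, 0 and 1 are accepted. The following is a hypothetical condensation of that rule for illustration only; the real checks live in `Job.Scale` and the generic scaling validation:

```go
package main

import "fmt"

// validateSystemJobScale condenses the checks a scale request against a
// system job now passes through: counts below 0 are rejected by the
// generic validation, counts above 1 by the new system-job guard.
func validateSystemJobScale(count int64) error {
	if count < 0 {
		return fmt.Errorf("scaling action count can't be negative")
	}
	if count > 1 {
		return fmt.Errorf(`jobs of type "system" can only be scaled between 0 and 1`)
	}
	return nil // 0 and 1 are both accepted
}

func main() {
	for _, c := range []int64{-5, 0, 1, 13} {
		fmt.Printf("count=%d -> %v\n", c, validateSystemJobScale(c))
	}
}
```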
29 changes: 26 additions & 3 deletions nomad/job_endpoint_test.go
@@ -8271,20 +8271,43 @@ func TestJobEndpoint_Scale_SystemJob(t *testing.T) {
mockSystemJob := mock.SystemJob()
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 10, nil, mockSystemJob))

// Scale to 0
scaleReq := &structs.JobScaleRequest{
JobID: mockSystemJob.ID,
Target: map[string]string{
structs.ScalingTargetGroup: mockSystemJob.TaskGroups[0].Name,
},
-Count: pointer.Of(int64(13)),
+Count: pointer.Of(int64(0)),
WriteRequest: structs.WriteRequest{
Region: DefaultRegion,
Namespace: mockSystemJob.Namespace,
},
}
-var resp structs.JobRegisterResponse
+
+resp := structs.JobRegisterResponse{}
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
must.NoError(t, err)

// Scale to a negative number
scaleReq.Count = pointer.Of(int64(-5))

resp = structs.JobRegisterResponse{}
must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
`400,scaling action count can't be negative`)

// Scale back to 1
scaleReq.Count = pointer.Of(int64(1))

resp = structs.JobRegisterResponse{}
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp)
must.NoError(t, err)

// Scale beyond 1
scaleReq.Count = pointer.Of(int64(13))

resp = structs.JobRegisterResponse{}
must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp),
-`400,cannot scale jobs of type "system"`)
+`400,jobs of type "system" can only be scaled between 0 and 1`)
}

func TestJobEndpoint_Scale_BatchJob(t *testing.T) {
4 changes: 0 additions & 4 deletions nomad/structs/structs.go
@@ -7055,10 +7055,6 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
tg.EphemeralDisk = DefaultEphemeralDisk()
}

-if job.Type == JobTypeSystem && tg.Count == 0 {
-tg.Count = 1
-}

if tg.Scaling != nil {
tg.Scaling.Canonicalize(job, tg, nil)
}
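
Dropping this default is what lets a scale-to-zero survive canonicalization: previously an explicit count of 0 on a system job's group was silently bumped back to 1, so a scaled-to-zero state could never persist. Jobs submitted without a count still end up with one allocation per node, as the e2e test above relies on, presumably because an unset count is canonicalized to 1 before reaching this code. A small sketch of the removed behavior (hypothetical helper, not the real `Canonicalize`):

```go
package main

import "fmt"

// oldCanonicalCount sketches the removed server-side rule: an explicit
// Count of 0 on a system job's group was clobbered back to 1.
func oldCanonicalCount(jobType string, count int) int {
	if jobType == "system" && count == 0 {
		return 1 // removed behavior
	}
	return count
}

func main() {
	fmt.Println(oldCanonicalCount("system", 0)) // old: 1, scale-to-zero impossible
	// After this commit the count is left untouched, so 0 is preserved.
}
```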
