Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stateful deployments: task group host volume claims API #25114

Merged
merged 19 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions api/host_volume_claims.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package api

import "net/url"

// TaskGroupHostVolumeClaim associates a task group with a host volume ID. It's
// used for stateful deployments, i.e., volume requests with "sticky" set to
// true.
type TaskGroupHostVolumeClaim struct {
// ID identifies the claim itself; it is the value accepted by
// TaskGroupHostVolumeClaims.Delete.
ID string `mapstructure:"id"`
// Namespace, JobID and TaskGroupName identify the task group the claim
// belongs to.
Namespace string `mapstructure:"namespace"`
JobID string `mapstructure:"job_id"`
TaskGroupName string `mapstructure:"task_group_name"`
// AllocID is presumably the allocation currently associated with the
// claim. NOTE(review): confirm against the server-side struct.
AllocID string `mapstructure:"alloc_id"`
// VolumeID and VolumeName are the ID and name of the claimed host volume.
VolumeID string `mapstructure:"volume_id"`
VolumeName string `mapstructure:"volume_name"`

// CreateIndex and ModifyIndex are presumably the state indexes at which
// the claim was created and last modified. NOTE(review): confirm.
CreateIndex uint64
ModifyIndex uint64
}

// TaskGroupHostVolumeClaims is used to access the task group host volume
// claims API. Obtain a handle with Client.TaskGroupHostVolumeClaims.
type TaskGroupHostVolumeClaims struct {
// client is the API client that List and Delete issue requests through.
client *Client
}

// TaskGroupHostVolumeClaims returns a new handle on the task group host
// volume claims API, bound to this client.
func (c *Client) TaskGroupHostVolumeClaims() *TaskGroupHostVolumeClaims {
	handle := new(TaskGroupHostVolumeClaims)
	handle.client = c
	return handle
}

// TaskGroupHostVolumeClaimsListRequest holds the optional filters accepted by
// TaskGroupHostVolumeClaims.List. A field left empty is not sent to the
// server.
type TaskGroupHostVolumeClaimsListRequest struct {
// JobID is sent as the "job_id" query parameter.
JobID string
// TaskGroup is sent as the "task_group" query parameter.
TaskGroup string
// VolumeName is sent as the "volume_name" query parameter.
VolumeName string
}

// List queries /v1/volumes/claims and returns the matching task group host
// volume claims along with the query metadata.
//
// req may be nil, in which case no filters are sent. Each non-empty field of
// req is forwarded as a query parameter ("job_id", "task_group",
// "volume_name"). On error the returned claims are nil; the query metadata
// from the client is still passed through.
func (tgvc *TaskGroupHostVolumeClaims) List(req *TaskGroupHostVolumeClaimsListRequest, opts *QueryOptions) ([]*TaskGroupHostVolumeClaim, *QueryMeta, error) {
	params := url.Values{}
	if req != nil {
		filters := [...]struct{ key, value string }{
			{"job_id", req.JobID},
			{"task_group", req.TaskGroup},
			{"volume_name", req.VolumeName},
		}
		for _, f := range filters {
			if f.value != "" {
				params.Set(f.key, f.value)
			}
		}
	}

	endpoint := "/v1/volumes/claims?" + params.Encode()

	var claims []*TaskGroupHostVolumeClaim
	meta, err := tgvc.client.query(endpoint, &claims, opts)
	if err != nil {
		return nil, meta, err
	}
	return claims, meta, nil
}

// Delete removes the claim with the given ID by issuing a DELETE against
// /v1/volumes/claim/:id. The claim ID is path-escaped before being joined
// onto the endpoint. It returns the write metadata from the client, or an
// error if the path cannot be built or the request fails.
func (tgvc *TaskGroupHostVolumeClaims) Delete(claimID string, opts *WriteOptions) (*WriteMeta, error) {
	escapedID := url.PathEscape(claimID)
	endpoint, err := url.JoinPath("/v1/volumes/claim", escapedID)
	if err != nil {
		return nil, err
	}
	return tgvc.client.delete(endpoint, nil, nil, opts)
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/plugins", s.wrap(s.CSIPluginsRequest))
s.mux.HandleFunc("/v1/plugin/csi/", s.wrap(s.CSIPluginSpecificRequest))
s.mux.HandleFunc("/v1/volume/host/", s.wrap(s.HostVolumeSpecificRequest))
s.mux.HandleFunc("/v1/volumes/claims", s.wrap(s.TaskGroupHostVolumeClaimListRequest))
s.mux.HandleFunc("/v1/volumes/claim/", s.wrap(s.TaskGroupHostVolumeClaimRequest))

s.mux.HandleFunc("/v1/acl/policies", s.wrap(s.ACLPoliciesRequest))
s.mux.HandleFunc("/v1/acl/policy/", s.wrap(s.ACLPolicySpecificRequest))
Expand Down
64 changes: 64 additions & 0 deletions command/agent/task_group_host_volume_claim_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package agent

import (
"net/http"
"strings"

"github.com/hashicorp/nomad/nomad/structs"
)

// TaskGroupHostVolumeClaimRequest handles requests addressed to a single task
// group host volume claim, i.e. /v1/volumes/claim/:id.
//
// Only DELETE is supported. A request without a claim ID in the path yields a
// 404 and any other method yields a 405.
func (s *HTTPServer) TaskGroupHostVolumeClaimRequest(resp http.ResponseWriter, req *http.Request) (any, error) {
	// Tokenize the suffix of the path to get the claim ID, tolerating a
	// present or missing trailing slash. The prefix must be the one this
	// handler is served under ("/v1/volumes/claim/"): with any other prefix
	// TrimPrefix leaves the path untouched and the first token would be "v1"
	// instead of the claim ID.
	reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/volumes/claim/")
	tokens := strings.FieldsFunc(reqSuffix, func(c rune) bool { return c == '/' })

	if len(tokens) == 0 {
		return nil, CodedError(404, resourceNotFoundErr)
	}

	switch req.Method {
	// DELETE /v1/volumes/claim/:id
	case http.MethodDelete:
		return s.taskGroupHostVolumeClaimDelete(tokens[0], resp, req)
	default:
		return nil, CodedError(405, ErrInvalidMethod)
	}
}

// TaskGroupHostVolumeClaimListRequest handles /v1/volumes/claims. It builds a
// list request from the common query options plus the "prefix", "job_id",
// "task_group" and "volume_name" query parameters, forwards it over RPC, sets
// the query metadata on the response and returns the claims.
func (s *HTTPServer) TaskGroupHostVolumeClaimListRequest(resp http.ResponseWriter, req *http.Request) (any, error) {
	var listReq structs.TaskGroupVolumeClaimListRequest
	// When parse reports true there is nothing further to do here.
	// NOTE(review): presumably parse has already written the response in
	// that case — confirm.
	if done := s.parse(resp, req, &listReq.Region, &listReq.QueryOptions); done {
		return nil, nil
	}

	// Filters are read after parse so they take effect on the final request.
	params := req.URL.Query()
	listReq.Prefix = params.Get("prefix")
	listReq.JobID = params.Get("job_id")
	listReq.TaskGroup = params.Get("task_group")
	listReq.VolumeName = params.Get("volume_name")

	var listResp structs.TaskGroupVolumeClaimListResponse
	err := s.agent.RPC(structs.TaskGroupHostVolumeClaimListRPCMethod, &listReq, &listResp)
	if err != nil {
		return nil, err
	}

	setMeta(resp, &listResp.QueryMeta)
	return listResp.Claims, nil
}

// taskGroupHostVolumeClaimDelete forwards a delete for the claim with the
// given ID over RPC. On success it sets the index from the RPC response on
// the HTTP response and returns no body.
func (s *HTTPServer) taskGroupHostVolumeClaimDelete(id string, resp http.ResponseWriter, req *http.Request) (any, error) {
	var deleteReq structs.TaskGroupVolumeClaimDeleteRequest
	deleteReq.ClaimID = id
	s.parseWriteRequest(req, &deleteReq.WriteRequest)

	var deleteResp structs.TaskGroupVolumeClaimDeleteResponse
	err := s.agent.RPC(structs.TaskGroupHostVolumeClaimDeleteRPCMethod, &deleteReq, &deleteResp)
	if err != nil {
		return nil, err
	}

	setIndex(resp, deleteResp.Index)
	return nil, nil
}
126 changes: 126 additions & 0 deletions command/agent/task_group_host_volume_claim_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package agent

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

// TestTaskGroupHostVolumeClaimEndpoint exercises the list and delete HTTP
// handlers for task group host volume claims against a test agent: it seeds
// three claims (two in the default namespace, one in namespace "foo"), lists
// them unfiltered and filtered by job ID, deletes one and verifies it no
// longer appears in the listing.
func TestTaskGroupHostVolumeClaimEndpoint(t *testing.T) {
	hostVolCapsReadWrite := []*structs.HostVolumeCapability{
		{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeReader,
		},
		{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeWriter,
		},
	}
	// dhv1 is only used as the source of the volume ID and name recorded in
	// the claims below; it is not written to state by this test.
	dhv1 := &structs.HostVolume{
		Namespace:             structs.DefaultNamespace,
		ID:                    uuid.Generate(),
		Name:                  "foo",
		NodeID:                uuid.Generate(),
		RequestedCapabilities: hostVolCapsReadWrite,
		State:                 structs.HostVolumeStateReady,
	}
	stickyRequests := map[string]*structs.VolumeRequest{
		"foo": {
			Type:           "host",
			Source:         "foo",
			Sticky:         true,
			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
		},
	}
	stickyJob := mock.Job()
	stickyJob.TaskGroups[0].Volumes = stickyRequests

	existingClaims := []*structs.TaskGroupHostVolumeClaim{
		{
			ID:            uuid.Generate(),
			Namespace:     structs.DefaultNamespace,
			JobID:         stickyJob.ID,
			TaskGroupName: stickyJob.TaskGroups[0].Name,
			VolumeID:      dhv1.ID,
			VolumeName:    dhv1.Name,
		},
		{
			ID:            uuid.Generate(),
			Namespace:     "foo",
			JobID:         stickyJob.ID,
			TaskGroupName: stickyJob.TaskGroups[0].Name,
			VolumeID:      dhv1.ID,
			VolumeName:    dhv1.Name,
		},
		{
			ID:            uuid.Generate(),
			Namespace:     structs.DefaultNamespace,
			JobID:         "fooooo",
			TaskGroupName: stickyJob.TaskGroups[0].Name,
			VolumeID:      dhv1.ID,
			VolumeName:    dhv1.Name,
		},
	}

	httpTest(t, nil, func(s *TestAgent) {

		// Seed the server state with the claims

		for _, claim := range existingClaims {
			must.NoError(t, s.Agent.Server().State().UpsertTaskGroupHostVolumeClaim(structs.MsgTypeTestSetup, 1000, claim))
		}

		respW := httptest.NewRecorder()

		// List claims

		req, err := http.NewRequest(http.MethodGet, "/v1/volumes/claims", nil)
		must.NoError(t, err)
		claims, err := s.Server.TaskGroupHostVolumeClaimListRequest(respW, req)
		must.NoError(t, err)
		must.NotNil(t, claims)
		respClaims := claims.([]*structs.TaskGroupHostVolumeClaim)
		must.NotNil(t, respClaims)
		// must contain all the claims except for the one in other ns
		must.SliceLen(t, len(existingClaims)-1, respClaims)

		// list by fooooo
		req, err = http.NewRequest(http.MethodGet, "/v1/volumes/claims?job_id=fooooo", nil)
		must.NoError(t, err)
		claims, err = s.Server.TaskGroupHostVolumeClaimListRequest(respW, req)
		must.NoError(t, err)
		must.NotNil(t, claims)
		respClaims = claims.([]*structs.TaskGroupHostVolumeClaim)
		must.NotNil(t, respClaims)
		// Assert the length before indexing so that an empty or over-broad
		// result fails the test cleanly instead of panicking on respClaims[0].
		must.SliceLen(t, 1, respClaims)
		must.Eq(t, existingClaims[2].ID, respClaims[0].ID)

		// Delete claim number 1

		req, err = http.NewRequest(http.MethodDelete, fmt.Sprintf("/v1/volumes/claim/%s", existingClaims[0].ID), nil)
		must.NoError(t, err)
		_, err = s.Server.taskGroupHostVolumeClaimDelete(existingClaims[0].ID, respW, req)
		must.NoError(t, err)

		// Verify claim was deleted

		req, err = http.NewRequest(http.MethodGet, "/v1/volumes/claims", nil)
		must.NoError(t, err)
		claims, err = s.Server.TaskGroupHostVolumeClaimListRequest(respW, req)
		must.NoError(t, err)
		must.NotNil(t, claims)
		respClaims = claims.([]*structs.TaskGroupHostVolumeClaim)
		must.NotNil(t, respClaims)
		must.SliceNotContains(t, respClaims, existingClaims[0])
	})
}
17 changes: 17 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyHostVolumeRegister(msgType, buf[1:], log.Index)
case structs.HostVolumeDeleteRequestType:
return n.applyHostVolumeDelete(msgType, buf[1:], log.Index)
case structs.TaskGroupHostVolumeClaimDeleteRequestType:
return n.applyTaskGroupHostVolumeClaimDelete(msgType, buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -2450,6 +2452,21 @@ func (n *nomadFSM) applyHostVolumeDelete(msgType structs.MessageType, buf []byte
return nil
}

// applyTaskGroupHostVolumeClaimDelete decodes a claim delete request from buf
// and removes the referenced claim from the state store at the given index.
// It panics if the request cannot be decoded. A failure to delete is logged
// and returned as the result; nil is returned on success.
func (n *nomadFSM) applyTaskGroupHostVolumeClaimDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
	defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_host_volume_claim_delete"}, time.Now())

	var deleteReq structs.TaskGroupVolumeClaimDeleteRequest
	decodeErr := structs.Decode(buf, &deleteReq)
	if decodeErr != nil {
		panic(fmt.Errorf("failed to decode request: %v", decodeErr))
	}

	err := n.state.DeleteTaskGroupHostVolumeClaim(index, deleteReq.ClaimID)
	if err == nil {
		return nil
	}

	n.logger.Error("DeleteTaskGroupHostVolumeClaim failed", "error", err)
	return err
}

func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
Expand Down
1 change: 1 addition & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
_ = server.Register(NewSystemEndpoint(s, ctx))
_ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter))
_ = server.Register(NewHostVolumeEndpoint(s, ctx))
_ = server.Register(NewTaskGroupVolumeClaimEndpoint(s, ctx))
_ = server.Register(NewClientHostVolumeEndpoint(s, ctx))

// Register non-streaming
Expand Down
9 changes: 9 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
indexSigningKey = "signing_key"
indexAuthMethod = "auth_method"
indexNodePool = "node_pool"
indexClaimID = "claim_id"
)

var (
Expand Down Expand Up @@ -1740,6 +1741,14 @@ func taskGroupHostVolumeClaimSchema() *memdb.TableSchema {
},
},
},
indexClaimID: {
Name: indexClaimID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
},
}
}
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,7 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
}

// Delete task group volume claims
if err = s.deleteTaskGroupHostVolumeClaim(index, txn, namespace, jobID); err != nil {
if err = s.deleteTaskGroupHostVolumeClaimByNamespaceAndJob(index, txn, namespace, jobID); err != nil {
return fmt.Errorf("deleting job volume claims failed: %v", err)
}

Expand Down
Loading