Skip to content

Commit

Permalink
dynamic host volumes: delete by single volume ID (#24606)
Browse files Browse the repository at this point in the history
string instead of []string
  • Loading branch information
gulducat authored Dec 4, 2024
1 parent a4b7163 commit e4aac7c
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 87 deletions.
6 changes: 3 additions & 3 deletions api/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type HostVolumeListRequest struct {
}

type HostVolumeDeleteRequest struct {
VolumeIDs []string
ID string
}

// Create forwards to client agents so a host volume can be created on those
Expand Down Expand Up @@ -238,8 +238,8 @@ func (hv *HostVolumes) List(req *HostVolumeListRequest, opts *QueryOptions) ([]*
}

// Delete deletes a host volume
func (hv *HostVolumes) Delete(id string, opts *WriteOptions) (*WriteMeta, error) {
path, err := url.JoinPath("/v1/volume/host/", url.PathEscape(id))
func (hv *HostVolumes) Delete(req *HostVolumeDeleteRequest, opts *WriteOptions) (*WriteMeta, error) {
path, err := url.JoinPath("/v1/volume/host/", url.PathEscape(req.ID))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *HTTPServer) hostVolumeCreate(resp http.ResponseWriter, req *http.Reques
func (s *HTTPServer) hostVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (any, error) {
// HTTP API only supports deleting a single ID because of compatibility with
// the existing HTTP routes for CSI
args := structs.HostVolumeDeleteRequest{VolumeIDs: []string{id}}
args := structs.HostVolumeDeleteRequest{VolumeID: id}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.HostVolumeDeleteResponse
Expand Down
2 changes: 1 addition & 1 deletion command/volume_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (c *VolumeDeleteCommand) deleteCSIVolume(client *api.Client, volID string,
}

func (c *VolumeDeleteCommand) deleteHostVolume(client *api.Client, volID string) int {
_, err := client.HostVolumes().Delete(volID, nil)
_, err := client.HostVolumes().Delete(&api.HostVolumeDeleteRequest{ID: volID}, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deleting volume: %s", err))
return 1
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,7 @@ func (n *nomadFSM) applyHostVolumeDelete(msgType structs.MessageType, buf []byte
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteHostVolumes(index, req.RequestNamespace(), req.VolumeIDs); err != nil {
if err := n.state.DeleteHostVolume(index, req.RequestNamespace(), req.VolumeID); err != nil {
n.logger.Error("DeleteHostVolumes failed", "error", err)
return err
}
Expand Down
58 changes: 23 additions & 35 deletions nomad/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -564,57 +563,46 @@ func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *struct
return structs.ErrPermissionDenied
}

if len(args.VolumeIDs) == 0 {
return fmt.Errorf("missing volumes to delete")
if args.VolumeID == "" {
return fmt.Errorf("missing volume ID to delete")
}

var deletedVols []string
var index uint64

snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}

var mErr *multierror.Error
ns := args.RequestNamespace()
id := args.VolumeID

for _, id := range args.VolumeIDs {
vol, err := snap.HostVolumeByID(nil, ns, id, true)
if err != nil {
return fmt.Errorf("could not query host volume: %w", err)
}
if vol == nil {
return fmt.Errorf("no such volume: %s", id)
}
if len(vol.Allocations) > 0 {
allocIDs := helper.ConvertSlice(vol.Allocations,
func(a *structs.AllocListStub) string { return a.ID })
mErr = multierror.Append(mErr,
fmt.Errorf("volume %s in use by allocations: %v", id, allocIDs))
continue
}
vol, err := snap.HostVolumeByID(nil, ns, id, true)
if err != nil {
return fmt.Errorf("could not query host volume: %w", err)
}
if vol == nil {
return fmt.Errorf("no such volume: %s", id)
}
if len(vol.Allocations) > 0 {
allocIDs := helper.ConvertSlice(vol.Allocations,
func(a *structs.AllocListStub) string { return a.ID })
return fmt.Errorf("volume %s in use by allocations: %v", id, allocIDs)
}

err = v.deleteVolume(vol)
if err != nil {
mErr = multierror.Append(mErr, err)
} else {
deletedVols = append(deletedVols, id)
}
err = v.deleteVolume(vol)
if err != nil {
return err
}

if len(deletedVols) > 0 {
args.VolumeIDs = deletedVols
_, index, err = v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "delete")
mErr = multierror.Append(mErr, err)
}
_, index, err = v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "delete")
return err
}

reply.VolumeIDs = deletedVols
reply.Index = index
return helper.FlattenMultierror(mErr)
return nil
}

func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {
Expand Down
51 changes: 34 additions & 17 deletions nomad/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
index, []*structs.Allocation{alloc}))

delReq := &structs.HostVolumeDeleteRequest{
VolumeIDs: []string{vol1.ID, vol2.ID},
VolumeID: vol2.ID,
WriteRequest: structs.WriteRequest{
Region: srv.Region(),
Namespace: ns,
Expand All @@ -336,20 +336,6 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp)
must.EqError(t, err, fmt.Sprintf("volume %s in use by allocations: [%s]", vol2.ID, alloc.ID))

// volume not in use will be deleted even if we got an error
getReq := &structs.HostVolumeGetRequest{
ID: vol1.ID,
QueryOptions: structs.QueryOptions{
Region: srv.Region(),
Namespace: ns,
AuthToken: token,
},
}
var getResp structs.HostVolumeGetResponse
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Get", getReq, &getResp)
must.NoError(t, err)
must.Nil(t, getResp.Volume)

// update the allocations terminal so the delete works
alloc = alloc.Copy()
alloc.ClientStatus = structs.AllocClientStatusFailed
Expand All @@ -361,15 +347,46 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
}
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", nArgs, &structs.GenericResponse{})

delReq.VolumeIDs = []string{vol2.ID}
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp)
must.NoError(t, err)

getReq.ID = vol2.ID
getReq := &structs.HostVolumeGetRequest{
ID: vol2.ID,
QueryOptions: structs.QueryOptions{
Region: srv.Region(),
Namespace: ns,
AuthToken: token,
},
}
var getResp structs.HostVolumeGetResponse
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Get", getReq, &getResp)
must.NoError(t, err)
must.Nil(t, getResp.Volume)
})

// delete vol1 to finish cleaning up
var delResp structs.HostVolumeDeleteResponse
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", &structs.HostVolumeDeleteRequest{
VolumeID: vol1.ID,
WriteRequest: structs.WriteRequest{
Region: srv.Region(),
Namespace: vol1.Namespace,
AuthToken: powerToken,
},
}, &delResp)
must.NoError(t, err)

// should be no volumes left
var listResp structs.HostVolumeListResponse
err = msgpackrpc.CallWithCodec(codec, "HostVolume.List", &structs.HostVolumeListRequest{
QueryOptions: structs.QueryOptions{
Region: srv.Region(),
Namespace: "*",
AuthToken: token,
},
}, &listResp)
must.NoError(t, err)
must.Len(t, 0, listResp.Volumes, must.Sprintf("expect no volumes to remain, got: %+v", listResp))
}

func TestHostVolumeEndpoint_List(t *testing.T) {
Expand Down
41 changes: 19 additions & 22 deletions nomad/state/state_store_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,37 +107,34 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
return txn.Commit()
}

// DeleteHostVolumes deletes a set of host volumes in the same namespace
func (s *StateStore) DeleteHostVolumes(index uint64, ns string, ids []string) error {
// DeleteHostVolume deletes a host volume
func (s *StateStore) DeleteHostVolume(index uint64, ns string, id string) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

for _, id := range ids {
obj, err := txn.First(TableHostVolumes, indexID, ns, id)
if err != nil {
return err
}
if obj != nil {
vol := obj.(*structs.HostVolume)

obj, err := txn.First(TableHostVolumes, indexID, ns, id)
allocs, err := s.AllocsByNodeTerminal(nil, vol.NodeID, false)
if err != nil {
return err
return fmt.Errorf("could not query allocs to check for host volume claims: %w", err)
}
if obj != nil {
vol := obj.(*structs.HostVolume)

allocs, err := s.AllocsByNodeTerminal(nil, vol.NodeID, false)
if err != nil {
return fmt.Errorf("could not query allocs to check for host volume claims: %w", err)
}
for _, alloc := range allocs {
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
if volClaim.Type == structs.VolumeTypeHost && volClaim.Name == vol.Name {
return fmt.Errorf("could not delete volume %s in use by alloc %s",
vol.ID, alloc.ID)
}
for _, alloc := range allocs {
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
if volClaim.Type == structs.VolumeTypeHost && volClaim.Name == vol.Name {
return fmt.Errorf("could not delete volume %s in use by alloc %s",
vol.ID, alloc.ID)
}
}
}

err = txn.Delete(TableHostVolumes, vol)
if err != nil {
return fmt.Errorf("host volume delete: %w", err)
}
err = txn.Delete(TableHostVolumes, vol)
if err != nil {
return fmt.Errorf("host volume delete: %w", err)
}
}

Expand Down
20 changes: 15 additions & 5 deletions nomad/state/state_store_host_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,11 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) {
index, []*structs.Allocation{alloc}))

index++
err = store.DeleteHostVolumes(index, vol2.Namespace, []string{vols[1].ID, vols[2].ID})
err = store.DeleteHostVolume(index, vol2.Namespace, vols[2].ID)
must.EqError(t, err, fmt.Sprintf(
"could not delete volume %s in use by alloc %s", vols[2].ID, alloc.ID))
vol, err = store.HostVolumeByID(nil, vols[1].Namespace, vols[1].ID, true)
must.NoError(t, err)
must.NotNil(t, vol, must.Sprint("volume that didn't error should not be deleted"))

err = store.DeleteHostVolumes(index, vol2.Namespace, []string{vols[1].ID})
err = store.DeleteHostVolume(index, vol2.Namespace, vols[1].ID)
must.NoError(t, err)
vol, err = store.HostVolumeByID(nil, vols[1].Namespace, vols[1].ID, true)
must.NoError(t, err)
Expand All @@ -177,6 +174,19 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) {
must.NoError(t, err)
got = consumeIter(iter)
must.MapLen(t, 1, got, must.Sprint(`expected only one volume to match prefix`))

alloc = alloc.Copy()
alloc.ClientStatus = structs.AllocClientStatusComplete
index++
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup,
index, []*structs.Allocation{alloc}))
for _, v := range vols {
index++
must.NoError(t, store.DeleteHostVolume(index, v.Namespace, v.ID))
}
iter, err = store.HostVolumes(nil, SortDefault)
got = consumeIter(iter)
must.MapLen(t, 0, got, must.Sprint(`expected no volumes to remain`))
}

func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions nomad/structs/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,11 @@ type HostVolumeRegisterResponse struct {
}

type HostVolumeDeleteRequest struct {
VolumeIDs []string
VolumeID string
WriteRequest
}

type HostVolumeDeleteResponse struct {
VolumeIDs []string // volumes actually deleted
WriteMeta
}

Expand Down

0 comments on commit e4aac7c

Please sign in to comment.