Skip to content

Commit

Permalink
Merge pull request #110 from vansante/limit-concurrent-receives
Browse files Browse the repository at this point in the history
Limit the concurrent ZFS receives to 3 (default)
  • Loading branch information
vansante authored Sep 25, 2024
2 parents 9dce9f2 + a814bfc commit a16343a
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 8 deletions.
3 changes: 3 additions & 0 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var (
ErrInvalidResumeToken = errors.New("invalid resume token given")
ErrResumeNotPossible = errors.New("resume not possible")
ErrTooManyRequests = errors.New("too many requests")
)

const clientUserAgent = "go-zfsutils@%s"
Expand Down Expand Up @@ -302,6 +303,8 @@ func (c *Client) doSendStream(req *http.Request, pipeWrtr *io.PipeWriter, cancel
return ErrInvalidResumeToken
case http.StatusPreconditionFailed:
return ErrResumeNotPossible
case http.StatusTooManyRequests:
return ErrTooManyRequests
default:
return fmt.Errorf("unexpected status %d sending stream, server error: %s", resp.StatusCode, resp.Header.Get(HeaderError))
}
Expand Down
7 changes: 6 additions & 1 deletion http/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package http

const (
defaultBytesPerSecond = 100 * 1024 * 1024
defaultBytesPerSecond = 100 * 1024 * 1024
defaultMaximumConcurrentReceives = 3
)

// Config specifies the configuration for the zfs http server
Expand All @@ -10,6 +11,9 @@ type Config struct {
ParentDataset string `json:"ParentDataset" yaml:"ParentDataset"`
SpeedBytesPerSecond int64 `json:"SpeedBytesPerSecond" yaml:"SpeedBytesPerSecond"`

// MaximumConcurrentReceives limits the concurrent amount of ZFS receives, set to zero to disable limits
MaximumConcurrentReceives int `json:"MaximumConcurrentReceives" yaml:"MaximumConcurrentReceives"`

Permissions Permissions `json:"Permissions" yaml:"Permissions"`
}

Expand All @@ -25,4 +29,5 @@ type Permissions struct {
// ApplyDefaults sets all config values to their defaults (if they have one)
func (c *Config) ApplyDefaults() {
c.SpeedBytesPerSecond = defaultBytesPerSecond
c.MaximumConcurrentReceives = defaultMaximumConcurrentReceives
}
11 changes: 7 additions & 4 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import (
"log/slog"
"net/http"
"strconv"
"sync"

"github.com/klauspost/compress/zstd"
)

// HTTP is the main object for serving the ZFS HTTP server
type HTTP struct {
router *http.ServeMux
config Config
logger *slog.Logger
ctx context.Context
router *http.ServeMux
config Config
logger *slog.Logger
receiveCount int
receiveMutex sync.Mutex
ctx context.Context
}

type handle func(http.ResponseWriter, *http.Request, *slog.Logger)
Expand Down
29 changes: 29 additions & 0 deletions http/http_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,35 @@ func (h *HTTP) handleReceiveSnapshot(w http.ResponseWriter, req *http.Request, l
receiveDataset = fmt.Sprintf("%s/%s", h.config.ParentDataset, filesystem)
}

// If we are configured to limit receives
if h.config.MaximumConcurrentReceives > 0 {
h.receiveMutex.Lock()
curRcvCount := h.receiveCount
if curRcvCount >= h.config.MaximumConcurrentReceives {
h.receiveMutex.Unlock()
logger.Warn("zfs.http.handleReceiveSnapshot: Returning 429 Too Many Requests",
"receives", curRcvCount, "maxReceives", h.config.MaximumConcurrentReceives,
)
w.Header().Set(HeaderError, fmt.Sprintf("maximum concurrent receives of %d exceeded", h.config.MaximumConcurrentReceives))
w.WriteHeader(http.StatusTooManyRequests)
return
}
// Reserve a slot
h.receiveCount++
h.receiveMutex.Unlock()

logger.Debug("zfs.http.handleReceiveSnapshot: Receive slot claimed",
"receives", curRcvCount+1, "maxReceives", h.config.MaximumConcurrentReceives,
)

defer func() {
// Unlock the slot at request completion
h.receiveMutex.Lock()
h.receiveCount--
h.receiveMutex.Unlock()
}()
}

ds, err := zfs.ReceiveSnapshot(req.Context(), req.Body, receiveDataset, zfs.ReceiveOptions{
EnableDecompression: h.getEnableDecompression(req),
ForceRollback: h.getReceiveForceRollback(req),
Expand Down
64 changes: 64 additions & 0 deletions http/http_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -486,3 +487,66 @@ func TestHTTP_handleReceiveSnapshotResume(t *testing.T) {
require.Equal(t, ds.Name, newFullSnap)
})
}

func TestHTTP_handleReceiveSnapshotMaxConcurrent(t *testing.T) {
httpHandlerTest(t, func(url string) {
startMutex := sync.RWMutex{}
startMutex.Lock()
endWg := sync.WaitGroup{}

const snapName = "snappy"

ds, err := zfs.GetDataset(context.Background(), testFilesystem)
require.NoError(t, err)

ds, err = ds.Snapshot(context.Background(), snapName, zfs.SnapshotOptions{})
require.NoError(t, err)

const newSnap = "recv"
var countError, countTooMany int32
endWg.Add(4)
for i, name := range []string{"bla1", "bla2", "bla3", "bla4"} {
go func(i int, name string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()

body := bytes.NewBuffer([]byte{0, 0, 7})
req, err := http.NewRequestWithContext(ctx, http.MethodPut, fmt.Sprintf("%s/filesystems/%s/snapshots/%s?%s=%s",
url, name,
newSnap,
GETParamReceiveProperties, ReceiveProperties{
zfs.PropertyCanMount: zfs.ValueOff,
}.Encode(),
), body)

startMutex.RLock()
defer startMutex.RUnlock()

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
_ = resp.Body.Close()

switch resp.StatusCode {
case http.StatusInternalServerError:
atomic.AddInt32(&countError, 1)
endWg.Done()
t.Logf("%d: Received OK", i)
case http.StatusTooManyRequests:
atomic.AddInt32(&countTooMany, 1)
endWg.Done()
t.Logf("%d: Received too many requests", i)
default:
endWg.Done()
t.Logf("%d: Got unexpected status code %d", i, resp.StatusCode)
t.Fail()
}
}(i, name)
}
startMutex.Unlock()
time.Sleep(10 * time.Millisecond)
endWg.Wait()

require.GreaterOrEqual(t, countTooMany, int32(1), "CountTooMany not returned")
require.GreaterOrEqual(t, countError, int32(2), "CountError is not at least 2")
})
}
2 changes: 2 additions & 0 deletions http/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ func TestHTTPZPool(testZPool, prefix, testFs string, fn func(server *httptest.Se
ParentDataset: testZPool,
HTTPPathPrefix: prefix,

MaximumConcurrentReceives: 2,

Permissions: Permissions{
AllowSpeedOverride: true,
AllowNonRaw: true,
Expand Down
26 changes: 23 additions & 3 deletions job/snapshots_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ func (r *Runner) sendDatasetSnapshots(ds *zfs.Dataset) error {

// If we have a sending property, its worth checking whether we can resume a transfer
if propertyIsSet(ds.ExtraProps[sendingProp]) {
hasSent, err := r.resumeSendSnapshot(client, ds, remoteDataset, ds.ExtraProps[sendingProp])
resumable, err := r.resumeSendSnapshot(client, ds, remoteDataset, ds.ExtraProps[sendingProp])
if err != nil {
// TODO:FIXME We should probably force a full re-send after throwing away the partial data on the remote server here
return err
}
if hasSent {
if resumable {
// Clear remote cache, because we have resumed snapshots, its no longer correct
r.clearRemoteDatasetCache(client.Server(), remoteDataset)
return nil
}
}
Expand Down Expand Up @@ -245,7 +247,17 @@ func (r *Runner) resumeSendSnapshot(client *zfshttp.Client, ds *zfs.Dataset, rem
})
cancel()
result.BytesSent += int64(curBytes)
if err != nil {
switch {
case errors.Is(err, zfshttp.ErrTooManyRequests):
r.logger.Info("zfs.job.Runner.resumeSendSnapshot: Too many receives, delaying",
"error", err,
"snapshot", ds.Name,
"server", client.Server(),
"snapshotName", sendingSnapName,
"snapshot", fullSnapName,
)
return true, nil
case err != nil:
return false, fmt.Errorf("error resuming send of %s (sent %d bytes in %s): %w",
fullSnapName, result.BytesSent, result.TimeTaken, err,
)
Expand Down Expand Up @@ -305,6 +317,14 @@ func (r *Runner) sendSnapshot(client *zfshttp.Client, send zfshttp.SnapshotSendO
)
r.clearRemoteDatasetCache(client.Server(), datasetName(send.Snapshot.Name, true))
return nil
case errors.Is(err, zfshttp.ErrTooManyRequests):
r.logger.Info("zfs.job.Runner.sendDatasetSnapshots: Too many receives, delaying",
"error", err,
"snapshot", send.Snapshot.Name,
"server", client.Server(),
"sendSnapshotName", send.SnapshotName,
)
return nil
case err != nil:
return fmt.Errorf("error sending %s@%s (sent %d bytes in %s): %w",
send.DatasetName, send.SnapshotName, result.BytesSent, result.TimeTaken, err,
Expand Down

0 comments on commit a16343a

Please sign in to comment.