Skip to content

Commit

Permalink
feat(storage): implement RetryChunkDeadline for grpc writes (#11476)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp authored Jan 29, 2025
1 parent 8500e9e commit 03575d7
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 53 deletions.
4 changes: 2 additions & 2 deletions storage/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ func TestBucketRetryer(t *testing.T) {
Multiplier: 3,
},
policy: RetryAlways,
maxAttempts: expectedAttempts(5),
maxAttempts: intPointer(5),
shouldRetry: func(err error) bool { return false },
},
},
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func TestBucketRetryer(t *testing.T) {
return b.Retryer(WithMaxAttempts(5))
},
want: &retryConfig{
maxAttempts: expectedAttempts(5),
maxAttempts: intPointer(5),
},
},
{
Expand Down
108 changes: 106 additions & 2 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package storage
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
Expand Down Expand Up @@ -1859,7 +1861,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: intPointer(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
Expand Down Expand Up @@ -1910,7 +1912,7 @@ func TestRetryDeadlineExceededEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: intPointer(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
t.Fatalf("GetBucket: got unexpected error %v, want nil", err)
}
Expand Down Expand Up @@ -2108,6 +2110,77 @@ func TestWriterChunkTransferTimeoutEmulated(t *testing.T) {
})
}

// TestWriterChunkRetryDeadlineEmulated runs against the storage emulator and
// checks the writer's chunkRetryDeadline setting: the emulator is instructed to
// keep returning 503 for object inserts, and the test expects Write to fail
// with that 503/Unavailable error after the upload was attempted more than once.
func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
	transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
		const (
			// Resumable upload with smallest chunksize.
			chunkSize = 256 * 1024
			fileSize  = 600 * 1024
			// A small value for testing, but large enough that we do encounter the error.
			retryDeadline = time.Second
			errCode       = 503
		)

		_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
		if err != nil {
			t.Fatalf("creating bucket: %v", err)
		}

		// Populate instructions with a lot of errors so it will take a long time
		// to succeed. Error only after the first chunk has been sent, as the
		// retry deadline does not apply to the first chunk.
		manyErrs := []string{fmt.Sprintf("return-%d-after-%dK", errCode, 257)}
		for i := 0; i < 20; i++ {
			manyErrs = append(manyErrs, fmt.Sprintf("return-%d", errCode))

		}
		instructions := map[string][]string{"storage.objects.insert": manyErrs}
		testID := createRetryTest(t, client, instructions)

		var cancel context.CancelFunc
		// Tag requests with the retry test ID so the emulator applies the
		// instructions registered above.
		ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
		// Overall timeout is longer than retryDeadline, so the chunk retry
		// deadline is the limit expected to be hit first.
		ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		params := &openWriterParams{
			attrs: &ObjectAttrs{
				Bucket:     bucket,
				Name:       fmt.Sprintf("object-%d", time.Now().Nanosecond()),
				Generation: defaultGen,
			},
			bucket:             bucket,
			chunkSize:          chunkSize,
			chunkRetryDeadline: retryDeadline,
			ctx:                ctx,
			donec:              make(chan struct{}),
			setError:           func(_ error) {}, // no-op
			progress:           func(_ int64) {}, // no-op
			setObj:             func(_ *ObjectAttrs) {},
		}

		pw, err := client.OpenWriter(params, &idempotentOption{true})
		if err != nil {
			t.Fatalf("failed to open writer: %v", err)
		}
		// fileSize spans more than one chunk, so the write goes past the first
		// chunk and reaches the injected errors.
		buffer := bytes.Repeat([]byte("A"), fileSize)
		_, err = pw.Write(buffer)
		defer pw.Close()
		// The error surfaced by Write should be the injected one (HTTP 503 or
		// gRPC Unavailable, depending on the transport under test).
		if !errorIsStatusCode(err, errCode, codes.Unavailable) {
			t.Errorf("expected err with status %d, got err: %v", errCode, err)
		}

		// Make sure there was more than one attempt.
		got, err := numInstructionsLeft(testID, "storage.objects.insert")
		if err != nil {
			t.Errorf("getting emulator instructions: %v", err)
		}

		// Each attempt consumes an instruction; if nearly all of them are
		// still pending, the request was not retried.
		if got >= len(manyErrs)-1 {
			t.Errorf("not enough attempts - the request may not have been retried; got %d instructions left, expected at most %d", got, len(manyErrs)-2)
		}
	})
}

// createRetryTest creates a bucket in the emulator and sets up a test using the
// Retry Test API for the given instructions. This is intended for emulator tests
// of retry behavior that are not covered by conformance tests.
Expand Down Expand Up @@ -2136,6 +2209,37 @@ func createRetryTest(t *testing.T, client storageClient, instructions map[string
return et.id
}

// numInstructionsLeft returns the number of unused retry-test instructions the
// emulator still holds for method in the retry test identified by
// emulatorTestID.
//
// It sends a GET to <STORAGE_EMULATOR_HOST>/retry_test/<emulatorTestID> and
// counts the entries listed under method in the JSON response. An error is
// returned if the emulator host cannot be parsed, the request fails, the
// emulator answers with a status other than 200, or the body cannot be decoded.
func numInstructionsLeft(emulatorTestID, method string) (int, error) {
	host := os.Getenv("STORAGE_EMULATOR_HOST")
	endpoint, err := url.Parse(host)
	if err != nil {
		return 0, fmt.Errorf("parsing endpoint: %v", err)
	}

	endpoint.Path = strings.Join([]string{"retry_test", emulatorTestID}, "/")
	resp, err := http.DefaultClient.Get(endpoint.String())
	if err != nil {
		return 0, fmt.Errorf("getting retry test: %w", err)
	}
	// Register the close before inspecting the status so the body is released
	// on every return path, including the non-200 one. The body is only read,
	// so a close error carries no useful information and is dropped.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("getting retry test: unexpected status %q", resp.Status)
	}

	testRes := struct {
		Instructions map[string][]string
		Completed    bool
	}{}
	if err := json.NewDecoder(resp.Body).Decode(&testRes); err != nil {
		return 0, fmt.Errorf("decoding response: %v", err)
	}
	// Subtract one because the testbench is off by one (see storage-testbench/issues/707).
	return len(testRes.Instructions[method]) - 1, nil
}

// createObject creates an object in the emulator with content randomBytesToWrite and
// returns its name, generation, and metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
Expand Down
10 changes: 10 additions & 0 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,16 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
return err
}

retryDeadline := defaultWriteChunkRetryDeadline
if params.chunkRetryDeadline != 0 {
retryDeadline = params.chunkRetryDeadline
}

if gw.settings.retry == nil {
gw.settings.retry = defaultRetry
}
gw.settings.retry.maxRetryDuration = retryDeadline

var o *storagepb.Object
uploadBuff := func(ctx context.Context) error {
obj, err := gw.uploadBuffer(ctx, recvd, offset, doneReading)
Expand Down
3 changes: 3 additions & 0 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
"time"

gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
Expand All @@ -28,6 +29,8 @@ import (
"google.golang.org/protobuf/proto"
)

// defaultWriteChunkRetryDeadline is the retry deadline used for gRPC writes
// when the writer's chunkRetryDeadline parameter is left at zero.
const defaultWriteChunkRetryDeadline = 32 * time.Second

type gRPCAppendBidiWriteBufferSender struct {
bucket string
routingToken *string
Expand Down
9 changes: 7 additions & 2 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,6 @@ func TestIntegration_SignedURL_EmptyStringObjectName(t *testing.T) {
t.Fatal(err)
}
})

}

func TestIntegration_BucketACL(t *testing.T) {
Expand Down Expand Up @@ -6962,11 +6961,17 @@ func extractErrCode(err error) int {
return -1
}

// errorIsStatusCode returns true if err is a:
// - googleapi.Error with httpStatusCode, or
// - apierror.APIError with grpcStatusCode, or
// - grpc/status.Status error with grpcStatusCode.
func errorIsStatusCode(err error, httpStatusCode int, grpcStatusCode codes.Code) bool {
var httpErr *googleapi.Error
var grpcErr *apierror.APIError

return (errors.As(err, &httpErr) && httpErr.Code == httpStatusCode) ||
(errors.As(err, &grpcErr) && grpcErr.GRPCStatus().Code() == grpcStatusCode)
(errors.As(err, &grpcErr) && grpcErr.GRPCStatus().Code() == grpcStatusCode) ||
status.Code(err) == grpcStatusCode
}

func setUpRequesterPaysBucket(ctx context.Context, t *testing.T, bucket, object string, addOwnerEmail string) {
Expand Down
29 changes: 24 additions & 5 deletions storage/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"net/url"
"strings"
"time"

"cloud.google.com/go/internal"
"cloud.google.com/go/internal/version"
Expand Down Expand Up @@ -67,22 +68,40 @@ func run(ctx context.Context, call func(ctx context.Context) error, retry *retry
errorFunc = retry.shouldRetry
}

var quitAfterTimer *time.Timer
if retry.maxRetryDuration != 0 {
quitAfterTimer = time.NewTimer(retry.maxRetryDuration)
defer quitAfterTimer.Stop()
}

var lastErr error
return internal.Retry(ctx, bo, func() (stop bool, err error) {
if retry.maxRetryDuration != 0 {
select {
case <-quitAfterTimer.C:
if lastErr == nil {
return true, fmt.Errorf("storage: request not sent, choose a larger value for the retry deadline (currently set to %s)", retry.maxRetryDuration)
}
return true, fmt.Errorf("storage: retry deadline of %s reached after %v attempts; last error: %w", retry.maxRetryDuration, attempts, lastErr)
default:
}
}

ctxWithHeaders := setInvocationHeaders(ctx, invocationID, attempts)
err = call(ctxWithHeaders)
if err != nil && retry.maxAttempts != nil && attempts >= *retry.maxAttempts {
return true, fmt.Errorf("storage: retry failed after %v attempts; last error: %w", *retry.maxAttempts, err)
lastErr = call(ctxWithHeaders)
if lastErr != nil && retry.maxAttempts != nil && attempts >= *retry.maxAttempts {
return true, fmt.Errorf("storage: retry failed after %v attempts; last error: %w", *retry.maxAttempts, lastErr)
}
attempts++
retryable := errorFunc(err)
retryable := errorFunc(lastErr)
// Explicitly check context cancellation so that we can distinguish between a
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
// Unfortunately gRPC will return codes.DeadlineExceeded (which may be retryable if it's
// sent by the server) in both cases.
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
retryable = false
}
return !retryable, err
return !retryable, lastErr
})
}

Expand Down
Loading

0 comments on commit 03575d7

Please sign in to comment.