Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #77
Browse files Browse the repository at this point in the history
* handle vshard error the same way as lua vshard router.
  • Loading branch information
nurzhan-saktaganov committed Nov 18, 2024
1 parent 6c00500 commit a8f96e4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ BUG FIXES:
CHANGES:
* Add comment why and how we handle "NON_MASTER" vshard error.
* Don't support 'type Error struct' anymore.
* Handle vshard error the same way as lua vshard router (resolve issue #77).

FEATURES:

Expand Down
59 changes: 48 additions & 11 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type StorageCallVShardError struct {
MasterUUID string `msgpack:"master" mapstructure:"master"`
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
Destination string `msgpack:"destination" mapstructure:"destination"`
}

func (s StorageCallVShardError) Error() string {
Expand Down Expand Up @@ -169,19 +170,45 @@ func (r *Router) RouterCallImpl(ctx context.Context,

switch vshardError.Name {
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
// We reproduce here behaviour in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
r.BucketReset(bucketID)

// TODO we should inspect here err.destination like lua vshard router does,
// but we don't support vshard error fully yet:
// https://github.com/KaymeKaydex/go-vshard-router/issues/94
// So we just retry here as a temporary solution.
r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError)

// this vshardError will be returned to a caller in case of timeout
err = &vshardError
continue
if vshardError.Destination == "" {
break // leads to retry
}

destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
if err == nil {
break // loop
}
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err)
}

if !loggedOnce {
r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
"update configuration", destinationUUID)
loggedOnce = true
}

const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, &vshardError
}
}

// leads to retry
case VShardErrNameTransferIsInProgress:
// Since lua vshard router doesn't retry here, we don't retry too.
// There is a comment why lua vshard router doesn't retry:
Expand All @@ -197,6 +224,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,
default:
return nil, nil, &vshardError
}

// retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked

r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError)

// this vshardError will be returned to a caller in case of timeout
err = &vshardError
continue
}

var isVShardRespOk bool
Expand Down

0 comments on commit a8f96e4

Please sign in to comment.