Skip to content

Commit

Permalink
Merge pull request #18904 from serathius/robustness-duplicated-puts-2
Browse files Browse the repository at this point in the history
Robustness duplicated puts 2
  • Loading branch information
serathius authored Nov 17, 2024
2 parents 3507061 + 6d00e7b commit ee789c9
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 104 deletions.
46 changes: 23 additions & 23 deletions tests/robustness/validate/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,56 @@ func TestValidateSerializableOperations(t *testing.T) {
},
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(1, keyValue("a", "1", 2)),
Output: rangeResponse(1, keyValueRevision("a", "1", 2)),
},
{
Input: rangeRequest("a", "z", 3, 0),
Output: rangeResponse(2,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
{
Input: rangeRequest("a", "z", 4, 0),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 3),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 4),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValue("c", "3", 4),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("a", "z", 4, 2),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
{
Input: rangeRequest("b\x00", "z", 4, 2),
Output: rangeResponse(1,
keyValue("c", "3", 4),
keyValueRevision("c", "3", 4),
),
},
{
Input: rangeRequest("b", "", 4, 0),
Output: rangeResponse(1,
keyValue("b", "2", 3),
keyValueRevision("b", "2", 3),
),
},
{
Expand All @@ -115,9 +115,9 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 4, 0),
Output: rangeResponse(3,
keyValue("c", "3", 4),
keyValue("b", "2", 3),
keyValue("a", "1", 2),
keyValueRevision("c", "3", 4),
keyValueRevision("b", "2", 3),
keyValueRevision("a", "1", 2),
),
},
},
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(3,
keyValue("b", "2", 3),
keyValueRevision("b", "2", 3),
),
},
},
Expand All @@ -166,8 +166,8 @@ func TestValidateSerializableOperations(t *testing.T) {
{
Input: rangeRequest("a", "z", 2, 0),
Output: rangeResponse(3,
keyValue("a", "1", 2),
keyValue("b", "2", 3),
keyValueRevision("a", "1", 2),
keyValueRevision("b", "2", 3),
),
},
},
Expand Down Expand Up @@ -284,7 +284,7 @@ func errorResponse(err error) model.MaybeEtcdResponse {
}
}

func keyValue(key, value string, rev int64) model.KeyValue {
func keyValueRevision(key, value string, rev int64) model.KeyValue {
return model.KeyValue{
Key: key,
ValueRevision: model.ValueRevision{
Expand Down
121 changes: 75 additions & 46 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation {
allOperations := relevantOperations(reports)
uniqueEvents := uniqueWatchEvents(reports)
operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests)
return patchOperations(allOperations, uniqueEvents, operationsReturnTime)
putRevision := putRevision(reports)
putReturnTimeFromWatch := putReturnTimeFromWatch(reports)
putReturnTimeFromPersisted := putReturnTimeFromPersistedOperations(allOperations, persistedRequests)
return patchOperations(allOperations, putRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted)
}

func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
Expand All @@ -46,25 +46,53 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
return ops
}

func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]client.TimedWatchEvent {
persisted := map[model.Event]client.TimedWatchEvent{}
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
func putReturnTimeFromWatch(reports []report.ClientReport) map[keyValue]int64 {
earliestTime := map[keyValue]int64{}
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
responseTime := resp.Time
if prev, found := persisted[event.Event]; found && prev.Time < responseTime {
responseTime = prev.Time
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
if t, ok := earliestTime[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestTime[kv] = resp.Time.Nanoseconds()
}
case model.DeleteOperation:
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
persisted[event.Event] = client.TimedWatchEvent{Time: responseTime, WatchEvent: event}
}
}
}
}
return persisted
return earliestTime
}

func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation {
func putRevision(reports []report.ClientReport) map[keyValue]int64 {
requestRevision := map[keyValue]int64{}
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
requestRevision[kv] = event.Revision
case model.DeleteOperation:
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}
return requestRevision
}

func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted map[keyValue]int64) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))

for _, op := range operations {
Expand All @@ -75,30 +103,21 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
newOperations = append(newOperations, op)
continue
}
var resourceVersion int64
var txnRevision int64
var persisted bool
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch etcdOp.Type {
case model.PutOperation:
event, ok := watchEvents[model.Event{
Type: etcdOp.Type,
Key: etcdOp.Put.Key,
Value: etcdOp.Put.Value,
}]
if ok {
eventTime := event.Time.Nanoseconds()
// Set revision and time based on watchEvent.
if eventTime < op.Return {
op.Return = eventTime
}
resourceVersion = event.Revision
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if revision, ok := watchRevision[kv]; ok {
txnRevision = revision
}
if returnTime, found := persistedOperations[etcdOp]; found {
if returnTime, ok := putReturnTimeFromWatch[kv]; ok {
op.Return = min(op.Return, returnTime)
}
if returnTime, ok := putReturnTimeFromPersisted[kv]; ok {
persisted = true
// Set return time based on persisted return time.
if returnTime < op.Return {
op.Return = returnTime
}
op.Return = min(op.Return, returnTime)
}
case model.DeleteOperation:
case model.RangeOperation:
Expand All @@ -111,8 +130,8 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
// Remove non persisted operations
continue
} else {
if resourceVersion != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion}
if txnRevision != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
Expand All @@ -125,7 +144,11 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
}

func isUniqueTxn(request *model.TxnRequest) bool {
return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure))
return isUniqueOps(request.OperationsOnSuccess) && isUniqueOps(request.OperationsOnFailure)
}

func isUniqueOps(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
Expand All @@ -146,11 +169,11 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
return false
}

func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 {
operationReturnTime := operationReturnTime(allOperations)
persisted := map[model.EtcdOperation]int64{}
func putReturnTimeFromPersistedOperations(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[keyValue]int64 {
putReturnTimes := putReturnTime(allOperations)
persisted := map[keyValue]int64{}

lastReturnTime := maxReturnTime(operationReturnTime)
lastReturnTime := maxReturnTime(putReturnTimes)

for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
Expand All @@ -162,14 +185,15 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
if op.Type != model.PutOperation {
continue
}
if _, found := persisted[op]; found {
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
if _, found := persisted[kv]; found {
panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op))
}
hasPut = true
persisted[op] = lastReturnTime
persisted[kv] = lastReturnTime
}
if hasPut {
newReturnTime := requestReturnTime(operationReturnTime, request)
newReturnTime := returnTimeFromRequest(putReturnTimes, request)
if newReturnTime != -1 {
lastReturnTime = min(lastReturnTime, newReturnTime)
}
Expand All @@ -184,7 +208,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
return persisted
}

func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
func putReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
newOperations := map[model.EtcdOperation]int64{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
Expand Down Expand Up @@ -220,14 +244,14 @@ func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 {
return maxReturnTime
}

func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
func returnTimeFromRequest(putReturnTimes map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
switch request.Type {
case model.Txn:
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
if time, found := operationTime[op]; found {
if time, found := putReturnTimes[op]; found {
return time
}
}
Expand All @@ -236,3 +260,8 @@ func requestReturnTime(operationTime map[model.EtcdOperation]int64, request mode
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}

type keyValue struct {
Key string
Value model.ValueOrHash
}
Loading

0 comments on commit ee789c9

Please sign in to comment.