Skip to content

Commit

Permalink
INTERNAL: integrate piped operation callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla authored and jhpark816 committed Jan 24, 2025
1 parent 99a2dda commit 7f5c781
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 79 deletions.
44 changes: 23 additions & 21 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,14 @@
import net.spy.memcached.ops.BTreeInsertAndGetOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
import net.spy.memcached.ops.BTreeSortMergeGetOperationOld;
import net.spy.memcached.ops.CollectionBulkInsertOperation;
import net.spy.memcached.ops.CollectionGetOperation;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.CollectionPipedExistOperation;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.CollectionPipedUpdateOperation;
import net.spy.memcached.ops.GetAttrOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.PipedOperationCallback;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.plugin.FrontCacheMemcachedClient;
import net.spy.memcached.transcoders.CollectionTranscoder;
Expand Down Expand Up @@ -823,7 +820,7 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final int idx = i;

Operation op = opFact.collectionPipedUpdate(key, update,
new CollectionPipedUpdateOperation.Callback() {
new PipedOperationCallback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -844,12 +841,14 @@ public void complete() {

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
if (!status.isSuccess()) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
}
});
Expand Down Expand Up @@ -2947,7 +2946,7 @@ <T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
latch, operationTimeout);

Operation op = opFact.collectionPipedExist(key, exist,
new CollectionPipedExistOperation.Callback() {
new PipedOperationCallback() {

private final Map<T, Boolean> result = new HashMap<>();
private boolean hasAnError = false;
Expand Down Expand Up @@ -3078,7 +3077,7 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
final int idx = i;

Operation op = opFact.collectionPipedInsert(key, insert,
new CollectionPipedInsertOperation.Callback() {
new PipedOperationCallback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;
Expand All @@ -3099,12 +3098,14 @@ public void complete() {

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
if (!status.isSuccess()) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
}
});
Expand Down Expand Up @@ -3293,7 +3294,7 @@ private <T> Future<Map<String, CollectionOperationStatus>> asyncCollectionInsert

for (final CollectionBulkInsert<T> insert : insertList) {
Operation op = opFact.collectionBulkInsert(
insert, new CollectionBulkInsertOperation.Callback() {
insert, new PipedOperationCallback() {
public void receivedStatus(OperationStatus status) {
// Nothing to do here because the user MUST search the result Map instance.
}
Expand All @@ -3302,8 +3303,9 @@ public void complete() {
latch.countDown();
}

public void gotStatus(String key, OperationStatus status) {
public void gotStatus(Integer index, OperationStatus status) {
if (!status.isSuccess()) {
String key = insert.getKey(index);
if (status instanceof CollectionOperationStatus) {
rv.addFailedResult(key, (CollectionOperationStatus) status);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public interface CollectionBulkInsertOperation extends KeyedOperation {

CollectionBulkInsert<?> getInsert();

interface Callback extends OperationCallback {
void gotStatus(String key, OperationStatus status);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public interface CollectionPipedExistOperation extends KeyedOperation {

SetPipedExist<?> getExist();

interface Callback extends OperationCallback {
void gotStatus(Integer index, OperationStatus status);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public interface CollectionPipedInsertOperation extends KeyedOperation {

CollectionPipedInsert<?> getInsert();

interface Callback extends OperationCallback {
void gotStatus(Integer index, OperationStatus status);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public interface CollectionPipedUpdateOperation extends KeyedOperation {

CollectionPipedUpdate<?> getUpdate();

interface Callback extends OperationCallback {
void gotStatus(Integer index, OperationStatus status);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package net.spy.memcached.ops;

public class MultiCollectionBulkInsertOperationCallback extends MultiOperationCallback
implements CollectionBulkInsertOperation.Callback {
implements PipedOperationCallback {

public MultiCollectionBulkInsertOperationCallback(OperationCallback original, int todo) {
super(original, todo);
}

@Override
public void gotStatus(String key, OperationStatus status) {
((CollectionBulkInsertOperation.Callback) originalCallback).gotStatus(key, status);
public void gotStatus(Integer index, OperationStatus status) {
((PipedOperationCallback) originalCallback).gotStatus(index, status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package net.spy.memcached.ops;

public interface PipedOperationCallback extends OperationCallback {
void gotStatus(Integer index, OperationStatus status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

/**
* Operation to store collection data in a memcached server.
Expand Down Expand Up @@ -62,7 +63,7 @@ public final class CollectionBulkInsertOperationImpl extends OperationImpl
false, "BKEY_MISMATCH", CollectionResponse.BKEY_MISMATCH);

private final CollectionBulkInsert<?> insert;
private final CollectionBulkInsertOperation.Callback cb;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
Expand All @@ -71,7 +72,7 @@ public final class CollectionBulkInsertOperationImpl extends OperationImpl
public CollectionBulkInsertOperationImpl(CollectionBulkInsert<?> insert, OperationCallback cb) {
super(cb);
this.insert = insert;
this.cb = (Callback) cb;
this.cb = (PipedOperationCallback) cb;
if (this.insert instanceof CollectionBulkInsert.ListBulkInsert) {
setAPIType(APIType.LOP_INSERT);
} else if (this.insert instanceof CollectionBulkInsert.SetBulkInsert) {
Expand Down Expand Up @@ -111,9 +112,10 @@ assert getState() == OperationState.READING
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
if (!status.isSuccess()) {
cb.gotStatus(insert.getKey(index), status);
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
Expand Down Expand Up @@ -151,10 +153,10 @@ assert getState() == OperationState.READING
TYPE_MISMATCH, BKEY_MISMATCH);

if (!status.isSuccess()) {
cb.gotStatus(insert.getKey(index), status);
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

public final class CollectionPipedExistOperationImpl extends OperationImpl implements
CollectionPipedExistOperation {
Expand All @@ -56,18 +57,19 @@ public final class CollectionPipedExistOperationImpl extends OperationImpl imple

private final String key;
private final SetPipedExist<?> setPipedExist;
private final CollectionPipedExistOperation.Callback cb;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
private boolean successAll = true;

public CollectionPipedExistOperationImpl(String key,
SetPipedExist<?> collectionExist, OperationCallback cb) {
SetPipedExist<?> collectionExist,
OperationCallback cb) {
super(cb);
this.key = key;
this.setPipedExist = collectionExist;
this.cb = (Callback) cb;
this.cb = (PipedOperationCallback) cb;
if (this.setPipedExist instanceof SetPipedExist) {
setAPIType(APIType.SOP_EXIST);
}
Expand Down Expand Up @@ -95,8 +97,12 @@ assert getState() == OperationState.READING : "Read ``" + line
if (setPipedExist.isNotPiped()) {
OperationStatus status = matchStatus(line, EXIST, NOT_EXIST,
NOT_FOUND, TYPE_MISMATCH, UNREADABLE);
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus(status.isSuccess() ? END : FAILED_END);
cb.receivedStatus(successAll ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}
Expand Down Expand Up @@ -134,6 +140,7 @@ assert getState() == OperationState.READING : "Read ``" + line
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

/**
* Operation to store collection data in a memcached server.
Expand Down Expand Up @@ -67,19 +68,20 @@ public final class CollectionPipedInsertOperationImpl extends OperationImpl

private final String key;
private final CollectionPipedInsert<?> insert;
private final CollectionPipedInsertOperation.Callback cb;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
private boolean successAll = true;
private boolean readUntilLastLine = false;

public CollectionPipedInsertOperationImpl(String key,
CollectionPipedInsert<?> insert, OperationCallback cb) {
CollectionPipedInsert<?> insert,
OperationCallback cb) {
super(cb);
this.key = key;
this.insert = insert;
this.cb = (Callback) cb;
this.cb = (PipedOperationCallback) cb;
if (this.insert instanceof CollectionPipedInsert.ListPipedInsert) {
setAPIType(APIType.LOP_INSERT);
} else if (this.insert instanceof CollectionPipedInsert.SetPipedInsert) {
Expand Down Expand Up @@ -124,12 +126,12 @@ assert getState() == OperationState.READING
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
TYPE_MISMATCH, BKEY_MISMATCH);
if (status.isSuccess()) {
cb.receivedStatus((successAll) ? END : FAILED_END);
} else {
cb.gotStatus(index, status);
cb.receivedStatus(FAILED_END);
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}
Expand Down Expand Up @@ -167,9 +169,10 @@ assert getState() == OperationState.READING
TYPE_MISMATCH, BKEY_MISMATCH);

if (!status.isSuccess()) {
cb.gotStatus(index, status);
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.OperationType;
import net.spy.memcached.ops.PipedOperationCallback;

/**
* Operation to update collection data in a memcached server.
Expand Down Expand Up @@ -64,18 +65,19 @@ public final class CollectionPipedUpdateOperationImpl extends OperationImpl impl

private final String key;
private final CollectionPipedUpdate<?> update;
private final CollectionPipedUpdateOperation.Callback cb;
private final PipedOperationCallback cb;

private int count;
private int index = 0;
private boolean successAll = true;

public CollectionPipedUpdateOperationImpl(String key,
CollectionPipedUpdate<?> update, OperationCallback cb) {
CollectionPipedUpdate<?> update,
OperationCallback cb) {
super(cb);
this.key = key;
this.update = update;
this.cb = (Callback) cb;
this.cb = (PipedOperationCallback) cb;
if (this.update instanceof BTreePipedUpdate) {
setAPIType(APIType.BOP_UPDATE);
} else if (this.update instanceof MapPipedUpdate) {
Expand Down Expand Up @@ -113,12 +115,12 @@ assert getState() == OperationState.READING : "Read ``" + line
OperationStatus status = matchStatus(line, UPDATED, NOT_FOUND,
NOT_FOUND_ELEMENT, NOTHING_TO_UPDATE, TYPE_MISMATCH,
BKEY_MISMATCH, EFLAG_MISMATCH);
if (status.isSuccess()) {
cb.receivedStatus((successAll) ? END : FAILED_END);
} else {
cb.gotStatus(index, status);
cb.receivedStatus(FAILED_END);
if (!status.isSuccess()) {
successAll = false;
}

cb.gotStatus(index, status);
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}
Expand Down Expand Up @@ -155,9 +157,10 @@ assert getState() == OperationState.READING : "Read ``" + line
BKEY_MISMATCH, EFLAG_MISMATCH);

if (!status.isSuccess()) {
cb.gotStatus(index, status);
successAll = false;
}

cb.gotStatus(index, status);
index++;
}
}
Expand Down
Loading

0 comments on commit 7f5c781

Please sign in to comment.