Skip to content

Commit 2bee798

Browse files
committed
Correctly apply Reactive zadd NX/XX command flags.
We now correctly apply if exists/if not exists constraints on the reactive zadd command. Previously, we only considered upsert which wasn't sufficient to apply xx/nx. Closes #2731
1 parent 3a5624b commit 2bee798

File tree

3 files changed

+230
-55
lines changed

3 files changed

+230
-55
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java

+55-24
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import java.util.ArrayList;
2626
import java.util.Collection;
2727
import java.util.Collections;
28+
import java.util.EnumSet;
2829
import java.util.List;
2930
import java.util.Optional;
31+
import java.util.Set;
3032
import java.util.concurrent.TimeUnit;
3133
import java.util.function.Function;
3234

@@ -37,6 +39,7 @@
3739
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
3840
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
3941
import org.springframework.data.redis.connection.RedisZSetCommands.Weights;
42+
import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs.Flag;
4043
import org.springframework.data.redis.core.ScanOptions;
4144
import org.springframework.data.redis.util.ByteUtils;
4245
import org.springframework.lang.Nullable;
@@ -61,23 +64,16 @@ public interface ReactiveZSetCommands {
6164
class ZAddCommand extends KeyCommand {
6265

6366
private final List<Tuple> tuples;
64-
private final boolean upsert;
65-
private final boolean returnTotalChanged;
67+
private final Set<Flag> flags;
6668
private final boolean incr;
67-
private final boolean gt;
68-
private final boolean lt;
6969

70-
private ZAddCommand(@Nullable ByteBuffer key, List<Tuple> tuples, boolean upsert, boolean returnTotalChanged,
71-
boolean incr, boolean gt, boolean lt) {
70+
private ZAddCommand(@Nullable ByteBuffer key, List<Tuple> tuples, Set<Flag> flags, boolean incr) {
7271

7372
super(key);
7473

7574
this.tuples = tuples;
76-
this.upsert = upsert;
77-
this.returnTotalChanged = returnTotalChanged;
75+
this.flags = flags;
7876
this.incr = incr;
79-
this.gt = gt;
80-
this.lt = lt;
8177
}
8278

8379
/**
@@ -103,7 +99,7 @@ public static ZAddCommand tuples(Collection<? extends Tuple> tuples) {
10399

104100
Assert.notNull(tuples, "Tuples must not be null!");
105101

106-
return new ZAddCommand(null, new ArrayList<>(tuples), false, false, false, false, false);
102+
return new ZAddCommand(null, new ArrayList<>(tuples), EnumSet.noneOf(Flag.class), false);
107103
}
108104

109105
/**
@@ -116,7 +112,7 @@ public ZAddCommand to(ByteBuffer key) {
116112

117113
Assert.notNull(key, "Key must not be null!");
118114

119-
return new ZAddCommand(key, tuples, upsert, returnTotalChanged, incr, gt, lt);
115+
return new ZAddCommand(key, tuples, flags, incr);
120116
}
121117

122118
/**
@@ -126,7 +122,11 @@ public ZAddCommand to(ByteBuffer key) {
126122
* @return a new {@link ZAddCommand} with {@literal xx} applied.
127123
*/
128124
public ZAddCommand xx() {
129-
return new ZAddCommand(getKey(), tuples, false, returnTotalChanged, incr, gt, lt);
125+
126+
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
127+
flags.remove(Flag.NX);
128+
flags.add(Flag.XX);
129+
return new ZAddCommand(getKey(), tuples, flags, incr);
130130
}
131131

132132
/**
@@ -136,7 +136,11 @@ public ZAddCommand xx() {
136136
* @return a new {@link ZAddCommand} with {@literal nx} applied.
137137
*/
138138
public ZAddCommand nx() {
139-
return new ZAddCommand(getKey(), tuples, true, returnTotalChanged, incr, gt, lt);
139+
140+
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
141+
flags.remove(Flag.XX);
142+
flags.add(Flag.NX);
143+
return new ZAddCommand(getKey(), tuples, flags, incr);
140144
}
141145

142146
/**
@@ -146,17 +150,20 @@ public ZAddCommand nx() {
146150
* @return a new {@link ZAddCommand} with {@literal ch} applied.
147151
*/
148152
public ZAddCommand ch() {
149-
return new ZAddCommand(getKey(), tuples, upsert, true, incr, gt, lt);
153+
154+
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
155+
flags.add(Flag.CH);
156+
return new ZAddCommand(getKey(), tuples, flags, incr);
150157
}
151158

152159
/**
153160
* Applies {@literal incr} mode (When this option is specified ZADD acts like ZINCRBY). Constructs a new command
154-
* instance with all previously configured properties.
161+
* instance with all previously configured properties. Note that the command result returns the score of the member.
155162
*
156163
* @return a new {@link ZAddCommand} with {@literal incr} applied.
157164
*/
158165
public ZAddCommand incr() {
159-
return new ZAddCommand(getKey(), tuples, upsert, upsert, true, gt, lt);
166+
return new ZAddCommand(getKey(), tuples, flags, true);
160167
}
161168

162169
/**
@@ -166,7 +173,11 @@ public ZAddCommand incr() {
166173
* @since 2.5
167174
*/
168175
public ZAddCommand gt() {
169-
return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, true, lt);
176+
177+
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
178+
flags.remove(Flag.LT);
179+
flags.add(Flag.GT);
180+
return new ZAddCommand(getKey(), tuples, flags, incr);
170181
}
171182

172183
/**
@@ -176,7 +187,11 @@ public ZAddCommand gt() {
176187
* @since 2.5
177188
*/
178189
public ZAddCommand lt() {
179-
return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, gt, true);
190+
191+
EnumSet<Flag> flags = EnumSet.copyOf(this.flags);
192+
flags.remove(Flag.GT);
193+
flags.add(Flag.LT);
194+
return new ZAddCommand(getKey(), tuples, flags, incr);
180195
}
181196

182197
/**
@@ -187,10 +202,26 @@ public List<Tuple> getTuples() {
187202
}
188203

189204
/**
190-
* @return
205+
* @return {@code true} if the command does not contain NX or XX flags.
191206
*/
192207
public boolean isUpsert() {
193-
return upsert;
208+
return !flags.contains(Flag.NX) && !flags.contains(Flag.XX);
209+
}
210+
211+
/**
212+
* @return {@code true} if the command contains the XX flag.
213+
* @since 2.7.17
214+
*/
215+
public boolean isIfExists() {
216+
return flags.contains(Flag.XX);
217+
}
218+
219+
/**
220+
* @return {@code true} if the command contains the NX flag.
221+
* @since 2.7.17
222+
*/
223+
public boolean isIfNotExists() {
224+
return flags.contains(Flag.NX);
194225
}
195226

196227
/**
@@ -205,22 +236,22 @@ public boolean isIncr() {
205236
* @since 2.5
206237
*/
207238
public boolean isGt() {
208-
return gt;
239+
return flags.contains(Flag.GT);
209240
}
210241

211242
/**
212243
* @return {@literal true} if {@literal LT} is set.
213244
* @since 2.5
214245
*/
215246
public boolean isLt() {
216-
return lt;
247+
return flags.contains(Flag.LT);
217248
}
218249

219250
/**
220251
* @return
221252
*/
222253
public boolean isReturnTotalChanged() {
223-
return returnTotalChanged;
254+
return flags.contains(Flag.CH);
224255
}
225256
}
226257

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java

+26-31
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.lettuce.core.Value;
2222
import io.lettuce.core.ZAddArgs;
2323
import io.lettuce.core.ZStoreArgs;
24-
import org.springframework.data.redis.core.TimeoutUtils;
2524
import reactor.core.publisher.Flux;
2625
import reactor.core.publisher.Mono;
2726

@@ -30,7 +29,6 @@
3029
import java.util.concurrent.TimeUnit;
3130

3231
import org.reactivestreams.Publisher;
33-
3432
import org.springframework.data.domain.Sort.Direction;
3533
import org.springframework.data.redis.connection.DefaultTuple;
3634
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -41,6 +39,7 @@
4139
import org.springframework.data.redis.connection.ReactiveZSetCommands;
4240
import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate;
4341
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
42+
import org.springframework.data.redis.core.TimeoutUtils;
4443
import org.springframework.data.redis.util.ByteUtils;
4544
import org.springframework.lang.Nullable;
4645
import org.springframework.util.Assert;
@@ -84,36 +83,32 @@ public Flux<NumericResponse<ZAddCommand, Number>> zAdd(Publisher<ZAddCommand> co
8483

8584
ZAddArgs args = null;
8685

87-
if (command.isIncr() || command.isUpsert() || command.isReturnTotalChanged()) {
86+
if (command.isIncr()) {
8887

89-
if (command.isIncr()) {
90-
91-
if (command.getTuples().size() > 1) {
92-
throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple!");
93-
}
88+
if (command.getTuples().size() > 1) {
89+
throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple!");
90+
}
9491

95-
Tuple tuple = command.getTuples().iterator().next();
92+
Tuple tuple = command.getTuples().iterator().next();
9693

97-
return cmd.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
98-
.map(value -> new NumericResponse<>(command, value));
99-
}
94+
return cmd.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue()))
95+
.map(value -> new NumericResponse<>(command, value));
96+
}
10097

101-
if (command.isReturnTotalChanged()) {
102-
args = ZAddArgs.Builder.ch();
103-
}
98+
if (command.isReturnTotalChanged()) {
99+
args = ZAddArgs.Builder.ch();
100+
}
104101

105-
if (command.isUpsert()) {
106-
args = args == null ? ZAddArgs.Builder.nx() : args.nx();
107-
} else {
108-
args = args == null ? ZAddArgs.Builder.xx() : args.xx();
109-
}
102+
if (command.isIfNotExists()) {
103+
args = args == null ? ZAddArgs.Builder.nx() : args.nx();
104+
} else if (command.isIfExists()) {
105+
args = args == null ? ZAddArgs.Builder.xx() : args.xx();
106+
}
110107

111-
if (command.isGt()) {
112-
args = args == null ? ZAddArgs.Builder.gt() : args.gt();
113-
}
114-
if (command.isLt()) {
115-
args = args == null ? ZAddArgs.Builder.lt() : args.lt();
116-
}
108+
if (command.isGt()) {
109+
args = args == null ? ZAddArgs.Builder.gt() : args.gt();
110+
} else if (command.isLt()) {
111+
args = args == null ? ZAddArgs.Builder.lt() : args.lt();
117112
}
118113

119114
ScoredValue<ByteBuffer>[] values = (ScoredValue<ByteBuffer>[]) command.getTuples().stream()
@@ -161,7 +156,7 @@ public Flux<NumericResponse<ZIncrByCommand, Double>> zIncrBy(Publisher<ZIncrByCo
161156
}));
162157
}
163158

164-
/*
159+
/*
165160
* (non-Javadoc)
166161
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zRandMember(Publisher)
167162
*/
@@ -177,7 +172,7 @@ public Flux<CommandResponse<ZRandMemberCommand, Flux<ByteBuffer>>> zRandMember(
177172
}));
178173
}
179174

180-
/*
175+
/*
181176
* (non-Javadoc)
182177
* @see org.springframework.data.redis.connection.ReactiveZSetCommands#zRandMemberWithScore(Publisher)
183178
*/
@@ -189,8 +184,8 @@ public Flux<CommandResponse<ZRandMemberCommand, Flux<Tuple>>> zRandMemberWithSco
189184

190185
Assert.notNull(command.getKey(), "Key must not be null!");
191186

192-
return new CommandResponse<>(command, cmd.zrandmemberWithScores(command.getKey(), command.getCount())
193-
.map(this::toTuple));
187+
return new CommandResponse<>(command,
188+
cmd.zrandmemberWithScores(command.getKey(), command.getCount()).map(this::toTuple));
194189
}));
195190
}
196191

@@ -414,7 +409,7 @@ public Flux<CommandResponse<BZPopCommand, Flux<Tuple>>> bZPop(Publisher<BZPopCom
414409
Assert.notNull(command.getKey(), "Key must not be null!");
415410
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
416411

417-
if(command.getTimeUnit() == TimeUnit.MILLISECONDS) {
412+
if (command.getTimeUnit() == TimeUnit.MILLISECONDS) {
418413

419414
double timeout = TimeoutUtils.toDoubleSeconds(command.getTimeout(), command.getTimeUnit());
420415

0 commit comments

Comments
 (0)