Skip to content

Commit

Permalink
spring-projectsGH-3444: Add Custom TTL support for RedisLock, and Jdb…
Browse files Browse the repository at this point in the history
…cLock

Fixes: spring-projects#3444

* add `CustomTtlLock`, and `CustomTtlLockRegistry` interfaces
* Modify `RedisLockRegistry` to implement the interfaces.
* Modify `JdbcLockRegistry` to implement the interfaces.
* Modify `unlock` method of `JdbcLock` to prevent potential concurrency issue.
* Maintain existing test cases and add new test cases.
  • Loading branch information
EddieChoCho committed Mar 30, 2024
1 parent c155d5d commit 6a3d072
Show file tree
Hide file tree
Showing 22 changed files with 413 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
* A {@link Lock} implementing this interface supports the spring distributed locks with custom time-to-live value per lock
*
* @author Eddie Cho
*
* @since 6.3
*/
public interface CustomTtlLock extends Lock {

/**
* Attempt to acquire a lock with a specific time-to-live
* @param time the maximum time to wait for the lock unit
* @param unit the time unit of the time argument
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException -
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
*/
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException;

/**
* Attempt to acquire a lock with a specific time-to-live
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
*/
void lock(long customTtl, TimeUnit customTtlUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* A {@link LockRegistry} implementing this interface supports the CustomTtlLock
*
* @author Eddie Cho
*
* @since 6.3
*/
package org.springframework.integration.support.locks;

public interface CustomTtlLockRegistry extends LockRegistry {

CustomTtlLock obtainCustomTtlLock(Object lockKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* @author Gary Russell
* @author Alexandre Strubel
* @author Ruslan Stelmachenko
* @author Eddie Cho
*
* @since 4.3
*/
Expand All @@ -76,19 +77,12 @@ public class DefaultLockRepository
*/
public static final String DEFAULT_TABLE_PREFIX = "INT_";

/**
* Default value for the time-to-live property.
*/
public static final Duration DEFAULT_TTL = Duration.ofSeconds(10);

private final String id;

private final JdbcTemplate template;

private final AtomicBoolean started = new AtomicBoolean();

private Duration ttl = DEFAULT_TTL;

private String prefix = DEFAULT_TABLE_PREFIX;

private String region = "DEFAULT";
Expand All @@ -100,7 +94,7 @@ public class DefaultLockRepository

private String deleteExpiredQuery = """
DELETE FROM %sLOCK
WHERE REGION=? AND CREATED_DATE<?
WHERE REGION=? AND EXPIRED_AFTER<?
""";

private String deleteAllQuery = """
Expand All @@ -110,24 +104,24 @@ public class DefaultLockRepository

private String updateQuery = """
UPDATE %sLOCK
SET CLIENT_ID=?, CREATED_DATE=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
SET CLIENT_ID=?, EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
""";

private String insertQuery = """
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE)
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER)
VALUES (?, ?, ?, ?)
""";

private String countQuery = """
SELECT COUNT(REGION)
FROM %sLOCK
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND EXPIRED_AFTER>=?
""";

private String renewQuery = """
UPDATE %sLOCK
SET CREATED_DATE=?
SET EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
""";

Expand Down Expand Up @@ -188,14 +182,6 @@ public void setPrefix(String prefix) {
this.prefix = prefix;
}

/**
* Specify the time (in milliseconds) to expire deadlocks.
* @param timeToLive the time to expire deadlocks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = Duration.ofMillis(timeToLive);
}

/**
* Set a {@link PlatformTransactionManager} for operations.
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
Expand All @@ -219,8 +205,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CLIENT_ID=?, CREATED_DATE=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
* SET CLIENT_ID=?, EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
* }
* </pre>
* @param updateQuery the query to update a lock record.
Expand All @@ -247,7 +233,7 @@ public String getUpdateQuery() {
* Set a custom {@code INSERT} query for a lock record.
* The {@link #getInsertQuery()} can be used as a template for customization.
* The default query is
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)}.
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER) VALUES (?, ?, ?, ?)}.
* For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
* <pre class="code">
* {@code
Expand Down Expand Up @@ -281,7 +267,7 @@ public String getInsertQuery() {
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CREATED_DATE=?
* SET EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
* }
* </pre>
Expand Down Expand Up @@ -389,23 +375,23 @@ public void close() {
}

@Override
public void delete(String lock) {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
public boolean delete(String lock) {
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0;
}

@Override
public boolean acquire(String lock) {
public boolean acquire(String lock, Duration ttlDuration) {
Boolean result =
this.readCommittedTransactionTemplate.execute(
transactionStatus -> {
if (this.template.update(this.updateQuery, this.id, epochMillis(),
this.region, lock, this.id, ttlEpochMillis()) > 0) {
if (this.template.update(this.updateQuery, this.id, ttlEpochMillis(ttlDuration),
this.region, lock, this.id, epochMillis()) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id,
epochMillis()) > 0;
ttlEpochMillis(ttlDuration)) > 0;
}
catch (DataIntegrityViolationException ex) {
return false;
Expand All @@ -420,27 +406,27 @@ public boolean isAcquired(String lock) {
transactionStatus ->
Integer.valueOf(1).equals(
this.template.queryForObject(this.countQuery,
Integer.class, this.region, lock, this.id, ttlEpochMillis())));
Integer.class, this.region, lock, this.id, epochMillis())));
return Boolean.TRUE.equals(result);
}

@Override
public void deleteExpired() {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus ->
this.template.update(this.deleteExpiredQuery, this.region, ttlEpochMillis()));
this.template.update(this.deleteExpiredQuery, this.region, epochMillis()));
}

@Override
public boolean renew(String lock) {
public boolean renew(String lock, Duration ttlDuration) {
final Boolean result = this.defaultTransactionTemplate.execute(
transactionStatus ->
this.template.update(this.renewQuery, epochMillis(), this.region, lock, this.id) > 0);
this.template.update(this.renewQuery, ttlEpochMillis(ttlDuration), this.region, lock, this.id) > 0);
return Boolean.TRUE.equals(result);
}

private Timestamp ttlEpochMillis() {
return Timestamp.valueOf(currentTime().minus(this.ttl));
private Timestamp ttlEpochMillis(Duration ttl) {
return Timestamp.valueOf(currentTime().plus(ttl));
}

private static Timestamp epochMillis() {
Expand Down
Loading

0 comments on commit 6a3d072

Please sign in to comment.