From e82814bb29b6362870706a3999111d0cee57d3fe Mon Sep 17 00:00:00 2001 From: EddieChoCho Date: Sat, 30 Mar 2024 14:59:47 +0100 Subject: [PATCH] GH-3444: Add Custom TTL support for RedisLock, and JdbcLock Fixes: #3444 * Add `CustomTtlLock`, and `CustomTtlLockRegistry` interfaces * Modify `RedisLockRegistry` to implement the interfaces. * Modify ddl of `INT_LOCK` table, `LockRepository`, `DefaultLockRepository`, and `JdbcLockRegistry` to implement the interfaces. * Fix potential concurrency issue of `unlock` method of `JdbcLock`. * Maintain existing test cases and add new test cases. --- .../support/locks/CustomTtlLock.java | 49 +++++++ .../support/locks/CustomTtlLockRegistry.java | 29 +++++ .../jdbc/lock/DefaultLockRepository.java | 62 ++++----- .../jdbc/lock/JdbcLockRegistry.java | 86 +++++++++++-- .../integration/jdbc/lock/LockRepository.java | 19 ++- .../integration/jdbc/schema-db2.sql | 2 +- .../integration/jdbc/schema-derby.sql | 2 +- .../integration/jdbc/schema-h2.sql | 2 +- .../integration/jdbc/schema-hsqldb.sql | 2 +- .../integration/jdbc/schema-mysql.sql | 2 +- .../integration/jdbc/schema-oracle.sql | 2 +- .../integration/jdbc/schema-postgresql.sql | 2 +- .../integration/jdbc/schema-sqlserver.sql | 2 +- .../integration/jdbc/schema-sybase.sql | 2 +- .../jdbc/lock/DefaultLockRepositoryTests.java | 9 +- .../lock/JdbcLockRegistryDelegateTests.java | 43 +++---- .../JdbcLockRegistryDifferentClientTests.java | 22 ++-- .../lock/JdbcLockRegistryTests-context.xml | 2 - .../jdbc/lock/JdbcLockRegistryTests.java | 120 ++++++++++++------ .../redis/util/RedisLockRegistry.java | 58 ++++++--- .../redis/util/RedisLockRegistryTests.java | 61 +++++++++ .../ROOT/pages/jdbc/lock-registry.adoc | 2 +- 22 files changed, 413 insertions(+), 167 deletions(-) create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLock.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLockRegistry.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLock.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLock.java new file mode 100644 index 00000000000..d6fc1b7422b --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLock.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.locks; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +/** + * A {@link Lock} implementing this interface supports the spring distributed locks with custom time-to-live value per lock + * + * @author Eddie Cho + * + * @since 6.3 + */ +public interface CustomTtlLock extends Lock { + + /** + * Attempt to acquire a lock with a specific time-to-live + * @param time the maximum time to wait for the lock unit + * @param unit the time unit of the time argument + * @param customTtl the specific time-to-live for the lock status data + * @param customTtlUnit the time unit of the customTtl argument + * @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired + * @throws InterruptedException - + * if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported) + */ + boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException; + + /** + * Attempt to acquire a lock with a specific time-to-live + * @param customTtl the specific time-to-live for the lock status data + * @param customTtlUnit the time unit of the customTtl argument + */ + void lock(long customTtl, TimeUnit customTtlUnit); +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLockRegistry.java new file mode 100644 index 00000000000..bbb013dd75e --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/CustomTtlLockRegistry.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A {@link LockRegistry} implementing this interface supports the CustomTtlLock + * + * @author Eddie Cho + * + * @since 6.3 + */ +package org.springframework.integration.support.locks; + +public interface CustomTtlLockRegistry extends LockRegistry { + + CustomTtlLock obtainCustomTtlLock(Object lockKey); +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java index e442a6d1259..f5c26d96147 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java @@ -62,6 +62,7 @@ * @author Gary Russell * @author Alexandre Strubel * @author Ruslan Stelmachenko + * @author Eddie Cho * * @since 4.3 */ @@ -76,19 +77,12 @@ public class DefaultLockRepository */ public static final String DEFAULT_TABLE_PREFIX = "INT_"; - /** - * Default value for the time-to-live property. - */ - public static final Duration DEFAULT_TTL = Duration.ofSeconds(10); - private final String id; private final JdbcTemplate template; private final AtomicBoolean started = new AtomicBoolean(); - private Duration ttl = DEFAULT_TTL; - private String prefix = DEFAULT_TABLE_PREFIX; private String region = "DEFAULT"; @@ -100,7 +94,7 @@ public class DefaultLockRepository private String deleteExpiredQuery = """ DELETE FROM %sLOCK - WHERE REGION=? AND CREATED_DATE=? + WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND EXPIRED_AFTER>=? """; private String renewQuery = """ UPDATE %sLOCK - SET CREATED_DATE=? + SET EXPIRED_AFTER=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? """; @@ -188,14 +182,6 @@ public void setPrefix(String prefix) { this.prefix = prefix; } - /** - * Specify the time (in milliseconds) to expire deadlocks. - * @param timeToLive the time to expire deadlocks. - */ - public void setTimeToLive(int timeToLive) { - this.ttl = Duration.ofMillis(timeToLive); - } - /** * Set a {@link PlatformTransactionManager} for operations. * Otherwise, a primary {@link PlatformTransactionManager} bean is obtained @@ -219,8 +205,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws *
 	 * {@code
 	 *  UPDATE %sLOCK
-	 * 			SET CLIENT_ID=?, CREATED_DATE=?
-	 * 			WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE
 	 * @param updateQuery the query to update a lock record.
@@ -247,7 +233,7 @@ public String getUpdateQuery() {
 	 * Set a custom {@code INSERT} query for a lock record.
 	 * The {@link #getInsertQuery()} can be used as a template for customization.
 	 * The default query is
-	 * {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)}.
+	 * {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER) VALUES (?, ?, ?, ?)}.
 	 * For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
 	 * 
 	 * {@code
@@ -281,7 +267,7 @@ public String getInsertQuery() {
 	 * 
 	 * {@code
 	 *  UPDATE %sLOCK
-	 * 			SET CREATED_DATE=?
+	 * 			SET EXPIRED_AFTER=?
 	 * 			WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
 	 * }
 	 * 
@@ -389,23 +375,23 @@ public void close() { } @Override - public void delete(String lock) { - this.defaultTransactionTemplate.executeWithoutResult( - transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)); + public boolean delete(String lock) { + return this.defaultTransactionTemplate.execute( + transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0; } @Override - public boolean acquire(String lock) { + public boolean acquire(String lock, Duration ttlDuration) { Boolean result = this.readCommittedTransactionTemplate.execute( transactionStatus -> { - if (this.template.update(this.updateQuery, this.id, epochMillis(), - this.region, lock, this.id, ttlEpochMillis()) > 0) { + if (this.template.update(this.updateQuery, this.id, ttlEpochMillis(ttlDuration), + this.region, lock, this.id, epochMillis()) > 0) { return true; } try { return this.template.update(this.insertQuery, this.region, lock, this.id, - epochMillis()) > 0; + ttlEpochMillis(ttlDuration)) > 0; } catch (DataIntegrityViolationException ex) { return false; @@ -420,7 +406,7 @@ public boolean isAcquired(String lock) { transactionStatus -> Integer.valueOf(1).equals( this.template.queryForObject(this.countQuery, - Integer.class, this.region, lock, this.id, ttlEpochMillis()))); + Integer.class, this.region, lock, this.id, epochMillis()))); return Boolean.TRUE.equals(result); } @@ -428,19 +414,19 @@ public boolean isAcquired(String lock) { public void deleteExpired() { this.defaultTransactionTemplate.executeWithoutResult( transactionStatus -> - this.template.update(this.deleteExpiredQuery, this.region, ttlEpochMillis())); + this.template.update(this.deleteExpiredQuery, this.region, epochMillis())); } @Override - public boolean renew(String lock) { + public boolean renew(String lock, Duration ttlDuration) { final Boolean result = this.defaultTransactionTemplate.execute( transactionStatus -> - this.template.update(this.renewQuery, epochMillis(), this.region, lock, this.id) > 0); + this.template.update(this.renewQuery, ttlEpochMillis(ttlDuration), this.region, lock, this.id) > 0); return Boolean.TRUE.equals(result); } - private Timestamp ttlEpochMillis() { - return Timestamp.valueOf(currentTime().minus(this.ttl)); + private Timestamp ttlEpochMillis(Duration ttl) { + return Timestamp.valueOf(currentTime().plus(ttl)); } private static Timestamp epochMillis() { diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java index cae7d922d7f..224d58e7366 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import org.springframework.dao.CannotAcquireLockException; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.dao.TransientDataAccessException; +import org.springframework.integration.support.locks.CustomTtlLock; +import org.springframework.integration.support.locks.CustomTtlLockRegistry; import org.springframework.integration.support.locks.ExpirableLockRegistry; import org.springframework.integration.support.locks.RenewableLockRegistry; import org.springframework.integration.util.UUIDConverter; @@ -56,10 +58,11 @@ * @author Unseok Kim * @author Christian Tzolov * @author Myeonghyeon Lee + * @author Eddie Cho * * @since 4.3 */ -public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry { +public class JdbcLockRegistry implements ExpirableLockRegistry, CustomTtlLockRegistry, RenewableLockRegistry { private static final int DEFAULT_IDLE = 100; @@ -83,12 +86,25 @@ protected boolean removeEldestEntry(Entry eldest) { private int cacheCapacity = DEFAULT_CAPACITY; + /** + * Default value for the time-to-live property. + */ + public static final Duration DEFAULT_TTL = Duration.ofSeconds(10); + + private final Duration ttl; + /** * Construct an instance based on the provided {@link LockRepository}. * @param client the {@link LockRepository} to rely on. */ public JdbcLockRegistry(LockRepository client) { this.client = client; + this.ttl = DEFAULT_TTL; + } + + public JdbcLockRegistry(LockRepository client, long expireAfter) { + this.client = client; + this.ttl = convertToDuration(expireAfter, TimeUnit.MILLISECONDS); } /** @@ -113,6 +129,11 @@ public void setCacheCapacity(int cacheCapacity) { @Override public Lock obtain(Object lockKey) { + return this.obtainCustomTtlLock(lockKey); + } + + @Override + public CustomTtlLock obtainCustomTtlLock(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = pathFor((String) lockKey); this.lock.lock(); @@ -165,7 +186,12 @@ public void renewLock(Object lockKey) { } } - private static final class JdbcLock implements Lock { + private static Duration convertToDuration(long time, TimeUnit timeUnit) { + long timeInMilliseconds = TimeUnit.MILLISECONDS.convert(time, timeUnit); + return Duration.ofMillis(timeInMilliseconds); + } + + private final class JdbcLock implements CustomTtlLock { private final LockRepository mutex; @@ -189,10 +215,20 @@ public long getLastUsed() { @Override public void lock() { + lock(JdbcLockRegistry.this.ttl); + } + + @Override + public void lock(long customTtl, TimeUnit customTtlUnit) { + Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit); + lock(customTtlDuration); + } + + private void lock(Duration ttl) { this.delegate.lock(); while (true) { try { - while (!doLock()) { + while (!doLock(ttl)) { Thread.sleep(this.idleBetweenTries.toMillis()); } break; @@ -223,7 +259,7 @@ public void lockInterruptibly() throws InterruptedException { this.delegate.lockInterruptibly(); while (true) { try { - while (!doLock()) { + while (!doLock(JdbcLockRegistry.this.ttl)) { Thread.sleep(this.idleBetweenTries.toMillis()); if (Thread.currentThread().isInterrupted()) { throw new InterruptedException(); @@ -259,6 +295,16 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return tryLock(time, unit, JdbcLockRegistry.this.ttl); + } + + @Override + public boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException { + Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit); + return tryLock(time, unit, customTtlDuration); + } + + private boolean tryLock(long time, TimeUnit unit, Duration ttl) throws InterruptedException { long now = System.currentTimeMillis(); if (!this.delegate.tryLock(time, unit)) { return false; @@ -267,7 +313,7 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { boolean acquired; while (true) { try { - while (!(acquired = doLock()) && System.currentTimeMillis() < expire) { //NOSONAR + while (!(acquired = doLock(ttl)) && System.currentTimeMillis() < expire) { //NOSONAR Thread.sleep(this.idleBetweenTries.toMillis()); } if (!acquired) { @@ -285,8 +331,8 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { } } - private boolean doLock() { - boolean acquired = this.mutex.acquire(this.path); + private boolean doLock(Duration ttl) { + boolean acquired = this.mutex.acquire(this.path, ttl); if (acquired) { this.lastUsed = System.currentTimeMillis(); } @@ -305,13 +351,22 @@ public void unlock() { try { while (true) { try { - this.mutex.delete(this.path); - return; + if (this.mutex.delete(this.path)) { + return; + } + else { + throw new IllegalStateException(); + // the lock is no longer owned by current process, the exception should be handle and rollback the execution result + } } catch (TransientDataAccessException | TransactionTimedOutException | TransactionSystemException e) { // try again } catch (Exception e) { + if (e instanceof IllegalStateException) { + throw new IllegalStateException("Lock was released in the store due to expiration. " + + "The integrity of data protected by this lock may have been compromised."); + } throw new DataAccessResourceFailureException("Failed to release mutex at " + this.path, e); } } @@ -331,12 +386,21 @@ public boolean isAcquiredInThisProcess() { } public boolean renew() { + return renew(JdbcLockRegistry.this.ttl); + } + + public boolean renew(long customTtl, TimeUnit customTtlTimeUnit) { + Duration customTtlDuration = convertToDuration(customTtl, customTtlTimeUnit); + return renew(customTtlDuration); + } + + private boolean renew(Duration ttl) { if (!this.delegate.isHeldByCurrentThread()) { throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path); } while (true) { try { - boolean renewed = this.mutex.renew(this.path); + boolean renewed = this.mutex.renew(this.path, ttl); if (renewed) { this.lastUsed = System.currentTimeMillis(); } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java index b0a6a902fda..8f09ceba3ea 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.jdbc.lock; import java.io.Closeable; +import java.time.Duration; /** * Encapsulation of the SQL shunting that is needed for locks. A {@link JdbcLockRegistry} @@ -26,6 +27,7 @@ * @author Dave Syer * @author Alexandre Strubel * @author Artem Bilan + * @author Eddie Cho * * @since 4.3 */ @@ -41,8 +43,9 @@ public interface LockRepository extends Closeable { /** * Remove a lock from this repository. * @param lock the lock to remove. + * @return removed successfully or not */ - void delete(String lock); + boolean delete(String lock); /** * Remove all the expired locks. @@ -50,18 +53,20 @@ public interface LockRepository extends Closeable { void deleteExpired(); /** - * Acquire a lock for a key. + * Acquire a lock for a key with specific time-to-live value * @param lock the key for lock to acquire. + * @param ttl the custom time-to-live value * @return acquired or not. */ - boolean acquire(String lock); + boolean acquire(String lock, Duration ttl); /** - * Renew the lease for a lock. - * @param lock the lock to renew. + * Renew the lease for a lock with specific time-to-live value + * @param lock the key for lock to acquire. + * @param ttl the custom time-to-live value * @return renewed or not. */ - boolean renew(String lock); + boolean renew(String lock, Duration ttl); @Override void close(); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql index 82dcc727305..7cfa54763f9 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql index 7914556a41f..9068427a63f 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql index 1751ab4cbcb..c66776e55a1 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql index 22592c1a875..27f454d3457 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql index 655688a8913..92d359986fa 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME(6) NOT NULL, + EXPIRED_AFTER DATETIME(6) NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ) ENGINE=InnoDB; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql index e465da9993f..8ad29698319 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY VARCHAR2(36) NOT NULL, REGION VARCHAR2(100) NOT NULL, CLIENT_ID VARCHAR2(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql index 45894c57ea9..90fd5c19630 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql index 6c1c091ef9f..7ea4453d2b9 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME NOT NULL, + EXPIRED_AFTER DATETIME NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql index 7aa89388770..f0a504406e4 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql @@ -30,7 +30,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME NOT NULL, + EXPIRED_AFTER DATETIME NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ) LOCK DATAROWS; diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java index c8cd38b55bf..831be8e6f13 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.jdbc.lock; import java.sql.Connection; +import java.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -60,8 +61,8 @@ public void testNewTransactionIsStartedWhenTransactionIsAlreadyActive() { TransactionSynchronization transactionSynchronization = spy(TransactionSynchronization.class); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); - this.client.acquire("foo"); // 1 - this.client.renew("foo"); // 2 + this.client.acquire("foo", Duration.ofMillis(10000)); // 1 + this.client.renew("foo", Duration.ofMillis(10000)); // 2 this.client.delete("foo"); // 3 this.client.isAcquired("foo"); // 4 this.client.deleteExpired(); // 5 @@ -82,7 +83,7 @@ public void testIsAcquiredFromRepeatableReadTransaction() { assertThat(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()) .isEqualTo(Connection.TRANSACTION_REPEATABLE_READ); - this.client.acquire("foo"); + this.client.acquire("foo", Duration.ofMillis(10000)); assertThat(this.client.isAcquired("foo")).isTrue(); this.client.delete("foo"); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java index 2ff6a720d8c..32bae4f38ee 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,21 +17,21 @@ package org.springframework.integration.jdbc.lock; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.dao.QueryTimeoutException; import org.springframework.dao.TransientDataAccessException; import org.springframework.integration.test.util.TestUtils; import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.TransactionTimedOutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,7 +52,7 @@ public void clear() { repository = mock(LockRepository.class); registry = new JdbcLockRegistry(repository); - when(repository.acquire(anyString())).thenReturn(true); + when(repository.acquire(anyString(), any())).thenReturn(true); } @Test @@ -65,6 +65,8 @@ public void testLessAmountOfUnlockThanLock() { for (int i = 0; i < lockCount; i++) { lock.tryLock(); } + + when(repository.delete(anyString())).thenReturn(true); for (int i = 0; i < unlockCount; i++) { lock.unlock(); } @@ -81,6 +83,8 @@ public void testSameAmountOfUnlockThanLock() { for (int i = 0; i < lockCount; i++) { lock.tryLock(); } + + when(repository.delete(anyString())).thenReturn(true); for (int i = 0; i < lockCount; i++) { lock.unlock(); } @@ -93,13 +97,10 @@ public void testTransientDataAccessException() { final Lock lock = registry.obtain("foo"); lock.tryLock(); - final AtomicBoolean shouldThrow = new AtomicBoolean(true); - doAnswer(invocation -> { - if (shouldThrow.getAndSet(false)) { - throw mock(TransientDataAccessException.class); - } - return null; - }).when(repository).delete(anyString()); + TransientDataAccessException transientDataAccessException = new QueryTimeoutException(""); + when(repository.delete(anyString())) + .thenThrow(transientDataAccessException) + .thenReturn(true); lock.unlock(); @@ -111,13 +112,9 @@ public void testTransactionTimedOutException() { final Lock lock = registry.obtain("foo"); lock.tryLock(); - final AtomicBoolean shouldThrow = new AtomicBoolean(true); - doAnswer(invocation -> { - if (shouldThrow.getAndSet(false)) { - throw mock(TransactionTimedOutException.class); - } - return null; - }).when(repository).delete(anyString()); + when(repository.delete(anyString())) + .thenThrow(new TransactionTimedOutException("")) + .thenReturn(true); lock.unlock(); @@ -129,13 +126,9 @@ public void testTransactionSystemException() { final Lock lock = registry.obtain("foo"); lock.tryLock(); - final AtomicBoolean shouldThrow = new AtomicBoolean(true); - doAnswer(invocation -> { - if (shouldThrow.getAndSet(false)) { - throw mock(TransactionSystemException.class); - } - return null; - }).when(repository).delete(anyString()); + when(repository.delete(anyString())) + .thenThrow(new TransactionSystemException("")) + .thenReturn(true); lock.unlock(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java index bcd040e6514..aeeddf5be55 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ import org.springframework.util.StopWatch; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * @author Dave Syer @@ -64,7 +65,7 @@ public class JdbcLockRegistryDifferentClientTests { private JdbcLockRegistry registry; @Autowired - private LockRepository client; + private DefaultLockRepository client; @Autowired private ConfigurableApplicationContext context; @@ -78,6 +79,7 @@ public class JdbcLockRegistryDifferentClientTests { public void clear() { this.registry.expireUnusedOlderThan(0); this.client.close(); + this.client.afterPropertiesSet(); this.child = new AnnotationConfigApplicationContext(); this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource); this.child.setParent(this.context); @@ -282,24 +284,23 @@ public void testExclusiveAccess() throws Exception { @Test public void testOutOfDateLockTaken() throws Exception { + long ttl = 100; DefaultLockRepository client1 = new DefaultLockRepository(dataSource); - client1.setTimeToLive(100); client1.setApplicationContext(this.context); client1.afterPropertiesSet(); client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); - client2.setTimeToLive(100); client2.setApplicationContext(this.context); client2.afterPropertiesSet(); client2.afterSingletonsInstantiated(); - Lock lock1 = new JdbcLockRegistry(client1).obtain("foo"); + Lock lock1 = new JdbcLockRegistry(client1, ttl).obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch = new CountDownLatch(1); lock1.lockInterruptibly(); Thread.sleep(500); new SimpleAsyncTaskExecutor() .execute(() -> { - Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + Lock lock2 = new JdbcLockRegistry(client2, ttl).obtain("foo"); try { lock2.lockInterruptibly(); data.add(1); @@ -314,7 +315,7 @@ public void testOutOfDateLockTaken() throws Exception { }); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); data.add(2); - lock1.unlock(); + assertThatThrownBy(lock1::unlock).isInstanceOf(IllegalStateException.class); for (int i = 0; i < 2; i++) { Integer integer = data.poll(10, TimeUnit.SECONDS); assertThat(integer).isNotNull(); @@ -324,17 +325,16 @@ public void testOutOfDateLockTaken() throws Exception { @Test public void testRenewLock() throws Exception { + long ttl = 500; DefaultLockRepository client1 = new DefaultLockRepository(dataSource); - client1.setTimeToLive(500); client1.setApplicationContext(this.context); client1.afterPropertiesSet(); client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); - client2.setTimeToLive(500); client2.setApplicationContext(this.context); client2.afterPropertiesSet(); client2.afterSingletonsInstantiated(); - JdbcLockRegistry registry = new JdbcLockRegistry(client1); + JdbcLockRegistry registry = new JdbcLockRegistry(client1, ttl); Lock lock1 = registry.obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch1 = new CountDownLatch(2); @@ -342,7 +342,7 @@ public void testRenewLock() throws Exception { lock1.lockInterruptibly(); new SimpleAsyncTaskExecutor() .execute(() -> { - Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + Lock lock2 = new JdbcLockRegistry(client2, ttl).obtain("foo"); try { latch1.countDown(); lock2.lockInterruptibly(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml index 0dc9eab475e..4d4511c095e 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml @@ -24,8 +24,6 @@ - diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java index e5ccbb83097..8c0695db219 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java @@ -38,6 +38,7 @@ import org.springframework.context.ApplicationContextException; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.integration.support.locks.CustomTtlLock; import org.springframework.integration.test.util.TestUtils; import org.springframework.integration.util.UUIDConverter; import org.springframework.test.annotation.DirtiesContext; @@ -152,11 +153,10 @@ public void testReentrantLockInterruptibly() throws Exception { @Test public void testReentrantLockAfterExpiration() throws Exception { DefaultLockRepository client = new DefaultLockRepository(dataSource); - client.setTimeToLive(1); client.setApplicationContext(this.context); client.afterPropertiesSet(); client.afterSingletonsInstantiated(); - JdbcLockRegistry registry = new JdbcLockRegistry(client); + JdbcLockRegistry registry = new JdbcLockRegistry(client, 1); Lock lock1 = registry.obtain("foo"); assertThat(lock1.tryLock()).isTrue(); Thread.sleep(100); @@ -246,44 +246,6 @@ public void testTwoThreads() throws Exception { assertThat(locked.get()).isTrue(); } - @Test - public void testTwoThreadsDifferentRegistries() throws Exception { - for (int i = 0; i < 100; i++) { - - final JdbcLockRegistry registry1 = new JdbcLockRegistry(this.client); - final JdbcLockRegistry registry2 = new JdbcLockRegistry(this.client); - final Lock lock1 = registry1.obtain("foo"); - final AtomicBoolean locked = new AtomicBoolean(); - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - final CountDownLatch latch3 = new CountDownLatch(1); - lock1.lockInterruptibly(); - this.taskExecutor.execute(() -> { - Lock lock2 = registry2.obtain("foo"); - try { - latch1.countDown(); - lock2.lockInterruptibly(); - latch2.await(10, TimeUnit.SECONDS); - locked.set(true); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - finally { - lock2.unlock(); - latch3.countDown(); - } - }); - assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(locked.get()).isFalse(); - lock1.unlock(); - latch2.countDown(); - assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(locked.get()).isTrue(); - - } - } - @Test public void testTwoThreadsWrongOneUnlocks() throws Exception { final Lock lock = this.registry.obtain("foo"); @@ -502,6 +464,84 @@ void noTableThrowsExceptionOnStart() { } } + @Test + public void testLockWithCustomTtl() throws Exception { + for (int i = 0; i < 10; i++) { + CustomTtlLock lock = this.registry.obtainCustomTtlLock("foo"); + lock.lock(100, TimeUnit.MILLISECONDS); + try { + assertThat(TestUtils.getPropertyValue(this.registry, "locks", Map.class).size()).isEqualTo(1); + } + finally { + lock.unlock(); + } + } + + Thread.sleep(10); + this.registry.expireUnusedOlderThan(0); + assertThat(TestUtils.getPropertyValue(this.registry, "locks", Map.class).size()).isEqualTo(0); + } + + @Test + public void testTryLockWithCustomTtl() throws Exception { + for (int i = 0; i < 10; i++) { + CustomTtlLock lock = this.registry.obtainCustomTtlLock("foo"); + lock.tryLock(100, TimeUnit.MILLISECONDS, 100, TimeUnit.MILLISECONDS); + try { + assertThat(TestUtils.getPropertyValue(this.registry, "locks", Map.class).size()).isEqualTo(1); + } + finally { + lock.unlock(); + } + } + + Thread.sleep(10); + this.registry.expireUnusedOlderThan(0); + assertThat(TestUtils.getPropertyValue(this.registry, "locks", Map.class).size()).isEqualTo(0); + } + + @Test + public void testUnlock_lockStatusIsExpired_lockHasBeenAcquiredByAnotherProcess_DataAccessResourceFailureExceptionWillBeThrown() throws Exception { + long ttl = 100; + DefaultLockRepository client1 = new DefaultLockRepository(dataSource); + client1.setApplicationContext(this.context); + client1.afterPropertiesSet(); + client1.afterSingletonsInstantiated(); + DefaultLockRepository client2 = new DefaultLockRepository(dataSource); + client2.setApplicationContext(this.context); + client2.afterPropertiesSet(); + client2.afterSingletonsInstantiated(); + JdbcLockRegistry process1Registry = new JdbcLockRegistry(client1, ttl); + JdbcLockRegistry process2Registry = new JdbcLockRegistry(client2, ttl); + Lock lock1 = process1Registry.obtain("foo"); + Lock lock2 = process2Registry.obtain("foo"); + try { + lock1.lock(); + Thread.sleep(ttl); + assertThat(lock2.tryLock()).isTrue(); + } + finally { + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> lock1.unlock()); + lock2.unlock(); + } + } + + @Test + public void testUnlock_lockStatusIsExpired_lockDataHasBeenDeleted_IllegalStateExceptionWillBeThrown() throws Exception { + JdbcLockRegistry registry = new JdbcLockRegistry(client, 100); + Lock lock = registry.obtain("foo"); + try { + lock.lock(); + Thread.sleep(200); + client.deleteExpired(); + } + finally { + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> lock.unlock()); + } + } + @SuppressWarnings("unchecked") private static Map getRegistryLocks(JdbcLockRegistry registry) { return TestUtils.getPropertyValue(registry, "locks", Map.class); diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 7f73eebbc7c..e845add93e9 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -51,6 +51,8 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; +import org.springframework.integration.support.locks.CustomTtlLock; +import org.springframework.integration.support.locks.CustomTtlLockRegistry; import org.springframework.integration.support.locks.ExpirableLockRegistry; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; @@ -90,7 +92,7 @@ * @since 4.0 * */ -public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean { +public final class RedisLockRegistry implements ExpirableLockRegistry, CustomTtlLockRegistry, DisposableBean { private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class); @@ -225,6 +227,11 @@ public void setRedisLockType(RedisLockType redisLockType) { @Override public Lock obtain(Object lockKey) { + return this.obtainCustomTtlLock(lockKey); + } + + @Override + public CustomTtlLock obtainCustomTtlLock(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = (String) lockKey; this.lock.lock(); @@ -296,7 +303,7 @@ private Function getRedisLockConstructor(RedisLockType redisL }; } - private abstract class RedisLock implements Lock { + private abstract class RedisLock implements CustomTtlLock { private static final String OBTAIN_LOCK_SCRIPT = """ local lockClientId = redis.call('GET', KEYS[1]) @@ -334,11 +341,12 @@ public long getLockedAt() { /** * Attempt to acquire a lock in redis. * @param time the maximum time(milliseconds) to wait for the lock, -1 infinity + * @param expireAfter the time-to-live(milliseconds) for the lock status data * @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired * @throws InterruptedException – * if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported) */ - protected abstract boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException; + protected abstract boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException; /** * Unlock the lock using the unlink method in redis. @@ -352,10 +360,16 @@ public long getLockedAt() { @Override public final void lock() { + this.lock(RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS); + } + + @Override + public void lock(long customTtl, TimeUnit customTtlUnit) { this.localLock.lock(); while (true) { try { - if (tryRedisLock(-1L)) { + long customTtlInMilliseconds = TimeUnit.MILLISECONDS.convert(customTtl, customTtlUnit); + if (tryRedisLock(-1L, customTtlInMilliseconds)) { return; } } @@ -382,7 +396,7 @@ public final void lockInterruptibly() throws InterruptedException { this.localLock.lockInterruptibly(); while (true) { try { - if (tryRedisLock(-1L)) { + if (tryRedisLock(-1L, RedisLockRegistry.this.expireAfter)) { return; } } @@ -411,12 +425,18 @@ public final boolean tryLock() { @Override public final boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return this.tryLock(time, unit, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS); + } + + @Override + public boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException { if (!this.localLock.tryLock(time, unit)) { return false; } try { long waitTime = TimeUnit.MILLISECONDS.convert(time, unit); - boolean acquired = tryRedisLock(waitTime); + long customTtlInMilliseconds = TimeUnit.MILLISECONDS.convert(customTtl, customTtlUnit); + boolean acquired = tryRedisLock(waitTime, customTtlInMilliseconds); if (!acquired) { this.localLock.unlock(); } @@ -429,8 +449,8 @@ public final boolean tryLock(long time, TimeUnit unit) throws InterruptedExcepti return false; } - private boolean tryRedisLock(long time) throws ExecutionException, InterruptedException { - final boolean acquired = tryRedisLockInner(time); + private boolean tryRedisLock(long time, long expireAfter) throws ExecutionException, InterruptedException { + final boolean acquired = tryRedisLockInner(time, expireAfter); if (acquired) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Acquired lock; " + this); @@ -440,11 +460,11 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx return acquired; } - protected final Boolean obtainLock() { + protected final Boolean obtainLock(long expireAfter) { return RedisLockRegistry.this.redisTemplate .execute(OBTAIN_LOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId, - String.valueOf(RedisLockRegistry.this.expireAfter)); + String.valueOf(expireAfter)); } @Override @@ -598,8 +618,8 @@ private RedisPubSubLock(String path) { } @Override - protected boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException { - return subscribeLock(time); + protected boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException { + return subscribeLock(time, expireAfter); } @Override @@ -618,9 +638,9 @@ private boolean removeLockKeyWithScript(RedisScript redisScript) { RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey)); } - private boolean subscribeLock(long time) throws ExecutionException, InterruptedException { + private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException { final long expiredTime = System.currentTimeMillis() + time; - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } @@ -635,7 +655,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE Future future = RedisLockRegistry.this.unlockNotifyMessageListener.subscribeLock(this.lockKey); //DCL - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } try { @@ -645,7 +665,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE } catch (TimeoutException ignore) { } - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } } @@ -737,10 +757,10 @@ private RedisSpinLock(String path) { } @Override - protected boolean tryRedisLockInner(long time) throws InterruptedException { + protected boolean tryRedisLockInner(long time, long expireAfter) throws InterruptedException { long now = System.currentTimeMillis(); if (time == -1L) { - while (!obtainLock()) { + while (!obtainLock(expireAfter)) { Thread.sleep(100); //NOSONAR } return true; @@ -748,7 +768,7 @@ protected boolean tryRedisLockInner(long time) throws InterruptedException { else { long expire = now + TimeUnit.MILLISECONDS.convert(time, TimeUnit.MILLISECONDS); boolean acquired; - while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR + while (!(acquired = obtainLock(expireAfter)) && System.currentTimeMillis() < expire) { //NOSONAR Thread.sleep(100); //NOSONAR } return acquired; diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java index a0874de6202..fece4628800 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java @@ -49,11 +49,13 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.integration.redis.RedisContainerTest; import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType; +import org.springframework.integration.support.locks.CustomTtlLock; import org.springframework.integration.test.util.TestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; /** @@ -64,6 +66,7 @@ * @author Unseok Kim * @author Artem Vozhdayenko * @author Anton Gabov + * @author Eddie Cho * * @since 4.0 * @@ -115,6 +118,64 @@ void testLock(RedisLockType testRedisLockType) { registry.destroy(); } + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + registry.setRedisLockType(testRedisLockType); + for (int i = 0; i < 3; i++) { + CustomTtlLock lock = registry.obtainCustomTtlLock("foo"); + lock.lock(500, TimeUnit.MILLISECONDS); + try { + assertThat(getRedisLockRegistryLocks(registry)).hasSize(1); + Thread.sleep(400); + } + finally { + lock.unlock(); + } + } + registry.expireUnusedOlderThan(-1000); + assertThat(getRedisLockRegistryLocks(registry)).isEmpty(); + registry.destroy(); + } + + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testTryLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + registry.setRedisLockType(testRedisLockType); + for (int i = 0; i < 3; i++) { + CustomTtlLock lock = registry.obtainCustomTtlLock("foo"); + lock.tryLock(100, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS); + try { + assertThat(getRedisLockRegistryLocks(registry)).hasSize(1); + Thread.sleep(400); + } + finally { + lock.unlock(); + } + } + registry.expireUnusedOlderThan(-1000); + assertThat(getRedisLockRegistryLocks(registry)).isEmpty(); + registry.destroy(); + } + + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testUnlock_lockStatusIsExpired_IllegalStateExceptionWillBeThrown(RedisLockType testRedisLockType) throws InterruptedException { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + registry.setRedisLockType(testRedisLockType); + Lock lock = registry.obtain("foo"); + try { + lock.lock(); + Thread.sleep(200); + } + finally { + assertThatThrownBy(lock::unlock).isInstanceOf(IllegalStateException.class); + } + registry.destroy(); + } + @ParameterizedTest @EnumSource(RedisLockType.class) void testLockInterruptibly(RedisLockType testRedisLockType) throws Exception { diff --git a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc index 1147b5263fb..e4c7ac2a1f4 100644 --- a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc +++ b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc @@ -20,7 +20,7 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36), REGION VARCHAR(100), CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); ----