Skip to content

Commit

Permalink
spring-projectsGH-3444: Custom TTL per LOCK in JdbcLockRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
EddieChoCho committed Mar 30, 2024
1 parent 5e6ae90 commit 17b63da
Show file tree
Hide file tree
Showing 18 changed files with 235 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* @author Gary Russell
* @author Alexandre Strubel
* @author Ruslan Stelmachenko
* @author Eddie Cho
*
* @since 4.3
*/
Expand All @@ -76,19 +77,12 @@ public class DefaultLockRepository
*/
public static final String DEFAULT_TABLE_PREFIX = "INT_";

/**
* Default value for the time-to-live property.
*/
public static final Duration DEFAULT_TTL = Duration.ofSeconds(10);

private final String id;

private final JdbcTemplate template;

private final AtomicBoolean started = new AtomicBoolean();

private Duration ttl = DEFAULT_TTL;

private String prefix = DEFAULT_TABLE_PREFIX;

private String region = "DEFAULT";
Expand All @@ -100,7 +94,7 @@ public class DefaultLockRepository

private String deleteExpiredQuery = """
DELETE FROM %sLOCK
WHERE REGION=? AND CREATED_DATE<?
WHERE REGION=? AND EXPIRED_AFTER<?
""";

private String deleteAllQuery = """
Expand All @@ -110,24 +104,24 @@ public class DefaultLockRepository

private String updateQuery = """
UPDATE %sLOCK
SET CLIENT_ID=?, CREATED_DATE=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
SET CLIENT_ID=?, EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
""";

private String insertQuery = """
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE)
INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER)
VALUES (?, ?, ?, ?)
""";

private String countQuery = """
SELECT COUNT(REGION)
FROM %sLOCK
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND CREATED_DATE>=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND EXPIRED_AFTER>=?
""";

private String renewQuery = """
UPDATE %sLOCK
SET CREATED_DATE=?
SET EXPIRED_AFTER=?
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
""";

Expand Down Expand Up @@ -188,14 +182,6 @@ public void setPrefix(String prefix) {
this.prefix = prefix;
}

/**
* Specify the time (in milliseconds) to expire deadlocks.
* @param timeToLive the time to expire deadlocks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = Duration.ofMillis(timeToLive);
}

/**
* Set a {@link PlatformTransactionManager} for operations.
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
Expand All @@ -219,8 +205,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CLIENT_ID=?, CREATED_DATE=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE<?)
* SET CLIENT_ID=?, EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR EXPIRED_AFTER<?)
* }
* </pre>
* @param updateQuery the query to update a lock record.
Expand All @@ -247,7 +233,7 @@ public String getUpdateQuery() {
* Set a custom {@code INSERT} query for a lock record.
* The {@link #getInsertQuery()} can be used as a template for customization.
* The default query is
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)}.
* {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, EXPIRED_AFTER) VALUES (?, ?, ?, ?)}.
* For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
* <pre class="code">
* {@code
Expand Down Expand Up @@ -281,7 +267,7 @@ public String getInsertQuery() {
* <pre class="code">
* {@code
* UPDATE %sLOCK
* SET CREATED_DATE=?
* SET EXPIRED_AFTER=?
* WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
* }
* </pre>
Expand Down Expand Up @@ -389,23 +375,23 @@ public void close() {
}

@Override
public void delete(String lock) {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
public boolean delete(String lock) {
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0;
}

@Override
public boolean acquire(String lock) {
public boolean acquire(String lock, Duration ttlDuration) {
Boolean result =
this.readCommittedTransactionTemplate.execute(
transactionStatus -> {
if (this.template.update(this.updateQuery, this.id, epochMillis(),
this.region, lock, this.id, ttlEpochMillis()) > 0) {
if (this.template.update(this.updateQuery, this.id, ttlEpochMillis(ttlDuration),
this.region, lock, this.id, epochMillis()) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id,
epochMillis()) > 0;
ttlEpochMillis(ttlDuration)) > 0;
}
catch (DataIntegrityViolationException ex) {
return false;
Expand All @@ -420,27 +406,27 @@ public boolean isAcquired(String lock) {
transactionStatus ->
Integer.valueOf(1).equals(
this.template.queryForObject(this.countQuery,
Integer.class, this.region, lock, this.id, ttlEpochMillis())));
Integer.class, this.region, lock, this.id, epochMillis())));
return Boolean.TRUE.equals(result);
}

@Override
public void deleteExpired() {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus ->
this.template.update(this.deleteExpiredQuery, this.region, ttlEpochMillis()));
this.template.update(this.deleteExpiredQuery, this.region, epochMillis()));
}

@Override
public boolean renew(String lock) {
public boolean renew(String lock, Duration ttlDuration) {
final Boolean result = this.defaultTransactionTemplate.execute(
transactionStatus ->
this.template.update(this.renewQuery, epochMillis(), this.region, lock, this.id) > 0);
this.template.update(this.renewQuery, ttlEpochMillis(ttlDuration), this.region, lock, this.id) > 0);
return Boolean.TRUE.equals(result);
}

private Timestamp ttlEpochMillis() {
return Timestamp.valueOf(currentTime().minus(this.ttl));
private Timestamp ttlEpochMillis(Duration ttl) {
return Timestamp.valueOf(currentTime().plus(ttl));
}

private static Timestamp epochMillis() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.integration.support.locks.CustomTtlLock;
import org.springframework.integration.support.locks.CustomTtlLockRegistry;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.integration.support.locks.RenewableLockRegistry;
import org.springframework.integration.util.UUIDConverter;
Expand Down Expand Up @@ -56,10 +58,11 @@
* @author Unseok Kim
* @author Christian Tzolov
* @author Myeonghyeon Lee
* @author Eddie Cho
*
* @since 4.3
*/
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {
public class JdbcLockRegistry implements ExpirableLockRegistry, CustomTtlLockRegistry, RenewableLockRegistry {

private static final int DEFAULT_IDLE = 100;

Expand All @@ -83,12 +86,25 @@ protected boolean removeEldestEntry(Entry<String, JdbcLock> eldest) {

private int cacheCapacity = DEFAULT_CAPACITY;

/**
* Default value for the time-to-live property.
*/
public static final Duration DEFAULT_TTL = Duration.ofSeconds(10);

private final Duration ttl;

/**
* Construct an instance based on the provided {@link LockRepository}.
* @param client the {@link LockRepository} to rely on.
*/
public JdbcLockRegistry(LockRepository client) {
this.client = client;
this.ttl = DEFAULT_TTL;
}

public JdbcLockRegistry(LockRepository client, long expireAfter) {
this.client = client;
this.ttl = convertToDuration(expireAfter, TimeUnit.MILLISECONDS);
}

/**
Expand All @@ -113,6 +129,11 @@ public void setCacheCapacity(int cacheCapacity) {

@Override
public Lock obtain(Object lockKey) {
return this.obtainCustomTtlLock(lockKey);
}

@Override
public CustomTtlLock obtainCustomTtlLock(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = pathFor((String) lockKey);
this.lock.lock();
Expand Down Expand Up @@ -165,7 +186,12 @@ public void renewLock(Object lockKey) {
}
}

private static final class JdbcLock implements Lock {
private static Duration convertToDuration(long time, TimeUnit timeUnit) {
long timeInMilliseconds = TimeUnit.MILLISECONDS.convert(time, timeUnit);
return Duration.ofMillis(timeInMilliseconds);
}

private final class JdbcLock implements CustomTtlLock {

private final LockRepository mutex;

Expand All @@ -189,10 +215,20 @@ public long getLastUsed() {

@Override
public void lock() {
lock(JdbcLockRegistry.this.ttl);
}

@Override
public void lock(long customTtl, TimeUnit customTtlUnit) {
Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit);
lock(customTtlDuration);
}

private void lock(Duration ttl) {
this.delegate.lock();
while (true) {
try {
while (!doLock()) {
while (!doLock(ttl)) {
Thread.sleep(this.idleBetweenTries.toMillis());
}
break;
Expand Down Expand Up @@ -223,7 +259,7 @@ public void lockInterruptibly() throws InterruptedException {
this.delegate.lockInterruptibly();
while (true) {
try {
while (!doLock()) {
while (!doLock(JdbcLockRegistry.this.ttl)) {
Thread.sleep(this.idleBetweenTries.toMillis());
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
Expand Down Expand Up @@ -259,6 +295,16 @@ public boolean tryLock() {

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock(time, unit, JdbcLockRegistry.this.ttl);
}

@Override
public boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException {
Duration customTtlDuration = convertToDuration(customTtl, customTtlUnit);
return tryLock(time, unit, customTtlDuration);
}

private boolean tryLock(long time, TimeUnit unit, Duration ttl) throws InterruptedException {
long now = System.currentTimeMillis();
if (!this.delegate.tryLock(time, unit)) {
return false;
Expand All @@ -267,7 +313,7 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
boolean acquired;
while (true) {
try {
while (!(acquired = doLock()) && System.currentTimeMillis() < expire) { //NOSONAR
while (!(acquired = doLock(ttl)) && System.currentTimeMillis() < expire) { //NOSONAR
Thread.sleep(this.idleBetweenTries.toMillis());
}
if (!acquired) {
Expand All @@ -285,8 +331,8 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
}
}

private boolean doLock() {
boolean acquired = this.mutex.acquire(this.path);
private boolean doLock(Duration ttl) {
boolean acquired = this.mutex.acquire(this.path, ttl);
if (acquired) {
this.lastUsed = System.currentTimeMillis();
}
Expand All @@ -305,13 +351,22 @@ public void unlock() {
try {
while (true) {
try {
this.mutex.delete(this.path);
return;
if (this.mutex.delete(this.path)) {
return;
}
else {
throw new IllegalStateException();
// the lock is no longer owned by current process, the exception should be handle and rollback the execution result
}
}
catch (TransientDataAccessException | TransactionTimedOutException | TransactionSystemException e) {
// try again
}
catch (Exception e) {
if (e instanceof IllegalStateException) {
throw new IllegalStateException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
throw new DataAccessResourceFailureException("Failed to release mutex at " + this.path, e);
}
}
Expand All @@ -331,12 +386,21 @@ public boolean isAcquiredInThisProcess() {
}

public boolean renew() {
return renew(JdbcLockRegistry.this.ttl);
}

public boolean renew(long customTtl, TimeUnit customTtlTimeUnit) {
Duration customTtlDuration = convertToDuration(customTtl, customTtlTimeUnit);
return renew(customTtlDuration);
}

private boolean renew(Duration ttl) {
if (!this.delegate.isHeldByCurrentThread()) {
throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path);
}
while (true) {
try {
boolean renewed = this.mutex.renew(this.path);
boolean renewed = this.mutex.renew(this.path, ttl);
if (renewed) {
this.lastUsed = System.currentTimeMillis();
}
Expand Down
Loading

0 comments on commit 17b63da

Please sign in to comment.