Skip to content

GH-10137: Cache session.list result in the AbstractInboundFileSynchronizer #10344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -53,8 +57,10 @@
import org.springframework.integration.file.support.FileUtils;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

Expand Down Expand Up @@ -86,6 +92,19 @@ public abstract class AbstractInboundFileSynchronizer<F>

private final RemoteFileTemplate<F> remoteFileTemplate;

private final DefaultLockRegistry lockRegistry = new DefaultLockRegistry();

@SuppressWarnings("serial")
private final Map<String, List<F>> fetchCache =
Collections.synchronizedMap(new LinkedHashMap<>(100, 0.75f, true) {

@Override
protected boolean removeEldestEntry(Map.Entry<String, List<F>> eldest) {
return size() > 100;
}

});

@SuppressWarnings("NullAway.Init")
private EvaluationContext evaluationContext;

Expand Down Expand Up @@ -331,9 +350,6 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
return;
}
String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Synchronizing " + remoteDirectory + " to " + localDirectory);
}
try {
Integer transferred = this.remoteFileTemplate.execute(session ->
transferFilesFromRemoteToLocal(remoteDirectory, localDirectory, maxFetchSize, session));
Expand All @@ -350,94 +366,142 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
private Integer transferFilesFromRemoteToLocal(@Nullable String remoteDirectory, File localDirectory,
int maxFetchSize, Session<F> session) throws IOException {

F[] files = session.list(remoteDirectory);
if (!ObjectUtils.isEmpty(files)) {
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
String remoteDirectoryKey = remoteDirectory == null ? "" : remoteDirectory;

Lock remoteDirectoryLock = null;
if (maxFetchSize > 0) {
// The result of session.list is going to be sliced by this maxFetchSize and cached.
// Therefore, a lock for the directory to avoid race condition from different threads.
// The perfomance degration is minimal since session.list is filtered once,
// and all the later slices are handled only from the in-memory cache.
remoteDirectoryLock = this.lockRegistry.obtain(remoteDirectoryKey);
remoteDirectoryLock.lock();
}
if (!ObjectUtils.isEmpty(files)) {
boolean filteringOneByOne = this.filter != null && this.filter.supportsSingleFileFiltering();
List<F> filteredFiles = applyFilter(files, this.filter != null, filteringOneByOne, maxFetchSize);
else {
// The cache makes sense only for maxFetchSize > 0.
this.fetchCache.remove(remoteDirectoryKey);
}

try {
List<F> remoteFiles = null;
if (maxFetchSize > 0) {
remoteFiles = this.fetchCache.get(remoteDirectoryKey);
}

if (CollectionUtils.isEmpty(remoteFiles)) {
// The session.list and filter all the files only once.
// If maxFetchSize > 0, the rest of filtered files are going to be cached
// for subsequent fetches.
// If no maxFetchSize, all the files are transferred at once anyway.
remoteFiles = listAndFilterFiles(remoteDirectory, session);
}

int copied = filteredFiles.size();
int accepted = 0;
List<F> sliceToTransfer = remoteFiles;
List<F> remoteFilesToCache = null;
if (!CollectionUtils.isEmpty(remoteFiles) && maxFetchSize > 0) {
remoteFilesToCache = remoteFiles;
sliceToTransfer = remoteFiles.stream().limit(maxFetchSize).toList();
remoteFilesToCache.removeAll(sliceToTransfer);
}

for (F file : filteredFiles) {
F fileToCopy = file;
if (filteringOneByOne) {
if ((maxFetchSize < 0 || accepted < maxFetchSize)
&& this.filter != null && this.filter.accept(fileToCopy)) {
int copied = 0;

accepted++;
for (int i = 0; i < sliceToTransfer.size(); i++) {
F file = sliceToTransfer.get(i);
boolean transferred = false;
try {
if (transferFile(remoteDirectory, localDirectory, session, file)) {
copied++;
}
}
catch (RuntimeException | IOException ex) {
// The filtering has happened before transfer, so if it fails,
// all the following files have to be rest from the filter.
if (this.filter != null && this.filter.supportsSingleFileFiltering()) {
for (int j = i; j < remoteFiles.size(); j++) {
F fileToReset = remoteFiles.get(j);
resetFilterIfNecessary(fileToReset);
}
}
else {
fileToCopy = null;
copied--;
rollbackFromFileToListEnd(remoteFiles, file);
}

if (maxFetchSize > 0) {
// When trasfer fails, reset the cache as well
// for a fresh session.list on the next synchronization.
this.fetchCache.remove(remoteDirectoryKey);
}

throw ex;
}
copied =
copyIfNotNull(remoteDirectory, localDirectory, session, filteringOneByOne,
filteredFiles, copied, fileToCopy);
}

if (maxFetchSize > 0) {
if (!CollectionUtils.isEmpty(remoteFilesToCache)) {
this.fetchCache.put(remoteDirectoryKey, remoteFilesToCache);
}
else {
this.fetchCache.remove(remoteDirectoryKey);
}
}

return copied;
}
else {
return 0;
finally {
if (remoteDirectoryLock != null) {
remoteDirectoryLock.unlock();
}
}
}

private int copyIfNotNull(@Nullable String remoteDirectory, File localDirectory,
Session<F> session, boolean filteringOneByOne,
List<F> filteredFiles, int copied, @Nullable F file) throws IOException {

boolean renamedFailed = false;
EvaluationContext localFileEvaluationContext = null;
if (this.localFilenameGeneratorExpression != null) {
localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
private List<F> listAndFilterFiles(@Nullable String remoteDirectory, Session<F> session) throws IOException {
F[] files = session.list(remoteDirectory);
if (!ObjectUtils.isEmpty(files)) {
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
}
try {
if (file != null &&
!copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory,
session)) {

renamedFailed = true;
}
}
catch (RuntimeException | IOException e1) {
if (filteringOneByOne) {
resetFilterIfNecessary(file);
if (!ObjectUtils.isEmpty(files)) {
List<F> filteredFiles;
if (this.filter != null) {
if (this.filter.supportsSingleFileFiltering()) {
filteredFiles = new ArrayList<>(files.length);
for (F file : files) {
if (this.filter.accept(file)) {
filteredFiles.add(file);
}
}
}
else {
filteredFiles = filterFiles(files);
}
}
else {
rollbackFromFileToListEnd(filteredFiles, file);
filteredFiles = new ArrayList<>();
Collections.addAll(filteredFiles, files);
}
throw e1;

return filteredFiles;
}
return renamedFailed ? copied - 1 : copied;

return Collections.emptyList();
}

private List<F> applyFilter(F[] files, boolean haveFilter, boolean filteringOneByOne, int maxFetchSize) {
List<F> filteredFiles;
if (!filteringOneByOne && haveFilter) {
filteredFiles = filterFiles(files);
}
else {
filteredFiles = List.of(files);
}
if (maxFetchSize >= 0 && filteredFiles.size() > maxFetchSize && !filteringOneByOne) {
if (haveFilter) {
rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));
}
filteredFiles = filteredFiles.stream()
.limit(maxFetchSize)
.collect(Collectors.toList());
private boolean transferFile(@Nullable String remoteDirectory, File localDirectory, Session<F> session, F file)
throws IOException {

EvaluationContext localFileEvaluationContext = null;
if (this.localFilenameGeneratorExpression != null) {
localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
}
return filteredFiles;

return copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory, session);
}

protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
if (this.filter instanceof ReversibleFileListFilter) {
((ReversibleFileListFilter<F>) this.filter)
.rollback(file, filteredFiles);
if (this.filter instanceof ReversibleFileListFilter<F> reversibleFileListFilter) {
reversibleFileListFilter.rollback(file, filteredFiles);
}
}

Expand Down Expand Up @@ -530,12 +594,12 @@ else if (this.logger.isWarnEnabled()) {
}

private void resetFilterIfNecessary(F remoteFile) {
if (this.filter instanceof ResettableFileListFilter) {
if (this.filter instanceof ResettableFileListFilter<F> resettableFileListFilter) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Removing the remote file '" + remoteFile +
"' from the filter for a subsequent transfer attempt");
}
((ResettableFileListFilter<F>) this.filter).remove(remoteFile);
resettableFileListFilter.remove(remoteFile);
}
}

Expand Down
Loading