diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java
index c0ae587a792..cc735416571 100644
--- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java
+++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java
@@ -24,10 +24,14 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
 import java.util.regex.Matcher;
-import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,8 +57,10 @@
 import org.springframework.integration.file.support.FileUtils;
 import org.springframework.integration.metadata.MetadataStore;
 import org.springframework.integration.metadata.SimpleMetadataStore;
+import org.springframework.integration.support.locks.DefaultLockRegistry;
 import org.springframework.messaging.MessagingException;
 import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;
 
@@ -86,6 +92,19 @@ public abstract class AbstractInboundFileSynchronizer<F>
 
 	private final RemoteFileTemplate<F> remoteFileTemplate;
 
+	private final DefaultLockRegistry lockRegistry = new DefaultLockRegistry();
+
+	@SuppressWarnings("serial")
+	private final Map<String, List<F>> fetchCache =
+			Collections.synchronizedMap(new LinkedHashMap<>(100, 0.75f, true) {
+
+				@Override
+				protected boolean removeEldestEntry(Map.Entry<String, List<F>> eldest) {
+					return size() > 100;
+				}
+
+			});
+
 	@SuppressWarnings("NullAway.Init")
 	private EvaluationContext evaluationContext;
 
@@ -331,9 +350,6 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
 			return;
 		}
 		String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
-		if (this.logger.isTraceEnabled()) {
-			this.logger.trace("Synchronizing " + remoteDirectory + " to " + localDirectory);
-		}
 		try {
 			Integer transferred = this.remoteFileTemplate.execute(session ->
 					transferFilesFromRemoteToLocal(remoteDirectory, localDirectory, maxFetchSize, session));
@@ -350,94 +366,142 @@ private Integer transferFilesFromRemoteToLocal(@Nullable String remoteDirectory,
 			File localDirectory, int maxFetchSize, Session<F> session) throws IOException {
 
-		F[] files = session.list(remoteDirectory);
-		if (!ObjectUtils.isEmpty(files)) {
-			files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
+		String remoteDirectoryKey = remoteDirectory == null ? "" : remoteDirectory;
+
+		Lock remoteDirectoryLock = null;
+		if (maxFetchSize > 0) {
+			// The result of session.list is going to be sliced by this maxFetchSize and cached.
+			// Therefore, a lock is held for the directory to avoid a race condition between different threads.
+			// The performance degradation is minimal since session.list is filtered once,
+			// and all the later slices are handled only from the in-memory cache.
+			remoteDirectoryLock = this.lockRegistry.obtain(remoteDirectoryKey);
+			remoteDirectoryLock.lock();
 		}
-		if (!ObjectUtils.isEmpty(files)) {
-			boolean filteringOneByOne = this.filter != null && this.filter.supportsSingleFileFiltering();
-			List<F> filteredFiles = applyFilter(files, this.filter != null, filteringOneByOne, maxFetchSize);
+		else {
+			// The cache makes sense only for maxFetchSize > 0.
+			this.fetchCache.remove(remoteDirectoryKey);
+		}
+
+		try {
+			List<F> remoteFiles = null;
+			if (maxFetchSize > 0) {
+				remoteFiles = this.fetchCache.get(remoteDirectoryKey);
+			}
+
+			if (CollectionUtils.isEmpty(remoteFiles)) {
+				// Call session.list and filter all the files only once.
+				// If maxFetchSize > 0, the rest of the filtered files are going to be cached
+				// for subsequent fetches.
+				// If no maxFetchSize, all the files are transferred at once anyway.
+				remoteFiles = listAndFilterFiles(remoteDirectory, session);
+			}
 
-			int copied = filteredFiles.size();
-			int accepted = 0;
+			List<F> sliceToTransfer = remoteFiles;
+			List<F> remoteFilesToCache = null;
+			if (!CollectionUtils.isEmpty(remoteFiles) && maxFetchSize > 0) {
+				remoteFilesToCache = remoteFiles;
+				sliceToTransfer = remoteFiles.stream().limit(maxFetchSize).toList();
+				remoteFilesToCache.removeAll(sliceToTransfer);
+			}
 
-			for (F file : filteredFiles) {
-				F fileToCopy = file;
-				if (filteringOneByOne) {
-					if ((maxFetchSize < 0 || accepted < maxFetchSize)
-							&& this.filter != null && this.filter.accept(fileToCopy)) {
+			int copied = 0;
 
-						accepted++;
+			for (int i = 0; i < sliceToTransfer.size(); i++) {
+				F file = sliceToTransfer.get(i);
+				boolean transferred = false;
+				try {
+					if (transferFile(remoteDirectory, localDirectory, session, file)) {
+						copied++;
+					}
+				}
+				catch (RuntimeException | IOException ex) {
+					// The filtering has happened before the transfer, so if it fails,
+					// all the following files have to be reset from the filter.
+					if (this.filter != null && this.filter.supportsSingleFileFiltering()) {
+						for (int j = i; j < remoteFiles.size(); j++) {
+							F fileToReset = remoteFiles.get(j);
+							resetFilterIfNecessary(fileToReset);
+						}
 					}
 					else {
-						fileToCopy = null;
-						copied--;
+						rollbackFromFileToListEnd(remoteFiles, file);
 					}
+
+					if (maxFetchSize > 0) {
+						// When the transfer fails, reset the cache as well
+						// for a fresh session.list on the next synchronization.
+						this.fetchCache.remove(remoteDirectoryKey);
+					}
+
+					throw ex;
 				}
-				copied =
-						copyIfNotNull(remoteDirectory, localDirectory, session, filteringOneByOne,
-								filteredFiles, copied, fileToCopy);
 			}
+
+			if (maxFetchSize > 0) {
+				if (!CollectionUtils.isEmpty(remoteFilesToCache)) {
+					this.fetchCache.put(remoteDirectoryKey, remoteFilesToCache);
+				}
+				else {
+					this.fetchCache.remove(remoteDirectoryKey);
+				}
+			}
+
 			return copied;
 		}
-		else {
-			return 0;
+		finally {
+			if (remoteDirectoryLock != null) {
+				remoteDirectoryLock.unlock();
+			}
 		}
 	}
 
-	private int copyIfNotNull(@Nullable String remoteDirectory, File localDirectory,
-			Session<F> session, boolean filteringOneByOne,
-			List<F> filteredFiles, int copied, @Nullable F file) throws IOException {
-
-		boolean renamedFailed = false;
-		EvaluationContext localFileEvaluationContext = null;
-		if (this.localFilenameGeneratorExpression != null) {
-			localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
-			localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
+	private List<F> listAndFilterFiles(@Nullable String remoteDirectory, Session<F> session) throws IOException {
+		F[] files = session.list(remoteDirectory);
+		if (!ObjectUtils.isEmpty(files)) {
+			files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
 		}
-		try {
-			if (file != null &&
-					!copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory,
-							session)) {
-				renamedFailed = true;
-			}
-		}
-		catch (RuntimeException | IOException e1) {
-			if (filteringOneByOne) {
-				resetFilterIfNecessary(file);
+		if (!ObjectUtils.isEmpty(files)) {
+			List<F> filteredFiles;
+			if (this.filter != null) {
+				if (this.filter.supportsSingleFileFiltering()) {
+					filteredFiles = new ArrayList<>(files.length);
+					for (F file : files) {
+						if (this.filter.accept(file)) {
+							filteredFiles.add(file);
+						}
+					}
+				}
+				else {
+					filteredFiles = filterFiles(files);
+				}
 			}
 			else {
-				rollbackFromFileToListEnd(filteredFiles, file);
+				filteredFiles = new ArrayList<>();
+				Collections.addAll(filteredFiles, files);
 			}
-			throw e1;
+
+			return filteredFiles;
 		}
-		return renamedFailed ? copied - 1 : copied;
+
+		return Collections.emptyList();
 	}
 
-	private List<F> applyFilter(F[] files, boolean haveFilter, boolean filteringOneByOne, int maxFetchSize) {
-		List<F> filteredFiles;
-		if (!filteringOneByOne && haveFilter) {
-			filteredFiles = filterFiles(files);
-		}
-		else {
-			filteredFiles = List.of(files);
-		}
-		if (maxFetchSize >= 0 && filteredFiles.size() > maxFetchSize && !filteringOneByOne) {
-			if (haveFilter) {
-				rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize));
-			}
-			filteredFiles = filteredFiles.stream()
-					.limit(maxFetchSize)
-					.collect(Collectors.toList());
+	private boolean transferFile(@Nullable String remoteDirectory, File localDirectory, Session<F> session, F file)
+			throws IOException {
+
+		EvaluationContext localFileEvaluationContext = null;
+		if (this.localFilenameGeneratorExpression != null) {
+			localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
+			localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
 		}
-		return filteredFiles;
+
+		return copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory, session);
 	}
 
 	protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
-		if (this.filter instanceof ReversibleFileListFilter) {
-			((ReversibleFileListFilter<F>) this.filter)
-					.rollback(file, filteredFiles);
+		if (this.filter instanceof ReversibleFileListFilter<F> reversibleFileListFilter) {
+			reversibleFileListFilter.rollback(file, filteredFiles);
 		}
 	}
 
@@ -530,12 +594,12 @@ else if (this.logger.isWarnEnabled()) {
 	}
 
 	private void resetFilterIfNecessary(F remoteFile) {
-		if (this.filter instanceof ResettableFileListFilter) {
+		if (this.filter instanceof ResettableFileListFilter<F> resettableFileListFilter) {
 			if (this.logger.isInfoEnabled()) {
 				this.logger.info("Removing the remote file '" + remoteFile +
 						"' from the filter for a subsequent transfer attempt");
 			}
-			((ResettableFileListFilter<F>) this.filter).remove(remoteFile);
+			resettableFileListFilter.remove(remoteFile);
 		}
 	}
 
diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/synchronizer/AbstractRemoteFileSynchronizerTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/synchronizer/AbstractRemoteFileSynchronizerTests.java
index 5f79bea1ad9..2904838d793 100644
--- a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/synchronizer/AbstractRemoteFileSynchronizerTests.java
+++ b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/synchronizer/AbstractRemoteFileSynchronizerTests.java
@@ -21,6 +21,8 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,6 +30,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -39,6 +42,7 @@
 import java.io.File;
 import org.springframework.integration.file.remote.session.Session;
 import org.springframework.integration.file.remote.session.SessionFactory;
 import org.springframework.integration.test.support.TestApplicationContextAware;
+import org.springframework.integration.test.util.TestUtils;
 import org.springframework.messaging.MessagingException;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -58,7 +62,7 @@ public class AbstractRemoteFileSynchronizerTests implements TestApplicationConte
 
 	@Test
 	public void testRollback() throws Exception {
-		final AtomicBoolean failWhenCopyingBar = new AtomicBoolean(true);
+		final AtomicBoolean failWhenCopyingTestFile2 = new AtomicBoolean(true);
 		final AtomicInteger count = new AtomicInteger();
 		SessionFactory<String> sf = new StringSessionFactory();
 		AbstractInboundFileSynchronizer<String> sync = new AbstractInboundFileSynchronizer<>(sf) {
@@ -88,7 +92,7 @@ protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
 					EvaluationContext localFileEvaluationContext, String remoteFile, File localDirectory,
 					Session<String> session) throws IOException {
 
-				if ("bar".equals(remoteFile) && failWhenCopyingBar.getAndSet(false)) {
+				if ("testFile2".equals(remoteFile) && failWhenCopyingTestFile2.getAndSet(false)) {
 					throw new IOException("fail");
 				}
 				count.incrementAndGet();
@@ -97,7 +101,7 @@ protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
 		};
 
 		sync.setFilter(new AcceptOnceFileListFilter<>());
-		sync.setRemoteDirectory("foo");
+		sync.setRemoteDirectory("testRemoteDirectory");
 
 		assertThatExceptionOfType(MessagingException.class)
 				.isThrownBy(() -> sync.synchronizeToLocalDirectory(mock(File.class)))
@@ -114,12 +118,31 @@ public void testMaxFetchSizeSynchronizer() throws Exception {
 		final AtomicInteger count = new AtomicInteger();
 		AbstractInboundFileSynchronizer<String> sync = createLimitingSynchronizer(count);
 
-		sync.synchronizeToLocalDirectory(mock(File.class), 1);
+		File localDirectory = FileUtils.getTempDirectory();
+		sync.synchronizeToLocalDirectory(localDirectory, 1);
 		assertThat(count.get()).isEqualTo(1);
-		sync.synchronizeToLocalDirectory(mock(File.class), 1);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<String>> fetchCache = TestUtils.getPropertyValue(sync, "fetchCache", Map.class);
+		List<String> cachedFiles = fetchCache.get("testRemoteDirectory");
+		assertThat(cachedFiles).containsExactly("testFile2", "testFile3");
+
+		sync.synchronizeToLocalDirectory(localDirectory, 1);
 		assertThat(count.get()).isEqualTo(2);
-		sync.synchronizeToLocalDirectory(mock(File.class), 1);
+
+		cachedFiles = fetchCache.get("testRemoteDirectory");
+		assertThat(cachedFiles).containsExactly("testFile3");
+
+		sync.synchronizeToLocalDirectory(localDirectory, 1);
 		assertThat(count.get()).isEqualTo(3);
+
+		cachedFiles = fetchCache.get("testRemoteDirectory");
+		assertThat(cachedFiles).isNull();
+
+		StringSession stringSession =
+				TestUtils.getPropertyValue(sync, "remoteFileTemplate.sessionFactory.session", StringSession.class);
+		assertThat(stringSession.listCallCount).isEqualTo(1);
+
 		sync.close();
 	}
 
@@ -287,15 +310,15 @@ private AbstractInboundFileSynchronizingMessageSource<String> createSource(
 			@Override
 			public String getComponentType() {
-				return "foo";
+				return "MessageSource";
 			}
 
 		};
 		source.setMaxFetchSize(1);
-		source.setLocalDirectory(new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID()));
+		source.setLocalDirectory(new File(FileUtils.getTempDirectoryPath() + File.separator + UUID.randomUUID()));
 		source.setAutoCreateLocalDirectory(true);
 		source.setBeanFactory(TEST_INTEGRATION_CONTEXT);
-		source.setBeanName("fooSource");
+		source.setBeanName("testSource");
 		return source;
 	}
 
@@ -334,22 +357,26 @@ protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
 		};
 
 		sync.setFilter(new AcceptOnceFileListFilter<>());
-		sync.setRemoteDirectory("foo");
+		sync.setRemoteDirectory("testRemoteDirectory");
 		sync.setBeanFactory(TEST_INTEGRATION_CONTEXT);
 
 		return sync;
 	}
 
 	private static class StringSessionFactory implements SessionFactory<String> {
 
+		private final Session<String> session = new StringSession();
+
 		@Override
 		public Session<String> getSession() {
-			return new StringSession();
+			return this.session;
 		}
 
 	}
 
 	private static class StringSession implements Session<String> {
 
+		int listCallCount = 0;
+
 		StringSession() {
 		}
@@ -360,7 +387,8 @@ public boolean remove(String path) {
 
 		@Override
 		public String[] list(String path) {
-			return new String[] {"foo", "bar", "baz"};
+			this.listCallCount++;
+			return new String[] {"testFile1", "testFile2", "testFile3"};
 		}
 
 		@Override
diff --git a/src/reference/antora/modules/ROOT/pages/ftp/max-fetch.adoc b/src/reference/antora/modules/ROOT/pages/ftp/max-fetch.adoc
index f256932dcaa..9d85474b720 100644
--- a/src/reference/antora/modules/ROOT/pages/ftp/max-fetch.adoc
+++ b/src/reference/antora/modules/ROOT/pages/ftp/max-fetch.adoc
@@ -1,27 +1,6 @@
 [[ftp-max-fetch]]
 = Inbound Channel Adapters: Controlling Remote File Fetching
 
-There are two properties that you should consider when you configure inbound channel adapters.
-`max-messages-per-poll`, as with all pollers, can be used to limit the number of messages emitted on each poll (if more than the configured value are ready).
-`max-fetch-size` (since version 5.0) can limit the number of files retrieved from the remote server at one time.
-
-The following scenarios assume the starting state is an empty local directory:
-
-* `max-messages-per-poll=2` and `max-fetch-size=1`: The adapter fetches one file, emits it, fetches the next file, emits it, and then sleeps until the next poll.
-* `max-messages-per-poll=2` and `max-fetch-size=2`: The adapter fetches both files and then emits each one.
-* `max-messages-per-poll=2` and `max-fetch-size=4`: The adapter fetches up to four files (if available) and emits the first two (if there are at least two).
-The next two files are emitted on the next poll.
-* `max-messages-per-poll=2` and `max-fetch-size` not specified: The adapter fetches all remote files and emits the first two (if there are at least two).
-The subsequent files are emitted on subsequent polls (two at a time).
-When all files are consumed, the remote fetch is attempted again to pick up any new files.
-
-IMPORTANT: When you deploy multiple instances of an application, we recommend a small `max-fetch-size`, to avoid one instance "`grabbing`" all the files and starving other instances.
-
-Another use for `max-fetch-size` is if you want to stop fetching remote files but continue to process files that have already been fetched.
-Setting the `maxFetchSize` property on the `MessageSource` (programmatically, with JMX, or with a xref:control-bus.adoc[control bus]) effectively stops the adapter from fetching more files but lets the poller continue to emit messages for files that have previously been fetched.
-If the poller is active when the property is changed, the change takes effect on the next poll.
-
-Starting with version 5.1, the synchronizer can be provided with a `Comparator<F>`.
-This is useful when restricting the number of files fetched with `maxFetchSize`.
+See xref:sftp/max-fetch.adoc[SFTP: Controlling Remote File Fetching] for exactly the same generic functionality.
 
 Also see general xref:ftp/inbound.adoc[FTP Inbound Channel Adapter] chapter for information about `FileListFilter` configuration.
\ No newline at end of file
diff --git a/src/reference/antora/modules/ROOT/pages/sftp/max-fetch.adoc b/src/reference/antora/modules/ROOT/pages/sftp/max-fetch.adoc
index ca2122211ea..68d504f5d3a 100644
--- a/src/reference/antora/modules/ROOT/pages/sftp/max-fetch.adoc
+++ b/src/reference/antora/modules/ROOT/pages/sftp/max-fetch.adoc
@@ -26,6 +26,19 @@ Starting with version 5.1, the synchronizer can be provided with a `Comparator<F
 This is useful when restricting the number of files fetched with `maxFetchSize`.
 
+Starting with version 7.0, the synchronizer caches the filtered remote files for the remote directory when a positive `maxFetchSize` is used:
+
+- If `maxFetchSize > 0`, a lock is acquired against the `remoteDirectory` to avoid a race condition between different threads when work is done around the cache.
+The performance degradation is minimal, since all the later synchronizations deal only with the in-memory cached leftover;
+- if there is no cache entry for the `remoteDirectory`, `Session.list(remoteDirectory)` is called and all the returned remote files are filtered;
+- the filtered result is then sliced to the `maxFetchSize`;
+- these file entries are then transferred to the local directory;
+- the rest of the filtered remote files are cached for later synchronizations;
+- if there is a cache entry for the `remoteDirectory`, that list is sliced to the `maxFetchSize` and iterated for transfer to the local directory;
+- if one of the transfers fails, the `filter` is reset from the failed remote file.
+The cache is also evicted; therefore, the next synchronization starts from a clean state.
 
 Also see general xref:sftp/inbound.adoc[SFTP Inbound Channel Adapter] chapter for information about `FileListFilter` configuration.
diff --git a/src/reference/antora/modules/ROOT/pages/smb.adoc b/src/reference/antora/modules/ROOT/pages/smb.adoc
index bd7828ca0f3..fea9067ad74 100644
--- a/src/reference/antora/modules/ROOT/pages/smb.adoc
+++ b/src/reference/antora/modules/ROOT/pages/smb.adoc
@@ -317,28 +317,7 @@ Notice that, in this example, the message handler downstream of the transformer
 
 [[smb-max-fetch]]
 == Inbound Channel Adapters: Controlling Remote File Fetching
 
-There are two properties that you should consider when you configure inbound channel adapters.
-`max-messages-per-poll`, as with all pollers, can be used to limit the number of messages emitted on each poll (if more than the configured value are ready).
-`max-fetch-size` can limit the number of files retrieved from the remote server at one time.
-
-The following scenarios assume the starting state is an empty local directory:
-
-* `max-messages-per-poll=2` and `max-fetch-size=1`: The adapter fetches one file, emits it, fetches the next file, emits it, and then sleeps until the next poll.
-* `max-messages-per-poll=2` and `max-fetch-size=2`: The adapter fetches both files and then emits each one.
-* `max-messages-per-poll=2` and `max-fetch-size=4`: The adapter fetches up to four files (if available) and emits the first two (if there are at least two).
-The next two files are emitted on the next poll.
-* `max-messages-per-poll=2` and `max-fetch-size` not specified: The adapter fetches all remote files and emits the first two (if there are at least two).
-The subsequent files are emitted on later polls (two at a time).
-When all files are consumed, the remote fetch is attempted again to pick up any new files.
-
-IMPORTANT: When you deploy multiple instances of an application, we recommend a small `max-fetch-size`, to avoid one instance "`grabbing`" all the files and starving other instances.
-
-Another use for `max-fetch-size` is if you want to stop fetching remote files but continue to process files that have already been fetched.
-Setting the `maxFetchSize` property on the `MessageSource` (programmatically, with JMX, or with a xref:control-bus.adoc[control bus]) effectively stops the adapter from fetching more files but lets the poller continue to emit messages for files that have previously been fetched.
-If the poller is active when the property is changed, the change takes effect on the next poll.
-
-The synchronizer can be provided with a `Comparator<F>`.
-This is useful when restricting the number of files fetched with `maxFetchSize`.
+See xref:sftp/max-fetch.adoc[SFTP: Controlling Remote File Fetching] for exactly the same generic functionality.
 
 [[smb-outbound]]
 == SMB Outbound Channel Adapter
diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc
index 3ce1706a99f..08d039c43d4 100644
--- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc
+++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc
@@ -55,4 +55,11 @@ Previously deprecated classes in the `spring-integation-hazelcast` module, such
 === MQTT Changes
 
 The `AbstractMqttMessageDrivenChannelAdapter` and `ClientManager` implementations now expose a `quiescentTimeout` option which is propagated in their `stop()` method down to the `disconnectForcibly()` API of the MQTT Paho clients.
-See xref:mqtt.adoc[] for more information.
\ No newline at end of file
+See xref:mqtt.adoc[] for more information.
+
+[[x7.0-remote-files-changes]]
+=== Remote Files Support Changes
+
+The `AbstractInboundFileSynchronizer` now caches the filtered result of the `Session.list(remoteDirectory)` call that remains after slicing it by the `maxFetchSize`.
+So, later synchronizations deal only with that cache, `maxFetchSize` files at a time, until it is exhausted.
+See xref:sftp/max-fetch.adoc[] for more information.
\ No newline at end of file
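
The caching and locking pattern introduced in AbstractInboundFileSynchronizer above boils down to two pieces: an access-ordered LinkedHashMap whose removeEldestEntry hook caps the cache at 100 remote directories, and a per-directory lock around the slice-and-cache logic. The following JDK-only sketch shows the same pattern in isolation; the FetchCacheSketch class, its fetch method, and the use of plain ReentrantLock instances (standing in for the DefaultLockRegistry used by the actual change) are illustrative assumptions, not code from this PR.

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class FetchCacheSketch {

	// Access-ordered LinkedHashMap: removeEldestEntry evicts the least recently
	// used directory entry once more than 100 directories are cached.
	private final Map<String, List<String>> fetchCache =
			Collections.synchronizedMap(new LinkedHashMap<>(100, 0.75f, true) {

				@Override
				protected boolean removeEldestEntry(Map.Entry<String, List<String>> eldest) {
					return size() > 100;
				}

			});

	// The real change uses Spring Integration's DefaultLockRegistry; a map of
	// ReentrantLocks stands in for it here to keep the sketch JDK-only.
	private final Map<String, Lock> locks = new ConcurrentHashMap<>();

	public List<String> fetch(String remoteDirectory, int maxFetchSize, List<String> listing) {
		Lock lock = this.locks.computeIfAbsent(remoteDirectory, dir -> new ReentrantLock());
		lock.lock();
		try {
			List<String> remaining = this.fetchCache.get(remoteDirectory);
			if (remaining == null || remaining.isEmpty()) {
				// The listing argument stands in for session.list() plus filtering:
				// it is only consulted when there is no cached leftover for the directory.
				remaining = new ArrayList<>(listing);
			}
			List<String> slice = new ArrayList<>(remaining.subList(0, Math.min(maxFetchSize, remaining.size())));
			remaining = new ArrayList<>(remaining.subList(slice.size(), remaining.size()));
			if (remaining.isEmpty()) {
				this.fetchCache.remove(remoteDirectory);
			}
			else {
				this.fetchCache.put(remoteDirectory, remaining);
			}
			return slice;
		}
		finally {
			lock.unlock();
		}
	}

	public static void main(String[] args) {
		FetchCacheSketch sketch = new FetchCacheSketch();
		List<String> listing = List.of("testFile1", "testFile2", "testFile3");
		System.out.println(sketch.fetch("testRemoteDirectory", 1, listing)); // [testFile1]
		System.out.println(sketch.fetch("testRemoteDirectory", 1, listing)); // [testFile2]
		System.out.println(sketch.fetch("testRemoteDirectory", 1, listing)); // [testFile3]
	}

}

With maxFetchSize = 1 and a three-file listing, the three fetch calls mirror the expectations of testMaxFetchSizeSynchronizer: the listing is consulted once, the remaining two files are served from the in-memory cache, and the cache entry disappears once it is exhausted.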