From cf5d9f2450377d2fbabbc354265b4879b31cae58 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle <54168385+mgabelle@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:30:09 +0100 Subject: [PATCH] refactor/dynamic properties (#34) * refactor: dynamic properties migrate base interfaces * refactor: migrate Upload * refactor: migrate Delete and Copy task * refactor: migrate CreateBucket * refactor: migrate others --- .../plugin/minio/AbstractMinioObject.java | 4 +- .../java/io/kestra/plugin/minio/Copy.java | 25 +++--- .../io/kestra/plugin/minio/CreateBucket.java | 8 +- .../java/io/kestra/plugin/minio/Delete.java | 14 ++-- .../io/kestra/plugin/minio/DeleteList.java | 30 +++---- .../java/io/kestra/plugin/minio/Download.java | 17 ++-- .../io/kestra/plugin/minio/Downloads.java | 28 +++---- .../java/io/kestra/plugin/minio/List.java | 84 ++++++------------- .../kestra/plugin/minio/MinioConnection.java | 9 +- .../minio/MinioConnectionInterface.java | 21 ++--- .../io/kestra/plugin/minio/MinioService.java | 17 ++-- .../java/io/kestra/plugin/minio/Trigger.java | 45 +++++----- .../java/io/kestra/plugin/minio/Upload.java | 23 +++-- .../plugin/minio/AbstractMinIoTest.java | 29 +++---- .../java/io/kestra/plugin/minio/AllTest.java | 31 +++---- .../java/io/kestra/plugin/minio/CopyTest.java | 19 +++-- .../kestra/plugin/minio/DeleteListTest.java | 9 +- .../io/kestra/plugin/minio/DownloadsTest.java | 29 ++++--- .../java/io/kestra/plugin/minio/ListTest.java | 11 +-- .../io/kestra/plugin/minio/TriggerTest.java | 3 +- .../io/kestra/plugin/minio/UploadsTest.java | 23 ++--- 21 files changed, 218 insertions(+), 261 deletions(-) diff --git a/src/main/java/io/kestra/plugin/minio/AbstractMinioObject.java b/src/main/java/io/kestra/plugin/minio/AbstractMinioObject.java index 8b870cf..49eb159 100644 --- a/src/main/java/io/kestra/plugin/minio/AbstractMinioObject.java +++ b/src/main/java/io/kestra/plugin/minio/AbstractMinioObject.java @@ -1,6 +1,7 @@ package io.kestra.plugin.minio; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.swagger.v3.oas.annotations.media.Schema; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -18,7 +19,6 @@ public abstract class AbstractMinioObject extends MinioConnection implements Abs @Schema( title = "The bucket name." ) - @PluginProperty(dynamic = true) - protected String bucket; + protected Property bucket; } diff --git a/src/main/java/io/kestra/plugin/minio/Copy.java b/src/main/java/io/kestra/plugin/minio/Copy.java index c44b4cc..ea65506 100644 --- a/src/main/java/io/kestra/plugin/minio/Copy.java +++ b/src/main/java/io/kestra/plugin/minio/Copy.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.minio.model.ObjectOutput; @@ -84,31 +85,30 @@ public class Copy extends AbstractMinioObject implements RunnableTask delete = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { try (MinioClient minioClient = this.client(runContext)) { CopySource.Builder sourceBuilder = CopySource.builder() - .bucket(runContext.render(this.from.bucket)) - .object(runContext.render(this.from.key)); + .bucket(runContext.render(this.from.bucket).as(String.class).orElse(null)) + .object(runContext.render(this.from.key).as(String.class).orElse(null)); if (this.from.versionId != null) { - sourceBuilder.versionId(runContext.render(this.from.versionId)); + sourceBuilder.versionId(runContext.render(this.from.versionId).as(String.class).orElseThrow()); } CopyObjectArgs.Builder builder = CopyObjectArgs.builder() - .bucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket)) - .object(runContext.render(this.to.key)) + .bucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket).as(String.class).orElseThrow()) + .object(runContext.render(this.to.key).as(String.class).orElse(null)) .source(sourceBuilder.build()); CopyObjectArgs request = builder.build(); ObjectWriteResponse response = minioClient.copyObject(request); - if (this.delete) { + if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) { Delete.builder() .id(this.id) .type(Delete.class.getName()) @@ -138,14 +138,12 @@ public static class CopyObject { @Schema( title = "The bucket name" ) - @PluginProperty(dynamic = true) - String bucket; + Property bucket; @Schema( title = "The bucket key" ) - @PluginProperty(dynamic = true) - String key; + Property key; } @SuperBuilder(toBuilder = true) @@ -155,8 +153,7 @@ public static class CopyObjectFrom extends CopyObject { @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - private String versionId; + private Property versionId; } @SuperBuilder diff --git a/src/main/java/io/kestra/plugin/minio/CreateBucket.java b/src/main/java/io/kestra/plugin/minio/CreateBucket.java index 68be103..63fe861 100644 --- a/src/main/java/io/kestra/plugin/minio/CreateBucket.java +++ b/src/main/java/io/kestra/plugin/minio/CreateBucket.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.minio.BucketExistsArgs; @@ -61,12 +62,11 @@ public class CreateBucket extends AbstractMinioObject implements RunnableTask objectLockEnabledForBucket; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); try (MinioClient client = this.client(runContext)) { @@ -81,7 +81,7 @@ public Output run(RunContext runContext) throws Exception { MakeBucketArgs.Builder requestBuilder = MakeBucketArgs.builder().bucket(bucket); if (this.objectLockEnabledForBucket != null) { - requestBuilder.objectLock(objectLockEnabledForBucket); + requestBuilder.objectLock(runContext.render(objectLockEnabledForBucket).as(Boolean.class).orElseThrow()); } client.makeBucket(requestBuilder.build()); diff --git a/src/main/java/io/kestra/plugin/minio/Delete.java b/src/main/java/io/kestra/plugin/minio/Delete.java index 66fc20f..c087d8b 100644 --- a/src/main/java/io/kestra/plugin/minio/Delete.java +++ b/src/main/java/io/kestra/plugin/minio/Delete.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.minio.MinioClient; @@ -23,7 +24,7 @@ code = """ id: minio_delete namespace: company.team - + tasks: - id: delete type: io.kestra.plugin.minio.Delete @@ -61,19 +62,18 @@ public class Delete extends AbstractMinioObject implements RunnableTask key; @Schema( title = "Indicates whether Object Lock should bypass Governance-mode restrictions to process this operation." ) @PluginProperty - private Boolean bypassGovernanceRetention; + private Property bypassGovernanceRetention; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); + String key = runContext.render(this.key).as(String.class).orElse(null); try (MinioClient minioClient = this.client(runContext)) { RemoveObjectArgs.Builder builder = RemoveObjectArgs.builder() @@ -81,7 +81,7 @@ public Output run(RunContext runContext) throws Exception { .object(key); if (this.bypassGovernanceRetention != null) { - builder.bypassGovernanceMode(this.bypassGovernanceRetention); + builder.bypassGovernanceMode(runContext.render(this.bypassGovernanceRetention).as(Boolean.class).orElseThrow()); } RemoveObjectArgs request = builder.build(); diff --git a/src/main/java/io/kestra/plugin/minio/DeleteList.java b/src/main/java/io/kestra/plugin/minio/DeleteList.java index e4b5281..0456dc5 100644 --- a/src/main/java/io/kestra/plugin/minio/DeleteList.java +++ b/src/main/java/io/kestra/plugin/minio/DeleteList.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.minio.model.MinioObject; @@ -36,7 +37,7 @@ code = """ id: minio_delete_objects namespace: company.team - + tasks: - id: delete_objects type: io.kestra.plugin.minio.DeleteList @@ -74,29 +75,25 @@ public class DeleteList extends AbstractMinioObject implements RunnableTask prefix; @Schema( title = "A delimiter is a character you use to group keys." ) - @PluginProperty(dynamic = true) - private String delimiter; + private Property delimiter; @Schema( title = "Marker is where you want to start listing from.", description = "Start listing after this specified key. Marker can be any key in the bucket." ) - @PluginProperty(dynamic = true) - private String marker; + private Property marker; @Schema( title = "Sets the maximum number of keys returned in the response.", description = "By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more." ) - @PluginProperty(dynamic = true) @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); @Schema( title = "A regexp to filter on full key.", @@ -104,15 +101,13 @@ public class DeleteList extends AbstractMinioObject implements RunnableTask regexp; @Schema( title = "The type of objects to filter: files, directory, or both." ) - @PluginProperty @Builder.Default - protected final List.Filter filter = List.Filter.BOTH; + protected final Property filter = Property.of(List.Filter.BOTH); @Min(2) @Schema( @@ -124,14 +119,13 @@ public class DeleteList extends AbstractMinioObject implements RunnableTask errorOnEmpty = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); try (MinioClient client = this.client(runContext)) { @@ -165,10 +159,10 @@ public Output run(RunContext runContext) throws Exception { runContext.metric(Counter.of("count", finalResult.getLeft())); runContext.metric(Counter.of("size", finalResult.getRight())); - if (errorOnEmpty && finalResult.getLeft() == 0) { + if (runContext.render(errorOnEmpty).as(Boolean.class).orElseThrow() && finalResult.getLeft() == 0) { throw new NoSuchElementException( "Unable to find any files to delete on " + - runContext.render(this.bucket) + " " + + runContext.render(this.bucket).as(String.class).orElse(null) + " " + "with regexp='" + runContext.render(this.regexp) + "', " + "prefix='" + runContext.render(this.prefix) + "'" ); diff --git a/src/main/java/io/kestra/plugin/minio/Download.java b/src/main/java/io/kestra/plugin/minio/Download.java index 402f373..0dfe3f9 100644 --- a/src/main/java/io/kestra/plugin/minio/Download.java +++ b/src/main/java/io/kestra/plugin/minio/Download.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.FileUtils; @@ -33,10 +34,10 @@ code = """ id: minio_download namespace: company.team - + tasks: - id: download_from_storage - type: io.kestra.plugin.minio.Download + type: io.kestra.plugin.minio.Download accessKeyId: "" secretKeyId: "" region: "eu-central-1" @@ -71,22 +72,20 @@ public class Download extends AbstractMinioObject implements RunnableTask key; @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - protected String versionId; + protected Property versionId; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); + String key = runContext.render(this.key).as(String.class).orElse(null); try (MinioAsyncClient client = this.asyncClient(runContext)) { - Pair output = MinioService.download(runContext, client, bucket, key, this.versionId); + Pair output = MinioService.download(runContext, client, bucket, key, runContext.render(this.versionId).as(String.class).orElse(null)); long length = output.getRight(); URI uri = output.getLeft(); diff --git a/src/main/java/io/kestra/plugin/minio/Downloads.java b/src/main/java/io/kestra/plugin/minio/Downloads.java index df5aca0..1361f13 100644 --- a/src/main/java/io/kestra/plugin/minio/Downloads.java +++ b/src/main/java/io/kestra/plugin/minio/Downloads.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.minio.model.MinioObject; @@ -33,7 +34,7 @@ code = """ id: minio_downloads namespace: company.team - + tasks: - id: downloads type: io.kestra.plugin.minio.Downloads @@ -79,29 +80,25 @@ public enum Action { @Schema( title = "Limits the response to keys that begin with the specified prefix." ) - @PluginProperty(dynamic = true) - private String prefix; + private Property prefix; @Schema( title = "A delimiter is a character you use to group keys." ) - @PluginProperty(dynamic = true) - private String delimiter; + private Property delimiter; @Schema( title = "Marker is where you want to start listing from.", description = "Start listing after this specified key. Marker can be any key in the bucket." ) - @PluginProperty(dynamic = true) - private String marker; + private Property marker; @Schema( title = "Sets the maximum number of keys returned in the response.", description = "By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more." ) - @PluginProperty(dynamic = true) @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); @Schema( title = "A regexp to filter on full key.", @@ -109,22 +106,19 @@ public enum Action { "`regExp: .*` to match all files\n"+ "`regExp: .*2020-01-0.\\\\.csv` to match files between 01 and 09 of january ending with `.csv`" ) - @PluginProperty(dynamic = true) - protected String regexp; + protected Property regexp; @Schema( title = "The type of objects to filter: files, directory, or both." ) - @PluginProperty @Builder.Default - protected final io.kestra.plugin.minio.List.Filter filter = io.kestra.plugin.minio.List.Filter.BOTH; + protected final Property filter = Property.of(io.kestra.plugin.minio.List.Filter.BOTH); @Schema( title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering." ) - @PluginProperty(dynamic = true) @NotNull - private Action action; + private Property action; @Schema( title = "The destination bucket and key for `MOVE` action." @@ -154,7 +148,7 @@ public Output run(RunContext runContext) throws Exception { io.kestra.plugin.minio.List.Output run = task.run(runContext); try (MinioAsyncClient client = this.asyncClient(runContext)) { - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); List list = run .getObjects() @@ -181,7 +175,7 @@ public Output run(RunContext runContext) throws Exception { .map(object -> new AbstractMap.SimpleEntry<>(object.getKey(), object.getUri())) .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); - MinioService.performAction(runContext, list, action,bucket, this, moveTo); + MinioService.performAction(runContext, list, runContext.render(action).as(Action.class).orElseThrow(), bucket, this, moveTo); return Output .builder() diff --git a/src/main/java/io/kestra/plugin/minio/List.java b/src/main/java/io/kestra/plugin/minio/List.java index 431ff1d..ff65a7f 100644 --- a/src/main/java/io/kestra/plugin/minio/List.java +++ b/src/main/java/io/kestra/plugin/minio/List.java @@ -5,6 +5,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.minio.model.MinioObject; @@ -33,7 +34,7 @@ code = """ id: minio_list namespace: company.team - + tasks: - id: list_objects type: io.kestra.plugin.minio.List @@ -76,35 +77,30 @@ public enum Filter { @Schema( title = "Limits the response to keys that begin with the specified prefix." ) - @PluginProperty(dynamic = true) - private String prefix; + private Property prefix; @Schema( title = "Limits the response to keys that ends with the specified string." ) - @PluginProperty(dynamic = true) - private String startAfter; + private Property startAfter; @Schema( title = "A delimiter is a character you use to group keys." ) - @PluginProperty(dynamic = true) - private String delimiter; + private Property delimiter; @Schema( title = "Marker is where you want to start listing from.", description = "Start listing after this specified key. Marker can be any key in the bucket." ) - @PluginProperty(dynamic = true) - private String marker; + private Property marker; @Schema( title = "Sets the maximum number of keys returned in the response.", description = "By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more." ) - @PluginProperty(dynamic = true) @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); @Schema( title = "A regexp to filter on full key.", @@ -112,78 +108,45 @@ public enum Filter { "`regExp: .*` to match all files\n"+ "`regExp: .*2020-01-0.\\\\.csv` to match files between 01 and 09 of january ending with `.csv`" ) - @PluginProperty(dynamic = true) - protected String regexp; + protected Property regexp; @Schema( title = "The type of objects to filter: files, directory, or both." ) - @PluginProperty @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); @Schema( title = "Indicates whether it should look into subfolders." ) - @PluginProperty @Builder.Default - public Boolean recursive = true; + public Property recursive = Property.of(true); @Schema( title = "Indicates whether task should include versions in output." ) - @PluginProperty - @Builder.Default - public Boolean includeVersions = true; - - /* Uncomment for List Objects (client.listObjects) version 2 only - @Schema( - title = "Indicates whether task should include user metadata in output." - ) - @PluginProperty - @Builder.Default - public Boolean includeUserMetadata = true; - - @Schema( - title = "Indicates whether task should include owner data in output." - ) - @PluginProperty @Builder.Default - public Boolean fetchOwner = true; - */ + public Property includeVersions = Property.of(true); @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); try (MinioClient client = this.client(runContext)) { ListObjectsArgs.Builder requestBuilder = ListObjectsArgs .builder() .bucket(bucket) - .recursive(recursive) - .maxKeys(this.maxKeys); - - if (this.prefix != null) { - requestBuilder.prefix(runContext.render(this.prefix)); - } - - if (this.startAfter != null) { - requestBuilder.startAfter(runContext.render(this.startAfter)); - } - - if (this.delimiter != null) { - requestBuilder.delimiter(runContext.render(this.delimiter)); - } + .recursive(runContext.render(recursive).as(Boolean.class).orElseThrow()) + .maxKeys(runContext.render(this.maxKeys).as(Integer.class).orElseThrow()); - if (this.marker != null) { - requestBuilder.marker(runContext.render(this.marker)); - } - if (this.includeVersions != null) { - requestBuilder.includeVersions(this.includeVersions); - } + runContext.render(this.prefix).as(String.class).ifPresent(requestBuilder::prefix); + runContext.render(this.startAfter).as(String.class).ifPresent(requestBuilder::startAfter); + runContext.render(this.delimiter).as(String.class).ifPresent(requestBuilder::delimiter); + runContext.render(this.marker).as(String.class).ifPresent(requestBuilder::marker); + runContext.render(this.includeVersions).as(Boolean.class).ifPresent(requestBuilder::includeVersions); - String regExp = runContext.render(this.regexp); + String regExp = runContext.render(this.regexp).as(String.class).orElse(null); Iterable> response = client.listObjects(requestBuilder.build()); @@ -196,12 +159,13 @@ public Output run(RunContext runContext) throws Exception { spliterator.getExactSizeIfKnown(), bucket, regExp, - runContext.render(this.prefix) + runContext.render(this.prefix).as(String.class).orElse(null) ); + var filterValue = runContext.render(this.filter).as(Filter.class).orElseThrow(); java.util.List minioObjects = StreamSupport.stream(spliterator, false) .map(throwFunction(Result::get)) - .filter(item -> filter(item, regExp)) + .filter(item -> filter(item, regExp, filterValue)) .map(MinioObject::of) .toList(); @@ -212,7 +176,7 @@ public Output run(RunContext runContext) throws Exception { } } - private boolean filter(Item object, String regExp) { + private boolean filter(Item object, String regExp, Filter filter) { return (regExp == null || object.objectName().matches(regExp)) && (filter.equals(Filter.BOTH) || diff --git a/src/main/java/io/kestra/plugin/minio/MinioConnection.java b/src/main/java/io/kestra/plugin/minio/MinioConnection.java index c692e2c..b6fa1f9 100644 --- a/src/main/java/io/kestra/plugin/minio/MinioConnection.java +++ b/src/main/java/io/kestra/plugin/minio/MinioConnection.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; import jakarta.annotation.Nullable; import lombok.EqualsAndHashCode; @@ -15,12 +16,12 @@ @NoArgsConstructor public abstract class MinioConnection extends Task implements MinioConnectionInterface { - protected String region; + protected Property region; - protected String accessKeyId; - protected String secretKeyId; + protected Property accessKeyId; + protected Property secretKeyId; - protected String endpoint; + protected Property endpoint; public record MinioClientConfig( @Nullable String accessKeyId, diff --git a/src/main/java/io/kestra/plugin/minio/MinioConnectionInterface.java b/src/main/java/io/kestra/plugin/minio/MinioConnectionInterface.java index 61e4b8a..ec2fe91 100644 --- a/src/main/java/io/kestra/plugin/minio/MinioConnectionInterface.java +++ b/src/main/java/io/kestra/plugin/minio/MinioConnectionInterface.java @@ -2,6 +2,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -10,33 +11,29 @@ public interface MinioConnectionInterface { @Schema( title = "URL to the MinIO endpoint." ) - @PluginProperty(dynamic = true) - String getEndpoint(); + Property getEndpoint(); @Schema( title = "Access Key Id for authentication." ) - @PluginProperty(dynamic = true) - String getAccessKeyId(); + Property getAccessKeyId(); @Schema( title = "Secret Key Id for authentication." ) - @PluginProperty(dynamic = true) - String getSecretKeyId(); + Property getSecretKeyId(); @Schema( title = "MinIO region with which the SDK should communicate." ) - @PluginProperty(dynamic = true) - String getRegion(); + Property getRegion(); default MinioConnection.MinioClientConfig minioClientConfig(final RunContext runContext) throws IllegalVariableEvaluationException { return new MinioConnection.MinioClientConfig( - runContext.render(this.getAccessKeyId()), - runContext.render(this.getSecretKeyId()), - runContext.render(this.getRegion()), - runContext.render(this.getEndpoint()) + runContext.render(this.getAccessKeyId()).as(String.class).orElse(null), + runContext.render(this.getSecretKeyId()).as(String.class).orElse(null), + runContext.render(this.getRegion()).as(String.class).orElse(null), + runContext.render(this.getEndpoint()).as(String.class).orElse(null) ); } diff --git a/src/main/java/io/kestra/plugin/minio/MinioService.java b/src/main/java/io/kestra/plugin/minio/MinioService.java index 505e1ed..69b6253 100644 --- a/src/main/java/io/kestra/plugin/minio/MinioService.java +++ b/src/main/java/io/kestra/plugin/minio/MinioService.java @@ -1,6 +1,7 @@ package io.kestra.plugin.minio; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.FileUtils; import io.kestra.plugin.minio.model.MinioObject; @@ -32,8 +33,8 @@ public static void performAction( .region(minioConnection.getRegion()) .accessKeyId(minioConnection.getAccessKeyId()) .secretKeyId(minioConnection.getSecretKeyId()) - .key(object.getKey()) - .bucket(bucket) + .key(Property.of(object.getKey())) + .bucket(Property.of(bucket)) .endpoint(minioConnection.getEndpoint()) .build(); delete.run(runContext); @@ -50,20 +51,20 @@ public static void performAction( .from( Copy.CopyObjectFrom .builder() - .bucket(bucket) - .key(object.getKey()) + .bucket(Property.of(bucket)) + .key(Property.of(object.getKey())) .build() ) .to(moveTo.toBuilder() - .key( + .key(Property.of( "%s/%s".formatted( - StringUtils.stripEnd(moveTo.getKey() + "/", "/"), + StringUtils.stripEnd(runContext.render(moveTo.getKey()).as(String.class).orElse(null) + "/", "/"), FilenameUtils.getName(object.getKey()) ) - ) + )) .build() ) - .delete(true) + .delete(Property.of(true)) .build(); copy.run(runContext); } diff --git a/src/main/java/io/kestra/plugin/minio/Trigger.java b/src/main/java/io/kestra/plugin/minio/Trigger.java index 6d11bd7..d370692 100644 --- a/src/main/java/io/kestra/plugin/minio/Trigger.java +++ b/src/main/java/io/kestra/plugin/minio/Trigger.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.*; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.Rethrow; @@ -39,7 +40,7 @@ code = """ id: minio_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -48,7 +49,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.minio.Trigger @@ -59,7 +60,7 @@ bucket: "my-bucket" prefix: "sub-dir" action: MOVE - moveTo: + moveTo: key: archive" """ ), @@ -69,7 +70,7 @@ code = """ id: minio_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -85,7 +86,7 @@ region: "eu-central-1" bucket: "my-bucket" key: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.minio.Trigger @@ -112,7 +113,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.minio.Trigger @@ -123,7 +124,7 @@ bucket: "kestra-test-bucket" prefix: "sub-dir" action: MOVE - moveTo: + moveTo: key: archive """ ) @@ -134,31 +135,31 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, @Builder.Default private final Duration interval = Duration.ofSeconds(60); - protected String accessKeyId; + protected Property accessKeyId; - protected String secretKeyId; + protected Property secretKeyId; - protected String region; + protected Property region; - protected String endpoint; + protected Property endpoint; - protected String bucket; + protected Property bucket; - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - protected String regexp; + protected Property regexp; @Builder.Default - protected final List.Filter filter = List.Filter.BOTH; + protected final Property filter = Property.of(List.Filter.BOTH); - private Downloads.Action action; + private Property action; private Copy.CopyObject moveTo; @@ -196,8 +197,8 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo MinioService.performAction( runContext, run.getObjects(), - this.action, - this.bucket, + runContext.render(this.action).as(Downloads.Action.class).orElseThrow(), + runContext.render(this.bucket).as(String.class).orElse(null), this, this.moveTo ); @@ -222,7 +223,7 @@ private Rethrow.FunctionChecked getMinioObj .accessKeyId(this.accessKeyId) .secretKeyId(this.secretKeyId) .bucket(this.bucket) - .key(object.getKey()) + .key(Property.of(object.getKey())) .build(); Download.Output downloadOutput = download.run(runContext); diff --git a/src/main/java/io/kestra/plugin/minio/Upload.java b/src/main/java/io/kestra/plugin/minio/Upload.java index 8d1dabf..f806308 100644 --- a/src/main/java/io/kestra/plugin/minio/Upload.java +++ b/src/main/java/io/kestra/plugin/minio/Upload.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; @@ -69,7 +70,7 @@ - id: http_download type: io.kestra.plugin.core.http.Download uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv\ - + - id: upload_to_storage type: io.kestra.plugin.minio.Upload accessKeyId: "" @@ -91,8 +92,7 @@ public class Upload extends AbstractMinioObject implements RunnableTask key; @Schema( title = "The file(s) to upload.", @@ -105,19 +105,17 @@ public class Upload extends AbstractMinioObject implements RunnableTask contentType; @Schema( title = "A map of metadata to store with the object." ) - @PluginProperty(dynamic = true) - private Map metadata; + private Property> metadata; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElse(null); + String key = runContext.render(this.key).as(String.class).orElse(null); String[] renderedFroms; if (this.from instanceof Collection fromURIs) { @@ -133,12 +131,13 @@ public Output run(RunContext runContext) throws Exception { .bucket(bucket) .object(key); - if (this.metadata != null) { - builder.userMetadata(runContext.renderMap(this.metadata)); + var metadataValue = runContext.render(this.metadata).asMap(String.class, String.class); + if (!metadataValue.isEmpty()) { + builder.userMetadata(metadataValue); } if (this.contentType != null) { - builder.contentType(runContext.render(this.contentType)); + builder.contentType(runContext.render(this.contentType).as(String.class).orElseThrow()); } for (String renderedFrom : renderedFroms) { diff --git a/src/test/java/io/kestra/plugin/minio/AbstractMinIoTest.java b/src/test/java/io/kestra/plugin/minio/AbstractMinIoTest.java index d0ebb58..6b35447 100644 --- a/src/test/java/io/kestra/plugin/minio/AbstractMinIoTest.java +++ b/src/test/java/io/kestra/plugin/minio/AbstractMinIoTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.minio; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; @@ -70,10 +71,10 @@ protected String createBucket(String bucket) throws Exception { CreateBucket createBucket = CreateBucket.builder() .id(AllTest.class.getSimpleName()) .type(CreateBucket.class.getName()) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .bucket(bucket) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .bucket(Property.of(bucket)) .build(); CreateBucket.Output createOutput = createBucket.run(runContext(createBucket)); @@ -101,12 +102,12 @@ protected String upload(String dir, String bucket) throws Exception { Upload upload = Upload.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .bucket(bucket) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .bucket(Property.of(bucket)) .from(source.toString()) - .key(dir + "/" + out + ".yml") + .key(Property.of(dir + "/" + out + ".yml")) .build(); Upload.Output output = upload.run(runContext(upload)); @@ -118,11 +119,11 @@ protected String upload(String dir, String bucket) throws Exception { return List.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .bucket(this.BUCKET) - .includeVersions(true); + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .bucket(Property.of(this.BUCKET)) + .includeVersions(Property.of(true)); } protected RunContext runContext(Task task) { diff --git a/src/test/java/io/kestra/plugin/minio/AllTest.java b/src/test/java/io/kestra/plugin/minio/AllTest.java index ff216f0..6610612 100644 --- a/src/test/java/io/kestra/plugin/minio/AllTest.java +++ b/src/test/java/io/kestra/plugin/minio/AllTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.minio; import com.google.common.io.CharStreams; +import io.kestra.core.models.property.Property; import io.minio.errors.ErrorResponseException; import org.junit.jupiter.api.Test; @@ -26,11 +27,11 @@ void run() throws Exception { .builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .prefix("tasks/upload/") + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .prefix(Property.of("tasks/upload/")) .build(); List.Output listOutput = list.run(runContext(list)); @@ -40,11 +41,11 @@ void run() throws Exception { .builder() .id(AllTest.class.getSimpleName()) .type(Download.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .key(key) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .key(Property.of(key)) .build(); Download.Output downloadOutput = download.run(runContext(download)); @@ -59,11 +60,11 @@ void run() throws Exception { .builder() .id(AllTest.class.getSimpleName()) .type(Delete.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .key(key) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .key(Property.of(key)) .build(); Delete.Output deleteOutput = delete.run(runContext(delete)); diff --git a/src/test/java/io/kestra/plugin/minio/CopyTest.java b/src/test/java/io/kestra/plugin/minio/CopyTest.java index aa88adb..59575b9 100644 --- a/src/test/java/io/kestra/plugin/minio/CopyTest.java +++ b/src/test/java/io/kestra/plugin/minio/CopyTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import io.kestra.core.utils.IdUtils; import org.junit.jupiter.api.Test; @@ -17,34 +18,34 @@ void run(Boolean delete) throws Exception { Copy task = Copy.builder() .id(CopyTest.class.getSimpleName()) .type(List.class.getName()) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) .from( Copy.CopyObjectFrom .builder() - .bucket(this.BUCKET) - .key(upload) + .bucket(Property.of(this.BUCKET)) + .key(Property.of(upload)) .build() ) .to( Copy.CopyObject .builder() - .key(move) + .key(Property.of(move)) .build() ) - .delete(delete) + .delete(Property.of(delete)) .build(); Copy.Output copyOutput = task.run(runContext(task)); assertThat(copyOutput.getKey(), is(move)); - List list = list().prefix(move).build(); + List list = list().prefix(Property.of(move)).build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(1)); - list = list().prefix(upload).build(); + list = list().prefix(Property.of(upload)).build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(delete ? 0 : 1)); diff --git a/src/test/java/io/kestra/plugin/minio/DeleteListTest.java b/src/test/java/io/kestra/plugin/minio/DeleteListTest.java index 81e86f3..a67aa2a 100644 --- a/src/test/java/io/kestra/plugin/minio/DeleteListTest.java +++ b/src/test/java/io/kestra/plugin/minio/DeleteListTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; @@ -19,10 +20,10 @@ void run() throws Exception { DeleteList task = DeleteList.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) .concurrent(5) .build(); diff --git a/src/test/java/io/kestra/plugin/minio/DownloadsTest.java b/src/test/java/io/kestra/plugin/minio/DownloadsTest.java index 7aa8e58..84687bd 100644 --- a/src/test/java/io/kestra/plugin/minio/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/minio/DownloadsTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import org.junit.jupiter.api.Test; import java.util.Map; @@ -20,11 +21,11 @@ void delete() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .action(Downloads.Action.DELETE) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .action(Property.of(Downloads.Action.DELETE)) .build(); Downloads.Output run = task.run(runContext(task)); @@ -48,13 +49,13 @@ void move() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket("{{bucket}}") - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .action(Downloads.Action.MOVE) + .bucket(new Property<>("{{bucket}}")) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .action(Property.of(Downloads.Action.MOVE)) .moveTo(Copy.CopyObject.builder() - .key("/tasks/move") + .key(Property.of("/tasks/move")) .build() ) .build(); @@ -64,11 +65,13 @@ void move() throws Exception { assertThat(run.getObjects().size(), is(2)); assertThat(run.getOutputFiles().size(), is(2)); - List list = list().prefix("tasks/from").build(); + List list = list().prefix(Property.of("tasks/from")) + .build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(0)); - list = list().prefix("tasks/move").build(); + list = list().prefix(Property.of("tasks/move")) + .build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(2)); } diff --git a/src/test/java/io/kestra/plugin/minio/ListTest.java b/src/test/java/io/kestra/plugin/minio/ListTest.java index 5201d3c..c7d8278 100644 --- a/src/test/java/io/kestra/plugin/minio/ListTest.java +++ b/src/test/java/io/kestra/plugin/minio/ListTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import io.kestra.core.utils.IdUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; @@ -26,26 +27,26 @@ void run() throws Exception { assertThat(output.getObjects().size(), is(6)); task = list() - .filter(List.Filter.FILES) - .prefix("tasks/"+dir+"/") + .filter(Property.of(List.Filter.FILES)) + .prefix(Property.of("tasks/"+dir+"/")) .build(); output = task.run(runContext(task)); assertThat(output.getObjects().size(), is(6)); task = list() - .filter(List.Filter.FILES) + .filter(Property.of(List.Filter.FILES)) .build(); output = task.run(runContext(task)); assertThat(output.getObjects().size(), is(6)); task = list() - .prefix("tasks/%s/sub".formatted(dir)) + .prefix(Property.of("tasks/%s/sub".formatted(dir))) .build(); output = task.run(runContext(task)); assertThat(output.getObjects().size(), is(1)); task = list() - .regexp("tasks/.*/" + StringUtils.substringAfterLast(lastFileName, "/")) + .regexp(Property.of("tasks/.*/" + StringUtils.substringAfterLast(lastFileName, "/"))) .build(); output = task.run(runContext(task)); assertThat(output.getObjects().size(), is(1)); diff --git a/src/test/java/io/kestra/plugin/minio/TriggerTest.java b/src/test/java/io/kestra/plugin/minio/TriggerTest.java index d351112..ce8ec0d 100644 --- a/src/test/java/io/kestra/plugin/minio/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/minio/TriggerTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.minio; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; @@ -44,7 +45,7 @@ public class TriggerTest extends AbstractMinIoTest { void deleteAction() throws Exception { String bucket = "trigger-test"; this.createBucket(bucket); - List listTask = list().bucket(bucket).build(); + List listTask = list().bucket(Property.of(bucket)).build(); // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); diff --git a/src/test/java/io/kestra/plugin/minio/UploadsTest.java b/src/test/java/io/kestra/plugin/minio/UploadsTest.java index c2098bb..68431e0 100644 --- a/src/test/java/io/kestra/plugin/minio/UploadsTest.java +++ b/src/test/java/io/kestra/plugin/minio/UploadsTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.minio; +import io.kestra.core.models.property.Property; import io.kestra.core.utils.IdUtils; import org.junit.jupiter.api.Test; @@ -23,24 +24,24 @@ void run() throws Exception { .builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) .from(java.util.List.of(source1.toString(), source2.toString(), source3.toString(), source4.toString())) - .key(IdUtils.create() + "/") + .key(Property.of(IdUtils.create() + "/")) .build(); - upload.run(runContext(upload)); + var result = upload.run(runContext(upload)); List list = List .builder() .id(AllTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) - .endpoint(minIOContainer.getS3URL()) - .accessKeyId(minIOContainer.getUserName()) - .secretKeyId(minIOContainer.getPassword()) - .prefix(upload.getKey()) + .bucket(Property.of(this.BUCKET)) + .endpoint(Property.of(minIOContainer.getS3URL())) + .accessKeyId(Property.of(minIOContainer.getUserName())) + .secretKeyId(Property.of(minIOContainer.getPassword())) + .prefix(Property.of(result.getKey())) .build(); List.Output output = list.run(runContext(list));