feat: add oss storage type in blob sink #58

Closed
wants to merge 53 commits into from
Changes from 51 commits
2a83f52
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
eb40d6e
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
24ee818
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
d918738
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
d44d1fd
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
afbbf43
feat: add oss storage type in blob sink
sumitaich1998 Dec 3, 2024
43dfbbc
feat: add oss storage type in blob sink
sumitaich1998 Dec 4, 2024
39ebd15
test: add unit tests for OSS blob sink
sumitaich1998 Dec 4, 2024
6287158
docs: add docs for oss blob sink
sumitaich1998 Dec 4, 2024
491924c
tests: add unit tests for BlobStorageFactory
sumitaich1998 Dec 4, 2024
0ec87b4
tests: add unit tests for BlobSinkFactory
sumitaich1998 Dec 4, 2024
352ac03
chore: add dependency for ali oss client
sumitaich1998 Dec 4, 2024
0cc819d
tests: add unit tests for BlobStorageFactory
sumitaich1998 Dec 4, 2024
7d47308
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
f2d6309
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
5fc7516
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
f395ffb
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
393e5a5
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
6e1ce01
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
e7218e2
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
19c3000
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
84cb363
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
000be33
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
2e7dd18
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
25b1f17
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
d5ff7d0
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
eebb607
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
78b5840
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
fa1ca1d
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
732528a
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
3cc3450
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
d5af54f
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
3f08be8
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
47acbc6
refactor: rename OSS class to ObjectStorageService
sumitaich1998 Dec 5, 2024
b956912
fix: fix checkstyle
sumitaich1998 Dec 5, 2024
3531934
test: add unit tests for ObjectStorageService
sumitaich1998 Dec 5, 2024
79aa530
test: add unit tests for ObjectStorageService
sumitaich1998 Dec 5, 2024
3f30c22
test: add unit tests for ObjectStorageService
sumitaich1998 Dec 5, 2024
a49a112
test: add unit tests for OSSConfig
sumitaich1998 Dec 5, 2024
dcca9dd
test: add unit tests for ObjectStorageService
sumitaich1998 Dec 5, 2024
5d3595e
tests: add unit tests for BlobSinkFactory
sumitaich1998 Dec 5, 2024
47a6389
tests: add unit tests for BlobStorageFactory
sumitaich1998 Dec 5, 2024
f2c894e
fix: fix build
sumitaich1998 Dec 5, 2024
03e5818
fix: fix checkstyle
sumitaich1998 Dec 5, 2024
5824c4f
fix: fix checkstyle
sumitaich1998 Dec 5, 2024
4d87820
fix: remove unused imports
sumitaich1998 Dec 5, 2024
b1eb591
fix: fix failing unit tests
sumitaich1998 Dec 5, 2024
b4c4594
fix: fix failing unit tests
sumitaich1998 Dec 5, 2024
a32a7ac
fix: fix failing unit tests
sumitaich1998 Dec 5, 2024
c4e5e8f
fix: fix failing unit tests
sumitaich1998 Dec 5, 2024
9025990
chore: version bump to 0.10.9
sumitaich1998 Dec 5, 2024
9f2c5af
Merge branch 'main' into oss-sink
sumitaich1998 Dec 11, 2024
f112dd4
refactor: merge back from master
sumitaich1998 Dec 11, 2024
3 changes: 2 additions & 1 deletion build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.7'
version '0.10.9'

def projName = "firehose"

@@ -103,6 +103,7 @@ dependencies {
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'
implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
69 changes: 69 additions & 0 deletions docs/docs/sinks/blob-sink.md
@@ -262,3 +262,72 @@ The amount of time to allow the client to complete the execution of an API call.
- Example value: `40000`
- Type: `optional`
- Default value : `40000`

### OSS Configuration

When using OSS as the blob storage type (`SINK_BLOB_STORAGE_TYPE=OSS`), the following configurations apply. Each entry states whether it is required or optional:

### `SINK_BLOB_OSS_ENDPOINT`

The endpoint of your OSS bucket.

- Example value: `oss-cn-hangzhou.aliyuncs.com`
- Type: `required`

### `SINK_BLOB_OSS_ACCESS_KEY_ID`

The access key ID for OSS authentication.

- Type: `required`

### `SINK_BLOB_OSS_ACCESS_KEY_SECRET`

The access key secret for OSS authentication.

- Type: `required`

### `SINK_BLOB_OSS_BUCKET_NAME`

The name of the OSS bucket.

- Type: `required`

### `SINK_BLOB_OSS_DIRECTORY_PREFIX`

The directory prefix in the OSS bucket where files will be uploaded.

- Example value: `data/raw`
- Type: `optional`
- Default value: ``

### `SINK_BLOB_OSS_MAX_CONNECTIONS`

Maximum number of concurrent connections to OSS.

- Example value: `1024`
- Type: `optional`
- Default value: `1024`


Is this default value in line with the Ali documentation and/or recommendation?


### `SINK_BLOB_OSS_SOCKET_TIMEOUT`

Socket timeout in milliseconds.

- Example value: `50000`
- Type: `optional`
- Default value: `50000`

### `SINK_BLOB_OSS_CONNECTION_TIMEOUT`

Connection timeout in milliseconds.

- Example value: `50000`
- Type: `optional`
- Default value: `50000`

### `SINK_BLOB_OSS_MAX_ERROR_RETRY`

Maximum number of retry attempts for failed operations.

- Example value: `3`
- Type: `optional`
- Default value: `3`
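
The documentation above marks four keys as `required`. A minimal sketch of a pre-flight check for them follows, assuming the settings arrive as a plain `Map` of environment variables. `OssEnvCheck` and `missingKeys` are hypothetical names used for illustration and are not part of Firehose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of this PR: reports which required OSS keys are absent.
final class OssEnvCheck {
    // Keys documented above with `Type: required`.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "SINK_BLOB_OSS_ENDPOINT",
            "SINK_BLOB_OSS_ACCESS_KEY_ID",
            "SINK_BLOB_OSS_ACCESS_KEY_SECRET",
            "SINK_BLOB_OSS_BUCKET_NAME");

    // Returns the required keys that are missing or blank, in documentation order.
    static List<String> missingKeys(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = env.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling `OssEnvCheck.missingKeys(System.getenv())` before creating the sink would surface a missing credential as a readable list rather than as a failure on the first upload.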
37 changes: 37 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/OSSConfig.java
@@ -0,0 +1,37 @@
package com.gotocompany.firehose.config;

import org.aeonbits.owner.Config;

public interface OSSConfig extends Config {

@Key("${OSS_TYPE}_OSS_ENDPOINT")
String getOSSEndpoint();

@Key("${OSS_TYPE}_OSS_ACCESS_KEY_ID")
String getOSSAccessKeyId();

@Key("${OSS_TYPE}_OSS_ACCESS_KEY_SECRET")
String getOSSAccessKeySecret();

@Key("${OSS_TYPE}_OSS_BUCKET_NAME")
String getOSSBucketName();

@Key("${OSS_TYPE}_OSS_DIRECTORY_PREFIX")
String getOSSDirectoryPrefix();

@Key("${OSS_TYPE}_OSS_MAX_CONNECTIONS")
@DefaultValue("1024")
Integer getOSSMaxConnections();

@Key("${OSS_TYPE}_OSS_SOCKET_TIMEOUT")
@DefaultValue("50000")
Integer getOSSSocketTimeout();

@Key("${OSS_TYPE}_OSS_CONNECTION_TIMEOUT")
@DefaultValue("50000")
Integer getOSSConnectionTimeout();

@Key("${OSS_TYPE}_OSS_MAX_ERROR_RETRY")
@DefaultValue("3")
Integer getOSSMaxErrorRetry();
}
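
The interface above declares connection settings (max connections, socket timeout, connection timeout, max error retry). One way they could reach the client is through the SDK's `ClientBuilderConfiguration`, sketched below. `OssClientFactory` is a hypothetical name and this is not the code in this PR. It assumes the `aliyun-sdk-oss` dependency added in `build.gradle` is on the classpath.

```java
import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;

// Hypothetical factory, not part of this PR: builds an OSS client that honours
// the connection settings exposed through the configuration keys.
final class OssClientFactory {
    static OSS create(String endpoint, String accessKeyId, String accessKeySecret,
                      int maxConnections, int socketTimeoutMs,
                      int connectionTimeoutMs, int maxErrorRetry) {
        ClientBuilderConfiguration clientConfig = new ClientBuilderConfiguration();
        clientConfig.setMaxConnections(maxConnections);
        clientConfig.setSocketTimeout(socketTimeoutMs);
        clientConfig.setConnectionTimeout(connectionTimeoutMs);
        clientConfig.setMaxErrorRetry(maxErrorRetry);
        // Four-argument build(...) accepts the client configuration alongside the credentials.
        return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret, clientConfig);
    }
}
```

A caller holding an `OSSConfig` could pass `getOSSMaxConnections()`, `getOSSSocketTimeout()`, `getOSSConnectionTimeout()` and `getOSSMaxErrorRetry()` as the last four arguments.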
@@ -71,10 +71,12 @@ public static BlobStorage createSinkObjectStorage(BlobSinkConfig sinkConfig, Map
case S3:
configuration.put("S3_TYPE", "SINK_BLOB");
break;
case OSS:
configuration.put("OSS_TYPE", "SINK_BLOB");
break;
default:
throw new IllegalArgumentException("Sink Blob Storage type " + sinkConfig.getBlobStorageType() + " is not supported");
}
return BlobStorageFactory.createObjectStorage(sinkConfig.getBlobStorageType(), configuration);

}
}
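
The `OSS_TYPE` entry set above appears to be what turns a key template such as `${OSS_TYPE}_OSS_ENDPOINT` into `SINK_BLOB_OSS_ENDPOINT`. The sketch below imitates that substitution with the standard library only, to make the naming scheme concrete. It is an illustration, not the OWNER library's implementation, and `KeyExpansion` is a hypothetical name.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: expands ${NAME} placeholders in a config key template.
final class KeyExpansion {
    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${NAME} with variables.get(NAME); unknown names are left untouched.
    static String expand(String keyTemplate, Map<String, String> variables) {
        Matcher matcher = VARIABLE.matcher(keyTemplate);
        StringBuffer expanded = new StringBuffer();
        while (matcher.find()) {
            String replacement = variables.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(expanded, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(expanded);
        return expanded.toString();
    }
}
```

With `OSS_TYPE` mapped to `SINK_BLOB`, the template `${OSS_TYPE}_OSS_BUCKET_NAME` expands to `SINK_BLOB_OSS_BUCKET_NAME`, which matches the key names in the documentation.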
@@ -1,8 +1,10 @@
package com.gotocompany.firehose.sink.common.blobstorage;

import com.gotocompany.firehose.config.GCSConfig;
import com.gotocompany.firehose.config.OSSConfig;
import com.gotocompany.firehose.config.S3Config;
import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage;
import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService;
import com.gotocompany.firehose.sink.common.blobstorage.s3.S3;
import org.aeonbits.owner.ConfigFactory;

@@ -24,10 +26,16 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map<S
try {
S3Config s3Config = ConfigFactory.create(S3Config.class, config);
return new S3(s3Config);
} catch (Exception e) {
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating S3 Storage", e);
}

case OSS:
try {
OSSConfig ossConfig = ConfigFactory.create(OSSConfig.class, config);
return new ObjectStorageService(ossConfig);
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating OSS Storage", e);
}
default:
throw new IllegalArgumentException("Blob Storage Type " + storageType + " is not supported");
}
@@ -2,5 +2,6 @@

public enum BlobStorageType {
GCS,
S3
S3,
OSS
}
@@ -0,0 +1,55 @@
package com.gotocompany.firehose.sink.common.blobstorage.oss;

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.ObjectMetadata;
import com.gotocompany.firehose.config.OSSConfig;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.nio.file.Paths;

public class ObjectStorageService implements BlobStorage {
private final OSS ossClient;
private final String bucketName;


Should we keep the OSSConfig as a property? This approach encapsulates all related configuration settings within a single object, making the code cleaner and easier to maintain.


I would also recommend this.

private final String directoryPrefix;

public ObjectStorageService(OSSConfig config) {
this.ossClient = createOSSClient(
config.getOSSEndpoint(),
config.getOSSAccessKeyId(),
config.getOSSAccessKeySecret()
);
this.bucketName = config.getOSSBucketName();
this.directoryPrefix = config.getOSSDirectoryPrefix();
}

protected OSS createOSSClient(String endpoint, String accessKeyId, String accessKeySecret) {
return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);


Should we introduce the option to pass a configurable socketTimeout, connectionTimeout, connectionRequestTimeout and requestTimeout? There is also an option to pass a configurable retry strategy to the OSSClient.


Since we are exposing connection parameters in the config, the builder should accommodate those values.

}

@Override
public void store(String objectName, String localPath) throws BlobStorageException {
try {
String fullPath = Paths.get(directoryPrefix, objectName).toString();
ossClient.putObject(bucketName, fullPath, new File(localPath));
} catch (OSSException e) {
throw new BlobStorageException(e.getErrorCode(), "OSS Upload failed", e);
}
}

@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
try {
String fullPath = Paths.get(directoryPrefix, objectName).toString();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(content.length);
ossClient.putObject(bucketName, fullPath, new ByteArrayInputStream(content), metadata);
} catch (OSSException e) {
throw new BlobStorageException(e.getErrorCode(), "OSS Upload failed", e);
}
}
}
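
Both `store` methods build the object key with `Paths.get(directoryPrefix, objectName).toString()`, which uses the platform's path separator, so on Windows the key would contain backslashes. `SINK_BLOB_OSS_DIRECTORY_PREFIX` is also optional and has no `@DefaultValue`, so the prefix may be unset. A minimal sketch of a separator-independent join follows. `OssObjectKeys` is a hypothetical helper, not part of this PR.

```java
// Hypothetical helper, not part of this PR: joins a directory prefix and an
// object name with '/', regardless of the operating system.
final class OssObjectKeys {
    static String join(String directoryPrefix, String objectName) {
        // Avoid a leading '/' in the final key.
        String name = objectName.replaceAll("^/+", "");
        if (directoryPrefix == null) {
            return name;
        }
        // Trim slashes at both ends so the join never yields "//".
        String prefix = directoryPrefix.replaceAll("^/+", "").replaceAll("/+$", "");
        return prefix.isEmpty() ? name : prefix + "/" + name;
    }
}
```

With this helper, an unset or empty prefix yields the bare object name, and a prefix such as `data/raw/` is joined to the object name with exactly one slash.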