feat: add oss storage type in blob sink (#60)
* feat: add oss storage type in blob sink

* feat: add oss storage type in blob sink

* docs: add docs for oss blob sink

* docs: add docs for oss blob sink

* test: add unit tests for ObjectStorageServiceConfig

* test: add unit tests for BlobStorageFactory

* test: add unit tests for ObjectStorageService

* tests: add unit tests for BlobStorageFactory

* fix: fix checkstyle

* chore: version bump to 0.11.2

* chore: refactor direct config reference to class props

---------

Co-authored-by: Eka Winata <[email protected]>
sumitaich1998 and ekawinataa authored Dec 13, 2024
1 parent b874b89 commit ca003d0
Showing 7 changed files with 816 additions and 25 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.11.1'
version '0.11.2'

def projName = "firehose"

95 changes: 93 additions & 2 deletions docs/docs/sinks/blob-sink.md
@@ -4,9 +4,9 @@ A Blob sink Firehose \(`SINK_TYPE`=`blob`\) requires the following variables to

### `SINK_BLOB_STORAGE_TYPE`

Defines the types of blob storage the destination remote file system the file will be uploaded. Currently, the only supported blob storages are `GCS` (google cloud storage) and `S3` (Amazon S3) .
Defines the type of blob storage, that is, the destination remote file system to which files will be uploaded. Currently, the supported blob storages are `GCS` (Google Cloud Storage), `OSS` (Alibaba Cloud Object Storage Service) and `S3` (Amazon S3).

- Example value: `GCS` or `S3`
- Example value: `GCS`, `OSS` or `S3`
- Type: `required`

### `SINK_BLOB_LOCAL_FILE_WRITER_TYPE`
@@ -262,3 +262,94 @@ The amount of time to allow the client to complete the execution of an API call.
- Example value: `40000`
- Type: `optional`
- Default value : `40000`

### `SINK_BLOB_OSS_ENDPOINT`

The endpoint of the OSS service. Each region has its own endpoint. For example, the endpoint for the China (Hangzhou) region is "oss-cn-hangzhou.aliyuncs.com". For more information, please refer to the [OSS endpoints documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints).

- Example value: `oss-cn-hangzhou.aliyuncs.com`
- Type: `required`

### `SINK_BLOB_OSS_REGION`

The region where your OSS bucket is located. This should match the region in your endpoint. For example, if your endpoint is "oss-cn-hangzhou.aliyuncs.com", the region should be "cn-hangzhou".

- Example value: `cn-hangzhou`
- Type: `required`
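
The relationship between the endpoint and the region described above can be checked with a few lines of Java. This is a minimal sketch, not part of Firehose: the class `OssEndpointRegion` and its methods are hypothetical, and it assumes the public endpoint shape shown in the example (`oss-<region>.aliyuncs.com`, no URL scheme). Other endpoint shapes are not handled.

```java
// Hypothetical helper, not part of Firehose: derives the region from an
// endpoint of the assumed form "oss-<region>.aliyuncs.com".
final class OssEndpointRegion {
    private static final String PREFIX = "oss-";
    private static final String SUFFIX = ".aliyuncs.com";

    // Returns the text between the "oss-" prefix and the ".aliyuncs.com" suffix.
    static String regionOf(String endpoint) {
        if (!endpoint.startsWith(PREFIX) || !endpoint.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("Unexpected endpoint format: " + endpoint);
        }
        return endpoint.substring(PREFIX.length(), endpoint.length() - SUFFIX.length());
    }

    // True when the configured region agrees with the one embedded in the endpoint.
    static boolean matches(String endpoint, String region) {
        return regionOf(endpoint).equals(region);
    }

    public static void main(String[] args) {
        System.out.println(regionOf("oss-cn-hangzhou.aliyuncs.com"));               // cn-hangzhou
        System.out.println(matches("oss-cn-hangzhou.aliyuncs.com", "cn-hangzhou")); // true
    }
}
```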

### `SINK_BLOB_OSS_ACCESS_ID`

The AccessKey ID provided by Alibaba Cloud. This is used for authentication when accessing OSS. You can obtain this from the Alibaba Cloud console under AccessKey management. For security best practices, it's recommended to use RAM user AccessKeys instead of the primary account AccessKey.

- Example value: `LTAI4FxxxxxxxxxxxxxxxT7PD`
- Type: `required`

### `SINK_BLOB_OSS_ACCESS_KEY`

The AccessKey Secret provided by Alibaba Cloud. This is paired with the Access ID for authentication. Keep this secret secure and never share it publicly.

- Example value: `D6TxxxxxxxxxxxxxxxxxxxxxxxtRcO`
- Type: `required`

### `SINK_BLOB_OSS_BUCKET_NAME`

The name of your OSS bucket. Bucket names must be globally unique across all of Alibaba Cloud OSS. The name must comply with the OSS naming conventions: it may contain only lowercase letters, numbers, and hyphens, must be 3-63 characters long, and must start and end with a lowercase letter or number.

- Example value: `my-firehose-bucket`
- Type: `required`
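
The naming rules listed above can be expressed as a single regular expression. The sketch below is illustrative only: `OssBucketNameCheck` is a hypothetical helper, not something Firehose ships, and it checks only the rules stated in this section (it cannot check global uniqueness).

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Firehose: checks the naming rules listed above.
final class OssBucketNameCheck {
    // 3-63 characters in total; lowercase letters, digits and hyphens only;
    // the first and last character must be a lowercase letter or digit.
    private static final Pattern VALID_NAME =
            Pattern.compile("^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$");

    static boolean isValid(String bucketName) {
        return bucketName != null && VALID_NAME.matcher(bucketName).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("my-firehose-bucket")); // true
        System.out.println(isValid("My_Bucket"));          // false
        System.out.println(isValid("-leading-hyphen"));    // false
    }
}
```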

### `SINK_BLOB_OSS_DIRECTORY_PREFIX`

The prefix path where objects will be stored in the bucket. This allows you to organize objects in a folder-like structure within your bucket.

- Example value: `data/firehose/`
- Type: `optional`
- Default value: `` (empty string, root of bucket)
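
To make the effect of the prefix concrete, the sketch below mirrors the join performed by `buildObjectPath` in this commit (`prefix + "/" + objectName`, or the bare object name when the prefix is `null`). With that join, a prefix that already ends in `/`, such as the example value above, yields a double slash, and an empty-string prefix yields a leading slash. The class `ObjectPathSketch` and the `joinNormalized` variant are hypothetical and shown only for comparison; they are not part of the commit.

```java
import java.util.Optional;

// Illustrative only: ObjectPathSketch is not a Firehose class.
final class ObjectPathSketch {
    // Same join as buildObjectPath in this commit: the prefix is used verbatim.
    static String joinAsInCommit(String prefix, String objectName) {
        return Optional.ofNullable(prefix)
                .map(p -> p + "/" + objectName)
                .orElse(objectName);
    }

    // Hypothetical variant: treats null/empty as "no prefix" and drops one trailing slash.
    static String joinNormalized(String prefix, String objectName) {
        if (prefix == null || prefix.isEmpty()) {
            return objectName;
        }
        String trimmed = prefix.endsWith("/") ? prefix.substring(0, prefix.length() - 1) : prefix;
        return trimmed.isEmpty() ? objectName : trimmed + "/" + objectName;
    }

    public static void main(String[] args) {
        System.out.println(joinAsInCommit("data/firehose/", "part-0.parquet")); // data/firehose//part-0.parquet
        System.out.println(joinAsInCommit("data/firehose", "part-0.parquet"));  // data/firehose/part-0.parquet
        System.out.println(joinNormalized("data/firehose/", "part-0.parquet")); // data/firehose/part-0.parquet
    }
}
```

Under the join used in the commit, a prefix without a trailing slash (for example `data/firehose`) gives the cleanest object keys.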

### `SINK_BLOB_OSS_SOCKET_TIMEOUT_MS`

The socket timeout in milliseconds. This is the maximum time to wait for data to be transferred over an established connection before giving up.

- Example value: `50000`
- Type: `optional`
- Default value: `50000`

### `SINK_BLOB_OSS_CONNECTION_TIMEOUT_MS`

The connection timeout in milliseconds. This is the maximum time to wait while establishing a connection with OSS.

- Example value: `50000`
- Type: `optional`
- Default value: `50000`

### `SINK_BLOB_OSS_CONNECTION_REQUEST_TIMEOUT_MS`

The timeout in milliseconds for requesting a connection from the connection manager. A value of -1 means no timeout.

- Example value: `10000`
- Type: `optional`
- Default value: `-1`

### `SINK_BLOB_OSS_REQUEST_TIMEOUT_MS`

The maximum time allowed for the entire request operation (connection, upload, server processing, etc.).

- Example value: `300000`
- Type: `optional`
- Default value: `300000`

### `SINK_BLOB_OSS_RETRY_ENABLED`

Whether to enable automatic retry of failed requests. When enabled, failed requests will be retried according to the retry configuration.

- Example value: `true`
- Type: `optional`
- Default value: `true`

### `SINK_BLOB_OSS_MAX_RETRY_ATTEMPTS`

The maximum number of retry attempts for failed requests when retry is enabled.

- Example value: `3`
- Type: `optional`
- Default value: `3`
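
The optional OSS settings above all fall back to a documented default when unset. The sketch below shows that fallback behaviour in plain Java. It is an assumption-laden illustration: Firehose resolves these values through its own config classes, and `OssOptionalDefaults`, `intSetting`, and `boolSetting` are hypothetical names used only here. Only the variable names and default values come from this document.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Firehose's config loader: resolves optional OSS
// settings from a map of environment-style values, using the documented defaults.
final class OssOptionalDefaults {
    static int intSetting(Map<String, String> env, String key, int defaultValue) {
        String raw = env.get(key);
        return (raw == null || raw.trim().isEmpty()) ? defaultValue : Integer.parseInt(raw.trim());
    }

    static boolean boolSetting(Map<String, String> env, String key, boolean defaultValue) {
        String raw = env.get(key);
        return (raw == null || raw.trim().isEmpty()) ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        // Override a single value; everything else uses the documented default.
        env.put("SINK_BLOB_OSS_CONNECTION_REQUEST_TIMEOUT_MS", "10000");

        System.out.println(intSetting(env, "SINK_BLOB_OSS_SOCKET_TIMEOUT_MS", 50000));
        System.out.println(intSetting(env, "SINK_BLOB_OSS_CONNECTION_TIMEOUT_MS", 50000));
        System.out.println(intSetting(env, "SINK_BLOB_OSS_CONNECTION_REQUEST_TIMEOUT_MS", -1));
        System.out.println(intSetting(env, "SINK_BLOB_OSS_REQUEST_TIMEOUT_MS", 300000));
        System.out.println(boolSetting(env, "SINK_BLOB_OSS_RETRY_ENABLED", true));
        System.out.println(intSetting(env, "SINK_BLOB_OSS_MAX_RETRY_ATTEMPTS", 3));
    }
}
```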
@@ -46,7 +46,6 @@ private static Descriptors.Descriptor getMetadataMessageDescriptor(BlobSinkConfi
return sinkConfig.getOutputKafkaMetadataColumnName().isEmpty()
? fileDescriptor.findMessageTypeByName(KafkaMetadataProtoMessage.getTypeName())
: fileDescriptor.findMessageTypeByName(NestedKafkaMetadataProtoMessage.getTypeName());

}

private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig, StencilClient stencilClient, StatsDReporter statsDReporter) {
@@ -71,10 +70,12 @@ public static BlobStorage createSinkObjectStorage(BlobSinkConfig sinkConfig, Map
case S3:
configuration.put("S3_TYPE", "SINK_BLOB");
break;
case OSS:
configuration.put("OSS_TYPE", "SINK_BLOB");
break;
default:
throw new IllegalArgumentException("Sink Blob Storage type " + sinkConfig.getBlobStorageType() + " is not supported");
}
return BlobStorageFactory.createObjectStorage(sinkConfig.getBlobStorageType(), configuration);

}
}
@@ -24,15 +24,17 @@
public class ObjectStorageService implements BlobStorage {

private final OSS oss;
private final ObjectStorageServiceConfig objectStorageServiceConfig;
private final String ossBucketName;
private final String ossDirectoryPrefix;

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig) {
this(objectStorageServiceConfig, initializeOss(objectStorageServiceConfig));
}

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig, OSS oss) {
this.oss = oss;
this.objectStorageServiceConfig = objectStorageServiceConfig;
this.ossBucketName = objectStorageServiceConfig.getOssBucketName();
this.ossDirectoryPrefix = objectStorageServiceConfig.getOssDirectoryPrefix();
checkBucket();
}

@@ -60,7 +62,7 @@ protected static OSS initializeOss(ObjectStorageServiceConfig objectStorageServi
@Override
public void store(String objectName, String filePath) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
ossBucketName,
buildObjectPath(objectName),
new File(filePath)
);
@@ -70,7 +72,7 @@ public void store(String objectName, String filePath) throws BlobStorageExceptio
@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
ossBucketName,
buildObjectPath(objectName),
new ByteArrayInputStream(content)
);
@@ -90,17 +92,17 @@ private void putObject(PutObjectRequest putObjectRequest) throws BlobStorageExce
}

private String buildObjectPath(String objectName) {
return Optional.ofNullable(objectStorageServiceConfig.getOssDirectoryPrefix())
return Optional.ofNullable(ossDirectoryPrefix)
.map(prefix -> prefix + "/" + objectName)
.orElse(objectName);
}

private void checkBucket() {
BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(),
BucketList bucketList = oss.listBuckets(new ListBucketsRequest(ossBucketName,
null, 1));
if (bucketList.getBucketList().isEmpty()) {
log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName());
log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName());
log.error("Bucket does not exist:{}", ossBucketName);
log.error("Please create OSS bucket before running firehose: {}", ossBucketName);
throw new IllegalArgumentException("Bucket does not exist");
}
}