generated from dataforgoodfr/d4g-project-template
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathstorage_client.py
67 lines (54 loc) · 2.24 KB
/
storage_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import boto3
from botocore.client import Config
import pandas as pd
import io
"""Client class to interact with Scaleway Object Storage."""
class ObjectStorageClient:
region_name = "fr-par"
endpoint_url = "https://s3.fr-par.scw.cloud"
bucket_name = "pollution-eau-s3"
def __init__(self):
# Need to use V2 signature for upload and V4 for download
self.client_v2 = self.build_client("s3")
self.client_v4 = self.build_client("s3v4")
@staticmethod
def build_client(signature_version: str = "s3v4"):
return boto3.session.Session().client(
service_name="s3",
config=Config(signature_version=signature_version),
region_name=ObjectStorageClient.region_name,
use_ssl=True,
endpoint_url=ObjectStorageClient.endpoint_url,
aws_access_key_id=os.getenv("SCW_ACCESS_KEY"),
aws_secret_access_key=os.getenv("SCW_SECRET_KEY"),
)
# def list_buckets(self):
# response = self.client_v4.list_buckets()
# return response['Buckets']
def list_objects(self):
response = self.client_v4.list_objects(Bucket=self.bucket_name)
if "Contents" in response:
return response["Contents"]
else:
return []
def download_object(self, file_key, local_path):
self.client_v4.download_file(self.bucket_name, file_key, local_path)
def upload_object(self, local_path, file_key=None):
if file_key is None:
file_key = os.path.basename(local_path)
self.client_v2.upload_file(local_path, self.bucket_name, file_key)
def upload_dataframe(self, df, file_key):
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# Upload the buffer to S3
self.client_v2.put_object(
Bucket=self.bucket_name, Key=file_key, Body=csv_buffer.getvalue()
)
def read_object_as_dataframe(self, file_key):
response = self.client_v4.get_object(Bucket=self.bucket_name, Key=file_key)
csv_data = response["Body"].read().decode("utf-8")
df = pd.read_csv(io.StringIO(csv_data))
return df
def delete_object(self, key):
self.client_v4.delete_object(Bucket=self.bucket_name, Key=key)