-
Notifications
You must be signed in to change notification settings - Fork 0
/
class_storage.py
143 lines (110 loc) · 4.37 KB
/
class_storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from fileinput import filename
import azure.storage.blob as blob
import azure.cosmos as cosmos
import json
class StorageBlob:
def __init__(self, url, key, container, filename):
self.container = container
self.filename = filename
self.storage_client = blob.BlobServiceClient(url, key)
self.blob_client = self.storage_client.get_blob_client(container, filename)
def _getBlobClient(self, filename=None, container=None):
## check to see if parameters have been passed
## otherwise, use defaults
if container is None:
container = self.container
if filename is None:
filename = self.filename
## use appropriate blob_client
if (container == self.container) and (filename == self.filename):
blob_client = self.blob_client
else:
blob_client = self.storage_client.get_blob_client(container, filename)
return blob_client
@classmethod
def fromDict(cls, keys):
url = keys["blob_url"]
key = keys["blob_key"]
container = keys["blob_container"]
filename = keys["blob_filename"]
return cls(url, key, container, filename)
def download(self, filename=None, container=None):
blob_client = self._getBlobClient(filename, container)
## read data
try:
return blob_client.download_blob().content_as_text()
except:
return ""
def upload(self, data, filename=None, container=None):
blob_client = self._getBlobClient(filename, container)
blob_client.upload_blob(data)
class StorageCosmos:
def __init__(self, url, key, database, container):
self.database = database
self.container = container
self.storage_client = cosmos.CosmosClient(url, key)
self.database_client = self.storage_client.get_database_client(database)
self.container_client = self.database_client.get_container_client(container)
def _getContainerClient(self, database=None, container=None):
## check to see if parameters have been passed
## otherwise, use defaults
if database is None:
database_client = self.database_client
else:
database_client = self.storage_client.get_database_client(database)
if container is None:
container_client = self.container_client
else:
container_client = database_client.get_container_client(container)
return container_client
@staticmethod
def _dataToStr(data):
return "\n".join([json.dumps(d)["id"] for d in data])
@staticmethod
def _strToData(x):
pass
@classmethod
def fromDict(cls, keys):
url = keys["cosmos_url"]
key = keys["cosmos_key"]
database = keys["cosmos_database"]
container = keys["cosmos_container"]
return cls(url, key, database, container)
def download(self, database=None, container=None):
container_client = self._getContainerClient(database, container)
try:
items = container_client.query_items("select id from mycontainer")
return self._dataToStr(items)
except:
return ""
def upload(self, data, filename=None, container=None):
if container is None:
container = self.container
if filename is None:
filename = self.filename
bc = self.storage_client.get_blob_client(container, filename)
bc.upload_blob(data)
class StorageFile:
def __init__(self, filename):
self.filename = filename
@classmethod
def fromDict(cls, keys):
filename = keys["filename"]
return cls(filename)
def download(self, filename=None, container=None):
if container is None:
container = self.container
if filename is None:
filename = self.filename
bc = self.storage_client.get_blob_client(container, filename)
try:
return bc.download_blob().content_as_text()
except:
return ""
def upload(self, data, filename=None, container=None):
if container is None:
container = self.container
if filename is None:
filename = self.filename
bc = self.storage_client.get_blob_client(container, filename)
bc.upload_blob(data)