Skip to content

Commit 2418cdf

Browse files
authored
Merge pull request #3290 from bcamel/issue-2965-update-azure-storage
Updated luigi.contrib.azureblob to the 12.x.y series of azure.storage.blob
2 parents 3cafd15 + e25142e commit 2418cdf

File tree

5 files changed

+102
-47
lines changed

5 files changed

+102
-47
lines changed

luigi/contrib/azureblob.py

+76-33
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import logging
2121
import datetime
2222

23-
from azure.storage.blob import blockblobservice
23+
from azure.storage.blob import BlobServiceClient
2424

2525
from luigi.format import get_default_format
2626
from luigi.target import FileAlreadyExists, FileSystem, AtomicLocalFile, FileSystemTarget
@@ -62,60 +62,101 @@ def __init__(self, account_name=None, account_key=None, sas_token=None, **kwargs
6262
* `custom_domain` - The custom domain to use. This can be set in the Azure Portal. For example, ‘www.mydomain.com’.
6363
* `token_credential` - A token credential used to authenticate HTTPS requests. The token value should be updated before its expiration.
6464
"""
65-
self.options = {"account_name": account_name, "account_key": account_key, "sas_token": sas_token}
65+
if kwargs.get("custom_domain"):
66+
account_url = "{protocol}://{custom_domain}/{account_name}".format(protocol=kwargs.get("protocol", "https"),
67+
custom_domain=kwargs.get("custom_domain"),
68+
account_name=account_name)
69+
else:
70+
account_url = "{protocol}://{account_name}.blob.{endpoint_suffix}".format(protocol=kwargs.get("protocol",
71+
"https"),
72+
account_name=account_name,
73+
endpoint_suffix=kwargs.get(
74+
"endpoint_suffix",
75+
"core.windows.net"))
76+
77+
self.options = {
78+
"account_name": account_name,
79+
"account_key": account_key,
80+
"account_url": account_url,
81+
"sas_token": sas_token}
6682
self.kwargs = kwargs
6783

6884
@property
6985
def connection(self):
70-
return blockblobservice.BlockBlobService(account_name=self.options.get("account_name"),
71-
account_key=self.options.get("account_key"),
72-
sas_token=self.options.get("sas_token"),
73-
protocol=self.kwargs.get("protocol"),
74-
connection_string=self.kwargs.get("connection_string"),
75-
endpoint_suffix=self.kwargs.get("endpoint_suffix"),
76-
custom_domain=self.kwargs.get("custom_domain"),
77-
is_emulated=self.kwargs.get("is_emulated") or False)
86+
if self.kwargs.get("connection_string"):
87+
return BlobServiceClient.from_connection_string(conn_str=self.kwargs.get("connection_string"),
88+
**self.kwargs)
89+
else:
90+
return BlobServiceClient(account_url=self.options.get("account_url"),
91+
credential=self.options.get("account_key") or self.options.get("sas_token"),
92+
**self.kwargs)
93+
94+
def container_client(self, container_name):
95+
return self.connection.get_container_client(container_name)
96+
97+
def blob_client(self, container_name, blob_name):
98+
container_client = self.container_client(container_name)
99+
return container_client.get_blob_client(blob_name)
78100

79101
def upload(self, tmp_path, container, blob, **kwargs):
80102
logging.debug("Uploading file '{tmp_path}' to container '{container}' and blob '{blob}'".format(
81103
tmp_path=tmp_path, container=container, blob=blob))
82104
self.create_container(container)
83-
lease_id = self.connection.acquire_blob_lease(container, blob)\
84-
if self.exists("{container}/{blob}".format(container=container, blob=blob)) else None
105+
lease = None
106+
blob_client = self.blob_client(container, blob)
107+
if blob_client.exists():
108+
lease = blob_client.acquire_lease()
85109
try:
86-
self.connection.create_blob_from_path(container, blob, tmp_path, lease_id=lease_id, progress_callback=kwargs.get("progress_callback"))
110+
with open(tmp_path, 'rb') as data:
111+
blob_client.upload_blob(data,
112+
overwrite=True,
113+
lease=lease,
114+
progress_hook=kwargs.get("progress_callback"))
87115
finally:
88-
if lease_id is not None:
89-
self.connection.release_blob_lease(container, blob, lease_id)
116+
if lease is not None:
117+
lease.release()
90118

91119
def download_as_bytes(self, container, blob, bytes_to_read=None):
92-
start_range, end_range = (0, bytes_to_read-1) if bytes_to_read is not None else (None, None)
93120
logging.debug("Downloading from container '{container}' and blob '{blob}' as bytes".format(
94121
container=container, blob=blob))
95-
return self.connection.get_blob_to_bytes(container, blob, start_range=start_range, end_range=end_range).content
122+
blob_client = self.blob_client(container, blob)
123+
download_stream = blob_client.download_blob(offset=0, length=bytes_to_read) if bytes_to_read \
124+
else blob_client.download_blob()
125+
return download_stream.readall()
96126

97127
def download_as_file(self, container, blob, location):
98128
logging.debug("Downloading from container '{container}' and blob '{blob}' to {location}".format(
99129
container=container, blob=blob, location=location))
100-
return self.connection.get_blob_to_path(container, blob, location)
130+
blob_client = self.blob_client(container, blob)
131+
with open(location, 'wb') as file:
132+
download_stream = blob_client.download_blob()
133+
file.write(download_stream.readall())
134+
return blob_client.get_blob_properties()
101135

102136
def create_container(self, container_name):
103-
return self.connection.create_container(container_name)
137+
if not self.exists(container_name):
138+
return self.connection.create_container(container_name)
104139

105140
def delete_container(self, container_name):
106-
lease_id = self.connection.acquire_container_lease(container_name)
107-
self.connection.delete_container(container_name, lease_id=lease_id)
141+
container_client = self.container_client(container_name)
142+
lease = container_client.acquire_lease()
143+
container_client.delete_container(lease=lease)
108144

109145
def exists(self, path):
110146
container, blob = self.splitfilepath(path)
111-
return self.connection.exists(container, blob)
147+
if blob is None:
148+
return self.container_client(container).exists()
149+
else:
150+
return self.blob_client(container, blob).exists()
112151

113152
def remove(self, path, recursive=True, skip_trash=True):
114-
container, blob = self.splitfilepath(path)
115153
if not self.exists(path):
116154
return False
117-
lease_id = self.connection.acquire_blob_lease(container, blob)
118-
self.connection.delete_blob(container, blob, lease_id=lease_id)
155+
156+
container, blob = self.splitfilepath(path)
157+
blob_client = self.blob_client(container, blob)
158+
lease = blob_client.acquire_lease()
159+
blob_client.delete_blob(lease=lease)
119160
return True
120161

121162
def mkdir(self, path, parents=True, raise_if_exists=False):
@@ -148,16 +189,18 @@ def copy(self, path, dest):
148189
source_container=source_container, dest_container=dest_container
149190
))
150191

151-
source_lease_id = self.connection.acquire_blob_lease(source_container, source_blob)
152-
destination_lease_id = self.connection.acquire_blob_lease(dest_container, dest_blob) if self.exists(dest) else None
192+
source_blob_client = self.blob_client(source_container, source_blob)
193+
dest_blob_client = self.blob_client(dest_container, dest_blob)
194+
source_lease = source_blob_client.acquire_lease()
195+
destination_lease = dest_blob_client.acquire_lease() if self.exists(dest) else None
153196
try:
154-
return self.connection.copy_blob(source_container, dest_blob, self.connection.make_blob_url(
155-
source_container, source_blob),
156-
destination_lease_id=destination_lease_id, source_lease_id=source_lease_id)
197+
return dest_blob_client.start_copy_from_url(source_url=source_blob_client.url,
198+
source_lease=source_lease,
199+
destination_lease=destination_lease)
157200
finally:
158-
self.connection.release_blob_lease(source_container, source_blob, source_lease_id)
159-
if destination_lease_id is not None:
160-
self.connection.release_blob_lease(dest_container, dest_blob, destination_lease_id)
201+
source_lease.release()
202+
if destination_lease is not None:
203+
destination_lease.release()
161204

162205
def rename_dont_move(self, path, dest):
163206
self.move(path, dest)

scripts/ci/install_start_azurite.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
echo "$DOCKERHUB_TOKEN" | docker login -u spotifyci --password-stdin
44

5-
docker pull arafato/azurite
5+
docker pull mcr.microsoft.com/azure-storage/azurite
66
mkdir -p blob_emulator
77
$1/stop_azurite.sh
8-
docker run -e executable=blob -d -t -p 10000:10000 -v blob_emulator:/opt/azurite/folder arafato/azurite
8+
docker run -p 10000:10000 -v blob_emulator:/data -e AZURITE_ACCOUNTS=devstoreaccount1:YXp1cml0ZQ== -d mcr.microsoft.com/azure-storage/azurite azurite-blob -l /data --blobHost 0.0.0.0 --blobPort 10000

scripts/ci/stop_azurite.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
#!/usr/bin/env bash
2-
docker stop $(docker ps -q --filter ancestor=arafato/azurite)
2+
docker stop "$(docker ps -q --filter ancestor=mcr.microsoft.com/azure-storage/azurite)"

test/contrib/azureblob_test.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626

2727
import luigi
2828
from luigi.contrib.azureblob import AzureBlobClient, AzureBlobTarget
29+
from luigi.target import FileAlreadyExists
2930

30-
account_name = os.environ.get("ACCOUNT_NAME")
31-
account_key = os.environ.get("ACCOUNT_KEY")
32-
sas_token = os.environ.get("SAS_TOKEN")
33-
is_emulated = False if account_name else True
34-
client = AzureBlobClient(account_name, account_key, sas_token, is_emulated=is_emulated)
31+
account_name = os.environ.get("AZURITE_ACCOUNT_NAME")
32+
account_key = os.environ.get("AZURITE_ACCOUNT_KEY")
33+
sas_token = os.environ.get("AZURITE_SAS_TOKEN")
34+
custom_domain = os.environ.get("AZURITE_CUSTOM_DOMAIN")
35+
protocol = os.environ.get("AZURITE_PROTOCOL", "http")
36+
client = AzureBlobClient(account_name, account_key, sas_token, custom_domain=custom_domain, protocol=protocol)
3537

3638

3739
@pytest.mark.azureblob
@@ -95,8 +97,15 @@ def test_upload_copy_move_remove_blob(self):
9597
self.client.upload(f.name, container_name, from_blob_name)
9698
self.assertTrue(self.client.exists(from_path))
9799

100+
# mkdir
101+
self.assertRaises(FileAlreadyExists, self.client.mkdir, from_path, False, True)
102+
103+
# mkdir does not actually create anything
104+
self.client.mkdir(to_path, True, True)
105+
self.assertFalse(self.client.exists(to_path))
106+
98107
# copy
99-
self.assertIn(self.client.copy(from_path, to_path).status, ["success", "pending"])
108+
self.assertIn(self.client.copy(from_path, to_path)["copy_status"], ["success", "pending"])
100109
self.assertTrue(self.client.exists(to_path))
101110

102111
# remove
@@ -121,7 +130,7 @@ def output(self):
121130
return AzureBlobTarget("luigi-test", "movie-cheesy.txt", client, download_when_reading=False)
122131

123132
def run(self):
124-
client.connection.create_container("luigi-test")
133+
client.create_container("luigi-test")
125134
with self.output().open("w") as op:
126135
op.write("I'm going to make him an offer he can't refuse.\n")
127136
op.write("Toto, I've got a feeling we're not in Kansas anymore.\n")

tox.ini

+7-4
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ deps =
4848
google-compute-engine
4949
coverage>=5.0,<6
5050
codecov>=1.4.0
51-
requests>=2.20.0,<3.0
51+
requests>=2.20.0,<=2.31.0
5252
unixsocket: requests-unixsocket<1.0
5353
pygments
5454
hypothesis>=6.7.0,<7.0.0
5555
selenium==3.0.2
5656
pymongo==3.4.0
5757
toml<2.0.0
5858
responses<1.0.0
59-
azure-storage<=0.36
59+
azure-storage-blob<=12.20.0
6060
datadog==0.22.0
61-
prometheus-client>=0.5.0<0.15
61+
prometheus-client>=0.5.0,<0.15
6262
dropbox: dropbox>=11.0.0
6363
jsonschema
6464
passenv =
@@ -75,6 +75,9 @@ setenv =
7575
AWS_DEFAULT_REGION=us-east-1
7676
AWS_ACCESS_KEY_ID=accesskey
7777
AWS_SECRET_ACCESS_KEY=secretkey
78+
AZURITE_ACCOUNT_NAME=devstoreaccount1
79+
AZURITE_ACCOUNT_KEY=YXp1cml0ZQ==
80+
AZURITE_CUSTOM_DOMAIN=localhost:10000
7881
commands =
7982
cdh,hdp: {toxinidir}/scripts/ci/setup_hadoop_env.sh
8083
azureblob: {toxinidir}/scripts/ci/install_start_azurite.sh {toxinidir}/scripts/ci
@@ -137,7 +140,7 @@ deps =
137140
jinja2==3.0.3
138141
Sphinx>=1.4.4,<1.5
139142
sphinx_rtd_theme
140-
azure-storage<=0.36
143+
azure-storage-blob<=12.20.0
141144
prometheus-client==0.5.0
142145
alabaster<0.7.13
143146
commands =

0 commit comments

Comments
 (0)