Merge pull request #14 from earthobservatory/develop-dataspace
Changes for new Copernicus Dataspace sling-extract
shitong01 authored Nov 7, 2023
2 parents f33731e + 9e76080 commit b20b2cf
Showing 2 changed files with 155 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -17,7 +17,7 @@ RUN set -ex \
&& sudo /opt/conda/bin/conda install -c conda-forge --yes httplib2 lxml pyproj \
&& sudo /opt/conda/bin/pip install joblib netcdf4 \
&& $HOME/verdi/bin/pip install joblib netcdf4 \
&& $HOME/verdi/bin/pip install 'fiona==1.7.13' \
#&& $HOME/verdi/bin/pip install 'fiona==1.7.13' \
#&& $HOME/verdi/bin/pip install 'pyproj==2.6.1' \
&& sudo rm -rf /root/.cache \
&& sudo chown -R ops:ops /home/ops/ariamh \
180 changes: 154 additions & 26 deletions frameMetadata/sling_extract_scihub.py
@@ -22,6 +22,7 @@
from datetime import datetime
import random
import time
import netrc

# from utils.UrlUtils import UrlUtils

@@ -174,6 +175,19 @@ def check_slc_status(slc_id, index_suffix=None):
    logging.info("check_slc_status : returning False")
    return False

def get_slc_checksum_md5_dataspace(dataspace_uuid):
    '''
    Queries the Copernicus Data Space OData catalogue for the product's MD5 checksum.
    Returns the checksum string if one is reported, otherwise None.
    '''
    product_detail_url = f"https://catalogue.dataspace.copernicus.eu/odata/v1/Products({dataspace_uuid})"
    req = requests.get(product_detail_url, timeout=30)
    if req.status_code == 200:
        product_json = req.json()
        checksums = product_json.get('Checksum')
        if checksums:
            for checksum in checksums:
                if "md5" in checksum['Algorithm'].lower():
                    return checksum['Value']
    return None
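
For reference, a minimal usage sketch of the new checksum helper (the UUID below is a placeholder; the assumption is the same one the function makes, namely that the OData Products endpoint returns a JSON body whose "Checksum" field is a list of {"Algorithm": ..., "Value": ...} entries):

    example_uuid = "00000000-0000-0000-0000-000000000000"  # hypothetical product UUID
    md5 = get_slc_checksum_md5_dataspace(example_uuid)
    if md5 is None:
        print("catalogue did not report an MD5 checksum; fall back to a size check")
    else:
        print(f"MD5 reported by the dataspace catalogue: {md5}")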


def get_slc_checksum_md5_scihub(esa_uuid):
'''
@@ -293,6 +307,101 @@ def download_file(url, path, cache=False):
    else:
        return osaka.main.get(url, path, params=params)

def get_access_token_dataspace(username=None, password=None):
    '''
    Obtains a Copernicus Data Space (CDSE) access token, using either the supplied
    username/password or the identity.dataspace.copernicus.eu entry in ~/.netrc.
    '''
    if username and password:
        logger.info("using user-defined username and password")
    else:
        logger.info("using netrc username and password")
        netrc_file = f'{os.path.expanduser("~")}/.netrc'
        try:
            info = netrc.netrc(netrc_file)
            username, account, password = info.authenticators("identity.dataspace.copernicus.eu")
        except Exception as e:
            logger.info(f"Unable to find identity.dataspace.copernicus.eu credentials in {netrc_file}. "
                        f"Please define username and password in script.")
            sys.exit(1)
    access_token = check_output(
        f"curl -d 'client_id=cdse-public' -d 'username={username}' -d 'password={password}' -d 'grant_type=password' "
        "'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token' | "
        "python3 -m json.tool | "
        "grep access_token | "
        "awk -F\\\" '{print $4}'", shell=True)
    access_token = access_token.decode('utf-8').strip()
    logger.info("obtained CDSE access token")
    return access_token
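
The committed helper extracts the token by piping curl output through json.tool, grep and awk. For readers tracing the auth flow, the same request expressed directly with requests (a sketch only, not part of this commit; it POSTs the same endpoint and form fields as the curl command above) would look like:

    import requests

    def get_access_token_dataspace_requests(username, password):
        # hypothetical alternative helper; sends the same form fields the curl pipeline sends
        token_url = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
        data = {
            "client_id": "cdse-public",
            "username": username,
            "password": password,
            "grant_type": "password",
        }
        resp = requests.post(token_url, data=data, timeout=30)
        resp.raise_for_status()
        # the token endpoint's JSON response carries the token under "access_token"
        return resp.json()["access_token"]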

def download_file_dataspace(url, path, access_token):
    '''
    Downloads an SLC from the Copernicus Data Space download (zipper) service,
    resuming partial files via HTTP Range requests and retrying up to 5 times.
    '''
    headers = {"Authorization": f"Bearer {access_token}"}
    session = requests.Session()
    session.headers.update(headers)
    o_file = path
    # follow redirects to the zipper download host
    r = requests.get(url, headers=headers, stream=True, verify=False, allow_redirects=True)
    if 'zipper' in r.url:
        response_url = r.url
        logger.info(r.headers)
        response = session.get(r.url, headers=headers, stream=True, verify=False, allow_redirects=True)
        filesize_b = int(response.headers['Content-Length'])
        logger.info(f"Filesize is: {filesize_b} bytes")
        if os.path.exists(o_file):
            temp_size = os.path.getsize(o_file)
            if filesize_b == temp_size:
                logger.info(f"Download of {o_file} has already been completed")
                sys.exit()
        else:
            temp_size = 0
        download_complete = temp_size == filesize_b
        max_retry = 5
        CHUNK = 8 * 1024
        for i in range(max_retry):
            # refresh the access token on every attempt in case the previous one expired
            access_token = get_access_token_dataspace()
            headers = {"Authorization": f"Bearer {access_token}", 'Range': 'bytes=%d-' % temp_size}
            if temp_size:
                logger.info(f"Headers to start mid-file: {headers}")
            response = session.get(response_url, headers=headers, stream=True, verify=False)
            with open(o_file, "ab") as file:
                start = time.time()
                count = 0
                logger.info(f"Downloading file to: {o_file} ({filesize_b / (1024 * 1024):.2f} MB)")
                try:
                    for chunk in response.iter_content(chunk_size=CHUNK):
                        count += 1
                        if chunk:
                            file.write(chunk)
                        if not count % 20:
                            size = count * CHUNK / (1024 * 1024)
                            percent = count * CHUNK / filesize_b * 100
                            logger.info(f"Wrote {count} chunks: {size:.2f} MB ({percent:.2f} %)")
                except Exception as e:
                    logger.info("Issue with retrieving chunk")
                    logger.info(str(e))
                    logger.info(traceback.format_exc())
                    raise RuntimeError("Issue with retrieving chunk")

                total_time = time.time() - start
                mb_sec = (os.path.getsize(o_file) / (1024 * 1024.0)) / total_time
                logger.info(f"Speed: {mb_sec:.2f} MB/s")
                logger.info(f"Total Time: {total_time:.2f} s")
            temp_size = os.path.getsize(o_file)
            download_complete = temp_size == filesize_b

            if not download_complete:
                percent = temp_size / filesize_b * 100
                logger.info(f"Wrote {temp_size / (1024 * 1024):.2f} MB ({percent:.2f} %)")
                logger.info(f"Download not completed somehow. "
                            f"\n Restarting download from where we left off: ({temp_size / (1024 * 1024):.2f} MB)")
            else:
                logger.info(f"Download completed. Breaking loop. Wrote {temp_size / (1024 * 1024):.2f} MB")
                return True

        # download incomplete despite retries
        if not download_complete:
            raise RuntimeError(f"Unable to complete download from {url}, only partial file downloaded.")
    else:
        # the request never redirected to the zipper download service
        raise RuntimeError(f"Unexpected response URL (no 'zipper' redirect) when downloading {url}: {r.url}")
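
A minimal sketch of how the new download path is driven end to end (the UUID and output filename are placeholders, and the URL assumes the usual OData $value download pattern; credentials come from ~/.netrc via get_access_token_dataspace):

    product_uuid = "00000000-0000-0000-0000-000000000000"  # hypothetical
    download_url = f"https://catalogue.dataspace.copernicus.eu/odata/v1/Products({product_uuid})/$value"
    token = get_access_token_dataspace()
    download_file_dataspace(download_url, "S1A_example_SLC.zip", token)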

def localize_file(url, path, cache):
    """Localize urls for job inputs. Track metrics."""
@@ -315,26 +424,36 @@ def localize_file(url, path, cache):
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    loc_t1 = datetime.utcnow()
    try:
        download_file(url, path, cache=cache)
    except Exception as e:
        tb = traceback.format_exc()
        raise RuntimeError("Failed to download %s: %s\n%s" % (url, str(e), tb))
    loc_t2 = datetime.utcnow()
    loc_dur = (loc_t2 - loc_t1).total_seconds()
    # path_disk_usage = get_disk_usage(path)
    '''
    job['job_info']['metrics']['inputs_localized'].append({
        'url': url,
        'path': path,
        'disk_usage': path_disk_usage,
        'time_start': loc_t1.isoformat() + 'Z',
        'time_end': loc_t2.isoformat() + 'Z',
        'duration': loc_dur,
        'transfer_rate': path_disk_usage/loc_dur
    })
    '''
    # signal run_job() to continue
    if 'dataspace' in url:
        # if acquisition url is from dataspace
        access_token = get_access_token_dataspace()
        try:
            download_file_dataspace(url, path, access_token)
        except Exception as e:
            tb = traceback.format_exc()
            raise RuntimeError("Failed to download %s: %s\n%s" % (url, str(e), tb))

    else:
        try:
            download_file(url, path, cache=cache)
        except Exception as e:
            tb = traceback.format_exc()
            raise RuntimeError("Failed to download %s: %s\n%s" % (url, str(e), tb))
    loc_t2 = datetime.utcnow()
    loc_dur = (loc_t2 - loc_t1).total_seconds()
    # path_disk_usage = get_disk_usage(path)
    '''
    job['job_info']['metrics']['inputs_localized'].append({
        'url': url,
        'path': path,
        'disk_usage': path_disk_usage,
        'time_start': loc_t1.isoformat() + 'Z',
        'time_end': loc_t2.isoformat() + 'Z',
        'duration': loc_dur,
        'transfer_rate': path_disk_usage/loc_dur
    })
    '''
    # signal run_job() to continue
    return True

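In short, the reworked localize_file keeps the original osaka-based download_file for everything else and only routes URLs containing "dataspace" through the token-authenticated path, e.g. (placeholder values):

    # hypothetical acquisition URL; "dataspace" in the URL selects the new branch
    localize_file("https://catalogue.dataspace.copernicus.eu/odata/v1/Products(<uuid>)/$value",
                  "S1A_example_SLC.zip", cache=False)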

@@ -551,7 +670,7 @@ def is_non_zero_file(fpath):
logging.info("acq_data['metadata']['id'] : %s" % acq_data['metadata']['id'])

# get md5 checksum from ESA sci-hub
esa_sci_hub_md5_hash = get_slc_checksum_md5_scihub(acq_data['metadata']['id'])
esa_md5_hash = get_slc_checksum_md5_dataspace(acq_data['metadata']['id']) if 'dataspace' in download_url else get_slc_checksum_md5_scihub(acq_data['metadata']['id'])

source = "scihub"
localize_url = None
@@ -581,11 +700,20 @@ def is_non_zero_file(fpath):
localized_md5_checksum = get_md5_from_localized_file(slc_file_path)

# comparing localized md5 hash with esa's md5 hash
if localized_md5_checksum != esa_sci_hub_md5_hash:
    raise RuntimeError(
        "Checksums DO NOT match SLC id {} : SLC checksum {}. local checksum {}".format(args.slc_id,
                                                                                        esa_sci_hub_md5_hash,
                                                                                        localized_md5_checksum))
if esa_md5_hash:
    if localized_md5_checksum != esa_md5_hash:
        raise RuntimeError(
            "Checksums DO NOT match SLC id {} : SLC checksum {}. local checksum {}".format(args.slc_id,
                                                                                            esa_md5_hash,
                                                                                            localized_md5_checksum))
else:
    filesize = os.path.getsize(slc_file_path)
    min_size = 3 * 1024 * 1024 * 1024
    # unable to get an MD5 checksum, so fall back to a file-size check; a full SLC should be larger than 3 GB
    if filesize < min_size:
        raise RuntimeError(
            "SLC id {} file size: {} B. SLC size smaller than {} B, likely corrupted download".format(args.slc_id, filesize, min_size))


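For context, the local hash used in the comparison above comes from get_md5_from_localized_file elsewhere in this file; a chunked MD5 of the downloaded archive along these lines (a sketch under that assumption, not the committed implementation) is all that is required:

    import hashlib

    def md5_of_local_file(path, chunk_size=8 * 1024 * 1024):
        # hash the archive in chunks so multi-GB SLC zips never sit fully in memory
        digest = hashlib.md5()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(chunk_size), b""):
                digest.update(block)
        return digest.hexdigest()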
'''
try:
@@ -609,7 +737,7 @@ def is_non_zero_file(fpath):
    if not is_non_zero_file(archive_filename):
        raise Exception("File Not Found or Empty File : %s" % archive_filename)

    create_product(archive_filename, localize_url, args.slc_id, prod_date, esa_sci_hub_md5_hash)
    create_product(archive_filename, localize_url, args.slc_id, prod_date, esa_md5_hash)
except Exception as e:
    with open('_alt_error.txt', 'w') as f:
        f.write("%s\n" % str(e))
