-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathvault_archive.py
370 lines (282 loc) · 13.4 KB
/
vault_archive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
"""Functions to archive vault data packages."""
from __future__ import annotations
__copyright__ = 'Copyright (c) 2023-2025, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import json
import time
from typing import Dict, List
import genquery
import irods_types
import folder
import groups
import meta
import notifications
import provenance
from util import *
# Public API of this module: API endpoints and rule-engine entry points.
__all__ = ['api_vault_archive',
           'api_vault_archival_status',
           'api_vault_extract',
           'rule_vault_archive',
           'rule_vault_create_archive',
           'rule_vault_extract_archive',
           'rule_vault_update_archive']
def package_system_metadata(ctx: rule.Context, coll: str) -> List:
    """Retrieve system metadata of collection.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Collection to retrieve system metadata of

    :returns: List of dicts with system metadata (name/value pairs)
    """
    # Select only attributes carrying the organizational metadata prefix.
    condition = "COLL_NAME = '{}' AND META_COLL_ATTR_NAME like '{}%'".format(
        coll, constants.UUORGMETADATAPREFIX)

    metadata = []
    for attr_name, attr_value in genquery.row_iterator(
            "META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
            condition,
            genquery.AS_LIST,
            ctx):
        metadata.append({"name": attr_name, "value": attr_value})

    return metadata
def package_provenance_log(ctx: rule.Context, system_metadata: List) -> List:
    """Retrieve provenance log from system metadata.

    :param ctx:             Combined type of a callback and rei struct
    :param system_metadata: System metadata to retrieve provenance log from

    :returns: List of dicts with provenance log, sorted by event time
    """
    entries = []
    for item in system_metadata:
        # Only provenance-log attributes contribute entries.
        if item["name"] != constants.UUPROVENANCELOG:
            continue
        record = json.loads(item["value"])
        entries.append({
            "time": record[0],
            "action": record[1],
            "actor": record[2]
        })

    # Provenance times are stored as strings; compare them numerically.
    return sorted(entries, key=lambda entry: int(entry["time"]))
def package_archive_path(ctx: rule.Context, coll: str) -> str | None:
    """Return the physical data path of a package's archive.tar, or None when absent.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: Physical path of archive.tar, or None if the package has no archive
    """
    condition = "COLL_NAME = '{}' AND DATA_NAME = 'archive.tar'".format(coll)
    rows = genquery.row_iterator("DATA_PATH", condition, genquery.AS_LIST, ctx)
    # At most one archive.tar per package; take the first match if any.
    for (data_path,) in rows:
        return data_path
    return None
def vault_archivable(ctx: rule.Context, coll: str) -> bool:
    """Check whether a vault data package is eligible for archiving.

    A package is archivable when no size limits are configured, or when it
    carries a vault status, is not an "/original" sub-collection, and its
    size falls within the configured minimum/maximum (a negative limit
    means "no limit" on that side).

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: True if the data package may be archived
    """
    minimum = int(config.data_package_archive_minimum)
    maximum = int(config.data_package_archive_maximum)

    # No archive limits configured.
    if minimum < 0 and maximum < 0:
        return True

    if coll.endswith("/original"):
        return False

    # Existence check: the package must carry a vault status attribute.
    # (Original code recomputed collection.size per row; rows are never read,
    # so query once and break.)
    has_vault_status = False
    for _row in genquery.row_iterator("META_COLL_ATTR_VALUE",
                                      "META_COLL_ATTR_NAME = 'org_vault_status' AND COLL_NAME = '{}'".format(coll),
                                      genquery.AS_LIST,
                                      ctx):
        has_vault_status = True
        break

    if not has_vault_status:
        return False

    # Data package size must be inside the configured archive limits.
    coll_size = collection.size(ctx, coll)
    above_minimum = minimum < 0 or coll_size >= minimum
    below_maximum = maximum < 0 or coll_size <= maximum
    return above_minimum and below_maximum
def vault_archival_status(ctx: rule.Context, coll: str) -> str:
    """Return the archival status of a vault data package, as tracked by bagit.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: Archival status string (falsy when the package has no status)
    """
    return bagit.status(ctx, coll)
def create_archive(ctx: rule.Context, coll: str) -> None:
    """Create a bagit archive (archive.tar) of a data package and move it to tape.

    Writes user metadata, system metadata and the provenance log into the
    package's /archive collection, packs it into archive.tar, and requests a
    tape migration (dmput) of the resulting tarball.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package to archive
    """
    log.write(ctx, "Creating archive of data package <{}>".format(coll))
    user_metadata = meta.get_latest_vault_metadata_path(ctx, coll)
    system_metadata = package_system_metadata(ctx, coll)
    provenance_log = package_provenance_log(ctx, system_metadata)

    def write_with_checksum(path: str, contents: str) -> None:
        # Write a document and register its checksum in iRODS.
        data_object.write(ctx, path, contents)
        msi.data_obj_chksum(ctx, path, "", irods_types.BytesBuf())

    # Create extra archive files.
    log.write(ctx, "Generating metadata for archive of data package <{}>".format(coll))
    data_object.copy(ctx, user_metadata, coll + "/archive/user-metadata.json")
    write_with_checksum(coll + "/archive/system-metadata.json",
                        jsonutil.dump(system_metadata))
    write_with_checksum(coll + "/archive/provenance-log.json",
                        jsonutil.dump(provenance_log))

    # Create bagit archive.
    bagit.create(ctx, coll + "/archive.tar", coll + "/archive", config.data_package_archive_resource)
    msi.data_obj_chksum(ctx, coll + "/archive.tar", "", irods_types.BytesBuf())

    log.write(ctx, "Move archive of data package <{}> to tape".format(coll))
    ctx.dmput(package_archive_path(ctx, coll), config.data_package_archive_fqdn, "REG")
def extract_archive(ctx: rule.Context, coll: str) -> None:
    """Extract a data package's tape archive into <coll>/archive.

    Polls the tape system until the archive is no longer migrating, then
    extracts it into the vault resource.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection whose archive.tar should be extracted

    :raises Exception: If the archive is not in a usable state after migration
    """
    # The physical path is loop-invariant; resolve it once instead of running
    # a GenQuery on every 10-second poll iteration.
    archive_path = package_archive_path(ctx, coll)

    # Wait while the archive is unmigrating (UNM) or migrating (MIG).
    while True:
        state = ctx.dmattr(archive_path, config.data_package_archive_fqdn, "")["arguments"][2]
        if state not in ("UNM", "MIG"):
            break
        time.sleep(10)

    # Only dual-state (DUL), regular/online (REG) or DMF-invalid (INV) files
    # can be read back; anything else (e.g. offline) is unusable.
    if state not in ("DUL", "REG", "INV"):
        log.write(ctx, "Archive of data package <{}> is not available, state is <{}>".format(coll, state))
        raise Exception("Archive is not available")

    bagit.extract(ctx, coll + "/archive.tar", coll + "/archive", resource=config.resource_vault)
def vault_archive(ctx: rule.Context, actor: str, coll: str) -> str:
    """Schedule archival of a vault data package and notify datamanagers.

    :param ctx:   Combined type of a callback and rei struct
    :param actor: User requesting the archival
    :param coll:  Vault collection of the data package

    :returns: "Success" when scheduled, "Failure" otherwise
    """
    try:
        # Prepare for archival.
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "archive")
        provenance.log_action(ctx, actor, coll, "archive scheduled", False)

        # Send notifications to datamanagers.
        datamanagers = folder.get_datamanagers(ctx, coll)
        message = "Data package scheduled for archival"
        for datamanager in datamanagers:
            datamanager = '{}#{}'.format(*datamanager)
            notifications.set(ctx, actor, datamanager, coll, message)

        log.write(ctx, "Data package <{}> scheduled for archiving by <{}>".format(coll, actor))

        return "Success"
    except Exception as e:
        # Don't swallow the error silently; record why scheduling failed.
        log.write(ctx, "Scheduling archival of data package <{}> failed: {}".format(coll, str(e)))
        return "Failure"
def vault_create_archive(ctx: rule.Context, coll: str) -> str:
    """Create the archive of a data package that was scheduled for archival.

    Stages the package under <coll>/archive, builds the tarball via
    create_archive, and records the outcome on the package's archival-status
    attribute. On failure the original layout is restored best-effort.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success", "Failure", or "Invalid" when not scheduled for archival
    """
    if vault_archival_status(ctx, coll) != "archive":
        return "Invalid"
    try:
        log.write(ctx, "Start archival of data package <{}>".format(coll))
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "archiving")
        # Stage package contents in /archive; License.txt (if present) is
        # copied so it ends up inside the tarball alongside the data.
        collection.create(ctx, coll + "/archive")
        if data_object.exists(ctx, coll + "/License.txt"):
            data_object.copy(ctx, coll + "/License.txt", coll + "/archive/License.txt")
        collection.rename(ctx, coll + "/original", coll + "/archive/data")
        create_archive(ctx, coll)
        # The tarball now holds the data; drop the staging collection.
        collection.remove(ctx, coll + "/archive")
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "archived")
        provenance.log_action(ctx, "system", coll, "archive completed", False)
        log.write(ctx, "Finished archival of data package <{}>".format(coll))
        return "Success"
    except Exception:
        # Attempt to restore the package to its pre-archival layout (best-effort).
        try:
            collection.rename(ctx, coll + "/archive/data", coll + "/original")
        except Exception:
            pass
        # Remove temporary files (best-effort).
        try:
            collection.remove(ctx, coll + "/archive")
        except Exception:
            pass
        provenance.log_action(ctx, "system", coll, "archive failed", False)
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "archival failed")
        log.write(ctx, "Archival of data package <{}> failed".format(coll))
        return "Failure"
def vault_unarchive(ctx: rule.Context, actor: str, coll: str) -> str:
    """Schedule unarchival of a vault data package and notify datamanagers.

    :param ctx:   Combined type of a callback and rei struct
    :param actor: User requesting the unarchival
    :param coll:  Vault collection of the data package

    :returns: "Success" when scheduled, "Failure" otherwise
    """
    try:
        # Mark the package for extraction and record the request.
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "extract")
        provenance.log_action(ctx, actor, coll, "unarchive scheduled", False)

        # Ask the tape system to stage the archive back online.
        log.write(ctx, "Request retrieval of data package <{}> from tape".format(coll))
        ctx.dmget(package_archive_path(ctx, coll), config.data_package_archive_fqdn, "OFL")

        # Notify every datamanager of the scheduled unarchival.
        message = "Data package scheduled for unarchival"
        for dm in folder.get_datamanagers(ctx, coll):
            notifications.set(ctx, actor, '{}#{}'.format(*dm), coll, message)

        log.write(ctx, "Data package <{}> scheduled for unarchiving by <{}>".format(coll, actor))
        return "Success"
    except Exception:
        return "Failure"
def vault_extract_archive(ctx: rule.Context, coll: str) -> str:
    """Extract the archive of a data package that was scheduled for unarchival.

    Restores the archived data to <coll>/original, reapplies parent ACLs,
    removes the tarball, and clears the archival-status attribute.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success", "Failure", or "Invalid" when not scheduled for extraction
    """
    if vault_archival_status(ctx, coll) != "extract":
        return "Invalid"
    try:
        log.write(ctx, "Start unarchival of data package <{}>".format(coll))
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "extracting")
        # Unpack archive.tar into <coll>/archive, then move the data back.
        extract_archive(ctx, coll)
        collection.rename(ctx, coll + "/archive/data", coll + "/original")
        # Restored data must inherit the vault collection's ACLs.
        ctx.iiCopyACLsFromParent(coll + "/original", "recursive")
        # Clean up the staging collection and the tarball.
        collection.remove(ctx, coll + "/archive")
        data_object.remove(ctx, coll + "/archive.tar")
        # Removing the attribute marks the package as no longer archived.
        avu.rm_from_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "extracting")
        provenance.log_action(ctx, "system", coll, "unarchive completed", False)
        log.write(ctx, "Finished unarchival of data package <{}>".format(coll))
        return "Success"
    except Exception:
        provenance.log_action(ctx, "system", coll, "unarchive failed", False)
        avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "extraction failed")
        log.write(ctx, "Unarchival of data package <{}> failed".format(coll))
        return "Failure"
def update(ctx: rule.Context, coll: str, attr: str | None) -> None:
    """Flag an archived vault package for archive update after a metadata change.

    Sets the archival status to "update" and requests retrieval of the
    archive from tape, so the archive can be rebuilt with the new metadata.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Collection whose metadata changed
    :param attr: Name of the changed attribute (None when unknown)
    """
    if pathutil.info(coll).space != pathutil.Space.VAULT:
        return
    # Changes to the archival status itself or to the provenance log must not
    # trigger an update cycle of their own.
    if attr in (constants.IIARCHIVEATTRNAME, constants.UUPROVENANCELOG):
        return
    if vault_archival_status(ctx, coll) != "archived":
        return

    avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "update")
    ctx.dmget(package_archive_path(ctx, coll), config.data_package_archive_fqdn, "OFL")
def vault_update_archive(ctx: rule.Context, coll: str) -> str:
    """Rebuild the archive of an archived data package after a metadata update.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success" when the archive was rebuilt, "Failure" otherwise
    """
    archive_attr = constants.IIARCHIVEATTRNAME
    try:
        log.write(ctx, "Start update of archived data package <{}>".format(coll))
        avu.set_on_coll(ctx, coll, archive_attr, "updating")

        # Unpack the current archive, drop the old tarball, repack from scratch
        # and remove the staging collection again.
        extract_archive(ctx, coll)
        data_object.remove(ctx, coll + "/archive.tar")
        create_archive(ctx, coll)
        collection.remove(ctx, coll + "/archive")

        avu.set_on_coll(ctx, coll, archive_attr, "archived")
        log.write(ctx, "Finished update of archived data package <{}>".format(coll))
        return "Success"
    except Exception:
        avu.set_on_coll(ctx, coll, archive_attr, "update failed")
        log.write(ctx, "Update of archived data package <{}> failed".format(coll))
        return "Failure"
@api.make()
def api_vault_archive(ctx: rule.Context, coll: str) -> api.Result:
    """Request to archive vault data package.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Collection of vault data package to archive

    :returns: API status
    """
    space, _, group_name, _ = pathutil.info(coll)
    if space != pathutil.Space.VAULT:
        return "Invalid"

    # Only a datamanager of the package's category may request archival.
    category = groups.group_category(ctx, group_name)
    actor = user.full_name(ctx)
    if not groups.user_is_datamanager(ctx, category, actor):
        return "Access denied"

    # Package must be archivable and must not already be in any archival state.
    if not vault_archivable(ctx, coll) or vault_archival_status(ctx, coll):
        return "Invalid"

    try:
        ctx.iiAdminVaultArchive(coll, "archive")
        return "Success"
    except Exception:
        return "Failure"
@api.make()
def api_vault_archival_status(ctx: rule.Context, coll: str) -> api.Result:
    """Request archival status of vault data package.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Collection of vault data package to request archive status from

    :returns: Vault data package archival status
    """
    status = vault_archival_status(ctx, coll)
    return status
@api.make()
def api_vault_extract(ctx: rule.Context, coll: str) -> api.Result:
    """Request to unarchive an archived vault data package.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Collection of vault data package to unarchive

    :returns: API status
    """
    space, _, group_name, _ = pathutil.info(coll)
    if space != pathutil.Space.VAULT:
        return "Invalid"

    # Only a datamanager of the package's category may request unarchival.
    category = groups.group_category(ctx, group_name)
    actor = user.full_name(ctx)
    if not groups.user_is_datamanager(ctx, category, actor):
        return "Access denied"

    # Extraction is only possible from the fully archived state.
    if vault_archival_status(ctx, coll) != "archived":
        return "Invalid"

    try:
        ctx.iiAdminVaultArchive(coll, "extract")
        return "Success"
    except Exception:
        return "Failure"
@rule.make(inputs=[0, 1, 2], outputs=[3])
def rule_vault_archive(ctx: rule.Context, actor: str, coll: str, action: str) -> str:
    """Rule entry point: schedule archival or unarchival of a data package.

    :param ctx:    Combined type of a callback and rei struct
    :param actor:  User requesting the action
    :param coll:   Vault collection of the data package
    :param action: Either "archive" or "extract"

    :returns: "Success" or "Failure"
    """
    handlers = {
        "archive": vault_archive,
        "extract": vault_unarchive,
    }
    handler = handlers.get(action)
    if handler is None:
        # Unknown action.
        return "Failure"
    return handler(ctx, actor, coll)
@rule.make(inputs=[0], outputs=[1])
def rule_vault_create_archive(ctx: rule.Context, coll: str) -> str:
    """Rule entry point: create the archive of a package scheduled for archival.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success", "Failure" or "Invalid"
    """
    return vault_create_archive(ctx, coll)
@rule.make(inputs=[0], outputs=[1])
def rule_vault_extract_archive(ctx: rule.Context, coll: str) -> str:
    """Rule entry point: extract the archive of a package scheduled for unarchival.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success", "Failure" or "Invalid"
    """
    return vault_extract_archive(ctx, coll)
@rule.make(inputs=[0], outputs=[1])
def rule_vault_update_archive(ctx: rule.Context, coll: str) -> str:
    """Rule entry point: rebuild the archive of an archived package after an update.

    :param ctx:  Combined type of a callback and rei struct
    :param coll: Vault collection of the data package

    :returns: "Success" or "Failure"
    """
    return vault_update_archive(ctx, coll)