-
Notifications
You must be signed in to change notification settings - Fork 12
/
provenance.py
147 lines (111 loc) · 5.17 KB
/
provenance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Functions for provenance handling."""
__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import json
import time
from typing import List
import genquery
import vault
from util import *
__all__ = ['rule_provenance_log_action',
'rule_copy_provenance_log',
'api_provenance_log']
@rule.make()
def rule_provenance_log_action(ctx: rule.Context, actor: str, coll: str, action: str) -> None:
"""Function to add action log record to provenance of specific folder.
:param ctx: Combined type of a callback and rei struct
:param actor: The actor of the action
:param coll: The collection the provenance log is linked to.
:param action: The action that is logged.
"""
try:
log_item = [str(int(time.time())), action, actor]
avu.associate_to_coll(ctx, coll, constants.UUPROVENANCELOG, json.dumps(log_item))
vault.update_archive(ctx, coll)
log.write(ctx, "rule_provenance_log_action: <{}> has <{}> (<{}>)".format(actor, action, coll))
except Exception:
log.write(ctx, "rule_provenance_log_action: failed to log action <{}> to provenance".format(action))
def log_action(ctx: rule.Context, actor: str, coll: str, action: str, update: bool = True) -> None:
"""Function to add action log record to provenance of specific folder.
:param ctx: Combined type of a callback and rei struct
:param actor: The actor of the action
:param coll: The collection the provenance log is linked to.
:param action: The action that is logged.
:param update: Whether to update provenance in any archive (default: True)
"""
try:
log_item = [str(int(time.time())), action, actor]
avu.associate_to_coll(ctx, coll, constants.UUPROVENANCELOG, json.dumps(log_item))
if update:
vault.update_archive(ctx, coll)
log.write(ctx, "rule_provenance_log_action: <{}> has <{}> (<{}>)".format(actor, action, coll))
except Exception:
log.write(ctx, "rule_provenance_log_action: failed to log action <{}> to provenance".format(action))
@rule.make()
def rule_copy_provenance_log(ctx: rule.Context, source: str, target: str) -> None:
"""Copy the provenance log of a collection to another collection.
:param ctx: Combined type of a callback and rei struct
:param source: Path of source collection.
:param target: Path of target collection.
"""
provenance_copy_log(ctx, source, target)
def provenance_copy_log(ctx: rule.Context, source: str, target: str) -> None:
"""Copy the provenance log of a collection to another collection.
:param ctx: Combined type of a callback and rei struct
:param source: Path of source collection.
:param target: Path of target collection.
"""
try:
# Retrieve all provenance logs on source collection.
iter = genquery.row_iterator(
"order_desc(META_COLL_ATTR_VALUE)",
"COLL_NAME = '%s' AND META_COLL_ATTR_NAME = 'org_action_log'" % (source),
genquery.AS_LIST, ctx
)
# Set provenance logs on target collection.
for row in iter:
avu.associate_to_coll(ctx, target, constants.UUPROVENANCELOG, row[0])
log.write(ctx, "rule_copy_provenance_log: copied provenance log from <{}> to <{}>".format(source, target))
except Exception:
log.write(ctx, "rule_copy_provenance_log: failed to copy provenance log from <{}> to <{}>".format(source, target))
def get_provenance_log(ctx: rule.Context, coll: str) -> List:
"""Return provenance log of a collection.
:param ctx: Combined type of a callback and rei struct
:param coll: Path of a collection in research or vault space.
:returns: Provenance log as a list
"""
provenance_log = []
# Retrieve all provenance logs on a folder.
iter = genquery.row_iterator(
"order_desc(META_COLL_ATTR_VALUE)",
"COLL_NAME = '%s' AND META_COLL_ATTR_NAME = 'org_action_log'" % (coll),
genquery.AS_LIST, ctx
)
for row in iter:
log_item = jsonutil.parse(row[0])
provenance_log.append(log_item)
return provenance_log
@api.make()
def api_provenance_log(ctx: rule.Context, coll: str) -> api.Result:
"""Return formatted provenance log of a collection.
:param ctx: Combined type of a callback and rei struct
:param coll: Path of a collection in research or vault space.
:returns: Formatted provenance log as a list
"""
provenance_log = get_provenance_log(ctx, coll)
output = []
for item in provenance_log:
date_time = time.strftime('%Y/%m/%d %H:%M:%S',
time.localtime(int(item[0])))
action = item[1].capitalize()
actor = item[2].split("#")[0]
output.append([actor, action, date_time])
return output
def latest_action_actor(ctx: rule.Context, path: str) -> str:
"""Return the actor of the latest provenance action.
:param ctx: Combined type of a callback and rei struct
:param path: Path of a collection in research or vault space.
:returns: Actor of latest provenance action
"""
provenance_log = get_provenance_log(ctx, path)
return provenance_log[-1][2]