-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
113 lines (95 loc) · 4.5 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
import logging.config
# Configure logging from "logging.conf" (a relative path, so it is resolved
# against the current working directory) before the remaining imports run.
# logging.config.fileConfig disables loggers that already exist by default
# (disable_existing_loggers=True), so configuring this early presumably keeps
# loggers created by the modules imported below enabled.
# NOTE(review): confirm this ordering is intentional before moving it below the imports.
logging.config.fileConfig("logging.conf")
import datetime
from json import dumps
from os import getenv
from os.path import join
from hdx.api.configuration import Configuration
from hdx.data.dataset import Dataset
from hdx.data.resource import Resource
from hdx.utilities.downloader import Download
from hdx.utilities.path import temp_dir
from hdx.utilities.retriever import Retrieve
from hdx_redis_lib import connect_to_hdx_event_bus_with_env_vars
from check_pcodes import get_global_pcodes, process_resource
from helper.facade import facade
from helper.util import do_nothing_for_ever
# Module-level logger. It is created after fileConfig above, so fileConfig's
# disabling of pre-existing loggers does not apply to it.
logger = logging.getLogger(__name__)
def listener_main(**ignore):
    """
    Run the p-code detector in listener mode.

    Connects to the HDX event bus and loads the global p-code reference once.
    It then hands control to ``event_bus.hdx_listen``, which feeds
    'resource-created' and 'resource-data-changed' events to the p-code
    checking logic.

    Args:
        **ignore: Extra keyword arguments supplied by the caller; unused.
    """
    # Connect to Redis
    event_bus = connect_to_hdx_event_bus_with_env_vars()

    configuration = Configuration.read()
    # The global p-code reference is loaded once at start-up and reused for
    # every event handled below.
    with temp_dir(folder="TempPCodeDetector") as temp_folder:
        with Download(rate_limit={"calls": 1, "period": 0.1}) as downloader:
            retriever = Retrieve(
                downloader, temp_folder, "saved_data", temp_folder, save=False, use_saved=False
            )
            global_pcodes = get_global_pcodes(
                configuration["global_pcodes"],
                retriever,
            )

    def event_processor(event):
        """
        Check the p-codes of the resource referenced by a single bus event.

        Args:
            event: The received event; its "dataset_id" and "resource_id"
                entries are read with ``.get``.

        Returns:
            tuple[bool, str]: ``(True, "Success")`` when the resource was
            processed, otherwise ``(False, <reason>)``.
        """
        start_time = datetime.datetime.now()
        # Bound before the try block so the error handler can always reference
        # it, even when the failure happens before the event is read.
        dataset_id = None
        # Each event gets its own temporary folder and download session.
        with temp_dir(folder="TempPCodeDetector") as temp_folder:
            with Download(rate_limit={"calls": 1, "period": 0.1}) as downloader:
                retriever = Retrieve(
                    downloader, temp_folder, "saved_data", temp_folder, save=False, use_saved=False
                )
                try:
                    logger.info(f"Received event: {dumps(event, ensure_ascii=False, indent=4)}")
                    dataset_id = event.get("dataset_id")
                    resource_id = event.get("resource_id")
                    if not (dataset_id and resource_id):
                        # Without both ids there is nothing to process; report
                        # it explicitly instead of failing on an unbound name.
                        message = (
                            f"Event is missing dataset_id or resource_id "
                            f"(dataset_id={dataset_id}, resource_id={resource_id}); skipping"
                        )
                        logger.warning(message)
                        return False, message
                    dataset = Dataset.read_from_hdx(dataset_id)
                    resource = Resource.read_from_hdx(resource_id)
                    # read_from_hdx returns None when the object cannot be read.
                    if dataset is None or resource is None:
                        message = f"Could not read dataset {dataset_id} / resource {resource_id} from HDX"
                        logger.error(message)
                        return False, message
                    process_resource(resource, dataset, global_pcodes, retriever, configuration, update=True, flag=True)
                    elapsed_time = datetime.datetime.now() - start_time
                    logger.info(f"Finished processing resource {resource['name']}, {resource['id']} in {str(elapsed_time)}")
                    return True, "Success"
                except Exception as exc:
                    # Top-level boundary for one event: log with traceback and
                    # report failure instead of stopping the listener.
                    logger.exception(f"Exception of type {type(exc).__name__} while processing dataset {dataset_id}: {str(exc)}")
                    return False, str(exc)

    event_bus.hdx_listen(event_processor, allowed_event_types=["resource-created", "resource-data-changed"], max_iterations=10_000)
def main(**ignore):
    """
    Run the p-code detector in batch mode.

    Loads the global p-code reference, then runs ``process_resource`` (with
    ``cleanup=True``) on every resource of the datasets returned by
    ``Dataset.get_all_datasets(rows=1000)``. The outcome for each resource
    is logged.

    Args:
        **ignore: Extra keyword arguments supplied by the caller; unused.
    """
    configuration = Configuration.read()
    # Both the temporary folder and the download session stay open for the
    # whole scan because the retriever is used by every process_resource call.
    with temp_dir(folder="TempPCodeDetector") as work_dir, Download(
        rate_limit={"calls": 1, "period": 0.1}
    ) as session:
        fetcher = Retrieve(
            session, work_dir, "saved_data", work_dir, save=False, use_saved=False
        )
        reference_pcodes = get_global_pcodes(configuration["global_pcodes"], fetcher)
        for ds in Dataset.get_all_datasets(rows=1000):
            for res in ds.get_resources():
                outcome = process_resource(
                    res, ds, reference_pcodes, fetcher, configuration, cleanup=True
                )
                logger.info(f"{ds['name']}: {res['name']}: {outcome}")
if __name__ == "__main__":
    # Each switch is enabled only by the exact string "true" in the environment.
    listener_mode = getenv("LISTENER_MODE") == "true"
    worker_enabled = getenv("WORKER_ENABLED") == "true"
    if listener_mode and not worker_enabled:
        # Listener mode is requested but the worker is disabled: hand off to
        # do_nothing_for_ever instead of starting the detector.
        do_nothing_for_ever()
    else:
        facade(
            listener_main if listener_mode else main,
            # The HDX server is not set via hdx_site here; it is passed in the
            # HDX_URL environment variable.
            user_agent="PCodesDetector",
            hdx_read_only=False,
            preprefix="HDXINTERNAL",
            project_config_yaml=join("config", "project_configuration.yml"),
        )