-
Notifications
You must be signed in to change notification settings - Fork 6
/
import.py
101 lines (86 loc) · 3.23 KB
/
import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import json
import os
import pathlib
import shutil
import tempfile

import requests
# Root of the REST API: the SUPERSET_API_URL environment variable followed by
# "/api/v1/". A missing variable raises KeyError as soon as the module loads.
API_URL = f'{os.environ["SUPERSET_API_URL"]}/api/v1/'

# "assets" directory located next to this script; each importer reads the
# sub-directory named after the asset type it handles.
ASSETS_PATH = os.fspath(pathlib.Path(__file__).parent.resolve() / "assets")
def get_jwt_token():
    """Log in to the API and return a JWT access token.

    The credentials are read from the SUPERSET_USER and SUPERSET_PASS
    environment variables and posted to the ``security/login`` endpoint with
    the ``db`` provider.

    Returns:
        The ``access_token`` field of the JSON response, or ``None`` when the
        response body does not contain that field.

    Raises:
        KeyError: If SUPERSET_USER or SUPERSET_PASS is not set.
        requests.HTTPError: If the login request is answered with a 4xx/5xx
            status.
    """
    payload = {
        "password": os.environ["SUPERSET_PASS"],
        "provider": "db",
        "refresh": True,
        "username": os.environ["SUPERSET_USER"],
    }
    resp = requests.post(API_URL + "security/login", json=payload)
    # Fail here on a rejected login instead of returning None and letting the
    # later import requests fail with an unrelated-looking "Bearer None" error.
    resp.raise_for_status()
    return resp.json().get("access_token")
def get_csrf_token(jwt_token):
    """Fetch a CSRF token from the ``security/csrf_token`` endpoint.

    Args:
        jwt_token: Access token sent as the ``Authorization: Bearer`` header.

    Returns:
        The ``result`` field of the JSON response, or ``None`` when the
        response body does not contain that field.

    Raises:
        requests.HTTPError: If the request is answered with a 4xx/5xx status.
    """
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {jwt_token}",
    }
    resp = requests.get(API_URL + "security/csrf_token", headers=headers)
    # Surface an authentication/server failure instead of silently returning
    # None from an error body.
    resp.raise_for_status()
    return resp.json().get("result")
def build_password_payload(file_path):
    """Build the JSON ``passwords`` form value for one asset directory.

    Every entry found in ``<file_path>/export/databases`` is mapped, under the
    key ``databases/<entry>``, to the literal string ``"placeholder"``.

    Args:
        file_path: Directory of a single exported asset.

    Returns:
        A JSON object serialized as a string; ``"{}"`` when the
        ``export/databases`` directory does not exist.
    """
    databases_dir = os.path.join(file_path, "export", "databases")
    if os.path.isdir(databases_dir):
        entries = os.listdir(databases_dir)
    else:
        entries = []
    return json.dumps({f"databases/{entry}": "placeholder" for entry in entries})
def get_zip_file(file_path):
    """Zip the contents of a directory and return the archive as bytes.

    The archive is built in a private temporary directory rather than next to
    ``file_path``. Writing ``<name>.zip`` beside the source directory would
    overwrite and then delete any existing file of that name, and would leave
    the archive behind if reading it failed.

    Args:
        file_path: Directory whose contents are archived; entry names inside
            the zip are relative to this directory.

    Returns:
        The raw bytes of the zip archive.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        # make_archive appends ".zip" to the base name and returns the path of
        # the file it created.
        archive_path = shutil.make_archive(
            os.path.join(scratch_dir, "bundle"), "zip", file_path
        )
        with open(archive_path, "rb") as archive:
            return archive.read()
def import_assets_zip(jwt_token, asset_type):
    """Upload every asset directory of one type as a zip archive.

    Each sub-directory of ``ASSETS_PATH/<asset_type>`` is zipped and posted to
    ``<API_URL><asset_type>/import/`` as the ``formData`` file, together with
    a ``passwords`` field and ``overwrite=true``. Entries that are not
    directories are skipped.

    Args:
        jwt_token: Access token sent as the ``Authorization: Bearer`` header.
        asset_type: Name of both the assets sub-directory and the API
            resource (the script calls this with "database" and "chart").

    Raises:
        requests.HTTPError: If any upload is answered with a 4xx/5xx status;
            remaining directories are then not processed.
    """
    endpoint = f"{API_URL}{asset_type}/import/"
    auth_headers = {"Authorization": f"Bearer {jwt_token}", "Referer": endpoint}
    assets_dir = os.path.join(ASSETS_PATH, f"{asset_type}")

    for entry in os.listdir(assets_dir):
        bundle_dir = os.path.join(assets_dir, entry)
        if not os.path.isdir(bundle_dir):
            continue

        print(f"Importing {asset_type}: {entry}")
        # The directory path doubles as the multipart file name.
        upload = {"formData": (bundle_dir, get_zip_file(bundle_dir))}
        form_fields = {
            "passwords": build_password_payload(bundle_dir),
            "overwrite": "true",
        }
        response = requests.post(
            endpoint, data=form_fields, files=upload, headers=auth_headers
        )
        response.raise_for_status()
        print(response.content)
def import_assets_yaml(jwt_token, asset_type):
    """Upload every ``.yaml`` file of one asset type.

    Each regular file ending in ``.yaml`` inside ``ASSETS_PATH/<asset_type>``
    is posted to ``<API_URL><asset_type>/import/`` as the ``formData`` file.
    Other directory entries are skipped.

    Args:
        jwt_token: Access token sent as the ``Authorization: Bearer`` header.
        asset_type: Name of both the assets sub-directory and the API
            resource (the script calls this with "dashboard").

    Raises:
        requests.HTTPError: If any upload is answered with a 4xx/5xx status;
            remaining files are then not processed.
    """
    url_full = f"{API_URL}{asset_type}/import/"
    headers = {
        "Authorization": f"Bearer {jwt_token}",
        "Referer": url_full,
    }
    _path = os.path.join(ASSETS_PATH, f"{asset_type}")
    for filename in os.listdir(_path):
        file_path = os.path.join(_path, filename)
        if not (os.path.isfile(file_path) and filename.endswith(".yaml")):
            continue
        print(f"Importing {asset_type}: {filename}")
        # Keep the file open only for the duration of the upload so the handle
        # is released even when the request raises.
        with open(file_path, "rb") as yaml_file:
            files = {"formData": (file_path, yaml_file)}
            resp = requests.post(url_full, files=files, headers=headers)
        resp.raise_for_status()
        print(resp.content)
# Script entry: authenticate once, then run the importers in a fixed order --
# databases, then charts, then dashboards.
jwt_token = get_jwt_token()
for _importer, _asset_type in (
    (import_assets_zip, "database"),
    (import_assets_zip, "chart"),
    (import_assets_yaml, "dashboard"),
):
    _importer(jwt_token, _asset_type)