-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtestrail.py
309 lines (243 loc) · 11.2 KB
/
testrail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import os
import sys
import pandas as pd
from lib.testrail_conn import APIClient
from database import (
Database,
Projects,
TestSuites,
ReportTestCaseCoverage,
# ReportTestRunCounts
)
from utils.datetime_utils import DatetimeUtils as dt
class TestRail:
def __init__(self):
try:
TESTRAIL_HOST = os.environ['TESTRAIL_HOST']
self.client = APIClient(TESTRAIL_HOST)
self.client.user = os.environ['TESTRAIL_USERNAME']
self.client.password = os.environ['TESTRAIL_PASSWORD']
except KeyError:
print("ERROR: Missing testrail env var")
sys.exit(1)
# API: Projects
def projects(self):
return self.client.send_get('get_projects')
def project(self, testrail_project_id):
return self.client.send_get(
'get_project/{0}'.format(testrail_project_id))
# API: Cases
def test_cases(self, testrail_project_id, testrail_test_suite_id):
return self.client.send_get(
'get_cases/{0}&suite_id={1}'
.format(testrail_project_id, testrail_test_suite_id))
def test_case(self, testrail_test_case_id):
return self.client.send_get(
'get_case/{0}'.format(testrail_test_case_id))
# API: Case Fields
def test_case_fields(self):
return self.client.send_get(
'get_case_fields')
# API: Suites
def test_suites(self, testrail_project_id):
return self.client \
.send_get('get_suites/{0}'.format(testrail_project_id))
def test_suite(self, testrail_test_suite_id):
return self.client \
.send_get('get_suite/{0}'.format(testrail_test_suite_id))
# API: Runs
def test_runs(self, testrail_project_id, start_date='', end_date=''):
date_range = ''
if start_date:
after = dt.convert_datetime_to_epoch(start_date)
date_range += '&created_after={0}'.format(after)
if end_date:
before = dt.convert_datetime_to_epoch(end_date)
date_range += '&created_before={0}'.format(before)
return self.client.send_get('get_runs/{0}{1}'.format(testrail_project_id, date_range)) # noqa
def test_run(self, run_id):
return self.client.send_get('get_run/{0}'.format(run_id))
def test_results_for_run(self, run_id):
return self.client.send_get('get_results_for_run/{0}'.format(run_id))
class TestRailClient(TestRail):
def __init__(self):
super().__init__()
self.db = DatabaseTestRail()
def data_pump(self, project='all', suite='all'):
# call database for 'all' values
# convert inputs to a list so we can easily
# loop thru them
project_ids_list = self.testrail_project_ids(project)
# TODO:
# currently only setup for test_case report
# fix this for test run data
# Test suite data is dynamic. Wipe out old test suite data
# in database before updating.
self.db.test_suites_delete()
for project_ids in project_ids_list:
projects_id = project_ids[0]
testrail_project_id = project_ids[1]
suites = self.test_suites(testrail_project_id)
for suite in suites:
"""
print("testrail_project_id: {0}".format(testrail_project_id))
print("suite_id: {0}".format(suite['id']))
print("suite_name: {0}".format(suite['name']))
"""
self.db.test_suites_update(testrail_project_id,
suite['id'], suite['name'])
self.testrail_coverage_update(projects_id,
testrail_project_id, suite['id'])
def testrail_project_ids(self, project):
""" Return the ids needed to be able to query the TestRail API for
a specific test suite from a specific project
[0]. projects.id = id of project in database table: projects
[1]. testrail_id = id of project in testrail
Note:
- Testrail project ids will never change, so we store them
in DB for convenience and use them to query test suites
from each respective project
"""
q = self.db.session.query(Projects)
if project == 'all':
p = q.all()
else:
p = []
proj = q.filter_by(project_name_abbrev=project).first()
p.append(proj)
ids = []
tmp = []
for item in p:
if type(item.testrail_project_id == int):
tmp.append(item.id)
tmp.append(item.testrail_project_id)
ids.append(tmp)
tmp = []
return ids
def testrail_coverage_update(self, projects_id,
testrail_project_id, test_suite_id):
# Pull JSON blob from Testrail
cases = self.test_cases(testrail_project_id, test_suite_id)
# Format and store data in a data payload array
payload = self.db.report_test_coverage_payload(cases)
print(payload)
# Insert data in 'totals' array into DB
self.db.report_test_coverage_insert(projects_id, payload)
def testrail_run_counts_update(self, project, num_days):
start_date = dt.start_date(num_days)
# Get reference IDs from DB
projects_id, testrail_project_id, functional_test_suite_id = self.db.testrail_identity_ids(project) # noqa
# Sample Testrail data from one run:
# [{'run_id': 44113}, {'project_id': 59}, {'suite_id': 3192},
# {'name': 'Smoke and sanity automated tests - Beta 90.0.0-beta.2'},
# {'created_on': 1623151551}, {'completed_on': 1623158050},
# {'failed_count': 0}, {'passed_count': 35}, {'retest_count': 0},
# {'blocked_count': 0}, {'untested_count': 0}, {'untested_count': 0}]
# Pull JSON blob from Testrail
#runs = self.testrail.test_runs(testrail_project_id, start_date) # noqa
runs = self.test_runs(testrail_project_id, start_date) # noqa
# Format and store data in a 'totals' array
totals = self.db.report_test_run_payload(runs)
# Insert data in 'totals' array into DB
self.db.report_test_runs_insert(projects_id, totals)
class DatabaseTestRail(Database):
def __init__(self):
super().__init__()
self.db = Database()
def test_suites_delete(self):
""" Wipe out all test suite data.
NOTE: we'll renew this data from Testrail every session."""
self.session.query(TestSuites).delete()
self.session.commit()
def test_suites_update(self, testrail_project_id,
testrail_test_suites_id, test_suite_name):
suites = TestSuites(testrail_project_id=testrail_project_id,
testrail_test_suites_id=testrail_test_suites_id,
test_suite_name=test_suite_name)
self.session.add(suites)
self.session.commit()
def report_test_coverage_payload(self, cases):
"""given testrail data (cases), calculate test case counts by type"""
payload = []
for case in cases:
row = []
suit = case['suite_id']
subs = case.get("custom_sub_test_suites", [7])
# TODO: diagnostic - delete
print('suite_id: {0}, case_id: {1}, subs: {2}'.format(suit, case['id'], subs)) # noqa
stat = case['custom_automation_status']
cov = case['custom_automation_coverage']
# iterate through multi-select sub_suite data
# we need to create a separate row for each
# test case that belongs to multiple sub suites
for sub in subs:
row = [suit, sub, stat, cov, 1]
payload.append(row)
df = pd.DataFrame(data=payload,
columns=['suit', 'sub', 'status', 'cov', 'tally'])
return df.groupby(['suit', 'sub', 'status', 'cov'])['tally'].sum().reset_index() # noqa
def report_test_coverage_insert(self, projects_id, payload):
# TODO: Error on insert
# insert data from totals into report_test_coverage table
for index, row in payload.iterrows():
# TODO: diagnostic - delete
print('ROW - suit: {0}, asid: {1}, acid: {2}, ssid: {3}, tally: {4}' # noqa
.format(row['suit'], row['status'], row['cov'], row['sub'], row['tally'])) # noqa
report = ReportTestCaseCoverage(projects_id=projects_id,
testrail_test_suites_id=row['suit'], # noqa
test_automation_status_id=row['status'], # noqa
test_automation_coverage_id=row['cov'], # noqa
test_sub_suites_id=row['sub'],
test_count=row['tally'])
self.session.add(report)
self.session.commit()
def report_test_run_payload(self, runs):
"""pack testrail data for 1 run in a data array
NOTE:
run_name
Because storing data for 1 run will occupy multipe db rows,
Storing the run name would require inserting into a reference
table. For now, we will just store the testrail run id.
project_id, suite_id
We will pass along the proj_name_abbrev to the db.
For suite_id, we will always default to Full Functional.
"""
# create array to store values to insert in database
payload = []
for run in runs:
tmp = {}
# identifiers
# tmp.append({'name': run['name']})
tmp.update({'testrail_run_id': run['id']})
# epoch dates
tmp.update({'testrail_created_on': run['created_on']})
tmp.update({'testrail_completed_on': run['completed_on']})
# test data
tmp.update({'passed_count': run['passed_count']})
tmp.update({'retest_count': run['retest_count']})
tmp.update({'failed_count': run['failed_count']})
tmp.update({'blocked_count': run['blocked_count']})
payload.append(tmp)
return payload
def report_test_runs_insert(self, project_id, payload):
# insert data from payload into report_test_run_counts table
totals = payload
for total in totals:
t = total
# only count completed testruns
if t['testrail_completed_on']:
created_on = dt.convert_epoch_to_datetime(t['testrail_created_on']) # noqa
completed_on = dt.convert_epoch_to_datetime(t['testrail_completed_on']) # noqa
'''
report = ReportTestRunCounts(
projects_id=project_id,
testrail_run_id=t['testrail_run_id'],
test_case_passed_count=t['passed_count'],
test_case_retest_count=t['retest_count'],
test_case_failed_count=t['failed_count'],
test_case_blocked_count=t['blocked_count'],
testrail_created_on=created_on,
testrail_completed_on=completed_on)
'''
# self.session.add(report)
self.session.commit()