-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathSqlExperimentsDriver.py
executable file
·194 lines (171 loc) · 6.85 KB
/
SqlExperimentsDriver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
'''
Created on Mar 15, 2015
@author: dyerke
'''
from multiprocessing import Process, Value
import os
import time
import traceback
import pg8000
class StatementExecutorTemplateCallback:
    """Base callback that runs one fixed SQL statement on a cursor.

    Subclasses provide the statement by overriding ``_get_query``. The text
    is fetched once, in the constructor, and kept in ``_mQuery``.
    """

    def __init__(self):
        # Resolve the statement eagerly so later calls reuse the same text.
        query_text = self._get_query()
        self._mQuery = query_text

    def _get_query(self):
        """Return the SQL text to run; the base class has none to offer."""
        raise NotImplementedError

    def get_description(self):
        """Return a one-line, human-readable label for the statement."""
        label_template = "Executing {}"
        return label_template.format(self._mQuery)

    def do_in_cursor(self, cur):
        """Execute the stored statement on ``cur``; nothing is returned."""
        statement = self._mQuery
        cur.execute(statement)
class ManagedExecutorTemplateCallback:
    """Run a wrapped callback in a separate process, timed and time-limited.

    The wrapped callback must offer ``get_description()`` and
    ``do_in_cursor(cur)``. Its wall-clock run time, in milliseconds, is
    handed back to the parent through a shared ``multiprocessing.Value``.
    """

    # Value reported when the worker never stores a duration (it was still
    # running at the timeout, or it ended before recording one).
    # NOTE(review): this is smaller than the default timeout expressed in
    # milliseconds (600000), so a slow-but-successful run can report a figure
    # larger than this sentinel -- confirm callers do not compare against it.
    INFINITE = 99999.

    def __init__(self, target_callback, timeout=600.0):
        """Remember the callback to wrap and how long to wait for it.

        :param target_callback: object with ``get_description()`` and
            ``do_in_cursor(cur)``.
        :param timeout: seconds to wait for the worker process before it is
            terminated; defaults to 600 (ten minutes). ``None`` waits
            without limit.
        """
        self._mTargetCallback = target_callback
        self._mTimeout = timeout

    def execute_target_method(self, cur, duration):
        """Run the wrapped callback and store its elapsed time.

        :param cur: cursor passed straight to the wrapped callback.
        :param duration: object whose ``value`` attribute receives the
            elapsed wall-clock time in milliseconds.
        """
        start = time.time()
        self._mTargetCallback.do_in_cursor(cur)
        end = time.time()
        # time.time() is in seconds; report milliseconds.
        duration.value = (end - start) * 1000.

    def do_in_cursor(self, cur):
        """Run the wrapped callback in a worker process and time it.

        :param cur: cursor handed to the worker process.
        :return: elapsed milliseconds recorded by the worker, or
            ``INFINITE`` when no duration was recorded.
        """
        # Single-argument parenthesised form prints the same on Python 2 and 3.
        print(self._mTargetCallback.get_description())
        # 'd' (C double) keeps the full precision of the Python float; the
        # previous 'f' (C float) rounded millisecond values to ~7 digits.
        duration = Value('d', ManagedExecutorTemplateCallback.INFINITE)
        p = Process(target=self.execute_target_method, args=(cur, duration))
        p.start()
        p.join(self._mTimeout)
        if p.is_alive():
            p.terminate()
            # Reap the terminated worker so it is not left behind un-joined.
            p.join()
        return duration.value
class StatementExecutorTemplate:
    """Open a pg8000 connection, run one callback on a cursor, clean up.

    Connection settings are captured at construction time; every call to
    ``execute`` opens a fresh connection and closes it afterwards.
    """

    # Not referenced anywhere in the visible code; kept for compatibility.
    SIG_DIGITS = 6

    def __init__(self, db_name, username, password, hostname, port):
        """Store the settings later passed to ``pg8000.connect``.

        :param db_name: passed as ``database``.
        :param username: passed as ``user``.
        :param password: passed as ``password``.
        :param hostname: passed as ``host``.
        :param port: passed as ``port``.
        """
        self._mDbName = db_name
        self._mUsername = username
        self._mPassword = password
        self._mHostname = hostname
        self._mPort = port

    def execute(self, callback):
        """Run ``callback`` on a new cursor and print how long it took.

        The callback is wrapped in ``ManagedExecutorTemplateCallback`` and the
        value that wrapper returns is printed as milliseconds. Errors raised
        while connecting or executing are printed as a traceback rather than
        propagated.

        :param callback: object accepted by ``ManagedExecutorTemplateCallback``.
        :return: ``None``.
        """
        conn = None
        cur = None
        try:
            conn = pg8000.connect(database=self._mDbName, user=self._mUsername, password=self._mPassword, host=self._mHostname, port=self._mPort)
            cur = conn.cursor()
            l_wrapped_callback = ManagedExecutorTemplateCallback(callback)
            duration = l_wrapped_callback.do_in_cursor(cur)
            # Single-argument parenthesised form prints the same on Python 2 and 3.
            print("duration= {}ms".format(duration))
        except Exception:
            # Report and carry on; KeyboardInterrupt / SystemExit are no longer
            # swallowed, so the run can still be interrupted.
            traceback.print_exc()
        finally:
            # Nested so the connection is closed even when closing the cursor
            # itself raises.
            try:
                if cur is not None:
                    cur.close()
            finally:
                if conn is not None:
                    conn.close()
class InternalStatesCallback(StatementExecutorTemplateCallback):
    """Callback whose statement selects every column of every ``sales`` row."""

    # NOTE(review): the class name mentions "states" while the statement reads
    # from "sales" -- confirm which table was intended.
    def _get_query(self):
        """Return the SQL text for this callback."""
        sql = "select * from sales"
        return sql
class InternalTotalSalesForEachCustomerCallback(StatementExecutorTemplateCallback):
    """Totals per customer: summed quantity and summed price by user name."""

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            "SELECT users.name as customer_name, sum(sales.quantity) as quantity_sold, sum(sales.price) as dollar_value\n"
            "FROM users, sales\n"
            "WHERE users.id = sales.uid\n"
            "GROUP BY users.name\n"
        )
class InternalTotalSalesForEachStateCallback(StatementExecutorTemplateCallback):
    """Totals per state: summed quantity and summed price by state name."""

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            "SELECT states.name as state_name, sum(sales.quantity) as quantity_sold, sum(sales.price) as dollar_value\n"
            "FROM sales, users, states\n"
            "WHERE sales.uid = users.id AND\n"
            "users.state = states.id\n"
            "GROUP BY states.name\n"
        )
class InternalTotalSalesForAGivenCustomerCallback(StatementExecutorTemplateCallback):
    """Per-product totals for the user named 'user_1000'.

    Rows are grouped by product SKU and ordered by the summed price.
    """

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            "SELECT products.sku as product_sku, sum(sales.quantity) as quantity_sold, sum(sales.price) as dollar_value\n"
            "FROM sales, products, users\n"
            "WHERE users.name = 'user_1000' AND\n"
            "sales.uid = users.id AND\n"
            "products.id = sales.pid\n"
            "GROUP BY products.sku\n"
            "ORDER BY dollar_value\n"
        )
class InternalTotalSalesForEachProductCustomerCallback(StatementExecutorTemplateCallback):
    """Totals per (product SKU, customer name) pair, ordered by summed price."""

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            "SELECT products.sku as product_sku, users.name as customer, sum(sales.quantity) as quantity_sold, sum(sales.price) as dollar_value\n"
            "FROM sales, products, users\n"
            "WHERE sales.uid = users.id AND\n"
            "products.id = sales.pid\n"
            "GROUP BY products.sku, users.name\n"
            "ORDER BY dollar_value\n"
        )
class InternalTotalSalesForEachCategoryAndStateCallback(StatementExecutorTemplateCallback):
    """Totals per (state name, category name) pair: quantity and price sums."""

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            "SELECT states.name as state_name, categories.name as category_name, sum(sales.quantity) as total_quantity, sum(sales.price) as dollar_value\n"
            "FROM sales, products, users, categories, states\n"
            "WHERE sales.uid = users.id AND\n"
            "users.state = states.id AND\n"
            "products.id = sales.pid AND\n"
            "categories.id = products.cid\n"
            "GROUP BY states.name, categories.name\n"
        )
class InternalTotalSalesForTop20CategoriesAndCustomersCallback(StatementExecutorTemplateCallback):
    """Sales totals restricted to the top 20 categories and top 20 customers.

    Two common table expressions pick the 20 categories and the 20 customers
    with the highest summed price; the final select totals the sales rows
    matching both, grouped by category name and customer name.
    """

    def _get_query(self):
        """Return the SQL text, assembled from one literal per SQL line."""
        return (
            "\n"
            # CTE 1: the 20 categories with the highest summed price.
            "with top_20_categories as (\n"
            "select c.id as category_id, c.name as category_name, sum(s.quantity) as quantity_sold, sum(s.price) as dollar_value\n"
            "from sales as s\n"
            "inner join products as p on s.pid=p.id\n"
            "inner join categories as c on p.cid=c.id\n"
            "group by c.id, c.name\n"
            "order by dollar_value desc\n"
            "limit 20\n"
            # CTE 2: the 20 customers with the highest summed price.
            "), top_20_customers as (\n"
            "select u.id as customer_id, u.name as customer_name, sum(s.quantity) as quantity_sold, sum(s.price) as dollar_value\n"
            "from sales as s\n"
            "inner join users as u on s.uid=u.id\n"
            "group by u.id, u.name\n"
            "order by dollar_value desc\n"
            "limit 20\n"
            ")\n"
            # Final select: totals for sales matching both CTEs.
            "select tcat.category_name as top_category, tc.customer_name as top_customer, sum(s.quantity) as quantity_sold, sum(s.price) as dollar_value\n"
            "from sales s\n"
            "inner join products as p on s.pid=p.id\n"
            "inner join top_20_categories as tcat on p.cid=tcat.category_id\n"
            "inner join top_20_customers as tc on s.uid=tc.customer_id\n"
            "group by tcat.category_name, tc.customer_name\n"
            "order by dollar_value desc\n"
        )
if __name__ == '__main__':
    # Connection settings for the database the statements run against.
    db_name = 'dse201'
    username = 'ec2user'
    password = 'ec2user'
    #hostname = 'ec2-54-68-182-118.us-west-2.compute.amazonaws.com'
    hostname = 'ip-172-31-37-65'
    port = 5432
    template = StatementExecutorTemplate(db_name, username, password, hostname, port)
    # Callback classes, instantiated and executed one at a time in this order.
    callbacks = [InternalTotalSalesForEachCustomerCallback,
                 InternalTotalSalesForEachStateCallback,
                 InternalTotalSalesForAGivenCustomerCallback,
                 InternalTotalSalesForEachProductCustomerCallback,
                 InternalTotalSalesForEachCategoryAndStateCallback,
                 InternalTotalSalesForTop20CategoriesAndCustomersCallback
                 ]
    # Script run before every statement; presumably it drops caches so each
    # timing starts cold -- its contents are not visible here.
    clear_cache_script_name = './clear_cache.sh'
    for c in callbacks:
        # Single-argument parenthesised print behaves the same on Python 2 and 3.
        print("Invoking script {}".format(clear_cache_script_name))
        status = os.system(clear_cache_script_name)
        if status != 0:
            # Keep going (best effort), but make it visible that the script
            # did not succeed, since the following timing may be affected.
            print("WARNING: {} returned non-zero status {}".format(clear_cache_script_name, status))
        l_callback = c()
        template.execute(l_callback)
        print('\n')