# auto_replicaNode.py
# forked from procub3r/RepCl.py
from flask import Flask, request
import uuid, requests, time, pickle, json, os
from sup_repcl import RepCl
from sup_veccl import VecCl
import numpy as np
# This replica node is served on port 5001.
'''
Read -> 0
Write -> 1
Sync -> 2
'''
app = Flask(__name__)

# Key/value store held by this replica, pre-seeded with four known entries
# ("known-replica-1" -> "rep_init_1", ..., "known-replica-4" -> "rep_init_4").
replicaData = {
    f"known-replica-{n}": f"rep_init_{n}"
    for n in range(1, 5)
}

# Per-event records written by the request handler, keyed by event id.
event_logs = {}

# Number of sync requests handled so far.
sync_count = 0

# Local clocks for this node; both are constructed with the argument 1.
repcl_time = RepCl(1)
veccl_time = VecCl(1)

# Address the event log is posted to during a sync.
master_url = "http://master-service.default.svc.cluster.local"
def _merge_sender_clocks():
    """Fold the clocks attached to the current request into the local clocks.

    Reads the multipart file fields ``repcl_time`` and ``veccl_time`` from the
    request, unpickles them, and hands the results to ``repcl_time.recv`` and
    ``veccl_time.merge``. A request without those fields makes the
    ``request.files[...]`` lookup fail.
    """
    # SECURITY(review): pickle.loads on bytes taken from an HTTP request can
    # execute arbitrary code. Only expose this endpoint to trusted peers, or
    # switch the clock wire format to something inert such as JSON.
    sender_repcl = pickle.loads(request.files['repcl_time'].read())
    sender_veccl = pickle.loads(request.files['veccl_time'].read())
    repcl_time.recv(sender_repcl)
    veccl_time.merge(sender_veccl)


def _record_event(event_id, action, status):
    """Store an event-log entry stamped with the current local clocks.

    Args:
        event_id: Key under which the entry is stored in ``event_logs``.
        action: Action label written to the entry ('POST', 'GET', 'PUT', 'DEL').
        status: Outcome label written to the entry ('SUC' or 'FAIL').
    """
    event_logs[event_id] = {"RepTime" : repcl_time.to_dict(),
                            "VecTime" : veccl_time.to_dict(),
                            "action" : action,
                            "status" : status}


def _apply_master_sync(sync_data):
    """Exchange event logs with the master and persist the merged log.

    Args:
        sync_data: Mapping taken from the ``"data"`` field of the JSON body.
    """
    global sync_count
    sync_count += 1
    print("\nSyncing\n")
    # Send our log before merging so entries we are about to receive are not
    # sent straight back.
    requests.post(master_url, json={"data" : event_logs})
    # NOTE(review): membership is tested against replicaData although the
    # entries are written to event_logs -- confirm which store is intended.
    known_items = replicaData.items()
    for key, value in sync_data.items():
        if (key, value) not in known_items:
            event_logs[key] = value
    # Overwrite the on-disk copy with the merged log.
    with open("/data/replica.txt", "w") as file:
        file.write(json.dumps(event_logs))
    print("\nDone Syncing\n")


def _handle_client_write():
    """Store the value from the ``data`` query parameter under a fresh uuid."""
    _merge_sender_clocks()
    user_data = request.args.get("data")
    event_id = str(uuid.uuid1())
    # The value is stored wrapped in a one-element list.
    replicaData[event_id] = [user_data]
    _record_event(event_id, 'POST', 'SUC')


def _handle_client_operation():
    """Run the get / put / delete operation named by the ``type`` query parameter.

    The target key comes from ``uuid_key``; ``put`` additionally reads
    ``new_val``. Any other ``type`` value merges the clocks and does nothing
    else.
    """
    event_id = str(uuid.uuid1())
    uuid_key = request.args.get("uuid_key")
    # Named op_type so the builtin ``type`` is not shadowed.
    op_type = request.args.get("type")
    _merge_sender_clocks()

    if op_type == 'get':
        status = 'SUC' if uuid_key in replicaData else 'FAIL'
        _record_event(event_id, 'GET', status)
    elif op_type == 'put':
        if uuid_key in replicaData:
            replicaData[uuid_key] = request.args.get("new_val")
            status = 'SUC'
        else:
            status = 'FAIL'
        _record_event(event_id, 'PUT', status)
    elif op_type == 'delete':
        # Deletion is deliberately delayed by 3 seconds; the key is checked
        # and the event is stamped only after the delay.
        time.sleep(3)
        if uuid_key in replicaData:
            replicaData.pop(uuid_key)
            status = 'SUC'
        else:
            status = 'FAIL'
        _record_event(event_id, 'DEL', status)


@app.route('/', methods=['GET', 'POST'])
def index():
    """Single entry point of the replica node.

    * POST with a JSON body whose ``"data"`` field is an object: treated as a
      sync request; the local event log is posted to ``master_url``, the
      received entries are merged in, and the log is written to
      ``/data/replica.txt``.
    * Any other POST: treated as a client write (query parameter ``data``,
      multipart files ``repcl_time`` and ``veccl_time``).
    * GET: client get / put / delete selected by the ``type`` query parameter.

    Returns:
        A fixed banner string identifying the replica node.
    """
    if request.method == 'POST':
        # Decide sync-vs-client from the payload shape instead of a bare
        # ``except``: failures inside the sync itself (network, disk) now
        # surface as what they are rather than being re-routed into the
        # client-write path.
        payload = request.get_json(silent=True)
        sync_data = payload.get("data") if isinstance(payload, dict) else None
        if isinstance(sync_data, dict):
            _apply_master_sync(sync_data)
        else:
            _handle_client_write()
    elif request.method == 'GET':
        _handle_client_operation()
    return '\nThis is the replica node\n'
if __name__ == "__main__":
    # Start the Flask server on every interface, port 5001, with debug off.
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=False,
    )