-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_updates.py
executable file
·130 lines (105 loc) · 2.97 KB
/
concurrent_updates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#! /usr/bin/env python
import traceback
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine, Connection
from argparse import ArgumentParser
# Single source of truth for the supported isolation levels: maps the short
# command-line code to the isolation level name passed to create_engine().
isolation_level_map = {
    "ru": "READ UNCOMMITTED",
    "rc": "READ COMMITTED",
    "rr": "REPEATABLE READ",
    "s": "SERIALIZABLE",
}

parser = ArgumentParser()
parser.add_argument(
    "isolation_level",
    # Derived from the map so the accepted codes can never drift from it.
    choices=list(isolation_level_map),
    help="""
    isolation level:
    ru (read uncommitted)
    rc (read committed)
    rr (repeatable reads)
    s (serializable)
    """,
)
parser.add_argument(
    "--serial",
    "-s",
    action="store_true",
    help="""
    whether to serialize transaction execution or interleave them. Serialized execution will never cause a serialization error
    """,
)
# Parsed at module level: this file is a script, not an importable module.
args = parser.parse_args()

# Full isolation level name selected by the positional argument.
target_isolation_level = isolation_level_map[args.isolation_level]

# Value of the `name` column identifying the single counter row used here.
COUNTER_NAME = "a"

# Every connection handed out by this engine uses the selected isolation level.
# NOTE(review): the database URL (including credentials) is hard-coded for a
# local PostgreSQL instance - confirm this is only ever run against a dev box.
engine = create_engine(
    "postgresql+psycopg://postgres:postgres@localhost/postgres",
    isolation_level=target_isolation_level,
)
def open_connection(engine: Engine) -> Connection:
    """Open a new connection on ``engine`` and return it.

    The caller owns the returned connection and is responsible for
    committing/rolling back and closing it.
    """
    connection = engine.connect()
    return connection
def init_counter(conn: Connection):
    """Create the ``counter`` table if needed and reset the counter row to 1.

    The row named ``COUNTER_NAME`` is inserted, or overwritten when it already
    exists, and the work is committed on ``conn`` before returning.
    """
    create_table = text(
        "CREATE TABLE IF NOT EXISTS counter (name text primary key, value int)"
    )
    upsert_row = text(
        "INSERT INTO counter VALUES (:name, :value) ON CONFLICT ON CONSTRAINT counter_pkey DO UPDATE SET value = :value"
    )
    initial_row = {"name": COUNTER_NAME, "value": 1}

    conn.execute(create_table)
    conn.execute(upsert_row, initial_row)
    conn.commit()
def read_counter(conn: Connection) -> int:
    """Return the current value of the counter row named ``COUNTER_NAME``.

    The query runs on ``conn`` and is neither committed nor rolled back here,
    so it takes part in whatever transaction the caller has open.
    """
    query = text("SELECT value FROM counter WHERE name = :name")
    row = conn.execute(query, {"name": COUNTER_NAME}).fetchone()
    # Single-column result: unpack the one-element row (fails if no row came back).
    (current,) = row
    return current
def update_counter(conn: Connection, value: int) -> int:
    """Set the counter row named ``COUNTER_NAME`` to ``value``.

    Returns the value reported back by the statement's ``RETURNING`` clause.
    Nothing is committed here; the caller decides when to commit ``conn``.
    """
    statement = text(
        "UPDATE counter SET value = :value WHERE name = :name RETURNING value"
    )
    params = {"value": value, "name": COUNTER_NAME}
    row = conn.execute(statement, params).fetchone()
    # Single-column result: unpack the one-element row (fails if no row came back).
    (stored,) = row
    return stored
# Two separate connections play the roles of transactions T1 and T2. The
# `with` block guarantees both are closed even if a step below raises.
with open_connection(engine) as c1, open_connection(engine) as c2:
    init_counter(c1)
    print(f"initial counter value is {read_counter(c1)}")
    print("2 transactions are now trying to concurrently increment the counter")
    if args.serial:
        print(
            "\n"
            "execution is serialized:\n"
            "1: T1 reads counter\n"
            "2: T1 increments counters and commits\n"
            "3: T2 reads counter\n"
            "4: T2 increments counters and commits\n"
        )
        # T1 runs to completion before T2 issues its first statement.
        value1 = read_counter(c1)
        update_counter(c1, value1 + 1)
        c1.commit()
        value2 = read_counter(c2)
        update_counter(c2, value2 + 1)
        c2.commit()
    else:
        print(
            "\n"
            "execution is interleaved:\n"
            "1: T1 reads counter\n"
            "2: T2 reads counter\n"
            "3: T1 increments counters and commits\n"
            "4: T2 increments counters and commits\n"
        )
        # Both transactions read before either one writes, so T2 computes its
        # new value from a read that T1's commit has since made stale.
        value1 = read_counter(c1)
        value2 = read_counter(c2)
        update_counter(c1, value1 + 1)
        c1.commit()
        try:
            update_counter(c2, value2 + 1)
        except Exception:
            # Depending on the isolation level the database may reject T2's
            # write. Report it (exception line only, no stack frames) and roll
            # the failed transaction back explicitly instead of committing it.
            print(f"T2 could not update counter: {traceback.format_exc(0)}")
            c2.rollback()
        else:
            c2.commit()

# Read the outcome on a fresh connection, after T1 and T2 have finished.
with open_connection(engine) as c3:
    value = read_counter(c3)
print(f"final counter value is {value}")