forked from compose/governor
-
Notifications
You must be signed in to change notification settings - Fork 1
/
governor.py
executable file
·151 lines (120 loc) · 5.07 KB
/
governor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
import sys, os, yaml, time, urllib2, atexit, signal
import logging
from helpers.keystore import Etcd
from helpers.postgresql import Postgresql
from helpers.ha import Ha
from helpers.ascii import splash, showtime
# Log at DEBUG when the DEBUG environment variable is set to a non-empty
# value, otherwise at INFO.
LOG_LEVEL = logging.DEBUG if os.getenv('DEBUG') else logging.INFO
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=LOG_LEVEL)

# load passed config file, or default
config_file = 'postgres0.yml'
if len(sys.argv) > 1:
    config_file = sys.argv[1]

with open(config_file, "r") as f:
    # safe_load only builds plain YAML types (dicts, lists, scalars).  A bare
    # yaml.load() without a Loader can instantiate arbitrary Python objects,
    # is deprecated since PyYAML 5.1 and is rejected outright by PyYAML 6.
    config = yaml.safe_load(f)

# allow config setting from env, for docker
# Each entry maps an environment variable to the path of the config key it
# overrides.  Unset or empty variables leave the file's value untouched.
# Values taken from the environment are always strings.
_ENV_OVERRIDES = (
    ('GOVERNOR_ETCD_HOST', ('etcd', 'host')),
    ('GOVERNOR_POSTGRESQL_NAME', ('postgresql', 'name')),
    ('GOVERNOR_POSTGRESQL_CONNECT', ('postgresql', 'connect')),
    ('GOVERNOR_POSTGRESQL_LISTEN', ('postgresql', 'listen')),
    ('GOVERNOR_POSTGRESQL_READ_ONLY_PORT', ('postgresql', 'read_only_port')),
    ('GOVERNOR_POSTGRESQL_DATA_DIR', ('postgresql', 'data_dir')),
    ('GOVERNOR_POSTGRESQL_REPLICATION_NETWORK', ('postgresql', 'replication', 'network')),
)
for _env_name, _key_path in _ENV_OVERRIDES:
    _env_value = os.getenv(_env_name)
    if _env_value:
        # Walk down to the dict that holds the final key, then overwrite it.
        _section = config
        for _key in _key_path[:-1]:
            _section = _section[_key]
        _section[_key_path[-1]] = _env_value

# Build the collaborators from their config sections.
etcd = Etcd(config["etcd"])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)
# leave things clean when shutting down, if possible
def shutdown(signal, frame):
    """Signal handler: release cluster state, stop Postgres and exit.

    Abdicates leadership in etcd if this node holds the lock, removes this
    node's membership entry, stops the local Postgres and finally exits the
    process with status 0.  Failures while talking to etcd are logged and do
    not prevent Postgres from being stopped.

    Args:
        signal: first argument supplied to a signal handler (unused).
        frame: second argument supplied to a signal handler (unused).

    Raises:
        SystemExit: always, via ``sys.exit(0)``, once cleanup has run.
    """
    logging.info("Governor Shutting Down: Received Shutdown Signal")
    try:
        if ha.has_lock():
            logging.info("Governor Shutting Down: Abdicating Leadership")
            etcd.abdicate(postgresql.name)

        logging.info("Governor Shutting Down: Removing Membership")
        etcd.delete_member(postgresql.name)
    except Exception:
        # Best effort only.  Catch Exception rather than using a bare except
        # so that SystemExit / KeyboardInterrupt raised while cleaning up
        # (e.g. by a second shutdown signal) are not swallowed and mislabelled
        # as an abdication error.
        logging.exception("Error during Abdication")

    logging.info("Governor Shutting Down: Stopping Postgres")
    postgresql.stop()
    sys.exit(0)
def graceful_reload(signal, frame):
    """Signal handler: log the request and ask Postgres to reload.

    Args:
        signal: first argument supplied to a signal handler (unused).
        frame: second argument supplied to a signal handler (unused).
    """
    logging.info("Governor Running: Received HUP Signal - Reloading")
    postgresql.reload()
# NOTE: the atexit hook for shutdown is left disabled; shutdown() takes the
# two signal-handler arguments, which atexit would not supply.
# atexit.register(shutdown)

# SIGTERM performs a clean shutdown, SIGHUP reloads Postgres.
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGHUP, graceful_reload)

# wait for etcd to be available
splash()
logging.info("Governor Starting up: Connect to Etcd")

# Keep registering this member until etcd accepts the request; only a
# URLError is treated as "etcd not up yet" and retried after 5 seconds.
etcd_ready = False
while not etcd_ready:
    try:
        etcd.touch_member(postgresql.name, postgresql.advertised_connection_string)
    except urllib2.URLError:
        logging.info("waiting on etcd")
        time.sleep(5)
    else:
        etcd_ready = True
# Decide how to bring Postgres up based on whether a data directory exists.
if not postgresql.data_directory_empty():
    # Existing data: start as a non-master with no leader configured yet.
    logging.info("Governor Starting up: Existing Data Dir")
    postgresql.copy_pg_hba()
    postgresql.follow_no_leader()
    logging.info("Governor Starting up: Starting Postgres")
    postgresql.start(master=False)
else:
    logging.info("Governor Starting up: Empty Data Dir")
    # racing to initialize
    if etcd.race("/initialize", postgresql.name):
        # Winner initialises the data directory, takes the leader key and
        # starts as master.
        logging.info("Governor Starting up: Initialisation Race ... WON!!!")
        logging.info("Governor Starting up: Initialise Postgres")
        postgresql.initialize()
        logging.info("Governor Starting up: Initialise Complete")
        etcd.take_leader(postgresql.name)
        logging.info("Governor Starting up: Starting Postgres")
        postgresql.start(master=True)
    else:
        # Loser waits 20 seconds, then syncs from whoever is leader.
        logging.info("Governor Starting up: Initialisation Race ... LOST")
        time.sleep(20)
        logging.info("Governor Starting up: Sync Postgres from Leader")

        synced_from_leader = False
        while not synced_from_leader:
            leader = etcd.current_leader()
            # Short-circuit: only attempt a sync once a leader is known.
            if leader and postgresql.sync_from_leader(leader):
                logging.info("Governor Starting up: Sync Completed")
                postgresql.write_recovery_conf(leader)
                logging.info("Governor Starting up: Starting Postgres")
                postgresql.start(master=False)
                synced_from_leader = True
            else:
                # No leader yet, or the sync attempt failed: retry shortly.
                time.sleep(5)
showtime()

logging.info("Governor Running: Starting Running Loop")
# Main loop: run one HA cycle, maintain replication slots when leading,
# refresh this member's entry in etcd, then wait config["loop_wait"] seconds.
while True:
    try:
        logging.info("Governor Running: %s" % ha.run_cycle())

        # create replication slots
        if postgresql.is_leader():
            logging.debug("Governor Running: I am the Leader")
            for member in etcd.members():
                hostname = member['hostname']
                # Every other member gets a slot; skip ourselves.
                if hostname != postgresql.name:
                    postgresql.create_replication_slot(hostname)

        etcd.touch_member(postgresql.name, postgresql.advertised_connection_string)
    except SystemExit:
        # Raised by shutdown() when a termination signal is handled while the
        # cycle body is running.  It must propagate: swallowing it here would
        # keep the loop alive after Postgres has already been stopped.
        logging.info("Governor Shutting Down: Exiting Running Loop")
        raise
    except Exception as e:
        # Any other failure is logged and the loop carries on next cycle.
        logging.exception("Unexpected error: %s", e)

    # Sleep after the try block rather than in a finally clause, so that a
    # shutdown request is not delayed by a full loop_wait.
    time.sleep(config["loop_wait"])