Skip to content

Commit c7de87d

Browse files
committed
Add leaderelection module
Add leaderelection module, based off of the leaderelection module in kubernetes-client/python. The module has been altered slightly to support asyncio. Fixes #297
1 parent 3ab6408 commit c7de87d

File tree

8 files changed

+921
-0
lines changed

8 files changed

+921
-0
lines changed

kubernetes_asyncio/leaderelection/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import sys
17+
18+
# NOTE(review): this configures the root logger at INFO level as a side effect
# of importing this module.
logging.basicConfig(level=logging.INFO)
19+
20+
21+
class Config:
    """Validated settings for one leader-election candidate.

    The constructor checks every argument and terminates via ``sys.exit``
    (i.e. raises ``SystemExit`` carrying the message) as soon as one is
    invalid.

    Args:
        lock: Resource lock object; must not be None. The default stop
            callback reads ``lock.identity``.
        lease_duration: Must be greater than ``renew_deadline`` and not
            less than 1.
        renew_deadline: Must be greater than ``jitter_factor * retry_period``
            and not less than 1.
        retry_period: Must not be less than 1.
        onstarted_leading: Callback invoked once leadership is acquired;
            must not be None.
        onstopped_leading: Callback invoked once leadership is lost. When
            None, ``on_stoppedleading_callback`` is used.
    """

    # Validate config, exit if an error is detected
    def __init__(
        self,
        lock,
        lease_duration,
        renew_deadline,
        retry_period,
        onstarted_leading,
        onstopped_leading,
    ):
        self.jitter_factor = 1.2

        if lock is None:
            sys.exit("lock cannot be None")
        self.lock = lock

        if lease_duration <= renew_deadline:
            sys.exit("lease_duration must be greater than renew_deadline")

        if renew_deadline <= self.jitter_factor * retry_period:
            sys.exit("renewDeadline must be greater than retry_period*jitter_factor")

        if lease_duration < 1:
            sys.exit("lease_duration must be greater than one")

        if renew_deadline < 1:
            sys.exit("renew_deadline must be greater than one")

        if retry_period < 1:
            sys.exit("retry_period must be greater than one")

        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period

        if onstarted_leading is None:
            sys.exit("callback onstarted_leading cannot be None")
        self.onstarted_leading = onstarted_leading

        if onstopped_leading is None:
            self.onstopped_leading = self.on_stoppedleading_callback
        else:
            self.onstopped_leading = onstopped_leading

    # Default callback for when the current candidate, if it is the leader,
    # stops leading. It is a coroutine function so that the result of calling
    # it can be awaited, just like a user-supplied async callback.
    async def on_stoppedleading_callback(self):
        logging.info("{} stopped leading".format(self.lock.identity))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import os
17+
import uuid
18+
19+
from kubernetes_asyncio import config
20+
from kubernetes_asyncio.client import api_client
21+
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
22+
from kubernetes_asyncio.leaderelection.resourcelock.configmaplock import (
23+
ConfigMapLock,
24+
)
25+
26+
27+
async def main():
    """Run a single round of leader election using a ConfigMap-based lock."""

    # Authenticate using the config file named by the KUBECONFIG variable
    kubeconfig_path = os.environ.get("KUBECONFIG", "")
    await config.load_kube_config(config_file=kubeconfig_path)

    # Parameters required from the user:
    #   identity  - a unique identifier for this candidate
    #   name      - name of the lock object to be created
    #   namespace - Kubernetes namespace holding the lock object
    identity = uuid.uuid4()
    name = "examplepython"
    namespace = "default"

    # The coroutine a user wants to run once this candidate is elected
    # leader. Cancellation is supported (when a held leader lock is lost).
    async def on_started():
        try:
            print("I am leader")
        except asyncio.CancelledError:
            print(
                "Start function cancelled - lost leader election after becoming leader"
            )

    # Runs once this candidate is no longer leading. Passing None for
    # onstopped_leading instead makes the config fall back to its default
    # callback.
    async def on_stopped():
        print("I am no longer leader")

    async with api_client.ApiClient() as apic:
        lock = ConfigMapLock(name, namespace, identity, apic)

        # Create config
        settings = electionconfig.Config(
            lock,
            lease_duration=17,
            renew_deadline=15,
            retry_period=5,
            onstarted_leading=on_started,
            onstopped_leading=on_stopped,
        )

        # Enter leader election
        election = leaderelection.LeaderElection(settings)
        await election.run()

        # User can choose to do another round of election or simply exit
        print("Exited leader election")
74+
75+
76+
# Script entry point: run the example coroutine when this file is executed
# directly.
if __name__ == "__main__":
    asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import datetime
17+
import json
18+
import logging
19+
import sys
20+
import time
21+
from http import HTTPStatus
22+
23+
from .leaderelectionrecord import LeaderElectionRecord
24+
25+
# NOTE(review): this configures the root logger at INFO level as a side effect
# of importing this module.
logging.basicConfig(level=logging.INFO)
26+
27+
"""
28+
This package implements leader election using an annotation in a Kubernetes
29+
object. The onstarted_leading coroutine is run as a task, which is cancelled if
30+
the leader lock is obtained and then lost.
31+
32+
At first all candidates are considered followers. The one to create a lock or
33+
update an existing lock first becomes the leader and remains so as long as it
34+
keeps renewing its lease.
35+
"""
36+
37+
38+
class LeaderElection:
    """Leader election state machine for a single candidate.

    The candidate starts as a follower, polls the lock until it manages to
    create or take it over, then keeps renewing it. While it leads, the
    configured ``onstarted_leading`` coroutine runs as an asyncio task; that
    task is cancelled as soon as the renew loop ends.

    Args:
        election_config: Configuration object providing ``lock``,
            ``lease_duration``, ``renew_deadline``, ``retry_period``,
            ``onstarted_leading`` and ``onstopped_leading``. Passing None
            terminates via ``sys.exit``.
    """

    def __init__(self, election_config):
        if election_config is None:
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    # Point of entry to Leader election
    async def run(self):
        """Acquire the lock, lead until renewal fails, then run the stop callback.

        Any exception raised while renewing propagates to the caller, but the
        ``onstarted_leading`` task is cancelled first so that it never keeps
        running after this candidate has stopped renewing its lease.
        """
        # Function-scope import keeps this block self-contained.
        import inspect

        # Try to create/ acquire a lock
        if await self.acquire():
            logging.info(
                "{} successfully acquired lease".format(
                    self.election_config.lock.identity
                )
            )

            task = asyncio.create_task(self.election_config.onstarted_leading())

            try:
                await self.renew_loop()
            finally:
                # Leader lock lost (or renewal crashed) - cancel the
                # onstarted_leading coroutine if it's still running
                task.cancel()

            # Failed to update lease, run OnStoppedLeading callback. The
            # callback may be a coroutine function or a plain function, so
            # only await what is actually awaitable.
            outcome = self.election_config.onstopped_leading()
            if inspect.isawaitable(outcome):
                await outcome

    async def acquire(self):
        """Poll the lock every ``retry_period`` seconds until it is acquired.

        Returns:
            True once ``try_acquire_or_renew`` succeeds; never returns
            otherwise.
        """
        # Follower
        logging.info("{} is a follower".format(self.election_config.lock.identity))
        retry_period = self.election_config.retry_period

        while True:
            succeeded = await self.try_acquire_or_renew()

            if succeeded:
                return True

            await asyncio.sleep(retry_period)

    async def renew_loop(self):
        """Keep renewing the lease; return when one renewal round fails.

        Each round retries ``try_acquire_or_renew`` every ``retry_period``
        seconds until it succeeds or ``renew_deadline`` seconds have elapsed.
        """
        # Leader
        logging.info(
            "Leader has entered renew loop and will try to update lease continuously"
        )

        retry_period = self.election_config.retry_period
        renew_deadline = self.election_config.renew_deadline

        while True:
            # A monotonic clock is used for the deadline so that wall-clock
            # adjustments cannot shorten or extend a renewal round.
            deadline = time.monotonic() + renew_deadline
            succeeded = False

            while time.monotonic() < deadline:
                succeeded = await self.try_acquire_or_renew()

                if succeeded:
                    break
                await asyncio.sleep(retry_period)

            if not succeeded:
                # failed to renew, return
                return

            await asyncio.sleep(retry_period)

    async def try_acquire_or_renew(self):
        """Make one attempt to create, take over or renew the lock.

        Returns:
            True if this candidate holds the lock after the attempt, False
            otherwise.
        """
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)

        # Check if lock is created. When lock_status is falsy the second
        # value is treated as an error object exposing ``body`` and
        # ``reason``; otherwise it is the stored election record (or None).
        lock_status, old_election_record = await self.election_config.lock.get(
            self.election_config.lock.name, self.election_config.lock.namespace
        )

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(
            self.election_config.lock.identity,
            str(self.election_config.lease_duration),
            str(now),
            str(now),
        )

        # A lock is not created with that name, try to create one
        if not lock_status:
            if json.loads(old_election_record.body)["code"] != HTTPStatus.NOT_FOUND:
                logging.info(
                    "Error retrieving resource lock {} as {}".format(
                        self.election_config.lock.name, old_election_record.reason
                    )
                )
                return False

            logging.info(
                "{} is trying to create a lock".format(
                    leader_election_record.holder_identity
                )
            )
            create_status = await self.election_config.lock.create(
                name=self.election_config.lock.name,
                namespace=self.election_config.lock.namespace,
                election_record=leader_election_record,
            )

            if create_status is False:
                logging.info(
                    "{} Failed to create lock".format(
                        leader_election_record.holder_identity
                    )
                )
                return False

            self.observed_record = leader_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)
            return True

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None:
            # try to update lock with proper annotation and election record
            return await self.update_lock(leader_election_record)

        if (
            old_election_record.holder_identity is None
            or old_election_record.lease_duration is None
            or old_election_record.acquire_time is None
            or old_election_record.renew_time is None
        ):
            # try to update lock with proper annotation and election record
            return await self.update_lock(leader_election_record)

        # Report transitions
        if (
            self.observed_record
            and self.observed_record.holder_identity
            != old_election_record.holder_identity
        ):
            logging.info(
                "Leader has switched to {}".format(old_election_record.holder_identity)
            )

        # Remember when the stored record was last seen to change; expiry of
        # another candidate's lease is measured from this local observation.
        if (
            self.observed_record is None
            or old_election_record.__dict__ != self.observed_record.__dict__
        ):
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        # If This candidate is not the leader and lease duration is yet to finish
        if (
            self.election_config.lock.identity != self.observed_record.holder_identity
            and self.observed_time_milliseconds
            + self.election_config.lease_duration * 1000
            > int(now_timestamp * 1000)
        ):
            logging.info(
                "yet to finish lease_duration, lease held by {} and has not expired".format(
                    old_election_record.holder_identity
                )
            )
            return False

        # If this candidate is the Leader
        if self.election_config.lock.identity == self.observed_record.holder_identity:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        return await self.update_lock(leader_election_record)

    async def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the lock object.

        Returns:
            False if the lock's ``update`` call returned False; otherwise
            True, after recording the new record and observation time.
        """
        # Update object with latest election record
        update_status = await self.election_config.lock.update(
            self.election_config.lock.name,
            self.election_config.lock.namespace,
            leader_election_record,
        )

        if update_status is False:
            logging.info(
                "{} failed to acquire lease".format(
                    leader_election_record.holder_identity
                )
            )
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.info(
            "leader {} has successfully acquired lease".format(
                leader_election_record.holder_identity
            )
        )
        return True

0 commit comments

Comments
 (0)