diff --git a/examples/leaderelection.py b/examples/leaderelection.py new file mode 100644 index 00000000..640be8b1 --- /dev/null +++ b/examples/leaderelection.py @@ -0,0 +1,81 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import os +import uuid + +from kubernetes_asyncio import config +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.leaderelection import electionconfig, leaderelection +from kubernetes_asyncio.leaderelection.resourcelock.leaselock import LeaseLock + +logging.basicConfig(level=logging.INFO) + + +async def main(): + + # Authenticate using config file + await config.load_kube_config(config_file=os.environ.get("KUBECONFIG", "")) + + # Parameters required from the user + + # A unique identifier for this candidate + candidate_id = uuid.uuid4() + + # Name of the lock object to be created + lock_name = "examplepython" + + # Kubernetes namespace + lock_namespace = "default" + + # The function that a user wants to run once a candidate is elected as a + # leader. Cancellation is supported (when a held leader lock is lost). + async def example_start_func(): + try: + print("I am leader") + except asyncio.CancelledError: + print( + "Start function cancelled - lost leader election after becoming leader" + ) + + async def example_end_func(): + print("I am no longer leader") + + # A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading() + # In that case, a default callback function will be used + + async with api_client.ApiClient() as apic: + # Create config + leader_election_config = electionconfig.Config( + # A legacy ConfigMapLock is also available + LeaseLock(lock_name, lock_namespace, candidate_id, apic), + lease_duration=17, + renew_deadline=15, + retry_period=5, + # Coroutines are also accepted, to facilitate providing context + # (e.g. passing apic) + onstarted_leading=example_start_func, + onstopped_leading=example_end_func, + ) + + # Enter leader election + await leaderelection.LeaderElection(leader_election_config).run() + # User can choose to do another round of election or simply exit + print("Exited leader election") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/kubernetes_asyncio/leaderelection/__init__.py b/kubernetes_asyncio/leaderelection/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kubernetes_asyncio/leaderelection/electionconfig.py b/kubernetes_asyncio/leaderelection/electionconfig.py new file mode 100644 index 00000000..e5c5baf6 --- /dev/null +++ b/kubernetes_asyncio/leaderelection/electionconfig.py @@ -0,0 +1,68 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable, Coroutine # noqa:F401 + + +class Config: + # Validate config, exit if an error is detected + + # onstarted_leading and onstopped_leading accept either coroutines or + # coroutine functions. Coroutines faciliate passing context, but coroutine + # functions can be simpler when passing context is not required. + # + # One example of when passing context is helpful is sharing the ApiClient + # used by the leader election, which can then be used for subsequent + # Kubernetes API operations upon onstopped_leading or onstopped_leading. + def __init__( + self, + lock, + lease_duration, + renew_deadline, + retry_period, + onstarted_leading, # type: Coroutine | Callable[[], Coroutine] + onstopped_leading=None, # type: Coroutine | Callable[[], Coroutine] | None + ): + self.jitter_factor = 1.2 + + if lock is None: + raise ValueError("lock cannot be None") + self.lock = lock + + if lease_duration <= renew_deadline: + raise ValueError("lease_duration must be greater than renew_deadline") + + if renew_deadline <= self.jitter_factor * retry_period: + raise ValueError( + "renewDeadline must be greater than retry_period*jitter_factor" + ) + + if lease_duration < 1: + raise ValueError("lease_duration must be greater than one") + + if renew_deadline < 1: + raise ValueError("renew_deadline must be greater than one") + + if retry_period < 1: + raise ValueError("retry_period must be greater than one") + + self.lease_duration = lease_duration + self.renew_deadline = renew_deadline + self.retry_period = retry_period + + if onstarted_leading is None: + raise ValueError("callback onstarted_leading cannot be None") + self.onstarted_leading = onstarted_leading + + self.onstopped_leading = onstopped_leading diff --git a/kubernetes_asyncio/leaderelection/leaderelection.py b/kubernetes_asyncio/leaderelection/leaderelection.py new file mode 100644 index 00000000..46af1efb --- /dev/null +++ b/kubernetes_asyncio/leaderelection/leaderelection.py @@ -0,0 +1,243 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import datetime +import inspect +import json +import logging +import sys +import time +from http import HTTPStatus + +from .leaderelectionrecord import LeaderElectionRecord + +""" +This package implements leader election using an annotation in a Kubernetes +object. The onstarted_leading coroutine is run as a task, which is cancelled if +the leader lock is obtained and then lost. + +At first all candidates are considered followers. The one to create a lock or +update an existing lock first becomes the leader and remains so until it fails +to renew its lease. +""" + + +class LeaderElection: + def __init__(self, election_config): + if election_config is None: + sys.exit("argument config not passed") + + # Latest record observed in the created lock object + self.observed_record = None + + # The configuration set for this candidate + self.election_config = election_config + + # Latest update time of the lock + self.observed_time_milliseconds = 0 + + # Point of entry to Leader election + async def run(self): + # Try to create/ acquire a lock + if await self.acquire(): + logging.info( + "%s successfully acquired lease", self.election_config.lock.identity + ) + + onstarted_leading_coroutine = ( + self.election_config.onstarted_leading + if inspect.iscoroutine(self.election_config.onstarted_leading) + else self.election_config.onstarted_leading() + ) + + task = asyncio.create_task(onstarted_leading_coroutine) + + await self.renew_loop() + + # Leader lock lost - cancel the onstarted_leading coroutine if it's + # still running. This permits onstarted_leading to clean up state + # that might not be accessible to onstopped_leading. + task.cancel() + + # Failed to update lease, run onstopped_leading callback. This is + # preserved in order to continue to provide an interface similar to + # the one provided by `kubernetes-client/python`. + if self.election_config.onstopped_leading is not None: + await ( + self.election_config.onstopped_leading + if inspect.iscoroutine(self.election_config.onstopped_leading) + else self.election_config.onstopped_leading() + ) + + async def acquire(self): + # Follower + logging.debug("%s is a follower", self.election_config.lock.identity) + retry_period = self.election_config.retry_period + + while True: + succeeded = await self.try_acquire_or_renew() + + if succeeded: + return True + + await asyncio.sleep(retry_period) + + async def renew_loop(self): + # Leader + logging.debug( + "Leader has entered renew loop and will try to update lease continuously" + ) + + retry_period = self.election_config.retry_period + renew_deadline = self.election_config.renew_deadline * 1000 + + while True: + timeout = int(time.time() * 1000) + renew_deadline + succeeded = False + + while int(time.time() * 1000) < timeout: + succeeded = await self.try_acquire_or_renew() + + if succeeded: + break + await asyncio.sleep(retry_period) + + if succeeded: + await asyncio.sleep(retry_period) + continue + + # failed to renew, return + return + + async def try_acquire_or_renew(self): + now_timestamp = time.time() + now = datetime.datetime.fromtimestamp(now_timestamp) + + # Check if lock is created + lock_status, old_election_record = await self.election_config.lock.get( + self.election_config.lock.name, self.election_config.lock.namespace + ) + + # create a default Election record for this candidate + leader_election_record = LeaderElectionRecord( + self.election_config.lock.identity, + str(self.election_config.lease_duration), + str(now), + str(now), + ) + + # A lock is not created with that name, try to create one + if not lock_status: + if json.loads(old_election_record.body)["code"] != HTTPStatus.NOT_FOUND: + logging.error( + "Error retrieving resource lock %s as %s", + self.election_config.lock.name, + old_election_record.reason, + ) + return False + + logging.debug( + "%s is trying to create a lock", + leader_election_record.holder_identity, + ) + create_status = await self.election_config.lock.create( + name=self.election_config.lock.name, + namespace=self.election_config.lock.namespace, + election_record=leader_election_record, + ) + + if not create_status: + logging.error( + "%s failed to create lock", leader_election_record.holder_identity + ) + return False + + self.observed_record = leader_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + return True + + # A lock exists with that name + # Validate old_election_record + if old_election_record is None: + # try to update lock with proper election record + return await self.update_lock(leader_election_record) + + if ( + old_election_record.holder_identity is None + or old_election_record.lease_duration is None + or old_election_record.acquire_time is None + or old_election_record.renew_time is None + ): + # try to update lock with proper election record + return await self.update_lock(leader_election_record) + + # Report transitions + if ( + self.observed_record + and self.observed_record.holder_identity + != old_election_record.holder_identity + ): + logging.debug( + "Leader has switched to %s", old_election_record.holder_identity + ) + + if ( + self.observed_record is None + or old_election_record.__dict__ != self.observed_record.__dict__ + ): + self.observed_record = old_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + + # If This candidate is not the leader and lease duration is yet to finish + if ( + self.election_config.lock.identity != self.observed_record.holder_identity + and self.observed_time_milliseconds + + self.election_config.lease_duration * 1000 + > int(now_timestamp * 1000) + ): + logging.debug( + "Yet to finish lease_duration, lease held by %s and has not expired", + old_election_record.holder_identity, + ) + return False + + # If this candidate is the Leader + if self.election_config.lock.identity == self.observed_record.holder_identity: + # Leader updates renewTime, but keeps acquire_time unchanged + leader_election_record.acquire_time = self.observed_record.acquire_time + + return await self.update_lock(leader_election_record) + + async def update_lock(self, leader_election_record): + # Update object with latest election record + update_status = await self.election_config.lock.update( + self.election_config.lock.name, + self.election_config.lock.namespace, + leader_election_record, + ) + + if not update_status: + logging.warning( + "%s failed to acquire lease", leader_election_record.holder_identity + ) + return False + + self.observed_record = leader_election_record + self.observed_time_milliseconds = int(time.time() * 1000) + logging.debug( + "Leader %s has successfully updated lease", + leader_election_record.holder_identity, + ) + return True diff --git a/kubernetes_asyncio/leaderelection/leaderelection_test.py b/kubernetes_asyncio/leaderelection/leaderelection_test.py new file mode 100644 index 00000000..eb35011c --- /dev/null +++ b/kubernetes_asyncio/leaderelection/leaderelection_test.py @@ -0,0 +1,361 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import asyncio +import json +import unittest +from unittest import IsolatedAsyncioTestCase + +from kubernetes_asyncio.client.rest import ApiException + +from . import electionconfig, leaderelection + + +class LeaderElectionTest(IsolatedAsyncioTestCase): + + async def test_simple_leader_election(self): + election_history = [] + leadership_history = [] + + def on_create(): + election_history.append("create record") + leadership_history.append("get leadership") + + def on_update(): + election_history.append("update record") + + def on_change(): + election_history.append("change record") + + mock_lock = MockResourceLock( + "mock", + "mock_namespace", + "mock", + asyncio.Lock(), + on_create, + on_update, + on_change, + None, + ) + + async def on_started_leading(): + leadership_history.append("start leading") + + async def on_stopped_leading(): + leadership_history.append("stop leading") + + # Create config 4.5 4 3 + config = electionconfig.Config( + lock=mock_lock, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading(), + onstopped_leading=on_stopped_leading(), + ) + + # Enter leader election + await leaderelection.LeaderElection(config).run() + + self.assert_history( + election_history, + ["create record", "update record", "update record", "update record"], + ) + self.assert_history( + leadership_history, ["get leadership", "start leading", "stop leading"] + ) + + async def test_leader_election(self): + election_history = [] + leadership_history = [] + + def on_create_A(): + election_history.append("A creates record") + leadership_history.append("A gets leadership") + + def on_update_A(): + election_history.append("A updates record") + + def on_change_A(): + election_history.append("A gets leadership") + + lock = asyncio.Lock() + + mock_lock_A = MockResourceLock( + "mock", + "mock_namespace", + "MockA", + lock, + on_create_A, + on_update_A, + on_change_A, + None, + ) + mock_lock_A.renew_count_max = 3 + + async def on_started_leading_A(): + leadership_history.append("A starts leading") + + async def on_stopped_leading_A(): + leadership_history.append("A stops leading") + + config_A = electionconfig.Config( + lock=mock_lock_A, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading_A(), + onstopped_leading=on_stopped_leading_A(), + ) + + def on_create_B(): + election_history.append("B creates record") + leadership_history.append("B gets leadership") + + def on_update_B(): + election_history.append("B updates record") + + def on_change_B(): + leadership_history.append("B gets leadership") + + mock_lock_B = MockResourceLock( + "mock", + "mock_namespace", + "MockB", + lock, + on_create_B, + on_update_B, + on_change_B, + None, + ) + mock_lock_B.renew_count_max = 4 + + async def on_started_leading_B(): + leadership_history.append("B starts leading") + + async def on_stopped_leading_B(): + leadership_history.append("B stops leading") + + config_B = electionconfig.Config( + lock=mock_lock_B, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading_B(), + onstopped_leading=on_stopped_leading_B(), + ) + + mock_lock_B.leader_record = mock_lock_A.leader_record + + config_A_election = asyncio.create_task( + leaderelection.LeaderElection(config_A).run() + ) + config_B_election = asyncio.create_task( + leaderelection.LeaderElection(config_B).run() + ) + + await asyncio.gather(config_A_election, config_B_election) + + self.assert_history( + election_history, + [ + "A creates record", + "A updates record", + "A updates record", + "B updates record", + "B updates record", + "B updates record", + "B updates record", + ], + ) + self.assert_history( + leadership_history, + [ + "A gets leadership", + "A starts leading", + "A stops leading", + "B gets leadership", + "B starts leading", + "B stops leading", + ], + ) + + """Expected behavior: to check if the leader stops leading if it fails to update the lock within the renew_deadline + and stops leading after finally timing out. The difference between each try comes out to be approximately the sleep + time. + Example: + create record: 0s + on try update: 1.5s + on update: zzz s + on try update: 3s + on update: zzz s + on try update: 4.5s + on try update: 6s + Timeout - Leader Exits""" + + async def test_leader_election_with_renew_deadline(self): + election_history = [] + leadership_history = [] + + def on_create(): + election_history.append("create record") + leadership_history.append("get leadership") + + def on_update(): + election_history.append("update record") + + def on_change(): + election_history.append("change record") + + def on_try_update(): + election_history.append("try update record") + + mock_lock = MockResourceLock( + "mock", + "mock_namespace", + "mock", + asyncio.Lock(), + on_create, + on_update, + on_change, + on_try_update, + ) + mock_lock.renew_count_max = 3 + + async def on_started_leading(): + leadership_history.append("start leading") + + async def on_stopped_leading(): + leadership_history.append("stop leading") + + # Create config + config = electionconfig.Config( + lock=mock_lock, + lease_duration=2.5, + renew_deadline=2, + retry_period=1.5, + onstarted_leading=on_started_leading(), + onstopped_leading=on_stopped_leading(), + ) + + # Enter leader election + await leaderelection.LeaderElection(config).run() + + self.assert_history( + election_history, + [ + "create record", + "try update record", + "update record", + "try update record", + "update record", + "try update record", + "try update record", + ], + ) + + self.assert_history( + leadership_history, ["get leadership", "start leading", "stop leading"] + ) + + def assert_history(self, history, expected): + self.assertIsNotNone(expected) + self.assertIsNotNone(history) + self.assertEqual(len(expected), len(history)) + + for idx in range(len(history)): + self.assertEqual( + history[idx], + expected[idx], + msg="Not equal at index {}, expected {}, got {}".format( + idx, expected[idx], history[idx] + ), + ) + + +class MockResourceLock: + def __init__( + self, + name, + namespace, + identity, + shared_lock, + on_create=None, + on_update=None, + on_change=None, + on_try_update=None, + ): + # self.leader_record is shared between two MockResourceLock objects + self.leader_record = [] + self.renew_count = 0 + self.renew_count_max = 4 + self.name = name + self.namespace = namespace + self.identity = str(identity) + self.lock = shared_lock + + self.on_create = on_create + self.on_update = on_update + self.on_change = on_change + self.on_try_update = on_try_update + + async def get(self, name, namespace): + await self.lock.acquire() + try: + if self.leader_record: + return True, self.leader_record[0] + + ApiException.body = json.dumps({"code": 404}) + return False, ApiException + finally: + self.lock.release() + + async def create(self, name, namespace, election_record): + await self.lock.acquire() + try: + if len(self.leader_record) == 1: + return False + self.leader_record.append(election_record) + self.on_create() + self.renew_count += 1 + return True + finally: + self.lock.release() + + async def update(self, name, namespace, updated_record): + await self.lock.acquire() + try: + if self.on_try_update: + self.on_try_update() + if self.renew_count >= self.renew_count_max: + return False + + old_record = self.leader_record[0] + self.leader_record[0] = updated_record + + self.on_update() + + if old_record.holder_identity != updated_record.holder_identity: + await asyncio.sleep(2) + self.on_change() + + self.renew_count += 1 + return True + finally: + self.lock.release() + + +if __name__ == "__main__": + unittest.main() diff --git a/kubernetes_asyncio/leaderelection/leaderelectionrecord.py b/kubernetes_asyncio/leaderelection/leaderelectionrecord.py new file mode 100644 index 00000000..e4825ad4 --- /dev/null +++ b/kubernetes_asyncio/leaderelection/leaderelectionrecord.py @@ -0,0 +1,22 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class LeaderElectionRecord: + # Leader election details, used in the lock object + def __init__(self, holder_identity, lease_duration, acquire_time, renew_time): + self.holder_identity = holder_identity + self.lease_duration = lease_duration + self.acquire_time = acquire_time + self.renew_time = renew_time diff --git a/kubernetes_asyncio/leaderelection/resourcelock/__init__.py b/kubernetes_asyncio/leaderelection/resourcelock/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kubernetes_asyncio/leaderelection/resourcelock/configmaplock.py b/kubernetes_asyncio/leaderelection/resourcelock/configmaplock.py new file mode 100644 index 00000000..16d26cd7 --- /dev/null +++ b/kubernetes_asyncio/leaderelection/resourcelock/configmaplock.py @@ -0,0 +1,152 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging + +from kubernetes_asyncio import client +from kubernetes_asyncio.client.rest import ApiException + +from ..leaderelectionrecord import LeaderElectionRecord + + +class ConfigMapLock: + def __init__(self, name, namespace, identity, api_client): + """ + :param name: name of the lock + :param namespace: namespace + :param identity: A unique identifier that the candidate is using + """ + # self._api_instance = None # See api_instance property + self.api_instance = client.CoreV1Api(api_client=api_client) + self.leader_electionrecord_annotationkey = ( + "control-plane.alpha.kubernetes.io/leader" + ) + self.name = name + self.namespace = namespace + self.identity = str(identity) + self.configmap_reference = None + self.lock_record = { + "holderIdentity": None, + "leaseDurationSeconds": None, + "acquireTime": None, + "renewTime": None, + } + + # get returns the election record from a ConfigMap Annotation + async def get(self, name, namespace): + """ + :param name: Name of the configmap object information to get + :param namespace: Namespace in which the configmap object is to be searched + :return: 'True, election record' if object found else 'False, exception response' + """ + try: + api_response = await self.api_instance.read_namespaced_config_map( + name, namespace + ) + + # If an annotation does not exist - add the leader_electionrecord_annotationkey + annotations = api_response.metadata.annotations + if annotations is None or annotations == "": + api_response.metadata.annotations = { + self.leader_electionrecord_annotationkey: "" + } + self.configmap_reference = api_response + return True, None + + # If an annotation exists but, the leader_electionrecord_annotationkey does not then add it as a key + if not annotations.get(self.leader_electionrecord_annotationkey): + api_response.metadata.annotations = { + self.leader_electionrecord_annotationkey: "" + } + self.configmap_reference = api_response + return True, None + + lock_record = self.get_lock_object( + json.loads(annotations[self.leader_electionrecord_annotationkey]) + ) + + self.configmap_reference = api_response + return True, lock_record + except ApiException as e: + return False, e + + async def create(self, name, namespace, election_record): + """ + :param electionRecord: Annotation string + :param name: Name of the configmap object to be created + :param namespace: Namespace in which the configmap object is to be created + :return: 'True' if object is created else 'False' if failed + """ + body = client.V1ConfigMap( + metadata={ + "name": name, + "annotations": { + self.leader_electionrecord_annotationkey: json.dumps( + self.get_lock_dict(election_record) + ) + }, + } + ) + + try: + await self.api_instance.create_namespaced_config_map( + namespace, body, pretty=True + ) + return True + except ApiException: + logging.exception("Failed to create lock") + return False + + async def update(self, name, namespace, updated_record): + """ + :param name: name of the lock to be updated + :param namespace: namespace the lock is in + :param updated_record: the updated election record + :return: True if update is successful False if it fails + """ + try: + # Set the updated record + self.configmap_reference.metadata.annotations[ + self.leader_electionrecord_annotationkey + ] = json.dumps(self.get_lock_dict(updated_record)) + await self.api_instance.replace_namespaced_config_map( + name=name, namespace=namespace, body=self.configmap_reference + ) + return True + except ApiException: + logging.exception("Failed to update lock") + return False + + def get_lock_object(self, lock_record): + leader_election_record = LeaderElectionRecord(None, None, None, None) + + if lock_record.get("holderIdentity"): + leader_election_record.holder_identity = lock_record["holderIdentity"] + if lock_record.get("leaseDurationSeconds"): + leader_election_record.lease_duration = lock_record["leaseDurationSeconds"] + if lock_record.get("acquireTime"): + leader_election_record.acquire_time = lock_record["acquireTime"] + if lock_record.get("renewTime"): + leader_election_record.renew_time = lock_record["renewTime"] + + return leader_election_record + + def get_lock_dict(self, leader_election_record): + self.lock_record["holderIdentity"] = leader_election_record.holder_identity + self.lock_record["leaseDurationSeconds"] = leader_election_record.lease_duration + self.lock_record["acquireTime"] = leader_election_record.acquire_time + self.lock_record["renewTime"] = leader_election_record.renew_time + + return self.lock_record diff --git a/kubernetes_asyncio/leaderelection/resourcelock/leaselock.py b/kubernetes_asyncio/leaderelection/resourcelock/leaselock.py new file mode 100644 index 00000000..75c801a7 --- /dev/null +++ b/kubernetes_asyncio/leaderelection/resourcelock/leaselock.py @@ -0,0 +1,143 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from datetime import datetime + +from kubernetes_asyncio import client +from kubernetes_asyncio.client.rest import ApiException + +from ..leaderelectionrecord import LeaderElectionRecord + + +class LeaseLock: + def __init__(self, name, namespace, identity, api_client): + """ + :param name: name of the lock + :param namespace: namespace + :param identity: A unique identifier that the candidate is using + """ + self.api_instance = client.CoordinationV1Api(api_client=api_client) + + # lease resource identity and reference + self.name = name + self.namespace = namespace + self.lease_reference = None + + # identity of this candidate + self.identity = str(identity) + + # get returns the election record from a Lease Annotation + async def get(self, name, namespace): + """ + :param name: Name of the lease object information to get + :param namespace: Namespace in which the lease object is to be searched + :return: 'True, election record' if object found else 'False, exception response' + """ + try: + lease = await self.api_instance.read_namespaced_lease(name, namespace) + except ApiException as e: + return False, e + else: + self.lease_reference = lease + return True, self.election_record(lease) + + async def create(self, name, namespace, election_record): + """ + :param electionRecord: Annotation string + :param name: Name of the lease object to be created + :param namespace: Namespace in which the lease object is to be created + :return: 'True' if object is created else 'False' if failed + """ + body = client.V1Lease( + metadata={"name": name}, spec=self.update_lease(election_record) + ) + + try: + await self.api_instance.create_namespaced_lease( + namespace, body, pretty=True + ) + return True + except ApiException: + logging.exception("Failed to create lock") + return False + + async def update(self, name, namespace, updated_record): + """ + :param name: name of the lock to be updated + :param namespace: namespace the lock is in + :param updated_record: the updated election record + :return: True if update is successful False if it fails + """ + try: + # update the Lease from the updated record + self.lease_reference.spec = self.update_lease( + updated_record, self.lease_reference.spec + ) + + await self.api_instance.replace_namespaced_lease( + name=name, namespace=namespace, body=self.lease_reference + ) + return True + except ApiException: + logging.exception("Failed to update lock") + return False + + def update_lease(self, leader_election_record, current_spec=None): + # existing or new lease? + spec = current_spec if current_spec else client.V1LeaseSpec() + + # lease configuration + spec.holder_identity = leader_election_record.holder_identity + spec.lease_duration_seconds = int(leader_election_record.lease_duration) + spec.acquire_time = self.time_str_to_iso(leader_election_record.acquire_time) + spec.renew_time = self.time_str_to_iso(leader_election_record.renew_time) + + return spec + + def election_record(self, lease): + """ + Get leader election record from Lease spec. + """ + leader_election_record = LeaderElectionRecord(None, None, None, None) + + if not lease.spec: + return leader_election_record + + if lease.spec.holder_identity: + leader_election_record.holder_identity = lease.spec.holder_identity + if lease.spec.lease_duration_seconds: + leader_election_record.lease_duration = str( + lease.spec.lease_duration_seconds + ) + if lease.spec.acquire_time: + leader_election_record.acquire_time = str( + datetime.replace(lease.spec.acquire_time, tzinfo=None) + ) + if lease.spec.renew_time: + leader_election_record.renew_time = str( + datetime.replace(lease.spec.renew_time, tzinfo=None) + ) + + return leader_election_record + + # conversion between kubernetes ISO formatted time and elector record time + def time_str_to_iso(self, str_time): + formats = ["%Y-%m-%d %H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S.%f"] + for fmt in formats: + try: + return datetime.strptime(str_time, fmt).isoformat() + "Z" + except ValueError: + pass + logging.error("Failed to parse time string: %s", str_time)