diff --git a/README.md b/README.md index f556ecd..58a2d44 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,39 @@ definition= {"treatments":[ {"name":"on"},{"name":"off"}], } splitDef.submit_change_request(definition, 'UPDATE', 'updating default rule', 'comment', ['user@email.com'], '') ``` +### Segments Keys +This build allows fetching segments keys from SDK Endpoints to speedup the download for big sized segments. + +The function `client.segment_definitions.get_all_keys` takes 2 parameters, the segment name and an Environment object retrieved from `client.environments` +If the function is successful, it will return a json below with set of all segment keys and the total keys count, see example below: +```json +{ + "keys": {"key1", "key2", "key3"}, + "count": 3 +} +``` +If any network issue or http returned codes are not within 200-300 range, None is returned. All errors are logged in debug mode. + +Below an example of fetching all segments keys in en environment. + +```python +ws = client.workspaces.find("Default") +env = client.environments.find("Production", ws.id) +env.sdkApiToken = "SDK API Key (Server side)" + +for segDef in client.segment_definitions.list(env.id, ws.id): + print(segDef.name) + print("============") + keys = client.segment_definitions.get_all_keys(segDef.name, env) + if keys == None: + print("Failed to get keys, check debug logs") + else: + print ("Segment Keys: ") + print(keys) + print("\n") + +print("done.") +``` ### Rule-Based Segments diff --git a/pyproject.toml b/pyproject.toml index a31ee63..f8fada0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "splitapiclient" -version = "3.5.4" +version = "3.6.0.rc.1" description = "This Python Library provide full support for Split REST Admin API, allow creating, deleting and editing Environments, Splits, Split Definitions, Segments, Segment Keys, Users, Groups, API Keys, Change Requests, Attributes and Identities" classifiers = [ "Programming Language :: Python :: 3", diff --git a/splitapiclient/microclients/segment_definition_microclient.py b/splitapiclient/microclients/segment_definition_microclient.py index bf14ee3..eb96814 100644 --- a/splitapiclient/microclients/segment_definition_microclient.py +++ b/splitapiclient/microclients/segment_definition_microclient.py @@ -3,6 +3,19 @@ UnknownApiClientError from splitapiclient.util.logger import LOGGER from splitapiclient.util.helpers import as_dict +from splitapiclient.util.fetch_options import FetchOptions, Backoff, build_fetch +from splitapiclient.resources import Environment +from splitapiclient.resources import segments +from splitapiclient.util.logger import LOGGER + +import requests +import json +import time + +_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds +_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute +_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 +SDK_URL = 'https://sdk.split.io/api' class SegmentDefinitionMicroClient: ''' @@ -188,3 +201,206 @@ def remove_keys(self, segment_name, environment_id, data): ) return True + + def get_all_keys(self, segment_name, environment): + ''' + Get list of keys in segment in environment + + :param data: None + :param apiclient: If this instance wasn't returned by the client, + the IdentifyClient instance should be passed in order to perform the + http call + + :returns: string of keys instance + :rtype: string + ''' + if not self._validate_sdkapi_key(environment.sdkApiToken): + return None + + self._name = segment_name + self._sdk_api_key = environment.sdkApiToken + self._segment_storage = None + self._segment_change_number = None + self._metadata = { + 'SplitSDKVersion': 'python-3.6.0-wrapper', + } + self._backoff = Backoff( + _ON_DEMAND_FETCH_BACKOFF_BASE, + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) + + if self._get_segment_from_sdk_endpoint(self._name): + keys = self._segment_storage.keys + self._segment_storage = None + return { + "keys": keys, + "count": len(keys) + } + + LOGGER.error("Failed to fetch segment %s keys", self._name) + return None + + def _validate_sdkapi_key(self, sdkApiToken): + if sdkApiToken == None: + LOGGER.error("Environment object does not have the SDK Api Key set, please set it before calling this method.") + return False + + if not isinstance(sdkApiToken, str): + LOGGER.error("SDK Api Key must be a string, please use a string to set it before calling this method.") + return False + + if len(sdkApiToken) != 36: + LOGGER.error("SDK Api Key string is invalid, please set it before calling this method.") + return False + + return True + + def _fetch_until(self, segment_name, fetch_options, till=None): + """ + Hit endpoint, update storage and return when since==till. + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param fetch_options Fetch options for getting segment definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: last change number + :rtype: int + """ + while True: # Fetch until since==till + change_number = self._segment_change_number + if change_number is None: + change_number = -1 + if till is not None and till < change_number: + # the passed till is less than change_number, no need to perform updates + return change_number + + try: + segment_changes = self._fetch_segment_api(segment_name, change_number, + fetch_options) + if segment_changes == None: + return None + + except Exception as exc: + LOGGER.debug('Exception raised while fetching segment %s', segment_name) + LOGGER.error('Exception information: %s', str(exc)) + return None + + if change_number == -1: # first time fetching the segment + new_segment = segments.from_raw(segment_changes) + self._segment_storage = new_segment + self._segment_change_number = new_segment.change_number + else: + self._segment_change_number = segment_changes['till'] + self._segment_storage.keys.update(segment_changes['added']) + [self._segment_storage.keys.remove(key) for key in segment_changes['removed']] + + if segment_changes['till'] == segment_changes['since']: + return segment_changes['till'] + + def _attempt_segment_sync(self, segment_name, fetch_options, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param fetch_options Fetch options for getting feature flag definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: Flags to check if it should perform bypass or operation ended + :rtype: bool, int, int + """ + self._backoff.reset() + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES + while True: + remaining_attempts -= 1 + change_number = self._fetch_until(segment_name, fetch_options, till) + if change_number == None: + return False, 0, None + + if till is None or till <= change_number: + return True, remaining_attempts, change_number + + elif remaining_attempts <= 0: + return False, remaining_attempts, change_number + + how_long = self._backoff.get() + time.sleep(how_long) + + def _get_segment_from_sdk_endpoint(self, segment_name, till=None): + """ + Update a segment from queue + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param till: ChangeNumber received. + :type till: int + + :return: True if no error occurs. False otherwise. + :rtype: bool + """ + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache + successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till) + if change_number == None: + return False + + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if successful_sync: # succedeed sync + LOGGER.debug('Refresh completed in %d attempts.', attempts) + return True + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN + without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till) + if change_number == None: + return False + + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if without_cdn_successful_sync: + LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', + without_cdn_attempts) + return True + + LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', + without_cdn_attempts) + return False + + def _fetch_segment_api(self, segment_name, change_number, fetch_options): + try: + query, extra_headers = build_fetch(change_number, fetch_options, self._metadata) + response = requests.get( + SDK_URL + '/segmentChanges/{segment_name}'.format(segment_name=segment_name), + headers=self._build_basic_headers(extra_headers), + params=query, + ) + if 200 <= response.status_code < 300: + return json.loads(response.text) + + return None + except Exception as exc: + LOGGER.debug( + 'Error fetching %s because an exception was raised by the HTTPClient', + segment_name) + LOGGER.error(str(exc)) + return None + + def _build_basic_headers(self, extra_headers): + """ + Build basic headers with auth. + + :param sdk_key: API token used to identify backend calls. + :type sdk_key: str + """ + headers = { + 'Content-Type': 'application/json', + 'Authorization': "Bearer %s" % self._sdk_api_key + } + if extra_headers is not None: + headers.update(extra_headers) + return headers \ No newline at end of file diff --git a/splitapiclient/resources/environment.py b/splitapiclient/resources/environment.py index 39c5e19..0f9b690 100644 --- a/splitapiclient/resources/environment.py +++ b/splitapiclient/resources/environment.py @@ -52,7 +52,7 @@ class Environment(BaseResource): "status" : "string" } - def __init__(self, data=None, workspace_id=None, client=None): + def __init__(self, data=None, workspace_id=None, client=None, sdk_apikey=None): ''' ''' if not data: @@ -70,7 +70,16 @@ def __init__(self, data=None, workspace_id=None, client=None): self._changePermissions = data.get("changePermissions") if "changePermissions" in data else {} self._apiTokens = data.get("apiTokens") if "apiTokens" in data else {} self._client = client + self._sdk_apikey = sdk_apikey + @property + def sdkApiToken(self): + return self._sdk_apikey + + @sdkApiToken.setter + def sdkApiToken(self, value): + self._sdk_apikey = value + @property def apiTokens(self): return self._apiTokens diff --git a/splitapiclient/resources/segment_definition.py b/splitapiclient/resources/segment_definition.py index ed705ac..bffe3e2 100644 --- a/splitapiclient/resources/segment_definition.py +++ b/splitapiclient/resources/segment_definition.py @@ -3,7 +3,7 @@ from splitapiclient.resources.base_resource import BaseResource from splitapiclient.util.helpers import require_client, as_dict from splitapiclient.resources import TrafficType -from splitapiclient.resources import Environment + import csv class SegmentDefinition(BaseResource): @@ -172,3 +172,4 @@ def submit_change_request(self, keys, operation_type, title, comment, approvers, data['rolloutStatus'] = {'id': rollout_status_id} imc = require_client('ChangeRequest', self._client, apiclient) return imc.submit_change_request(self._environment['id'], workspace_id, data) + diff --git a/splitapiclient/resources/segments.py b/splitapiclient/resources/segments.py new file mode 100644 index 0000000..5a9945c --- /dev/null +++ b/splitapiclient/resources/segments.py @@ -0,0 +1,85 @@ +"""Segment module.""" + +class Segment(object): + """Segment object class.""" + + def __init__(self, name, keys, change_number): + """ + Class constructor. + + :param name: Segment name. + :type name: str + + :param keys: List of keys belonging to the segment. + :type keys: List + """ + self._name = name + self._keys = set(keys) + self._change_number = change_number + + @property + def name(self): + """Return segment name.""" + return self._name + + def contains(self, key): + """ + Return whether the supplied key belongs to the segment. + + :param key: User key. + :type key: str + + :return: True if the user is in the segment. False otherwise. + :rtype: bool + """ + return key in self._keys + + def update(self, to_add, to_remove): + """ + Add supplied keys to the segment. + + :param to_add: List of keys to add. + :type to_add: list + :param to_remove: List of keys to remove. + :type to_remove: list + """ + self._keys = self._keys.union(set(to_add)).difference(to_remove) + + @property + def keys(self): + """ + Return the segment keys. + + :return: A set of the segment keys + :rtype: set + """ + return self._keys + + @property + def change_number(self): + """Return segment change number.""" + return self._change_number + + @change_number.setter + def change_number(self, new_value): + """ + Set new change number. + + :param new_value: New change number. + :type new_value: int + """ + self._change_number = new_value + + +def from_raw(raw_segment): + """ + Parse a new segment from a raw segment_changes response. + + :param raw_segment: Segment parsed from segment changes response. + :type raw_segment: dict + + :return: New segment model object + :rtype: splitio.models.segment.Segment + """ + keys = set(raw_segment['added']).difference(raw_segment['removed']) + return Segment(raw_segment['name'], keys, raw_segment['till']) diff --git a/splitapiclient/tests/microclients/segment_definition_microclient_test.py b/splitapiclient/tests/microclients/segment_definition_microclient_test.py index 14acd58..ad02e67 100644 --- a/splitapiclient/tests/microclients/segment_definition_microclient_test.py +++ b/splitapiclient/tests/microclients/segment_definition_microclient_test.py @@ -1,9 +1,11 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals +from unittest import mock +import pytest from splitapiclient.microclients import SegmentDefinitionMicroClient from splitapiclient.http_clients.sync_client import SyncHttpClient -from splitapiclient.resources import TrafficType +from splitapiclient.resources import TrafficType, Environment def object_to_stringified_dict(obj): """ @@ -120,3 +122,131 @@ def test_get_key_count(self, mocker): # Verify the result matches the expected count assert result == 5 + + def test_get_segment_from_sdk_endpoint(self, mocker): + # Create mock HTTP client + sc = SyncHttpClient('abc', 'abc') + env = Environment( + { + 'id': '123', + 'name': 'env1', + 'production':None, + 'creationTime' : None, + 'dataExportPermissions' : None, + 'environmentType' : None, + 'workspaceIds' : None, + 'changePermissions' : None, + 'type': None, + 'orgId' : None, + 'status' : None + }, + mocker.Mock() + ) + env.sdkApiToken = "sdkapixxxxsdkapixxxxsdkapixxxx123456" + + # Create segment definition with mock client + seg = SegmentDefinitionMicroClient(sc) + + self.count = 0 + def fetch_segment_api(*_): + self.count += 1 + if self.count == 1: + return {"name": "test_segment", "since": -1, "till": 123, "added": ["key1", "key2"], "removed": []} + + if self.count == 2: + return {"name": "test_segment", "since": 123, "till": 223, "added": ["key4", "key5"], "removed": ["key1"]} + + return {"name": "test_segment", "since": 223, "till": 223, "added": [], "removed": []} + + seg._fetch_segment_api = fetch_segment_api + assert seg.get_all_keys("test_segment", env) == {"keys": {"key2", "key4", "key5"}, "count": 3} + + assert seg._build_basic_headers({"extra": "val"}) == { + 'Content-Type': 'application/json', + 'Authorization': "Bearer sdkapixxxxsdkapixxxxsdkapixxxx123456", + 'extra': 'val' + } + + def test_errors_fetching_segment_keys(self, mocker): + # Create mock HTTP client + sc = SyncHttpClient('abc', 'abc') + env = Environment( + { + 'id': '123', + 'name': 'env1', + 'production':None, + 'creationTime' : None, + 'dataExportPermissions' : None, + 'environmentType' : None, + 'workspaceIds' : None, + 'changePermissions' : None, + 'type': None, + 'orgId' : None, + 'status' : None + }, + mocker.Mock() + ) + env.sdkApiToken = "sdkapixxxxsdkapixxxxsdkapixxxx123456" + + # Create segment definition with mock client + seg = SegmentDefinitionMicroClient(sc) + + assert seg.get_all_keys("test_segment", env) == None + + def fetch_segment_api(*_): + return None + + seg._fetch_segment_api = fetch_segment_api + assert seg.get_all_keys("test_segment", env) == None + + env.sdkApiToken = None + seg._fetch_segment_api = fetch_segment_api + assert seg.get_all_keys("test_segment", env) == None + + env.sdkApiToken = "1234" + seg._fetch_segment_api = fetch_segment_api + assert seg.get_all_keys("test_segment", env) == None + + env.sdkApiToken = 1234 + seg._fetch_segment_api = fetch_segment_api + assert seg.get_all_keys("test_segment", env) == None + + def test_errors_from_sdk_endpoint(self, mocker): + # Create mock HTTP client + sc = SyncHttpClient('abc', 'abc') + env = Environment( + { + 'id': '123', + 'name': 'env1', + 'production':None, + 'creationTime' : None, + 'dataExportPermissions' : None, + 'environmentType' : None, + 'workspaceIds' : None, + 'changePermissions' : None, + 'type': None, + 'orgId' : None, + 'status' : None + }, + mocker.Mock() + ) + env.sdkApiToken = "sdkapi" + seg = SegmentDefinitionMicroClient(sc) + + response_mock = mocker.Mock() + response_mock.status_code = 404 + response_mock.headers = {} + response_mock.text = 'ok' + get_mock = mocker.Mock() + get_mock.return_value = response_mock + mocker.patch('requests.get', new=get_mock) + assert seg.get_all_keys("test_segment", env) == None + + response_mock = mocker.Mock() + response_mock.status_code = 400 + response_mock.headers = {} + response_mock.text = 'ok' + get_mock = mocker.Mock() + get_mock.return_value = response_mock + mocker.patch('requests.get', new=get_mock) + assert seg.get_all_keys("test_segment", env) == None diff --git a/splitapiclient/tests/resources/test_segment_definition.py b/splitapiclient/tests/resources/test_segment_definition.py index 53578bf..065422e 100644 --- a/splitapiclient/tests/resources/test_segment_definition.py +++ b/splitapiclient/tests/resources/test_segment_definition.py @@ -269,4 +269,4 @@ def test_submit_change_request(self, mocker): 'ruleBasedSegment': None } - assert attr.to_dict() == data1 + assert attr.to_dict() == data1 \ No newline at end of file diff --git a/splitapiclient/util/fetch_options.py b/splitapiclient/util/fetch_options.py new file mode 100644 index 0000000..c283597 --- /dev/null +++ b/splitapiclient/util/fetch_options.py @@ -0,0 +1,96 @@ +_CACHE_CONTROL = 'Cache-Control' +_CACHE_CONTROL_NO_CACHE = 'no-cache' + +class FetchOptions(object): + """Fetch Options object.""" + + def __init__(self, cache_control_headers=False, change_number=None): + """ + Class constructor. + + :param cache_control_headers: Flag for Cache-Control header + :type cache_control_headers: bool + + :param change_number: ChangeNumber to use for bypassing CDN in request. + :type change_number: int + + :param sets: list of flag sets + :type sets: list + """ + self._cache_control_headers = cache_control_headers + self._change_number = change_number + + @property + def cache_control_headers(self): + """Return cache control headers.""" + return self._cache_control_headers + + @property + def change_number(self): + """Return change number.""" + return self._change_number + +def build_fetch(change_number, fetch_options, metadata): + """ + Build fetch with new flags if that is the case. + + :param change_number: Last known timestamp of definition. + :type change_number: int + + :param fetch_options: Fetch options for getting definitions. + :type fetch_options: splitio.api.commons.FetchOptions + + :param metadata: Metadata Headers. + :type metadata: dict + + :param rbs_change_number: Last known timestamp of a rule based segment modification. + :type rbs_change_number: int + + :return: Objects for fetch + :rtype: dict, dict + """ + query = {} + query['since'] = change_number + extra_headers = metadata + if fetch_options is None: + return query, extra_headers + + if fetch_options.cache_control_headers: + extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE + if fetch_options.change_number is not None: + query['till'] = fetch_options.change_number + return query, extra_headers + +class Backoff(object): + """Backoff duration calculator.""" + + MAX_ALLOWED_WAIT = 30 * 60 # half an hour + + def __init__(self, base=1, max_allowed=MAX_ALLOWED_WAIT): + """ + Class constructor. + + :param base: basic unit to be multiplied on each iteration (seconds) + :param base: float + + :param max_allowed: max seconds to wait + :param max_allowed: int + """ + self._base = base + self._max_allowed = max_allowed + self._attempt = 0 + + def get(self): + """ + Return the current time to wait and pre-calculate the next one. + + :returns: time to wait until next retry. + :rtype: float + """ + to_return = min(self._base * (2 ** self._attempt), self._max_allowed) + self._attempt += 1 + return to_return + + def reset(self): + """Reset the attempt count.""" + self._attempt = 0 diff --git a/splitapiclient/version.py b/splitapiclient/version.py index 17553bb..3081b38 100644 --- a/splitapiclient/version.py +++ b/splitapiclient/version.py @@ -1 +1 @@ -__version__ = '3.5.4' +__version__ = '3.6.0.rc.1'