diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index f14de25bc172..4004e31233be 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,9 +4,8 @@ #### Features Added -#### Breaking Changes - #### Bugs Fixed +* Fixed issue where Query Change Feed did not return items if the container uses legacy Hash V1 Partition Keys. This also fixes issues with not being able to change feed query for Specific Partition Key Values for HPK. See [PR 41270](https://github.com/Azure/azure-sdk-for-python/pull/41270/) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 24e6b5bfd58c..69fd2cf53440 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -3118,12 +3118,15 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: # check if query has prefix partition key isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None) - if isPrefixPartitionQuery: + if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs: last_response_headers = CaseInsensitiveDict() # here get the over lapping ranges - partition_key_definition = kwargs.pop("partitionKeyDefinition", None) - pk_properties = partition_key_definition - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + # Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs + pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {}) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) partition_key_value = pk_properties["partition_key"] feedrangeEPK = 
partition_key_definition._get_epk_range_for_prefix_partition_key( partition_key_value diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py index ae219197e819..66dc02d03b80 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py @@ -22,6 +22,97 @@ from typing import NoReturn, Tuple, Union +class _UInt32: + def __init__(self, value: int) -> None: + self._value: int = value & 0xFFFFFFFF + + @property + def value(self) -> int: + return self._value + + @value.setter + def value(self, new_value: int) -> None: + self._value = new_value & 0xFFFFFFFF + + def __add__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value + (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __sub__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value - (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __mul__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value * (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __xor__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value ^ (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __lshift__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value << (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __ilshift__(self, other: Union[int, '_UInt32']) -> '_UInt32': + self._value = (self.value << (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF + return self + + def __rshift__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value >> (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def 
__irshift__(self, other: Union[int, '_UInt32']) -> '_UInt32': + self._value = (self.value >> (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF + return self + + def __and__(self, other: Union[int, '_UInt32']) -> '_UInt32': + result = self.value & (other.value if isinstance(other, _UInt32) else other) + return _UInt32(result & 0xFFFFFFFF) + + def __or__(self, other: Union[int, '_UInt32']) -> '_UInt32': + if isinstance(other, _UInt32): + return _UInt32(self.value | other.value) + if isinstance(other, int): + return _UInt32(self.value | other) + raise TypeError("Unsupported type for OR operation") + + def __invert__(self) -> '_UInt32': + return _UInt32(~self.value & 0xFFFFFFFF) + + def __eq__(self, other: Union[int, '_UInt32', object]) -> bool: + return self.value == (other.value if isinstance(other, _UInt32) else other) + + def __ne__(self, other: Union[int, '_UInt32', object]) -> bool: + return not self.__eq__(other) + + def __lt__(self, other: Union[int, '_UInt32']) -> bool: + return self.value < (other.value if isinstance(other, _UInt32) else other) + + def __gt__(self, other: Union[int, '_UInt32']) -> bool: + return self.value > (other.value if isinstance(other, _UInt32) else other) + + def __le__(self, other: Union[int, '_UInt32']) -> bool: + return self.value <= (other.value if isinstance(other, _UInt32) else other) + + def __ge__(self, other: Union[int, '_UInt32']) -> bool: + return self.value >= (other.value if isinstance(other, _UInt32) else other) + + @staticmethod + def encode_double_as_uint32(value: float) -> int: + value_in_uint32 = struct.unpack(' int: + mask = 0x80000000 + value = ~(value - 1) if value < mask else value ^ mask + return struct.unpack(' int: + return self.value + class _UInt64: def __init__(self, value: int) -> None: self._value: int = value & 0xFFFFFFFFFFFFFFFF @@ -72,6 +163,32 @@ def __or__(self, other: Union[int, '_UInt64']) -> '_UInt64': def __invert__(self) -> '_UInt64': return _UInt64(~self.value & 
0xFFFFFFFFFFFFFFFF) + def __eq__(self, other: Union[int, '_UInt64', object]) -> bool: + return self.value == (other.value if isinstance(other, _UInt64) else other) + + def __ne__(self, other: Union[int, '_UInt64', object]) -> bool: + return not self.__eq__(other) + + def __irshift__(self, other: Union[int, '_UInt64']) -> '_UInt64': + self._value = (self.value >> (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF + return self + + def __ilshift__(self, other: Union[int, '_UInt64']) -> '_UInt64': + self._value = (self.value << (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF + return self + + def __lt__(self, other: Union[int, '_UInt64']) -> bool: + return self.value < (other.value if isinstance(other, _UInt64) else other) + + def __gt__(self, other: Union[int, '_UInt64']) -> bool: + return self.value > (other.value if isinstance(other, _UInt64) else other) + + def __le__(self, other: Union[int, '_UInt64']) -> bool: + return self.value <= (other.value if isinstance(other, _UInt64) else other) + + def __ge__(self, other: Union[int, '_UInt64']) -> bool: + return self.value >= (other.value if isinstance(other, _UInt64) else other) + @staticmethod def encode_double_as_uint64(value: float) -> int: value_in_uint64 = struct.unpack('SMHasher: # "All MurmurHash versions are public domain software, and the author disclaims all copyright to their code." 
def murmurhash3_32(data: bytearray, seed: int) -> _UInt32:
    """MurmurHash3, x86 32-bit variant.

    Computes the 32-bit hash used for legacy (V1) hash partitioning so the
    effective partition keys produced here match the other Cosmos SDKs.

    :param bytearray data: Payload to hash.
    :param int seed: Initial hash state.
    :return: The 32-bit hash wrapped in a ``_UInt32``.
    :rtype: _UInt32
    """
    mask = 0xFFFFFFFF
    c1 = 0xcc9e2d51
    c2 = 0x1b873593
    total_len = len(data)
    h1 = seed & mask

    # Body: mix one 4-byte little-endian block at a time.
    block_end = total_len - (total_len & 0x03)  # round down to 4-byte blocks
    for offset in range(0, block_end, 4):
        k1 = int.from_bytes(data[offset:offset + 4], 'little')
        k1 = (k1 * c1) & mask
        k1 = ((k1 << 15) | (k1 >> 17)) & mask  # ROTL32(k1, 15)
        k1 = (k1 * c2) & mask
        h1 ^= k1
        h1 = ((h1 << 13) | (h1 >> 19)) & mask  # ROTL32(h1, 13)
        h1 = (h1 * 5 + 0xe6546b64) & mask

    # Tail: fold in the trailing 1-3 bytes. When the length is a multiple of
    # four, k1 stays zero and this whole section is a no-op.
    k1 = 0
    remainder = total_len & 0x03
    if remainder == 3:
        k1 ^= data[block_end + 2] << 16
    if remainder >= 2:
        k1 ^= data[block_end + 1] << 8
    if remainder >= 1:
        k1 ^= data[block_end]
    k1 = (k1 * c1) & mask
    k1 = ((k1 << 15) | (k1 >> 17)) & mask  # ROTL32(k1, 15)
    k1 = (k1 * c2) & mask
    h1 ^= k1

    # Finalization mix: force all bits of the hash to avalanche.
    h1 ^= total_len & mask
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85ebca6b) & mask
    h1 ^= h1 >> 13
    h1 = (h1 * 0xc2b2ae35) & mask
    h1 ^= h1 >> 16

    return _UInt32(h1)
await self._get_properties() partition_key_definition = container_properties["partitionKey"] - partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"]) + partition_key = PartitionKey( + path=partition_key_definition["paths"], + kind=partition_key_definition["kind"], + version=partition_key_definition["version"]) return partition_key._get_epk_range_for_partition_key(partition_key_value) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 6f3fd6d8962d..599c1cb5c0c6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -2914,7 +2914,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: if cont_prop: cont_prop = await cont_prop() pk_properties = cont_prop["partitionKey"] - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) if partition_key_definition.kind == "MultiHash" and \ (isinstance(partition_key, List) and \ len(partition_key_definition['paths']) != len(partition_key)): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 08203b803cf6..72459ede7a3a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -138,7 +138,10 @@ def _set_partition_key( def _get_epk_range_for_partition_key( self, partition_key_value: PartitionKeyType) -> Range: container_properties = self._get_properties() partition_key_definition = container_properties["partitionKey"] - partition_key = PartitionKey(path=partition_key_definition["paths"], 
kind=partition_key_definition["kind"]) + partition_key = PartitionKey( + path=partition_key_definition["paths"], + kind=partition_key_definition["kind"], + version=partition_key_definition["version"]) return partition_key._get_epk_range_for_partition_key(partition_key_value) @@ -715,7 +718,10 @@ def __is_prefix_partitionkey( self, partition_key: PartitionKeyType) -> bool: properties = self._get_properties() pk_properties = properties["partitionKey"] - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) return partition_key_definition._is_prefix_partition_key(partition_key) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py index 07274b2387e4..abb9b1698ecd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py @@ -26,8 +26,8 @@ from typing import IO, Sequence, Type, Union, overload, List, cast from typing_extensions import Literal -from ._cosmos_integers import _UInt64, _UInt128 -from ._cosmos_murmurhash3 import murmurhash3_128 as _murmurhash3_128 +from ._cosmos_integers import _UInt32, _UInt64, _UInt128 +from ._cosmos_murmurhash3 import murmurhash3_128 as _murmurhash3_128, murmurhash3_32 as _murmurhash3_32 from ._routing.routing_range import Range as _Range @@ -182,14 +182,47 @@ def _get_epk_range_for_partition_key( cast(Sequence[Union[None, bool, int, float, str, Type[NonePartitionKeyValue]]], pk_value)) # else return point range - effective_partition_key_string =\ - self._get_effective_partition_key_string( - cast(List[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], [pk_value])) + if isinstance(pk_value, (list, tuple)) or (isinstance(pk_value, Sequence) and not isinstance(pk_value, str)): + 
effective_partition_key_string = self._get_effective_partition_key_string( + cast(Sequence[Union[None, bool, int, float, str, Type[NonePartitionKeyValue]]], pk_value)) + else: + effective_partition_key_string =\ + self._get_effective_partition_key_string( + cast(List[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], [pk_value])) return _Range(effective_partition_key_string, effective_partition_key_string, True, True) - def _get_effective_partition_key_for_hash_partitioning(self) -> str: - # We shouldn't be supporting V1 - return "" + @staticmethod + def _truncate_for_v1_hashing( + value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]] + ) -> Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]: + if isinstance(value, str): + return value[:100] + return value + + @staticmethod + def _get_effective_partition_key_for_hash_partitioning( + pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + ) -> str: + truncated_components = [] + # In Python, Strings are sequences, so we make sure we instead hash the entire string instead of each character + if isinstance(pk_value, str): + truncated_components.append(PartitionKey._truncate_for_v1_hashing(pk_value)) + else: + truncated_components = [PartitionKey._truncate_for_v1_hashing(v) for v in pk_value] + with BytesIO() as ms: + for component in truncated_components: + if isinstance(component, int) and not isinstance(component, bool): + component = float(int(_UInt32(component))) + PartitionKey._write_for_hashing(component, ms) + + ms_bytes: bytes = ms.getvalue() + # We use Our own MurmurHash3 implementation to match the behavior of other SDKs + # We put into a Cosmos Integer of Unsigned 32-bit Integer, to match the behavior of other SDKs + hash_as_int: _UInt32 = _murmurhash3_32(bytearray(ms_bytes), 0) + hash_value = float(int(hash_as_int)) + + partition_key_components = [hash_value] + truncated_components + 
return _to_hex_encoded_binary_string_v1(partition_key_components) def _get_effective_partition_key_string( self, @@ -205,17 +238,32 @@ def _get_effective_partition_key_string( if kind == 'Hash': version = self.version or 2 if version == 1: - return self._get_effective_partition_key_for_hash_partitioning() + return PartitionKey._get_effective_partition_key_for_hash_partitioning(pk_value) if version == 2: - return self._get_effective_partition_key_for_hash_partitioning_v2(pk_value) + return PartitionKey._get_effective_partition_key_for_hash_partitioning_v2(pk_value) elif kind == 'MultiHash': return self._get_effective_partition_key_for_multi_hash_partitioning_v2(pk_value) return _to_hex_encoded_binary_string(pk_value) + @staticmethod + def _write_for_hashing( + value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + writer: IO[bytes] + ) -> None: + PartitionKey._write_for_hashing_core(value, bytes([0]), writer) + + @staticmethod def _write_for_hashing_v2( - self, value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], writer: IO[bytes] + ) -> None: + PartitionKey._write_for_hashing_core(value, bytes([0xFF]), writer) + + @staticmethod + def _write_for_hashing_core( + value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + string_suffix: bytes, + writer: IO[bytes] ) -> None: if value is True: writer.write(bytes([_PartitionKeyComponentType.PTrue])) @@ -225,24 +273,25 @@ def _write_for_hashing_v2( writer.write(bytes([_PartitionKeyComponentType.Null])) elif isinstance(value, int): writer.write(bytes([_PartitionKeyComponentType.Number])) - writer.write(value.to_bytes(8, 'little')) # assuming value is a 64-bit integer + # Cast to Float to ensure correct packing + writer.write(struct.pack(' str: with BytesIO() as ms: for component in pk_value: - self._write_for_hashing_v2(component, ms) + PartitionKey._write_for_hashing_v2(component, ms) ms_bytes = ms.getvalue() hash128 = 
def _to_hex_encoded_binary_string_v1(components: Sequence[object]) -> str:
    """Binary-encode V1 partition key components and return the hex string.

    The component list is expected to be the 32-bit hash value followed by the
    (truncated) partition key components themselves.

    :param components: Components to encode (numbers, strings, booleans,
        ``_Infinity`` or ``_Undefined`` sentinels).
    :return: Hex encoding of the binary layout, used as the effective
        partition key string.
    :raises TypeError: If a component is not a supported PK component type.
    """
    with BytesIO() as ms:
        for component in components:
            if isinstance(component, (bool, int, float, str, _Infinity, _Undefined)):
                _write_for_binary_encoding_v1(component, ms)
            else:
                raise TypeError(f"Unexpected type for PK component: {type(component)}")
        return _to_hex(bytearray(ms.getvalue()), 0, ms.tell())

def _write_for_binary_encoding_v1(
    value: Union[bool, int, float, str, _Infinity, _Undefined],
    binary_writer: IO[bytes]
) -> None:
    """Write one V1 partition key component in its binary wire format.

    Each component is written as a single type-marker byte followed by a
    type-specific payload. NOTE(review): the bit layouts below presumably
    mirror the other Cosmos SDKs' V1 encoding so hashes stay compatible
    across SDKs — verify against the service/.NET implementation.

    :param value: The component to encode.
    :param binary_writer: Destination buffer (e.g. a ``BytesIO``).
    """
    # bool must be tested before int: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        binary_writer.write(bytes([(_PartitionKeyComponentType.PTrue if value else _PartitionKeyComponentType.PFalse)]))

    elif isinstance(value, _Infinity):
        binary_writer.write(bytes([_PartitionKeyComponentType.Infinity]))

    elif isinstance(value, (int, float)):
        binary_writer.write(bytes([_PartitionKeyComponentType.Number]))
        # For V1 hashing the value is always encoded from the IEEE-754 double
        # bit pattern, regardless of whether it started as an int or a float.
        # (float(value) is a no-op for floats, so one branch covers both.)
        payload = _UInt64(_UInt64.encode_double_as_uint64(float(value)))

        # First chunk carries 8 bits of payload.
        binary_writer.write(bytes([int(payload >> (64 - 8))]))
        payload <<= 8

        # Remaining chunks carry 7 payload bits each, with the low bit set to
        # mark "more chunks follow"...
        byte_to_write = 0
        first_iteration = True
        while payload != 0:
            if not first_iteration:
                binary_writer.write(bytes([byte_to_write]))
            else:
                first_iteration = False

            byte_to_write = int((payload >> (64 - 8)) | int(0x01))
            payload <<= 7

        # ...except the last chunk, whose low bit is cleared to terminate.
        binary_writer.write(bytes([(byte_to_write & 0xFE)]))

    elif isinstance(value, str):
        binary_writer.write(bytes([_PartitionKeyComponentType.String]))
        utf8_value = value.encode('utf-8')
        short_string = len(utf8_value) <= _MaxStringBytesToAppend

        # BUGFIX: this previously used `range(short_string and len(utf8_value)
        # or _MaxStringBytesToAppend + 1)`. For an empty string, `True and 0`
        # is falsy, so the `or` arm won and the loop indexed past the end of
        # the empty byte string, raising IndexError. Empty-string partition
        # key values are valid, so spell the choice out explicitly instead.
        bytes_to_write = len(utf8_value) if short_string else _MaxStringBytesToAppend + 1
        for index in range(bytes_to_write):
            # Each byte is shifted up by one; 0x00 is reserved as the
            # terminator written below. (UTF-8 bytes never exceed 0xF4, so
            # the +1 cannot overflow a byte.)
            binary_writer.write(bytes([utf8_value[index] + 1]))

        # Short strings are fully encoded and get a 0x00 terminator; long
        # strings are truncated (one extra byte is kept as a tie-breaker)
        # and deliberately left unterminated.
        if short_string:
            binary_writer.write(bytes([0x00]))

    elif isinstance(value, _Undefined):
        binary_writer.write(bytes([_PartitionKeyComponentType.Undefined]))
+ +import unittest +import uuid +import pytest + +import azure.cosmos.cosmos_client as cosmos_client +import test_config +from azure.cosmos.partition_key import PartitionKey + + +@pytest.mark.cosmosEmulator +@pytest.mark.cosmosQuery +class TestChangeFeedPKVariation(unittest.TestCase): + """Test change feed with different partition key variations.""" + + configs = test_config.TestConfig + host = configs.host + masterKey = configs.masterKey + connectionPolicy = configs.connectionPolicy + client: cosmos_client.CosmosClient = None + # Items for Hash V1 partition key + single_hash_items = [ + {"id": str(i), "pk": f"short_string"} for i in range(1, 101) + ] + [ + {"id": str(i), "pk": f"long_string_" + "a" * 251} for i in range(101, 201) + ] + [ + {"id": str(i), "pk": 1000} for i in range(201, 301) + ] + [ + {"id": str(i), "pk": 1000 * 1.1111} for i in range(301, 401) + ] + [ + {"id": str(i), "pk": True} for i in range(401, 501) + ] + + # Items for Hierarchical Partition Keys + hpk_items = [ + {"id": str(i), "pk1": "level1_", "pk2": "level2_"} for i in range(1, 101) + ] + [ + {"id": str(i), "pk1": "level1_", "pk2": f"level2_long__" + "c" * 101} for i in range(101, 201) + ] + [ + {"id": str(i), "pk1": 10, "pk2": 1000} for i in range(201, 301) + ] + [ + {"id": str(i), "pk1": 10 * 1.1, "pk2": 10 * 2.2} for i in range(301, 401) + ] + [ + {"id": str(i), "pk1": True, 'pk2': False} for i in range(401, 501) + ] + + test_data_hash = [ + { + "version": 1, + "expected_epk": ( + "05C1DD5D8149640862636465666768696A6B6C6D6E6F70717273747576" + "7778797A7B62636465666768696A6B6C6D6E6F70717273747576777879" + "7A7B62636465666768696A6B6C6D6E6F707172737475767778797A7B62636465666768696A6B6C6D6E6F707172737475767700" + ), + }, + { + "version": 2, + "expected_epk": ( + "2032D236DA8678BFB900E866D7EBCE76" + ), + }, + ] + + @classmethod + def setUpClass(cls): + cls.config = test_config.TestConfig() + if (cls.config.masterKey == '[YOUR_KEY_HERE]' or + cls.config.host == '[YOUR_ENDPOINT_HERE]'): + 
raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + cls.client = cosmos_client.CosmosClient(cls.config.host, cls.config.masterKey) + cls.db = cls.client.get_database_client(cls.config.TEST_DATABASE_ID) + + def create_container(self, db, container_id, partition_key, version=None, throughput=None): + """Helper to create a container with a specific partition key definition.""" + if isinstance(partition_key, list): + # Assume multihash (hierarchical partition key) for the container + pk_definition = PartitionKey(path=partition_key, kind='MultiHash') + else: + pk_definition = PartitionKey(path=partition_key, kind='Hash', version=version) + if throughput: + return db.create_container(id=container_id, partition_key=pk_definition, offer_throughput=throughput) + return db.create_container(id=container_id, partition_key=pk_definition) + + def insert_items(self, container, items): + """Helper to insert items into a container.""" + for item in items: + container.create_item(body=item) + + def validate_changefeed(self, container): + """Helper to validate changefeed results.""" + partition_keys = ["short_string", "long_string_" + "a" * 251, 1000, 1000 * 1.1111, True] + for pk in partition_keys: + changefeed = container.query_items_change_feed(is_start_from_beginning=True, partition_key=pk) + changefeed_items = [item for item in changefeed] + + # Perform a regular query + regular_query = container.query_items(query="SELECT * FROM c", partition_key=pk) + regular_query_items = [item for item in regular_query] + + # Compare the number of documents returned + assert len(changefeed_items) == len(regular_query_items), ( + f"Mismatch in document count for partition key {pk}: Changefeed returned {len(changefeed_items)} items," + f"while regular query returned {len(regular_query_items)} items." 
+ ) + + def validate_changefeed_hpk(self, container): + """Helper to validate changefeed results for hierarchical partition keys.""" + partition_keys = [ + ["level1_", "level2_"], + ["level1_", "level2_long__" + "c" * 101], + [10, 1000], + [10 * 1.1, 10 * 2.2], + [True, False] + ] + for pk in partition_keys: + # Perform a change feed query + changefeed = container.query_items_change_feed(is_start_from_beginning=True, partition_key=pk) + changefeed_items = [item for item in changefeed] + + # Perform a regular query + regular_query = container.query_items(query="SELECT * FROM c", partition_key=pk) + regular_query_items = [item for item in regular_query] + + # Compare the number of documents returned + assert len(changefeed_items) == len(regular_query_items), ( + f"Mismatch in document count for partition key {pk}: Changefeed returned {len(changefeed_items)} items," + f"while regular query returned {len(regular_query_items)} items." + ) + + def test_partition_key_hashing(self): + """Test effective partition key string generation for different hash versions.""" + for hash_data in self.test_data_hash: + version = hash_data["version"] + expected_epk = hash_data["expected_epk"] + part_k = ("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij" + "klmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz") + partition_key = PartitionKey( + path="/field1", + kind="Hash", + version=version + ) + epk_str = partition_key._get_effective_partition_key_string(part_k) + assert epk_str.upper() == expected_epk + + def test_hash_v1_partition_key(self): + """Test changefeed with Hash V1 partition key.""" + db = self.db + container = self.create_container(db, f"container_test_hash_V1_{uuid.uuid4()}", + "/pk", version=1) + items = self.single_hash_items + self.insert_items(container, items) + self.validate_changefeed(container) + self.db.delete_container(container.id) + + def test_hash_v2_partition_key(self): + """Test changefeed with Hash V2 partition key.""" + db 
= self.db + container = self.create_container(db, f"container_test_hash_V2_{uuid.uuid4()}", + "/pk", version=2) + items = self.single_hash_items + self.insert_items(container, items) + self.validate_changefeed(container) + self.db.delete_container(container.id) + + def test_hpk_partition_key(self): + """Test changefeed with hierarchical partition key.""" + db = self.db + container = self.create_container(db, f"container_test_hpk_{uuid.uuid4()}", + ["/pk1", "/pk2"]) + items = self.hpk_items + self.insert_items(container, items) + self.validate_changefeed_hpk(container) + self.db.delete_container(container.id) + + def test_multiple_physical_partitions(self): + """Test change feed with a container having multiple physical partitions.""" + db = self.db + + # Test for Hash V1 partition key + container_id_v1 = f"container_test_multiple_partitions_hash_v1_{uuid.uuid4()}" + throughput = 12000 # Ensure multiple physical partitions + container_v1 = self.create_container(db, container_id_v1, "/pk", version=1, throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_v1 = container_v1.read_feed_ranges() + feed_ranges_v1 = [feed_range for feed_range in feed_ranges_v1] + assert len(feed_ranges_v1) > 1, "Hash V1 container does not have multiple physical partitions." 
+ + # Insert items and validate change feed for Hash V1 + self.insert_items(container_v1, self.single_hash_items) + self.validate_changefeed(container_v1) + self.db.delete_container(container_v1.id) + + # Test for Hash V2 partition key + container_id_v2 = f"container_test_multiple_partitions_hash_v2_{uuid.uuid4()}" + container_v2 = self.create_container(db, container_id_v2, "/pk", version=2, throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_v2 = container_v2.read_feed_ranges() + feed_ranges_v2 = [feed_range for feed_range in feed_ranges_v2] + assert len(feed_ranges_v2) > 1, "Hash V2 container does not have multiple physical partitions." + + # Insert items and validate change feed for Hash V2 + self.insert_items(container_v2, self.single_hash_items) + self.validate_changefeed(container_v2) + self.db.delete_container(container_v2.id) + + # Test for Hierarchical Partition Keys (HPK) + container_id_hpk = f"container_test_multiple_partitions_hpk_{uuid.uuid4()}" + container_hpk = self.create_container(db, container_id_hpk, ["/pk1", "/pk2"], throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_hpk = container_hpk.read_feed_ranges() + feed_ranges_hpk = [feed_range for feed_range in feed_ranges_hpk] + assert len(feed_ranges_hpk) > 1, "HPK container does not have multiple physical partitions." 
+ + # Insert items and validate change feed for HPK + self.insert_items(container_hpk, self.hpk_items) + self.validate_changefeed_hpk(container_hpk) + self.db.delete_container(container_hpk.id) + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py new file mode 100644 index 000000000000..09ca8edfc716 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_changefeed_partition_key_variation_async.py @@ -0,0 +1,226 @@ +import unittest +import uuid +import pytest +from azure.cosmos.aio import CosmosClient +import test_config +from azure.cosmos.partition_key import PartitionKey + +@pytest.mark.cosmosEmulator +@pytest.mark.asyncio +class TestChangeFeedPKVariationAsync(unittest.IsolatedAsyncioTestCase): + """Test change feed with different partition key variations (async version).""" + + configs = test_config.TestConfig + host = configs.host + masterKey = configs.masterKey + connectionPolicy = configs.connectionPolicy + client: CosmosClient = None + + # Items for Hash V1 partition key + single_hash_items = [ + {"id": str(i), "pk": f"short_string"} for i in range(1, 101) + ] + [ + {"id": str(i), "pk": f"long_string_" + "a" * 251} for i in range(101, 201) + ] + [ + {"id": str(i), "pk": 1000} for i in range(201, 301) + ] + [ + {"id": str(i), "pk": 1000 * 1.1111} for i in range(301, 401) + ] + [ + {"id": str(i), "pk": True} for i in range(401, 501) + ] + + # Items for Hierarchical Partition Keys + hpk_items = [ + {"id": str(i), "pk1": "level1_", "pk2": "level2_"} for i in range(1, 101) + ] + [ + {"id": str(i), "pk1": "level1_", "pk2": f"level2_long__" + "c" * 101} for i in range(101, 201) + ] + [ + {"id": str(i), "pk1": 10, "pk2": 1000} for i in range(201, 301) + ] + [ + {"id": str(i), "pk1": 10 * 1.1, "pk2": 10 * 2.2} for i in range(301, 401) + ] + [ + {"id": str(i), "pk1": True, 'pk2': False} for i in 
range(401, 501) + ] + + test_data_hash = [ + { + "version": 1, + "expected_epk": ( + "05C1DD5D8149640862636465666768696A6B6C6D6E6F70717273747576" + "7778797A7B62636465666768696A6B6C6D6E6F70717273747576777879" + "7A7B62636465666768696A6B6C6D6E6F707172737475767778797A7B62636465666768696A6B6C6D6E6F707172737475767700" + ), + }, + { + "version": 2, + "expected_epk": ( + "2032D236DA8678BFB900E866D7EBCE76" + ), + }, + ] + + @classmethod + async def asyncSetUpClass(cls): + cls.config = test_config.TestConfig() + if (cls.config.masterKey == '[YOUR_KEY_HERE]' or + cls.config.host == '[YOUR_ENDPOINT_HERE]'): + raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + + async def asyncSetUp(self): + self.client = CosmosClient(self.host, self.masterKey) + self.db = await self.client.create_database_if_not_exists(self.configs.TEST_DATABASE_ID) + + async def asyncTearDown(self): + await self.client.close() + + async def create_container(self, db, container_id, partition_key, version=None, throughput=None): + """Helper to create a container with a specific partition key definition.""" + if isinstance(partition_key, list): + # Assume multihash (hierarchical partition key) for the container + pk_definition = PartitionKey(path=partition_key, kind='MultiHash') + else: + pk_definition = PartitionKey(path=partition_key, kind='Hash', version=version) + if throughput: + return await db.create_container(container_id, pk_definition, offer_throughput=throughput) + return await db.create_container(container_id, pk_definition) + + async def insert_items(self, container, items): + """Helper to insert items into a container.""" + for item in items: + await container.create_item(body=item) + + async def validate_changefeed(self, container): + """Helper to validate changefeed results.""" + partition_keys = ["short_string", "long_string_" + "a" * 251, 1000, 1000 * 1.1111, True] + for pk in 
partition_keys: + changefeed = container.query_items_change_feed(is_start_from_beginning=True, partition_key=pk) + changefeed_items = [item async for item in changefeed] + + # Perform a regular query + regular_query = container.query_items(query="SELECT * FROM c", partition_key=pk) + regular_query_items = [item async for item in regular_query] + + # Compare the number of documents returned + assert len(changefeed_items) == len(regular_query_items), ( + f"Mismatch in document count for partition key {pk}: Changefeed returned {len(changefeed_items)} items, " + f"while regular query returned {len(regular_query_items)} items." + ) + + async def validate_changefeed_hpk(self, container): + """Helper to validate changefeed results for hierarchical partition keys.""" + partition_keys = [ + ["level1_", "level2_"], + ["level1_", "level2_long__" + "c" * 101], + [10, 1000], + [10 * 1.1, 10 * 2.2], + [True, False] + ] + for pk in partition_keys: + # Perform a change feed query + changefeed = container.query_items_change_feed(is_start_from_beginning=True, partition_key=pk) + changefeed_items = [item async for item in changefeed] + + # Perform a regular query + regular_query = container.query_items(query="SELECT * FROM c", partition_key=pk) + regular_query_items = [item async for item in regular_query] + + # Compare the number of documents returned + assert len(changefeed_items) == len(regular_query_items), ( + f"Mismatch in document count for partition key {pk}: Changefeed returned {len(changefeed_items)} items, " + f"while regular query returned {len(regular_query_items)} items." 
+ ) + + async def test_partition_key_hashing(self): + """Test effective partition key string generation for different hash versions.""" + for hash_data in self.test_data_hash: + version = hash_data["version"] + expected_epk = hash_data["expected_epk"] + part_k = ("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij" + "klmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz") + partition_key = PartitionKey( + path="/field1", + kind="Hash", + version=version + ) + epk_str = partition_key._get_effective_partition_key_string(part_k) + assert epk_str.upper() == expected_epk + + async def test_hash_v1_partition_key(self): + """Test changefeed with Hash V1 partition key.""" + db = self.db + container = await self.create_container(db, f"container_test_hash_V1_{uuid.uuid4()}", "/pk", version=1) + items = self.single_hash_items + await self.insert_items(container, items) + await self.validate_changefeed(container) + await self.db.delete_container(container.id) + + async def test_hash_v2_partition_key(self): + """Test changefeed with Hash V2 partition key.""" + db = self.db + container = await self.create_container(db, f"container_test_hash_V2_{uuid.uuid4()}", "/pk", version=2) + items = self.single_hash_items + await self.insert_items(container, items) + await self.validate_changefeed(container) + await self.db.delete_container(container.id) + + async def test_hpk_partition_key(self): + """Test changefeed with hierarchical partition key.""" + db = self.db + container = await self.create_container(db, f"container_test_hpk_{uuid.uuid4()}", ["/pk1", "/pk2"]) + items = self.hpk_items + await self.insert_items(container, items) + await self.validate_changefeed_hpk(container) + await self.db.delete_container(container.id) + + async def test_multiple_physical_partitions(self): + """Test change feed with a container having multiple physical partitions.""" + db = self.db + + # Test for Hash V1 partition key + container_id_v1 = 
f"container_test_multiple_partitions_hash_v1_{uuid.uuid4()}" + throughput = 12000 # Ensure multiple physical partitions + container_v1 = await self.create_container(db, container_id_v1, "/pk", version=1, throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_v1 = container_v1.read_feed_ranges() + feed_ranges_v1 = [feed_range async for feed_range in feed_ranges_v1] + assert len(feed_ranges_v1) > 1, "Hash V1 container does not have multiple physical partitions." + + # Insert items and validate change feed for Hash V1 + await self.insert_items(container_v1, self.single_hash_items) + await self.validate_changefeed(container_v1) + await self.db.delete_container(container_v1.id) + + # Test for Hash V2 partition key + container_id_v2 = f"container_test_multiple_partitions_hash_v2_{uuid.uuid4()}" + container_v2 = await self.create_container(db, container_id_v2, "/pk", version=2, throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_v2 = container_v2.read_feed_ranges() + feed_ranges_v2 = [feed_range async for feed_range in feed_ranges_v2] + assert len(feed_ranges_v2) > 1, "Hash V2 container does not have multiple physical partitions." + + # Insert items and validate change feed for Hash V2 + await self.insert_items(container_v2, self.single_hash_items) + await self.validate_changefeed(container_v2) + await self.db.delete_container(container_v2.id) + + # Test for Hierarchical Partition Keys (HPK) + container_id_hpk = f"container_test_multiple_partitions_hpk_{uuid.uuid4()}" + container_hpk = await self.create_container(db, container_id_hpk, ["/pk1", "/pk2"], throughput=throughput) + + # Verify the container has more than one physical partition + feed_ranges_hpk = container_hpk.read_feed_ranges() + feed_ranges_hpk = [feed_range async for feed_range in feed_ranges_hpk] + assert len(feed_ranges_hpk) > 1, "HPK container does not have multiple physical partitions." 
+ + # Insert items and validate change feed for HPK + await self.insert_items(container_hpk, self.hpk_items) + await self.validate_changefeed_hpk(container_hpk) + await self.db.delete_container(container_hpk.id) + +if __name__ == "__main__": + unittest.main()