From aeae31fa936b0224efec78ca787033151d28648e Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 20 May 2025 15:55:51 +0000 Subject: [PATCH] Fixing gaps for Containers with HashV1 --- .../azure/cosmos/_cosmos_client_connection.py | 8 ++- .../azure/cosmos/_cosmos_murmurhash3.py | 42 ++++++++++++ .../azure/cosmos/aio/_container.py | 5 +- .../aio/_cosmos_client_connection_async.py | 5 +- .../azure-cosmos/azure/cosmos/container.py | 10 ++- .../azure/cosmos/partition_key.py | 65 +++++++++++++++---- 6 files changed, 116 insertions(+), 19 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 24e6b5bfd58c..9b14fed3b8ac 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -3121,9 +3121,11 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: if isPrefixPartitionQuery: last_response_headers = CaseInsensitiveDict() # here get the over lapping ranges - partition_key_definition = kwargs.pop("partitionKeyDefinition", None) - pk_properties = partition_key_definition - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + pk_properties: Optional[PartitionKey] = kwargs.pop("partitionKeyDefinition", None) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) partition_key_value = pk_properties["partition_key"] feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key( partition_key_value diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_murmurhash3.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_murmurhash3.py index 9d1a6af528e7..56a0add3908f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_murmurhash3.py +++ 
def murmurhash3_32(data: bytearray, length: int, seed: int) -> int:
    """Compute the 32-bit MurmurHash3 (x86 variant) of a byte buffer.

    The algorithm is specified in terms of fixed-width unsigned 32-bit
    arithmetic. Python integers never overflow, so every multiplication,
    rotation and addition below is explicitly wrapped to 32 bits; without
    that wrapping the right shifts would pull overflow bits back into the
    low word and produce a value that differs from every other
    implementation of the hash.

    :param bytearray data: Buffer to hash. Bytes are read in little-endian order.
    :param int length: Number of leading bytes of ``data`` to include in the hash.
    :param int seed: Hash seed. Only the low 32 bits are used.
    :returns: The hash as an unsigned 32-bit integer (``0 <= result <= 0xFFFFFFFF``).
    :rtype: int
    :raises IndexError: If ``length`` is larger than ``len(data)``.
    """
    mask32 = 0xFFFFFFFF
    c1 = 0xcc9e2d51
    c2 = 0x1b873593

    def _scramble(k: int) -> int:
        # Per-block mixing step: multiply, ROTL32(k, 15), multiply - all modulo 2**32.
        k = (k * c1) & mask32
        k = ((k << 15) | (k >> 17)) & mask32
        return (k * c2) & mask32

    h1 = seed & mask32
    rounded_end = length & 0xfffffffc  # round down to a whole number of 4-byte blocks

    # body: consume the input one little-endian 32-bit block at a time
    for i in range(0, rounded_end, 4):
        k1 = (
            (data[i] & 0xff)
            | ((data[i + 1] & 0xff) << 8)
            | ((data[i + 2] & 0xff) << 16)
            | ((data[i + 3] & 0xff) << 24)
        )
        h1 ^= _scramble(k1)
        h1 = ((h1 << 13) | (h1 >> 19)) & mask32  # ROTL32(h1, 13)
        h1 = (h1 * 5 + 0xe6546b64) & mask32

    # tail: the 1-3 bytes that do not fill a complete block
    remainder = length & 0x03
    if remainder:
        k1 = data[rounded_end] & 0xff
        if remainder >= 2:
            k1 |= (data[rounded_end + 1] & 0xff) << 8
        if remainder == 3:
            k1 |= (data[rounded_end + 2] & 0xff) << 16
        h1 ^= _scramble(k1)

    # finalization: fold in the length, then avalanche the bits (fmix32)
    h1 ^= length & mask32
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85ebca6b) & mask32
    h1 ^= h1 >> 13
    h1 = (h1 * 0xc2b2ae35) & mask32
    h1 ^= h1 >> 16

    return h1
partition_key._get_epk_range_for_partition_key(partition_key_value) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 6f3fd6d8962d..599c1cb5c0c6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -2914,7 +2914,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: if cont_prop: cont_prop = await cont_prop() pk_properties = cont_prop["partitionKey"] - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) if partition_key_definition.kind == "MultiHash" and \ (isinstance(partition_key, List) and \ len(partition_key_definition['paths']) != len(partition_key)): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 08203b803cf6..72459ede7a3a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -138,7 +138,10 @@ def _set_partition_key( def _get_epk_range_for_partition_key( self, partition_key_value: PartitionKeyType) -> Range: container_properties = self._get_properties() partition_key_definition = container_properties["partitionKey"] - partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"]) + partition_key = PartitionKey( + path=partition_key_definition["paths"], + kind=partition_key_definition["kind"], + version=partition_key_definition["version"]) return partition_key._get_epk_range_for_partition_key(partition_key_value) @@ -715,7 +718,10 @@ def __is_prefix_partitionkey( self, partition_key: PartitionKeyType) -> bool: properties = 
self._get_properties() pk_properties = properties["partitionKey"] - partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"]) + partition_key_definition = PartitionKey( + path=pk_properties["paths"], + kind=pk_properties["kind"], + version=pk_properties["version"]) return partition_key_definition._is_prefix_partition_key(partition_key) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py index 07274b2387e4..f7137b4c30fb 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/partition_key.py @@ -27,7 +27,7 @@ from typing_extensions import Literal from ._cosmos_integers import _UInt64, _UInt128 -from ._cosmos_murmurhash3 import murmurhash3_128 as _murmurhash3_128 +from ._cosmos_murmurhash3 import murmurhash3_128 as _murmurhash3_128, murmurhash3_32 as _murmurhash3_32 from ._routing.routing_range import Range as _Range @@ -187,9 +187,34 @@ def _get_epk_range_for_partition_key( cast(List[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]], [pk_value])) return _Range(effective_partition_key_string, effective_partition_key_string, True, True) - def _get_effective_partition_key_for_hash_partitioning(self) -> str: - # We shouldn't be supporting V1 - return "" + @staticmethod + def _as_unsigned_long(x: int) -> int: return x & 0xFFFFFFFF + + @staticmethod + def _truncate_for_v1_hashing(value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]) -> Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]: + if isinstance(value, str): + return value[:100] + + return value + + @staticmethod + def _get_effective_partition_key_for_hash_partitioning( + pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] + ) -> str: + with (BytesIO() as ms): + truncated_components: List[Union[None, bool, int, float, str, 
_Undefined, Type[NonePartitionKeyValue]]] = \ + [None] + [PartitionKey._truncate_for_v1_hashing(v) for v in pk_value] + + for component in truncated_components[1:]: + PartitionKey._write_for_hashing(component, ms) + + ms_bytes: bytes = ms.getvalue() + hash_as_int: int = _murmurhash3_32(bytearray(ms_bytes),len(bytes), 0) + hash_value = float(PartitionKey._as_unsigned_long(hash_as_int)) + + truncated_components[0] = hash_value + + return _to_hex_encoded_binary_string(truncated_components) def _get_effective_partition_key_string( self, @@ -205,17 +230,32 @@ def _get_effective_partition_key_string( if kind == 'Hash': version = self.version or 2 if version == 1: - return self._get_effective_partition_key_for_hash_partitioning() + return PartitionKey._get_effective_partition_key_for_hash_partitioning(pk_value) if version == 2: - return self._get_effective_partition_key_for_hash_partitioning_v2(pk_value) + return PartitionKey._get_effective_partition_key_for_hash_partitioning_v2(pk_value) elif kind == 'MultiHash': return self._get_effective_partition_key_for_multi_hash_partitioning_v2(pk_value) return _to_hex_encoded_binary_string(pk_value) + @staticmethod + def _write_for_hashing( + value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + writer: IO[bytes] + ) -> None: + PartitionKey._write_for_hashing_core(value, bytes([0]), writer) + + @staticmethod def _write_for_hashing_v2( - self, value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], writer: IO[bytes] + ) -> None: + PartitionKey._write_for_hashing_core(value, bytes([0xFF]), writer) + + @staticmethod + def _write_for_hashing_core( + value: Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]], + string_suffix: bytes, + writer: IO[bytes] ) -> None: if value is True: writer.write(bytes([_PartitionKeyComponentType.PTrue])) @@ -232,17 +272,18 @@ def _write_for_hashing_v2( elif isinstance(value, str): 
writer.write(bytes([_PartitionKeyComponentType.String])) writer.write(value.encode('utf-8')) - writer.write(bytes([0xFF])) + writer.write(string_suffix) elif isinstance(value, _Undefined): writer.write(bytes([_PartitionKeyComponentType.Undefined])) + + @staticmethod def _get_effective_partition_key_for_hash_partitioning_v2( - self, pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] ) -> str: with BytesIO() as ms: for component in pk_value: - self._write_for_hashing_v2(component, ms) + PartitionKey._write_for_hashing_v2(component, ms) ms_bytes = ms.getvalue() hash128 = _murmurhash3_128(bytearray(ms_bytes), _UInt128(0, 0)) @@ -255,8 +296,8 @@ def _get_effective_partition_key_for_hash_partitioning_v2( return ''.join('{:02X}'.format(x) for x in hash_bytes) + @staticmethod def _get_effective_partition_key_for_multi_hash_partitioning_v2( - self, pk_value: Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] ) -> str: sb = [] @@ -265,7 +306,7 @@ def _get_effective_partition_key_for_multi_hash_partitioning_v2( binary_writer = ms # In Python, you can write bytes directly to a BytesIO object # Assuming paths[i] is the correct object to call write_for_hashing_v2 on - self._write_for_hashing_v2(value, binary_writer) + PartitionKey._write_for_hashing_v2(value, binary_writer) ms_bytes = ms.getvalue() hash128 = _murmurhash3_128(bytearray(ms_bytes), _UInt128(0, 0))