Skip to content

Users/bambriz/feedrangehotfix0 #41270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

#### Features Added

#### Breaking Changes

#### Bugs Fixed
* Fixed issue where Query Change Feed did not return items if the container uses legacy Hash V1 Partition Keys. This also fixes issues with not being able to change feed query for Specific Partition Key Values for HPK. See [PR 41270](https://github.com/Azure/azure-sdk-for-python/pull/41270/)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3118,12 +3118,15 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:

# check if query has prefix partition key
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
if isPrefixPartitionQuery:
if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs:
last_response_headers = CaseInsensitiveDict()
# here get the over lapping ranges
partition_key_definition = kwargs.pop("partitionKeyDefinition", None)
pk_properties = partition_key_definition
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
# Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs
pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {})
partition_key_definition = PartitionKey(
path=pk_properties["paths"],
kind=pk_properties["kind"],
version=pk_properties["version"])
partition_key_value = pk_properties["partition_key"]
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(
partition_key_value
Expand Down
117 changes: 117 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,97 @@
from typing import NoReturn, Tuple, Union


class _UInt32:
def __init__(self, value: int) -> None:
self._value: int = value & 0xFFFFFFFF

@property
def value(self) -> int:
return self._value

@value.setter
def value(self, new_value: int) -> None:
self._value = new_value & 0xFFFFFFFF

def __add__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value + (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __sub__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value - (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __mul__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value * (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __xor__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value ^ (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __lshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value << (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __ilshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
self._value = (self.value << (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF
return self

def __rshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value >> (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __irshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
self._value = (self.value >> (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF
return self

def __and__(self, other: Union[int, '_UInt32']) -> '_UInt32':
result = self.value & (other.value if isinstance(other, _UInt32) else other)
return _UInt32(result & 0xFFFFFFFF)

def __or__(self, other: Union[int, '_UInt32']) -> '_UInt32':
if isinstance(other, _UInt32):
return _UInt32(self.value | other.value)
if isinstance(other, int):
return _UInt32(self.value | other)
raise TypeError("Unsupported type for OR operation")

def __invert__(self) -> '_UInt32':
return _UInt32(~self.value & 0xFFFFFFFF)

def __eq__(self, other: Union[int, '_UInt32', object]) -> bool:
return self.value == (other.value if isinstance(other, _UInt32) else other)

def __ne__(self, other: Union[int, '_UInt32', object]) -> bool:
return not self.__eq__(other)

def __lt__(self, other: Union[int, '_UInt32']) -> bool:
return self.value < (other.value if isinstance(other, _UInt32) else other)

def __gt__(self, other: Union[int, '_UInt32']) -> bool:
return self.value > (other.value if isinstance(other, _UInt32) else other)

def __le__(self, other: Union[int, '_UInt32']) -> bool:
return self.value <= (other.value if isinstance(other, _UInt32) else other)

def __ge__(self, other: Union[int, '_UInt32']) -> bool:
return self.value >= (other.value if isinstance(other, _UInt32) else other)

@staticmethod
def encode_double_as_uint32(value: float) -> int:
value_in_uint32 = struct.unpack('<I', struct.pack('<f', value))[0]
mask = 0x80000000
return (value_in_uint32 ^ mask) if value_in_uint32 < mask else (~value_in_uint32) + 1

@staticmethod
def decode_double_from_uint32(value: int) -> int:
mask = 0x80000000
value = ~(value - 1) if value < mask else value ^ mask
return struct.unpack('<f', struct.pack('<I', value))[0]

def __int__(self) -> int:
return self.value

class _UInt64:
def __init__(self, value: int) -> None:
self._value: int = value & 0xFFFFFFFFFFFFFFFF
Expand Down Expand Up @@ -72,6 +163,32 @@ def __or__(self, other: Union[int, '_UInt64']) -> '_UInt64':
def __invert__(self) -> '_UInt64':
return _UInt64(~self.value & 0xFFFFFFFFFFFFFFFF)

def __eq__(self, other: Union[int, '_UInt64', object]) -> bool:
return self.value == (other.value if isinstance(other, _UInt64) else other)

def __ne__(self, other: Union[int, '_UInt64', object]) -> bool:
return not self.__eq__(other)

def __irshift__(self, other: Union[int, '_UInt64']) -> '_UInt64':
self._value = (self.value >> (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF
return self

def __ilshift__(self, other: Union[int, '_UInt64']) -> '_UInt64':
self._value = (self.value << (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF
return self

def __lt__(self, other: Union[int, '_UInt64']) -> bool:
return self.value < (other.value if isinstance(other, _UInt64) else other)

def __gt__(self, other: Union[int, '_UInt64']) -> bool:
return self.value > (other.value if isinstance(other, _UInt64) else other)

def __le__(self, other: Union[int, '_UInt64']) -> bool:
return self.value <= (other.value if isinstance(other, _UInt64) else other)

def __ge__(self, other: Union[int, '_UInt64']) -> bool:
return self.value >= (other.value if isinstance(other, _UInt64) else other)

@staticmethod
def encode_double_as_uint64(value: float) -> int:
value_in_uint64 = struct.unpack('<Q', struct.pack('<d', value))[0]
Expand Down
46 changes: 45 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_murmurhash3.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
# This is public domain code with no copyrights. From home page of
# <a href="https://github.com/aappleby/smhasher">SMHasher</a>:
# "All MurmurHash versions are public domain software, and the author disclaims all copyright to their code."
from ._cosmos_integers import _UInt128, _UInt64
from ._cosmos_integers import _UInt128, _UInt64, _UInt32


def rotate_left_64(val: int, shift: int) -> int:
Expand Down Expand Up @@ -147,3 +147,47 @@ def murmurhash3_128(span: bytearray, seed: _UInt128) -> _UInt128: # pylint: dis
h2 += h1

return _UInt128(int(h1.value), int(h2.value))


def murmurhash3_32(data: bytearray, seed: int) -> _UInt32:
c1: _UInt32 = _UInt32(0xcc9e2d51)
c2: _UInt32 = _UInt32(0x1b873593)
length: _UInt32 = _UInt32(len(data))
h1: _UInt32 = _UInt32(seed)
rounded_end: _UInt32 = _UInt32(length.value & 0xfffffffc) # round down to 4 byte block

for i in range(0, rounded_end.value, 4):
# little endian load order
k1: _UInt32 = _UInt32(
(data[i] & 0xff) | ((data[i + 1] & 0xff) << 8) | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24)
)
k1 *= c1
k1.value = (k1.value << 15) | (k1.value >> 17) # ROTL32(k1,15)
k1 *= c2

h1 ^= k1
h1.value = (h1.value << 13) | (h1.value >> 19) # ROTL32(h1,13)
h1 = h1 * _UInt32(5) + _UInt32(0xe6546b64)

# tail
k1 = _UInt32(0)
if length.value & 0x03 == 3:
k1 ^= _UInt32((data[rounded_end.value + 2] & 0xff) << 16)
if length.value & 0x03 >= 2:
k1 ^= _UInt32((data[rounded_end.value + 1] & 0xff) << 8)
if length.value & 0x03 >= 1:
k1 ^= _UInt32(data[rounded_end.value] & 0xff)
k1 *= c1
k1.value = (k1.value << 15) | (k1.value >> 17)
k1 *= c2
h1 ^= k1

# finalization
h1 ^= length
h1.value ^= h1.value >> 16
h1 *= _UInt32(0x85ebca6b)
h1.value ^= h1.value >> 13
h1 *= _UInt32(0xc2b2ae35)
h1.value ^= h1.value >> 16

return h1
5 changes: 4 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ async def _get_epk_range_for_partition_key(self, partition_key_value: PartitionK

container_properties = await self._get_properties()
partition_key_definition = container_properties["partitionKey"]
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])
partition_key = PartitionKey(
path=partition_key_definition["paths"],
kind=partition_key_definition["kind"],
version=partition_key_definition["version"])

return partition_key._get_epk_range_for_partition_key(partition_key_value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2914,7 +2914,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
if cont_prop:
cont_prop = await cont_prop()
pk_properties = cont_prop["partitionKey"]
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
partition_key_definition = PartitionKey(
path=pk_properties["paths"],
kind=pk_properties["kind"],
version=pk_properties["version"])
if partition_key_definition.kind == "MultiHash" and \
(isinstance(partition_key, List) and \
len(partition_key_definition['paths']) != len(partition_key)):
Expand Down
10 changes: 8 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ def _set_partition_key(
def _get_epk_range_for_partition_key( self, partition_key_value: PartitionKeyType) -> Range:
container_properties = self._get_properties()
partition_key_definition = container_properties["partitionKey"]
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])
partition_key = PartitionKey(
path=partition_key_definition["paths"],
kind=partition_key_definition["kind"],
version=partition_key_definition["version"])

return partition_key._get_epk_range_for_partition_key(partition_key_value)

Expand Down Expand Up @@ -715,7 +718,10 @@ def __is_prefix_partitionkey(
self, partition_key: PartitionKeyType) -> bool:
properties = self._get_properties()
pk_properties = properties["partitionKey"]
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
partition_key_definition = PartitionKey(
path=pk_properties["paths"],
kind=pk_properties["kind"],
version=pk_properties["version"])
return partition_key_definition._is_prefix_partition_key(partition_key)


Expand Down
Loading
Loading