Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten AND and OR conditions #173

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/aiodynamo/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from .errors import CannotAddToNestedField
from .types import AttributeType, Numeric, ParametersDict
from .utils import deparametetrize, low_level_serialize
from .utils import MinLen2AppendOnlyList, deparametetrize, low_level_serialize

_ParametersCache = Dict[Tuple[Any, Any], str]

Expand Down Expand Up @@ -377,10 +377,20 @@ def encode(self, params: Parameters) -> str:

class Condition(metaclass=abc.ABCMeta):
def __and__(self, other: Condition) -> Condition:
return AndCondition(self, other)
if isinstance(self, AndCondition):
if isinstance(other, AndCondition):
return AndCondition(self.children.extending(other.children))
else:
return AndCondition(self.children.appending(other))
return AndCondition(MinLen2AppendOnlyList.create(self, other))

def __or__(self, other: Condition) -> Condition:
return OrCondition(self, other)
if isinstance(self, OrCondition):
if isinstance(other, OrCondition):
return OrCondition(self.children.extending(other.children))
else:
return OrCondition(self.children.appending(other))
return OrCondition(MinLen2AppendOnlyList.create(self, other))

def __invert__(self) -> Condition:
return NotCondition(self)
Expand Down Expand Up @@ -408,20 +418,18 @@ def encode(self, params: Parameters) -> str:

@dataclass(frozen=True)
class AndCondition(Condition):
lhs: Condition
rhs: Condition
children: MinLen2AppendOnlyList[Condition]

def encode(self, params: Parameters) -> str:
return f"({self.lhs.encode(params)} AND {self.rhs.encode(params)})"
return "(" + " AND ".join(child.encode(params) for child in self.children) + ")"


@dataclass(frozen=True)
class OrCondition(Condition):
lhs: Condition
rhs: Condition
children: MinLen2AppendOnlyList[Condition]

def encode(self, params: Parameters) -> str:
return f"({self.lhs.encode(params)} OR {self.rhs.encode(params)})"
return "(" + " OR ".join(child.encode(params) for child in self.children) + ")"


@dataclass(frozen=True)
Expand Down
44 changes: 44 additions & 0 deletions src/aiodynamo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import decimal
import logging
from collections import abc as collections_abc
from dataclasses import dataclass
from functools import reduce
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Generator,
Generic,
Iterable,
List,
Mapping,
Set,
Expand Down Expand Up @@ -197,3 +201,43 @@ def deparametetrize(
for key, value in params.names.items():
expression = expression.replace(key, value)
return expression


@dataclass(frozen=True)
class MinLen2AppendOnlyList(Generic[T]):
ojii marked this conversation as resolved.
Show resolved Hide resolved
first: T
second: T
ojii marked this conversation as resolved.
Show resolved Hide resolved
rest: tuple[T, ...]

@classmethod
def create(cls, first: T, second: T, *rest: T) -> MinLen2AppendOnlyList[T]:
return cls(first, second, rest)

def appending(self, value: T) -> MinLen2AppendOnlyList[T]:
return MinLen2AppendOnlyList(self.first, self.second, (*self.rest, value))

def extending(self, values: Iterable[T]) -> MinLen2AppendOnlyList[T]:
return MinLen2AppendOnlyList(self.first, self.second, (*self.rest, *values))

def __contains__(self, item: Any) -> bool:
return item == self.first or item == self.second or item in self.rest
ojii marked this conversation as resolved.
Show resolved Hide resolved

def __getitem__(self, index: int) -> T:
if index == 0:
return self.first
elif index == 1:
return self.second
return self.rest[index - 2]

def __len__(self) -> int:
return len(self.rest) + 2

def __iter__(self) -> Generator[T, None, None]:
yield self.first
yield self.second
yield from self.rest

def __reversed__(self) -> Generator[T, None, None]:
yield from reversed(self.rest)
yield self.second
yield self.first
95 changes: 95 additions & 0 deletions tests/unit/test_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import pytest

from aiodynamo.expressions import (
AndCondition,
Comparison,
Condition,
F,
HashKey,
OrCondition,
Parameters,
ProjectionExpression,
UpdateExpression,
)
from aiodynamo.utils import MinLen2AppendOnlyList


@pytest.mark.parametrize(
Expand Down Expand Up @@ -115,6 +119,10 @@ def test_f_repr(f: F, r: str) -> None:
[
(F("a").equals(True) & F("b").gt(1), "(a = True AND b > 1)"),
(F("a", 1).begins_with("foo"), "begins_with(a[1], 'foo')"),
(
F("a").equals("a") & F("b").equals("b") & F("c").equals("c"),
"(a = 'a' AND b = 'b' AND c = 'c')",
),
],
)
def test_condition_debug(expr: Condition, expected: str) -> None:
Expand All @@ -133,3 +141,90 @@ def test_condition_debug(expr: Condition, expected: str) -> None:
)
def test_update_expression_debug(expr: UpdateExpression, expected: str) -> None:
assert expr.debug(int) == expected


@pytest.mark.parametrize(
"expr,expected",
[
(
F("a").equals("a") & F("b").equals("b"),
AndCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"), Comparison(F("b"), "=", "b")
)
),
),
(
(F("a").equals("a") & F("b").equals("b")) & F("c").equals("c"),
AndCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"),
Comparison(F("b"), "=", "b"),
Comparison(F("c"), "=", "c"),
)
),
),
(
(F("a").equals("a") & F("b").equals("b"))
& (F("c").equals("c") & F("d").equals("d")),
AndCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"),
Comparison(F("b"), "=", "b"),
Comparison(F("c"), "=", "c"),
Comparison(F("d"), "=", "d"),
)
),
),
(
F("a").equals("a") | F("b").equals("b"),
OrCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"), Comparison(F("b"), "=", "b")
)
),
),
(
(F("a").equals("a") | F("b").equals("b")) | F("c").equals("c"),
OrCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"),
Comparison(F("b"), "=", "b"),
Comparison(F("c"), "=", "c"),
)
),
),
(
(F("a").equals("a") | F("b").equals("b"))
| (F("c").equals("c") | F("d").equals("d")),
OrCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"),
Comparison(F("b"), "=", "b"),
Comparison(F("c"), "=", "c"),
Comparison(F("d"), "=", "d"),
)
),
),
(
(F("a").equals("a") | F("b").equals("b"))
& (F("c").equals("c") | F("d").equals("d")),
AndCondition(
MinLen2AppendOnlyList.create(
OrCondition(
MinLen2AppendOnlyList.create(
Comparison(F("a"), "=", "a"), Comparison(F("b"), "=", "b")
)
),
OrCondition(
MinLen2AppendOnlyList.create(
Comparison(F("c"), "=", "c"), Comparison(F("d"), "=", "d")
)
),
)
),
),
],
)
def test_condition_flattening(expr: Condition, expected: Condition) -> None:
assert expr == expected
Loading