add is null filter, add complex fiters (and_ & or_) and basic tests for them

This commit is contained in:
collerek
2021-03-06 13:07:22 +01:00
parent 7c0f8e976a
commit eeabb60200
14 changed files with 509 additions and 21 deletions

View File

@ -70,6 +70,8 @@ You can use special filter suffix to change the filter operands:
* contains - like `album__name__contains='Mal'` (sql like) * contains - like `album__name__contains='Mal'` (sql like)
* icontains - like `album__name__icontains='mal'` (sql like case insensitive) * icontains - like `album__name__icontains='mal'` (sql like case insensitive)
* in - like `album__name__in=['Malibu', 'Barclay']` (sql in) * in - like `album__name__in=['Malibu', 'Barclay']` (sql in)
* isnull - like `album__name__isnull=True` (sql is null)
(isnotnull `album__name__isnull=False` (sql is not null))
* gt - like `position__gt=3` (sql >) * gt - like `position__gt=3` (sql >)
* gte - like `position__gte=3` (sql >=) * gte - like `position__gte=3` (sql >=)
* lt - like `position__lt=3` (sql <) * lt - like `position__lt=3` (sql <)

View File

@ -1,3 +1,13 @@
# 0.9.7
## Features
* Add `isnull` operator to filter and exclude methods.
```python
album__name__isnull=True #(sql: album.name is null)
album__name__isnull=False #(sql: album.name is not null))
```
# 0.9.6 # 0.9.6
##Important ##Important

View File

@ -56,7 +56,7 @@ from ormar.fields import (
) # noqa: I100 ) # noqa: I100
from ormar.models import ExcludableItems, Model from ormar.models import ExcludableItems, Model
from ormar.models.metaclass import ModelMeta from ormar.models.metaclass import ModelMeta
from ormar.queryset import OrderAction, QuerySet from ormar.queryset import OrderAction, QuerySet, and_, or_
from ormar.relations import RelationType from ormar.relations import RelationType
from ormar.signals import Signal from ormar.signals import Signal
@ -68,7 +68,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType() Undefined = UndefinedType()
__version__ = "0.9.6" __version__ = "0.9.7"
__all__ = [ __all__ = [
"Integer", "Integer",
"BigInteger", "BigInteger",
@ -108,4 +108,6 @@ __all__ = [
"ForeignKeyField", "ForeignKeyField",
"OrderAction", "OrderAction",
"ExcludableItems", "ExcludableItems",
"and_",
"or_",
] ]

View File

@ -1,4 +1,3 @@
import inspect
from typing import Any, List, Optional, TYPE_CHECKING, Type, Union from typing import Any, List, Optional, TYPE_CHECKING, Type, Union
import sqlalchemy import sqlalchemy

View File

@ -75,7 +75,6 @@ def create_dummy_model(
fields = {f"{pk_field.name}": (pk_field.__type__, None)} fields = {f"{pk_field.name}": (pk_field.__type__, None)}
dummy_model = create_model( # type: ignore dummy_model = create_model( # type: ignore
f"PkOnly{base_model.get_name(lower=False)}{alias}", f"PkOnly{base_model.get_name(lower=False)}{alias}",
__module__=base_model.__module__, __module__=base_model.__module__,
**fields, # type: ignore **fields, # type: ignore

View File

@ -2,6 +2,7 @@
Contains QuerySet and different Query classes to allow for constructing of sql queries. Contains QuerySet and different Query classes to allow for constructing of sql queries.
""" """
from ormar.queryset.actions import FilterAction, OrderAction from ormar.queryset.actions import FilterAction, OrderAction
from ormar.queryset.clause import and_, or_
from ormar.queryset.filter_query import FilterQuery from ormar.queryset.filter_query import FilterQuery
from ormar.queryset.limit_query import LimitQuery from ormar.queryset.limit_query import LimitQuery
from ormar.queryset.offset_query import OffsetQuery from ormar.queryset.offset_query import OffsetQuery
@ -16,4 +17,6 @@ __all__ = [
"OrderQuery", "OrderQuery",
"FilterAction", "FilterAction",
"OrderAction", "OrderAction",
"and_",
"or_",
] ]

View File

@ -19,6 +19,7 @@ FILTER_OPERATORS = {
"istartswith": "ilike", "istartswith": "ilike",
"endswith": "like", "endswith": "like",
"iendswith": "ilike", "iendswith": "ilike",
"isnull": "is_",
"in": "in_", "in": "in_",
"gt": "__gt__", "gt": "__gt__",
"gte": "__ge__", "gte": "__ge__",
@ -38,7 +39,7 @@ class FilterAction(QueryAction):
Extracted in order to easily change table prefixes on complex relations. Extracted in order to easily change table prefixes on complex relations.
""" """
def __init__(self, filter_str: str, value: Any, model_cls: Type["Model"]) -> None: def __init__(self, filter_str: str, value: Any, model_cls: Type["Model"],) -> None:
super().__init__(query_str=filter_str, model_cls=model_cls) super().__init__(query_str=filter_str, model_cls=model_cls)
self.filter_value = value self.filter_value = value
self._escape_characters_in_clause() self._escape_characters_in_clause()
@ -124,6 +125,9 @@ class FilterAction(QueryAction):
self.filter_value = self.filter_value.pk self.filter_value = self.filter_value.pk
op_attr = FILTER_OPERATORS[self.operator] op_attr = FILTER_OPERATORS[self.operator]
if self.operator == "isnull":
op_attr = "is_" if self.filter_value else "isnot"
self.filter_value = None
clause = getattr(self.column, op_attr)(self.filter_value) clause = getattr(self.column, op_attr)(self.filter_value)
clause = self._compile_clause( clause = self._compile_clause(
clause, modifiers={"escape": "\\" if self.has_escaped_character else None}, clause, modifiers={"escape": "\\" if self.has_escaped_character else None},

View File

@ -1,6 +1,9 @@
import itertools import itertools
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, List, TYPE_CHECKING, Tuple, Type from enum import Enum
from typing import Any, Generator, List, TYPE_CHECKING, Tuple, Type
import sqlalchemy
import ormar # noqa I100 import ormar # noqa I100
from ormar.queryset.actions.filter_action import FilterAction from ormar.queryset.actions.filter_action import FilterAction
@ -10,6 +13,99 @@ if TYPE_CHECKING: # pragma no cover
from ormar import Model from ormar import Model
class FilterType(Enum):
AND = 1
OR = 2
class FilterGroup:
def __init__(
self, *args: Any, _filter_type: FilterType = FilterType.AND, **kwargs: Any,
) -> None:
self.filter_type = _filter_type
self.exclude = False
self._nested_groups: List["FilterGroup"] = list(args)
self._resolved = False
self.is_source_model_filter = False
self._kwargs_dict = kwargs
self.actions: List[FilterAction] = []
def resolve(
self,
model_cls: Type["Model"],
select_related: List = None,
filter_clauses: List = None,
) -> Tuple[List[FilterAction], List[str]]:
select_related = select_related if select_related is not None else []
filter_clauses = filter_clauses if filter_clauses is not None else []
qryclause = QueryClause(
model_cls=model_cls,
select_related=select_related,
filter_clauses=filter_clauses,
)
own_filter_clauses, select_related = qryclause.prepare_filter(
_own_only=True, **self._kwargs_dict
)
self.actions = own_filter_clauses
filter_clauses = filter_clauses + own_filter_clauses
self._resolved = True
if self._nested_groups:
for group in self._nested_groups:
if not group._resolved:
(filter_clauses, select_related) = group.resolve(
model_cls=model_cls,
select_related=select_related,
filter_clauses=filter_clauses,
)
self._is_self_model_group()
return filter_clauses, select_related
def _iter(self) -> Generator:
if not self._nested_groups:
yield from self.actions
return
for group in self._nested_groups:
yield from group._iter()
yield from self.actions
def _is_self_model_group(self) -> None:
if self.actions and self._nested_groups:
if all([action.is_source_model_filter for action in self.actions]) and all(
group.is_source_model_filter for group in self._nested_groups
):
self.is_source_model_filter = True
elif self.actions:
if all([action.is_source_model_filter for action in self.actions]):
self.is_source_model_filter = True
else:
if all(group.is_source_model_filter for group in self._nested_groups):
self.is_source_model_filter = True
def _get_text_clauses(self) -> List[sqlalchemy.sql.expression.TextClause]:
return [x.get_text_clause() for x in self._nested_groups] + [
x.get_text_clause() for x in self.actions
]
def get_text_clause(self) -> sqlalchemy.sql.expression.TextClause:
if self.filter_type == FilterType.AND:
clause = sqlalchemy.text(
"( " + str(sqlalchemy.sql.and_(*self._get_text_clauses())) + " )"
)
else:
clause = sqlalchemy.text(
"( " + str(sqlalchemy.sql.or_(*self._get_text_clauses())) + " )"
)
return clause
def or_(*args: Any, **kwargs: Any) -> FilterGroup:
return FilterGroup(_filter_type=FilterType.OR, *args, **kwargs)
def and_(*args: Any, **kwargs: Any) -> FilterGroup:
return FilterGroup(_filter_type=FilterType.AND, *args, **kwargs)
@dataclass @dataclass
class Prefix: class Prefix:
source_model: Type["Model"] source_model: Type["Model"]
@ -40,13 +136,15 @@ class QueryClause:
self.table = self.model_cls.Meta.table self.table = self.model_cls.Meta.table
def prepare_filter( # noqa: A003 def prepare_filter( # noqa: A003
self, **kwargs: Any self, _own_only: bool = False, **kwargs: Any
) -> Tuple[List[FilterAction], List[str]]: ) -> Tuple[List[FilterAction], List[str]]:
""" """
Main external access point that processes the clauses into sqlalchemy text Main external access point that processes the clauses into sqlalchemy text
clauses and updates select_related list with implicit related tables clauses and updates select_related list with implicit related tables
mentioned in select_related strings but not included in select_related. mentioned in select_related strings but not included in select_related.
:param _own_only:
:type _own_only:
:param kwargs: key, value pair with column names and values :param kwargs: key, value pair with column names and values
:type kwargs: Any :type kwargs: Any
:return: Tuple with list of where clauses and updated select_related list :return: Tuple with list of where clauses and updated select_related list
@ -56,12 +154,14 @@ class QueryClause:
pk_name = self.model_cls.get_column_alias(self.model_cls.Meta.pkname) pk_name = self.model_cls.get_column_alias(self.model_cls.Meta.pkname)
kwargs[pk_name] = kwargs.pop("pk") kwargs[pk_name] = kwargs.pop("pk")
filter_clauses, select_related = self._populate_filter_clauses(**kwargs) filter_clauses, select_related = self._populate_filter_clauses(
_own_only=_own_only, **kwargs
)
return filter_clauses, select_related return filter_clauses, select_related
def _populate_filter_clauses( def _populate_filter_clauses(
self, **kwargs: Any self, _own_only: bool, **kwargs: Any
) -> Tuple[List[FilterAction], List[str]]: ) -> Tuple[List[FilterAction], List[str]]:
""" """
Iterates all clauses and extracts used operator and field from related Iterates all clauses and extracts used operator and field from related
@ -74,6 +174,7 @@ class QueryClause:
:rtype: Tuple[List[sqlalchemy.sql.elements.TextClause], List[str]] :rtype: Tuple[List[sqlalchemy.sql.elements.TextClause], List[str]]
""" """
filter_clauses = self.filter_clauses filter_clauses = self.filter_clauses
own_filter_clauses = []
select_related = list(self._select_related) select_related = list(self._select_related)
for key, value in kwargs.items(): for key, value in kwargs.items():
@ -84,12 +185,14 @@ class QueryClause:
select_related=select_related select_related=select_related
) )
filter_clauses.append(filter_action) own_filter_clauses.append(filter_action)
self._register_complex_duplicates(select_related) self._register_complex_duplicates(select_related)
filter_clauses = self._switch_filter_action_prefixes( filter_clauses = self._switch_filter_action_prefixes(
filter_clauses=filter_clauses filter_clauses=filter_clauses + own_filter_clauses
) )
if _own_only:
return own_filter_clauses, select_related
return filter_clauses, select_related return filter_clauses, select_related
def _register_complex_duplicates(self, select_related: List[str]) -> None: def _register_complex_duplicates(self, select_related: List[str]) -> None:
@ -150,11 +253,17 @@ class QueryClause:
:return: list of actions with aliases changed if needed :return: list of actions with aliases changed if needed
:rtype: List[FilterAction] :rtype: List[FilterAction]
""" """
manager = self.model_cls.Meta.alias_manager
for action in filter_clauses: for action in filter_clauses:
new_alias = manager.resolve_relation_alias( if isinstance(action, FilterGroup):
self.model_cls, action.related_str for action2 in action._iter():
) self._verify_prefix_and_switch(action2)
else:
self._verify_prefix_and_switch(action)
return filter_clauses
def _verify_prefix_and_switch(self, action: "FilterAction") -> None:
manager = self.model_cls.Meta.alias_manager
new_alias = manager.resolve_relation_alias(self.model_cls, action.related_str)
if "__" in action.related_str and new_alias: if "__" in action.related_str and new_alias:
action.table_prefix = new_alias action.table_prefix = new_alias
return filter_clauses

View File

@ -266,7 +266,7 @@ class PrefetchQuery:
model_cls=clause_target, select_related=[], filter_clauses=[], model_cls=clause_target, select_related=[], filter_clauses=[],
) )
kwargs = {f"{filter_column}__in": ids} kwargs = {f"{filter_column}__in": ids}
filter_clauses, _ = qryclause.prepare_filter(**kwargs) filter_clauses, _ = qryclause.prepare_filter(_own_only=False, **kwargs)
return filter_clauses return filter_clauses
return [] return []

View File

@ -256,7 +256,9 @@ class QuerySet:
# print("\n", exp.compile(compile_kwargs={"literal_binds": True})) # print("\n", exp.compile(compile_kwargs={"literal_binds": True}))
return exp return exp
def filter(self, _exclude: bool = False, **kwargs: Any) -> "QuerySet": # noqa: A003 def filter( # noqa: A003
self, *args: Any, _exclude: bool = False, **kwargs: Any
) -> "QuerySet":
""" """
Allows you to filter by any `Model` attribute/field Allows you to filter by any `Model` attribute/field
as well as to fetch instances, with a filter across an FK relationship. as well as to fetch instances, with a filter across an FK relationship.
@ -268,6 +270,8 @@ class QuerySet:
* contains - like `album__name__contains='Mal'` (sql like) * contains - like `album__name__contains='Mal'` (sql like)
* icontains - like `album__name__icontains='mal'` (sql like case insensitive) * icontains - like `album__name__icontains='mal'` (sql like case insensitive)
* in - like `album__name__in=['Malibu', 'Barclay']` (sql in) * in - like `album__name__in=['Malibu', 'Barclay']` (sql in)
* isnull - like `album__name__isnull=True` (sql is null)
(isnotnull `album__name__isnull=False` (sql is not null))
* gt - like `position__gt=3` (sql >) * gt - like `position__gt=3` (sql >)
* gte - like `position__gte=3` (sql >=) * gte - like `position__gte=3` (sql >=)
* lt - like `position__lt=3` (sql <) * lt - like `position__lt=3` (sql <)
@ -284,12 +288,23 @@ class QuerySet:
:return: filtered QuerySet :return: filtered QuerySet
:rtype: QuerySet :rtype: QuerySet
""" """
filter_groups = []
if args:
for arg in args:
arg.resolve(
model_cls=self.model,
select_related=self._select_related,
filter_clauses=self.filter_clauses,
)
filter_groups.append(arg)
qryclause = QueryClause( qryclause = QueryClause(
model_cls=self.model, model_cls=self.model,
select_related=self._select_related, select_related=self._select_related,
filter_clauses=self.filter_clauses, filter_clauses=self.filter_clauses,
) )
filter_clauses, select_related = qryclause.prepare_filter(**kwargs) filter_clauses, select_related = qryclause.prepare_filter(**kwargs)
filter_clauses = filter_clauses + filter_groups
if _exclude: if _exclude:
exclude_clauses = filter_clauses exclude_clauses = filter_clauses
filter_clauses = self.filter_clauses filter_clauses = self.filter_clauses
@ -303,7 +318,7 @@ class QuerySet:
select_related=select_related, select_related=select_related,
) )
def exclude(self, **kwargs: Any) -> "QuerySet": # noqa: A003 def exclude(self, *args: Any, **kwargs: Any) -> "QuerySet": # noqa: A003
""" """
Works exactly the same as filter and all modifiers (suffixes) are the same, Works exactly the same as filter and all modifiers (suffixes) are the same,
but returns a *not* condition. but returns a *not* condition.
@ -322,7 +337,7 @@ class QuerySet:
:return: filtered QuerySet :return: filtered QuerySet
:rtype: QuerySet :rtype: QuerySet
""" """
return self.filter(_exclude=True, **kwargs) return self.filter(_exclude=True, *args, **kwargs)
def select_related(self, related: Union[List, str]) -> "QuerySet": def select_related(self, related: Union[List, str]) -> "QuerySet":
""" """

View File

@ -386,6 +386,8 @@ class QuerysetProxy:
* contains - like `album__name__contains='Mal'` (sql like) * contains - like `album__name__contains='Mal'` (sql like)
* icontains - like `album__name__icontains='mal'` (sql like case insensitive) * icontains - like `album__name__icontains='mal'` (sql like case insensitive)
* in - like `album__name__in=['Malibu', 'Barclay']` (sql in) * in - like `album__name__in=['Malibu', 'Barclay']` (sql in)
* isnull - like `album__name__isnull=True` (sql is null)
(isnotnull `album__name__isnull=False` (sql is not null))
* gt - like `position__gt=3` (sql >) * gt - like `position__gt=3` (sql >)
* gte - like `position__gte=3` (sql >=) * gte - like `position__gte=3` (sql >=)
* lt - like `position__lt=3` (sql <) * lt - like `position__lt=3` (sql <)

149
tests/test_filter_groups.py Normal file
View File

@ -0,0 +1,149 @@
from typing import Optional
import databases
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class Author(ormar.Model):
class Meta(BaseMeta):
tablename = "authors"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Book(ormar.Model):
class Meta(BaseMeta):
tablename = "books"
id: int = ormar.Integer(primary_key=True)
author: Optional[Author] = ormar.ForeignKey(Author)
title: str = ormar.String(max_length=100)
year: int = ormar.Integer(nullable=True)
def test_or_group():
result = ormar.or_(name="aa", books__title="bb")
result.resolve(model_cls=Author)
assert len(result.actions) == 2
assert result.actions[0].target_model == Author
assert result.actions[1].target_model == Book
assert (
str(result.get_text_clause()) == f"( authors.name = 'aa' OR "
f"{result.actions[1].table_prefix}"
f"_books.title = 'bb' )"
)
assert not result.is_source_model_filter
def test_and_group():
result = ormar.and_(name="aa", books__title="bb")
result.resolve(model_cls=Author)
assert len(result.actions) == 2
assert result.actions[0].target_model == Author
assert result.actions[1].target_model == Book
assert (
str(result.get_text_clause()) == f"( authors.name = 'aa' AND "
f"{result.actions[1].table_prefix}"
f"_books.title = 'bb' )"
)
assert not result.is_source_model_filter
def test_nested_and():
result = ormar.and_(
ormar.or_(name="aa", books__title="bb"), ormar.or_(name="cc", books__title="dd")
)
result.resolve(model_cls=Author)
assert len(result.actions) == 0
assert len(result._nested_groups) == 2
book_prefix = result._nested_groups[0].actions[1].table_prefix
assert (
str(result.get_text_clause()) == f"( ( authors.name = 'aa' OR "
f"{book_prefix}"
f"_books.title = 'bb' ) AND "
f"( authors.name = 'cc' OR "
f"{book_prefix}"
f"_books.title = 'dd' ) )"
)
assert not result.is_source_model_filter
def test_nested_group_and_action():
result = ormar.and_(ormar.or_(name="aa", books__title="bb"), books__title="dd")
result.resolve(model_cls=Author)
assert len(result.actions) == 1
assert len(result._nested_groups) == 1
book_prefix = result._nested_groups[0].actions[1].table_prefix
assert (
str(result.get_text_clause()) == f"( ( authors.name = 'aa' OR "
f"{book_prefix}"
f"_books.title = 'bb' ) AND "
f"{book_prefix}"
f"_books.title = 'dd' )"
)
assert not result.is_source_model_filter
def test_deeply_nested_or():
result = ormar.or_(
ormar.and_(
ormar.or_(name="aa", books__title="bb"),
ormar.or_(name="cc", books__title="dd"),
),
ormar.and_(
ormar.or_(books__year__lt=1900, books__title="11"),
ormar.or_(books__year__gt="xx", books__title="22"),
),
)
result.resolve(model_cls=Author)
assert len(result.actions) == 0
assert len(result._nested_groups) == 2
assert len(result._nested_groups[0]._nested_groups) == 2
book_prefix = result._nested_groups[0]._nested_groups[0].actions[1].table_prefix
result_qry = str(result.get_text_clause())
expected_qry = (
f"( ( ( authors.name = 'aa' OR {book_prefix}_books.title = 'bb' ) AND "
f"( authors.name = 'cc' OR {book_prefix}_books.title = 'dd' ) ) "
f"OR ( ( {book_prefix}_books.year < 1900 OR {book_prefix}_books.title = '11' ) AND "
f"( {book_prefix}_books.year > 'xx' OR {book_prefix}_books.title = '22' ) ) )"
)
assert result_qry.replace("\n", "") == expected_qry.replace("\n", "")
assert not result.is_source_model_filter
def test_one_model_group():
result = ormar.and_(year__gt=1900, title="bb")
result.resolve(model_cls=Book)
assert len(result.actions) == 2
assert len(result._nested_groups) == 0
assert result.is_source_model_filter
def test_one_model_nested_group():
result = ormar.and_(
ormar.or_(year__gt=1900, title="bb"), ormar.or_(year__lt=1800, title="aa")
)
result.resolve(model_cls=Book)
assert len(result.actions) == 0
assert len(result._nested_groups) == 2
assert result.is_source_model_filter
def test_one_model_with_group():
result = ormar.or_(ormar.and_(year__gt=1900, title="bb"), title="uu")
result.resolve(model_cls=Book)
assert len(result.actions) == 1
assert len(result._nested_groups) == 1
assert result.is_source_model_filter

View File

@ -0,0 +1,76 @@
from typing import Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class Author(ormar.Model):
class Meta(BaseMeta):
tablename = "authors"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Book(ormar.Model):
class Meta(BaseMeta):
tablename = "books"
id: int = ormar.Integer(primary_key=True)
author: Optional[Author] = ormar.ForeignKey(Author)
title: str = ormar.String(max_length=100)
year: int = ormar.Integer(nullable=True)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_is_null():
async with database:
tolkien = await Author.objects.create(name="J.R.R. Tolkien")
await Book.objects.create(author=tolkien, title="The Hobbit")
await Book.objects.create(
author=tolkien, title="The Lord of the Rings", year=1955
)
await Book.objects.create(author=tolkien, title="The Silmarillion", year=1977)
books = await Book.objects.all(year__isnull=True)
assert len(books) == 1
assert books[0].year is None
assert books[0].title == "The Hobbit"
books = await Book.objects.all(year__isnull=False)
assert len(books) == 2
tolkien = await Author.objects.select_related("books").get(
books__year__isnull=True
)
assert len(tolkien.books) == 1
assert tolkien.books[0].year is None
assert tolkien.books[0].title == "The Hobbit"
tolkien = await Author.objects.select_related("books").get(
books__year__isnull=False
)
assert len(tolkien.books) == 2
assert tolkien.books[0].year == 1955
assert tolkien.books[0].title == "The Lord of the Rings"

118
tests/test_or_filters.py Normal file
View File

@ -0,0 +1,118 @@
from typing import Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class Author(ormar.Model):
class Meta(BaseMeta):
tablename = "authors"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Book(ormar.Model):
class Meta(BaseMeta):
tablename = "books"
id: int = ormar.Integer(primary_key=True)
author: Optional[Author] = ormar.ForeignKey(Author)
title: str = ormar.String(max_length=100)
year: int = ormar.Integer(nullable=True)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_or_filters():
async with database:
tolkien = await Author(name="J.R.R. Tolkien").save()
await Book(author=tolkien, title="The Hobbit", year=1933).save()
await Book(author=tolkien, title="The Lord of the Rings", year=1955).save()
await Book(author=tolkien, title="The Silmarillion", year=1977).save()
sapkowski = await Author(name="Andrzej Sapkowski").save()
await Book(author=sapkowski, title="The Witcher", year=1990).save()
await Book(author=sapkowski, title="The Tower of Fools", year=2002).save()
books = (
await Book.objects.select_related("author")
.filter(ormar.or_(author__name="J.R.R. Tolkien", year__gt=1970))
.all()
)
assert len(books) == 5
books = (
await Book.objects.select_related("author")
.filter(ormar.or_(author__name="J.R.R. Tolkien", year__lt=1995))
.all()
)
assert len(books) == 4
assert not any([x.title == "The Tower of Fools" for x in books])
books = (
await Book.objects.select_related("author")
.filter(ormar.or_(year__gt=1960, year__lt=1940))
.filter(author__name="J.R.R. Tolkien")
.all()
)
assert len(books) == 2
assert books[0].title == "The Hobbit"
assert books[1].title == "The Silmarillion"
books = (
await Book.objects.select_related("author")
.filter(
ormar.or_(
ormar.and_(year__gt=1960, author__name="J.R.R. Tolkien"),
ormar.and_(year__lt=2000, author__name="Andrzej Sapkowski"),
)
)
.filter(title__startswith="The")
.all()
)
assert len(books) == 2
assert books[0].title == "The Silmarillion"
assert books[1].title == "The Witcher"
books = (
await Book.objects.select_related("author")
.exclude(
ormar.or_(
ormar.and_(year__gt=1960, author__name="J.R.R. Tolkien"),
ormar.and_(year__lt=2000, author__name="Andrzej Sapkowski"),
)
)
.filter(title__startswith="The")
.all()
)
assert len(books) == 3
assert not any([x.title in ["The Silmarillion", "The Witcher"] for x in books])
# TODO: Check / modify
# process and and or into filter groups (V)
# check exclude queries working (V)
# when limit and no sql do not allow main model and other models
# check complex prefixes properly resolved
# fix types for FilterAction and FilterGroup (?)