add select_all
This commit is contained in:
@ -1,3 +1,19 @@
|
||||
# 0.10.0
|
||||
|
||||
## Breaking
|
||||
|
||||
* Dropped supported for long deprecated notation of field definition in which you use ormar fields as type hints i.e. `test_field: ormar.Integger() = None`
|
||||
* Improved type hints -> `mypy` can properly resolve related models fields (`ForeignKey` and `ManyToMany`) as well as return types of `QuerySet` methods.
|
||||
Those mentioned are now returning proper model (i.e. `Book`) instead or `ormar.Model` type. There is still problem with reverse sides of relation and `QuerysetProxy` methods,
|
||||
to ease type hints now those return `Any`.
|
||||
|
||||
## Features
|
||||
|
||||
* add `select_all(follow: bool = False)` method to `QuerySet` and `QuerysetProxy`.
|
||||
It is an equivalent of the Model's `load_all()` method but can be used directly in a query.
|
||||
By default `select_all()` adds only directly related models, with `follow=True` also related models
|
||||
of related models are added without loops in relations.
|
||||
|
||||
# 0.9.9
|
||||
|
||||
## Features
|
||||
|
||||
@ -75,7 +75,7 @@ class UndefinedType: # pragma no cover
|
||||
|
||||
Undefined = UndefinedType()
|
||||
|
||||
__version__ = "0.9.9"
|
||||
__version__ = "0.10.0"
|
||||
__all__ = [
|
||||
"Integer",
|
||||
"BigInteger",
|
||||
|
||||
@ -14,7 +14,7 @@ from typing import (
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
Union,
|
||||
cast, no_type_check,
|
||||
cast,
|
||||
)
|
||||
|
||||
try:
|
||||
@ -47,7 +47,6 @@ from ormar.relations.relation_manager import RelationsManager
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar.models import Model
|
||||
from ormar.signals import SignalEmitter
|
||||
from ormar.queryset import QuerySet
|
||||
|
||||
IntStr = Union[int, str]
|
||||
DictStrAny = Dict[str, Any]
|
||||
@ -232,7 +231,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
super().__setattr__(name, value)
|
||||
self.set_save_status(False)
|
||||
|
||||
def __getattribute__(self, item: str): # noqa: CCR001
|
||||
def __getattribute__(self, item: str) -> Any: # noqa: CCR001
|
||||
"""
|
||||
Because we need to overwrite getting the attribute by ormar instead of pydantic
|
||||
as well as returning related models and not the value stored on the model the
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
Generic, List,
|
||||
Generic,
|
||||
List,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar, Union,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
@ -17,7 +19,7 @@ from sqlalchemy import bindparam
|
||||
|
||||
import ormar # noqa I100
|
||||
from ormar import MultipleMatches, NoMatch
|
||||
from ormar.exceptions import ModelError, ModelPersistenceError, QueryDefinitionError
|
||||
from ormar.exceptions import ModelPersistenceError, QueryDefinitionError
|
||||
from ormar.queryset import FilterQuery, SelectAction
|
||||
from ormar.queryset.actions.order_action import OrderAction
|
||||
from ormar.queryset.clause import FilterGroup, QueryClause
|
||||
@ -28,7 +30,6 @@ if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
from ormar.models import T
|
||||
from ormar.models.metaclass import ModelMeta
|
||||
from ormar.relations.querysetproxy import QuerysetProxy
|
||||
from ormar.models.excludable import ExcludableItems
|
||||
else:
|
||||
T = TypeVar("T", bound="Model")
|
||||
@ -65,7 +66,6 @@ class QuerySet(Generic[T]):
|
||||
self.order_bys = order_bys or []
|
||||
self.limit_sql_raw = limit_raw_sql
|
||||
|
||||
|
||||
@property
|
||||
def model_meta(self) -> "ModelMeta":
|
||||
"""
|
||||
@ -369,6 +369,32 @@ class QuerySet(Generic[T]):
|
||||
related = sorted(list(set(list(self._select_related) + related)))
|
||||
return self.rebuild_self(select_related=related,)
|
||||
|
||||
def select_all(self, follow: bool = False) -> "QuerySet[T]":
|
||||
"""
|
||||
By default adds only directly related models.
|
||||
|
||||
If follow=True is set it adds also related models of related models.
|
||||
|
||||
To not get stuck in an infinite loop as related models also keep a relation
|
||||
to parent model visited models set is kept.
|
||||
|
||||
That way already visited models that are nested are loaded, but the load do not
|
||||
follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X
|
||||
will load second Model A but will never follow into Model X.
|
||||
Nested relations of those kind need to be loaded manually.
|
||||
|
||||
:param follow: flag to trigger deep save -
|
||||
by default only directly related models are saved
|
||||
with follow=True also related models of related models are saved
|
||||
:type follow: bool
|
||||
:return: reloaded Model
|
||||
:rtype: Model
|
||||
"""
|
||||
relations = list(self.model.extract_related_names())
|
||||
if follow:
|
||||
relations = self.model._iterate_related_models()
|
||||
return self.rebuild_self(select_related=relations,)
|
||||
|
||||
def prefetch_related(self, related: Union[List, str]) -> "QuerySet[T]":
|
||||
"""
|
||||
Allows to prefetch related models during query - but opposite to
|
||||
|
||||
@ -2,17 +2,19 @@ from _weakref import CallableProxyType
|
||||
from typing import ( # noqa: I100, I201
|
||||
Any,
|
||||
Dict,
|
||||
Generic, List,
|
||||
Generic,
|
||||
List,
|
||||
MutableSequence,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type, TypeVar, Union,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
|
||||
import ormar # noqa: I100, I202
|
||||
from ormar.exceptions import ModelPersistenceError, QueryDefinitionError
|
||||
|
||||
@ -35,10 +37,11 @@ class QuerysetProxy(Generic[T]):
|
||||
relation: "Relation"
|
||||
|
||||
def __init__(
|
||||
self, relation: "Relation",
|
||||
to: Type["T"],
|
||||
type_: "RelationType",
|
||||
qryset: "QuerySet[T]" = None
|
||||
self,
|
||||
relation: "Relation",
|
||||
to: Type["T"],
|
||||
type_: "RelationType",
|
||||
qryset: "QuerySet[T]" = None,
|
||||
) -> None:
|
||||
self.relation: Relation = relation
|
||||
self._queryset: Optional["QuerySet[T]"] = qryset
|
||||
@ -88,9 +91,7 @@ class QuerysetProxy(Generic[T]):
|
||||
rel_name = self.relation.field_name
|
||||
setattr(owner, rel_name, child)
|
||||
|
||||
def _register_related(
|
||||
self, child: Union["T", Sequence[Optional["T"]]]
|
||||
) -> None:
|
||||
def _register_related(self, child: Union["T", Sequence[Optional["T"]]]) -> None:
|
||||
"""
|
||||
Registers child/ children in parents RelationManager.
|
||||
|
||||
@ -418,7 +419,9 @@ class QuerysetProxy(Generic[T]):
|
||||
model = await self.queryset.get(pk=kwargs[pk_name])
|
||||
return await model.update(**kwargs)
|
||||
|
||||
def filter(self, *args: Any, **kwargs: Any) -> "QuerysetProxy[T]": # noqa: A003, A001
|
||||
def filter( # noqa: A003, A001
|
||||
self, *args: Any, **kwargs: Any
|
||||
) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
Allows you to filter by any `Model` attribute/field
|
||||
as well as to fetch instances, with a filter across an FK relationship.
|
||||
@ -449,9 +452,13 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.filter(*args, **kwargs)
|
||||
return self.__class__(relation=self.relation, type_=self.type_, to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def exclude(self, *args: Any, **kwargs: Any) -> "QuerysetProxy[T]": # noqa: A003, A001
|
||||
def exclude(
|
||||
self, *args: Any, **kwargs: Any
|
||||
) -> "QuerysetProxy[T]": # noqa: A003, A001
|
||||
"""
|
||||
Works exactly the same as filter and all modifiers (suffixes) are the same,
|
||||
but returns a *not* condition.
|
||||
@ -473,7 +480,35 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.exclude(*args, **kwargs)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def select_all(self, follow: bool = False) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
By default adds only directly related models.
|
||||
|
||||
If follow=True is set it adds also related models of related models.
|
||||
|
||||
To not get stuck in an infinite loop as related models also keep a relation
|
||||
to parent model visited models set is kept.
|
||||
|
||||
That way already visited models that are nested are loaded, but the load do not
|
||||
follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X
|
||||
will load second Model A but will never follow into Model X.
|
||||
Nested relations of those kind need to be loaded manually.
|
||||
|
||||
:param follow: flag to trigger deep save -
|
||||
by default only directly related models are saved
|
||||
with follow=True also related models of related models are saved
|
||||
:type follow: bool
|
||||
:return: reloaded Model
|
||||
:rtype: Model
|
||||
"""
|
||||
queryset = self.queryset.select_all(follow=follow)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def select_related(self, related: Union[List, str]) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -495,7 +530,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.select_related(related)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def prefetch_related(self, related: Union[List, str]) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -518,7 +555,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.prefetch_related(related)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def paginate(self, page: int, page_size: int = 20) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -535,7 +574,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerySet
|
||||
"""
|
||||
queryset = self.queryset.paginate(page=page, page_size=page_size)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def limit(self, limit_count: int) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -549,7 +590,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.limit(limit_count)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def offset(self, offset: int) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -563,7 +606,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.offset(offset)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def fields(self, columns: Union[List, str, Set, Dict]) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -611,9 +656,13 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.fields(columns)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def exclude_fields(self, columns: Union[List, str, Set, Dict]) -> "QuerysetProxy[T]":
|
||||
def exclude_fields(
|
||||
self, columns: Union[List, str, Set, Dict]
|
||||
) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
With `exclude_fields()` you can select subset of model columns that will
|
||||
be excluded to limit the data load.
|
||||
@ -643,7 +692,9 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.exclude_fields(columns=columns)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
def order_by(self, columns: Union[List, str]) -> "QuerysetProxy[T]":
|
||||
"""
|
||||
@ -680,4 +731,6 @@ class QuerysetProxy(Generic[T]):
|
||||
:rtype: QuerysetProxy
|
||||
"""
|
||||
queryset = self.queryset.order_by(columns)
|
||||
return self.__class__(relation=self.relation, type_=self.type_,to=self.to, qryset=queryset)
|
||||
return self.__class__(
|
||||
relation=self.relation, type_=self.type_, to=self.to, qryset=queryset
|
||||
)
|
||||
|
||||
@ -1,6 +1,15 @@
|
||||
from enum import Enum
|
||||
from typing import Generic, List, Optional, Set, TYPE_CHECKING, Type, TypeVar, Union, \
|
||||
cast
|
||||
from typing import (
|
||||
Generic,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
import ormar # noqa I100
|
||||
from ormar.exceptions import RelationshipInstanceError # noqa I100
|
||||
@ -9,7 +18,6 @@ from ormar.relations.relation_proxy import RelationProxy
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar.relations import RelationsManager
|
||||
from ormar.models import Model, NewBaseModel, T
|
||||
from ormar.relations.relation_proxy import RelationProxy
|
||||
else:
|
||||
T = TypeVar("T", bound="Model")
|
||||
|
||||
|
||||
@ -52,7 +52,7 @@ class RelationProxy(Generic[T], list):
|
||||
|
||||
return self._related_field_name
|
||||
|
||||
def __getitem__(self, item) -> "T": # type: ignore
|
||||
def __getitem__(self, item: Any) -> "T": # type: ignore
|
||||
return super().__getitem__(item)
|
||||
|
||||
def __getattribute__(self, item: str) -> Any:
|
||||
|
||||
@ -86,6 +86,11 @@ async def test_load_all_fk_rel():
|
||||
assert hq.companies[0].name == "Banzai"
|
||||
assert hq.companies[0].founded == 1988
|
||||
|
||||
hq2 = await HQ.objects.select_all().get(name="Main")
|
||||
assert hq2.companies[0] == company
|
||||
assert hq2.companies[0].name == "Banzai"
|
||||
assert hq2.companies[0].founded == 1988
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_all_many_to_many():
|
||||
@ -106,6 +111,12 @@ async def test_load_all_many_to_many():
|
||||
assert hq.nicks[1] == nick2
|
||||
assert hq.nicks[1].name == "Bazinga20"
|
||||
|
||||
hq2 = await HQ.objects.select_all().get(name="Main")
|
||||
assert hq2.nicks[0] == nick1
|
||||
assert hq2.nicks[0].name == "BazingaO"
|
||||
assert hq2.nicks[1] == nick2
|
||||
assert hq2.nicks[1].name == "Bazinga20"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_all_with_order():
|
||||
@ -130,6 +141,16 @@ async def test_load_all_with_order():
|
||||
assert hq.nicks[0] == nick1
|
||||
assert hq.nicks[1] == nick2
|
||||
|
||||
hq2 = (
|
||||
await HQ.objects.select_all().order_by("-nicks__name").get(name="Main")
|
||||
)
|
||||
assert hq2.nicks[0] == nick2
|
||||
assert hq2.nicks[1] == nick1
|
||||
|
||||
hq3 = await HQ.objects.select_all().get(name="Main")
|
||||
assert hq3.nicks[0] == nick1
|
||||
assert hq3.nicks[1] == nick2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loading_reversed_relation():
|
||||
@ -143,6 +164,9 @@ async def test_loading_reversed_relation():
|
||||
|
||||
assert company.hq == hq
|
||||
|
||||
company2 = await Company.objects.select_all().get(name="Banzai")
|
||||
assert company2.hq == hq
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loading_nested():
|
||||
@ -174,11 +198,31 @@ async def test_loading_nested():
|
||||
assert hq.nicks[1].level.name == "Low"
|
||||
assert hq.nicks[1].level.language.name == "English"
|
||||
|
||||
hq2 = await HQ.objects.select_all(follow=True).get(name="Main")
|
||||
assert hq2.nicks[0] == nick1
|
||||
assert hq2.nicks[0].name == "BazingaO"
|
||||
assert hq2.nicks[0].level.name == "High"
|
||||
assert hq2.nicks[0].level.language.name == "English"
|
||||
|
||||
assert hq2.nicks[1] == nick2
|
||||
assert hq2.nicks[1].name == "Bazinga20"
|
||||
assert hq2.nicks[1].level.name == "Low"
|
||||
assert hq2.nicks[1].level.language.name == "English"
|
||||
|
||||
await hq.load_all(follow=True, exclude="nicks__level__language")
|
||||
assert len(hq.nicks) == 2
|
||||
assert hq.nicks[0].level.language is None
|
||||
assert hq.nicks[1].level.language is None
|
||||
|
||||
hq3 = (
|
||||
await HQ.objects.select_all(follow=True)
|
||||
.exclude_fields("nicks__level__language")
|
||||
.get(name="Main")
|
||||
)
|
||||
assert len(hq3.nicks) == 2
|
||||
assert hq3.nicks[0].level.language is None
|
||||
assert hq3.nicks[1].level.language is None
|
||||
|
||||
await hq.load_all(follow=True, exclude="nicks__level__language__level")
|
||||
assert len(hq.nicks) == 2
|
||||
assert hq.nicks[0].level.language is not None
|
||||
|
||||
Reference in New Issue
Block a user