update docs, add load_all(), tests for load_all, make through field optional
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
import sys
|
||||
from typing import Any, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||
from typing import Any, List, Optional, TYPE_CHECKING, Tuple, Type, Union, cast
|
||||
|
||||
from pydantic.typing import ForwardRef, evaluate_forwardref
|
||||
import ormar # noqa: I100
|
||||
@ -43,7 +43,7 @@ def populate_m2m_params_based_on_to_model(
|
||||
|
||||
def ManyToMany(
|
||||
to: "ToType",
|
||||
through: "ToType",
|
||||
through: Optional["ToType"] = None,
|
||||
*,
|
||||
name: str = None,
|
||||
unique: bool = False,
|
||||
@ -212,3 +212,21 @@ class ManyToManyField(ForeignKeyField, ormar.QuerySetProtocol, ormar.RelationPro
|
||||
:rtype: Type["Model"]
|
||||
"""
|
||||
return cls.through
|
||||
|
||||
@classmethod
|
||||
def create_default_through_model(cls) -> None:
|
||||
"""
|
||||
Creates default empty through model if no additional fields are required.
|
||||
"""
|
||||
owner_name = cls.owner.get_name(lower=False)
|
||||
to_name = cls.to.get_name(lower=False)
|
||||
class_name = f"{owner_name}{to_name}"
|
||||
table_name = f"{owner_name.lower()}s_{to_name.lower()}s"
|
||||
new_meta_namespace = {
|
||||
"tablename": table_name,
|
||||
"database": cls.owner.Meta.database,
|
||||
"metadata": cls.owner.Meta.metadata,
|
||||
}
|
||||
new_meta = type("Meta", (), new_meta_namespace)
|
||||
through_model = type(class_name, (ormar.Model,), {"Meta": new_meta})
|
||||
cls.through = cast(Type["Model"], through_model)
|
||||
|
||||
@ -154,6 +154,8 @@ def sqlalchemy_columns_from_model_fields(
|
||||
pkname = None
|
||||
for field_name, field in model_fields.items():
|
||||
field.owner = new_model
|
||||
if field.is_multi and not field.through:
|
||||
field.create_default_through_model()
|
||||
if field.primary_key:
|
||||
pkname = check_pk_column_validity(field_name, field, pkname)
|
||||
if not field.pydantic_only and not field.virtual and not field.is_multi:
|
||||
|
||||
@ -209,13 +209,14 @@ def update_attrs_from_base_meta( # noqa: CCR001
|
||||
setattr(attrs["Meta"], param, parent_value)
|
||||
|
||||
|
||||
def copy_and_replace_m2m_through_model(
|
||||
def copy_and_replace_m2m_through_model( # noqa: CFQ002
|
||||
field: Type[ManyToManyField],
|
||||
field_name: str,
|
||||
table_name: str,
|
||||
parent_fields: Dict,
|
||||
attrs: Dict,
|
||||
meta: ModelMeta,
|
||||
base_class: Type["Model"],
|
||||
) -> None:
|
||||
"""
|
||||
Clones class with Through model for m2m relations, appends child name to the name
|
||||
@ -229,6 +230,8 @@ def copy_and_replace_m2m_through_model(
|
||||
|
||||
Removes the original sqlalchemy table from metadata if it was not removed.
|
||||
|
||||
:param base_class: base class model
|
||||
:type base_class: Type["Model"]
|
||||
:param field: field with relations definition
|
||||
:type field: Type[ManyToManyField]
|
||||
:param field_name: name of the relation field
|
||||
@ -249,6 +252,10 @@ def copy_and_replace_m2m_through_model(
|
||||
copy_field.related_name = related_name # type: ignore
|
||||
|
||||
through_class = field.through
|
||||
if not through_class:
|
||||
field.owner = base_class
|
||||
field.create_default_through_model()
|
||||
through_class = field.through
|
||||
new_meta: ormar.ModelMeta = type( # type: ignore
|
||||
"Meta", (), dict(through_class.Meta.__dict__),
|
||||
)
|
||||
@ -338,6 +345,7 @@ def copy_data_from_parent_model( # noqa: CCR001
|
||||
parent_fields=parent_fields,
|
||||
attrs=attrs,
|
||||
meta=meta,
|
||||
base_class=base_class, # type: ignore
|
||||
)
|
||||
|
||||
elif field.is_relation and field.related_name:
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
from collections import OrderedDict
|
||||
from typing import List, Sequence, TYPE_CHECKING
|
||||
from typing import List, TYPE_CHECKING
|
||||
|
||||
import ormar
|
||||
|
||||
@ -17,7 +17,7 @@ class MergeModelMixin:
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def merge_instances_list(cls, result_rows: Sequence["Model"]) -> Sequence["Model"]:
|
||||
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
|
||||
"""
|
||||
Merges a list of models into list of unique models.
|
||||
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import inspect
|
||||
from typing import List, Optional, Set, TYPE_CHECKING
|
||||
from typing import List, Optional, Set, TYPE_CHECKING, Type, Union
|
||||
|
||||
|
||||
class RelationMixin:
|
||||
@ -8,7 +8,7 @@ class RelationMixin:
|
||||
"""
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import ModelMeta
|
||||
from ormar import ModelMeta, Model
|
||||
|
||||
Meta: ModelMeta
|
||||
_related_names: Optional[Set]
|
||||
@ -63,7 +63,7 @@ class RelationMixin:
|
||||
return related_fields
|
||||
|
||||
@classmethod
|
||||
def extract_related_names(cls) -> Set:
|
||||
def extract_related_names(cls) -> Set[str]:
|
||||
"""
|
||||
Returns List of fields names for all relations declared on a model.
|
||||
List is cached in cls._related_names for quicker access.
|
||||
@ -118,3 +118,50 @@ class RelationMixin:
|
||||
name for name in related_names if cls.Meta.model_fields[name].nullable
|
||||
}
|
||||
return related_names
|
||||
|
||||
@classmethod
|
||||
def _iterate_related_models(
|
||||
cls,
|
||||
visited: Set[Union[Type["Model"], Type["RelationMixin"]]] = None,
|
||||
source_relation: str = None,
|
||||
source_model: Union[Type["Model"], Type["RelationMixin"]] = None,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Iterates related models recursively to extract relation strings of
|
||||
nested not visited models.
|
||||
|
||||
:param visited: set of already visited models
|
||||
:type visited: Set[str]
|
||||
:param source_relation: name of the current relation
|
||||
:type source_relation: str
|
||||
:param source_model: model from which relation comes in nested relations
|
||||
:type source_model: Type["Model"]
|
||||
:return: list of relation strings to be passed to select_related
|
||||
:rtype: List[str]
|
||||
"""
|
||||
visited = visited or set()
|
||||
visited.add(cls)
|
||||
relations = cls.extract_related_names()
|
||||
processed_relations = []
|
||||
for relation in relations:
|
||||
target_model = cls.Meta.model_fields[relation].to
|
||||
if source_model and target_model == source_model:
|
||||
continue
|
||||
if target_model not in visited:
|
||||
visited.add(target_model)
|
||||
deep_relations = target_model._iterate_related_models(
|
||||
visited=visited, source_relation=relation, source_model=cls
|
||||
)
|
||||
processed_relations.extend(deep_relations)
|
||||
# TODO add test for circular deps
|
||||
else: # pragma: no cover
|
||||
processed_relations.append(relation)
|
||||
if processed_relations:
|
||||
final_relations = [
|
||||
f"{source_relation + '__' if source_relation else ''}{relation}"
|
||||
for relation in processed_relations
|
||||
]
|
||||
else:
|
||||
final_relations = [source_relation] if source_relation else []
|
||||
|
||||
return final_relations
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
import ormar.queryset # noqa I100
|
||||
@ -265,3 +268,45 @@ class Model(ModelRow):
|
||||
self.update_from_dict(kwargs)
|
||||
self.set_save_status(True)
|
||||
return self
|
||||
|
||||
async def load_all(
|
||||
self, follow: bool = False, exclude: Union[List, str, Set, Dict] = None
|
||||
) -> "Model":
|
||||
"""
|
||||
Allow to refresh existing Models fields from database.
|
||||
Performs refresh of the related models fields.
|
||||
|
||||
By default loads only self and the directly related ones.
|
||||
|
||||
If follow=True is set it loads also related models of related models.
|
||||
|
||||
To not get stuck in an infinite loop as related models also keep a relation
|
||||
to parent model visited models set is kept.
|
||||
|
||||
That way already visited models that are nested are loaded, but the load do not
|
||||
follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X
|
||||
will load second Model A but will never follow into Model X.
|
||||
Nested relations of those kind need to be loaded manually.
|
||||
|
||||
:raises NoMatch: If given pk is not found in database.
|
||||
|
||||
:param exclude: related models to exclude
|
||||
:type exclude: Union[List, str, Set, Dict]
|
||||
:param follow: flag to trigger deep save -
|
||||
by default only directly related models are saved
|
||||
with follow=True also related models of related models are saved
|
||||
:type follow: bool
|
||||
:return: reloaded Model
|
||||
:rtype: Model
|
||||
"""
|
||||
relations = list(self.extract_related_names())
|
||||
if follow:
|
||||
relations = self._iterate_related_models()
|
||||
queryset = self.__class__.objects
|
||||
print(relations)
|
||||
if exclude:
|
||||
queryset = queryset.exclude_fields(exclude)
|
||||
instance = await queryset.select_related(relations).get(pk=self.pk)
|
||||
self._orm.clear()
|
||||
self.update_from_dict(instance.dict())
|
||||
return self
|
||||
|
||||
@ -88,6 +88,7 @@ class ModelRow(NewBaseModel):
|
||||
current_relation_str=current_relation_str,
|
||||
source_model=source_model, # type: ignore
|
||||
proxy_source_model=proxy_source_model, # type: ignore
|
||||
table_prefix=table_prefix,
|
||||
)
|
||||
item = cls.extract_prefixed_table_columns(
|
||||
item=item, row=row, table_prefix=table_prefix, excludable=excludable
|
||||
@ -110,6 +111,7 @@ class ModelRow(NewBaseModel):
|
||||
source_model: Type["Model"],
|
||||
related_models: Any,
|
||||
excludable: ExcludableItems,
|
||||
table_prefix: str,
|
||||
current_relation_str: str = None,
|
||||
proxy_source_model: Type["Model"] = None,
|
||||
) -> dict:
|
||||
@ -143,15 +145,20 @@ class ModelRow(NewBaseModel):
|
||||
"""
|
||||
|
||||
for related in related_models:
|
||||
field = cls.Meta.model_fields[related]
|
||||
field = cast(Type["ForeignKeyField"], field)
|
||||
model_cls = field.to
|
||||
model_excludable = excludable.get(
|
||||
model_cls=cast(Type["Model"], cls), alias=table_prefix
|
||||
)
|
||||
if model_excludable.is_excluded(related):
|
||||
return item
|
||||
|
||||
relation_str = (
|
||||
"__".join([current_relation_str, related])
|
||||
if current_relation_str
|
||||
else related
|
||||
)
|
||||
field = cls.Meta.model_fields[related]
|
||||
field = cast(Type["ForeignKeyField"], field)
|
||||
model_cls = field.to
|
||||
|
||||
remainder = None
|
||||
if isinstance(related_models, dict) and related_models[related]:
|
||||
remainder = related_models[related]
|
||||
|
||||
@ -148,8 +148,8 @@ class QuerySet:
|
||||
)
|
||||
|
||||
async def _prefetch_related_models(
|
||||
self, models: Sequence[Optional["Model"]], rows: List
|
||||
) -> Sequence[Optional["Model"]]:
|
||||
self, models: List[Optional["Model"]], rows: List
|
||||
) -> List[Optional["Model"]]:
|
||||
"""
|
||||
Performs prefetch query for selected models names.
|
||||
|
||||
@ -169,7 +169,7 @@ class QuerySet:
|
||||
)
|
||||
return await query.prefetch_related(models=models, rows=rows) # type: ignore
|
||||
|
||||
def _process_query_result_rows(self, rows: List) -> Sequence[Optional["Model"]]:
|
||||
def _process_query_result_rows(self, rows: List) -> List[Optional["Model"]]:
|
||||
"""
|
||||
Process database rows and initialize ormar Model from each of the rows.
|
||||
|
||||
|
||||
@ -68,6 +68,14 @@ class Relation:
|
||||
else None
|
||||
)
|
||||
|
||||
def clear(self) -> None:
|
||||
if self._type in (RelationType.PRIMARY, RelationType.THROUGH):
|
||||
self.related_models = None
|
||||
self._owner.__dict__[self.field_name] = None
|
||||
elif self.related_models is not None:
|
||||
self.related_models._clear()
|
||||
self._owner.__dict__[self.field_name] = []
|
||||
|
||||
@property
|
||||
def through(self) -> Type["Model"]:
|
||||
if not self._through: # pragma: no cover
|
||||
|
||||
@ -26,37 +26,6 @@ class RelationsManager:
|
||||
for field in self._related_fields:
|
||||
self._add_relation(field)
|
||||
|
||||
def _get_relation_type(self, field: Type["BaseField"]) -> RelationType:
|
||||
"""
|
||||
Returns type of the relation declared on a field.
|
||||
|
||||
:param field: field with relation declaration
|
||||
:type field: Type[BaseField]
|
||||
:return: type of the relation defined on field
|
||||
:rtype: RelationType
|
||||
"""
|
||||
if field.is_multi:
|
||||
return RelationType.MULTIPLE
|
||||
if field.is_through:
|
||||
return RelationType.THROUGH
|
||||
return RelationType.PRIMARY if not field.virtual else RelationType.REVERSE
|
||||
|
||||
def _add_relation(self, field: Type["BaseField"]) -> None:
|
||||
"""
|
||||
Registers relation in the manager.
|
||||
Adds Relation instance under field.name.
|
||||
|
||||
:param field: field with relation declaration
|
||||
:type field: Type[BaseField]
|
||||
"""
|
||||
self._relations[field.name] = Relation(
|
||||
manager=self,
|
||||
type_=self._get_relation_type(field),
|
||||
field_name=field.name,
|
||||
to=field.to,
|
||||
through=getattr(field, "through", None),
|
||||
)
|
||||
|
||||
def __contains__(self, item: str) -> bool:
|
||||
"""
|
||||
Checks if relation with given name is already registered.
|
||||
@ -68,6 +37,10 @@ class RelationsManager:
|
||||
"""
|
||||
return item in self._related_names
|
||||
|
||||
def clear(self) -> None:
|
||||
for relation in self._relations.values():
|
||||
relation.clear()
|
||||
|
||||
def get(self, name: str) -> Optional[Union["Model", Sequence["Model"]]]:
|
||||
"""
|
||||
Returns the related model/models if relation is set.
|
||||
@ -83,20 +56,6 @@ class RelationsManager:
|
||||
return relation.get()
|
||||
return None # pragma nocover
|
||||
|
||||
def _get(self, name: str) -> Optional[Relation]:
|
||||
"""
|
||||
Returns the actual relation and not the related model(s).
|
||||
|
||||
:param name: name of the relation
|
||||
:type name: str
|
||||
:return: Relation instance
|
||||
:rtype: ormar.relations.relation.Relation
|
||||
"""
|
||||
relation = self._relations.get(name, None)
|
||||
if relation is not None:
|
||||
return relation
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def add(parent: "Model", child: "Model", field: Type["ForeignKeyField"],) -> None:
|
||||
"""
|
||||
@ -164,3 +123,48 @@ class RelationsManager:
|
||||
relation_name = item.Meta.model_fields[name].get_related_name()
|
||||
item._orm.remove(name, parent)
|
||||
parent._orm.remove(relation_name, item)
|
||||
|
||||
def _get(self, name: str) -> Optional[Relation]:
|
||||
"""
|
||||
Returns the actual relation and not the related model(s).
|
||||
|
||||
:param name: name of the relation
|
||||
:type name: str
|
||||
:return: Relation instance
|
||||
:rtype: ormar.relations.relation.Relation
|
||||
"""
|
||||
relation = self._relations.get(name, None)
|
||||
if relation is not None:
|
||||
return relation
|
||||
return None
|
||||
|
||||
def _get_relation_type(self, field: Type["BaseField"]) -> RelationType:
|
||||
"""
|
||||
Returns type of the relation declared on a field.
|
||||
|
||||
:param field: field with relation declaration
|
||||
:type field: Type[BaseField]
|
||||
:return: type of the relation defined on field
|
||||
:rtype: RelationType
|
||||
"""
|
||||
if field.is_multi:
|
||||
return RelationType.MULTIPLE
|
||||
if field.is_through:
|
||||
return RelationType.THROUGH
|
||||
return RelationType.PRIMARY if not field.virtual else RelationType.REVERSE
|
||||
|
||||
def _add_relation(self, field: Type["BaseField"]) -> None:
|
||||
"""
|
||||
Registers relation in the manager.
|
||||
Adds Relation instance under field.name.
|
||||
|
||||
:param field: field with relation declaration
|
||||
:type field: Type[BaseField]
|
||||
"""
|
||||
self._relations[field.name] = Relation(
|
||||
manager=self,
|
||||
type_=self._get_relation_type(field),
|
||||
field_name=field.name,
|
||||
to=field.to,
|
||||
through=getattr(field, "through", None),
|
||||
)
|
||||
|
||||
@ -75,6 +75,9 @@ class RelationProxy(list):
|
||||
self._initialize_queryset()
|
||||
return getattr(self.queryset_proxy, item)
|
||||
|
||||
def _clear(self) -> None:
|
||||
super().clear()
|
||||
|
||||
def _initialize_queryset(self) -> None:
|
||||
"""
|
||||
Initializes the QuerySetProxy if not yet initialized.
|
||||
|
||||
Reference in New Issue
Block a user