Refactor in join in order to make possibility for nested duplicated relations (and it was a mess :D)
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type
|
||||
import itertools
|
||||
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type
|
||||
|
||||
import ormar # noqa: I100
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
@ -109,3 +110,32 @@ def validate_related_names_in_relations( # noqa CCR001
|
||||
f"\nTip: provide different related_name for FK and/or M2M fields"
|
||||
)
|
||||
previous_related_names.append(field.related_name)
|
||||
|
||||
|
||||
def group_related_list(list_: List) -> Dict:
|
||||
"""
|
||||
Translates the list of related strings into a dictionary.
|
||||
That way nested models are grouped to traverse them in a right order
|
||||
and to avoid repetition.
|
||||
|
||||
Sample: ["people__houses", "people__cars__models", "people__cars__colors"]
|
||||
will become:
|
||||
{'people': {'houses': [], 'cars': ['models', 'colors']}}
|
||||
|
||||
:param list_: list of related models used in select related
|
||||
:type list_: List[str]
|
||||
:return: list converted to dictionary to avoid repetition and group nested models
|
||||
:rtype: Dict[str, List]
|
||||
"""
|
||||
test_dict: Dict[str, Any] = dict()
|
||||
grouped = itertools.groupby(list_, key=lambda x: x.split("__")[0])
|
||||
for key, group in grouped:
|
||||
group_list = list(group)
|
||||
new = [
|
||||
"__".join(x.split("__")[1:]) for x in group_list if len(x.split("__")) > 1
|
||||
]
|
||||
if any("__" in x for x in new):
|
||||
test_dict[key] = group_related_list(new)
|
||||
else:
|
||||
test_dict[key] = new
|
||||
return test_dict
|
||||
|
||||
@ -221,7 +221,7 @@ def update_attrs_and_fields(
|
||||
|
||||
:param attrs: new namespace for class being constructed
|
||||
:type attrs: Dict
|
||||
:param new_attrs: part of the namespace extracted from parent class
|
||||
:param new_attrs: related of the namespace extracted from parent class
|
||||
:type new_attrs: Dict
|
||||
:param model_fields: ormar fields in defined in current class
|
||||
:type model_fields: Dict[str, BaseField]
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import itertools
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
@ -18,38 +17,9 @@ import ormar.queryset # noqa I100
|
||||
from ormar.exceptions import ModelPersistenceError, NoMatch
|
||||
from ormar.fields.many_to_many import ManyToManyField
|
||||
from ormar.models import NewBaseModel # noqa I100
|
||||
from ormar.models.helpers.models import group_related_list
|
||||
from ormar.models.metaclass import ModelMeta
|
||||
|
||||
|
||||
def group_related_list(list_: List) -> Dict:
|
||||
"""
|
||||
Translates the list of related strings into a dictionary.
|
||||
That way nested models are grouped to traverse them in a right order
|
||||
and to avoid repetition.
|
||||
|
||||
Sample: ["people__houses", "people__cars__models", "people__cars__colors"]
|
||||
will become:
|
||||
{'people': {'houses': [], 'cars': ['models', 'colors']}}
|
||||
|
||||
:param list_: list of related models used in select related
|
||||
:type list_: List[str]
|
||||
:return: list converted to dictionary to avoid repetition and group nested models
|
||||
:rtype: Dict[str, List]
|
||||
"""
|
||||
test_dict: Dict[str, Any] = dict()
|
||||
grouped = itertools.groupby(list_, key=lambda x: x.split("__")[0])
|
||||
for key, group in grouped:
|
||||
group_list = list(group)
|
||||
new = [
|
||||
"__".join(x.split("__")[1:]) for x in group_list if len(x.split("__")) > 1
|
||||
]
|
||||
if any("__" in x for x in new):
|
||||
test_dict[key] = group_related_list(new)
|
||||
else:
|
||||
test_dict[key] = new
|
||||
return test_dict
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma nocover
|
||||
from ormar import QuerySet
|
||||
|
||||
@ -73,9 +43,11 @@ class Model(NewBaseModel):
|
||||
select_related: List = None,
|
||||
related_models: Any = None,
|
||||
previous_model: Type[T] = None,
|
||||
source_model: Type[T] = None,
|
||||
related_name: str = None,
|
||||
fields: Optional[Union[Dict, Set]] = None,
|
||||
exclude_fields: Optional[Union[Dict, Set]] = None,
|
||||
current_relation_str: str = None,
|
||||
) -> Optional[T]:
|
||||
"""
|
||||
Model method to convert raw sql row from database into ormar.Model instance.
|
||||
@ -112,7 +84,10 @@ class Model(NewBaseModel):
|
||||
item: Dict[str, Any] = {}
|
||||
select_related = select_related or []
|
||||
related_models = related_models or []
|
||||
table_prefix = ""
|
||||
|
||||
if select_related:
|
||||
source_model = cls
|
||||
related_models = group_related_list(select_related)
|
||||
|
||||
rel_name2 = related_name
|
||||
@ -135,11 +110,15 @@ class Model(NewBaseModel):
|
||||
previous_model = through_field.through # type: ignore
|
||||
|
||||
if previous_model and rel_name2:
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
previous_model, rel_name2
|
||||
)
|
||||
else:
|
||||
table_prefix = ""
|
||||
# TODO finish duplicated nested relation or remove this
|
||||
if current_relation_str and "__" in current_relation_str and source_model:
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
from_model=source_model, relation_name=current_relation_str
|
||||
)
|
||||
if not table_prefix:
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
from_model=previous_model, relation_name=rel_name2
|
||||
)
|
||||
|
||||
item = cls.populate_nested_models_from_row(
|
||||
item=item,
|
||||
@ -147,6 +126,8 @@ class Model(NewBaseModel):
|
||||
related_models=related_models,
|
||||
fields=fields,
|
||||
exclude_fields=exclude_fields,
|
||||
current_relation_str=current_relation_str,
|
||||
source_model=source_model,
|
||||
)
|
||||
item = cls.extract_prefixed_table_columns(
|
||||
item=item,
|
||||
@ -163,8 +144,6 @@ class Model(NewBaseModel):
|
||||
)
|
||||
instance = cls(**item)
|
||||
instance.set_save_status(True)
|
||||
else:
|
||||
instance = None
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
@ -175,6 +154,8 @@ class Model(NewBaseModel):
|
||||
related_models: Any,
|
||||
fields: Optional[Union[Dict, Set]] = None,
|
||||
exclude_fields: Optional[Union[Dict, Set]] = None,
|
||||
current_relation_str: str = None,
|
||||
source_model: Type[T] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Traverses structure of related models and populates the nested models
|
||||
@ -202,35 +183,31 @@ class Model(NewBaseModel):
|
||||
and values are database values
|
||||
:rtype: Dict
|
||||
"""
|
||||
|
||||
for related in related_models:
|
||||
relation_str = (
|
||||
"__".join([current_relation_str, related])
|
||||
if current_relation_str
|
||||
else related
|
||||
)
|
||||
fields = cls.get_included(fields, related)
|
||||
exclude_fields = cls.get_excluded(exclude_fields, related)
|
||||
model_cls = cls.Meta.model_fields[related].to
|
||||
|
||||
remainder = None
|
||||
if isinstance(related_models, dict) and related_models[related]:
|
||||
first_part, remainder = related, related_models[related]
|
||||
model_cls = cls.Meta.model_fields[first_part].to
|
||||
|
||||
fields = cls.get_included(fields, first_part)
|
||||
exclude_fields = cls.get_excluded(exclude_fields, first_part)
|
||||
|
||||
child = model_cls.from_row(
|
||||
row,
|
||||
related_models=remainder,
|
||||
previous_model=cls,
|
||||
related_name=related,
|
||||
fields=fields,
|
||||
exclude_fields=exclude_fields,
|
||||
)
|
||||
item[model_cls.get_column_name_from_alias(first_part)] = child
|
||||
else:
|
||||
model_cls = cls.Meta.model_fields[related].to
|
||||
fields = cls.get_included(fields, related)
|
||||
exclude_fields = cls.get_excluded(exclude_fields, related)
|
||||
child = model_cls.from_row(
|
||||
row,
|
||||
previous_model=cls,
|
||||
related_name=related,
|
||||
fields=fields,
|
||||
exclude_fields=exclude_fields,
|
||||
)
|
||||
item[model_cls.get_column_name_from_alias(related)] = child
|
||||
remainder = related_models[related]
|
||||
child = model_cls.from_row(
|
||||
row,
|
||||
related_models=remainder,
|
||||
previous_model=cls,
|
||||
related_name=related,
|
||||
fields=fields,
|
||||
exclude_fields=exclude_fields,
|
||||
current_relation_str=relation_str,
|
||||
source_model=source_model,
|
||||
)
|
||||
item[model_cls.get_column_name_from_alias(related)] = child
|
||||
|
||||
return item
|
||||
|
||||
@ -251,7 +228,7 @@ class Model(NewBaseModel):
|
||||
All joined tables have prefixes to allow duplicate column names,
|
||||
as well as duplicated joins to the same table from multiple different tables.
|
||||
|
||||
Extracted fields populates the item dict later used to construct a Model.
|
||||
Extracted fields populates the related dict later used to construct a Model.
|
||||
|
||||
Used in Model.from_row and PrefetchQuery._populate_rows methods.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user