refactor and cleanup - drop of resolving relation names as not fully proper, extract mixins from modelproxy to be more maintainable, add some docstrings
This commit is contained in:
@ -1,5 +1,4 @@
|
||||
import inspect
|
||||
from collections import OrderedDict
|
||||
from typing import (
|
||||
AbstractSet,
|
||||
Any,
|
||||
@ -8,25 +7,22 @@ from typing import (
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
import ormar # noqa: I100
|
||||
from ormar.exceptions import ModelPersistenceError
|
||||
from ormar.fields import BaseField, ManyToManyField
|
||||
from ormar.fields import BaseField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.models.metaclass import ModelMeta
|
||||
from ormar.models.mixins import AliasMixin, MergeModelMixin, PrefetchQueryMixin
|
||||
from ormar.queryset.utils import translate_list_to_dict, update
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
from ormar.models import NewBaseModel
|
||||
|
||||
T = TypeVar("T", bound=Model)
|
||||
IntStr = Union[int, str]
|
||||
@ -36,7 +32,7 @@ if TYPE_CHECKING: # pragma no cover
|
||||
Field = TypeVar("Field", bound=BaseField)
|
||||
|
||||
|
||||
class ModelTableProxy:
|
||||
class ModelTableProxy(PrefetchQueryMixin, MergeModelMixin, AliasMixin):
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
Meta: ModelMeta
|
||||
_related_names: Optional[Set]
|
||||
@ -46,76 +42,6 @@ class ModelTableProxy:
|
||||
_props: Set
|
||||
dict: Callable # noqa: A001, VNE003
|
||||
|
||||
def _extract_own_model_fields(self) -> Dict:
|
||||
related_names = self.extract_related_names()
|
||||
self_fields = self.dict(exclude=related_names)
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def get_related_field_name(cls, target_field: Type["BaseField"]) -> str:
|
||||
if issubclass(target_field, ormar.fields.ManyToManyField):
|
||||
return cls.resolve_relation_name(
|
||||
target_field.through, cls, explicit_multi=True
|
||||
)
|
||||
if target_field.virtual:
|
||||
return target_field.related_name or cls.get_name() + "s"
|
||||
return target_field.to.Meta.pkname
|
||||
|
||||
@staticmethod
|
||||
def get_clause_target_and_filter_column_name(
|
||||
parent_model: Type["Model"],
|
||||
target_model: Type["Model"],
|
||||
reverse: bool,
|
||||
related: str,
|
||||
) -> Tuple[Type["Model"], str]:
|
||||
if reverse:
|
||||
field_name = (
|
||||
parent_model.Meta.model_fields[related].related_name
|
||||
or parent_model.get_name() + "s"
|
||||
)
|
||||
field = target_model.Meta.model_fields[field_name]
|
||||
if issubclass(field, ormar.fields.ManyToManyField):
|
||||
field_name = parent_model.resolve_relation_name(
|
||||
field.through, field.to, explicit_multi=True
|
||||
)
|
||||
sub_field = field.through.Meta.model_fields[field_name]
|
||||
return field.through, sub_field.get_alias()
|
||||
return target_model, field.get_alias()
|
||||
target_field = target_model.get_column_alias(target_model.Meta.pkname)
|
||||
return target_model, target_field
|
||||
|
||||
@staticmethod
|
||||
def get_column_name_for_id_extraction(
|
||||
parent_model: Type["Model"], reverse: bool, related: str, use_raw: bool,
|
||||
) -> str:
|
||||
if reverse:
|
||||
column_name = parent_model.Meta.pkname
|
||||
return (
|
||||
parent_model.get_column_alias(column_name) if use_raw else column_name
|
||||
)
|
||||
column = parent_model.Meta.model_fields[related]
|
||||
return column.get_alias() if use_raw else column.name
|
||||
|
||||
@classmethod
|
||||
def get_filtered_names_to_extract(cls, prefetch_dict: Dict) -> List:
|
||||
related_to_extract = []
|
||||
if prefetch_dict and prefetch_dict is not Ellipsis:
|
||||
related_to_extract = [
|
||||
related
|
||||
for related in cls.extract_related_names()
|
||||
if related in prefetch_dict
|
||||
]
|
||||
return related_to_extract
|
||||
|
||||
def get_relation_model_id(self, target_field: Type["BaseField"]) -> Optional[int]:
|
||||
if target_field.virtual or issubclass(
|
||||
target_field, ormar.fields.ManyToManyField
|
||||
):
|
||||
return self.pk
|
||||
related_name = target_field.name
|
||||
related_model = getattr(self, related_name)
|
||||
return None if not related_model else related_model.pk
|
||||
|
||||
@classmethod
|
||||
def extract_db_own_fields(cls) -> Set:
|
||||
related_names = cls.extract_related_names()
|
||||
@ -124,36 +50,6 @@ class ModelTableProxy:
|
||||
}
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def get_names_to_exclude(
|
||||
cls,
|
||||
fields: Optional[Union[Dict, Set]] = None,
|
||||
exclude_fields: Optional[Union[Dict, Set]] = None,
|
||||
) -> Set:
|
||||
fields_names = cls.extract_db_own_fields()
|
||||
if fields and fields is not Ellipsis:
|
||||
fields_to_keep = {name for name in fields if name in fields_names}
|
||||
else:
|
||||
fields_to_keep = fields_names
|
||||
|
||||
fields_to_exclude = fields_names - fields_to_keep
|
||||
|
||||
if isinstance(exclude_fields, Set):
|
||||
fields_to_exclude = fields_to_exclude.union(
|
||||
{name for name in exclude_fields if name in fields_names}
|
||||
)
|
||||
elif isinstance(exclude_fields, Dict):
|
||||
new_to_exclude = {
|
||||
name
|
||||
for name in exclude_fields
|
||||
if name in fields_names and exclude_fields[name] is Ellipsis
|
||||
}
|
||||
fields_to_exclude = fields_to_exclude.union(new_to_exclude)
|
||||
|
||||
fields_to_exclude = fields_to_exclude - {cls.Meta.pkname}
|
||||
|
||||
return fields_to_exclude
|
||||
|
||||
@classmethod
|
||||
def substitute_models_with_pks(cls, model_dict: Dict) -> Dict: # noqa CCR001
|
||||
for field in cls.extract_related_names():
|
||||
@ -194,20 +90,6 @@ class ModelTableProxy:
|
||||
new_kwargs.pop(field_name, None)
|
||||
return new_kwargs
|
||||
|
||||
@classmethod
|
||||
def get_column_alias(cls, field_name: str) -> str:
|
||||
field = cls.Meta.model_fields.get(field_name)
|
||||
if field is not None and field.alias is not None:
|
||||
return field.alias
|
||||
return field_name
|
||||
|
||||
@classmethod
|
||||
def get_column_name_from_alias(cls, alias: str) -> str:
|
||||
for field_name, field in cls.Meta.model_fields.items():
|
||||
if field is not None and field.alias == alias:
|
||||
return field_name
|
||||
return alias # if not found it's not an alias but actual name
|
||||
|
||||
@classmethod
|
||||
def extract_related_fields(cls) -> List:
|
||||
|
||||
@ -270,150 +152,32 @@ class ModelTableProxy:
|
||||
exclude = update(related_dict, exclude)
|
||||
return exclude
|
||||
|
||||
def _extract_model_db_fields(self) -> Dict:
|
||||
self_fields = self._extract_own_model_fields()
|
||||
self_fields = {
|
||||
k: v
|
||||
for k, v in self_fields.items()
|
||||
if self.get_column_alias(k) in self.Meta.table.columns
|
||||
}
|
||||
for field in self._extract_db_related_names():
|
||||
target_pk_name = self.Meta.model_fields[field].to.Meta.pkname
|
||||
target_field = getattr(self, field)
|
||||
self_fields[field] = getattr(target_field, target_pk_name, None)
|
||||
return self_fields
|
||||
|
||||
@staticmethod
|
||||
def resolve_relation_name( # noqa CCR001
|
||||
item: Union[
|
||||
"NewBaseModel",
|
||||
Type["NewBaseModel"],
|
||||
"ModelTableProxy",
|
||||
Type["ModelTableProxy"],
|
||||
],
|
||||
related: Union[
|
||||
"NewBaseModel",
|
||||
Type["NewBaseModel"],
|
||||
"ModelTableProxy",
|
||||
Type["ModelTableProxy"],
|
||||
],
|
||||
explicit_multi: bool = False,
|
||||
) -> str:
|
||||
for name, field in item.Meta.model_fields.items():
|
||||
# fastapi is creating clones of response model
|
||||
# that's why it can be a subclass of the original model
|
||||
# so we need to compare Meta too as this one is copied as is
|
||||
if issubclass(field, ManyToManyField):
|
||||
attrib = "to" if not explicit_multi else "through"
|
||||
if (
|
||||
getattr(field, attrib) == related.__class__
|
||||
or getattr(field, attrib).Meta == related.Meta
|
||||
):
|
||||
return name
|
||||
|
||||
elif issubclass(field, ForeignKeyField):
|
||||
if field.to == related.__class__ or field.to.Meta == related.Meta:
|
||||
return name
|
||||
|
||||
raise ValueError(
|
||||
f"No relation between {item.get_name()} and {related.get_name()}"
|
||||
) # pragma nocover
|
||||
|
||||
@classmethod
|
||||
def translate_columns_to_aliases(cls, new_kwargs: Dict) -> Dict:
|
||||
for field_name, field in cls.Meta.model_fields.items():
|
||||
if field_name in new_kwargs:
|
||||
new_kwargs[field.get_alias()] = new_kwargs.pop(field_name)
|
||||
return new_kwargs
|
||||
def get_names_to_exclude(
|
||||
cls,
|
||||
fields: Optional[Union[Dict, Set]] = None,
|
||||
exclude_fields: Optional[Union[Dict, Set]] = None,
|
||||
) -> Set:
|
||||
fields_names = cls.extract_db_own_fields()
|
||||
if fields and fields is not Ellipsis:
|
||||
fields_to_keep = {name for name in fields if name in fields_names}
|
||||
else:
|
||||
fields_to_keep = fields_names
|
||||
|
||||
@classmethod
|
||||
def translate_aliases_to_columns(cls, new_kwargs: Dict) -> Dict:
|
||||
for field_name, field in cls.Meta.model_fields.items():
|
||||
if field.alias and field.alias in new_kwargs:
|
||||
new_kwargs[field_name] = new_kwargs.pop(field.alias)
|
||||
return new_kwargs
|
||||
fields_to_exclude = fields_names - fields_to_keep
|
||||
|
||||
@classmethod
|
||||
def merge_instances_list(cls, result_rows: Sequence["Model"]) -> Sequence["Model"]:
|
||||
merged_rows: List["Model"] = []
|
||||
grouped_instances: OrderedDict = OrderedDict()
|
||||
if isinstance(exclude_fields, Set):
|
||||
fields_to_exclude = fields_to_exclude.union(
|
||||
{name for name in exclude_fields if name in fields_names}
|
||||
)
|
||||
elif isinstance(exclude_fields, Dict):
|
||||
new_to_exclude = {
|
||||
name
|
||||
for name in exclude_fields
|
||||
if name in fields_names and exclude_fields[name] is Ellipsis
|
||||
}
|
||||
fields_to_exclude = fields_to_exclude.union(new_to_exclude)
|
||||
|
||||
for model in result_rows:
|
||||
grouped_instances.setdefault(model.pk, []).append(model)
|
||||
fields_to_exclude = fields_to_exclude - {cls.Meta.pkname}
|
||||
|
||||
for group in grouped_instances.values():
|
||||
model = group.pop(0)
|
||||
if group:
|
||||
for next_model in group:
|
||||
model = cls.merge_two_instances(next_model, model)
|
||||
merged_rows.append(model)
|
||||
|
||||
return merged_rows
|
||||
|
||||
@classmethod
|
||||
def merge_two_instances(cls, one: "Model", other: "Model") -> "Model":
|
||||
for field in one.Meta.model_fields.keys():
|
||||
current_field = getattr(one, field)
|
||||
if isinstance(current_field, list) and not isinstance(
|
||||
current_field, ormar.Model
|
||||
):
|
||||
setattr(other, field, current_field + getattr(other, field))
|
||||
elif (
|
||||
isinstance(current_field, ormar.Model)
|
||||
and current_field.pk == getattr(other, field).pk
|
||||
):
|
||||
setattr(
|
||||
other,
|
||||
field,
|
||||
cls.merge_two_instances(current_field, getattr(other, field)),
|
||||
)
|
||||
other.set_save_status(True)
|
||||
return other
|
||||
|
||||
@staticmethod
|
||||
def _populate_pk_column(
|
||||
model: Type["Model"], columns: List[str], use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
pk_alias = (
|
||||
model.get_column_alias(model.Meta.pkname)
|
||||
if use_alias
|
||||
else model.Meta.pkname
|
||||
)
|
||||
if pk_alias not in columns:
|
||||
columns.append(pk_alias)
|
||||
return columns
|
||||
|
||||
@staticmethod
|
||||
def own_table_columns(
|
||||
model: Type["Model"],
|
||||
fields: Optional[Union[Set, Dict]],
|
||||
exclude_fields: Optional[Union[Set, Dict]],
|
||||
use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
columns = [
|
||||
model.get_column_name_from_alias(col.name) if not use_alias else col.name
|
||||
for col in model.Meta.table.columns
|
||||
]
|
||||
field_names = [
|
||||
model.get_column_name_from_alias(col.name)
|
||||
for col in model.Meta.table.columns
|
||||
]
|
||||
if fields:
|
||||
columns = [
|
||||
col
|
||||
for col, name in zip(columns, field_names)
|
||||
if model.is_included(fields, name)
|
||||
]
|
||||
if exclude_fields:
|
||||
columns = [
|
||||
col
|
||||
for col, name in zip(columns, field_names)
|
||||
if not model.is_excluded(exclude_fields, name)
|
||||
]
|
||||
|
||||
# always has to return pk column
|
||||
columns = ModelTableProxy._populate_pk_column(
|
||||
model=model, columns=columns, use_alias=use_alias
|
||||
)
|
||||
|
||||
return columns
|
||||
return fields_to_exclude
|
||||
|
||||
Reference in New Issue
Block a user