diff --git a/ormar/fields/many_to_many.py b/ormar/fields/many_to_many.py index 582d318..0880906 100644 --- a/ormar/fields/many_to_many.py +++ b/ormar/fields/many_to_many.py @@ -50,3 +50,7 @@ def ManyToMany( class ManyToManyField(ForeignKeyField, ormar.QuerySetProtocol, ormar.RelationProtocol): through: Type["Model"] + + @classmethod + def default_target_field_name(cls) -> str: + return cls.to.get_name() diff --git a/ormar/models/helpers/models.py b/ormar/models/helpers/models.py index 8f76b9b..4b5a364 100644 --- a/ormar/models/helpers/models.py +++ b/ormar/models/helpers/models.py @@ -1,12 +1,56 @@ -from typing import Dict, List, Optional, TYPE_CHECKING, Type +from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type -from ormar import ModelDefinitionError +import ormar from ormar.fields.foreign_key import ForeignKeyField +from ormar.models.helpers.pydantic import populate_pydantic_default_values if TYPE_CHECKING: # pragma no cover from ormar import Model +def populate_default_options_values( + new_model: Type["Model"], model_fields: Dict +) -> None: + """ + Sets all optional Meta values to it's defaults + and set model_fields that were already previously extracted. + + Here should live all options that are not overwritten/set for all models. + + Current options are: + * constraints = [] + * abstract = False + + :param new_model: newly constructed Model + :type new_model: Model class + :param model_fields: + :type model_fields: Union[Dict[str, type], Dict] + """ + if not hasattr(new_model.Meta, "constraints"): + new_model.Meta.constraints = [] + if not hasattr(new_model.Meta, "model_fields"): + new_model.Meta.model_fields = model_fields + if not hasattr(new_model.Meta, "abstract"): + new_model.Meta.abstract = False + + +def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]: + """ + Extracts annotations from class namespace dict and triggers + extraction of ormar model_fields. + + :param attrs: namespace of the class created + :type attrs: Dict + :return: namespace of the class updated, dict of extracted model_fields + :rtype: Tuple[Dict, Dict] + """ + key = "__annotations__" + attrs[key] = attrs.get(key, {}) + attrs, model_fields = populate_pydantic_default_values(attrs) + return attrs, model_fields + + +# cannot be in relations helpers due to cyclical import def validate_related_names_in_relations( model_fields: Dict, new_model: Type["Model"] ) -> None: @@ -28,7 +72,7 @@ def validate_related_names_in_relations( if issubclass(field, ForeignKeyField): previous_related_names = already_registered.setdefault(field.to, []) if field.related_name in previous_related_names: - raise ModelDefinitionError( + raise ormar.ModelDefinitionError( f"Multiple fields declared on {new_model.get_name(lower=False)} " f"model leading to {field.to.get_name(lower=False)} model without " f"related_name property set. \nThere can be only one relation with " diff --git a/ormar/models/helpers/pydantic.py b/ormar/models/helpers/pydantic.py index 74747f7..69c0496 100644 --- a/ormar/models/helpers/pydantic.py +++ b/ormar/models/helpers/pydantic.py @@ -12,74 +12,6 @@ if TYPE_CHECKING: # pragma no cover from ormar import Model -def verify_related_name_dont_duplicate( - child: Type["Model"], parent_model: Type["Model"], related_name: str, -) -> None: - """ - Verifies whether the used related_name (regardless of the fact if user defined or - auto generated) is already used on related model, but is connected with other model - than the one that we connect right now. - - :raises: ModelDefinitionError if name is already used but lead to different related - model - :param child: related Model class - :type child: ormar.models.metaclass.ModelMetaclass - :param parent_model: parent Model class - :type parent_model: ormar.models.metaclass.ModelMetaclass - :param related_name: - :type related_name: - :return: None - :rtype: None - """ - if parent_model.Meta.model_fields.get(related_name): - fk_field = parent_model.Meta.model_fields.get(related_name) - if not fk_field: # pragma: no cover - return - if fk_field.to != child and fk_field.to.Meta != child.Meta: - raise ormar.ModelDefinitionError( - f"Relation with related_name " - f"'{related_name}' " - f"leading to model " - f"{parent_model.get_name(lower=False)} " - f"cannot be used on model " - f"{child.get_name(lower=False)} " - f"because it's already used by model " - f"{fk_field.to.get_name(lower=False)}" - ) - - -def reverse_field_not_already_registered( - child: Type["Model"], child_model_name: str, parent_model: Type["Model"] -) -> bool: - """ - Checks if child is already registered in parents pydantic fields. - - :raises: ModelDefinitionError if related name is already used but lead to different - related model - :param child: related Model class - :type child: ormar.models.metaclass.ModelMetaclass - :param child_model_name: related_name of the child if provided - :type child_model_name: str - :param parent_model: parent Model class - :type parent_model: ormar.models.metaclass.ModelMetaclass - :return: result of the check - :rtype: bool - """ - check_result = child_model_name not in parent_model.Meta.model_fields - check_result2 = child.get_name() not in parent_model.Meta.model_fields - - if not check_result: - verify_related_name_dont_duplicate( - child=child, parent_model=parent_model, related_name=child_model_name - ) - if not check_result2: - verify_related_name_dont_duplicate( - child=child, parent_model=parent_model, related_name=child.get_name() - ) - - return check_result and check_result2 - - def create_pydantic_field( field_name: str, model: Type["Model"], model_field: Type[ManyToManyField] ) -> None: @@ -214,32 +146,6 @@ def get_pydantic_base_orm_config() -> Type[BaseConfig]: return Config -def populate_default_options_values( - new_model: Type["Model"], model_fields: Dict -) -> None: - """ - Sets all optional Meta values to it's defaults - and set model_fields that were already previously extracted. - - Here should live all options that are not overwritten/set for all models. - - Current options are: - * constraints = [] - * abstract = False - - :param new_model: newly constructed Model - :type new_model: Model class - :param model_fields: - :type model_fields: Union[Dict[str, type], Dict] - """ - if not hasattr(new_model.Meta, "constraints"): - new_model.Meta.constraints = [] - if not hasattr(new_model.Meta, "model_fields"): - new_model.Meta.model_fields = model_fields - if not hasattr(new_model.Meta, "abstract"): - new_model.Meta.abstract = False - - def get_potential_fields(attrs: Dict) -> Dict: """ Gets all the fields in current class namespace that are Fields. @@ -250,19 +156,3 @@ def get_potential_fields(attrs: Dict) -> Dict: :rtype: Dict """ return {k: v for k, v in attrs.items() if lenient_issubclass(v, BaseField)} - - -def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]: - """ - Extracts annotations from class namespace dict and triggers - extraction of ormar model_fields. - - :param attrs: namespace of the class created - :type attrs: Dict - :return: namespace of the class updated, dict of extracted model_fields - :rtype: Tuple[Dict, Dict] - """ - key = "__annotations__" - attrs[key] = attrs.get(key, {}) - attrs, model_fields = populate_pydantic_default_values(attrs) - return attrs, model_fields diff --git a/ormar/models/helpers/relations.py b/ormar/models/helpers/relations.py index 7c5461c..c9f8f48 100644 --- a/ormar/models/helpers/relations.py +++ b/ormar/models/helpers/relations.py @@ -1,9 +1,9 @@ from typing import TYPE_CHECKING, Type +import ormar from ormar import ForeignKey, ManyToMany from ormar.fields import ManyToManyField from ormar.fields.foreign_key import ForeignKeyField -from ormar.models.helpers.pydantic import reverse_field_not_already_registered from ormar.models.helpers.sqlalchemy import adjust_through_many_to_many_model from ormar.relations import AliasManager @@ -31,7 +31,7 @@ def register_relation_on_build(new_model: Type["Model"], field_name: str) -> Non def register_many_to_many_relation_on_build( - new_model: Type["Model"], field: Type[ManyToManyField] + new_model: Type["Model"], field: Type[ManyToManyField], field_name: str ) -> None: """ Registers connection between through model and both sides of the m2m relation. @@ -43,13 +43,22 @@ def register_many_to_many_relation_on_build( By default relation name is a model.name.lower(). + :param field_name: name of the relation key + :type field_name: str :param new_model: model on which m2m field is declared :type new_model: Model class :param field: relation field :type field: ManyToManyField class """ - alias_manager.add_relation_type(field.through, new_model.get_name()) - alias_manager.add_relation_type(field.through, field.to.get_name()) + alias_manager.add_relation_type( + field.through, new_model.get_name(), is_multi=True, reverse_name=field_name + ) + alias_manager.add_relation_type( + field.through, + field.to.get_name(), + is_multi=True, + reverse_name=field.related_name or new_model.get_name() + "s", + ) def expand_reverse_relationships(model: Type["Model"]) -> None: @@ -133,6 +142,76 @@ def register_relation_in_alias_manager( :type field_name: str """ if issubclass(field, ManyToManyField): - register_many_to_many_relation_on_build(new_model=new_model, field=field) + register_many_to_many_relation_on_build( + new_model=new_model, field=field, field_name=field_name + ) elif issubclass(field, ForeignKeyField): register_relation_on_build(new_model=new_model, field_name=field_name) + + +def verify_related_name_dont_duplicate( + child: Type["Model"], parent_model: Type["Model"], related_name: str, +) -> None: + """ + Verifies whether the used related_name (regardless of the fact if user defined or + auto generated) is already used on related model, but is connected with other model + than the one that we connect right now. + + :raises: ModelDefinitionError if name is already used but lead to different related + model + :param child: related Model class + :type child: ormar.models.metaclass.ModelMetaclass + :param parent_model: parent Model class + :type parent_model: ormar.models.metaclass.ModelMetaclass + :param related_name: + :type related_name: + :return: None + :rtype: None + """ + if parent_model.Meta.model_fields.get(related_name): + fk_field = parent_model.Meta.model_fields.get(related_name) + if not fk_field: # pragma: no cover + return + if fk_field.to != child and fk_field.to.Meta != child.Meta: + raise ormar.ModelDefinitionError( + f"Relation with related_name " + f"'{related_name}' " + f"leading to model " + f"{parent_model.get_name(lower=False)} " + f"cannot be used on model " + f"{child.get_name(lower=False)} " + f"because it's already used by model " + f"{fk_field.to.get_name(lower=False)}" + ) + + +def reverse_field_not_already_registered( + child: Type["Model"], child_model_name: str, parent_model: Type["Model"] +) -> bool: + """ + Checks if child is already registered in parents pydantic fields. + + :raises: ModelDefinitionError if related name is already used but lead to different + related model + :param child: related Model class + :type child: ormar.models.metaclass.ModelMetaclass + :param child_model_name: related_name of the child if provided + :type child_model_name: str + :param parent_model: parent Model class + :type parent_model: ormar.models.metaclass.ModelMetaclass + :return: result of the check + :rtype: bool + """ + check_result = child_model_name not in parent_model.Meta.model_fields + check_result2 = child.get_name() not in parent_model.Meta.model_fields + + if not check_result: + verify_related_name_dont_duplicate( + child=child, parent_model=parent_model, related_name=child_model_name + ) + if not check_result2: + verify_related_name_dont_duplicate( + child=child, parent_model=parent_model, related_name=child.get_name() + ) + + return check_result and check_result2 diff --git a/ormar/models/metaclass.py b/ormar/models/metaclass.py index baafb50..9f36227 100644 --- a/ormar/models/metaclass.py +++ b/ormar/models/metaclass.py @@ -21,12 +21,14 @@ from ormar import ForeignKey, Integer, ModelDefinitionError # noqa I100 from ormar.fields import BaseField from ormar.fields.foreign_key import ForeignKeyField from ormar.fields.many_to_many import ManyToManyField -from ormar.models.helpers.pydantic import ( +from ormar.models.helpers.models import ( extract_annotations_and_default_vals, + populate_default_options_values, +) +from ormar.models.helpers.pydantic import ( get_potential_fields, get_pydantic_base_orm_config, get_pydantic_field, - populate_default_options_values, ) from ormar.models.helpers.relations import ( alias_manager, @@ -50,6 +52,12 @@ CONFIG_KEY = "Config" class ModelMeta: + """ + Class used for type hinting. + Users can subclass this one for conveniance but it's not required. + The only requirement is that ormar.Model has to have inner class with name Meta. + """ + tablename: str table: sqlalchemy.Table metadata: sqlalchemy.MetaData diff --git a/ormar/models/mixins/__init__.py b/ormar/models/mixins/__init__.py new file mode 100644 index 0000000..bc93e58 --- /dev/null +++ b/ormar/models/mixins/__init__.py @@ -0,0 +1,5 @@ +from ormar.models.mixins.alias_mixin import AliasMixin +from ormar.models.mixins.merge_mixin import MergeModelMixin +from ormar.models.mixins.prefetch_mixin import PrefetchQueryMixin + +__all__ = ["MergeModelMixin", "AliasMixin", "PrefetchQueryMixin"] diff --git a/ormar/models/mixins/alias_mixin.py b/ormar/models/mixins/alias_mixin.py new file mode 100644 index 0000000..c639b4e --- /dev/null +++ b/ormar/models/mixins/alias_mixin.py @@ -0,0 +1,83 @@ +from typing import Dict, List, Optional, Set, TYPE_CHECKING, Type, Union + + +class AliasMixin: + if TYPE_CHECKING: # pragma: no cover + from ormar import Model, ModelMeta + + Meta: ModelMeta + + @classmethod + def get_column_alias(cls, field_name: str) -> str: + field = cls.Meta.model_fields.get(field_name) + return field.get_alias() if field is not None else field_name + + @classmethod + def get_column_name_from_alias(cls, alias: str) -> str: + for field_name, field in cls.Meta.model_fields.items(): + if field.get_alias() == alias: + return field_name + return alias # if not found it's not an alias but actual name + + @classmethod + def translate_columns_to_aliases(cls, new_kwargs: Dict) -> Dict: + for field_name, field in cls.Meta.model_fields.items(): + if field_name in new_kwargs: + new_kwargs[field.get_alias()] = new_kwargs.pop(field_name) + return new_kwargs + + @classmethod + def translate_aliases_to_columns(cls, new_kwargs: Dict) -> Dict: + for field_name, field in cls.Meta.model_fields.items(): + if field.alias and field.alias in new_kwargs: + new_kwargs[field_name] = new_kwargs.pop(field.alias) + return new_kwargs + + @staticmethod + def _populate_pk_column( + model: Type["Model"], columns: List[str], use_alias: bool = False, + ) -> List[str]: + pk_alias = ( + model.get_column_alias(model.Meta.pkname) + if use_alias + else model.Meta.pkname + ) + if pk_alias not in columns: + columns.append(pk_alias) + return columns + + @classmethod + def own_table_columns( + cls, + model: Type["Model"], + fields: Optional[Union[Set, Dict]], + exclude_fields: Optional[Union[Set, Dict]], + use_alias: bool = False, + ) -> List[str]: + columns = [ + model.get_column_name_from_alias(col.name) if not use_alias else col.name + for col in model.Meta.table.columns + ] + field_names = [ + model.get_column_name_from_alias(col.name) + for col in model.Meta.table.columns + ] + if fields: + columns = [ + col + for col, name in zip(columns, field_names) + if model.is_included(fields, name) + ] + if exclude_fields: + columns = [ + col + for col, name in zip(columns, field_names) + if not model.is_excluded(exclude_fields, name) + ] + + # always has to return pk column for ormar to work + columns = cls._populate_pk_column( + model=model, columns=columns, use_alias=use_alias + ) + + return columns diff --git a/ormar/models/mixins/merge_mixin.py b/ormar/models/mixins/merge_mixin.py new file mode 100644 index 0000000..e01464a --- /dev/null +++ b/ormar/models/mixins/merge_mixin.py @@ -0,0 +1,46 @@ +from collections import OrderedDict +from typing import List, Sequence, TYPE_CHECKING + +import ormar + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +class MergeModelMixin: + @classmethod + def merge_instances_list(cls, result_rows: Sequence["Model"]) -> Sequence["Model"]: + merged_rows: List["Model"] = [] + grouped_instances: OrderedDict = OrderedDict() + + for model in result_rows: + grouped_instances.setdefault(model.pk, []).append(model) + + for group in grouped_instances.values(): + model = group.pop(0) + if group: + for next_model in group: + model = cls.merge_two_instances(next_model, model) + merged_rows.append(model) + + return merged_rows + + @classmethod + def merge_two_instances(cls, one: "Model", other: "Model") -> "Model": + for field in one.Meta.model_fields.keys(): + current_field = getattr(one, field) + if isinstance(current_field, list) and not isinstance( + current_field, ormar.Model + ): + setattr(other, field, current_field + getattr(other, field)) + elif ( + isinstance(current_field, ormar.Model) + and current_field.pk == getattr(other, field).pk + ): + setattr( + other, + field, + cls.merge_two_instances(current_field, getattr(other, field)), + ) + other.set_save_status(True) + return other diff --git a/ormar/models/mixins/prefetch_mixin.py b/ormar/models/mixins/prefetch_mixin.py new file mode 100644 index 0000000..173e58b --- /dev/null +++ b/ormar/models/mixins/prefetch_mixin.py @@ -0,0 +1,64 @@ +from typing import Callable, Dict, List, TYPE_CHECKING, Tuple, Type + +import ormar +from ormar.fields import BaseField + + +class PrefetchQueryMixin: + if TYPE_CHECKING: # pragma no cover + from ormar import Model + + get_name: Callable # defined in NewBaseModel + extract_related_names: Callable # defined in ModelTableProxy + + @staticmethod + def get_clause_target_and_filter_column_name( + parent_model: Type["Model"], + target_model: Type["Model"], + reverse: bool, + related: str, + ) -> Tuple[Type["Model"], str]: + if reverse: + field_name = ( + parent_model.Meta.model_fields[related].related_name + or parent_model.get_name() + "s" + ) + field = target_model.Meta.model_fields[field_name] + if issubclass(field, ormar.fields.ManyToManyField): + field_name = field.default_target_field_name() + sub_field = field.through.Meta.model_fields[field_name] + return field.through, sub_field.get_alias() + return target_model, field.get_alias() + target_field = target_model.get_column_alias(target_model.Meta.pkname) + return target_model, target_field + + @staticmethod + def get_column_name_for_id_extraction( + parent_model: Type["Model"], reverse: bool, related: str, use_raw: bool, + ) -> str: + if reverse: + column_name = parent_model.Meta.pkname + return ( + parent_model.get_column_alias(column_name) if use_raw else column_name + ) + column = parent_model.Meta.model_fields[related] + return column.get_alias() if use_raw else column.name + + @classmethod + def get_related_field_name(cls, target_field: Type["BaseField"]) -> str: + if issubclass(target_field, ormar.fields.ManyToManyField): + return cls.get_name() + if target_field.virtual: + return target_field.related_name or cls.get_name() + "s" + return target_field.to.Meta.pkname + + @classmethod + def get_filtered_names_to_extract(cls, prefetch_dict: Dict) -> List: + related_to_extract = [] + if prefetch_dict and prefetch_dict is not Ellipsis: + related_to_extract = [ + related + for related in cls.extract_related_names() + if related in prefetch_dict + ] + return related_to_extract diff --git a/ormar/models/model.py b/ormar/models/model.py index 8a2ee4f..9412a26 100644 --- a/ormar/models/model.py +++ b/ormar/models/model.py @@ -125,13 +125,11 @@ class Model(NewBaseModel): ) ): through_field = previous_model.Meta.model_fields[related_name] - rel_name2 = previous_model.resolve_relation_name( - through_field.through, through_field.to, explicit_multi=True - ) + rel_name2 = through_field.default_target_field_name() # type: ignore previous_model = through_field.through # type: ignore if previous_model and rel_name2: - table_prefix = cls.Meta.alias_manager.resolve_relation_join( + table_prefix = cls.Meta.alias_manager.resolve_relation_alias( previous_model, rel_name2 ) else: diff --git a/ormar/models/modelproxy.py b/ormar/models/modelproxy.py index 1be9625..dac850a 100644 --- a/ormar/models/modelproxy.py +++ b/ormar/models/modelproxy.py @@ -1,5 +1,4 @@ import inspect -from collections import OrderedDict from typing import ( AbstractSet, Any, @@ -8,25 +7,22 @@ from typing import ( List, Mapping, Optional, - Sequence, Set, TYPE_CHECKING, - Tuple, - Type, TypeVar, Union, ) import ormar # noqa: I100 from ormar.exceptions import ModelPersistenceError -from ormar.fields import BaseField, ManyToManyField +from ormar.fields import BaseField from ormar.fields.foreign_key import ForeignKeyField from ormar.models.metaclass import ModelMeta +from ormar.models.mixins import AliasMixin, MergeModelMixin, PrefetchQueryMixin from ormar.queryset.utils import translate_list_to_dict, update if TYPE_CHECKING: # pragma no cover from ormar import Model - from ormar.models import NewBaseModel T = TypeVar("T", bound=Model) IntStr = Union[int, str] @@ -36,7 +32,7 @@ if TYPE_CHECKING: # pragma no cover Field = TypeVar("Field", bound=BaseField) -class ModelTableProxy: +class ModelTableProxy(PrefetchQueryMixin, MergeModelMixin, AliasMixin): if TYPE_CHECKING: # pragma no cover Meta: ModelMeta _related_names: Optional[Set] @@ -46,76 +42,6 @@ class ModelTableProxy: _props: Set dict: Callable # noqa: A001, VNE003 - def _extract_own_model_fields(self) -> Dict: - related_names = self.extract_related_names() - self_fields = self.dict(exclude=related_names) - return self_fields - - @classmethod - def get_related_field_name(cls, target_field: Type["BaseField"]) -> str: - if issubclass(target_field, ormar.fields.ManyToManyField): - return cls.resolve_relation_name( - target_field.through, cls, explicit_multi=True - ) - if target_field.virtual: - return target_field.related_name or cls.get_name() + "s" - return target_field.to.Meta.pkname - - @staticmethod - def get_clause_target_and_filter_column_name( - parent_model: Type["Model"], - target_model: Type["Model"], - reverse: bool, - related: str, - ) -> Tuple[Type["Model"], str]: - if reverse: - field_name = ( - parent_model.Meta.model_fields[related].related_name - or parent_model.get_name() + "s" - ) - field = target_model.Meta.model_fields[field_name] - if issubclass(field, ormar.fields.ManyToManyField): - field_name = parent_model.resolve_relation_name( - field.through, field.to, explicit_multi=True - ) - sub_field = field.through.Meta.model_fields[field_name] - return field.through, sub_field.get_alias() - return target_model, field.get_alias() - target_field = target_model.get_column_alias(target_model.Meta.pkname) - return target_model, target_field - - @staticmethod - def get_column_name_for_id_extraction( - parent_model: Type["Model"], reverse: bool, related: str, use_raw: bool, - ) -> str: - if reverse: - column_name = parent_model.Meta.pkname - return ( - parent_model.get_column_alias(column_name) if use_raw else column_name - ) - column = parent_model.Meta.model_fields[related] - return column.get_alias() if use_raw else column.name - - @classmethod - def get_filtered_names_to_extract(cls, prefetch_dict: Dict) -> List: - related_to_extract = [] - if prefetch_dict and prefetch_dict is not Ellipsis: - related_to_extract = [ - related - for related in cls.extract_related_names() - if related in prefetch_dict - ] - return related_to_extract - - def get_relation_model_id(self, target_field: Type["BaseField"]) -> Optional[int]: - if target_field.virtual or issubclass( - target_field, ormar.fields.ManyToManyField - ): - return self.pk - related_name = target_field.name - related_model = getattr(self, related_name) - return None if not related_model else related_model.pk - @classmethod def extract_db_own_fields(cls) -> Set: related_names = cls.extract_related_names() @@ -124,36 +50,6 @@ class ModelTableProxy: } return self_fields - @classmethod - def get_names_to_exclude( - cls, - fields: Optional[Union[Dict, Set]] = None, - exclude_fields: Optional[Union[Dict, Set]] = None, - ) -> Set: - fields_names = cls.extract_db_own_fields() - if fields and fields is not Ellipsis: - fields_to_keep = {name for name in fields if name in fields_names} - else: - fields_to_keep = fields_names - - fields_to_exclude = fields_names - fields_to_keep - - if isinstance(exclude_fields, Set): - fields_to_exclude = fields_to_exclude.union( - {name for name in exclude_fields if name in fields_names} - ) - elif isinstance(exclude_fields, Dict): - new_to_exclude = { - name - for name in exclude_fields - if name in fields_names and exclude_fields[name] is Ellipsis - } - fields_to_exclude = fields_to_exclude.union(new_to_exclude) - - fields_to_exclude = fields_to_exclude - {cls.Meta.pkname} - - return fields_to_exclude - @classmethod def substitute_models_with_pks(cls, model_dict: Dict) -> Dict: # noqa CCR001 for field in cls.extract_related_names(): @@ -194,20 +90,6 @@ class ModelTableProxy: new_kwargs.pop(field_name, None) return new_kwargs - @classmethod - def get_column_alias(cls, field_name: str) -> str: - field = cls.Meta.model_fields.get(field_name) - if field is not None and field.alias is not None: - return field.alias - return field_name - - @classmethod - def get_column_name_from_alias(cls, alias: str) -> str: - for field_name, field in cls.Meta.model_fields.items(): - if field is not None and field.alias == alias: - return field_name - return alias # if not found it's not an alias but actual name - @classmethod def extract_related_fields(cls) -> List: @@ -270,150 +152,32 @@ class ModelTableProxy: exclude = update(related_dict, exclude) return exclude - def _extract_model_db_fields(self) -> Dict: - self_fields = self._extract_own_model_fields() - self_fields = { - k: v - for k, v in self_fields.items() - if self.get_column_alias(k) in self.Meta.table.columns - } - for field in self._extract_db_related_names(): - target_pk_name = self.Meta.model_fields[field].to.Meta.pkname - target_field = getattr(self, field) - self_fields[field] = getattr(target_field, target_pk_name, None) - return self_fields - - @staticmethod - def resolve_relation_name( # noqa CCR001 - item: Union[ - "NewBaseModel", - Type["NewBaseModel"], - "ModelTableProxy", - Type["ModelTableProxy"], - ], - related: Union[ - "NewBaseModel", - Type["NewBaseModel"], - "ModelTableProxy", - Type["ModelTableProxy"], - ], - explicit_multi: bool = False, - ) -> str: - for name, field in item.Meta.model_fields.items(): - # fastapi is creating clones of response model - # that's why it can be a subclass of the original model - # so we need to compare Meta too as this one is copied as is - if issubclass(field, ManyToManyField): - attrib = "to" if not explicit_multi else "through" - if ( - getattr(field, attrib) == related.__class__ - or getattr(field, attrib).Meta == related.Meta - ): - return name - - elif issubclass(field, ForeignKeyField): - if field.to == related.__class__ or field.to.Meta == related.Meta: - return name - - raise ValueError( - f"No relation between {item.get_name()} and {related.get_name()}" - ) # pragma nocover - @classmethod - def translate_columns_to_aliases(cls, new_kwargs: Dict) -> Dict: - for field_name, field in cls.Meta.model_fields.items(): - if field_name in new_kwargs: - new_kwargs[field.get_alias()] = new_kwargs.pop(field_name) - return new_kwargs + def get_names_to_exclude( + cls, + fields: Optional[Union[Dict, Set]] = None, + exclude_fields: Optional[Union[Dict, Set]] = None, + ) -> Set: + fields_names = cls.extract_db_own_fields() + if fields and fields is not Ellipsis: + fields_to_keep = {name for name in fields if name in fields_names} + else: + fields_to_keep = fields_names - @classmethod - def translate_aliases_to_columns(cls, new_kwargs: Dict) -> Dict: - for field_name, field in cls.Meta.model_fields.items(): - if field.alias and field.alias in new_kwargs: - new_kwargs[field_name] = new_kwargs.pop(field.alias) - return new_kwargs + fields_to_exclude = fields_names - fields_to_keep - @classmethod - def merge_instances_list(cls, result_rows: Sequence["Model"]) -> Sequence["Model"]: - merged_rows: List["Model"] = [] - grouped_instances: OrderedDict = OrderedDict() + if isinstance(exclude_fields, Set): + fields_to_exclude = fields_to_exclude.union( + {name for name in exclude_fields if name in fields_names} + ) + elif isinstance(exclude_fields, Dict): + new_to_exclude = { + name + for name in exclude_fields + if name in fields_names and exclude_fields[name] is Ellipsis + } + fields_to_exclude = fields_to_exclude.union(new_to_exclude) - for model in result_rows: - grouped_instances.setdefault(model.pk, []).append(model) + fields_to_exclude = fields_to_exclude - {cls.Meta.pkname} - for group in grouped_instances.values(): - model = group.pop(0) - if group: - for next_model in group: - model = cls.merge_two_instances(next_model, model) - merged_rows.append(model) - - return merged_rows - - @classmethod - def merge_two_instances(cls, one: "Model", other: "Model") -> "Model": - for field in one.Meta.model_fields.keys(): - current_field = getattr(one, field) - if isinstance(current_field, list) and not isinstance( - current_field, ormar.Model - ): - setattr(other, field, current_field + getattr(other, field)) - elif ( - isinstance(current_field, ormar.Model) - and current_field.pk == getattr(other, field).pk - ): - setattr( - other, - field, - cls.merge_two_instances(current_field, getattr(other, field)), - ) - other.set_save_status(True) - return other - - @staticmethod - def _populate_pk_column( - model: Type["Model"], columns: List[str], use_alias: bool = False, - ) -> List[str]: - pk_alias = ( - model.get_column_alias(model.Meta.pkname) - if use_alias - else model.Meta.pkname - ) - if pk_alias not in columns: - columns.append(pk_alias) - return columns - - @staticmethod - def own_table_columns( - model: Type["Model"], - fields: Optional[Union[Set, Dict]], - exclude_fields: Optional[Union[Set, Dict]], - use_alias: bool = False, - ) -> List[str]: - columns = [ - model.get_column_name_from_alias(col.name) if not use_alias else col.name - for col in model.Meta.table.columns - ] - field_names = [ - model.get_column_name_from_alias(col.name) - for col in model.Meta.table.columns - ] - if fields: - columns = [ - col - for col, name in zip(columns, field_names) - if model.is_included(fields, name) - ] - if exclude_fields: - columns = [ - col - for col, name in zip(columns, field_names) - if not model.is_excluded(exclude_fields, name) - ] - - # always has to return pk column - columns = ModelTableProxy._populate_pk_column( - model=model, columns=columns, use_alias=use_alias - ) - - return columns + return fields_to_exclude diff --git a/ormar/models/newbasemodel.py b/ormar/models/newbasemodel.py index 3fbe614..628ae70 100644 --- a/ormar/models/newbasemodel.py +++ b/ormar/models/newbasemodel.py @@ -379,3 +379,30 @@ class NewBaseModel( column_name in self.Meta.model_fields and self.Meta.model_fields[column_name].__type__ == pydantic.Json ) + + def _extract_own_model_fields(self) -> Dict: + related_names = self.extract_related_names() + self_fields = self.dict(exclude=related_names) + return self_fields + + def _extract_model_db_fields(self) -> Dict: + self_fields = self._extract_own_model_fields() + self_fields = { + k: v + for k, v in self_fields.items() + if self.get_column_alias(k) in self.Meta.table.columns + } + for field in self._extract_db_related_names(): + target_pk_name = self.Meta.model_fields[field].to.Meta.pkname + target_field = getattr(self, field) + self_fields[field] = getattr(target_field, target_pk_name, None) + return self_fields + + def get_relation_model_id(self, target_field: Type["BaseField"]) -> Optional[int]: + if target_field.virtual or issubclass( + target_field, ormar.fields.ManyToManyField + ): + return self.pk + related_name = target_field.name + related_model = getattr(self, related_name) + return None if not related_model else related_model.pk diff --git a/ormar/queryset/clause.py b/ormar/queryset/clause.py index e92df41..c55eed8 100644 --- a/ormar/queryset/clause.py +++ b/ormar/queryset/clause.py @@ -137,11 +137,9 @@ class QueryClause: if issubclass(model_cls.Meta.model_fields[part], ManyToManyField): through_field = model_cls.Meta.model_fields[part] previous_model = through_field.through - part2 = model_cls.resolve_relation_name( - previous_model, through_field.to, explicit_multi=True - ) + part2 = through_field.default_target_field_name() # type: ignore manager = model_cls.Meta.alias_manager - table_prefix = manager.resolve_relation_join(previous_model, part2) + table_prefix = manager.resolve_relation_alias(previous_model, part2) model_cls = model_cls.Meta.model_fields[part].to previous_model = model_cls return select_related, table_prefix, model_cls diff --git a/ormar/queryset/join.py b/ormar/queryset/join.py index 02c9388..7726a8c 100644 --- a/ormar/queryset/join.py +++ b/ormar/queryset/join.py @@ -135,7 +135,7 @@ class SqlJoin: model_cls = join_params.model_cls.Meta.model_fields[part].to to_table = model_cls.Meta.table.name - alias = model_cls.Meta.alias_manager.resolve_relation_join( + alias = model_cls.Meta.alias_manager.resolve_relation_alias( join_params.prev_model, part ) if alias not in self.used_aliases: diff --git a/ormar/queryset/prefetch_query.py b/ormar/queryset/prefetch_query.py index 8ec9051..aff67ac 100644 --- a/ormar/queryset/prefetch_query.py +++ b/ormar/queryset/prefetch_query.py @@ -328,7 +328,7 @@ class PrefetchQuery: if issubclass(target_field, ManyToManyField): query_target = target_field.through select_related = [target_name] - table_prefix = target_field.to.Meta.alias_manager.resolve_relation_join( + table_prefix = target_field.to.Meta.alias_manager.resolve_relation_alias( query_target, target_name ) self.already_extracted.setdefault(target_name, {})["prefix"] = table_prefix diff --git a/ormar/relations/alias_manager.py b/ormar/relations/alias_manager.py index 46c68c9..a990bfc 100644 --- a/ormar/relations/alias_manager.py +++ b/ormar/relations/alias_manager.py @@ -11,11 +11,25 @@ if TYPE_CHECKING: # pragma: no cover def get_table_alias() -> str: + """ + Creates a random string that is used to alias tables in joins. + It's necessary that each relation has it's own aliases cause you can link + to the same target tables from multiple fields on one model as well as from + multiple different models in one join. + + :return: randomly generated alias + :rtype: str + """ alias = "".join(choices(string.ascii_uppercase, k=2)) + uuid.uuid4().hex[:4] return alias.lower() class AliasManager: + """ + Keep all aliases of relations between different tables. + One global instance is shared between all models. + """ + def __init__(self) -> None: self._aliases: Dict[str, str] = dict() self._aliases_new: Dict[str, str] = dict() @@ -24,6 +38,22 @@ class AliasManager: def prefixed_columns( alias: str, table: sqlalchemy.Table, fields: List = None ) -> List[text]: + """ + Creates a list of aliases sqlalchemy text clauses from + string alias and sqlalchemy.Table. + + Optional list of fields to include can be passed to extract only those columns. + List has to have sqlalchemy names of columns (ormar aliases) not the ormar ones. + + :param alias: alias of given table + :type alias: str + :param table: table from which fields should be aliased + :type table: sqlalchemy.Table + :param fields: fields to include + :type fields: Optional[List[str]] + :return: list of sqlalchemy text clauses with "column name as aliased name" + :rtype: List[text] + """ alias = f"{alias}_" if alias else "" all_columns = ( table.columns @@ -37,11 +67,49 @@ class AliasManager: @staticmethod def prefixed_table_name(alias: str, name: str) -> text: + """ + Creates text clause with table name with aliased name. + + :param alias: alias of given table + :type alias: str + :param name: table name + :type name: str + :return: sqlalchemy text clause as "table_name aliased_name" + :rtype: sqlalchemy text clause + """ return text(f"{name} {alias}_{name}") def add_relation_type( - self, source_model: Type["Model"], relation_name: str + self, + source_model: Type["Model"], + relation_name: str, + reverse_name: str = None, + is_multi: bool = False, ) -> None: + """ + Registers the relations defined in ormar models. + Given the relation it registers also the reverse side of this relation. + + Used by both ForeignKey and ManyToMany relations. + + Each relation is registered as Model name and relation name. + Each alias registered has to be unique. + + Aliases are used to construct joins to assure proper links between tables. + That way you can link to the same target tables from multiple fields + on one model as well as from multiple different models in one join. + + :param source_model: model with relation defined + :type source_model: source Model + :param relation_name: name of the relation to define + :type relation_name: str + :param reverse_name: name of related_name fo given relation for m2m relations + :type reverse_name: Optional[str] + :param is_multi: flag if relation being registered is a through m2m model + :type is_multi: bool + :return: none + :rtype: None + """ parent_key = f"{source_model.get_name()}_{relation_name}" if parent_key not in self._aliases_new: self._aliases_new[parent_key] = get_table_alias() @@ -49,15 +117,24 @@ class AliasManager: child_model = to_field.to related_name = to_field.related_name if not related_name: - related_name = child_model.resolve_relation_name( - child_model, source_model, explicit_multi=True - ) + related_name = reverse_name if is_multi else source_model.get_name() + "s" + child_key = f"{child_model.get_name()}_{related_name}" if child_key not in self._aliases_new: self._aliases_new[child_key] = get_table_alias() - def resolve_relation_join( + def resolve_relation_alias( self, from_model: Type["Model"], relation_name: str ) -> str: + """ + Given model and relation name returns the alias for this relation. + + :param from_model: model with relation defined + :type from_model: source Model + :param relation_name: name of the relation field + :type relation_name: str + :return: alias of the relation + :rtype: str + """ alias = self._aliases_new.get(f"{from_model.get_name()}_{relation_name}", "") return alias diff --git a/ormar/relations/relation_manager.py b/ormar/relations/relation_manager.py index 211068d..6eeaac5 100644 --- a/ormar/relations/relation_manager.py +++ b/ormar/relations/relation_manager.py @@ -67,7 +67,7 @@ class RelationsManager: to_field: Type[BaseField] = child.Meta.model_fields[relation_name] # print('comming', child_name, relation_name) (parent, child, child_name, to_name,) = get_relations_sides_and_names( - to_field, parent, child, child_name, virtual + to_field, parent, child, child_name, virtual, relation_name ) # print('adding', parent.get_name(), child.get_name(), child_name) diff --git a/ormar/relations/utils.py b/ormar/relations/utils.py index 9fa09b0..bad83e2 100644 --- a/ormar/relations/utils.py +++ b/ormar/relations/utils.py @@ -14,16 +14,11 @@ def get_relations_sides_and_names( child: "Model", child_name: str, virtual: bool, + relation_name: str, ) -> Tuple["Model", "Model", str, str]: to_name = to_field.name if issubclass(to_field, ManyToManyField): - child_name, to_name = ( - to_field.related_name - or child.resolve_relation_name( - parent, to_field.through, explicit_multi=True - ), - to_name, - ) + child_name = to_field.related_name or child.get_name() + "s" child = proxy(child) elif virtual: child_name, to_name = to_name, child_name or child.get_name()