diff --git a/ormar/fields/base.py b/ormar/fields/base.py index 8c25810..3a824e2 100644 --- a/ormar/fields/base.py +++ b/ormar/fields/base.py @@ -87,9 +87,9 @@ class BaseField(FieldInfo): :rtype: bool """ return ( - field_name not in ["default", "default_factory", "alias"] - and not field_name.startswith("__") - and hasattr(cls, field_name) + field_name not in ["default", "default_factory", "alias"] + and not field_name.startswith("__") + and hasattr(cls, field_name) ) @classmethod @@ -180,7 +180,7 @@ class BaseField(FieldInfo): :rtype: bool """ return cls.default is not None or ( - cls.server_default is not None and use_server + cls.server_default is not None and use_server ) @classmethod @@ -199,9 +199,12 @@ class BaseField(FieldInfo): @classmethod def construct_contraints(cls) -> List: - return [sqlalchemy.schema.ForeignKey( - con.name, ondelete=con.ondelete, onupdate=con.onupdate - ) for con in cls.constraints] + return [ + sqlalchemy.schema.ForeignKey( + con.name, ondelete=con.ondelete, onupdate=con.onupdate + ) + for con in cls.constraints + ] @classmethod def get_column(cls, name: str) -> sqlalchemy.Column: @@ -229,11 +232,11 @@ class BaseField(FieldInfo): @classmethod def expand_relationship( - cls, - value: Any, - child: Union["Model", "NewBaseModel"], - to_register: bool = True, - relation_name: str = None, + cls, + value: Any, + child: Union["Model", "NewBaseModel"], + to_register: bool = True, + relation_name: str = None, ) -> Any: """ Function overwritten for relations, in basic field the value is returned as is. diff --git a/ormar/fields/foreign_key.py b/ormar/fields/foreign_key.py index 5ef8e78..9a5bc5b 100644 --- a/ormar/fields/foreign_key.py +++ b/ormar/fields/foreign_key.py @@ -2,7 +2,6 @@ import uuid from dataclasses import dataclass from typing import Any, List, Optional, TYPE_CHECKING, Type, Union -import sqlalchemy from pydantic import BaseModel, create_model from sqlalchemy import UniqueConstraint @@ -46,8 +45,8 @@ def create_dummy_instance(fk: Type["Model"], pk: Any = None) -> "Model": def create_dummy_model( - base_model: Type["Model"], - pk_field: Type[Union[BaseField, "ForeignKeyField", "ManyToManyField"]], + base_model: Type["Model"], + pk_field: Type[Union[BaseField, "ForeignKeyField", "ManyToManyField"]], ) -> Type["BaseModel"]: """ Used to construct a dummy pydantic model for type hints and pydantic validation. @@ -84,16 +83,16 @@ class ForeignKeyConstraint: def ForeignKey( # noqa CFQ002 - to: Type["Model"], - *, - name: str = None, - unique: bool = False, - nullable: bool = True, - related_name: str = None, - virtual: bool = False, - onupdate: str = None, - ondelete: str = None, - **kwargs: Any, + to: Type["Model"], + *, + name: str = None, + unique: bool = False, + nullable: bool = True, + related_name: str = None, + virtual: bool = False, + onupdate: str = None, + ondelete: str = None, + **kwargs: Any, ) -> Any: """ Despite a name it's a function that returns constructed ForeignKeyField. @@ -140,7 +139,9 @@ def ForeignKey( # noqa CFQ002 name=kwargs.pop("real_name", None), nullable=nullable, constraints=[ - ForeignKeyConstraint(name=fk_string, ondelete=ondelete, onupdate=onupdate) + ForeignKeyConstraint( + name=fk_string, ondelete=ondelete, onupdate=onupdate # type: ignore + ) ], unique=unique, column_type=to_field.column_type, @@ -168,7 +169,7 @@ class ForeignKeyField(BaseField): @classmethod def _extract_model_from_sequence( - cls, value: List, child: "Model", to_register: bool, relation_name: str + cls, value: List, child: "Model", to_register: bool, relation_name: str ) -> List["Model"]: """ Takes a list of Models and registers them on parent. @@ -197,7 +198,7 @@ class ForeignKeyField(BaseField): @classmethod def _register_existing_model( - cls, value: "Model", child: "Model", to_register: bool, relation_name: str + cls, value: "Model", child: "Model", to_register: bool, relation_name: str ) -> "Model": """ Takes already created instance and registers it for parent. @@ -220,7 +221,7 @@ class ForeignKeyField(BaseField): @classmethod def _construct_model_from_dict( - cls, value: dict, child: "Model", to_register: bool, relation_name: str + cls, value: dict, child: "Model", to_register: bool, relation_name: str ) -> "Model": """ Takes a dictionary, creates a instance and registers it for parent. @@ -247,7 +248,7 @@ class ForeignKeyField(BaseField): @classmethod def _construct_model_from_pk( - cls, value: Any, child: "Model", to_register: bool, relation_name: str + cls, value: Any, child: "Model", to_register: bool, relation_name: str ) -> "Model": """ Takes a pk value, creates a dummy instance and registers it for parent. @@ -279,7 +280,7 @@ class ForeignKeyField(BaseField): @classmethod def register_relation( - cls, model: "Model", child: "Model", relation_name: str + cls, model: "Model", child: "Model", relation_name: str ) -> None: """ Registers relation between parent and child in relation manager. @@ -303,11 +304,11 @@ class ForeignKeyField(BaseField): @classmethod def expand_relationship( - cls, - value: Any, - child: Union["Model", "NewBaseModel"], - to_register: bool = True, - relation_name: str = None, + cls, + value: Any, + child: Union["Model", "NewBaseModel"], + to_register: bool = True, + relation_name: str = None, ) -> Optional[Union["Model", List["Model"]]]: """ For relations the child model is first constructed (if needed), diff --git a/ormar/models/__init__.py b/ormar/models/__init__.py index e6d8bd5..9e366f1 100644 --- a/ormar/models/__init__.py +++ b/ormar/models/__init__.py @@ -1,5 +1,4 @@ from ormar.models.newbasemodel import NewBaseModel # noqa I100 from ormar.models.model import Model # noqa I100 -from ormar.models.metaclass import expand_reverse_relationships # noqa I100 -__all__ = ["NewBaseModel", "Model", "expand_reverse_relationships"] +__all__ = ["NewBaseModel", "Model"] diff --git a/ormar/models/helpers/__init__.py b/ormar/models/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ormar/models/helpers/models.py b/ormar/models/helpers/models.py new file mode 100644 index 0000000..8f76b9b --- /dev/null +++ b/ormar/models/helpers/models.py @@ -0,0 +1,39 @@ +from typing import Dict, List, Optional, TYPE_CHECKING, Type + +from ormar import ModelDefinitionError +from ormar.fields.foreign_key import ForeignKeyField + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +def validate_related_names_in_relations( + model_fields: Dict, new_model: Type["Model"] +) -> None: + """ + Performs a validation of relation_names in relation fields. + If multiple fields are leading to the same related model + only one can have empty related_name param + (populated by default as model.name.lower()+'s'). + Also related_names have to be unique for given related model. + + :raises: ModelDefinitionError if validation of related_names fail + :param model_fields: dictionary of declared ormar model fields + :type model_fields: Dict[str, ormar.Field] + :param new_model: + :type new_model: Model class + """ + already_registered: Dict[str, List[Optional[str]]] = dict() + for field in model_fields.values(): + if issubclass(field, ForeignKeyField): + previous_related_names = already_registered.setdefault(field.to, []) + if field.related_name in previous_related_names: + raise ModelDefinitionError( + f"Multiple fields declared on {new_model.get_name(lower=False)} " + f"model leading to {field.to.get_name(lower=False)} model without " + f"related_name property set. \nThere can be only one relation with " + f"default/empty name: '{new_model.get_name() + 's'}'" + f"\nTip: provide different related_name for FK and/or M2M fields" + ) + else: + previous_related_names.append(field.related_name) diff --git a/ormar/models/helpers/pydantic.py b/ormar/models/helpers/pydantic.py new file mode 100644 index 0000000..6060f36 --- /dev/null +++ b/ormar/models/helpers/pydantic.py @@ -0,0 +1,221 @@ +import warnings +from typing import Dict, Optional, TYPE_CHECKING, Tuple, Type + +from pydantic import BaseConfig +from pydantic.fields import ModelField +from pydantic.utils import lenient_issubclass + +import ormar # noqa: I100, I202 +from ormar.fields import BaseField, ManyToManyField + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +def reverse_field_not_already_registered( + child: Type["Model"], child_model_name: str, parent_model: Type["Model"] +) -> bool: + """ + Checks if child is already registered in parents pydantic fields. + + :param child: related Model class + :type child: ormar.models.metaclass.ModelMetaclass + :param child_model_name: related_name of the child if provided + :type child_model_name: str + :param parent_model: parent Model class + :type parent_model: ormar.models.metaclass.ModelMetaclass + :return: result of the check + :rtype: bool + """ + return ( + child_model_name not in parent_model.__fields__ + and child.get_name() not in parent_model.__fields__ + ) + + +def create_pydantic_field( + field_name: str, model: Type["Model"], model_field: Type[ManyToManyField] +) -> None: + """ + Registers pydantic field on through model that leads to passed model + and is registered as field_name passed. + + Through model is fetched from through attributed on passed model_field. + + :param field_name: field name to register + :type field_name: str + :param model: type of field to register + :type model: Model class + :param model_field: relation field from which through model is extracted + :type model_field: ManyToManyField class + """ + model_field.through.__fields__[field_name] = ModelField( + name=field_name, + type_=model, + model_config=model.__config__, + required=False, + class_validators={}, + ) + + +def get_pydantic_field(field_name: str, model: Type["Model"]) -> "ModelField": + """ + Extracts field type and if it's required from Model model_fields by passed + field_name. Returns a pydantic field with type of field_name field type. + + :param field_name: field name to fetch from Model and name of pydantic field + :type field_name: str + :param model: type of field to register + :type model: Model class + :return: newly created pydantic field + :rtype: pydantic.ModelField + """ + return ModelField( + name=field_name, + type_=model.Meta.model_fields[field_name].__type__, # type: ignore + model_config=model.__config__, + required=not model.Meta.model_fields[field_name].nullable, + class_validators={}, + ) + + +def populate_default_pydantic_field_value( + ormar_field: Type[BaseField], field_name: str, attrs: dict +) -> dict: + """ + Grabs current value of the ormar Field in class namespace + (so the default_value declared on ormar model if set) + and converts it to pydantic.FieldInfo + that pydantic is able to extract later. + + On FieldInfo there are saved all needed params like max_length of the string + and other constraints that pydantic can use to build + it's own field validation used by ormar. + + :param ormar_field: field to convert + :type ormar_field: ormar Field + :param field_name: field to convert name + :type field_name: str + :param attrs: current class namespace + :type attrs: Dict + :return: updated namespace dict + :rtype: Dict + """ + curr_def_value = attrs.get(field_name, ormar.Undefined) + if lenient_issubclass(curr_def_value, ormar.fields.BaseField): + curr_def_value = ormar.Undefined + if curr_def_value is None: + attrs[field_name] = ormar_field.convert_to_pydantic_field_info(allow_null=True) + else: + attrs[field_name] = ormar_field.convert_to_pydantic_field_info() + return attrs + + +def populate_pydantic_default_values(attrs: Dict) -> Tuple[Dict, Dict]: + """ + Extracts ormar fields from annotations (deprecated) and from namespace + dictionary of the class. Fields declared on model are all subclasses of the + BaseField class. + + Trigger conversion of ormar field into pydantic FieldInfo, which has all needed + paramaters saved. + + Overwrites the annotations of ormar fields to corresponding types declared on + ormar fields (constructed dynamically for relations). + Those annotations are later used by pydantic to construct it's own fields. + + :param attrs: current class namespace + :type attrs: Dict + :return: namespace of the class updated, dict of extracted model_fields + :rtype: Tuple[Dict, Dict] + """ + model_fields = {} + potential_fields = { + k: v + for k, v in attrs["__annotations__"].items() + if lenient_issubclass(v, BaseField) + } + if potential_fields: + warnings.warn( + "Using ormar.Fields as type Model annotation has been deprecated," + " check documentation of current version", + DeprecationWarning, + ) + + potential_fields.update(get_potential_fields(attrs)) + for field_name, field in potential_fields.items(): + field.name = field_name + attrs = populate_default_pydantic_field_value(field, field_name, attrs) + model_fields[field_name] = field + attrs["__annotations__"][field_name] = ( + field.__type__ if not field.nullable else Optional[field.__type__] + ) + return attrs, model_fields + + +def get_pydantic_base_orm_config() -> Type[BaseConfig]: + """ + Returns empty pydantic Config with orm_mode set to True. + + :return: empty default config with orm_mode set. + :rtype: pydantic Config + """ + + class Config(BaseConfig): + orm_mode = True + + return Config + + +def populate_default_options_values( + new_model: Type["Model"], model_fields: Dict +) -> None: + """ + Sets all optional Meta values to it's defaults + and set model_fields that were already previously extracted. + + Here should live all options that are not overwritten/set for all models. + + Current options are: + * constraints = [] + * abstract = False + + :param new_model: newly constructed Model + :type new_model: Model class + :param model_fields: + :type model_fields: Union[Dict[str, type], Dict] + """ + if not hasattr(new_model.Meta, "constraints"): + new_model.Meta.constraints = [] + if not hasattr(new_model.Meta, "model_fields"): + new_model.Meta.model_fields = model_fields + if not hasattr(new_model.Meta, "abstract"): + new_model.Meta.abstract = False + + +def get_potential_fields(attrs: Dict) -> Dict: + """ + Gets all the fields in current class namespace that are Fields. + + :param attrs: current class namespace + :type attrs: Dict + :return: extracted fields that are ormar Fields + :rtype: Dict + """ + return {k: v for k, v in attrs.items() if lenient_issubclass(v, BaseField)} + + +def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]: + """ + Extracts annotations from class namespace dict and triggers + extraction of ormar model_fields. + + :param attrs: namespace of the class created + :type attrs: Dict + :return: namespace of the class updated, dict of extracted model_fields + :rtype: Tuple[Dict, Dict] + """ + key = "__annotations__" + attrs[key] = attrs.get(key, {}) + attrs, model_fields = populate_pydantic_default_values(attrs) + return attrs, model_fields diff --git a/ormar/models/helpers/relations.py b/ormar/models/helpers/relations.py new file mode 100644 index 0000000..b98243d --- /dev/null +++ b/ormar/models/helpers/relations.py @@ -0,0 +1,138 @@ +from typing import TYPE_CHECKING, Type + +from ormar import ForeignKey, ManyToMany +from ormar.fields import ManyToManyField +from ormar.fields.foreign_key import ForeignKeyField +from ormar.models.helpers.pydantic import reverse_field_not_already_registered +from ormar.models.helpers.sqlalchemy import adjust_through_many_to_many_model +from ormar.relations import AliasManager + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + +alias_manager = AliasManager() + + +def register_relation_on_build(new_model: Type["Model"], field_name: str) -> None: + """ + Registers ForeignKey relation in alias_manager to set a table_prefix. + Registration include also reverse relation side to be able to join both sides. + + Relation is registered by model name and relation field name to allow for multiple + relations between two Models that needs to have different + aliases for proper sql joins. + + :param new_model: constructed model + :type new_model: Model class + :param field_name: name of the related field + :type field_name: str + """ + alias_manager.add_relation_type(new_model, field_name) + + +def register_many_to_many_relation_on_build( + new_model: Type["Model"], field: Type[ManyToManyField] +) -> None: + """ + Registers connection between through model and both sides of the m2m relation. + Registration include also reverse relation side to be able to join both sides. + + Relation is registered by model name and relation field name to allow for multiple + relations between two Models that needs to have different + aliases for proper sql joins. + + By default relation name is a model.name.lower(). + + :param new_model: model on which m2m field is declared + :type new_model: Model class + :param field: relation field + :type field: ManyToManyField class + """ + alias_manager.add_relation_type(field.through, new_model.get_name(), is_multi=True) + alias_manager.add_relation_type(field.through, field.to.get_name(), is_multi=True) + + +def expand_reverse_relationships(model: Type["Model"]) -> None: + """ + Iterates through model_fields of given model and verifies if all reverse + relation have been populated on related models. + + If the reverse relation has not been set before it's set here. + + :param model: model on which relation should be checked and registered + :type model: Model class + """ + for model_field in model.Meta.model_fields.values(): + if issubclass(model_field, ForeignKeyField): + child_model_name = model_field.related_name or model.get_name() + "s" + parent_model = model_field.to + child = model + if reverse_field_not_already_registered( + child, child_model_name, parent_model + ): + register_reverse_model_fields( + parent_model, child, child_model_name, model_field + ) + + +def register_reverse_model_fields( + model: Type["Model"], + child: Type["Model"], + related_name: str, + model_field: Type["ForeignKeyField"], +) -> None: + """ + Registers reverse ForeignKey field on related model. + By default it's name.lower()+'s' of the model on which relation is defined. + + But if the related_model name is provided it's registered with that name. + Autogenerated reverse fields also set related_name to the original field name. + + :param model: related model on which reverse field should be defined + :type model: Model class + :param child: parent model with relation definition + :type child: Model class + :param related_name: name by which reverse key should be registered + :type related_name: str + :param model_field: original relation ForeignKey field + :type model_field: relation Field + """ + if issubclass(model_field, ManyToManyField): + model.Meta.model_fields[related_name] = ManyToMany( + child, + through=model_field.through, + name=related_name, + virtual=True, + related_name=model_field.name, + ) + # register foreign keys on through model + adjust_through_many_to_many_model(model, child, model_field) + else: + model.Meta.model_fields[related_name] = ForeignKey( + child, real_name=related_name, virtual=True, related_name=model_field.name, + ) + + +def register_relation_in_alias_manager( + new_model: Type["Model"], field: Type[ForeignKeyField], field_name: str +) -> None: + """ + Registers the relation (and reverse relation) in alias manager. + The m2m relations require registration of through model between + actual end models of the relation. + + Delegates the actual registration to: + m2m - register_many_to_many_relation_on_build + fk - register_relation_on_build + + :param new_model: model on which relation field is declared + :type new_model: Model class + :param field: relation field + :type field: ForeignKey or ManyToManyField class + :param field_name: name of the relation key + :type field_name: str + """ + if issubclass(field, ManyToManyField): + register_many_to_many_relation_on_build(new_model=new_model, field=field) + elif issubclass(field, ForeignKeyField): + register_relation_on_build(new_model=new_model, field_name=field_name) diff --git a/ormar/models/helpers/sqlalchemy.py b/ormar/models/helpers/sqlalchemy.py new file mode 100644 index 0000000..4a9ff8b --- /dev/null +++ b/ormar/models/helpers/sqlalchemy.py @@ -0,0 +1,208 @@ +import logging +from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type + +import sqlalchemy + +from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202 +from ormar.fields import BaseField, ManyToManyField +from ormar.models.helpers.models import validate_related_names_in_relations +from ormar.models.helpers.pydantic import create_pydantic_field + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +def adjust_through_many_to_many_model( + model: Type["Model"], child: Type["Model"], model_field: Type[ManyToManyField] +) -> None: + """ + Registers m2m relation on through model. + Sets ormar.ForeignKey from through model to both child and parent models. + Sets sqlalchemy.ForeignKey to both child and parent models. + Sets pydantic fields with child and parent model types. + + :param model: model on which relation is declared + :type model: Model class + :param child: model to which m2m relation leads + :type child: Model class + :param model_field: relation field defined in parent model + :type model_field: ManyToManyField + """ + model_field.through.Meta.model_fields[model.get_name()] = ForeignKey( + model, real_name=model.get_name(), ondelete="CASCADE" + ) + model_field.through.Meta.model_fields[child.get_name()] = ForeignKey( + child, real_name=child.get_name(), ondelete="CASCADE" + ) + + create_and_append_m2m_fk(model, model_field) + create_and_append_m2m_fk(child, model_field) + + create_pydantic_field(model.get_name(), model, model_field) + create_pydantic_field(child.get_name(), child, model_field) + + +def create_and_append_m2m_fk( + model: Type["Model"], model_field: Type[ManyToManyField] +) -> None: + """ + Registers sqlalchemy Column with sqlalchemy.ForeignKey leadning to the model. + + Newly created field is added to m2m relation through model Meta columns and table. + + :param model: Model class to which FK should be created + :type model: Model class + :param model_field: field with ManyToMany relation + :type model_field: ManyToManyField field + """ + column = sqlalchemy.Column( + model.get_name(), + model.Meta.table.columns.get(model.get_column_alias(model.Meta.pkname)).type, + sqlalchemy.schema.ForeignKey( + model.Meta.tablename + "." + model.get_column_alias(model.Meta.pkname), + ondelete="CASCADE", + onupdate="CASCADE", + ), + ) + model_field.through.Meta.columns.append(column) + model_field.through.Meta.table.append_column(column) + + +def check_pk_column_validity( + field_name: str, field: BaseField, pkname: Optional[str] +) -> Optional[str]: + """ + Receives the field marked as primary key and verifies if the pkname + was not already set (only one allowed per model) and if field is not marked + as pydantic_only as it needs to be a database field. + + :raises: ModelDefintionError if pkname already set or field is pydantic_only + :param field_name: name of field + :type field_name: str + :param field: ormar.Field + :type field: BaseField + :param pkname: already set pkname + :type pkname: Optional[str] + :return: name of the field that should be set as pkname + :rtype: str + """ + if pkname is not None: + raise ModelDefinitionError("Only one primary key column is allowed.") + if field.pydantic_only: + raise ModelDefinitionError("Primary key column cannot be pydantic only") + return field_name + + +def sqlalchemy_columns_from_model_fields( + model_fields: Dict, new_model: Type["Model"] +) -> Tuple[Optional[str], List[sqlalchemy.Column]]: + """ + Iterates over declared on Model model fields and extracts fields that + should be treated as database fields. + + If the model is empty it sets mandatory id field as primary key + (used in through models in m2m relations). + + Triggers a validation of relation_names in relation fields. If multiple fields + are leading to the same related model only one can have empty related_name param. + Also related_names have to be unique. + + Trigger validation of primary_key - only one and required pk can be set, + cannot be pydantic_only. + + Append fields to columns if it's not pydantic_only, + virtual ForeignKey or ManyToMany field. + + :raises: ModelDefinitionError if validation of related_names fail, + or pkname validation fails. + :param model_fields: dictionary of declared ormar model fields + :type model_fields: Dict[str, ormar.Field] + :param new_model: + :type new_model: Model class + :return: pkname, list of sqlalchemy columns + :rtype: Tuple[Optional[str], List[sqlalchemy.Column]] + """ + columns = [] + pkname = None + if len(model_fields.keys()) == 0: + model_fields["id"] = Integer(name="id", primary_key=True) + logging.warning( + "Table {table_name} had no fields so auto " + "Integer primary key named `id` created." + ) + validate_related_names_in_relations(model_fields, new_model) + for field_name, field in model_fields.items(): + if field.primary_key: + pkname = check_pk_column_validity(field_name, field, pkname) + if ( + not field.pydantic_only + and not field.virtual + and not issubclass(field, ManyToManyField) + ): + columns.append(field.get_column(field.get_alias())) + return pkname, columns + + +def populate_meta_tablename_columns_and_pk( + name: str, new_model: Type["Model"] +) -> Type["Model"]: + """ + Sets Model tablename if it's not already set in Meta. + Default tablename if not present is class name lower + s (i.e. Bed becomes -> beds) + + Checks if Model's Meta have pkname and columns set. + If not calls the sqlalchemy_columns_from_model_fields to populate + columns from ormar.fields definitions. + + :raises: if pkname is not present raises ModelDefinitionError. + Each model has to have pk. + + :param name: name of the current Model + :type name: str + :param new_model: currently constructed Model + :type new_model: ormar.models.metaclass.ModelMetaclass + :return: Model with populated pkname and columns in Meta + :rtype: ormar.models.metaclass.ModelMetaclass + """ + tablename = name.lower() + "s" + new_model.Meta.tablename = ( + new_model.Meta.tablename if hasattr(new_model.Meta, "tablename") else tablename + ) + pkname: Optional[str] + + if hasattr(new_model.Meta, "columns"): + columns = new_model.Meta.table.columns + pkname = new_model.Meta.pkname + else: + pkname, columns = sqlalchemy_columns_from_model_fields( + new_model.Meta.model_fields, new_model + ) + + if pkname is None: + raise ModelDefinitionError("Table has to have a primary key.") + + new_model.Meta.columns = columns + new_model.Meta.pkname = pkname + return new_model + + +def populate_meta_sqlalchemy_table_if_required( + new_model: Type["Model"], +) -> Type["Model"]: + """ + Constructs sqlalchemy table out of columns and parameters set on Meta class. + It populates name, metadata, columns and constraints. + + :param new_model: class without sqlalchemy table constructed + :type new_model: Model class + :return: class with populated Meta.table + :rtype: Model class + """ + if not hasattr(new_model.Meta, "table"): + new_model.Meta.table = sqlalchemy.Table( + new_model.Meta.tablename, + new_model.Meta.metadata, + *new_model.Meta.columns, + *new_model.Meta.constraints, + ) + return new_model diff --git a/ormar/models/metaclass.py b/ormar/models/metaclass.py index b2b0110..8a12716 100644 --- a/ormar/models/metaclass.py +++ b/ormar/models/metaclass.py @@ -1,6 +1,3 @@ -import copy -import logging -import warnings from typing import ( Any, Dict, @@ -11,21 +8,36 @@ from typing import ( Tuple, Type, Union, + cast, ) import databases import pydantic import sqlalchemy -from pydantic import BaseConfig -from pydantic.fields import FieldInfo, ModelField -from pydantic.utils import lenient_issubclass +from pydantic.fields import FieldInfo from sqlalchemy.sql.schema import ColumnCollectionConstraint import ormar # noqa I100 from ormar import ForeignKey, Integer, ModelDefinitionError # noqa I100 from ormar.fields import BaseField from ormar.fields.foreign_key import ForeignKeyField -from ormar.fields.many_to_many import ManyToMany, ManyToManyField +from ormar.fields.many_to_many import ManyToManyField +from ormar.models.helpers.pydantic import ( + extract_annotations_and_default_vals, + get_potential_fields, + get_pydantic_base_orm_config, + get_pydantic_field, + populate_default_options_values, +) +from ormar.models.helpers.relations import ( + alias_manager, + register_relation_in_alias_manager, +) +from ormar.models.helpers.relations import expand_reverse_relationships +from ormar.models.helpers.sqlalchemy import ( + populate_meta_sqlalchemy_table_if_required, + populate_meta_tablename_columns_and_pk, +) from ormar.models.quick_access_views import quick_access_set from ormar.queryset import QuerySet from ormar.relations.alias_manager import AliasManager @@ -34,7 +46,6 @@ from ormar.signals import Signal, SignalEmitter if TYPE_CHECKING: # pragma no cover from ormar import Model -alias_manager = AliasManager() PARSED_FIELDS_KEY = "__parsed_fields__" CONFIG_KEY = "Config" @@ -56,295 +67,6 @@ class ModelMeta: abstract: bool -def register_relation_on_build_new(new_model: Type["Model"], field_name: str) -> None: - alias_manager.add_relation_type_new(new_model, field_name) - - -def register_many_to_many_relation_on_build_new( - new_model: Type["Model"], field: Type[ManyToManyField] -) -> None: - alias_manager.add_relation_type_new( - field.through, new_model.get_name(), is_multi=True - ) - alias_manager.add_relation_type_new( - field.through, field.to.get_name(), is_multi=True - ) - - -def reverse_field_not_already_registered( - child: Type["Model"], child_model_name: str, parent_model: Type["Model"] -) -> bool: - return ( - child_model_name not in parent_model.__fields__ - and child.get_name() not in parent_model.__fields__ - ) - - -def expand_reverse_relationships(model: Type["Model"]) -> None: - for model_field in model.Meta.model_fields.values(): - if issubclass(model_field, ForeignKeyField): - child_model_name = model_field.related_name or model.get_name() + "s" - parent_model = model_field.to - child = model - if reverse_field_not_already_registered( - child, child_model_name, parent_model - ): - register_reverse_model_fields( - parent_model, child, child_model_name, model_field - ) - - -def register_reverse_model_fields( - model: Type["Model"], - child: Type["Model"], - child_model_name: str, - model_field: Type["ForeignKeyField"], -) -> None: - if issubclass(model_field, ManyToManyField): - model.Meta.model_fields[child_model_name] = ManyToMany( - child, - through=model_field.through, - name=child_model_name, - virtual=True, - related_name=model_field.name, - ) - # register foreign keys on through model - adjust_through_many_to_many_model(model, child, model_field, child_model_name) - else: - model.Meta.model_fields[child_model_name] = ForeignKey( - child, - real_name=child_model_name, - virtual=True, - related_name=model_field.name, - ) - - -def adjust_through_many_to_many_model( - model: Type["Model"], - child: Type["Model"], - model_field: Type[ManyToManyField], - child_model_name: str, -) -> None: - model_field.through.Meta.model_fields[model.get_name()] = ForeignKey( - model, real_name=model.get_name(), ondelete="CASCADE" - ) - model_field.through.Meta.model_fields[child.get_name()] = ForeignKey( - child, real_name=child.get_name(), ondelete="CASCADE" - ) - - create_and_append_m2m_fk(model, model_field) - create_and_append_m2m_fk(child, model_field) - - create_pydantic_field(model.get_name(), model, model_field) - create_pydantic_field(child.get_name(), child, model_field) - - -def create_pydantic_field( - field_name: str, model: Type["Model"], model_field: Type[ManyToManyField] -) -> None: - model_field.through.__fields__[field_name] = ModelField( - name=field_name, - type_=model, - model_config=model.__config__, - required=False, - class_validators={}, - ) - - -def get_pydantic_field(field_name: str, model: Type["Model"]) -> "ModelField": - return ModelField( - name=field_name, - type_=model.Meta.model_fields[field_name].__type__, # type: ignore - model_config=model.__config__, - required=not model.Meta.model_fields[field_name].nullable, - class_validators={}, - ) - - -def create_and_append_m2m_fk( - model: Type["Model"], model_field: Type[ManyToManyField] -) -> None: - column = sqlalchemy.Column( - model.get_name(), - model.Meta.table.columns.get(model.get_column_alias(model.Meta.pkname)).type, - sqlalchemy.schema.ForeignKey( - model.Meta.tablename + "." + model.get_column_alias(model.Meta.pkname), - ondelete="CASCADE", - onupdate="CASCADE", - ), - ) - model_field.through.Meta.columns.append(column) - model_field.through.Meta.table.append_column(column) - - -def check_pk_column_validity( - field_name: str, field: BaseField, pkname: Optional[str] -) -> Optional[str]: - if pkname is not None: - raise ModelDefinitionError("Only one primary key column is allowed.") - if field.pydantic_only: - raise ModelDefinitionError("Primary key column cannot be pydantic only") - return field_name - - -def validate_related_names_in_relations( - model_fields: Dict, new_model: Type["Model"] -) -> None: - already_registered: Dict[str, List[Optional[str]]] = dict() - for field in model_fields.values(): - if issubclass(field, ForeignKeyField): - previous_related_names = already_registered.setdefault(field.to, []) - if field.related_name in previous_related_names: - raise ModelDefinitionError( - f"Multiple fields declared on {new_model.get_name(lower=False)} " - f"model leading to {field.to.get_name(lower=False)} model without " - f"related_name property set. \nThere can be only one relation with " - f"default/empty name: '{new_model.get_name() + 's'}'" - f"\nTip: provide different related_name for FK and/or M2M fields" - ) - else: - previous_related_names.append(field.related_name) - - -def sqlalchemy_columns_from_model_fields( - model_fields: Dict, new_model: Type["Model"] -) -> Tuple[Optional[str], List[sqlalchemy.Column]]: - columns = [] - pkname = None - if len(model_fields.keys()) == 0: - model_fields["id"] = Integer(name="id", primary_key=True) - logging.warning( - "Table {table_name} had no fields so auto " - "Integer primary key named `id` created." - ) - validate_related_names_in_relations(model_fields, new_model) - for field_name, field in model_fields.items(): - if field.primary_key: - pkname = check_pk_column_validity(field_name, field, pkname) - if ( - not field.pydantic_only - and not field.virtual - and not issubclass(field, ManyToManyField) - ): - columns.append(field.get_column(field.get_alias())) - return pkname, columns - - -def register_relation_in_alias_manager_new( - new_model: Type["Model"], field: Type[ForeignKeyField], field_name: str -) -> None: - if issubclass(field, ManyToManyField): - register_many_to_many_relation_on_build_new(new_model=new_model, field=field) - elif issubclass(field, ForeignKeyField): - register_relation_on_build_new(new_model=new_model, field_name=field_name) - - -def populate_default_pydantic_field_value( - ormar_field: Type[BaseField], field_name: str, attrs: dict -) -> dict: - curr_def_value = attrs.get(field_name, ormar.Undefined) - if lenient_issubclass(curr_def_value, ormar.fields.BaseField): - curr_def_value = ormar.Undefined - if curr_def_value is None: - attrs[field_name] = ormar_field.convert_to_pydantic_field_info(allow_null=True) - else: - attrs[field_name] = ormar_field.convert_to_pydantic_field_info() - return attrs - - -def populate_pydantic_default_values(attrs: Dict) -> Tuple[Dict, Dict]: - model_fields = {} - potential_fields = { - k: v - for k, v in attrs["__annotations__"].items() - if lenient_issubclass(v, BaseField) - } - if potential_fields: - warnings.warn( - "Using ormar.Fields as type Model annotation has been deprecated," - " check documentation of current version", - DeprecationWarning, - ) - - potential_fields.update(get_potential_fields(attrs)) - for field_name, field in potential_fields.items(): - field.name = field_name - attrs = populate_default_pydantic_field_value(field, field_name, attrs) - model_fields[field_name] = field - attrs["__annotations__"][field_name] = ( - field.__type__ if not field.nullable else Optional[field.__type__] - ) - return attrs, model_fields - - -def extract_annotations_and_default_vals(attrs: dict) -> Tuple[Dict, Dict]: - key = "__annotations__" - attrs[key] = attrs.get(key, {}) - attrs, model_fields = populate_pydantic_default_values(attrs) - return attrs, model_fields - - -def populate_meta_tablename_columns_and_pk( - name: str, new_model: Type["Model"] -) -> Type["Model"]: - tablename = name.lower() + "s" - new_model.Meta.tablename = ( - new_model.Meta.tablename if hasattr(new_model.Meta, "tablename") else tablename - ) - pkname: Optional[str] - - if hasattr(new_model.Meta, "columns"): - columns = new_model.Meta.table.columns - pkname = new_model.Meta.pkname - else: - pkname, columns = sqlalchemy_columns_from_model_fields( - new_model.Meta.model_fields, new_model - ) - - if pkname is None: - raise ModelDefinitionError("Table has to have a primary key.") - - new_model.Meta.columns = columns - new_model.Meta.pkname = pkname - return new_model - - -def populate_meta_sqlalchemy_table_if_required( - new_model: Type["Model"], -) -> Type["Model"]: - """ - Constructs sqlalchemy table out of columns and parameters set on Meta class. - It populates name, metadata, columns and constraints. - - :param new_model: class without sqlalchemy table constructed - :type new_model: Model class - :return: class with populated Meta.table - :rtype: Model class - """ - if not hasattr(new_model.Meta, "table"): - new_model.Meta.table = sqlalchemy.Table( - new_model.Meta.tablename, - new_model.Meta.metadata, - *new_model.Meta.columns, - *new_model.Meta.constraints, - ) - return new_model - - -def get_pydantic_base_orm_config() -> Type[BaseConfig]: - """ - Returns empty pydantic Config with orm_mode set to True. - - :return: empty default config with orm_mode set. - :rtype: pydantic Config - """ - - class Config(BaseConfig): - orm_mode = True - - return Config - - def check_if_field_has_choices(field: Type[BaseField]) -> bool: """ Checks if given field has choices populated. @@ -400,32 +122,6 @@ def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001 model.__pre_root_validators__ = validators -def populate_default_options_values( - new_model: Type["Model"], model_fields: Dict -) -> None: - """ - Sets all optional Meta values to it's defaults - and set model_fields that were already previously extracted. - - Here should live all options that are not overwritten/set for all models. - - Current options are: - * constraints = [] - * abstract = False - - :param new_model: newly constructed Model - :type new_model: Model class - :param model_fields: - :type model_fields: Union[Dict[str, type], Dict] - """ - if not hasattr(new_model.Meta, "constraints"): - new_model.Meta.constraints = [] - if not hasattr(new_model.Meta, "model_fields"): - new_model.Meta.model_fields = model_fields - if not hasattr(new_model.Meta, "abstract"): - new_model.Meta.abstract = False - - def add_cached_properties(new_model: Type["Model"]) -> None: """ Sets cached properties for both pydantic and ormar models. @@ -510,24 +206,12 @@ def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001 new_model.Meta.signals = signals -def get_potential_fields(attrs: Dict) -> Dict: - """ - Gets all the fields in current class namespace that are Fields. - - :param attrs: current class namespace - :type attrs: Dict - :return: extracted fields that are ormar Fields - :rtype: Dict - """ - return {k: v for k, v in attrs.items() if lenient_issubclass(v, BaseField)} - - def check_conflicting_fields( - new_fields: Set, - attrs: Dict, - base_class: type, - curr_class: type, - previous_fields: Set = None, + new_fields: Set, + attrs: Dict, + base_class: type, + curr_class: type, + previous_fields: Set = None, ) -> None: """ You cannot redefine fields with same names in inherited classes. @@ -557,11 +241,11 @@ def check_conflicting_fields( def update_attrs_and_fields( - attrs: Dict, - new_attrs: Dict, - model_fields: Dict, - new_model_fields: Dict, - new_fields: Set, + attrs: Dict, + new_attrs: Dict, + model_fields: Dict, + new_model_fields: Dict, + new_fields: Set, ) -> None: """ Updates __annotations__, values of model fields (so pydantic FieldInfos) @@ -585,7 +269,7 @@ def update_attrs_and_fields( def update_attrs_from_base_meta( # noqa: CCR001 - base_class: "Model", attrs: Dict, + base_class: "Model", attrs: Dict, ) -> None: """ Updates Meta parameters in child from parent if needed. @@ -612,13 +296,85 @@ def update_attrs_from_base_meta( # noqa: CCR001 setattr(attrs["Meta"], param, parent_value) +def copy_data_from_parent_model( # noqa: CCR001 + base_class: Type["Model"], + curr_class: type, + attrs: Dict, + model_fields: Dict[ + str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]] + ], +) -> Tuple[Dict, Dict]: + """ + Copy the key parameters [databse, metadata, property_fields and constraints] + and fields from parent models. Overwrites them if needed. + + Only abstract classes can be subclassed. + + Since relation fields requires different related_name for different children + + + :raises: ModelDefinitionError if non abstract model is subclassed + :param base_class: one of the parent classes + :type base_class: Model or model parent class + :param curr_class: current constructed class + :type curr_class: Model or model parent class + :param attrs: new namespace for class being constructed + :type attrs: Dict + :param model_fields: ormar fields in defined in current class + :type model_fields: Dict[str, BaseField] + :return: updated attrs and model_fields + :rtype: Tuple[Dict, Dict] + """ + if attrs.get("Meta"): + new_fields = set(base_class.Meta.model_fields.keys()) # type: ignore + previous_fields = set({k for k, v in attrs.items() if isinstance(v, FieldInfo)}) + check_conflicting_fields( + new_fields=new_fields, + attrs=attrs, + base_class=base_class, + curr_class=curr_class, + previous_fields=previous_fields, + ) + if previous_fields and not base_class.Meta.abstract: # type: ignore + raise ModelDefinitionError( + f"{curr_class.__name__} cannot inherit " + f"from non abstract class {base_class.__name__}" + ) + update_attrs_from_base_meta( + base_class=base_class, # type: ignore + attrs=attrs, + ) + parent_fields = dict() + meta = attrs.get("Meta") + if not meta: # pragma: no cover + raise ModelDefinitionError( + f"Model {curr_class.__name__} declared without Meta" + ) + table_name = ( + meta.tablename + if hasattr(meta, "tablename") and meta.tablename + else attrs.get("__name__", "").lower() + "s" + ) + for field_name, field in base_class.Meta.model_fields.items(): + if issubclass(field, ForeignKeyField) and field.related_name: + copy_field = type(field.__name__, (field,), dict(field.__dict__)) + related_name = field.related_name + "_" + table_name + copy_field.related_name = related_name # type: ignore + parent_fields[field_name] = copy_field + else: + parent_fields[field_name] = field + + model_fields.update(parent_fields) # type: ignore + return attrs, model_fields + + def extract_from_parents_definition( # noqa: CCR001 - base_class: type, - curr_class: type, - attrs: Dict, - model_fields: Dict[ - str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]] - ], + base_class: type, + curr_class: type, + attrs: Dict, + model_fields: Dict[ + str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]] + ], ) -> Tuple[Dict, Dict]: """ Extracts fields from base classes if they have valid oramr fields. @@ -644,40 +400,13 @@ def extract_from_parents_definition( # noqa: CCR001 :rtype: Tuple[Dict, Dict] """ if hasattr(base_class, "Meta"): - if attrs.get("Meta"): - new_fields = set(base_class.Meta.model_fields.keys()) # type: ignore - previous_fields = set( - {k for k, v in attrs.items() if isinstance(v, FieldInfo)} - ) - check_conflicting_fields( - new_fields=new_fields, - attrs=attrs, - base_class=base_class, - curr_class=curr_class, - previous_fields=previous_fields, - ) - if previous_fields and not base_class.Meta.abstract: # type: ignore - raise ModelDefinitionError( - f"{curr_class.__name__} cannot inherit " - f"from non abstract class {base_class.__name__}" - ) - update_attrs_from_base_meta( - base_class=base_class, # type: ignore - attrs=attrs, - ) - parent_fields = dict() - table_name = attrs.get("Meta").tablename if hasattr(attrs.get("Meta"), "tablename") else attrs.get( - '__name__').lower() + 's' - for field_name, field in base_class.Meta.model_fields.items(): - if issubclass(field, ForeignKeyField) and field.related_name: - copy_field = type(field.__name__, (field,), dict(field.__dict__)) - copy_field.related_name = field.related_name + '_' + table_name - parent_fields[field_name] = copy_field - else: - parent_fields[field_name] = field - - model_fields.update(parent_fields) # type: ignore - return attrs, model_fields + base_class = cast(Type["Model"], base_class) + return copy_data_from_parent_model( + base_class=base_class, + curr_class=curr_class, + attrs=attrs, + model_fields=model_fields, + ) key = "__annotations__" if hasattr(base_class, PARSED_FIELDS_KEY): @@ -734,8 +463,37 @@ def extract_from_parents_definition( # noqa: CCR001 class ModelMetaclass(pydantic.main.ModelMetaclass): def __new__( # type: ignore # noqa: CCR001 - mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict + mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict ) -> "ModelMetaclass": + """ + Metaclass used by ormar Models that performs configuration + and build of ormar Models. + + + Sets pydantic configuration. + Extract model_fields and convert them to pydantic FieldInfo, + updates class namespace. + + Extracts settings and fields from parent classes. + Fetches methods decorated with @property_field decorator + to expose them later in dict(). + + Construct parent pydantic Metaclass/ Model. + + If class has Meta class declared (so actual ormar Models) it also: + * populate sqlalchemy columns, pkname and tables from model_fields + * register reverse relationships on related models + * registers all relations in alias manager that populates table_prefixes + * exposes alias manager on each Model + * creates QuerySet for each model and exposes it on a class + + :param name: name of current class + :type name: str + :param bases: base classes + :type bases: Tuple + :param attrs: class namespace + :type attrs: Dict + """ attrs["Config"] = get_pydantic_base_orm_config() attrs["__name__"] = name attrs, model_fields = extract_annotations_and_default_vals(attrs) @@ -760,7 +518,7 @@ class ModelMetaclass(pydantic.main.ModelMetaclass): new_model = populate_meta_sqlalchemy_table_if_required(new_model) expand_reverse_relationships(new_model) for field_name, field in new_model.Meta.model_fields.items(): - register_relation_in_alias_manager_new(new_model, field, field_name) + register_relation_in_alias_manager(new_model, field, field_name) if new_model.Meta.pkname not in attrs["__annotations__"]: field_name = new_model.Meta.pkname diff --git a/ormar/models/model.py b/ormar/models/model.py index 4e45bc3..8a2ee4f 100644 --- a/ormar/models/model.py +++ b/ormar/models/model.py @@ -22,6 +22,20 @@ from ormar.models.metaclass import ModelMeta def group_related_list(list_: List) -> Dict: + """ + Translates the list of related strings into a dictionary. + That way nested models are grouped to traverse them in a right order + and to avoid repetition. + + Sample: ["people__houses", "people__cars__models", "people__cars__colors"] + will become: + {'people': {'houses': [], 'cars': ['models', 'colors']}} + + :param list_: list of related models used in select related + :type list_: List[str] + :return: list converted to dictionary to avoid repetition and group nested models + :rtype: Dict[str, List] + """ test_dict: Dict[str, Any] = dict() grouped = itertools.groupby(list_, key=lambda x: x.split("__")[0]) for key, group in grouped: @@ -63,7 +77,38 @@ class Model(NewBaseModel): fields: Optional[Union[Dict, Set]] = None, exclude_fields: Optional[Union[Dict, Set]] = None, ) -> Optional[T]: + """ + Model method to convert raw sql row from database into ormar.Model instance. + Traverses nested models if they were specified in select_related for query. + Called recurrently and returns model instance if it's present in the row. + Note that it's processing one row at a time, so if there are duplicates of + parent row that needs to be joined/combined + (like parent row in sql join with 2+ child rows) + instances populated in this method are later combined in the QuerySet. + Other method working directly on raw database results is in prefetch_query, + where rows are populated in a different way as they do not have + nested models in result. + + :param row: raw result row from the database + :type row: sqlalchemy.engine.result.ResultProxy + :param select_related: list of names of related models fetched from database + :type select_related: List + :param related_models: list or dict of related models + :type related_models: Union[List, Dict] + :param previous_model: internal param for nested models to specify table_prefix + :type previous_model: Model class + :param related_name: internal parameter - name of current nested model + :type related_name: str + :param fields: fields and related model fields to include + if provided only those are included + :type fields: Optional[Union[Dict, Set]] + :param exclude_fields: fields and related model fields to exclude + excludes the fields even if they are provided in fields + :type exclude_fields: Optional[Union[Dict, Set]] + :return: returns model if model is populated from database + :rtype: Optional[Model] + """ item: Dict[str, Any] = {} select_related = select_related or [] related_models = related_models or [] @@ -86,7 +131,7 @@ class Model(NewBaseModel): previous_model = through_field.through # type: ignore if previous_model and rel_name2: - table_prefix = cls.Meta.alias_manager.resolve_relation_join_new( + table_prefix = cls.Meta.alias_manager.resolve_relation_join( previous_model, rel_name2 ) else: @@ -127,6 +172,32 @@ class Model(NewBaseModel): fields: Optional[Union[Dict, Set]] = None, exclude_fields: Optional[Union[Dict, Set]] = None, ) -> dict: + """ + Traverses structure of related models and populates the nested models + from the database row. + Related models can be a list if only directly related models are to be + populated, converted to dict if related models also have their own related + models to be populated. + + Recurrently calls from_row method on nested instances and create nested + instances. In the end those instances are added to the final model dictionary. + + :param item: dictionary of already populated nested models, otherwise empty dict + :type item: Dict + :param row: raw result row from the database + :type row: sqlalchemy.engine.result.ResultProxy + :param related_models: list or dict of related models + :type related_models: Union[Dict, List] + :param fields: fields and related model fields to include - + if provided only those are included + :type fields: Optional[Union[Dict, Set]] + :param exclude_fields: fields and related model fields to exclude + excludes the fields even if they are provided in fields + :type exclude_fields: Optional[Union[Dict, Set]] + :return: dictionary with keys corresponding to model fields names + and values are database values + :rtype: Dict + """ for related in related_models: if isinstance(related_models, dict) and related_models[related]: first_part, remainder = related, related_models[related] @@ -176,22 +247,26 @@ class Model(NewBaseModel): All joined tables have prefixes to allow duplicate column names, as well as duplicated joins to the same table from multiple different tables. - Extracted fields populates the item dict that is later used to construct a Model. + Extracted fields populates the item dict later used to construct a Model. + + Used in Model.from_row and PrefetchQuery._populate_rows methods. :param item: dictionary of already populated nested models, otherwise empty dict :type item: Dict :param row: raw result row from the database :type row: sqlalchemy.engine.result.ResultProxy :param table_prefix: prefix of the table from AliasManager - each pair of tables have own prefix (two of them depending on direction) - used in joins - to allow multiple joins to the same table. + each pair of tables have own prefix (two of them depending on direction) - + used in joins to allow multiple joins to the same table. :type table_prefix: str - :param fields: fields and related model fields to include - if provided only those are included + :param fields: fields and related model fields to include - + if provided only those are included :type fields: Optional[Union[Dict, Set]] :param exclude_fields: fields and related model fields to exclude excludes the fields even if they are provided in fields :type exclude_fields: Optional[Union[Dict, Set]] - :return: dictionary with keys corresponding to model fields names and values are database values + :return: dictionary with keys corresponding to model fields names + and values are database values :rtype: Dict """ # databases does not keep aliases in Record for postgres, change to raw row @@ -216,7 +291,7 @@ class Model(NewBaseModel): async def upsert(self: T, **kwargs: Any) -> T: """ - Performs either a save or an update depending on the presence of the primary key. + Performs either a save or an update depending on the presence of the pk. If the pk field is filled it's an update, otherwise the save is performed. For save kwargs are ignored, used only in update if provided. @@ -237,11 +312,13 @@ class Model(NewBaseModel): Related models are saved by pk number, reverse relation and many to many fields are not saved - use corresponding relations methods. - If there are fields with server_default set and those fields are not already filled - save will trigger also a second query to refreshed the fields populated server side. + If there are fields with server_default set and those fields + are not already filled save will trigger also a second query + to refreshed the fields populated server side. - Does not recognize if model was previously saved. If you want to perform update or - insert depending on the pk fields presence use upsert. + Does not recognize if model was previously saved. + If you want to perform update or insert depending on the pk + fields presence use upsert. Sends pre_save and post_save signals. @@ -289,7 +366,8 @@ class Model(NewBaseModel): self, follow: bool = False, visited: Set = None, update_count: int = 0 ) -> int: # noqa: CCR001 """ - Triggers a upsert method on all related models if the instances are not already saved. + Triggers a upsert method on all related models + if the instances are not already saved. By default saves only the directly related ones. If follow=True is set it saves also related models of related models. @@ -299,15 +377,17 @@ class Model(NewBaseModel): That way already visited models that are nested are saved, but the save do not follow them inside. So Model A -> Model B -> Model A -> Model C will save second - Model A but will never follow into Model C. Nested relations of those kind need to - be persisted manually. + Model A but will never follow into Model C. + Nested relations of those kind need to be persisted manually. - :param follow: flag to trigger deep save - by default only directly related models are saved + :param follow: flag to trigger deep save - + by default only directly related models are saved with follow=True also related models of related models are saved :type follow: bool :param visited: internal parameter for recursive calls - already visited models :type visited: Set - :param update_count: internal parameter for recursive calls - no uf updated instances + :param update_count: internal parameter for recursive calls - + number of updated instances :type update_count: int :return: number of updated/saved models :rtype: int @@ -348,12 +428,14 @@ class Model(NewBaseModel): :param rel: Model to follow :type rel: Model - :param follow: flag to trigger deep save - by default only directly related models are saved + :param follow: flag to trigger deep save - + by default only directly related models are saved with follow=True also related models of related models are saved :type follow: bool :param visited: internal parameter for recursive calls - already visited models :type visited: Set - :param update_count: internal parameter for recursive calls - no uf updated instances + :param update_count: internal parameter for recursive calls - + number of updated instances :type update_count: int :return: tuple of update count and visited :rtype: Tuple[int, Set] @@ -429,10 +511,10 @@ class Model(NewBaseModel): async def load(self: T) -> T: """ Allow to refresh existing Models fields from database. - Be careful as the related models can be overwritten by pk_only models during load. + Be careful as the related models can be overwritten by pk_only models in load. Does NOT refresh the related models fields if they were loaded before. - :raises: If given primary key is not found in database the NoMatch exception is raised. + :raises: If given pk is not found in database the NoMatch exception is raised. :return: reloaded Model :rtype: Model diff --git a/ormar/queryset/clause.py b/ormar/queryset/clause.py index e5f84f7..6212b8d 100644 --- a/ormar/queryset/clause.py +++ b/ormar/queryset/clause.py @@ -141,7 +141,7 @@ class QueryClause: through_field.through, through_field.to, explicit_multi=True ) manager = model_cls.Meta.alias_manager - table_prefix = manager.resolve_relation_join_new(previous_model, part2) + table_prefix = manager.resolve_relation_join(previous_model, part2) model_cls = model_cls.Meta.model_fields[part].to previous_model = model_cls return select_related, table_prefix, model_cls diff --git a/ormar/queryset/join.py b/ormar/queryset/join.py index 87e7889..02c9388 100644 --- a/ormar/queryset/join.py +++ b/ormar/queryset/join.py @@ -135,7 +135,7 @@ class SqlJoin: model_cls = join_params.model_cls.Meta.model_fields[part].to to_table = model_cls.Meta.table.name - alias = model_cls.Meta.alias_manager.resolve_relation_join_new( + alias = model_cls.Meta.alias_manager.resolve_relation_join( join_params.prev_model, part ) if alias not in self.used_aliases: diff --git a/ormar/queryset/prefetch_query.py b/ormar/queryset/prefetch_query.py index 1f38c38..2e857a1 100644 --- a/ormar/queryset/prefetch_query.py +++ b/ormar/queryset/prefetch_query.py @@ -328,7 +328,7 @@ class PrefetchQuery: if issubclass(target_field, ManyToManyField): query_target = target_field.through select_related = [target_name] - table_prefix = target_field.to.Meta.alias_manager.resolve_relation_join_new( + table_prefix = target_field.to.Meta.alias_manager.resolve_relation_join( query_target, target_name ) self.already_extracted.setdefault(target_name, {})["prefix"] = table_prefix diff --git a/ormar/relations/alias_manager.py b/ormar/relations/alias_manager.py index 477217f..5ab750a 100644 --- a/ormar/relations/alias_manager.py +++ b/ormar/relations/alias_manager.py @@ -39,7 +39,7 @@ class AliasManager: def prefixed_table_name(alias: str, name: str) -> text: return text(f"{name} {alias}_{name}") - def add_relation_type_new( + def add_relation_type( self, source_model: Type["Model"], relation_name: str, is_multi: bool = False ) -> None: parent_key = f"{source_model.get_name()}_{relation_name}" @@ -56,7 +56,7 @@ class AliasManager: if child_key not in self._aliases_new: self._aliases_new[child_key] = get_table_alias() - def resolve_relation_join_new( + def resolve_relation_join( self, from_model: Type["Model"], relation_name: str ) -> str: alias = self._aliases_new.get(f"{from_model.get_name()}_{relation_name}", "") diff --git a/tests/test_inheritance_concrete.py b/tests/test_inheritance_concrete.py index 2c221de..9211102 100644 --- a/tests/test_inheritance_concrete.py +++ b/tests/test_inheritance_concrete.py @@ -100,7 +100,7 @@ class Car(ormar.Model): id: int = ormar.Integer(primary_key=True) name: str = ormar.String(max_length=50) owner: Person = ormar.ForeignKey(Person) - co_owner: Person = ormar.ForeignKey(Person, related_name='coowned') + co_owner: Person = ormar.ForeignKey(Person, related_name="coowned") class Truck(Car): @@ -113,7 +113,7 @@ class Truck(Car): class Bus(Car): class Meta: - tablename = 'buses' + tablename = "buses" metadata = metadata database = db @@ -134,6 +134,7 @@ def test_init_of_abstract_model(): def test_field_redefining_raises_error(): with pytest.raises(ModelDefinitionError): + class WrongField(DateFieldsModel): # pragma: no cover class Meta(ormar.ModelMeta): tablename = "wrongs" @@ -146,6 +147,7 @@ def test_field_redefining_raises_error(): def test_model_subclassing_non_abstract_raises_error(): with pytest.raises(ModelDefinitionError): + class WrongField2(DateFieldsModelNoSubclass): # pragma: no cover class Meta(ormar.ModelMeta): tablename = "wrongs" @@ -163,7 +165,7 @@ def test_params_are_inherited(): def round_date_to_seconds( - date: datetime.datetime, + date: datetime.datetime, ) -> datetime.datetime: # pragma: no cover if date.microsecond >= 500000: date = date + datetime.timedelta(seconds=1) @@ -206,9 +208,9 @@ async def test_fields_inherited_from_mixin(): sub2 = ( await Subject.objects.select_related("category") - .order_by("-created_date") - .exclude_fields("updated_date") - .get() + .order_by("-created_date") + .exclude_fields("updated_date") + .get() ) assert round_date_to_seconds(sub2.created_date) == round_date_to_seconds( sub.created_date @@ -223,9 +225,9 @@ async def test_fields_inherited_from_mixin(): sub3 = ( await Subject.objects.prefetch_related("category") - .order_by("-created_date") - .exclude_fields({"updated_date": ..., "category": {"updated_date"}}) - .get() + .order_by("-created_date") + .exclude_fields({"updated_date": ..., "category": {"updated_date"}}) + .get() ) assert round_date_to_seconds(sub3.created_date) == round_date_to_seconds( sub.created_date @@ -243,15 +245,19 @@ async def test_fields_inherited_from_mixin(): async def test_inheritance_with_relation(): async with db: async with db.transaction(force_rollback=True): - sam = await Person(name='Sam').save() - joe = await Person(name='Joe').save() - await Truck(name='Shelby wanna be', max_capacity=1400, owner=sam, co_owner=joe).save() + sam = await Person(name="Sam").save() + joe = await Person(name="Joe").save() + await Truck( + name="Shelby wanna be", max_capacity=1400, owner=sam, co_owner=joe + ).save() - shelby = await Truck.objects.select_related(['owner', 'co_owner']).get() - assert shelby.name == 'Shelby wanna be' - assert shelby.owner.name == 'Sam' - assert shelby.co_owner.name == 'Joe' + shelby = await Truck.objects.select_related(["owner", "co_owner"]).get() + assert shelby.name == "Shelby wanna be" + assert shelby.owner.name == "Sam" + assert shelby.co_owner.name == "Joe" - joe_check = await Person.objects.select_related('coowned_trucks').get(name='Joe') + joe_check = await Person.objects.select_related("coowned_trucks").get( + name="Joe" + ) assert joe_check.pk == joe.pk assert joe_check.coowned_trucks[0] == shelby