refactor methaclass functions into helper files, add docstrings

This commit is contained in:
collerek
2020-12-17 15:45:06 +01:00
parent c096e6dbbd
commit e98300233e
15 changed files with 923 additions and 468 deletions

View File

View File

@ -0,0 +1,39 @@
from typing import Dict, List, Optional, TYPE_CHECKING, Type
from ormar import ModelDefinitionError
from ormar.fields.foreign_key import ForeignKeyField
if TYPE_CHECKING: # pragma no cover
from ormar import Model
def validate_related_names_in_relations(
model_fields: Dict, new_model: Type["Model"]
) -> None:
"""
Performs a validation of relation_names in relation fields.
If multiple fields are leading to the same related model
only one can have empty related_name param
(populated by default as model.name.lower()+'s').
Also related_names have to be unique for given related model.
:raises: ModelDefinitionError if validation of related_names fail
:param model_fields: dictionary of declared ormar model fields
:type model_fields: Dict[str, ormar.Field]
:param new_model:
:type new_model: Model class
"""
already_registered: Dict[str, List[Optional[str]]] = dict()
for field in model_fields.values():
if issubclass(field, ForeignKeyField):
previous_related_names = already_registered.setdefault(field.to, [])
if field.related_name in previous_related_names:
raise ModelDefinitionError(
f"Multiple fields declared on {new_model.get_name(lower=False)} "
f"model leading to {field.to.get_name(lower=False)} model without "
f"related_name property set. \nThere can be only one relation with "
f"default/empty name: '{new_model.get_name() + 's'}'"
f"\nTip: provide different related_name for FK and/or M2M fields"
)
else:
previous_related_names.append(field.related_name)

View File

@ -0,0 +1,221 @@
import warnings
from typing import Dict, Optional, TYPE_CHECKING, Tuple, Type
from pydantic import BaseConfig
from pydantic.fields import ModelField
from pydantic.utils import lenient_issubclass
import ormar # noqa: I100, I202
from ormar.fields import BaseField, ManyToManyField
if TYPE_CHECKING: # pragma no cover
from ormar import Model
def reverse_field_not_already_registered(
child: Type["Model"], child_model_name: str, parent_model: Type["Model"]
) -> bool:
"""
Checks if child is already registered in parents pydantic fields.
:param child: related Model class
:type child: ormar.models.metaclass.ModelMetaclass
:param child_model_name: related_name of the child if provided
:type child_model_name: str
:param parent_model: parent Model class
:type parent_model: ormar.models.metaclass.ModelMetaclass
:return: result of the check
:rtype: bool
"""
return (
child_model_name not in parent_model.__fields__
and child.get_name() not in parent_model.__fields__
)
def create_pydantic_field(
field_name: str, model: Type["Model"], model_field: Type[ManyToManyField]
) -> None:
"""
Registers pydantic field on through model that leads to passed model
and is registered as field_name passed.
Through model is fetched from through attributed on passed model_field.
:param field_name: field name to register
:type field_name: str
:param model: type of field to register
:type model: Model class
:param model_field: relation field from which through model is extracted
:type model_field: ManyToManyField class
"""
model_field.through.__fields__[field_name] = ModelField(
name=field_name,
type_=model,
model_config=model.__config__,
required=False,
class_validators={},
)
def get_pydantic_field(field_name: str, model: Type["Model"]) -> "ModelField":
"""
Extracts field type and if it's required from Model model_fields by passed
field_name. Returns a pydantic field with type of field_name field type.
:param field_name: field name to fetch from Model and name of pydantic field
:type field_name: str
:param model: type of field to register
:type model: Model class
:return: newly created pydantic field
:rtype: pydantic.ModelField
"""
return ModelField(
name=field_name,
type_=model.Meta.model_fields[field_name].__type__, # type: ignore
model_config=model.__config__,
required=not model.Meta.model_fields[field_name].nullable,
class_validators={},
)
def populate_default_pydantic_field_value(
ormar_field: Type[BaseField], field_name: str, attrs: dict
) -> dict:
"""
Grabs current value of the ormar Field in class namespace
(so the default_value declared on ormar model if set)
and converts it to pydantic.FieldInfo
that pydantic is able to extract later.
On FieldInfo there are saved all needed params like max_length of the string
and other constraints that pydantic can use to build
it's own field validation used by ormar.
:param ormar_field: field to convert
:type ormar_field: ormar Field
:param field_name: field to convert name
:type field_name: str
:param attrs: current class namespace
:type attrs: Dict
:return: updated namespace dict
:rtype: Dict
"""
curr_def_value = attrs.get(field_name, ormar.Undefined)
if lenient_issubclass(curr_def_value, ormar.fields.BaseField):
curr_def_value = ormar.Undefined
if curr_def_value is None:
attrs[field_name] = ormar_field.convert_to_pydantic_field_info(allow_null=True)
else:
attrs[field_name] = ormar_field.convert_to_pydantic_field_info()
return attrs
def populate_pydantic_default_values(attrs: Dict) -> Tuple[Dict, Dict]:
"""
Extracts ormar fields from annotations (deprecated) and from namespace
dictionary of the class. Fields declared on model are all subclasses of the
BaseField class.
Trigger conversion of ormar field into pydantic FieldInfo, which has all needed
paramaters saved.
Overwrites the annotations of ormar fields to corresponding types declared on
ormar fields (constructed dynamically for relations).
Those annotations are later used by pydantic to construct it's own fields.
:param attrs: current class namespace
:type attrs: Dict
:return: namespace of the class updated, dict of extracted model_fields
:rtype: Tuple[Dict, Dict]
"""
model_fields = {}
potential_fields = {
k: v
for k, v in attrs["__annotations__"].items()
if lenient_issubclass(v, BaseField)
}
if potential_fields:
warnings.warn(
"Using ormar.Fields as type Model annotation has been deprecated,"
" check documentation of current version",
DeprecationWarning,
)
potential_fields.update(get_potential_fields(attrs))
for field_name, field in potential_fields.items():
field.name = field_name
attrs = populate_default_pydantic_field_value(field, field_name, attrs)
model_fields[field_name] = field
attrs["__annotations__"][field_name] = (
field.__type__ if not field.nullable else Optional[field.__type__]
)
return attrs, model_fields
def get_pydantic_base_orm_config() -> Type[BaseConfig]:
"""
Returns empty pydantic Config with orm_mode set to True.
:return: empty default config with orm_mode set.
:rtype: pydantic Config
"""
class Config(BaseConfig):
orm_mode = True
return Config
def populate_default_options_values(
new_model: Type["Model"], model_fields: Dict
) -> None:
"""
Sets all optional Meta values to it's defaults
and set model_fields that were already previously extracted.
Here should live all options that are not overwritten/set for all models.
Current options are:
* constraints = []
* abstract = False
:param new_model: newly constructed Model
:type new_model: Model class
:param model_fields:
:type model_fields: Union[Dict[str, type], Dict]
"""
if not hasattr(new_model.Meta, "constraints"):
new_model.Meta.constraints = []
if not hasattr(new_model.Meta, "model_fields"):
new_model.Meta.model_fields = model_fields
if not hasattr(new_model.Meta, "abstract"):
new_model.Meta.abstract = False
def get_potential_fields(attrs: Dict) -> Dict:
"""
Gets all the fields in current class namespace that are Fields.
:param attrs: current class namespace
:type attrs: Dict
:return: extracted fields that are ormar Fields
:rtype: Dict
"""
return {k: v for k, v in attrs.items() if lenient_issubclass(v, BaseField)}
def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]:
"""
Extracts annotations from class namespace dict and triggers
extraction of ormar model_fields.
:param attrs: namespace of the class created
:type attrs: Dict
:return: namespace of the class updated, dict of extracted model_fields
:rtype: Tuple[Dict, Dict]
"""
key = "__annotations__"
attrs[key] = attrs.get(key, {})
attrs, model_fields = populate_pydantic_default_values(attrs)
return attrs, model_fields

View File

@ -0,0 +1,138 @@
from typing import TYPE_CHECKING, Type
from ormar import ForeignKey, ManyToMany
from ormar.fields import ManyToManyField
from ormar.fields.foreign_key import ForeignKeyField
from ormar.models.helpers.pydantic import reverse_field_not_already_registered
from ormar.models.helpers.sqlalchemy import adjust_through_many_to_many_model
from ormar.relations import AliasManager
if TYPE_CHECKING: # pragma no cover
from ormar import Model
alias_manager = AliasManager()
def register_relation_on_build(new_model: Type["Model"], field_name: str) -> None:
"""
Registers ForeignKey relation in alias_manager to set a table_prefix.
Registration include also reverse relation side to be able to join both sides.
Relation is registered by model name and relation field name to allow for multiple
relations between two Models that needs to have different
aliases for proper sql joins.
:param new_model: constructed model
:type new_model: Model class
:param field_name: name of the related field
:type field_name: str
"""
alias_manager.add_relation_type(new_model, field_name)
def register_many_to_many_relation_on_build(
new_model: Type["Model"], field: Type[ManyToManyField]
) -> None:
"""
Registers connection between through model and both sides of the m2m relation.
Registration include also reverse relation side to be able to join both sides.
Relation is registered by model name and relation field name to allow for multiple
relations between two Models that needs to have different
aliases for proper sql joins.
By default relation name is a model.name.lower().
:param new_model: model on which m2m field is declared
:type new_model: Model class
:param field: relation field
:type field: ManyToManyField class
"""
alias_manager.add_relation_type(field.through, new_model.get_name(), is_multi=True)
alias_manager.add_relation_type(field.through, field.to.get_name(), is_multi=True)
def expand_reverse_relationships(model: Type["Model"]) -> None:
"""
Iterates through model_fields of given model and verifies if all reverse
relation have been populated on related models.
If the reverse relation has not been set before it's set here.
:param model: model on which relation should be checked and registered
:type model: Model class
"""
for model_field in model.Meta.model_fields.values():
if issubclass(model_field, ForeignKeyField):
child_model_name = model_field.related_name or model.get_name() + "s"
parent_model = model_field.to
child = model
if reverse_field_not_already_registered(
child, child_model_name, parent_model
):
register_reverse_model_fields(
parent_model, child, child_model_name, model_field
)
def register_reverse_model_fields(
model: Type["Model"],
child: Type["Model"],
related_name: str,
model_field: Type["ForeignKeyField"],
) -> None:
"""
Registers reverse ForeignKey field on related model.
By default it's name.lower()+'s' of the model on which relation is defined.
But if the related_model name is provided it's registered with that name.
Autogenerated reverse fields also set related_name to the original field name.
:param model: related model on which reverse field should be defined
:type model: Model class
:param child: parent model with relation definition
:type child: Model class
:param related_name: name by which reverse key should be registered
:type related_name: str
:param model_field: original relation ForeignKey field
:type model_field: relation Field
"""
if issubclass(model_field, ManyToManyField):
model.Meta.model_fields[related_name] = ManyToMany(
child,
through=model_field.through,
name=related_name,
virtual=True,
related_name=model_field.name,
)
# register foreign keys on through model
adjust_through_many_to_many_model(model, child, model_field)
else:
model.Meta.model_fields[related_name] = ForeignKey(
child, real_name=related_name, virtual=True, related_name=model_field.name,
)
def register_relation_in_alias_manager(
new_model: Type["Model"], field: Type[ForeignKeyField], field_name: str
) -> None:
"""
Registers the relation (and reverse relation) in alias manager.
The m2m relations require registration of through model between
actual end models of the relation.
Delegates the actual registration to:
m2m - register_many_to_many_relation_on_build
fk - register_relation_on_build
:param new_model: model on which relation field is declared
:type new_model: Model class
:param field: relation field
:type field: ForeignKey or ManyToManyField class
:param field_name: name of the relation key
:type field_name: str
"""
if issubclass(field, ManyToManyField):
register_many_to_many_relation_on_build(new_model=new_model, field=field)
elif issubclass(field, ForeignKeyField):
register_relation_on_build(new_model=new_model, field_name=field_name)

View File

@ -0,0 +1,208 @@
import logging
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type
import sqlalchemy
from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202
from ormar.fields import BaseField, ManyToManyField
from ormar.models.helpers.models import validate_related_names_in_relations
from ormar.models.helpers.pydantic import create_pydantic_field
if TYPE_CHECKING: # pragma no cover
from ormar import Model
def adjust_through_many_to_many_model(
model: Type["Model"], child: Type["Model"], model_field: Type[ManyToManyField]
) -> None:
"""
Registers m2m relation on through model.
Sets ormar.ForeignKey from through model to both child and parent models.
Sets sqlalchemy.ForeignKey to both child and parent models.
Sets pydantic fields with child and parent model types.
:param model: model on which relation is declared
:type model: Model class
:param child: model to which m2m relation leads
:type child: Model class
:param model_field: relation field defined in parent model
:type model_field: ManyToManyField
"""
model_field.through.Meta.model_fields[model.get_name()] = ForeignKey(
model, real_name=model.get_name(), ondelete="CASCADE"
)
model_field.through.Meta.model_fields[child.get_name()] = ForeignKey(
child, real_name=child.get_name(), ondelete="CASCADE"
)
create_and_append_m2m_fk(model, model_field)
create_and_append_m2m_fk(child, model_field)
create_pydantic_field(model.get_name(), model, model_field)
create_pydantic_field(child.get_name(), child, model_field)
def create_and_append_m2m_fk(
model: Type["Model"], model_field: Type[ManyToManyField]
) -> None:
"""
Registers sqlalchemy Column with sqlalchemy.ForeignKey leadning to the model.
Newly created field is added to m2m relation through model Meta columns and table.
:param model: Model class to which FK should be created
:type model: Model class
:param model_field: field with ManyToMany relation
:type model_field: ManyToManyField field
"""
column = sqlalchemy.Column(
model.get_name(),
model.Meta.table.columns.get(model.get_column_alias(model.Meta.pkname)).type,
sqlalchemy.schema.ForeignKey(
model.Meta.tablename + "." + model.get_column_alias(model.Meta.pkname),
ondelete="CASCADE",
onupdate="CASCADE",
),
)
model_field.through.Meta.columns.append(column)
model_field.through.Meta.table.append_column(column)
def check_pk_column_validity(
field_name: str, field: BaseField, pkname: Optional[str]
) -> Optional[str]:
"""
Receives the field marked as primary key and verifies if the pkname
was not already set (only one allowed per model) and if field is not marked
as pydantic_only as it needs to be a database field.
:raises: ModelDefintionError if pkname already set or field is pydantic_only
:param field_name: name of field
:type field_name: str
:param field: ormar.Field
:type field: BaseField
:param pkname: already set pkname
:type pkname: Optional[str]
:return: name of the field that should be set as pkname
:rtype: str
"""
if pkname is not None:
raise ModelDefinitionError("Only one primary key column is allowed.")
if field.pydantic_only:
raise ModelDefinitionError("Primary key column cannot be pydantic only")
return field_name
def sqlalchemy_columns_from_model_fields(
model_fields: Dict, new_model: Type["Model"]
) -> Tuple[Optional[str], List[sqlalchemy.Column]]:
"""
Iterates over declared on Model model fields and extracts fields that
should be treated as database fields.
If the model is empty it sets mandatory id field as primary key
(used in through models in m2m relations).
Triggers a validation of relation_names in relation fields. If multiple fields
are leading to the same related model only one can have empty related_name param.
Also related_names have to be unique.
Trigger validation of primary_key - only one and required pk can be set,
cannot be pydantic_only.
Append fields to columns if it's not pydantic_only,
virtual ForeignKey or ManyToMany field.
:raises: ModelDefinitionError if validation of related_names fail,
or pkname validation fails.
:param model_fields: dictionary of declared ormar model fields
:type model_fields: Dict[str, ormar.Field]
:param new_model:
:type new_model: Model class
:return: pkname, list of sqlalchemy columns
:rtype: Tuple[Optional[str], List[sqlalchemy.Column]]
"""
columns = []
pkname = None
if len(model_fields.keys()) == 0:
model_fields["id"] = Integer(name="id", primary_key=True)
logging.warning(
"Table {table_name} had no fields so auto "
"Integer primary key named `id` created."
)
validate_related_names_in_relations(model_fields, new_model)
for field_name, field in model_fields.items():
if field.primary_key:
pkname = check_pk_column_validity(field_name, field, pkname)
if (
not field.pydantic_only
and not field.virtual
and not issubclass(field, ManyToManyField)
):
columns.append(field.get_column(field.get_alias()))
return pkname, columns
def populate_meta_tablename_columns_and_pk(
name: str, new_model: Type["Model"]
) -> Type["Model"]:
"""
Sets Model tablename if it's not already set in Meta.
Default tablename if not present is class name lower + s (i.e. Bed becomes -> beds)
Checks if Model's Meta have pkname and columns set.
If not calls the sqlalchemy_columns_from_model_fields to populate
columns from ormar.fields definitions.
:raises: if pkname is not present raises ModelDefinitionError.
Each model has to have pk.
:param name: name of the current Model
:type name: str
:param new_model: currently constructed Model
:type new_model: ormar.models.metaclass.ModelMetaclass
:return: Model with populated pkname and columns in Meta
:rtype: ormar.models.metaclass.ModelMetaclass
"""
tablename = name.lower() + "s"
new_model.Meta.tablename = (
new_model.Meta.tablename if hasattr(new_model.Meta, "tablename") else tablename
)
pkname: Optional[str]
if hasattr(new_model.Meta, "columns"):
columns = new_model.Meta.table.columns
pkname = new_model.Meta.pkname
else:
pkname, columns = sqlalchemy_columns_from_model_fields(
new_model.Meta.model_fields, new_model
)
if pkname is None:
raise ModelDefinitionError("Table has to have a primary key.")
new_model.Meta.columns = columns
new_model.Meta.pkname = pkname
return new_model
def populate_meta_sqlalchemy_table_if_required(
new_model: Type["Model"],
) -> Type["Model"]:
"""
Constructs sqlalchemy table out of columns and parameters set on Meta class.
It populates name, metadata, columns and constraints.
:param new_model: class without sqlalchemy table constructed
:type new_model: Model class
:return: class with populated Meta.table
:rtype: Model class
"""
if not hasattr(new_model.Meta, "table"):
new_model.Meta.table = sqlalchemy.Table(
new_model.Meta.tablename,
new_model.Meta.metadata,
*new_model.Meta.columns,
*new_model.Meta.constraints,
)
return new_model