WIP add owner to fields and simplify relation names
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
from typing import ForwardRef, TYPE_CHECKING, Type
|
||||
from typing import TYPE_CHECKING, Type
|
||||
|
||||
import ormar
|
||||
from ormar import ForeignKey, ManyToMany
|
||||
@ -61,26 +61,17 @@ def register_many_to_many_relation_on_build(
|
||||
)
|
||||
|
||||
|
||||
def expand_reverse_relationship(
|
||||
model: Type["Model"], model_field: Type["ForeignKeyField"]
|
||||
) -> None:
|
||||
def expand_reverse_relationship(model_field: Type["ForeignKeyField"]) -> None:
|
||||
"""
|
||||
If the reverse relation has not been set before it's set here.
|
||||
|
||||
:param model: model on which relation should be checked and registered
|
||||
:type model: Model class
|
||||
:param model_field:
|
||||
:type model_field:
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
child_model_name = model_field.related_name or model.get_name() + "s"
|
||||
parent_model = model_field.to
|
||||
child = model
|
||||
if reverse_field_not_already_registered(child, child_model_name, parent_model):
|
||||
register_reverse_model_fields(
|
||||
parent_model, child, child_model_name, model_field
|
||||
)
|
||||
if reverse_field_not_already_registered(model_field=model_field):
|
||||
register_reverse_model_fields(model_field=model_field)
|
||||
|
||||
|
||||
def expand_reverse_relationships(model: Type["Model"]) -> None:
|
||||
@ -96,18 +87,12 @@ def expand_reverse_relationships(model: Type["Model"]) -> None:
|
||||
for model_field in model.Meta.model_fields.values():
|
||||
if (
|
||||
issubclass(model_field, ForeignKeyField)
|
||||
and not isinstance(model_field.to, ForwardRef)
|
||||
and not isinstance(model_field.through, ForwardRef)
|
||||
and not model_field.has_unresolved_forward_refs()
|
||||
):
|
||||
expand_reverse_relationship(model=model, model_field=model_field)
|
||||
expand_reverse_relationship(model_field=model_field)
|
||||
|
||||
|
||||
def register_reverse_model_fields(
|
||||
model: Type["Model"],
|
||||
child: Type["Model"],
|
||||
related_name: str,
|
||||
model_field: Type["ForeignKeyField"],
|
||||
) -> None:
|
||||
def register_reverse_model_fields(model_field: Type["ForeignKeyField"]) -> None:
|
||||
"""
|
||||
Registers reverse ForeignKey field on related model.
|
||||
By default it's name.lower()+'s' of the model on which relation is defined.
|
||||
@ -115,28 +100,28 @@ def register_reverse_model_fields(
|
||||
But if the related_model name is provided it's registered with that name.
|
||||
Autogenerated reverse fields also set related_name to the original field name.
|
||||
|
||||
:param model: related model on which reverse field should be defined
|
||||
:type model: Model class
|
||||
:param child: parent model with relation definition
|
||||
:type child: Model class
|
||||
:param related_name: name by which reverse key should be registered
|
||||
:type related_name: str
|
||||
:param model_field: original relation ForeignKey field
|
||||
:type model_field: relation Field
|
||||
"""
|
||||
related_name = model_field.get_related_name()
|
||||
if issubclass(model_field, ManyToManyField):
|
||||
model.Meta.model_fields[related_name] = ManyToMany(
|
||||
child,
|
||||
model_field.to.Meta.model_fields[related_name] = ManyToMany(
|
||||
model_field.owner,
|
||||
through=model_field.through,
|
||||
name=related_name,
|
||||
virtual=True,
|
||||
related_name=model_field.name,
|
||||
owner=model_field.to,
|
||||
)
|
||||
# register foreign keys on through model
|
||||
adjust_through_many_to_many_model(model, child, model_field)
|
||||
adjust_through_many_to_many_model(model_field=model_field)
|
||||
else:
|
||||
model.Meta.model_fields[related_name] = ForeignKey(
|
||||
child, real_name=related_name, virtual=True, related_name=model_field.name,
|
||||
model_field.to.Meta.model_fields[related_name] = ForeignKey(
|
||||
model_field.owner,
|
||||
real_name=related_name,
|
||||
virtual=True,
|
||||
related_name=model_field.name,
|
||||
owner=model_field.to,
|
||||
)
|
||||
|
||||
|
||||
@ -160,19 +145,19 @@ def register_relation_in_alias_manager(
|
||||
:type field_name: str
|
||||
"""
|
||||
if issubclass(field, ManyToManyField):
|
||||
if isinstance(field.to, ForwardRef) or isinstance(field.through, ForwardRef):
|
||||
if field.has_unresolved_forward_refs():
|
||||
return
|
||||
register_many_to_many_relation_on_build(
|
||||
new_model=new_model, field=field, field_name=field_name
|
||||
)
|
||||
elif issubclass(field, ForeignKeyField):
|
||||
if isinstance(field.to, ForwardRef):
|
||||
if field.has_unresolved_forward_refs():
|
||||
return
|
||||
register_relation_on_build(new_model=new_model, field_name=field_name)
|
||||
|
||||
|
||||
def verify_related_name_dont_duplicate(
|
||||
child: Type["Model"], parent_model: Type["Model"], related_name: str,
|
||||
related_name: str, model_field: Type["ForeignKeyField"]
|
||||
) -> None:
|
||||
"""
|
||||
Verifies whether the used related_name (regardless of the fact if user defined or
|
||||
@ -181,59 +166,51 @@ def verify_related_name_dont_duplicate(
|
||||
|
||||
:raises ModelDefinitionError: if name is already used but lead to different related
|
||||
model
|
||||
:param child: related Model class
|
||||
:type child: ormar.models.metaclass.ModelMetaclass
|
||||
:param parent_model: parent Model class
|
||||
:type parent_model: ormar.models.metaclass.ModelMetaclass
|
||||
:param related_name:
|
||||
:type related_name:
|
||||
:param model_field: original relation ForeignKey field
|
||||
:type model_field: relation Field
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
if parent_model.Meta.model_fields.get(related_name):
|
||||
fk_field = parent_model.Meta.model_fields.get(related_name)
|
||||
if not fk_field: # pragma: no cover
|
||||
return
|
||||
if fk_field.to != child and fk_field.to.Meta != child.Meta:
|
||||
raise ormar.ModelDefinitionError(
|
||||
f"Relation with related_name "
|
||||
f"'{related_name}' "
|
||||
f"leading to model "
|
||||
f"{parent_model.get_name(lower=False)} "
|
||||
f"cannot be used on model "
|
||||
f"{child.get_name(lower=False)} "
|
||||
f"because it's already used by model "
|
||||
f"{fk_field.to.get_name(lower=False)}"
|
||||
)
|
||||
fk_field = model_field.to.Meta.model_fields.get(related_name)
|
||||
if not fk_field: # pragma: no cover
|
||||
return
|
||||
if fk_field.to != model_field.owner and fk_field.to.Meta != model_field.owner.Meta:
|
||||
raise ormar.ModelDefinitionError(
|
||||
f"Relation with related_name "
|
||||
f"'{related_name}' "
|
||||
f"leading to model "
|
||||
f"{model_field.to.get_name(lower=False)} "
|
||||
f"cannot be used on model "
|
||||
f"{model_field.owner.get_name(lower=False)} "
|
||||
f"because it's already used by model "
|
||||
f"{fk_field.to.get_name(lower=False)}"
|
||||
)
|
||||
|
||||
|
||||
def reverse_field_not_already_registered(
|
||||
child: Type["Model"], child_model_name: str, parent_model: Type["Model"]
|
||||
) -> bool:
|
||||
def reverse_field_not_already_registered(model_field: Type["ForeignKeyField"]) -> bool:
|
||||
"""
|
||||
Checks if child is already registered in parents pydantic fields.
|
||||
|
||||
:raises ModelDefinitionError: if related name is already used but lead to different
|
||||
related model
|
||||
:param child: related Model class
|
||||
:type child: ormar.models.metaclass.ModelMetaclass
|
||||
:param child_model_name: related_name of the child if provided
|
||||
:type child_model_name: str
|
||||
:param parent_model: parent Model class
|
||||
:type parent_model: ormar.models.metaclass.ModelMetaclass
|
||||
:param model_field: original relation ForeignKey field
|
||||
:type model_field: relation Field
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
check_result = child_model_name not in parent_model.Meta.model_fields
|
||||
check_result2 = child.get_name() not in parent_model.Meta.model_fields
|
||||
related_name = model_field.get_related_name()
|
||||
check_result = related_name not in model_field.to.Meta.model_fields
|
||||
check_result2 = model_field.owner.get_name() not in model_field.to.Meta.model_fields
|
||||
|
||||
if not check_result:
|
||||
verify_related_name_dont_duplicate(
|
||||
child=child, parent_model=parent_model, related_name=child_model_name
|
||||
related_name=related_name, model_field=model_field
|
||||
)
|
||||
if not check_result2:
|
||||
verify_related_name_dont_duplicate(
|
||||
child=child, parent_model=parent_model, related_name=child.get_name()
|
||||
related_name=model_field.owner.get_name(), model_field=model_field
|
||||
)
|
||||
|
||||
return check_result and check_result2
|
||||
|
||||
@ -15,58 +15,47 @@ if TYPE_CHECKING: # pragma no cover
|
||||
from ormar.models import NewBaseModel
|
||||
|
||||
|
||||
def adjust_through_many_to_many_model(
|
||||
model: Type["Model"], child: Type["Model"], model_field: Type[ManyToManyField]
|
||||
) -> None:
|
||||
def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> None:
|
||||
"""
|
||||
Registers m2m relation on through model.
|
||||
Sets ormar.ForeignKey from through model to both child and parent models.
|
||||
Sets sqlalchemy.ForeignKey to both child and parent models.
|
||||
Sets pydantic fields with child and parent model types.
|
||||
|
||||
:param model: model on which relation is declared
|
||||
:type model: Model class
|
||||
:param child: model to which m2m relation leads
|
||||
:type child: Model class
|
||||
:param model_field: relation field defined in parent model
|
||||
:type model_field: ManyToManyField
|
||||
"""
|
||||
same_table_ref = False
|
||||
if child == model or child.Meta == model.Meta:
|
||||
same_table_ref = True
|
||||
model_field.self_reference = True
|
||||
|
||||
if same_table_ref:
|
||||
parent_name = f'to_{model.get_name()}'
|
||||
child_name = f'from_{child.get_name()}'
|
||||
else:
|
||||
parent_name = model.get_name()
|
||||
child_name = child.get_name()
|
||||
parent_name = model_field.default_target_field_name()
|
||||
child_name = model_field.default_source_field_name()
|
||||
|
||||
model_field.through.Meta.model_fields[parent_name] = ForeignKey(
|
||||
model, real_name=parent_name, ondelete="CASCADE"
|
||||
model_field.to, real_name=parent_name, ondelete="CASCADE"
|
||||
)
|
||||
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
||||
child, real_name=child_name, ondelete="CASCADE"
|
||||
model_field.owner, real_name=child_name, ondelete="CASCADE"
|
||||
)
|
||||
|
||||
create_and_append_m2m_fk(model=model, model_field=model_field,
|
||||
field_name=parent_name)
|
||||
create_and_append_m2m_fk(model=child, model_field=model_field,
|
||||
field_name=child_name)
|
||||
create_and_append_m2m_fk(
|
||||
model=model_field.to, model_field=model_field, field_name=parent_name
|
||||
)
|
||||
create_and_append_m2m_fk(
|
||||
model=model_field.owner, model_field=model_field, field_name=child_name
|
||||
)
|
||||
|
||||
create_pydantic_field(parent_name, model, model_field)
|
||||
create_pydantic_field(child_name, child, model_field)
|
||||
create_pydantic_field(parent_name, model_field.to, model_field)
|
||||
create_pydantic_field(child_name, model_field.owner, model_field)
|
||||
|
||||
|
||||
def create_and_append_m2m_fk(
|
||||
model: Type["Model"], model_field: Type[ManyToManyField], field_name: str
|
||||
model: Type["Model"], model_field: Type[ManyToManyField], field_name: str
|
||||
) -> None:
|
||||
"""
|
||||
Registers sqlalchemy Column with sqlalchemy.ForeignKey leadning to the model.
|
||||
Registers sqlalchemy Column with sqlalchemy.ForeignKey leading to the model.
|
||||
|
||||
Newly created field is added to m2m relation through model Meta columns and table.
|
||||
|
||||
:param field_name: name of the column to create
|
||||
:type field_name: str
|
||||
:param model: Model class to which FK should be created
|
||||
:type model: Model class
|
||||
:param model_field: field with ManyToMany relation
|
||||
@ -92,7 +81,7 @@ def create_and_append_m2m_fk(
|
||||
|
||||
|
||||
def check_pk_column_validity(
|
||||
field_name: str, field: BaseField, pkname: Optional[str]
|
||||
field_name: str, field: BaseField, pkname: Optional[str]
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Receives the field marked as primary key and verifies if the pkname
|
||||
@ -117,7 +106,7 @@ def check_pk_column_validity(
|
||||
|
||||
|
||||
def sqlalchemy_columns_from_model_fields(
|
||||
model_fields: Dict, new_model: Type["Model"]
|
||||
model_fields: Dict, new_model: Type["Model"]
|
||||
) -> Tuple[Optional[str], List[sqlalchemy.Column]]:
|
||||
"""
|
||||
Iterates over declared on Model model fields and extracts fields that
|
||||
@ -136,6 +125,8 @@ def sqlalchemy_columns_from_model_fields(
|
||||
Append fields to columns if it's not pydantic_only,
|
||||
virtual ForeignKey or ManyToMany field.
|
||||
|
||||
Sets `owner` on each model_field as reference to newly created Model.
|
||||
|
||||
:raises ModelDefinitionError: if validation of related_names fail,
|
||||
or pkname validation fails.
|
||||
:param model_fields: dictionary of declared ormar model fields
|
||||
@ -155,19 +146,20 @@ def sqlalchemy_columns_from_model_fields(
|
||||
columns = []
|
||||
pkname = None
|
||||
for field_name, field in model_fields.items():
|
||||
field.owner = new_model
|
||||
if field.primary_key:
|
||||
pkname = check_pk_column_validity(field_name, field, pkname)
|
||||
if (
|
||||
not field.pydantic_only
|
||||
and not field.virtual
|
||||
and not issubclass(field, ManyToManyField)
|
||||
not field.pydantic_only
|
||||
and not field.virtual
|
||||
and not issubclass(field, ManyToManyField)
|
||||
):
|
||||
columns.append(field.get_column(field.get_alias()))
|
||||
return pkname, columns
|
||||
|
||||
|
||||
def populate_meta_tablename_columns_and_pk(
|
||||
name: str, new_model: Type["Model"]
|
||||
name: str, new_model: Type["Model"]
|
||||
) -> Type["Model"]:
|
||||
"""
|
||||
Sets Model tablename if it's not already set in Meta.
|
||||
@ -234,7 +226,7 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
|
||||
:rtype: Model class
|
||||
"""
|
||||
if not hasattr(meta, "table") and check_for_null_type_columns_from_forward_refs(
|
||||
meta
|
||||
meta
|
||||
):
|
||||
meta.table = sqlalchemy.Table(
|
||||
meta.tablename,
|
||||
@ -245,7 +237,7 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
|
||||
|
||||
|
||||
def update_column_definition(
|
||||
model: Union[Type["Model"], Type["NewBaseModel"]], field: Type[ForeignKeyField]
|
||||
model: Union[Type["Model"], Type["NewBaseModel"]], field: Type[ForeignKeyField]
|
||||
) -> None:
|
||||
"""
|
||||
Updates a column with a new type column based on updated parameters in FK fields.
|
||||
|
||||
Reference in New Issue
Block a user