WIP working self fk, adjusting m2m to work with self ref
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type
|
||||
from typing import Dict, ForwardRef, List, Optional, TYPE_CHECKING, Tuple, Type
|
||||
|
||||
import ormar
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
@ -6,6 +6,22 @@ from ormar.models.helpers.pydantic import populate_pydantic_default_values
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
from ormar.fields import BaseField
|
||||
|
||||
|
||||
def is_field_an_forward_ref(field: Type["BaseField"]) -> bool:
|
||||
"""
|
||||
Checks if field is a relation field and whether any of the referenced models
|
||||
are ForwardRefs that needs to be updated before proceeding.
|
||||
|
||||
:param field: model field to verify
|
||||
:type field: Type[BaseField]
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return issubclass(field, ForeignKeyField) and (
|
||||
isinstance(field.to, ForwardRef) or isinstance(field.through, ForwardRef)
|
||||
)
|
||||
|
||||
|
||||
def populate_default_options_values(
|
||||
@ -33,6 +49,13 @@ def populate_default_options_values(
|
||||
if not hasattr(new_model.Meta, "abstract"):
|
||||
new_model.Meta.abstract = False
|
||||
|
||||
if any(
|
||||
is_field_an_forward_ref(field) for field in new_model.Meta.model_fields.values()
|
||||
):
|
||||
new_model.Meta.requires_ref_update = True
|
||||
else:
|
||||
new_model.Meta.requires_ref_update = False
|
||||
|
||||
|
||||
def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]:
|
||||
"""
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from typing import TYPE_CHECKING, Type
|
||||
from typing import ForwardRef, TYPE_CHECKING, Type
|
||||
|
||||
import ormar
|
||||
from ormar import ForeignKey, ManyToMany
|
||||
@ -61,6 +61,28 @@ def register_many_to_many_relation_on_build(
|
||||
)
|
||||
|
||||
|
||||
def expand_reverse_relationship(
|
||||
model: Type["Model"], model_field: Type["ForeignKeyField"]
|
||||
) -> None:
|
||||
"""
|
||||
If the reverse relation has not been set before it's set here.
|
||||
|
||||
:param model: model on which relation should be checked and registered
|
||||
:type model: Model class
|
||||
:param model_field:
|
||||
:type model_field:
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
child_model_name = model_field.related_name or model.get_name() + "s"
|
||||
parent_model = model_field.to
|
||||
child = model
|
||||
if reverse_field_not_already_registered(child, child_model_name, parent_model):
|
||||
register_reverse_model_fields(
|
||||
parent_model, child, child_model_name, model_field
|
||||
)
|
||||
|
||||
|
||||
def expand_reverse_relationships(model: Type["Model"]) -> None:
|
||||
"""
|
||||
Iterates through model_fields of given model and verifies if all reverse
|
||||
@ -72,16 +94,12 @@ def expand_reverse_relationships(model: Type["Model"]) -> None:
|
||||
:type model: Model class
|
||||
"""
|
||||
for model_field in model.Meta.model_fields.values():
|
||||
if issubclass(model_field, ForeignKeyField):
|
||||
child_model_name = model_field.related_name or model.get_name() + "s"
|
||||
parent_model = model_field.to
|
||||
child = model
|
||||
if reverse_field_not_already_registered(
|
||||
child, child_model_name, parent_model
|
||||
):
|
||||
register_reverse_model_fields(
|
||||
parent_model, child, child_model_name, model_field
|
||||
)
|
||||
if (
|
||||
issubclass(model_field, ForeignKeyField)
|
||||
and not isinstance(model_field.to, ForwardRef)
|
||||
and not isinstance(model_field.through, ForwardRef)
|
||||
):
|
||||
expand_reverse_relationship(model=model, model_field=model_field)
|
||||
|
||||
|
||||
def register_reverse_model_fields(
|
||||
@ -142,10 +160,14 @@ def register_relation_in_alias_manager(
|
||||
:type field_name: str
|
||||
"""
|
||||
if issubclass(field, ManyToManyField):
|
||||
if isinstance(field.to, ForwardRef) or isinstance(field.through, ForwardRef):
|
||||
return
|
||||
register_many_to_many_relation_on_build(
|
||||
new_model=new_model, field=field, field_name=field_name
|
||||
)
|
||||
elif issubclass(field, ForeignKeyField):
|
||||
if isinstance(field.to, ForwardRef):
|
||||
return
|
||||
register_relation_on_build(new_model=new_model, field_name=field_name)
|
||||
|
||||
|
||||
|
||||
@ -1,20 +1,22 @@
|
||||
import copy
|
||||
import logging
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202
|
||||
from ormar.fields import BaseField, ManyToManyField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.models.helpers.models import validate_related_names_in_relations
|
||||
from ormar.models.helpers.pydantic import create_pydantic_field
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model, ModelMeta
|
||||
from ormar.models import NewBaseModel
|
||||
|
||||
|
||||
def adjust_through_many_to_many_model(
|
||||
model: Type["Model"], child: Type["Model"], model_field: Type[ManyToManyField]
|
||||
model: Type["Model"], child: Type["Model"], model_field: Type[ManyToManyField]
|
||||
) -> None:
|
||||
"""
|
||||
Registers m2m relation on through model.
|
||||
@ -29,22 +31,36 @@ def adjust_through_many_to_many_model(
|
||||
:param model_field: relation field defined in parent model
|
||||
:type model_field: ManyToManyField
|
||||
"""
|
||||
model_field.through.Meta.model_fields[model.get_name()] = ForeignKey(
|
||||
model, real_name=model.get_name(), ondelete="CASCADE"
|
||||
same_table_ref = False
|
||||
if child == model or child.Meta == model.Meta:
|
||||
same_table_ref = True
|
||||
model_field.self_reference = True
|
||||
|
||||
if same_table_ref:
|
||||
parent_name = f'to_{model.get_name()}'
|
||||
child_name = f'from_{child.get_name()}'
|
||||
else:
|
||||
parent_name = model.get_name()
|
||||
child_name = child.get_name()
|
||||
|
||||
model_field.through.Meta.model_fields[parent_name] = ForeignKey(
|
||||
model, real_name=parent_name, ondelete="CASCADE"
|
||||
)
|
||||
model_field.through.Meta.model_fields[child.get_name()] = ForeignKey(
|
||||
child, real_name=child.get_name(), ondelete="CASCADE"
|
||||
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
||||
child, real_name=child_name, ondelete="CASCADE"
|
||||
)
|
||||
|
||||
create_and_append_m2m_fk(model, model_field)
|
||||
create_and_append_m2m_fk(child, model_field)
|
||||
create_and_append_m2m_fk(model=model, model_field=model_field,
|
||||
field_name=parent_name)
|
||||
create_and_append_m2m_fk(model=child, model_field=model_field,
|
||||
field_name=child_name)
|
||||
|
||||
create_pydantic_field(model.get_name(), model, model_field)
|
||||
create_pydantic_field(child.get_name(), child, model_field)
|
||||
create_pydantic_field(parent_name, model, model_field)
|
||||
create_pydantic_field(child_name, child, model_field)
|
||||
|
||||
|
||||
def create_and_append_m2m_fk(
|
||||
model: Type["Model"], model_field: Type[ManyToManyField]
|
||||
model: Type["Model"], model_field: Type[ManyToManyField], field_name: str
|
||||
) -> None:
|
||||
"""
|
||||
Registers sqlalchemy Column with sqlalchemy.ForeignKey leadning to the model.
|
||||
@ -63,7 +79,7 @@ def create_and_append_m2m_fk(
|
||||
"ManyToMany relation cannot lead to field without pk"
|
||||
)
|
||||
column = sqlalchemy.Column(
|
||||
model.get_name(),
|
||||
field_name,
|
||||
pk_column.type,
|
||||
sqlalchemy.schema.ForeignKey(
|
||||
model.Meta.tablename + "." + pk_alias,
|
||||
@ -72,12 +88,11 @@ def create_and_append_m2m_fk(
|
||||
),
|
||||
)
|
||||
model_field.through.Meta.columns.append(column)
|
||||
# breakpoint()
|
||||
model_field.through.Meta.table.append_column(copy.deepcopy(column))
|
||||
|
||||
|
||||
def check_pk_column_validity(
|
||||
field_name: str, field: BaseField, pkname: Optional[str]
|
||||
field_name: str, field: BaseField, pkname: Optional[str]
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Receives the field marked as primary key and verifies if the pkname
|
||||
@ -102,7 +117,7 @@ def check_pk_column_validity(
|
||||
|
||||
|
||||
def sqlalchemy_columns_from_model_fields(
|
||||
model_fields: Dict, new_model: Type["Model"]
|
||||
model_fields: Dict, new_model: Type["Model"]
|
||||
) -> Tuple[Optional[str], List[sqlalchemy.Column]]:
|
||||
"""
|
||||
Iterates over declared on Model model fields and extracts fields that
|
||||
@ -143,16 +158,16 @@ def sqlalchemy_columns_from_model_fields(
|
||||
if field.primary_key:
|
||||
pkname = check_pk_column_validity(field_name, field, pkname)
|
||||
if (
|
||||
not field.pydantic_only
|
||||
and not field.virtual
|
||||
and not issubclass(field, ManyToManyField)
|
||||
not field.pydantic_only
|
||||
and not field.virtual
|
||||
and not issubclass(field, ManyToManyField)
|
||||
):
|
||||
columns.append(field.get_column(field.get_alias()))
|
||||
return pkname, columns
|
||||
|
||||
|
||||
def populate_meta_tablename_columns_and_pk(
|
||||
name: str, new_model: Type["Model"]
|
||||
name: str, new_model: Type["Model"]
|
||||
) -> Type["Model"]:
|
||||
"""
|
||||
Sets Model tablename if it's not already set in Meta.
|
||||
@ -194,6 +209,20 @@ def populate_meta_tablename_columns_and_pk(
|
||||
return new_model
|
||||
|
||||
|
||||
def check_for_null_type_columns_from_forward_refs(meta: "ModelMeta") -> bool:
|
||||
"""
|
||||
Check is any column is of NUllType() meaning it's empty column from ForwardRef
|
||||
|
||||
:param meta: Meta class of the Model without sqlalchemy table constructed
|
||||
:type meta: Model class Meta
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return not any(
|
||||
isinstance(col.type, sqlalchemy.sql.sqltypes.NullType) for col in meta.columns
|
||||
)
|
||||
|
||||
|
||||
def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
|
||||
"""
|
||||
Constructs sqlalchemy table out of columns and parameters set on Meta class.
|
||||
@ -204,10 +233,33 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
|
||||
:return: class with populated Meta.table
|
||||
:rtype: Model class
|
||||
"""
|
||||
if not hasattr(meta, "table"):
|
||||
if not hasattr(meta, "table") and check_for_null_type_columns_from_forward_refs(
|
||||
meta
|
||||
):
|
||||
meta.table = sqlalchemy.Table(
|
||||
meta.tablename,
|
||||
meta.metadata,
|
||||
*[copy.deepcopy(col) for col in meta.columns],
|
||||
*meta.constraints,
|
||||
)
|
||||
|
||||
|
||||
def update_column_definition(
|
||||
model: Union[Type["Model"], Type["NewBaseModel"]], field: Type[ForeignKeyField]
|
||||
) -> None:
|
||||
"""
|
||||
Updates a column with a new type column based on updated parameters in FK fields.
|
||||
|
||||
:param model: model on which columns needs to be updated
|
||||
:type model: Type["Model"]
|
||||
:param field: field with column definition that requires update
|
||||
:type field: Type[ForeignKeyField]
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
columns = model.Meta.columns
|
||||
for ind, column in enumerate(columns):
|
||||
if column.name == field.get_alias():
|
||||
new_column = field.get_column(field.get_alias())
|
||||
columns[ind] = new_column
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user