WIP skip relation registration on m2m through instance, simplify registering relations part 2
This commit is contained in:
@ -247,7 +247,6 @@ class BaseField(FieldInfo):
|
|||||||
value: Any,
|
value: Any,
|
||||||
child: Union["Model", "NewBaseModel"],
|
child: Union["Model", "NewBaseModel"],
|
||||||
to_register: bool = True,
|
to_register: bool = True,
|
||||||
relation_name: str = None,
|
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""
|
"""
|
||||||
Function overwritten for relations, in basic field the value is returned as is.
|
Function overwritten for relations, in basic field the value is returned as is.
|
||||||
|
|||||||
@ -213,13 +213,24 @@ class ForeignKeyField(BaseField):
|
|||||||
ondelete: str
|
ondelete: str
|
||||||
onupdate: str
|
onupdate: str
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_source_related_name(cls) -> str:
|
||||||
|
"""
|
||||||
|
Returns name to use for source relation name.
|
||||||
|
For FK it's the same, differs for m2m fields.
|
||||||
|
It's either set as `related_name` or by default it's owner model. get_name + 's'
|
||||||
|
:return: name of the related_name or default related name.
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
return cls.get_related_name()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_related_name(cls) -> str:
|
def get_related_name(cls) -> str:
|
||||||
"""
|
"""
|
||||||
Returns name to use for reverse relation.
|
Returns name to use for reverse relation.
|
||||||
It's either set as `related_name` or by default it's owner model. get_name + 's'
|
It's either set as `related_name` or by default it's owner model. get_name + 's'
|
||||||
:return:
|
:return: name of the related_name or default related name.
|
||||||
:rtype:
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
return cls.related_name or cls.owner.get_name() + "s"
|
return cls.related_name or cls.owner.get_name() + "s"
|
||||||
|
|
||||||
@ -250,7 +261,7 @@ class ForeignKeyField(BaseField):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _extract_model_from_sequence(
|
def _extract_model_from_sequence(
|
||||||
cls, value: List, child: "Model", to_register: bool, relation_name: str
|
cls, value: List, child: "Model", to_register: bool,
|
||||||
) -> List["Model"]:
|
) -> List["Model"]:
|
||||||
"""
|
"""
|
||||||
Takes a list of Models and registers them on parent.
|
Takes a list of Models and registers them on parent.
|
||||||
@ -269,17 +280,14 @@ class ForeignKeyField(BaseField):
|
|||||||
"""
|
"""
|
||||||
return [
|
return [
|
||||||
cls.expand_relationship( # type: ignore
|
cls.expand_relationship( # type: ignore
|
||||||
value=val,
|
value=val, child=child, to_register=to_register,
|
||||||
child=child,
|
|
||||||
to_register=to_register,
|
|
||||||
relation_name=relation_name,
|
|
||||||
)
|
)
|
||||||
for val in value
|
for val in value
|
||||||
]
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _register_existing_model(
|
def _register_existing_model(
|
||||||
cls, value: "Model", child: "Model", to_register: bool, relation_name: str
|
cls, value: "Model", child: "Model", to_register: bool,
|
||||||
) -> "Model":
|
) -> "Model":
|
||||||
"""
|
"""
|
||||||
Takes already created instance and registers it for parent.
|
Takes already created instance and registers it for parent.
|
||||||
@ -297,12 +305,12 @@ class ForeignKeyField(BaseField):
|
|||||||
:rtype: Model
|
:rtype: Model
|
||||||
"""
|
"""
|
||||||
if to_register:
|
if to_register:
|
||||||
cls.register_relation(model=value, child=child, relation_name=relation_name)
|
cls.register_relation(model=value, child=child)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _construct_model_from_dict(
|
def _construct_model_from_dict(
|
||||||
cls, value: dict, child: "Model", to_register: bool, relation_name: str
|
cls, value: dict, child: "Model", to_register: bool
|
||||||
) -> "Model":
|
) -> "Model":
|
||||||
"""
|
"""
|
||||||
Takes a dictionary, creates a instance and registers it for parent.
|
Takes a dictionary, creates a instance and registers it for parent.
|
||||||
@ -324,12 +332,12 @@ class ForeignKeyField(BaseField):
|
|||||||
value["__pk_only__"] = True
|
value["__pk_only__"] = True
|
||||||
model = cls.to(**value)
|
model = cls.to(**value)
|
||||||
if to_register:
|
if to_register:
|
||||||
cls.register_relation(model=model, child=child, relation_name=relation_name)
|
cls.register_relation(model=model, child=child)
|
||||||
return model
|
return model
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _construct_model_from_pk(
|
def _construct_model_from_pk(
|
||||||
cls, value: Any, child: "Model", to_register: bool, relation_name: str
|
cls, value: Any, child: "Model", to_register: bool
|
||||||
) -> "Model":
|
) -> "Model":
|
||||||
"""
|
"""
|
||||||
Takes a pk value, creates a dummy instance and registers it for parent.
|
Takes a pk value, creates a dummy instance and registers it for parent.
|
||||||
@ -356,13 +364,11 @@ class ForeignKeyField(BaseField):
|
|||||||
)
|
)
|
||||||
model = create_dummy_instance(fk=cls.to, pk=value)
|
model = create_dummy_instance(fk=cls.to, pk=value)
|
||||||
if to_register:
|
if to_register:
|
||||||
cls.register_relation(model=model, child=child, relation_name=relation_name)
|
cls.register_relation(model=model, child=child)
|
||||||
return model
|
return model
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def register_relation(
|
def register_relation(cls, model: "Model", child: "Model") -> None:
|
||||||
cls, model: "Model", child: "Model", relation_name: str
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Registers relation between parent and child in relation manager.
|
Registers relation between parent and child in relation manager.
|
||||||
Relation manager is kep on each model (different instance).
|
Relation manager is kep on each model (different instance).
|
||||||
@ -376,11 +382,7 @@ class ForeignKeyField(BaseField):
|
|||||||
:type child: Model class
|
:type child: Model class
|
||||||
"""
|
"""
|
||||||
model._orm.add(
|
model._orm.add(
|
||||||
parent=model,
|
parent=model, child=child, field=cls,
|
||||||
child=child,
|
|
||||||
child_name=cls.related_name or child.get_name() + "s",
|
|
||||||
virtual=cls.virtual,
|
|
||||||
relation_name=relation_name,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -400,7 +402,6 @@ class ForeignKeyField(BaseField):
|
|||||||
value: Any,
|
value: Any,
|
||||||
child: Union["Model", "NewBaseModel"],
|
child: Union["Model", "NewBaseModel"],
|
||||||
to_register: bool = True,
|
to_register: bool = True,
|
||||||
relation_name: str = None,
|
|
||||||
) -> Optional[Union["Model", List["Model"]]]:
|
) -> Optional[Union["Model", List["Model"]]]:
|
||||||
"""
|
"""
|
||||||
For relations the child model is first constructed (if needed),
|
For relations the child model is first constructed (if needed),
|
||||||
@ -429,5 +430,5 @@ class ForeignKeyField(BaseField):
|
|||||||
|
|
||||||
model = constructors.get( # type: ignore
|
model = constructors.get( # type: ignore
|
||||||
value.__class__.__name__, cls._construct_model_from_pk
|
value.__class__.__name__, cls._construct_model_from_pk
|
||||||
)(value, child, to_register, relation_name)
|
)(value, child, to_register)
|
||||||
return model
|
return model
|
||||||
|
|||||||
@ -14,7 +14,7 @@ REF_PREFIX = "#/components/schemas/"
|
|||||||
|
|
||||||
def populate_m2m_params_based_on_to_model(
|
def populate_m2m_params_based_on_to_model(
|
||||||
to: Type["Model"], nullable: bool
|
to: Type["Model"], nullable: bool
|
||||||
) -> Tuple[List, Any]:
|
) -> Tuple[Any, Any]:
|
||||||
"""
|
"""
|
||||||
Based on target to model to which relation leads to populates the type of the
|
Based on target to model to which relation leads to populates the type of the
|
||||||
pydantic field to use and type of the target column field.
|
pydantic field to use and type of the target column field.
|
||||||
@ -106,6 +106,20 @@ class ManyToManyField(ForeignKeyField, ormar.QuerySetProtocol, ormar.RelationPro
|
|||||||
Actual class returned from ManyToMany function call and stored in model_fields.
|
Actual class returned from ManyToMany function call and stored in model_fields.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_source_related_name(cls) -> str:
|
||||||
|
"""
|
||||||
|
Returns name to use for source relation name.
|
||||||
|
For FK it's the same, differs for m2m fields.
|
||||||
|
It's either set as `related_name` or by default it's field name.
|
||||||
|
:return: name of the related_name or default related name.
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
return (
|
||||||
|
cls.through.Meta.model_fields[cls.default_source_field_name()].related_name
|
||||||
|
or cls.name
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def default_target_field_name(cls) -> str:
|
def default_target_field_name(cls) -> str:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -13,7 +13,7 @@ if TYPE_CHECKING: # pragma no cover
|
|||||||
alias_manager = AliasManager()
|
alias_manager = AliasManager()
|
||||||
|
|
||||||
|
|
||||||
def register_relation_on_build(new_model: Type["Model"], field_name: str) -> None:
|
def register_relation_on_build(field: Type["ForeignKeyField"]) -> None:
|
||||||
"""
|
"""
|
||||||
Registers ForeignKey relation in alias_manager to set a table_prefix.
|
Registers ForeignKey relation in alias_manager to set a table_prefix.
|
||||||
Registration include also reverse relation side to be able to join both sides.
|
Registration include also reverse relation side to be able to join both sides.
|
||||||
@ -22,17 +22,17 @@ def register_relation_on_build(new_model: Type["Model"], field_name: str) -> Non
|
|||||||
relations between two Models that needs to have different
|
relations between two Models that needs to have different
|
||||||
aliases for proper sql joins.
|
aliases for proper sql joins.
|
||||||
|
|
||||||
:param new_model: constructed model
|
:param field: relation field
|
||||||
:type new_model: Model class
|
:type field: ForeignKey class
|
||||||
:param field_name: name of the related field
|
|
||||||
:type field_name: str
|
|
||||||
"""
|
"""
|
||||||
alias_manager.add_relation_type(new_model, field_name)
|
alias_manager.add_relation_type(
|
||||||
|
source_model=field.owner,
|
||||||
|
relation_name=field.name,
|
||||||
|
reverse_name=field.get_source_related_name(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def register_many_to_many_relation_on_build(
|
def register_many_to_many_relation_on_build(field: Type[ManyToManyField]) -> None:
|
||||||
new_model: Type["Model"], field: Type[ManyToManyField], field_name: str
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Registers connection between through model and both sides of the m2m relation.
|
Registers connection between through model and both sides of the m2m relation.
|
||||||
Registration include also reverse relation side to be able to join both sides.
|
Registration include also reverse relation side to be able to join both sides.
|
||||||
@ -43,21 +43,18 @@ def register_many_to_many_relation_on_build(
|
|||||||
|
|
||||||
By default relation name is a model.name.lower().
|
By default relation name is a model.name.lower().
|
||||||
|
|
||||||
:param field_name: name of the relation key
|
|
||||||
:type field_name: str
|
|
||||||
:param new_model: model on which m2m field is declared
|
|
||||||
:type new_model: Model class
|
|
||||||
:param field: relation field
|
:param field: relation field
|
||||||
:type field: ManyToManyField class
|
:type field: ManyToManyField class
|
||||||
"""
|
"""
|
||||||
alias_manager.add_relation_type(
|
alias_manager.add_relation_type(
|
||||||
field.through, new_model.get_name(), is_multi=True, reverse_name=field_name
|
source_model=field.through,
|
||||||
|
relation_name=field.default_source_field_name(),
|
||||||
|
reverse_name=field.get_source_related_name(),
|
||||||
)
|
)
|
||||||
alias_manager.add_relation_type(
|
alias_manager.add_relation_type(
|
||||||
field.through,
|
source_model=field.through,
|
||||||
field.to.get_name(),
|
relation_name=field.default_target_field_name(),
|
||||||
is_multi=True,
|
reverse_name=field.get_related_name(),
|
||||||
reverse_name=field.related_name or new_model.get_name() + "s",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -126,7 +123,7 @@ def register_reverse_model_fields(model_field: Type["ForeignKeyField"]) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def register_relation_in_alias_manager(
|
def register_relation_in_alias_manager(
|
||||||
new_model: Type["Model"], field: Type[ForeignKeyField], field_name: str
|
field: Type[ForeignKeyField], field_name: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Registers the relation (and reverse relation) in alias manager.
|
Registers the relation (and reverse relation) in alias manager.
|
||||||
@ -137,8 +134,6 @@ def register_relation_in_alias_manager(
|
|||||||
m2m - register_many_to_many_relation_on_build
|
m2m - register_many_to_many_relation_on_build
|
||||||
fk - register_relation_on_build
|
fk - register_relation_on_build
|
||||||
|
|
||||||
:param new_model: model on which relation field is declared
|
|
||||||
:type new_model: Model class
|
|
||||||
:param field: relation field
|
:param field: relation field
|
||||||
:type field: ForeignKey or ManyToManyField class
|
:type field: ForeignKey or ManyToManyField class
|
||||||
:param field_name: name of the relation key
|
:param field_name: name of the relation key
|
||||||
@ -147,13 +142,11 @@ def register_relation_in_alias_manager(
|
|||||||
if issubclass(field, ManyToManyField):
|
if issubclass(field, ManyToManyField):
|
||||||
if field.has_unresolved_forward_refs():
|
if field.has_unresolved_forward_refs():
|
||||||
return
|
return
|
||||||
register_many_to_many_relation_on_build(
|
register_many_to_many_relation_on_build(field=field)
|
||||||
new_model=new_model, field=field, field_name=field_name
|
|
||||||
)
|
|
||||||
elif issubclass(field, ForeignKeyField):
|
elif issubclass(field, ForeignKeyField):
|
||||||
if field.has_unresolved_forward_refs():
|
if field.has_unresolved_forward_refs():
|
||||||
return
|
return
|
||||||
register_relation_on_build(new_model=new_model, field_name=field_name)
|
register_relation_on_build(field=field)
|
||||||
|
|
||||||
|
|
||||||
def verify_related_name_dont_duplicate(
|
def verify_related_name_dont_duplicate(
|
||||||
|
|||||||
@ -29,10 +29,16 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non
|
|||||||
child_name = model_field.default_source_field_name()
|
child_name = model_field.default_source_field_name()
|
||||||
|
|
||||||
model_field.through.Meta.model_fields[parent_name] = ForeignKey(
|
model_field.through.Meta.model_fields[parent_name] = ForeignKey(
|
||||||
model_field.to, real_name=parent_name, ondelete="CASCADE"
|
model_field.to,
|
||||||
|
real_name=parent_name,
|
||||||
|
ondelete="CASCADE",
|
||||||
|
owner=model_field.owner,
|
||||||
)
|
)
|
||||||
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
||||||
model_field.owner, real_name=child_name, ondelete="CASCADE"
|
model_field.owner,
|
||||||
|
real_name=child_name,
|
||||||
|
ondelete="CASCADE",
|
||||||
|
owner=model_field.owner,
|
||||||
)
|
)
|
||||||
|
|
||||||
create_and_append_m2m_fk(
|
create_and_append_m2m_fk(
|
||||||
|
|||||||
@ -587,7 +587,9 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
|
|||||||
populate_meta_sqlalchemy_table_if_required(new_model.Meta)
|
populate_meta_sqlalchemy_table_if_required(new_model.Meta)
|
||||||
expand_reverse_relationships(new_model)
|
expand_reverse_relationships(new_model)
|
||||||
for field_name, field in new_model.Meta.model_fields.items():
|
for field_name, field in new_model.Meta.model_fields.items():
|
||||||
register_relation_in_alias_manager(new_model, field, field_name)
|
register_relation_in_alias_manager(
|
||||||
|
field=field, field_name=field_name
|
||||||
|
)
|
||||||
|
|
||||||
if new_model.Meta.pkname not in attrs["__annotations__"]:
|
if new_model.Meta.pkname not in attrs["__annotations__"]:
|
||||||
field_name = new_model.Meta.pkname
|
field_name = new_model.Meta.pkname
|
||||||
|
|||||||
@ -15,6 +15,7 @@ from typing import (
|
|||||||
Type,
|
Type,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
Union,
|
Union,
|
||||||
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -143,7 +144,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
k: self._convert_json(
|
k: self._convert_json(
|
||||||
k,
|
k,
|
||||||
self.Meta.model_fields[k].expand_relationship(
|
self.Meta.model_fields[k].expand_relationship(
|
||||||
v, self, to_register=False, relation_name=k
|
v, self, to_register=False,
|
||||||
),
|
),
|
||||||
"dumps",
|
"dumps",
|
||||||
)
|
)
|
||||||
@ -172,7 +173,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
# register the columns models after initialization
|
# register the columns models after initialization
|
||||||
for related in self.extract_related_names():
|
for related in self.extract_related_names():
|
||||||
self.Meta.model_fields[related].expand_relationship(
|
self.Meta.model_fields[related].expand_relationship(
|
||||||
new_kwargs.get(related), self, to_register=True, relation_name=related
|
new_kwargs.get(related), self, to_register=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
|
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
|
||||||
@ -209,7 +210,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
self.set_save_status(False)
|
self.set_save_status(False)
|
||||||
elif name in self._orm:
|
elif name in self._orm:
|
||||||
model = self.Meta.model_fields[name].expand_relationship(
|
model = self.Meta.model_fields[name].expand_relationship(
|
||||||
value=value, child=self, relation_name=name
|
value=value, child=self
|
||||||
)
|
)
|
||||||
if isinstance(self.__dict__.get(name), list):
|
if isinstance(self.__dict__.get(name), list):
|
||||||
# virtual foreign key or many to many
|
# virtual foreign key or many to many
|
||||||
@ -447,13 +448,12 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
fields_to_check = cls.Meta.model_fields.copy()
|
fields_to_check = cls.Meta.model_fields.copy()
|
||||||
for field_name, field in fields_to_check.items():
|
for field_name, field in fields_to_check.items():
|
||||||
if field.has_unresolved_forward_refs():
|
if field.has_unresolved_forward_refs():
|
||||||
|
field = cast(Type[ForeignKeyField], field)
|
||||||
field.evaluate_forward_ref(globalns=globalns, localns=localns)
|
field.evaluate_forward_ref(globalns=globalns, localns=localns)
|
||||||
field.set_self_reference_flag()
|
field.set_self_reference_flag()
|
||||||
expand_reverse_relationship(model_field=field)
|
expand_reverse_relationship(model_field=field)
|
||||||
register_relation_in_alias_manager(
|
register_relation_in_alias_manager(
|
||||||
cls, # type: ignore
|
field=field, field_name=field_name,
|
||||||
field,
|
|
||||||
field_name,
|
|
||||||
)
|
)
|
||||||
update_column_definition(model=cls, field=field)
|
update_column_definition(model=cls, field=field)
|
||||||
populate_meta_sqlalchemy_table_if_required(meta=cls.Meta)
|
populate_meta_sqlalchemy_table_if_required(meta=cls.Meta)
|
||||||
|
|||||||
@ -79,11 +79,7 @@ class AliasManager:
|
|||||||
return text(f"{name} {alias}_{name}")
|
return text(f"{name} {alias}_{name}")
|
||||||
|
|
||||||
def add_relation_type(
|
def add_relation_type(
|
||||||
self,
|
self, source_model: Type["Model"], relation_name: str, reverse_name: str = None,
|
||||||
source_model: Type["Model"],
|
|
||||||
relation_name: str,
|
|
||||||
reverse_name: str = None,
|
|
||||||
is_multi: bool = False,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Registers the relations defined in ormar models.
|
Registers the relations defined in ormar models.
|
||||||
@ -104,21 +100,16 @@ class AliasManager:
|
|||||||
:type relation_name: str
|
:type relation_name: str
|
||||||
:param reverse_name: name of related_name fo given relation for m2m relations
|
:param reverse_name: name of related_name fo given relation for m2m relations
|
||||||
:type reverse_name: Optional[str]
|
:type reverse_name: Optional[str]
|
||||||
:param is_multi: flag if relation being registered is a through m2m model
|
|
||||||
:type is_multi: bool
|
|
||||||
:return: none
|
:return: none
|
||||||
:rtype: None
|
:rtype: None
|
||||||
"""
|
"""
|
||||||
parent_key = f"{source_model.get_name()}_{relation_name}"
|
parent_key = f"{source_model.get_name()}_{relation_name}"
|
||||||
if parent_key not in self._aliases_new:
|
if parent_key not in self._aliases_new:
|
||||||
self._aliases_new[parent_key] = get_table_alias()
|
self._aliases_new[parent_key] = get_table_alias()
|
||||||
|
|
||||||
to_field = source_model.Meta.model_fields[relation_name]
|
to_field = source_model.Meta.model_fields[relation_name]
|
||||||
child_model = to_field.to
|
child_model = to_field.to
|
||||||
related_name = to_field.related_name
|
child_key = f"{child_model.get_name()}_{reverse_name}"
|
||||||
if not related_name:
|
|
||||||
related_name = reverse_name if is_multi else source_model.get_name() + "s"
|
|
||||||
|
|
||||||
child_key = f"{child_model.get_name()}_{related_name}"
|
|
||||||
if child_key not in self._aliases_new:
|
if child_key not in self._aliases_new:
|
||||||
self._aliases_new[child_key] = get_table_alias()
|
self._aliases_new[child_key] = get_table_alias()
|
||||||
|
|
||||||
|
|||||||
@ -12,6 +12,7 @@ from typing import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
import ormar
|
import ormar
|
||||||
|
from ormar.exceptions import ModelPersistenceError
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma no cover
|
if TYPE_CHECKING: # pragma no cover
|
||||||
from ormar.relations import Relation
|
from ormar.relations import Relation
|
||||||
@ -106,11 +107,19 @@ class QuerysetProxy(ormar.QuerySetProtocol):
|
|||||||
:param child: child model instance
|
:param child: child model instance
|
||||||
:type child: Model
|
:type child: Model
|
||||||
"""
|
"""
|
||||||
queryset = ormar.QuerySet(model_cls=self.relation.through)
|
model_cls = self.relation.through
|
||||||
owner_column = self._owner.get_name()
|
owner_column = self._owner.get_name()
|
||||||
child_column = child.get_name()
|
child_column = child.get_name()
|
||||||
kwargs = {owner_column: self._owner, child_column: child}
|
kwargs = {owner_column: self._owner.pk, child_column: child.pk}
|
||||||
await queryset.create(**kwargs)
|
if child.pk is None:
|
||||||
|
raise ModelPersistenceError(
|
||||||
|
f"You cannot save {child.get_name()} "
|
||||||
|
f"model without primary key set! \n"
|
||||||
|
f"Save the child model first."
|
||||||
|
)
|
||||||
|
expr = model_cls.Meta.table.insert()
|
||||||
|
expr = expr.values(**kwargs)
|
||||||
|
await model_cls.Meta.database.execute(expr)
|
||||||
|
|
||||||
async def delete_through_instance(self, child: "T") -> None:
|
async def delete_through_instance(self, child: "T") -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -63,7 +63,7 @@ class Relation:
|
|||||||
self._type: RelationType = type_
|
self._type: RelationType = type_
|
||||||
self._to_remove: Set = set()
|
self._to_remove: Set = set()
|
||||||
self.to: Type["T"] = to
|
self.to: Type["T"] = to
|
||||||
self.through: Optional[Type["T"]] = through
|
self._through: Optional[Type["T"]] = through
|
||||||
self.field_name = field_name
|
self.field_name = field_name
|
||||||
self.related_models: Optional[Union[RelationProxy, "T"]] = (
|
self.related_models: Optional[Union[RelationProxy, "T"]] = (
|
||||||
RelationProxy(relation=self, type_=type_, field_name=field_name)
|
RelationProxy(relation=self, type_=type_, field_name=field_name)
|
||||||
@ -71,6 +71,12 @@ class Relation:
|
|||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def through(self) -> Type["T"]:
|
||||||
|
if not self._through: # pragma: no cover
|
||||||
|
raise RelationshipInstanceError("Relation does not have through model!")
|
||||||
|
return self._through
|
||||||
|
|
||||||
def _clean_related(self) -> None:
|
def _clean_related(self) -> None:
|
||||||
"""
|
"""
|
||||||
Removes dead weakrefs from RelationProxy.
|
Removes dead weakrefs from RelationProxy.
|
||||||
|
|||||||
@ -101,13 +101,7 @@ class RelationsManager:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def add(
|
def add(parent: "Model", child: "Model", field: Type["ForeignKeyField"],) -> None:
|
||||||
parent: "Model",
|
|
||||||
child: "Model",
|
|
||||||
child_name: str,
|
|
||||||
virtual: bool,
|
|
||||||
relation_name: str,
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Adds relation on both sides -> meaning on both child and parent models.
|
Adds relation on both sides -> meaning on both child and parent models.
|
||||||
One side of the relation is always weakref proxy to avoid circular refs.
|
One side of the relation is always weakref proxy to avoid circular refs.
|
||||||
@ -120,25 +114,19 @@ class RelationsManager:
|
|||||||
:type parent: Model
|
:type parent: Model
|
||||||
:param child: child model to register
|
:param child: child model to register
|
||||||
:type child: Model
|
:type child: Model
|
||||||
:param child_name: potential child name used if related name is not set
|
:param field: field with relation definition
|
||||||
:type child_name: str
|
:type field: ForeignKeyField
|
||||||
:param virtual:
|
|
||||||
:type virtual: bool
|
|
||||||
:param relation_name: name of the relation
|
|
||||||
:type relation_name: str
|
|
||||||
"""
|
"""
|
||||||
to_field: Type[BaseField] = child.Meta.model_fields[relation_name]
|
|
||||||
# print('comming', child_name, relation_name)
|
|
||||||
(parent, child, child_name, to_name,) = get_relations_sides_and_names(
|
(parent, child, child_name, to_name,) = get_relations_sides_and_names(
|
||||||
to_field, parent, child, child_name, virtual, relation_name
|
field, parent, child
|
||||||
)
|
)
|
||||||
|
|
||||||
# print('adding', parent.get_name(), child.get_name(), child_name)
|
# print('adding parent', parent.get_name(), child.get_name(), child_name)
|
||||||
parent_relation = parent._orm._get(child_name)
|
parent_relation = parent._orm._get(child_name)
|
||||||
if parent_relation:
|
if parent_relation:
|
||||||
parent_relation.add(child) # type: ignore
|
parent_relation.add(child) # type: ignore
|
||||||
|
|
||||||
# print('adding', child.get_name(), parent.get_name(), child_name)
|
# print('adding child', child.get_name(), parent.get_name(), to_name)
|
||||||
child_relation = child._orm._get(to_name)
|
child_relation = child._orm._get(to_name)
|
||||||
if child_relation:
|
if child_relation:
|
||||||
child_relation.add(parent)
|
child_relation.add(parent)
|
||||||
|
|||||||
@ -1,48 +1,33 @@
|
|||||||
from typing import TYPE_CHECKING, Tuple, Type
|
from typing import TYPE_CHECKING, Tuple, Type
|
||||||
from weakref import proxy
|
from weakref import proxy
|
||||||
|
|
||||||
from ormar.fields import BaseField
|
from ormar.fields.foreign_key import ForeignKeyField
|
||||||
from ormar.fields.many_to_many import ManyToManyField
|
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma no cover
|
if TYPE_CHECKING: # pragma no cover
|
||||||
from ormar import Model
|
from ormar import Model
|
||||||
|
|
||||||
|
|
||||||
def get_relations_sides_and_names(
|
def get_relations_sides_and_names(
|
||||||
to_field: Type[BaseField],
|
to_field: Type[ForeignKeyField], parent: "Model", child: "Model",
|
||||||
parent: "Model",
|
|
||||||
child: "Model",
|
|
||||||
child_name: str,
|
|
||||||
virtual: bool,
|
|
||||||
relation_name: str,
|
|
||||||
) -> Tuple["Model", "Model", str, str]:
|
) -> Tuple["Model", "Model", str, str]:
|
||||||
"""
|
"""
|
||||||
Determines the names of child and parent relations names, as well as
|
Determines the names of child and parent relations names, as well as
|
||||||
changes one of the sides of the relation into weakref.proxy to model.
|
changes one of the sides of the relation into weakref.proxy to model.
|
||||||
|
|
||||||
:param to_field: field with relation definition
|
:param to_field: field with relation definition
|
||||||
:type to_field: BaseField
|
:type to_field: ForeignKeyField
|
||||||
:param parent: parent model
|
:param parent: parent model
|
||||||
:type parent: Model
|
:type parent: Model
|
||||||
:param child: child model
|
:param child: child model
|
||||||
:type child: Model
|
:type child: Model
|
||||||
:param child_name: name of the child
|
|
||||||
:type child_name: str
|
|
||||||
:param virtual: flag if relation is virtual
|
|
||||||
:type virtual: bool
|
|
||||||
:param relation_name:
|
|
||||||
:type relation_name:
|
|
||||||
:return: parent, child, child_name, to_name
|
:return: parent, child, child_name, to_name
|
||||||
:rtype: Tuple["Model", "Model", str, str]
|
:rtype: Tuple["Model", "Model", str, str]
|
||||||
"""
|
"""
|
||||||
to_name = to_field.name
|
to_name = to_field.name
|
||||||
if issubclass(to_field, ManyToManyField):
|
child_name = to_field.get_related_name()
|
||||||
child_name = to_field.related_name or child.get_name() + "s"
|
if to_field.virtual:
|
||||||
child = proxy(child)
|
child_name, to_name = to_name, child_name
|
||||||
elif virtual:
|
|
||||||
child_name, to_name = to_name, child_name or child.get_name()
|
|
||||||
child, parent = parent, proxy(child)
|
child, parent = parent, proxy(child)
|
||||||
else:
|
else:
|
||||||
child_name = child_name or child.get_name() + "s"
|
|
||||||
child = proxy(child)
|
child = proxy(child)
|
||||||
return parent, child, child_name, to_name
|
return parent, child, child_name, to_name
|
||||||
|
|||||||
@ -21,7 +21,7 @@ class CringeLevel(ormar.Model):
|
|||||||
name: str = ormar.String(max_length=100)
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
class NickNames(ormar.Model):
|
class NickName(ormar.Model):
|
||||||
class Meta:
|
class Meta:
|
||||||
tablename = "nicks"
|
tablename = "nicks"
|
||||||
metadata = metadata
|
metadata = metadata
|
||||||
@ -48,7 +48,7 @@ class HQ(ormar.Model):
|
|||||||
|
|
||||||
id: int = ormar.Integer(primary_key=True)
|
id: int = ormar.Integer(primary_key=True)
|
||||||
name: str = ormar.String(max_length=100, nullable=False, name="hq_name")
|
name: str = ormar.String(max_length=100, nullable=False, name="hq_name")
|
||||||
nicks: List[NickNames] = ormar.ManyToMany(NickNames, through=NicksHq)
|
nicks: List[NickName] = ormar.ManyToMany(NickName, through=NicksHq)
|
||||||
|
|
||||||
|
|
||||||
class Company(ormar.Model):
|
class Company(ormar.Model):
|
||||||
@ -96,8 +96,8 @@ async def test_saving_related_fk_rel():
|
|||||||
async def test_saving_many_to_many():
|
async def test_saving_many_to_many():
|
||||||
async with database:
|
async with database:
|
||||||
async with database.transaction(force_rollback=True):
|
async with database.transaction(force_rollback=True):
|
||||||
nick1 = await NickNames.objects.create(name="BazingaO", is_lame=False)
|
nick1 = await NickName.objects.create(name="BazingaO", is_lame=False)
|
||||||
nick2 = await NickNames.objects.create(name="Bazinga20", is_lame=True)
|
nick2 = await NickName.objects.create(name="Bazinga20", is_lame=True)
|
||||||
|
|
||||||
hq = await HQ.objects.create(name="Main")
|
hq = await HQ.objects.create(name="Main")
|
||||||
assert hq.saved
|
assert hq.saved
|
||||||
@ -168,10 +168,10 @@ async def test_saving_nested():
|
|||||||
async with database.transaction(force_rollback=True):
|
async with database.transaction(force_rollback=True):
|
||||||
level = await CringeLevel.objects.create(name="High")
|
level = await CringeLevel.objects.create(name="High")
|
||||||
level2 = await CringeLevel.objects.create(name="Low")
|
level2 = await CringeLevel.objects.create(name="Low")
|
||||||
nick1 = await NickNames.objects.create(
|
nick1 = await NickName.objects.create(
|
||||||
name="BazingaO", is_lame=False, level=level
|
name="BazingaO", is_lame=False, level=level
|
||||||
)
|
)
|
||||||
nick2 = await NickNames.objects.create(
|
nick2 = await NickName.objects.create(
|
||||||
name="Bazinga20", is_lame=True, level=level2
|
name="Bazinga20", is_lame=True, level=level2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user