350 lines
13 KiB
Python
350 lines
13 KiB
Python
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, TypeVar, Union
|
|
|
|
import ormar.queryset # noqa I100
|
|
from ormar.exceptions import ModelPersistenceError, NoMatch
|
|
from ormar.models import NewBaseModel # noqa I100
|
|
from ormar.models.model_row import ModelRow
|
|
from ormar.queryset.utils import subtract_dict, translate_list_to_dict
|
|
|
|
T = TypeVar("T", bound="Model")
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from ormar import ForeignKeyField
|
|
from ormar.models.ormar_config import OrmarConfig
|
|
|
|
|
|
class Model(ModelRow):
|
|
__abstract__ = False
|
|
if TYPE_CHECKING: # pragma nocover
|
|
ormar_config: OrmarConfig
|
|
|
|
def __repr__(self) -> str: # pragma nocover
|
|
_repr = {
|
|
k: getattr(self, k)
|
|
for k, v in self.ormar_config.model_fields.items()
|
|
if not v.skip_field
|
|
}
|
|
return f"{self.__class__.__name__}({str(_repr)})"
|
|
|
|
async def upsert(self: T, **kwargs: Any) -> T:
|
|
"""
|
|
Performs either a save or an update depending on the presence of the pk.
|
|
If the pk field is filled it's an update, otherwise the save is performed.
|
|
For save kwargs are ignored, used only in update if provided.
|
|
|
|
:param kwargs: list of fields to update
|
|
:type kwargs: Any
|
|
:return: saved Model
|
|
:rtype: Model
|
|
"""
|
|
|
|
force_save = kwargs.pop("__force_save__", False)
|
|
if force_save:
|
|
expr = self.ormar_config.table.select().where(self.pk_column == self.pk)
|
|
row = await self.ormar_config.database.fetch_one(expr)
|
|
if not row:
|
|
return await self.save()
|
|
return await self.update(**kwargs)
|
|
|
|
if not self.pk:
|
|
return await self.save()
|
|
return await self.update(**kwargs)
|
|
|
|
async def save(self: T) -> T:
|
|
"""
|
|
Performs a save of given Model instance.
|
|
If primary key is already saved, db backend will throw integrity error.
|
|
|
|
Related models are saved by pk number, reverse relation and many to many fields
|
|
are not saved - use corresponding relations methods.
|
|
|
|
If there are fields with server_default set and those fields
|
|
are not already filled save will trigger also a second query
|
|
to refreshed the fields populated server side.
|
|
|
|
Does not recognize if model was previously saved.
|
|
If you want to perform update or insert depending on the pk
|
|
fields presence use upsert.
|
|
|
|
Sends pre_save and post_save signals.
|
|
|
|
Sets model save status to True.
|
|
|
|
:return: saved Model
|
|
:rtype: Model
|
|
"""
|
|
await self.signals.pre_save.send(sender=self.__class__, instance=self)
|
|
self_fields = self._extract_model_db_fields()
|
|
|
|
if (
|
|
not self.pk
|
|
and self.ormar_config.model_fields[self.ormar_config.pkname].autoincrement
|
|
):
|
|
self_fields.pop(self.ormar_config.pkname, None)
|
|
self_fields = self.populate_default_values(self_fields)
|
|
self.update_from_dict(
|
|
{
|
|
k: v
|
|
for k, v in self_fields.items()
|
|
if k not in self.extract_related_names()
|
|
}
|
|
)
|
|
|
|
self_fields = self.translate_columns_to_aliases(self_fields)
|
|
expr = self.ormar_config.table.insert()
|
|
expr = expr.values(**self_fields)
|
|
|
|
pk = await self.ormar_config.database.execute(expr)
|
|
if pk and isinstance(pk, self.pk_type()):
|
|
setattr(self, self.ormar_config.pkname, pk)
|
|
|
|
self.set_save_status(True)
|
|
# refresh server side defaults
|
|
if any(
|
|
field.server_default is not None
|
|
for name, field in self.ormar_config.model_fields.items()
|
|
if name not in self_fields
|
|
):
|
|
await self.load()
|
|
|
|
await self.signals.post_save.send(sender=self.__class__, instance=self)
|
|
return self
|
|
|
|
async def save_related( # noqa: CCR001, CFQ002
|
|
self,
|
|
follow: bool = False,
|
|
save_all: bool = False,
|
|
relation_map: Optional[Dict] = None,
|
|
exclude: Union[Set, Dict, None] = None,
|
|
update_count: int = 0,
|
|
previous_model: Optional["Model"] = None,
|
|
relation_field: Optional["ForeignKeyField"] = None,
|
|
) -> int:
|
|
"""
|
|
Triggers a upsert method on all related models
|
|
if the instances are not already saved.
|
|
By default saves only the directly related ones.
|
|
|
|
If follow=True is set it saves also related models of related models.
|
|
|
|
To not get stuck in an infinite loop as related models also keep a relation
|
|
to parent model visited models set is kept.
|
|
|
|
That way already visited models that are nested are saved, but the save do not
|
|
follow them inside. So Model A -> Model B -> Model A -> Model C will save second
|
|
Model A but will never follow into Model C.
|
|
Nested relations of those kind need to be persisted manually.
|
|
|
|
:param relation_field: field with relation leading to this model
|
|
:type relation_field: Optional[ForeignKeyField]
|
|
:param previous_model: previous model from which method came
|
|
:type previous_model: Model
|
|
:param exclude: items to exclude during saving of relations
|
|
:type exclude: Union[Set, Dict]
|
|
:param relation_map: map of relations to follow
|
|
:type relation_map: Dict
|
|
:param save_all: flag if all models should be saved or only not saved ones
|
|
:type save_all: bool
|
|
:param follow: flag to trigger deep save -
|
|
by default only directly related models are saved
|
|
with follow=True also related models of related models are saved
|
|
:type follow: bool
|
|
:param update_count: internal parameter for recursive calls -
|
|
number of updated instances
|
|
:type update_count: int
|
|
:return: number of updated/saved models
|
|
:rtype: int
|
|
"""
|
|
relation_map = (
|
|
relation_map
|
|
if relation_map is not None
|
|
else translate_list_to_dict(self._iterate_related_models())
|
|
)
|
|
if exclude and isinstance(exclude, Set):
|
|
exclude = translate_list_to_dict(exclude)
|
|
relation_map = subtract_dict(relation_map, exclude or {})
|
|
|
|
if relation_map:
|
|
fields_to_visit = {
|
|
field
|
|
for field in self.extract_related_fields()
|
|
if field.name in relation_map
|
|
}
|
|
pre_save = {
|
|
field
|
|
for field in fields_to_visit
|
|
if not field.virtual and not field.is_multi
|
|
}
|
|
|
|
update_count = await self._update_relation_list(
|
|
fields_list=pre_save,
|
|
follow=follow,
|
|
save_all=save_all,
|
|
relation_map=relation_map,
|
|
update_count=update_count,
|
|
)
|
|
|
|
update_count = await self._upsert_model(
|
|
instance=self,
|
|
save_all=save_all,
|
|
previous_model=previous_model,
|
|
relation_field=relation_field,
|
|
update_count=update_count,
|
|
)
|
|
|
|
post_save = fields_to_visit - pre_save
|
|
|
|
update_count = await self._update_relation_list(
|
|
fields_list=post_save,
|
|
follow=follow,
|
|
save_all=save_all,
|
|
relation_map=relation_map,
|
|
update_count=update_count,
|
|
)
|
|
|
|
else:
|
|
update_count = await self._upsert_model(
|
|
instance=self,
|
|
save_all=save_all,
|
|
previous_model=previous_model,
|
|
relation_field=relation_field,
|
|
update_count=update_count,
|
|
)
|
|
|
|
return update_count
|
|
|
|
async def update(self: T, _columns: Optional[List[str]] = None, **kwargs: Any) -> T:
|
|
"""
|
|
Performs update of Model instance in the database.
|
|
Fields can be updated before or you can pass them as kwargs.
|
|
|
|
Sends pre_update and post_update signals.
|
|
|
|
Sets model save status to True.
|
|
|
|
:param _columns: list of columns to update, if None all are updated
|
|
:type _columns: List
|
|
:raises ModelPersistenceError: If the pk column is not set
|
|
|
|
:param kwargs: list of fields to update as field=value pairs
|
|
:type kwargs: Any
|
|
:return: updated Model
|
|
:rtype: Model
|
|
"""
|
|
if kwargs:
|
|
self.update_from_dict(kwargs)
|
|
|
|
if not self.pk:
|
|
raise ModelPersistenceError(
|
|
"You cannot update not saved model! Use save or upsert method."
|
|
)
|
|
|
|
await self.signals.pre_update.send(
|
|
sender=self.__class__, instance=self, passed_args=kwargs
|
|
)
|
|
self_fields = self._extract_model_db_fields()
|
|
self_fields.pop(self.get_column_name_from_alias(self.ormar_config.pkname))
|
|
if _columns:
|
|
self_fields = {k: v for k, v in self_fields.items() if k in _columns}
|
|
if self_fields:
|
|
self_fields = self.translate_columns_to_aliases(self_fields)
|
|
expr = self.ormar_config.table.update().values(**self_fields)
|
|
expr = expr.where(self.pk_column == getattr(self, self.ormar_config.pkname))
|
|
|
|
await self.ormar_config.database.execute(expr)
|
|
self.set_save_status(True)
|
|
await self.signals.post_update.send(sender=self.__class__, instance=self)
|
|
return self
|
|
|
|
async def delete(self) -> int:
|
|
"""
|
|
Removes the Model instance from the database.
|
|
|
|
Sends pre_delete and post_delete signals.
|
|
|
|
Sets model save status to False.
|
|
|
|
Note it does not delete the Model itself (python object).
|
|
So you can delete and later save (since pk is deleted no conflict will arise)
|
|
or update and the Model will be saved in database again.
|
|
|
|
:return: number of deleted rows (for some backends)
|
|
:rtype: int
|
|
"""
|
|
await self.signals.pre_delete.send(sender=self.__class__, instance=self)
|
|
expr = self.ormar_config.table.delete()
|
|
expr = expr.where(self.pk_column == (getattr(self, self.ormar_config.pkname)))
|
|
result = await self.ormar_config.database.execute(expr)
|
|
self.set_save_status(False)
|
|
await self.signals.post_delete.send(sender=self.__class__, instance=self)
|
|
return result
|
|
|
|
async def load(self: T) -> T:
|
|
"""
|
|
Allow to refresh existing Models fields from database.
|
|
Be careful as the related models can be overwritten by pk_only models in load.
|
|
Does NOT refresh the related models fields if they were loaded before.
|
|
|
|
:raises NoMatch: If given pk is not found in database.
|
|
|
|
:return: reloaded Model
|
|
:rtype: Model
|
|
"""
|
|
expr = self.ormar_config.table.select().where(self.pk_column == self.pk)
|
|
row = await self.ormar_config.database.fetch_one(expr)
|
|
if not row: # pragma nocover
|
|
raise NoMatch("Instance was deleted from database and cannot be refreshed")
|
|
kwargs = dict(row)
|
|
kwargs = self.translate_aliases_to_columns(kwargs)
|
|
self.update_from_dict(kwargs)
|
|
self.set_save_status(True)
|
|
return self
|
|
|
|
async def load_all(
|
|
self: T,
|
|
follow: bool = False,
|
|
exclude: Union[List, str, Set, Dict, None] = None,
|
|
order_by: Union[List, str, None] = None,
|
|
) -> T:
|
|
"""
|
|
Allow to refresh existing Models fields from database.
|
|
Performs refresh of the related models fields.
|
|
|
|
By default, loads only self and the directly related ones.
|
|
|
|
If follow=True is set it loads also related models of related models.
|
|
|
|
To not get stuck in an infinite loop as related models also keep a relation
|
|
to parent model visited models set is kept.
|
|
|
|
That way already visited models that are nested are loaded, but the load do not
|
|
follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X
|
|
will load second Model A but will never follow into Model X.
|
|
Nested relations of those kind need to be loaded manually.
|
|
|
|
:param order_by: columns by which models should be sorted
|
|
:type order_by: Union[List, str]
|
|
:raises NoMatch: If given pk is not found in database.
|
|
|
|
:param exclude: related models to exclude
|
|
:type exclude: Union[List, str, Set, Dict]
|
|
:param follow: flag to trigger deep save -
|
|
by default only directly related models are saved
|
|
with follow=True also related models of related models are saved
|
|
:type follow: bool
|
|
:return: reloaded Model
|
|
:rtype: Model
|
|
"""
|
|
relations = list(self.extract_related_names())
|
|
if follow:
|
|
relations = self._iterate_related_models()
|
|
queryset = self.__class__.objects
|
|
if exclude:
|
|
queryset = queryset.exclude_fields(exclude)
|
|
if order_by:
|
|
queryset = queryset.order_by(order_by)
|
|
instance = await queryset.select_related(relations).get(pk=self.pk)
|
|
self._orm.clear()
|
|
self.update_from_dict(instance.model_dump())
|
|
return self
|