add signals, register six signals on each models (pre/post + save/update/delete)
This commit is contained in:
@ -18,6 +18,7 @@ from ormar.fields.many_to_many import ManyToMany, ManyToManyField
|
||||
from ormar.models.quick_access_views import quick_access_set
|
||||
from ormar.queryset import QuerySet
|
||||
from ormar.relations.alias_manager import AliasManager
|
||||
from ormar.signals import Signal, SignalEmitter
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
@ -38,6 +39,7 @@ class ModelMeta:
|
||||
]
|
||||
alias_manager: AliasManager
|
||||
property_fields: Set
|
||||
signals: SignalEmitter
|
||||
|
||||
|
||||
def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None:
|
||||
@ -332,15 +334,12 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
|
||||
new_model._pydantic_fields = {name for name in new_model.__fields__}
|
||||
|
||||
|
||||
def property_fields_not_set(new_model: Type["Model"]) -> bool:
|
||||
return (
|
||||
not hasattr(new_model.Meta, "property_fields")
|
||||
or not new_model.Meta.property_fields
|
||||
)
|
||||
def meta_field_not_set(model: Type["Model"], field_name: str) -> bool:
|
||||
return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name)
|
||||
|
||||
|
||||
def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001
|
||||
if property_fields_not_set(new_model):
|
||||
if meta_field_not_set(model=new_model, field_name="property_fields"):
|
||||
props = set()
|
||||
for var_name, value in attrs.items():
|
||||
if isinstance(value, property):
|
||||
@ -351,6 +350,18 @@ def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa:
|
||||
new_model.Meta.property_fields = props
|
||||
|
||||
|
||||
def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001
|
||||
if meta_field_not_set(model=new_model, field_name="signals"):
|
||||
signals = SignalEmitter()
|
||||
signals.pre_save = Signal()
|
||||
signals.pre_update = Signal()
|
||||
signals.pre_delete = Signal()
|
||||
signals.post_save = Signal()
|
||||
signals.post_update = Signal()
|
||||
signals.post_delete = Signal()
|
||||
new_model.Meta.signals = signals
|
||||
|
||||
|
||||
class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
def __new__( # type: ignore
|
||||
mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict
|
||||
@ -379,5 +390,6 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
new_model.Meta.alias_manager = alias_manager
|
||||
new_model.objects = QuerySet(new_model)
|
||||
add_property_fields(new_model, attrs)
|
||||
register_signals(new_model=new_model)
|
||||
|
||||
return new_model
|
||||
|
||||
@ -195,6 +195,9 @@ class Model(NewBaseModel):
|
||||
if not self.pk and self.Meta.model_fields[self.Meta.pkname].autoincrement:
|
||||
self_fields.pop(self.Meta.pkname, None)
|
||||
self_fields = self.populate_default_values(self_fields)
|
||||
self.from_dict(self_fields)
|
||||
|
||||
await self.signals.pre_save.send(sender=self.__class__, instance=self)
|
||||
|
||||
self_fields = self.translate_columns_to_aliases(self_fields)
|
||||
expr = self.Meta.table.insert()
|
||||
@ -204,6 +207,7 @@ class Model(NewBaseModel):
|
||||
if pk and isinstance(pk, self.pk_type()):
|
||||
setattr(self, self.Meta.pkname, pk)
|
||||
|
||||
self.set_save_status(True)
|
||||
# refresh server side defaults
|
||||
if any(
|
||||
field.server_default is not None
|
||||
@ -211,9 +215,8 @@ class Model(NewBaseModel):
|
||||
if name not in self_fields
|
||||
):
|
||||
await self.load()
|
||||
return self
|
||||
|
||||
self.set_save_status(True)
|
||||
await self.signals.post_save.send(sender=self.__class__, instance=self)
|
||||
return self
|
||||
|
||||
async def save_related( # noqa: CCR001
|
||||
@ -268,6 +271,7 @@ class Model(NewBaseModel):
|
||||
"You cannot update not saved model! Use save or upsert method."
|
||||
)
|
||||
|
||||
await self.signals.pre_update.send(sender=self.__class__, instance=self)
|
||||
self_fields = self._extract_model_db_fields()
|
||||
self_fields.pop(self.get_column_name_from_alias(self.Meta.pkname))
|
||||
self_fields = self.translate_columns_to_aliases(self_fields)
|
||||
@ -276,13 +280,16 @@ class Model(NewBaseModel):
|
||||
|
||||
await self.Meta.database.execute(expr)
|
||||
self.set_save_status(True)
|
||||
await self.signals.post_update.send(sender=self.__class__, instance=self)
|
||||
return self
|
||||
|
||||
async def delete(self: T) -> int:
|
||||
await self.signals.pre_delete.send(sender=self.__class__, instance=self)
|
||||
expr = self.Meta.table.delete()
|
||||
expr = expr.where(self.pk_column == (getattr(self, self.Meta.pkname)))
|
||||
result = await self.Meta.database.execute(expr)
|
||||
self.set_save_status(False)
|
||||
await self.signals.post_delete.send(sender=self.__class__, instance=self)
|
||||
return result
|
||||
|
||||
async def load(self: T) -> T:
|
||||
|
||||
@ -35,6 +35,7 @@ from ormar.relations.relation_manager import RelationsManager
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
from ormar.signals import SignalEmitter
|
||||
|
||||
T = TypeVar("T", bound=Model)
|
||||
|
||||
@ -212,6 +213,10 @@ class NewBaseModel(
|
||||
def saved(self) -> bool:
|
||||
return self._orm_saved
|
||||
|
||||
@property
|
||||
def signals(self) -> "SignalEmitter":
|
||||
return self.Meta.signals
|
||||
|
||||
@classmethod
|
||||
def pk_type(cls) -> Any:
|
||||
return cls.Meta.model_fields[cls.Meta.pkname].__type__
|
||||
|
||||
Reference in New Issue
Block a user