from typing import Optional

import ormar
import pydantic
import pytest
import pytest_asyncio
from ormar import (
    post_bulk_update,
    post_delete,
    post_save,
    post_update,
    pre_delete,
    pre_save,
    pre_update,
)
from ormar.exceptions import SignalDefinitionError
from ormar.signals import SignalEmitter

from tests.lifespan import init_tests
from tests.settings import create_config

base_ormar_config = create_config()


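# Test models: AuditLog records emitted signal events; Album and Cover act as senders.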
class AuditLog(ormar.Model):
    ormar_config = base_ormar_config.copy(tablename="audits")

    id: int = ormar.Integer(primary_key=True)
    event_type: str = ormar.String(max_length=100)
    event_log: pydantic.Json = ormar.JSON()


class Cover(ormar.Model):
    ormar_config = base_ormar_config.copy(tablename="covers")

    id: int = ormar.Integer(primary_key=True)
    title: str = ormar.String(max_length=100)


class Album(ormar.Model):
    ormar_config = base_ormar_config.copy(tablename="albums")

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    is_best_seller: bool = ormar.Boolean(default=False)
    play_count: int = ormar.Integer(default=0)
    cover: Optional[Cover] = ormar.ForeignKey(Cover)


create_test_database = init_tests(base_ormar_config)


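# Clean up audit entries created by a test so later tests start with an empty log.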
@pytest_asyncio.fixture(scope="function")
async def cleanup():
    yield
    async with base_ormar_config.database:
        await AuditLog.objects.delete(each=True)


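# Receivers must be callables that accept **kwargs; invalid receivers or
# signal values raise SignalDefinitionError.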
def test_passing_not_callable():
    with pytest.raises(SignalDefinitionError):
        pre_save(Album)("wrong")


def test_passing_callable_without_kwargs():
    with pytest.raises(SignalDefinitionError):

        @pre_save(Album)
        def trigger(sender, instance):  # pragma: no cover
            pass


def test_invalid_signal():
    emitter = SignalEmitter()
    with pytest.raises(SignalDefinitionError):
        emitter.save = 1


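# Connect receivers to every built-in signal and verify that each operation
# writes the expected AuditLog rows.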
@pytest.mark.asyncio
async def test_signal_functions(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            @pre_save(Album)
            async def before_save(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_SAVE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @post_save(Album)
            async def after_save(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"POST_SAVE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @pre_update(Album)
            async def before_update(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_UPDATE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @post_update(Album)
            async def after_update(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"POST_UPDATE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @pre_delete(Album)
            async def before_delete(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_DELETE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @post_delete(Album)
            async def after_delete(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"POST_DELETE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @post_bulk_update(Album)
            async def after_bulk_update(sender, instances, **kwargs):
                for it in instances:
                    await AuditLog(
                        event_type=f"BULK_POST_UPDATE_{sender.get_name()}",
                        event_log=it.model_dump_json(),
                    ).save()

            album = await Album.objects.create(name="Venice")

            audits = await AuditLog.objects.all()
            assert len(audits) == 2
            assert audits[0].event_type == "PRE_SAVE_album"
            assert audits[0].event_log.get("name") == album.name
            assert audits[1].event_type == "POST_SAVE_album"
            assert audits[1].event_log.get("id") == album.pk

            album = await Album(name="Rome").save()
            audits = await AuditLog.objects.all()
            assert len(audits) == 4
            assert audits[2].event_type == "PRE_SAVE_album"
            assert audits[2].event_log.get("name") == album.name
            assert audits[3].event_type == "POST_SAVE_album"
            assert audits[3].event_log.get("id") == album.pk

            album.is_best_seller = True
            await album.update()

            audits = await AuditLog.objects.filter(event_type__contains="UPDATE").all()
            assert len(audits) == 2
            assert audits[0].event_type == "PRE_UPDATE_album"
            assert audits[0].event_log.get("name") == album.name
            assert audits[1].event_type == "POST_UPDATE_album"
            assert audits[1].event_log.get("is_best_seller") == album.is_best_seller

            album.signals.pre_update.disconnect(before_update)
            album.signals.post_update.disconnect(after_update)

            album.is_best_seller = False
            await album.update()

            audits = await AuditLog.objects.filter(event_type__contains="UPDATE").all()
            assert len(audits) == 2

            await album.delete()
            audits = await AuditLog.objects.filter(event_type__contains="DELETE").all()
            assert len(audits) == 2
            assert audits[0].event_type == "PRE_DELETE_album"
            assert (
                audits[0].event_log.get("id")
                == audits[1].event_log.get("id")
                == album.id
            )
            assert audits[1].event_type == "POST_DELETE_album"

            album.signals.pre_delete.disconnect(before_delete)
            album.signals.post_delete.disconnect(after_delete)
            album.signals.pre_save.disconnect(before_save)
            album.signals.post_save.disconnect(after_save)

            albums = await Album.objects.all()
            assert len(albums)

            for album in albums:
                album.play_count = 1

            await Album.objects.bulk_update(albums)

            cnt = await AuditLog.objects.filter(
                event_type__contains="BULK_POST"
            ).count()
            assert cnt == len(albums)

            album.signals.bulk_post_update.disconnect(after_bulk_update)


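# Several receivers can be connected to the same signal; each one fires on save.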
@pytest.mark.asyncio
async def test_multiple_signals(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            @pre_save(Album)
            async def before_save(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_SAVE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            @pre_save(Album)
            async def before_save2(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_SAVE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            album = await Album.objects.create(name="Miami")
            audits = await AuditLog.objects.all()
            assert len(audits) == 2
            assert audits[0].event_type == "PRE_SAVE_album"
            assert audits[0].event_log.get("name") == album.name
            assert audits[1].event_type == "PRE_SAVE_album"
            assert audits[1].event_log.get("name") == album.name

            album.signals.pre_save.disconnect(before_save)
            album.signals.pre_save.disconnect(before_save2)


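# Static methods also work as receivers when the signal decorator is stacked
# under @staticmethod.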
@pytest.mark.asyncio
async def test_static_methods_as_signals(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            class AlbumAuditor:
                event_type = "ALBUM_INSTANCE"

                @staticmethod
                @pre_save(Album)
                async def before_save(sender, instance, **kwargs):
                    await AuditLog(
                        event_type=f"{AlbumAuditor.event_type}_SAVE",
                        event_log=instance.model_dump_json(),
                    ).save()

            album = await Album.objects.create(name="Colorado")
            audits = await AuditLog.objects.all()
            assert len(audits) == 1
            assert audits[0].event_type == "ALBUM_INSTANCE_SAVE"
            assert audits[0].event_log.get("name") == album.name

            album.signals.pre_save.disconnect(AlbumAuditor.before_save)


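# Bound methods can be registered as receivers by calling the decorator directly.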
@pytest.mark.asyncio
async def test_methods_as_signals(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            class AlbumAuditor:
                def __init__(self):
                    self.event_type = "ALBUM_INSTANCE"

                async def before_save(self, sender, instance, **kwargs):
                    await AuditLog(
                        event_type=f"{self.event_type}_SAVE",
                        event_log=instance.model_dump_json(),
                    ).save()

            auditor = AlbumAuditor()
            pre_save(Album)(auditor.before_save)

            album = await Album.objects.create(name="San Francisco")
            audits = await AuditLog.objects.all()
            assert len(audits) == 1
            assert audits[0].event_type == "ALBUM_INSTANCE_SAVE"
            assert audits[0].event_log.get("name") == album.name

            album.signals.pre_save.disconnect(auditor.before_save)


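# Passing a list of models registers one receiver for multiple senders.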
@pytest.mark.asyncio
async def test_multiple_senders_signal(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            @pre_save([Album, Cover])
            async def before_save(sender, instance, **kwargs):
                await AuditLog(
                    event_type=f"PRE_SAVE_{sender.get_name()}",
                    event_log=instance.model_dump_json(),
                ).save()

            cover = await Cover(title="Blue").save()
            album = await Album.objects.create(name="San Francisco", cover=cover)

            audits = await AuditLog.objects.all()
            assert len(audits) == 2
            assert audits[0].event_type == "PRE_SAVE_cover"
            assert audits[0].event_log.get("title") == cover.title
            assert audits[1].event_type == "PRE_SAVE_album"
            assert audits[1].event_log.get("cover") == album.cover.model_dump(
                exclude={"albums"}
            )

            album.signals.pre_save.disconnect(before_save)
            cover.signals.pre_save.disconnect(before_save)


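# A pre_update receiver can modify the instance before it is written to the database.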
@pytest.mark.asyncio
async def test_modifying_the_instance(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            @pre_update(Album)
            async def before_update(sender, instance, **kwargs):
                if instance.play_count > 50 and not instance.is_best_seller:
                    instance.is_best_seller = True

            # here album.play_count and is_best_seller get default values
            album = await Album.objects.create(name="Venice")
            assert not album.is_best_seller
            assert album.play_count == 0

            album.play_count = 30
            # the trigger is called here, but play_count is still too low
            await album.update()
            assert not album.is_best_seller

            album.play_count = 60
            await album.update()
            assert album.is_best_seller
            album.signals.pre_update.disconnect(before_update)


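# Custom signals are not emitted by ormar itself; the test sends them
# explicitly with send().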
@pytest.mark.asyncio
async def test_custom_signal(cleanup):
    async with base_ormar_config.database:
        async with base_ormar_config.database.transaction(force_rollback=True):

            async def after_update(sender, instance, **kwargs):
                if instance.play_count > 50 and not instance.is_best_seller:
                    instance.is_best_seller = True
                elif instance.play_count < 50 and instance.is_best_seller:
                    instance.is_best_seller = False
                await instance.update()

            Album.ormar_config.signals.custom.connect(after_update)

            # here album.play_count and is_best_seller get default values
            album = await Album.objects.create(name="Venice")
            assert not album.is_best_seller
            assert album.play_count == 0

            album.play_count = 30
            # update() alone does not send the custom signal, so nothing changes
            await album.update()
            assert not album.is_best_seller

            album.play_count = 60
            await album.update()
            assert not album.is_best_seller
            await Album.ormar_config.signals.custom.send(sender=Album, instance=album)
            assert album.is_best_seller

            album.play_count = 30
            await album.update()
            assert album.is_best_seller
            await Album.ormar_config.signals.custom.send(sender=Album, instance=album)
            assert not album.is_best_seller

            Album.ormar_config.signals.custom.disconnect(after_update)