378 lines
12 KiB
Python
378 lines
12 KiB
Python
from typing import Any, Sequence, cast
|
|
|
|
import databases
|
|
import pytest
|
|
import sqlalchemy
|
|
from pydantic.typing import ForwardRef
|
|
|
|
import ormar
|
|
from tests.settings import DATABASE_URL
|
|
|
|
database = databases.Database(DATABASE_URL, force_rollback=True)
|
|
metadata = sqlalchemy.MetaData()
|
|
|
|
|
|
class BaseMeta(ormar.ModelMeta):
|
|
database = database
|
|
metadata = metadata
|
|
|
|
|
|
class Category(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "categories"
|
|
|
|
id = ormar.Integer(primary_key=True)
|
|
name = ormar.String(max_length=40)
|
|
|
|
|
|
class PostCategory(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "posts_x_categories"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
sort_order: int = ormar.Integer(nullable=True)
|
|
param_name: str = ormar.String(default="Name", max_length=200)
|
|
|
|
|
|
class Blog(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
pass
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
title: str = ormar.String(max_length=200)
|
|
|
|
|
|
class Post(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
pass
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
title: str = ormar.String(max_length=200)
|
|
categories = ormar.ManyToMany(Category, through=PostCategory)
|
|
blog = ormar.ForeignKey(Blog)
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def create_test_database():
|
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
|
metadata.drop_all(engine)
|
|
metadata.create_all(engine)
|
|
yield
|
|
metadata.drop_all(engine)
|
|
|
|
|
|
class PostCategory2(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "posts_x_categories2"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
sort_order: int = ormar.Integer(nullable=True)
|
|
|
|
|
|
class Post2(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
pass
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
title: str = ormar.String(max_length=200)
|
|
categories = ormar.ManyToMany(Category, through=ForwardRef("PostCategory2"))
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_forward_ref_is_updated():
|
|
async with database:
|
|
assert Post2.Meta.requires_ref_update
|
|
Post2.update_forward_refs()
|
|
|
|
assert Post2.Meta.model_fields["postcategory2"].to == PostCategory2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_setting_fields_on_through_model():
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
category = await Category(name="Test category").save()
|
|
await post.categories.add(category)
|
|
|
|
assert hasattr(post.categories[0], "postcategory")
|
|
assert post.categories[0].postcategory is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_setting_additional_fields_on_through_model_in_add():
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
category = await Category(name="Test category").save()
|
|
await post.categories.add(category, sort_order=1)
|
|
postcat = await PostCategory.objects.get()
|
|
assert postcat.sort_order == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_setting_additional_fields_on_through_model_in_create():
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 2}
|
|
)
|
|
postcat = await PostCategory.objects.get()
|
|
assert postcat.sort_order == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_getting_additional_fields_from_queryset() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1", postcategory={"sort_order": 1}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 2}
|
|
)
|
|
|
|
await post.categories.all()
|
|
assert post.postcategory is None
|
|
assert post.categories[0].postcategory.sort_order == 1
|
|
assert post.categories[1].postcategory.sort_order == 2
|
|
|
|
post2 = await Post.objects.select_related("categories").get(
|
|
categories__name="Test category2"
|
|
)
|
|
assert post2.categories[0].postcategory.sort_order == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_only_one_side_has_through() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1", postcategory={"sort_order": 1}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 2}
|
|
)
|
|
|
|
post2 = await Post.objects.select_related("categories").get()
|
|
assert post2.postcategory is None
|
|
assert post2.categories[0].postcategory is not None
|
|
|
|
await post2.categories.all()
|
|
assert post2.postcategory is None
|
|
assert post2.categories[0].postcategory is not None
|
|
|
|
categories = await Category.objects.select_related("posts").all()
|
|
categories = cast(Sequence[Category], categories)
|
|
assert categories[0].postcategory is None
|
|
assert categories[0].posts[0].postcategory is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_filtering_by_through_model() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 1, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 2, "param_name": "area"}
|
|
)
|
|
|
|
post2 = (
|
|
await Post.objects.select_related("categories")
|
|
.filter(postcategory__sort_order__gt=1)
|
|
.get()
|
|
)
|
|
assert len(post2.categories) == 1
|
|
assert post2.categories[0].postcategory.sort_order == 2
|
|
|
|
post3 = await Post.objects.filter(
|
|
categories__postcategory__param_name="volume"
|
|
).get()
|
|
assert len(post3.categories) == 1
|
|
assert post3.categories[0].postcategory.param_name == "volume"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deep_filtering_by_through_model() -> Any:
|
|
async with database:
|
|
blog = await Blog(title="My Blog").save()
|
|
post = await Post(title="Test post", blog=blog).save()
|
|
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 1, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 2, "param_name": "area"}
|
|
)
|
|
|
|
blog2 = (
|
|
await Blog.objects.select_related("posts__categories")
|
|
.filter(posts__postcategory__sort_order__gt=1)
|
|
.get()
|
|
)
|
|
assert len(blog2.posts) == 1
|
|
assert len(blog2.posts[0].categories) == 1
|
|
assert blog2.posts[0].categories[0].postcategory.sort_order == 2
|
|
|
|
blog3 = await Blog.objects.filter(
|
|
posts__categories__postcategory__param_name="volume"
|
|
).get()
|
|
assert len(blog3.posts) == 1
|
|
assert len(blog3.posts[0].categories) == 1
|
|
assert blog3.posts[0].categories[0].postcategory.param_name == "volume"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ordering_by_through_model() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 2, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category3",
|
|
postcategory={"sort_order": 3, "param_name": "velocity"},
|
|
)
|
|
|
|
post2 = (
|
|
await Post.objects.select_related("categories")
|
|
.order_by("-postcategory__sort_order")
|
|
.get()
|
|
)
|
|
assert len(post2.categories) == 3
|
|
assert post2.categories[0].name == "Test category3"
|
|
assert post2.categories[2].name == "Test category2"
|
|
|
|
post3 = (
|
|
await Post.objects.select_related("categories")
|
|
.order_by("categories__postcategory__param_name")
|
|
.get()
|
|
)
|
|
assert len(post3.categories) == 3
|
|
assert post3.categories[0].postcategory.param_name == "area"
|
|
assert post3.categories[2].postcategory.param_name == "volume"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_through_models_from_queryset_on_through() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 2, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category3",
|
|
postcategory={"sort_order": 3, "param_name": "velocity"},
|
|
)
|
|
|
|
await PostCategory.objects.filter(param_name="volume", post=post.id).update(
|
|
sort_order=4
|
|
)
|
|
post2 = (
|
|
await Post.objects.select_related("categories")
|
|
.order_by("-postcategory__sort_order")
|
|
.get()
|
|
)
|
|
assert len(post2.categories) == 3
|
|
assert post2.categories[0].postcategory.param_name == "volume"
|
|
assert post2.categories[2].postcategory.param_name == "area"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_through_model_after_load() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 2, "param_name": "volume"},
|
|
)
|
|
post2 = await Post.objects.select_related("categories").get()
|
|
assert len(post2.categories) == 1
|
|
|
|
await post2.categories[0].postcategory.load()
|
|
await post2.categories[0].postcategory.update(sort_order=3)
|
|
|
|
post3 = await Post.objects.select_related("categories").get()
|
|
assert len(post3.categories) == 1
|
|
assert post3.categories[0].postcategory.sort_order == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_through_from_related() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 2, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category3",
|
|
postcategory={"sort_order": 3, "param_name": "velocity"},
|
|
)
|
|
|
|
await post.categories.filter(name="Test category3").update(
|
|
postcategory={"sort_order": 4}
|
|
)
|
|
|
|
post2 = (
|
|
await Post.objects.select_related("categories")
|
|
.order_by("postcategory__sort_order")
|
|
.get()
|
|
)
|
|
assert len(post2.categories) == 3
|
|
assert post2.categories[2].postcategory.sort_order == 4
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_excluding_fields_on_through_model() -> Any:
|
|
async with database:
|
|
post = await Post(title="Test post").save()
|
|
await post.categories.create(
|
|
name="Test category1",
|
|
postcategory={"sort_order": 2, "param_name": "volume"},
|
|
)
|
|
await post.categories.create(
|
|
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
|
|
)
|
|
await post.categories.create(
|
|
name="Test category3",
|
|
postcategory={"sort_order": 3, "param_name": "velocity"},
|
|
)
|
|
|
|
post2 = (
|
|
await Post.objects.select_related("categories")
|
|
.exclude_fields("postcategory__param_name")
|
|
.order_by("postcategory__sort_order")
|
|
.get()
|
|
)
|
|
assert len(post2.categories) == 3
|
|
assert post2.categories[0].postcategory.param_name is None
|
|
assert post2.categories[0].postcategory.sort_order == 1
|
|
|
|
assert post2.categories[2].postcategory.param_name is None
|
|
assert post2.categories[2].postcategory.sort_order == 3
|
|
|
|
post3 = (
|
|
await Post.objects.select_related("categories")
|
|
.fields({"postcategory": ..., "title": ...})
|
|
.exclude_fields({"postcategory": {"param_name", "sort_order"}})
|
|
.get()
|
|
)
|
|
assert len(post3.categories) == 3
|
|
for category in post3.categories:
|
|
assert category.postcategory.param_name is None
|
|
assert category.postcategory.sort_order is None
|