add test for select_related with limit

This commit is contained in:
collerek
2020-12-11 16:28:30 +01:00
parent 4cca8fe63f
commit 4c1acc09ea
2 changed files with 165 additions and 45 deletions

View File

@ -1,6 +1,17 @@
import logging
import warnings
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Type, Union, cast
from typing import (
Any,
Dict,
List,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Type,
Union,
cast,
)
import databases
import pydantic
@ -540,9 +551,7 @@ def update_attrs_and_fields(
model_fields.update(new_model_fields)
def update_attrs_from_base_meta(
base_class: "Model",
attrs: Dict, ) -> None:
def update_attrs_from_base_meta(base_class: "Model", attrs: Dict,) -> None:
"""
Updates Meta parameters in child from parent if needed.
@ -564,7 +573,9 @@ def update_attrs_from_base_meta(
curr_value.union(getattr(base_class.Meta, param))
else:
# overwrite with child value if both set and its param / object
setattr(attrs["Meta"], param, getattr(base_class.Meta, param)) # pragma: no cover
setattr(
attrs["Meta"], param, getattr(base_class.Meta, param)
) # pragma: no cover
else:
setattr(attrs["Meta"], param, getattr(base_class.Meta, param))
@ -603,7 +614,9 @@ def extract_mixin_fields_from_dict(
if hasattr(base_class, "Meta"):
if attrs.get("Meta"):
new_fields = set(base_class.Meta.model_fields.keys()) # type: ignore
previous_fields = set({k for k, v in attrs.items() if isinstance(v, FieldInfo)})
previous_fields = set(
{k for k, v in attrs.items() if isinstance(v, FieldInfo)}
)
check_conflicting_fields(
new_fields=new_fields,
attrs=attrs,

View File

@ -0,0 +1,107 @@
from typing import List, Optional
import databases
import sqlalchemy
from sqlalchemy import create_engine
import ormar
import pytest
from tests.settings import DATABASE_URL
db = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
class Keyword(ormar.Model):
class Meta:
metadata = metadata
database = db
tablename = "keywords"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=50)
class KeywordPrimaryModel(ormar.Model):
class Meta:
metadata = metadata
database = db
tablename = "primary_models_keywords"
id: int = ormar.Integer(primary_key=True)
class PrimaryModel(ormar.Model):
class Meta:
metadata = metadata
database = db
tablename = "primary_models"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=255, index=True)
some_text: str = ormar.Text()
some_other_text: Optional[str] = ormar.Text(nullable=True)
keywords: Optional[List[Keyword]] = ormar.ManyToMany(
Keyword, through=KeywordPrimaryModel
)
class SecondaryModel(ormar.Model):
class Meta:
metadata = metadata
database = db
tablename = "secondary_models"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
primary_model: PrimaryModel = ormar.ForeignKey(
PrimaryModel,
related_name="secondary_models",
)
@pytest.mark.asyncio
async def test_create_primary_models():
async with db:
for name, some_text, some_other_text in [
("Primary 1", "Some text 1", "Some other text 1"),
("Primary 2", "Some text 2", "Some other text 2"),
("Primary 3", "Some text 3", "Some other text 3"),
("Primary 4", "Some text 4", "Some other text 4"),
("Primary 5", "Some text 5", "Some other text 5"),
("Primary 6", "Some text 6", "Some other text 6"),
("Primary 7", "Some text 7", "Some other text 7"),
("Primary 8", "Some text 8", "Some other text 8"),
("Primary 9", "Some text 9", "Some other text 9"),
("Primary 10", "Some text 10", "Some other text 10")]:
await PrimaryModel(
name=name, some_text=some_text, some_other_text=some_other_text
).save()
for tag_id in [1, 2, 3, 4, 5]:
await Keyword.objects.create(name=f"Tag {tag_id}")
p1 = await PrimaryModel.objects.get(pk=1)
p2 = await PrimaryModel.objects.get(pk=2)
for i in range(1, 6):
keyword = await Keyword.objects.get(pk=i)
if i % 2 == 0:
await p1.keywords.add(keyword)
else:
await p2.keywords.add(keyword)
models = await PrimaryModel.objects.prefetch_related("keywords").limit(5).all()
# This test fails, because of the keywords relation.
assert len(models) == 5
assert len(models[0].keywords) == 2
assert len(models[1].keywords) == 3
assert len(models[2].keywords) == 0
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)