441 lines
14 KiB
Python
441 lines
14 KiB
Python
import asyncio
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import List
|
|
|
|
import databases
|
|
import pydantic
|
|
import pytest
|
|
import sqlalchemy
|
|
|
|
import ormar
|
|
from ormar.exceptions import QueryDefinitionError, NoMatch
|
|
from tests.settings import DATABASE_URL
|
|
|
|
database = databases.Database(DATABASE_URL, force_rollback=True)
|
|
metadata = sqlalchemy.MetaData()
|
|
|
|
|
|
class JsonSample(ormar.Model):
|
|
class Meta:
|
|
tablename = "jsons"
|
|
metadata = metadata
|
|
database = database
|
|
|
|
id: ormar.Integer(primary_key=True)
|
|
test_json: ormar.JSON(nullable=True)
|
|
|
|
|
|
class UUIDSample(ormar.Model):
|
|
class Meta:
|
|
tablename = "uuids"
|
|
metadata = metadata
|
|
database = database
|
|
|
|
id: ormar.UUID(primary_key=True, default=uuid.uuid4)
|
|
test_text: ormar.Text()
|
|
|
|
|
|
class User(ormar.Model):
|
|
class Meta:
|
|
tablename = "users"
|
|
metadata = metadata
|
|
database = database
|
|
|
|
id: ormar.Integer(primary_key=True)
|
|
name: ormar.String(max_length=100, default="")
|
|
|
|
|
|
class Product(ormar.Model):
|
|
class Meta:
|
|
tablename = "product"
|
|
metadata = metadata
|
|
database = database
|
|
|
|
id: ormar.Integer(primary_key=True)
|
|
name: ormar.String(max_length=100)
|
|
rating: ormar.Integer(minimum=1, maximum=5)
|
|
in_stock: ormar.Boolean(default=False)
|
|
last_delivery: ormar.Date(default=datetime.now)
|
|
|
|
|
|
country_name_choices = ("Canada", "Algeria", "United States")
|
|
country_taxed_choices = (True,)
|
|
country_country_code_choices = (-10, 1, 213, 1200)
|
|
|
|
|
|
class Country(ormar.Model):
|
|
class Meta:
|
|
tablename = "country"
|
|
metadata = metadata
|
|
database = database
|
|
|
|
id: ormar.Integer(primary_key=True)
|
|
name: ormar.String(
|
|
max_length=9, choices=country_name_choices, default="Canada",
|
|
)
|
|
taxed: ormar.Boolean(choices=country_taxed_choices, default=True)
|
|
country_code: ormar.Integer(
|
|
minimum=0, maximum=1000, choices=country_country_code_choices, default=1
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def event_loop():
|
|
loop = asyncio.get_event_loop()
|
|
yield loop
|
|
loop.close()
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
async def create_test_database():
|
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
|
metadata.drop_all(engine)
|
|
metadata.create_all(engine)
|
|
yield
|
|
metadata.drop_all(engine)
|
|
|
|
|
|
def test_model_class():
|
|
assert list(User.Meta.model_fields.keys()) == ["id", "name"]
|
|
assert issubclass(User.Meta.model_fields["id"], pydantic.ConstrainedInt)
|
|
assert User.Meta.model_fields["id"].primary_key is True
|
|
assert issubclass(User.Meta.model_fields["name"], pydantic.ConstrainedStr)
|
|
assert User.Meta.model_fields["name"].max_length == 100
|
|
assert isinstance(User.Meta.table, sqlalchemy.Table)
|
|
|
|
|
|
def test_model_pk():
|
|
user = User(pk=1)
|
|
assert user.pk == 1
|
|
assert user.id == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_json_column():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await JsonSample.objects.create(test_json=dict(aa=12))
|
|
await JsonSample.objects.create(test_json='{"aa": 12}')
|
|
|
|
items = await JsonSample.objects.all()
|
|
assert len(items) == 2
|
|
assert items[0].test_json == dict(aa=12)
|
|
assert items[1].test_json == dict(aa=12)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_uuid_column():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
u1 = await UUIDSample.objects.create(test_text="aa")
|
|
u2 = await UUIDSample.objects.create(test_text="bb")
|
|
|
|
items = await UUIDSample.objects.all()
|
|
assert len(items) == 2
|
|
|
|
assert isinstance(items[0].id, uuid.UUID)
|
|
assert isinstance(items[1].id, uuid.UUID)
|
|
|
|
assert items[0].id in (u1.id, u2.id)
|
|
assert items[1].id in (u1.id, u2.id)
|
|
|
|
assert items[0].id != items[1].id
|
|
|
|
item = await UUIDSample.objects.filter(id=u1.id).get()
|
|
assert item.id == u1.id
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_crud():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
users = await User.objects.all()
|
|
assert users == []
|
|
|
|
user = await User.objects.create(name="Tom")
|
|
users = await User.objects.all()
|
|
assert user.name == "Tom"
|
|
assert user.pk is not None
|
|
assert users == [user]
|
|
|
|
lookup = await User.objects.get()
|
|
assert lookup == user
|
|
|
|
await user.update(name="Jane")
|
|
users = await User.objects.all()
|
|
assert user.name == "Jane"
|
|
assert user.pk is not None
|
|
assert users == [user]
|
|
|
|
await user.delete()
|
|
users = await User.objects.all()
|
|
assert users == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_get():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
with pytest.raises(ormar.NoMatch):
|
|
await User.objects.get()
|
|
|
|
user = await User.objects.create(name="Tom")
|
|
lookup = await User.objects.get()
|
|
assert lookup == user
|
|
|
|
user = await User.objects.create(name="Jane")
|
|
with pytest.raises(ormar.MultipleMatches):
|
|
await User.objects.get()
|
|
|
|
same_user = await User.objects.get(pk=user.id)
|
|
assert same_user.id == user.id
|
|
assert same_user.pk == user.pk
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_filter():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Jane")
|
|
await User.objects.create(name="Lucy")
|
|
|
|
user = await User.objects.get(name="Lucy")
|
|
assert user.name == "Lucy"
|
|
|
|
with pytest.raises(ormar.NoMatch):
|
|
await User.objects.get(name="Jim")
|
|
|
|
await Product.objects.create(name="T-Shirt", rating=5, in_stock=True)
|
|
await Product.objects.create(name="Dress", rating=4)
|
|
await Product.objects.create(name="Coat", rating=3, in_stock=True)
|
|
|
|
product = await Product.objects.get(name__iexact="t-shirt", rating=5)
|
|
assert product.pk is not None
|
|
assert product.name == "T-Shirt"
|
|
assert product.rating == 5
|
|
assert product.last_delivery == datetime.now().date()
|
|
|
|
products = await Product.objects.all(rating__gte=2, in_stock=True)
|
|
assert len(products) == 2
|
|
|
|
products = await Product.objects.all(name__icontains="T")
|
|
assert len(products) == 2
|
|
|
|
products = await Product.objects.exclude(rating__gte=4).all()
|
|
assert len(products) == 1
|
|
|
|
products = await Product.objects.exclude(rating__gte=4, in_stock=True).all()
|
|
assert len(products) == 2
|
|
|
|
products = await Product.objects.exclude(in_stock=True).all()
|
|
assert len(products) == 1
|
|
|
|
products = await Product.objects.exclude(name__icontains="T").all()
|
|
assert len(products) == 1
|
|
|
|
# Test escaping % character from icontains, contains, and iexact
|
|
await Product.objects.create(name="100%-Cotton", rating=3)
|
|
await Product.objects.create(name="Cotton-100%-Egyptian", rating=3)
|
|
await Product.objects.create(name="Cotton-100%", rating=3)
|
|
products = Product.objects.filter(name__iexact="100%-cotton")
|
|
assert await products.count() == 1
|
|
|
|
products = Product.objects.filter(name__contains="%")
|
|
assert await products.count() == 3
|
|
|
|
products = Product.objects.filter(name__icontains="%")
|
|
assert await products.count() == 3
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wrong_query_contains_model():
|
|
async with database:
|
|
with pytest.raises(QueryDefinitionError):
|
|
product = Product(name="90%-Cotton", rating=2)
|
|
await Product.objects.filter(name__contains=product).count()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_exists():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
assert await User.objects.filter(name="Tom").exists() is True
|
|
assert await User.objects.filter(name="Jane").exists() is False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_count():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Jane")
|
|
await User.objects.create(name="Lucy")
|
|
|
|
assert await User.objects.count() == 3
|
|
assert await User.objects.filter(name__icontains="T").count() == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_limit():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Jane")
|
|
await User.objects.create(name="Lucy")
|
|
|
|
assert len(await User.objects.limit(2).all()) == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_limit_with_filter():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Tom")
|
|
|
|
assert (
|
|
len(await User.objects.limit(2).filter(name__iexact="Tom").all()) == 2
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_offset():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Tom")
|
|
await User.objects.create(name="Jane")
|
|
|
|
users = await User.objects.offset(1).limit(1).all()
|
|
assert users[0].name == "Jane"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_first():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
tom = await User.objects.create(name="Tom")
|
|
jane = await User.objects.create(name="Jane")
|
|
|
|
assert await User.objects.first() == tom
|
|
assert await User.objects.first(name="Jane") == jane
|
|
assert await User.objects.filter(name="Jane").first() == jane
|
|
with pytest.raises(NoMatch):
|
|
await User.objects.filter(name="Lucy").first()
|
|
|
|
|
|
def not_contains(a, b):
|
|
return a not in b
|
|
|
|
|
|
def contains(a, b):
|
|
return a in b
|
|
|
|
|
|
def check_choices(values: tuple, ops: List):
|
|
ops_dict = {"in": contains, "out": not_contains}
|
|
checks = (country_name_choices, country_taxed_choices, country_country_code_choices)
|
|
assert all(
|
|
[ops_dict[op](value, check) for value, op, check in zip(values, ops, checks)]
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_model_choices():
|
|
"""Test that choices work properly for various types of fields."""
|
|
async with database:
|
|
# Test valid choices.
|
|
await asyncio.gather(
|
|
Country.objects.create(name="Canada", taxed=True, country_code=1),
|
|
Country.objects.create(name="Algeria", taxed=True, country_code=213),
|
|
Country.objects.create(name="Algeria"),
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = "Saudi Arabia", True, 1
|
|
check_choices((name, taxed, country_code), ["out", "in", "in"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = "Algeria", False, 1
|
|
check_choices((name, taxed, country_code), ["in", "out", "in"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = "Algeria", True, 967
|
|
check_choices((name, taxed, country_code), ["in", "in", "out"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = (
|
|
"United States",
|
|
True,
|
|
1,
|
|
) # name is too long but is a valid choice
|
|
check_choices((name, taxed, country_code), ["in", "in", "in"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = (
|
|
"Algeria",
|
|
True,
|
|
-10,
|
|
) # country code is too small but is a valid choice
|
|
check_choices((name, taxed, country_code), ["in", "in", "in"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
with pytest.raises(ValueError):
|
|
name, taxed, country_code = (
|
|
"Algeria",
|
|
True,
|
|
1200,
|
|
) # country code is too large but is a valid choice
|
|
check_choices((name, taxed, country_code), ["in", "in", "in"])
|
|
await Country.objects.create(
|
|
name=name, taxed=taxed, country_code=country_code
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_and_end_filters():
|
|
async with database:
|
|
async with database.transaction(force_rollback=True):
|
|
await User.objects.create(name="Markos Uj")
|
|
await User.objects.create(name="Maqua Bigo")
|
|
await User.objects.create(name="maqo quidid")
|
|
await User.objects.create(name="Louis Figo")
|
|
await User.objects.create(name="Loordi Kami")
|
|
await User.objects.create(name="Yuuki Sami")
|
|
|
|
users = await User.objects.filter(name__startswith="Mar").all()
|
|
assert len(users) == 1
|
|
|
|
users = await User.objects.filter(name__istartswith="ma").all()
|
|
assert len(users) == 3
|
|
|
|
users = await User.objects.filter(name__istartswith="Maq").all()
|
|
assert len(users) == 2
|
|
|
|
users = await User.objects.filter(name__iendswith="AMI").all()
|
|
assert len(users) == 2
|
|
|
|
users = await User.objects.filter(name__endswith="Uj").all()
|
|
assert len(users) == 1
|
|
|
|
users = await User.objects.filter(name__endswith="igo").all()
|
|
assert len(users) == 2
|
|
|