import asyncio
from datetime import datetime
from typing import List

import databases
import pydantic
import pytest
import sqlalchemy

import ormar
from ormar.exceptions import QueryDefinitionError, NoMatch
from tests.settings import DATABASE_URL

# Shared database handle and table registry used by every model in this module.
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()


class JsonSample(ormar.Model):
    """Model with a single nullable JSON column, stored in table ``jsons``."""

    class Meta:
        tablename = "jsons"
        metadata = metadata
        database = database

    id: ormar.Integer(primary_key=True)
    test_json: ormar.JSON(nullable=True)


class User(ormar.Model):
    """Model with an integer primary key and a name, stored in table ``users``."""

    class Meta:
        tablename = "users"
        metadata = metadata
        database = database

    id: ormar.Integer(primary_key=True)
    name: ormar.String(max_length=100, default="")


class Product(ormar.Model):
    """Model used by the filter/exclude tests, stored in table ``product``."""

    class Meta:
        tablename = "product"
        metadata = metadata
        database = database

    id: ormar.Integer(primary_key=True)
    name: ormar.String(max_length=100)
    rating: ormar.Integer(minimum=1, maximum=5)
    in_stock: ormar.Boolean(default=False)
    last_delivery: ormar.Date(default=datetime.now)


# Allowed values for the ``Country`` fields. Some entries deliberately violate
# the other field constraints declared on ``Country`` ("United States" is longer
# than max_length=9; -10 and 1200 are outside minimum=0 / maximum=1000).
country_name_choices = ("Canada", "Algeria", "United States")
country_taxed_choices = (True,)
country_country_code_choices = (-10, 1, 213, 1200)


class Country(ormar.Model):
    """Model whose fields combine ``choices`` with length/range constraints."""

    class Meta:
        tablename = "country"
        metadata = metadata
        database = database

    id: ormar.Integer(primary_key=True)
    name: ormar.String(
        max_length=9,
        choices=country_name_choices,
        default="Canada",
    )
    taxed: ormar.Boolean(choices=country_taxed_choices, default=True)
    country_code: ormar.Integer(
        minimum=0, maximum=1000, choices=country_country_code_choices, default=1
    )


@pytest.fixture(scope="module")
def event_loop():
    """Provide a module-scoped event loop for the async tests.

    A fresh loop is created instead of calling ``asyncio.get_event_loop()``:
    that call is deprecated when no loop is running, and closing the loop it
    returns would close the thread's default loop for every later test module.
    """
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    """Recreate all tables before the module's tests and drop them afterwards.

    This is a plain (synchronous) generator fixture: the body only performs
    blocking SQLAlchemy calls and never awaits, so it does not need to be an
    async fixture.
    """
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.drop_all(engine)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


def test_model_class():
    """Check the field registry and table object built for ``User``."""
    assert list(User.Meta.model_fields.keys()) == ["id", "name"]
    assert issubclass(User.Meta.model_fields["id"], pydantic.ConstrainedInt)
    assert User.Meta.model_fields["id"].primary_key is True
    assert issubclass(User.Meta.model_fields["name"], pydantic.ConstrainedStr)
    assert User.Meta.model_fields["name"].max_length == 100
    assert isinstance(User.Meta.table, sqlalchemy.Table)


def test_model_pk():
    """Check that ``pk`` passed to the constructor is readable as ``pk`` and ``id``."""
    user = User(pk=1)
    assert user.pk == 1
    assert user.id == 1


@pytest.mark.asyncio
async def test_json_column():
    """Check that a dict and its JSON string both load back as the same dict."""
    async with database:
        async with database.transaction(force_rollback=True):
            await JsonSample.objects.create(test_json=dict(aa=12))
            await JsonSample.objects.create(test_json='{"aa": 12}')

            items = await JsonSample.objects.all()
            assert len(items) == 2
            assert items[0].test_json == dict(aa=12)
            assert items[1].test_json == dict(aa=12)


@pytest.mark.asyncio
async def test_model_crud():
    """Exercise create, read, update and delete on ``User``."""
    async with database:
        async with database.transaction(force_rollback=True):
            users = await User.objects.all()
            assert users == []

            user = await User.objects.create(name="Tom")
            users = await User.objects.all()
            assert user.name == "Tom"
            assert user.pk is not None
            assert users == [user]

            lookup = await User.objects.get()
            assert lookup == user

            await user.update(name="Jane")
            users = await User.objects.all()
            assert user.name == "Jane"
            assert user.pk is not None
            assert users == [user]

            await user.delete()
            users = await User.objects.all()
            assert users == []


@pytest.mark.asyncio
async def test_model_get():
    """Check ``get()`` with zero, one and several rows, and lookup by ``pk``."""
    async with database:
        async with database.transaction(force_rollback=True):
            with pytest.raises(ormar.NoMatch):
                await User.objects.get()

            user = await User.objects.create(name="Tom")
            lookup = await User.objects.get()
            assert lookup == user

            user = await User.objects.create(name="Jane")
            with pytest.raises(ormar.MultipleMatches):
                await User.objects.get()

            same_user = await User.objects.get(pk=user.id)
            assert same_user.id == user.id
            assert same_user.pk == user.pk


@pytest.mark.asyncio
async def test_model_filter():
    """Exercise filter/exclude lookups, including ``%`` in string lookups."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            await User.objects.create(name="Jane")
            await User.objects.create(name="Lucy")

            user = await User.objects.get(name="Lucy")
            assert user.name == "Lucy"

            with pytest.raises(ormar.NoMatch):
                await User.objects.get(name="Jim")

            await Product.objects.create(name="T-Shirt", rating=5, in_stock=True)
            await Product.objects.create(name="Dress", rating=4)
            await Product.objects.create(name="Coat", rating=3, in_stock=True)

            product = await Product.objects.get(name__iexact="t-shirt", rating=5)
            assert product.pk is not None
            assert product.name == "T-Shirt"
            assert product.rating == 5
            assert product.last_delivery == datetime.now().date()

            products = await Product.objects.all(rating__gte=2, in_stock=True)
            assert len(products) == 2

            products = await Product.objects.all(name__icontains="T")
            assert len(products) == 2

            products = await Product.objects.exclude(rating__gte=4).all()
            assert len(products) == 1

            products = await Product.objects.exclude(
                rating__gte=4, in_stock=True
            ).all()
            assert len(products) == 2

            products = await Product.objects.exclude(in_stock=True).all()
            assert len(products) == 1

            products = await Product.objects.exclude(name__icontains="T").all()
            assert len(products) == 1

            # Test escaping % character from icontains, contains, and iexact
            await Product.objects.create(name="100%-Cotton", rating=3)
            await Product.objects.create(name="Cotton-100%-Egyptian", rating=3)
            await Product.objects.create(name="Cotton-100%", rating=3)

            products = Product.objects.filter(name__iexact="100%-cotton")
            assert await products.count() == 1

            products = Product.objects.filter(name__contains="%")
            assert await products.count() == 3

            products = Product.objects.filter(name__icontains="%")
            assert await products.count() == 3


@pytest.mark.asyncio
async def test_wrong_query_contains_model():
    """Check that a model instance as a ``contains`` value is rejected."""
    async with database:
        # Build the instance outside ``pytest.raises`` so that only the query
        # itself can satisfy the expected exception.
        product = Product(name="90%-Cotton", rating=2)
        with pytest.raises(QueryDefinitionError):
            await Product.objects.filter(name__contains=product).count()


@pytest.mark.asyncio
async def test_model_exists():
    """Check ``exists()`` for a matching and a non-matching filter."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            assert await User.objects.filter(name="Tom").exists() is True
            assert await User.objects.filter(name="Jane").exists() is False


@pytest.mark.asyncio
async def test_model_count():
    """Check ``count()`` on the whole table and on a filtered queryset."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            await User.objects.create(name="Jane")
            await User.objects.create(name="Lucy")

            assert await User.objects.count() == 3
            assert await User.objects.filter(name__icontains="T").count() == 1


@pytest.mark.asyncio
async def test_model_limit():
    """Check that ``limit(2)`` returns two of the three rows."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            await User.objects.create(name="Jane")
            await User.objects.create(name="Lucy")

            assert len(await User.objects.limit(2).all()) == 2


@pytest.mark.asyncio
async def test_model_limit_with_filter():
    """Check that ``limit`` combined with ``filter`` still caps the result."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            await User.objects.create(name="Tom")
            await User.objects.create(name="Tom")

            assert (
                len(await User.objects.limit(2).filter(name__iexact="Tom").all())
                == 2
            )


@pytest.mark.asyncio
async def test_offset():
    """Check that ``offset(1).limit(1)`` skips the first created row."""
    async with database:
        async with database.transaction(force_rollback=True):
            await User.objects.create(name="Tom")
            await User.objects.create(name="Jane")

            users = await User.objects.offset(1).limit(1).all()
            assert users[0].name == "Jane"


@pytest.mark.asyncio
async def test_model_first():
    """Check ``first()`` with and without filters, and ``NoMatch`` when empty."""
    async with database:
        async with database.transaction(force_rollback=True):
            tom = await User.objects.create(name="Tom")
            jane = await User.objects.create(name="Jane")

            assert await User.objects.first() == tom
            assert await User.objects.first(name="Jane") == jane
            assert await User.objects.filter(name="Jane").first() == jane
            with pytest.raises(NoMatch):
                await User.objects.filter(name="Lucy").first()


def not_contains(a, b) -> bool:
    """Return True when ``a`` is not a member of ``b``."""
    return a not in b


def contains(a, b) -> bool:
    """Return True when ``a`` is a member of ``b``."""
    return a in b


def check_choices(values: tuple, ops: List) -> None:
    """Assert that each value is in or out of its field's choices.

    ``values`` is paired positionally with ``ops`` and with the three
    module-level choices tuples (name, taxed, country code). Each entry of
    ``ops`` is ``"in"`` or ``"out"`` and states whether the corresponding
    value is expected to be a member of its choices tuple.

    Raises:
        AssertionError: if any expectation does not hold.
    """
    ops_dict = {"in": contains, "out": not_contains}
    checks = (
        country_name_choices,
        country_taxed_choices,
        country_country_code_choices,
    )
    assert all(
        ops_dict[op](value, check) for value, op, check in zip(values, ops, checks)
    )


@pytest.mark.asyncio
async def test_model_choices():
    """Test that choices work properly for various types of fields."""
    async with database:
        # Test valid choices.
        await asyncio.gather(
            Country.objects.create(name="Canada", taxed=True, country_code=1),
            Country.objects.create(name="Algeria", taxed=True, country_code=213),
            Country.objects.create(name="Algeria"),
        )

        # In every case below the sanity check on the inputs runs outside
        # ``pytest.raises`` so that only ``create`` is expected to raise.
        name, taxed, country_code = "Saudi Arabia", True, 1
        check_choices((name, taxed, country_code), ["out", "in", "in"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )

        name, taxed, country_code = "Algeria", False, 1
        check_choices((name, taxed, country_code), ["in", "out", "in"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )

        name, taxed, country_code = "Algeria", True, 967
        check_choices((name, taxed, country_code), ["in", "in", "out"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )

        name, taxed, country_code = (
            "United States",
            True,
            1,
        )  # name is too long but is a valid choice
        check_choices((name, taxed, country_code), ["in", "in", "in"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )

        name, taxed, country_code = (
            "Algeria",
            True,
            -10,
        )  # country code is too small but is a valid choice
        check_choices((name, taxed, country_code), ["in", "in", "in"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )

        name, taxed, country_code = (
            "Algeria",
            True,
            1200,
        )  # country code is too large but is a valid choice
        check_choices((name, taxed, country_code), ["in", "in", "in"])
        with pytest.raises(ValueError):
            await Country.objects.create(
                name=name, taxed=taxed, country_code=country_code
            )