improve date handling
This commit is contained in:
118
tests/test_model_definition/test_dates_with_timezone.py
Normal file
118
tests/test_model_definition/test_dates_with_timezone.py
Normal file
@ -0,0 +1,118 @@
|
||||
from datetime import timezone, timedelta, datetime, date, time
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
|
||||
import ormar
|
||||
|
||||
from tests.settings import DATABASE_URL
|
||||
|
||||
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||
metadata = sqlalchemy.MetaData()
|
||||
|
||||
|
||||
class DateFieldsModel(ormar.Model):
|
||||
class Meta:
|
||||
database = database
|
||||
metadata = metadata
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
created_date: datetime = ormar.DateTime(
|
||||
default=datetime.now(tz=timezone(timedelta(hours=3))), timezone=True
|
||||
)
|
||||
updated_date: datetime = ormar.DateTime(
|
||||
default=datetime.now(tz=timezone(timedelta(hours=3))),
|
||||
name="modification_date",
|
||||
timezone=True,
|
||||
)
|
||||
|
||||
|
||||
class SampleModel(ormar.Model):
|
||||
class Meta:
|
||||
database = database
|
||||
metadata = metadata
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
updated_at: datetime = ormar.DateTime()
|
||||
|
||||
|
||||
class TimeModel(ormar.Model):
|
||||
class Meta:
|
||||
database = database
|
||||
metadata = metadata
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
elapsed: time = ormar.Time()
|
||||
|
||||
|
||||
class DateModel(ormar.Model):
|
||||
class Meta:
|
||||
database = database
|
||||
metadata = metadata
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
creation_date: date = ormar.Date()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def create_test_database():
|
||||
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||
metadata.drop_all(engine)
|
||||
metadata.create_all(engine)
|
||||
yield
|
||||
metadata.drop_all(engine)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_model_crud_with_timezone():
|
||||
async with database:
|
||||
datemodel = await DateFieldsModel().save()
|
||||
assert datemodel.created_date is not None
|
||||
assert datemodel.updated_date is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_with_datetime_in_filter():
|
||||
async with database:
|
||||
creation_dt = datetime(2021, 5, 18, 0, 0, 0, 0)
|
||||
sample = await SampleModel.objects.create(updated_at=creation_dt)
|
||||
|
||||
current_dt = datetime(2021, 5, 19, 0, 0, 0, 0)
|
||||
outdated_samples = await SampleModel.objects.filter(
|
||||
updated_at__lt=current_dt
|
||||
).all()
|
||||
|
||||
assert outdated_samples[0] == sample
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_with_date_in_filter():
|
||||
async with database:
|
||||
sample = await TimeModel.objects.create(elapsed=time(0, 20, 20))
|
||||
await TimeModel.objects.create(elapsed=time(0, 12, 0))
|
||||
await TimeModel.objects.create(elapsed=time(0, 19, 55))
|
||||
sample4 = await TimeModel.objects.create(elapsed=time(0, 21, 15))
|
||||
|
||||
threshold = time(0, 20, 0)
|
||||
samples = await TimeModel.objects.filter(TimeModel.elapsed >= threshold).all()
|
||||
|
||||
assert len(samples) == 2
|
||||
assert samples[0] == sample
|
||||
assert samples[1] == sample4
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_with_time_in_filter():
|
||||
async with database:
|
||||
await DateModel.objects.create(creation_date=date(2021, 5, 18))
|
||||
sample2 = await DateModel.objects.create(creation_date=date(2021, 5, 19))
|
||||
sample3 = await DateModel.objects.create(creation_date=date(2021, 5, 20))
|
||||
|
||||
outdated_samples = await DateModel.objects.filter(
|
||||
creation_date__in=[date(2021, 5, 19), date(2021, 5, 20)]
|
||||
).all()
|
||||
|
||||
assert len(outdated_samples) == 2
|
||||
assert outdated_samples[0] == sample2
|
||||
assert outdated_samples[1] == sample3
|
||||
@ -185,25 +185,14 @@ def test_combining_groups_together():
|
||||
)
|
||||
|
||||
|
||||
# @pytest.mark.asyncio
|
||||
# async def test_filtering_by_field_access():
|
||||
# async with database:
|
||||
# async with database.transaction(force_rollback=True):
|
||||
# category = await Category(name='Toys').save()
|
||||
# product1 = await Product(name="G.I Joe",
|
||||
# rating=4.7,
|
||||
# category=category).save()
|
||||
# product2 = await Product(name="My Little Pony",
|
||||
# rating=3.8,
|
||||
# category=category).save()
|
||||
#
|
||||
# check = Product.object.get(Product.name == "My Little Pony")
|
||||
# assert check == product2
|
||||
@pytest.mark.asyncio
|
||||
async def test_filtering_by_field_access():
|
||||
async with database:
|
||||
async with database.transaction(force_rollback=True):
|
||||
category = await Category(name="Toys").save()
|
||||
product2 = await Product(
|
||||
name="My Little Pony", rating=3.8, category=category
|
||||
).save()
|
||||
|
||||
# TODO: Finish implementation
|
||||
# * overload operators and add missing functions that return FilterAction (V)
|
||||
# * return OrderAction for desc() and asc() (V)
|
||||
# * create filter groups for & and | (and ~ - NOT?) (V)
|
||||
# * accept args in all functions that accept filters? or only filter and exclude? (V)
|
||||
# all functions: delete, first, get, get_or_none, get_or_create, all, filter, exclude
|
||||
# * accept OrderActions in order_by (V)
|
||||
check = await Product.objects.get(Product.name == "My Little Pony")
|
||||
assert check == product2
|
||||
|
||||
Reference in New Issue
Block a user