working for simple models, not including related models yet

This commit is contained in:
collerek
2020-11-08 09:44:30 +01:00
parent 8e2a368203
commit 9f4bde595f
3 changed files with 154 additions and 3 deletions

View File

@ -21,6 +21,7 @@ class Query:
limit_count: Optional[int], limit_count: Optional[int],
offset: Optional[int], offset: Optional[int],
fields: Optional[List], fields: Optional[List],
order_bys: Optional[List],
) -> None: ) -> None:
self.query_offset = offset self.query_offset = offset
self.limit_count = limit_count self.limit_count = limit_count
@ -36,6 +37,7 @@ class Query:
self.select_from: List[str] = [] self.select_from: List[str] = []
self.columns = [sqlalchemy.Column] self.columns = [sqlalchemy.Column]
self.order_columns = order_bys
self.order_bys: List[sqlalchemy.sql.elements.TextClause] = [] self.order_bys: List[sqlalchemy.sql.elements.TextClause] = []
@property @property
@ -43,6 +45,15 @@ class Query:
pkname_alias = self.model_cls.get_column_alias(self.model_cls.Meta.pkname) pkname_alias = self.model_cls.get_column_alias(self.model_cls.Meta.pkname)
return f"{self.table.name}.{pkname_alias}" return f"{self.table.name}.{pkname_alias}"
def apply_order_bys_for_primary_model(self):
if self.order_columns:
return [
text(f"{x[1:]} desc") if x.startswith("-") else text(x)
for x in self.order_columns
if "__" not in x
]
return [text(self.prefixed_pk_name)]
def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]: def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]:
self_related_fields = self.model_cls.own_table_columns( self_related_fields = self.model_cls.own_table_columns(
self.model_cls, self.fields self.model_cls, self.fields
@ -50,7 +61,7 @@ class Query:
self.columns = self.model_cls.Meta.alias_manager.prefixed_columns( self.columns = self.model_cls.Meta.alias_manager.prefixed_columns(
"", self.table, self_related_fields "", self.table, self_related_fields
) )
self.order_bys = [text(self.prefixed_pk_name)] self.order_bys = self.apply_order_bys_for_primary_model()
self.select_from = self.table self.select_from = self.table
self._select_related.sort(key=lambda item: (item, -len(item))) self._select_related.sort(key=lambda item: (item, -len(item)))

View File

@ -27,6 +27,7 @@ class QuerySet:
limit_count: int = None, limit_count: int = None,
offset: int = None, offset: int = None,
columns: List = None, columns: List = None,
order_bys: List = None,
) -> None: ) -> None:
self.model_cls = model_cls self.model_cls = model_cls
self.filter_clauses = [] if filter_clauses is None else filter_clauses self.filter_clauses = [] if filter_clauses is None else filter_clauses
@ -35,7 +36,7 @@ class QuerySet:
self.limit_count = limit_count self.limit_count = limit_count
self.query_offset = offset self.query_offset = offset
self._columns = columns or [] self._columns = columns or []
self.order_bys = None self.order_bys = order_bys or []
def __get__( def __get__(
self, self,
@ -110,9 +111,10 @@ class QuerySet:
offset=self.query_offset, offset=self.query_offset,
limit_count=self.limit_count, limit_count=self.limit_count,
fields=self._columns, fields=self._columns,
order_bys=self.order_bys,
) )
exp = qry.build_select_expression() exp = qry.build_select_expression()
# print(exp.compile(compile_kwargs={"literal_binds": True})) print(exp.compile(compile_kwargs={"literal_binds": True}))
return exp return exp
def filter(self, _exclude: bool = False, **kwargs: Any) -> "QuerySet": # noqa: A003 def filter(self, _exclude: bool = False, **kwargs: Any) -> "QuerySet": # noqa: A003
@ -137,6 +139,7 @@ class QuerySet:
limit_count=self.limit_count, limit_count=self.limit_count,
offset=self.query_offset, offset=self.query_offset,
columns=self._columns, columns=self._columns,
order_bys=self.order_bys,
) )
def exclude(self, **kwargs: Any) -> "QuerySet": # noqa: A003 def exclude(self, **kwargs: Any) -> "QuerySet": # noqa: A003
@ -155,6 +158,7 @@ class QuerySet:
limit_count=self.limit_count, limit_count=self.limit_count,
offset=self.query_offset, offset=self.query_offset,
columns=self._columns, columns=self._columns,
order_bys=self.order_bys,
) )
def fields(self, columns: Union[List, str]) -> "QuerySet": def fields(self, columns: Union[List, str]) -> "QuerySet":
@ -170,6 +174,23 @@ class QuerySet:
limit_count=self.limit_count, limit_count=self.limit_count,
offset=self.query_offset, offset=self.query_offset,
columns=columns, columns=columns,
order_bys=self.order_bys,
)
def order_by(self, columns: Union[List, str]) -> "QuerySet":
if not isinstance(columns, list):
columns = [columns]
order_bys = self.order_bys + [x for x in columns if x not in self.order_bys]
return self.__class__(
model_cls=self.model,
filter_clauses=self.filter_clauses,
exclude_clauses=self.exclude_clauses,
select_related=self._select_related,
limit_count=self.limit_count,
offset=self.query_offset,
columns=self._columns,
order_bys=order_bys,
) )
async def exists(self) -> bool: async def exists(self) -> bool:
@ -218,6 +239,7 @@ class QuerySet:
limit_count=limit_count, limit_count=limit_count,
offset=self.query_offset, offset=self.query_offset,
columns=self._columns, columns=self._columns,
order_bys=self.order_bys,
) )
def offset(self, offset: int) -> "QuerySet": def offset(self, offset: int) -> "QuerySet":
@ -229,6 +251,7 @@ class QuerySet:
limit_count=self.limit_count, limit_count=self.limit_count,
offset=offset, offset=offset,
columns=self._columns, columns=self._columns,
order_bys=self.order_bys,
) )
async def first(self, **kwargs: Any) -> "Model": async def first(self, **kwargs: Any) -> "Model":

117
tests/test_order_by.py Normal file
View File

@ -0,0 +1,117 @@
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
class Song(ormar.Model):
class Meta:
tablename = "songs"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
sort_order: int = ormar.Integer()
class Owner(ormar.Model):
class Meta:
tablename = "owners"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Toy(ormar.Model):
class Meta:
tablename = "toys"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
owner: Owner = ormar.ForeignKey(Owner)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_sort_order_on_main_model():
async with database:
await Song.objects.create(name="Song 3", sort_order=3)
await Song.objects.create(name="Song 1", sort_order=1)
await Song.objects.create(name="Song 2", sort_order=2)
songs = await Song.objects.all()
assert songs[0].name == "Song 3"
assert songs[1].name == "Song 1"
assert songs[2].name == "Song 2"
songs = await Song.objects.order_by("-sort_order").all()
assert songs[0].name == "Song 3"
assert songs[1].name == "Song 2"
assert songs[2].name == "Song 1"
songs = await Song.objects.order_by("sort_order").all()
assert songs[0].name == "Song 1"
assert songs[1].name == "Song 2"
assert songs[2].name == "Song 3"
songs = await Song.objects.order_by("name").all()
assert songs[0].name == "Song 1"
assert songs[1].name == "Song 2"
assert songs[2].name == "Song 3"
await Song.objects.create(name="Song 4", sort_order=1)
songs = await Song.objects.order_by(["sort_order", "name"]).all()
assert songs[0].name == "Song 1"
assert songs[1].name == "Song 4"
assert songs[2].name == "Song 2"
assert songs[3].name == "Song 3"
@pytest.mark.asyncio
async def test_sort_order_on_related_model():
async with database:
aphrodite = await Owner.objects.create(name="Aphrodite")
hermes = await Owner.objects.create(name="Hermes")
zeus = await Owner.objects.create(name="Zeus")
await Toy.objects.create(name="Toy 1", owner=zeus)
await Toy.objects.create(name="Toy 5", owner=hermes)
await Toy.objects.create(name="Toy 2", owner=aphrodite)
await Toy.objects.create(name="Toy 4", owner=zeus)
await Toy.objects.create(name="Toy 3", owner=aphrodite)
await Toy.objects.create(name="Toy 6", owner=hermes)
toys = await Toy.objects.select_related("owner").order_by("name").all()
assert [x.name.replace("Toy ", "") for x in toys] == [
str(x + 1) for x in range(6)
]
assert toys[0].owner == zeus
assert toys[1].owner == aphrodite
toys = await Toy.objects.select_related("owner").order_by("owner__name").all()
owner = (
await Owner.objects.select_related("toys")
.order_by("toys__name")
.filter(name="Zeus")
.all()
)