first ugly version of values and values_list - to refactor and check with m2m

This commit is contained in:
collerek
2021-06-04 18:21:16 +02:00
parent 7b92884f39
commit b1b3d5cd92
5 changed files with 313 additions and 4 deletions

View File

@ -368,6 +368,11 @@ You can set this parameter by providing `Meta` class `constraints` argument.
--8<-- "../docs_src/models/docs006.py"
```
!!!note
Note that constraints are meant for combination of columns that should be unique.
To set one column as unique use [`unique`](../fields/common-parameters.md#unique) common parameter.
Of course you can set many columns as unique with this param but each of them will be checked separately.
## Model sort order
When querying the database with given model by default the Model is ordered by the `primary_key`

View File

@ -87,6 +87,17 @@ class ExcludableItems:
new_excludable.items[key] = value.get_copy()
return new_excludable
def include_entry_count(self) -> int:
"""
Returns count of include items inside
"""
count = 0
if not self.items:
return count
for key in self.items.keys():
count += len(self.items[key].include)
return count
def get(self, model_cls: Type["Model"], alias: str = "") -> Excludable:
"""
Return Excludable for given model and alias.

View File

@ -86,6 +86,7 @@ class ExcludableMixin(RelationMixin):
excludable: ExcludableItems,
alias: str = "",
use_alias: bool = False,
add_pk_columns: bool = True,
) -> List[str]:
"""
Returns list of aliases or field names for given model.
@ -96,6 +97,8 @@ class ExcludableMixin(RelationMixin):
Primary key field is always added and cannot be excluded (will be added anyway).
:param add_pk_columns: flag if add primary key - always yes if ormar parses data
:type add_pk_columns: bool
:param alias: relation prefix
:type alias: str
:param excludable: structure of fields to include and exclude
@ -130,9 +133,10 @@ class ExcludableMixin(RelationMixin):
]
# always has to return pk column for ormar to work
columns = cls._populate_pk_column(
model=model, columns=columns, use_alias=use_alias
)
if add_pk_columns:
columns = cls._populate_pk_column(
model=model, columns=columns, use_alias=use_alias
)
return columns

View File

@ -23,9 +23,10 @@ from ormar import MultipleMatches, NoMatch
from ormar.exceptions import ModelPersistenceError, QueryDefinitionError
from ormar.queryset import FilterQuery, SelectAction
from ormar.queryset.actions.order_action import OrderAction
from ormar.queryset.clause import FilterGroup, QueryClause
from ormar.queryset.clause import FilterGroup, Prefix, QueryClause
from ormar.queryset.prefetch_query import PrefetchQuery
from ormar.queryset.query import Query
from ormar.queryset.utils import get_relationship_alias_model_and_str
if TYPE_CHECKING: # pragma no cover
from ormar import Model
@ -301,6 +302,8 @@ class QuerySet(Generic[T]):
* endswith - like `album__name__endswith='ibu'` (exact end match)
* iendswith - like `album__name__iendswith='IBU'` (case insensitive)
Note that you can also use python style filters - check the docs!
:param _exclude: flag if it should be exclude or filter
:type _exclude: bool
:param kwargs: fields names and proper value types
@ -553,6 +556,119 @@ class QuerySet(Generic[T]):
order_bys = self.order_bys + [x for x in orders_by if x not in self.order_bys]
return self.rebuild_self(order_bys=order_bys,)
async def values(
self,
fields: Union[List, str, Set, Dict] = None,
_as_dict: bool = True,
_flatten: bool = False,
) -> List:
"""
Return a list of dictionaries with column values in order of the fields
passed or all fields from queried models.
To filter for given row use filter/exclude methods before values,
to limit number of rows use limit/offset or paginate before values.
Note that it always return a list even for one row from database.
:param _flatten: internal parameter to flatten one element tuples
:type _flatten: bool
:param _as_dict: internal parameter if return dict or tuples
:type _as_dict: bool
:param fields: field name or list of field names to extract from db
:type fields: Union[List, str, Set, Dict]
"""
if fields:
return await self.fields(columns=fields).values(
_as_dict=_as_dict, _flatten=_flatten
)
expr = self.build_select_expression()
rows = await self.database.fetch_all(expr)
if not rows:
return []
column_names = list(rows[0].keys())
column_map = self._resolve_data_prefix_to_relation_str(
column_names=column_names
)
result = [
{column_map.get(k): v for k, v in dict(x).items() if k in column_map}
for x in rows
]
if _as_dict:
return result
if _flatten and not self._excludable.include_entry_count() == 1:
raise QueryDefinitionError(
"You cannot flatten values_list if more than " "one field is selected!"
)
tuple_result = [tuple(x.values()) for x in result]
return tuple_result if not _flatten else [x[0] for x in tuple_result]
async def values_list(
self, fields: Union[List, str, Set, Dict] = None, flatten: bool = False
) -> List:
"""
Return a list of tuples with column values in order of the fields passed or
all fields from queried models.
When one field is passed you can flatten the list of tuples into list of values
of that single field.
To filter for given row use filter/exclude methods before values,
to limit number of rows use limit/offset or paginate before values.
Note that it always return a list even for one row from database.
:param fields: field name or list of field names to extract from db
:type fields: Union[str, List[str]]
:param flatten: when one field is passed you can flatten the list of tuples
:type flatten: bool
"""
return await self.values(fields=fields, _as_dict=False, _flatten=flatten)
def _resolve_data_prefix_to_relation_str(self, column_names: List[str]) -> Dict:
resolved_names = dict()
for column_name in column_names:
prefixes_map = self._create_prefixes_map()
column_parts = column_name.split("_")
potential_prefix = column_parts[0]
if potential_prefix in prefixes_map:
prefix = prefixes_map[potential_prefix]
allowed_columns = prefix.model_cls.own_table_columns(
model=prefix.model_cls,
excludable=self._excludable,
alias=prefix.table_prefix,
add_pk_columns=False,
)
new_column_name = "_".join(column_parts[1:])
if new_column_name in allowed_columns:
resolved_names[column_name] = f"{prefix.relation_str}__" + "_".join(
column_name.split("_")[1:]
)
else:
assert self.model_cls
allowed_columns = self.model_cls.own_table_columns(
model=self.model_cls,
excludable=self._excludable,
add_pk_columns=False,
)
if column_name in allowed_columns:
resolved_names[column_name] = column_name
return resolved_names
def _create_prefixes_map(self) -> Dict[str, Prefix]:
prefixes: List[Prefix] = []
for related in self._select_related:
related_split = related.split("__")
for index in range(len(related_split)):
prefix = Prefix(
self.model_cls, # type: ignore
*get_relationship_alias_model_and_str(
self.model_cls, related_split[0 : (index + 1)] # type: ignore
),
)
prefixes.append(prefix)
return {x.table_prefix: x for x in prefixes}
async def exists(self) -> bool:
"""
Returns a bool value to confirm if there are rows matching the given criteria

View File

@ -0,0 +1,173 @@
from typing import List, Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
database = database
metadata = metadata
class User(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Category(ormar.Model):
class Meta(BaseMeta):
tablename = "categories"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=40)
sort_order: int = ormar.Integer(nullable=True)
created_by: Optional[User] = ormar.ForeignKey(User)
class Post(ormar.Model):
class Meta(BaseMeta):
tablename = "posts"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
category: Optional[Category] = ormar.ForeignKey(Category)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_queryset_values():
async with database:
async with database.transaction(force_rollback=True):
creator = await User(name="Anonymous").save()
news = await Category(name="News", sort_order=0, created_by=creator).save()
await Post(name="Ormar strikes again!", category=news).save()
await Post(name="Why don't you use ormar yet?", category=news).save()
await Post(name="Check this out, ormar now for free", category=news).save()
posts = await Post.objects.values()
assert posts == [
{"id": 1, "name": "Ormar strikes again!", "category": 1},
{"id": 2, "name": "Why don't you use ormar yet?", "category": 1},
{"id": 3, "name": "Check this out, ormar now for free", "category": 1},
]
posts = await Post.objects.select_related("category__created_by").values()
assert posts == [
{
"id": 1,
"name": "Ormar strikes again!",
"category": 1,
"category__id": 1,
"category__name": "News",
"category__sort_order": 0,
"category__created_by": 1,
"category__created_by__id": 1,
"category__created_by__name": "Anonymous",
},
{
"category": 1,
"id": 2,
"name": "Why don't you use ormar yet?",
"category__id": 1,
"category__name": "News",
"category__sort_order": 0,
"category__created_by": 1,
"category__created_by__id": 1,
"category__created_by__name": "Anonymous",
},
{
"id": 3,
"name": "Check this out, ormar now for free",
"category": 1,
"category__id": 1,
"category__name": "News",
"category__sort_order": 0,
"category__created_by": 1,
"category__created_by__id": 1,
"category__created_by__name": "Anonymous",
},
]
posts = await Post.objects.select_related("category__created_by").values(
["name", "category__name", "category__created_by__name"]
)
assert posts == [
{
"name": "Ormar strikes again!",
"category__name": "News",
"category__created_by__name": "Anonymous",
},
{
"name": "Why don't you use ormar yet?",
"category__name": "News",
"category__created_by__name": "Anonymous",
},
{
"name": "Check this out, ormar now for free",
"category__name": "News",
"category__created_by__name": "Anonymous",
},
]
@pytest.mark.asyncio
async def test_queryset_values_list():
async with database:
async with database.transaction(force_rollback=True):
creator = await User(name="Anonymous").save()
news = await Category(name="News", sort_order=0, created_by=creator).save()
await Post(name="Ormar strikes again!", category=news).save()
await Post(name="Why don't you use ormar yet?", category=news).save()
await Post(name="Check this out, ormar now for free", category=news).save()
posts = await Post.objects.values_list()
assert posts == [
(1, "Ormar strikes again!", 1),
(2, "Why don't you use ormar yet?", 1),
(3, "Check this out, ormar now for free", 1),
]
posts = await Post.objects.select_related(
"category__created_by"
).values_list()
assert posts == [
(1, "Ormar strikes again!", 1, 1, "News", 0, 1, 1, "Anonymous"),
(2, "Why don't you use ormar yet?", 1, 1, "News", 0, 1, 1, "Anonymous"),
(
3,
"Check this out, ormar now for free",
1,
1,
"News",
0,
1,
1,
"Anonymous",
),
]
posts = await Post.objects.select_related(
"category__created_by"
).values_list(["name", "category__name", "category__created_by__name"])
assert posts == [
("Ormar strikes again!", "News", "Anonymous"),
("Why don't you use ormar yet?", "News", "Anonymous"),
("Check this out, ormar now for free", "News", "Anonymous"),
]