finish implementing values, missing docstrings and docs

This commit is contained in:
collerek
2021-06-07 17:42:30 +02:00
parent 13a8655126
commit d441c36d01
8 changed files with 248 additions and 79 deletions

View File

@ -92,8 +92,6 @@ class ExcludableItems:
Returns count of include items inside
"""
count = 0
if not self.items:
return count
for key in self.items.keys():
count += len(self.items[key].include)
return count

View File

@ -559,6 +559,7 @@ class QuerySet(Generic[T]):
async def values(
self,
fields: Union[List, str, Set, Dict] = None,
exclude_through: bool = False,
_as_dict: bool = True,
_flatten: bool = False,
) -> List:
@ -571,6 +572,8 @@ class QuerySet(Generic[T]):
Note that it always return a list even for one row from database.
:param exclude_through: flag if through models should be excluded
:type exclude_through: bool
:param _flatten: internal parameter to flatten one element tuples
:type _flatten: bool
:param _as_dict: internal parameter if return dict or tuples
@ -580,7 +583,7 @@ class QuerySet(Generic[T]):
"""
if fields:
return await self.fields(columns=fields).values(
_as_dict=_as_dict, _flatten=_flatten
_as_dict=_as_dict, _flatten=_flatten, exclude_through=exclude_through
)
expr = self.build_select_expression()
rows = await self.database.fetch_all(expr)
@ -589,7 +592,8 @@ class QuerySet(Generic[T]):
alias_resolver = ReverseAliasResolver(
select_related=self._select_related,
excludable=self._excludable,
model_cls=self.model_cls,
model_cls=self.model_cls, # type: ignore
exclude_through=exclude_through,
)
column_map = alias_resolver.resolve_columns(columns_names=list(rows[0].keys()))
result = [
@ -606,7 +610,10 @@ class QuerySet(Generic[T]):
return tuple_result if not _flatten else [x[0] for x in tuple_result]
async def values_list(
self, fields: Union[List, str, Set, Dict] = None, flatten: bool = False
self,
fields: Union[List, str, Set, Dict] = None,
flatten: bool = False,
exclude_through: bool = False,
) -> List:
"""
Return a list of tuples with column values in order of the fields passed or
@ -620,12 +627,19 @@ class QuerySet(Generic[T]):
Note that it always return a list even for one row from database.
:param exclude_through: flag if through models should be excluded
:type exclude_through: bool
:param fields: field name or list of field names to extract from db
:type fields: Union[str, List[str]]
:param flatten: when one field is passed you can flatten the list of tuples
:type flatten: bool
"""
return await self.values(fields=fields, _as_dict=False, _flatten=flatten)
return await self.values(
fields=fields,
exclude_through=exclude_through,
_as_dict=False,
_flatten=flatten,
)
async def exists(self) -> bool:
"""

View File

@ -1,8 +1,8 @@
from typing import Dict, List, TYPE_CHECKING, Tuple, Type
from typing import Dict, List, TYPE_CHECKING, Type, cast
if TYPE_CHECKING:
from ormar import Model
from ormar.models.excludable import ExcludableItems
if TYPE_CHECKING: # pragma: no cover
from ormar import ForeignKeyField, Model
from ormar.models.excludable import Excludable, ExcludableItems
class ReverseAliasResolver:
@ -11,33 +11,28 @@ class ReverseAliasResolver:
model_cls: Type["Model"],
excludable: "ExcludableItems",
select_related: List[str],
exclude_through: bool = False,
) -> None:
self.select_related = select_related
self.model_cls = model_cls
self.reversed_aliases = self.model_cls.Meta.alias_manager.reversed_aliases
self.excludable = excludable
self.exclude_through = exclude_through
self._fields: Dict[str, "ForeignKeyField"] = dict()
self._prefixes: Dict[str, str] = dict()
self._previous_prefixes: List[str] = [""]
self._resolved_names: Dict[str, str] = dict()
def resolve_columns(self, columns_names: List[str]) -> Dict:
resolved_names = dict()
prefixes, target_models = self._create_prefixes_map()
self._create_prefixes_map()
for column_name in columns_names:
column_parts = column_name.split("_")
potential_prefix = column_parts[0]
if potential_prefix in self.reversed_aliases:
relation = self.reversed_aliases[potential_prefix]
relation_str = prefixes[relation]
target_model = target_models[relation]
allowed_columns = target_model.own_table_columns(
model=target_model,
excludable=self.excludable,
alias=potential_prefix,
add_pk_columns=False,
self._resolve_column_with_prefix(
column_name=column_name, prefix=potential_prefix
)
new_column_name = column_name.replace(f"{potential_prefix}_", "")
if new_column_name in allowed_columns:
resolved_names[column_name] = column_name.replace(
f"{potential_prefix}_", f"{relation_str}__"
)
else:
allowed_columns = self.model_cls.own_table_columns(
model=self.model_cls,
@ -45,35 +40,96 @@ class ReverseAliasResolver:
add_pk_columns=False,
)
if column_name in allowed_columns:
resolved_names[column_name] = column_name
self._resolved_names[column_name] = column_name
return resolved_names
return self._resolved_names
def _create_prefixes_map(self) -> Tuple[Dict, Dict]:
prefixes: Dict = dict()
target_models: Dict = dict()
def _resolve_column_with_prefix(self, column_name: str, prefix: str) -> None:
relation = self.reversed_aliases.get(prefix, None)
relation_str = self._prefixes.get(relation, None)
field = self._fields.get(relation, None)
if relation_str is None or field is None:
return
is_through = field.is_multi and field.through.get_name() in relation_str
if self._check_if_field_is_excluded(
prefix=prefix, field=field, is_through=is_through
):
return
target_model = field.through if is_through else field.to
allowed_columns = target_model.own_table_columns(
model=target_model,
excludable=self.excludable,
alias=prefix,
add_pk_columns=False,
)
new_column_name = column_name.replace(f"{prefix}_", "")
if new_column_name in allowed_columns:
self._resolved_names[column_name] = column_name.replace(
f"{prefix}_", f"{relation_str}__"
)
def _check_if_field_is_excluded(
self, prefix: str, field: "ForeignKeyField", is_through: bool
) -> bool:
shift, field_name = 1, field.name
if is_through:
field_name = field.through.get_name()
elif field.is_multi:
shift = 2
previous_excludable = self._get_previous_excludable(
prefix=prefix, field=field, shift=shift
)
return previous_excludable.is_excluded(field_name)
def _get_previous_excludable(
self, prefix: str, field: "ForeignKeyField", shift: int = 1
) -> "Excludable":
if prefix not in self._previous_prefixes:
self._previous_prefixes.append(prefix)
previous_prefix_ind = self._previous_prefixes.index(prefix)
previous_prefix = (
self._previous_prefixes[previous_prefix_ind - shift]
if previous_prefix_ind > (shift - 1)
else ""
)
return self.excludable.get(field.owner, alias=previous_prefix)
def _create_prefixes_map(self) -> None:
for related in self.select_related:
model_cls = self.model_cls
related_split = related.split("__")
related_str = ""
for related in related_split:
prefix_name = f"{model_cls.get_name()}_{related}"
new_related_str = (f"{related_str}__" if related_str else "") + related
prefixes[prefix_name] = new_related_str
field = model_cls.Meta.model_fields[related]
target_models[prefix_name] = field.to
if field.is_multi:
target_models[prefix_name] = field.through
new_through_str = (
f"{related_str}__" if related_str else ""
) + field.through.get_name()
prefixes[prefix_name] = new_through_str
prefix_name = (
f"{field.through.get_name()}_"
f"{field.default_target_field_name()}"
)
prefixes[prefix_name] = new_related_str
target_models[prefix_name] = field.to
for relation in related_split:
previous_related_str = f"{related_str}__" if related_str else ""
new_related_str = previous_related_str + relation
field = model_cls.Meta.model_fields[relation]
field = cast("ForeignKeyField", field)
prefix_name = self._handle_through_fields_and_prefix(
model_cls=model_cls,
field=field,
previous_related_str=previous_related_str,
relation=relation,
)
self._prefixes[prefix_name] = new_related_str
self._fields[prefix_name] = field
model_cls = field.to
related_str = new_related_str
return prefixes, target_models
def _handle_through_fields_and_prefix(
self,
model_cls: Type["Model"],
field: "ForeignKeyField",
previous_related_str: str,
relation: str,
) -> str:
prefix_name = f"{model_cls.get_name()}_{relation}"
if field.is_multi:
through_name = field.through.get_name()
if not self.exclude_through:
self._fields[prefix_name] = field
new_through_str = previous_related_str + through_name
self._prefixes[prefix_name] = new_through_str
prefix_name = f"{through_name}_{field.default_target_field_name()}"
return prefix_name