wip - refactor of alias resolver - through models columns with fields are not properly handled yet
This commit is contained in:
@ -26,6 +26,7 @@ from ormar.queryset.actions.order_action import OrderAction
|
||||
from ormar.queryset.clause import FilterGroup, Prefix, QueryClause
|
||||
from ormar.queryset.prefetch_query import PrefetchQuery
|
||||
from ormar.queryset.query import Query
|
||||
from ormar.queryset.reverse_alias_resolver import ReverseAliasResolver
|
||||
from ormar.queryset.utils import get_relationship_alias_model_and_str
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
@ -586,10 +587,12 @@ class QuerySet(Generic[T]):
|
||||
rows = await self.database.fetch_all(expr)
|
||||
if not rows:
|
||||
return []
|
||||
column_names = list(rows[0].keys())
|
||||
column_map = self._resolve_data_prefix_to_relation_str(
|
||||
column_names=column_names
|
||||
alias_resolver = ReverseAliasResolver(
|
||||
select_related=self._select_related,
|
||||
excludable=self._excludable,
|
||||
model_cls=self.model_cls,
|
||||
)
|
||||
column_map = alias_resolver.resolve_columns(columns_names=list(rows[0].keys()))
|
||||
result = [
|
||||
{column_map.get(k): v for k, v in dict(x).items() if k in column_map}
|
||||
for x in rows
|
||||
@ -598,7 +601,7 @@ class QuerySet(Generic[T]):
|
||||
return result
|
||||
if _flatten and not self._excludable.include_entry_count() == 1:
|
||||
raise QueryDefinitionError(
|
||||
"You cannot flatten values_list if more than " "one field is selected!"
|
||||
"You cannot flatten values_list if more than one field is selected!"
|
||||
)
|
||||
tuple_result = [tuple(x.values()) for x in result]
|
||||
return tuple_result if not _flatten else [x[0] for x in tuple_result]
|
||||
@ -625,50 +628,6 @@ class QuerySet(Generic[T]):
|
||||
"""
|
||||
return await self.values(fields=fields, _as_dict=False, _flatten=flatten)
|
||||
|
||||
def _resolve_data_prefix_to_relation_str(self, column_names: List[str]) -> Dict:
|
||||
resolved_names = dict()
|
||||
for column_name in column_names:
|
||||
prefixes_map = self._create_prefixes_map()
|
||||
column_parts = column_name.split("_")
|
||||
potential_prefix = column_parts[0]
|
||||
if potential_prefix in prefixes_map:
|
||||
prefix = prefixes_map[potential_prefix]
|
||||
allowed_columns = prefix.model_cls.own_table_columns(
|
||||
model=prefix.model_cls,
|
||||
excludable=self._excludable,
|
||||
alias=prefix.table_prefix,
|
||||
add_pk_columns=False,
|
||||
)
|
||||
new_column_name = "_".join(column_parts[1:])
|
||||
if new_column_name in allowed_columns:
|
||||
resolved_names[column_name] = f"{prefix.relation_str}__" + "_".join(
|
||||
column_name.split("_")[1:]
|
||||
)
|
||||
else:
|
||||
assert self.model_cls
|
||||
allowed_columns = self.model_cls.own_table_columns(
|
||||
model=self.model_cls,
|
||||
excludable=self._excludable,
|
||||
add_pk_columns=False,
|
||||
)
|
||||
if column_name in allowed_columns:
|
||||
resolved_names[column_name] = column_name
|
||||
return resolved_names
|
||||
|
||||
def _create_prefixes_map(self) -> Dict[str, Prefix]:
|
||||
prefixes: List[Prefix] = []
|
||||
for related in self._select_related:
|
||||
related_split = related.split("__")
|
||||
for index in range(len(related_split)):
|
||||
prefix = Prefix(
|
||||
self.model_cls, # type: ignore
|
||||
*get_relationship_alias_model_and_str(
|
||||
self.model_cls, related_split[0 : (index + 1)] # type: ignore
|
||||
),
|
||||
)
|
||||
prefixes.append(prefix)
|
||||
return {x.table_prefix: x for x in prefixes}
|
||||
|
||||
async def exists(self) -> bool:
|
||||
"""
|
||||
Returns a bool value to confirm if there are rows matching the given criteria
|
||||
|
||||
82
ormar/queryset/reverse_alias_resolver.py
Normal file
82
ormar/queryset/reverse_alias_resolver.py
Normal file
@ -0,0 +1,82 @@
|
||||
from typing import Dict, List, TYPE_CHECKING, Tuple, Type
|
||||
|
||||
from ormar.queryset.clause import Prefix
|
||||
from ormar.queryset.utils import get_relationship_alias_model_and_str
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ormar import Model
|
||||
from ormar.models.excludable import ExcludableItems
|
||||
|
||||
|
||||
class ReverseAliasResolver:
|
||||
def __init__(
|
||||
self,
|
||||
model_cls: Type["Model"],
|
||||
excludable: "ExcludableItems",
|
||||
select_related: List[str],
|
||||
) -> None:
|
||||
self.select_related = select_related
|
||||
self.model_cls = model_cls
|
||||
self.reversed_aliases = self.model_cls.Meta.alias_manager.reversed_aliases
|
||||
self.excludable = excludable
|
||||
|
||||
def resolve_columns(self, columns_names: List[str]) -> Dict:
|
||||
resolved_names = dict()
|
||||
prefixes, target_models = self._create_prefixes_map()
|
||||
for column_name in columns_names:
|
||||
column_parts = column_name.split("_")
|
||||
potential_prefix = column_parts[0]
|
||||
if potential_prefix in self.reversed_aliases:
|
||||
relation = self.reversed_aliases[potential_prefix]
|
||||
relation_str = prefixes[relation]
|
||||
target_model = target_models[relation]
|
||||
allowed_columns = target_model.own_table_columns(
|
||||
model=target_model,
|
||||
excludable=self.excludable,
|
||||
alias=potential_prefix,
|
||||
add_pk_columns=False,
|
||||
)
|
||||
new_column_name = column_name.replace(f"{potential_prefix}_", "")
|
||||
if new_column_name in allowed_columns:
|
||||
resolved_names[column_name] = column_name.replace(
|
||||
f"{potential_prefix}_", f"{relation_str}__"
|
||||
)
|
||||
else:
|
||||
allowed_columns = self.model_cls.own_table_columns(
|
||||
model=self.model_cls,
|
||||
excludable=self.excludable,
|
||||
add_pk_columns=False,
|
||||
)
|
||||
if column_name in allowed_columns:
|
||||
resolved_names[column_name] = column_name
|
||||
|
||||
return resolved_names
|
||||
|
||||
def _create_prefixes_map(self) -> Tuple[Dict, Dict]:
|
||||
prefixes: Dict = dict()
|
||||
target_models: Dict = dict()
|
||||
for related in self.select_related:
|
||||
model_cls = self.model_cls
|
||||
related_split = related.split("__")
|
||||
related_str = ""
|
||||
for related in related_split:
|
||||
prefix_name = f"{model_cls.get_name()}_{related}"
|
||||
new_related_str = (f"{related_str}__" if related_str else "") + related
|
||||
prefixes[prefix_name] = new_related_str
|
||||
field = model_cls.Meta.model_fields[related]
|
||||
target_models[prefix_name] = field.to
|
||||
if field.is_multi:
|
||||
target_models[prefix_name] = field.through
|
||||
new_through_str = (
|
||||
f"{related_str}__" if related_str else ""
|
||||
) + field.through.get_name()
|
||||
prefixes[prefix_name] = new_through_str
|
||||
prefix_name = (
|
||||
f"{field.through.get_name()}_"
|
||||
f"{field.default_target_field_name()}"
|
||||
)
|
||||
prefixes[prefix_name] = new_related_str
|
||||
target_models[prefix_name] = field.to
|
||||
model_cls = field.to
|
||||
related_str = new_related_str
|
||||
return prefixes, target_models
|
||||
@ -34,6 +34,7 @@ class AliasManager:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._aliases_new: Dict[str, str] = dict()
|
||||
self._reversed_aliases: Dict[str, str] = dict()
|
||||
|
||||
def __contains__(self, item: str) -> bool:
|
||||
return self._aliases_new.__contains__(item)
|
||||
@ -41,6 +42,14 @@ class AliasManager:
|
||||
def __getitem__(self, key: str) -> Any:
|
||||
return self._aliases_new.__getitem__(key)
|
||||
|
||||
@property
|
||||
def reversed_aliases(self):
|
||||
if self._reversed_aliases:
|
||||
return self._reversed_aliases
|
||||
reversed_aliases = {v: k for k, v in self._aliases_new.items()}
|
||||
self._reversed_aliases = reversed_aliases
|
||||
return self._reversed_aliases
|
||||
|
||||
@staticmethod
|
||||
def prefixed_columns(
|
||||
alias: str, table: sqlalchemy.Table, fields: List = None
|
||||
|
||||
Reference in New Issue
Block a user