Refactor join, fix owner on added fks on through model, fix coverage, add .coveragerc settings.
This commit is contained in:
7
.coveragerc
Normal file
7
.coveragerc
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
[run]
|
||||||
|
source = ormar, tests
|
||||||
|
omit = ./tests/test.db, *py.typed*
|
||||||
|
data_file = .coverage
|
||||||
|
|
||||||
|
[report]
|
||||||
|
omit = ./tests/test.db, *py.typed*
|
||||||
@ -32,13 +32,13 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non
|
|||||||
model_field.to,
|
model_field.to,
|
||||||
real_name=parent_name,
|
real_name=parent_name,
|
||||||
ondelete="CASCADE",
|
ondelete="CASCADE",
|
||||||
owner=model_field.owner,
|
owner=model_field.through,
|
||||||
)
|
)
|
||||||
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
model_field.through.Meta.model_fields[child_name] = ForeignKey(
|
||||||
model_field.owner,
|
model_field.owner,
|
||||||
real_name=child_name,
|
real_name=child_name,
|
||||||
ondelete="CASCADE",
|
ondelete="CASCADE",
|
||||||
owner=model_field.owner,
|
owner=model_field.through,
|
||||||
)
|
)
|
||||||
|
|
||||||
create_and_append_m2m_fk(
|
create_and_append_m2m_fk(
|
||||||
|
|||||||
@ -14,6 +14,7 @@ from typing import (
|
|||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
from ormar.exceptions import RelationshipInstanceError # noqa I100
|
||||||
from ormar.fields import BaseField, ManyToManyField # noqa I100
|
from ormar.fields import BaseField, ManyToManyField # noqa I100
|
||||||
from ormar.relations import AliasManager
|
from ormar.relations import AliasManager
|
||||||
|
|
||||||
@ -32,12 +33,12 @@ class SqlJoin:
|
|||||||
order_columns: Optional[List],
|
order_columns: Optional[List],
|
||||||
sorted_orders: OrderedDict,
|
sorted_orders: OrderedDict,
|
||||||
main_model: Type["Model"],
|
main_model: Type["Model"],
|
||||||
|
relation_name: str,
|
||||||
related_models: Any = None,
|
related_models: Any = None,
|
||||||
own_alias: str = "",
|
own_alias: str = "",
|
||||||
) -> None:
|
) -> None:
|
||||||
self.own_alias = own_alias
|
self.relation_name = relation_name
|
||||||
self.related_models = related_models or []
|
self.related_models = related_models or []
|
||||||
self.used_aliases = used_aliases
|
|
||||||
self.select_from = select_from
|
self.select_from = select_from
|
||||||
self.columns = columns
|
self.columns = columns
|
||||||
self.fields = fields
|
self.fields = fields
|
||||||
@ -45,6 +46,34 @@ class SqlJoin:
|
|||||||
self.order_columns = order_columns
|
self.order_columns = order_columns
|
||||||
self.sorted_orders = sorted_orders
|
self.sorted_orders = sorted_orders
|
||||||
self.main_model = main_model
|
self.main_model = main_model
|
||||||
|
self.own_alias = own_alias
|
||||||
|
self.used_aliases = used_aliases
|
||||||
|
self.target_field = self.main_model.Meta.model_fields[self.relation_name]
|
||||||
|
|
||||||
|
self._next_model: Optional[Type["Model"]] = None
|
||||||
|
self._next_alias: Optional[str] = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def next_model(self) -> Type["Model"]:
|
||||||
|
if not self._next_model: # pragma: nocover
|
||||||
|
raise RelationshipInstanceError(
|
||||||
|
"Cannot link to related table if " "relation to model is not set."
|
||||||
|
)
|
||||||
|
return self._next_model
|
||||||
|
|
||||||
|
@next_model.setter
|
||||||
|
def next_model(self, value: Type["Model"]) -> None:
|
||||||
|
self._next_model = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def next_alias(self) -> str:
|
||||||
|
if not self._next_alias: # pragma: nocover
|
||||||
|
raise RelationshipInstanceError("Alias for given relation not found.")
|
||||||
|
return self._next_alias
|
||||||
|
|
||||||
|
@next_alias.setter
|
||||||
|
def next_alias(self, value: str) -> None:
|
||||||
|
self._next_alias = value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def alias_manager(self) -> AliasManager:
|
def alias_manager(self) -> AliasManager:
|
||||||
@ -56,18 +85,13 @@ class SqlJoin:
|
|||||||
"""
|
"""
|
||||||
return self.main_model.Meta.alias_manager
|
return self.main_model.Meta.alias_manager
|
||||||
|
|
||||||
@staticmethod
|
def on_clause(self, previous_alias: str, from_clause: str, to_clause: str,) -> text:
|
||||||
def on_clause(
|
|
||||||
previous_alias: str, alias: str, from_clause: str, to_clause: str,
|
|
||||||
) -> text:
|
|
||||||
"""
|
"""
|
||||||
Receives aliases and names of both ends of the join and combines them
|
Receives aliases and names of both ends of the join and combines them
|
||||||
into one text clause used in joins.
|
into one text clause used in joins.
|
||||||
|
|
||||||
:param previous_alias: alias of previous table
|
:param previous_alias: alias of previous table
|
||||||
:type previous_alias: str
|
:type previous_alias: str
|
||||||
:param alias: alias of current table
|
|
||||||
:type alias: str
|
|
||||||
:param from_clause: from table name
|
:param from_clause: from table name
|
||||||
:type from_clause: str
|
:type from_clause: str
|
||||||
:param to_clause: to table name
|
:param to_clause: to table name
|
||||||
@ -75,13 +99,69 @@ class SqlJoin:
|
|||||||
:return: clause combining all strings
|
:return: clause combining all strings
|
||||||
:rtype: sqlalchemy.text
|
:rtype: sqlalchemy.text
|
||||||
"""
|
"""
|
||||||
left_part = f"{alias}_{to_clause}"
|
left_part = f"{self.next_alias}_{to_clause}"
|
||||||
right_part = f"{previous_alias + '_' if previous_alias else ''}{from_clause}"
|
right_part = f"{previous_alias + '_' if previous_alias else ''}{from_clause}"
|
||||||
return text(f"{left_part}={right_part}")
|
return text(f"{left_part}={right_part}")
|
||||||
|
|
||||||
def process_deeper_join(
|
def build_join(self) -> Tuple[List, sqlalchemy.sql.select, List, OrderedDict]:
|
||||||
self, related_name: str, model_cls: Type["Model"], remainder: Any, alias: str,
|
"""
|
||||||
) -> None:
|
Main external access point for building a join.
|
||||||
|
Splits the join definition, updates fields and exclude_fields if needed,
|
||||||
|
handles switching to through models for m2m relations, returns updated lists of
|
||||||
|
used_aliases and sort_orders.
|
||||||
|
|
||||||
|
:return: list of used aliases, select from, list of aliased columns, sort orders
|
||||||
|
:rtype: Tuple[List[str], Join, List[TextClause], collections.OrderedDict]
|
||||||
|
"""
|
||||||
|
if issubclass(self.target_field, ManyToManyField):
|
||||||
|
self.process_m2m_through_table()
|
||||||
|
|
||||||
|
self.next_model = self.target_field.to
|
||||||
|
self.next_alias = self.alias_manager.resolve_relation_alias(
|
||||||
|
from_model=self.target_field.owner, relation_name=self.relation_name
|
||||||
|
)
|
||||||
|
if self.next_alias not in self.used_aliases:
|
||||||
|
self._process_join()
|
||||||
|
|
||||||
|
self._process_following_joins()
|
||||||
|
|
||||||
|
return (
|
||||||
|
self.used_aliases,
|
||||||
|
self.select_from,
|
||||||
|
self.columns,
|
||||||
|
self.sorted_orders,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_following_joins(self) -> None:
|
||||||
|
"""
|
||||||
|
Iterates through nested models to create subsequent joins.
|
||||||
|
"""
|
||||||
|
for related_name in self.related_models:
|
||||||
|
remainder = None
|
||||||
|
if (
|
||||||
|
isinstance(self.related_models, dict)
|
||||||
|
and self.related_models[related_name]
|
||||||
|
):
|
||||||
|
remainder = self.related_models[related_name]
|
||||||
|
self._process_deeper_join(related_name=related_name, remainder=remainder)
|
||||||
|
|
||||||
|
def _process_deeper_join(self, related_name: str, remainder: Any) -> None:
|
||||||
|
"""
|
||||||
|
Creates nested recurrent instance of SqlJoin for each nested join table,
|
||||||
|
updating needed return params here as a side effect.
|
||||||
|
|
||||||
|
Updated are:
|
||||||
|
|
||||||
|
* self.used_aliases,
|
||||||
|
* self.select_from,
|
||||||
|
* self.columns,
|
||||||
|
* self.sorted_orders,
|
||||||
|
|
||||||
|
:param related_name: name of the relation to follow
|
||||||
|
:type related_name: str
|
||||||
|
:param remainder: deeper tables if there are more nested joins
|
||||||
|
:type remainder: Any
|
||||||
|
"""
|
||||||
sql_join = SqlJoin(
|
sql_join = SqlJoin(
|
||||||
used_aliases=self.used_aliases,
|
used_aliases=self.used_aliases,
|
||||||
select_from=self.select_from,
|
select_from=self.select_from,
|
||||||
@ -92,94 +172,47 @@ class SqlJoin:
|
|||||||
),
|
),
|
||||||
order_columns=self.order_columns,
|
order_columns=self.order_columns,
|
||||||
sorted_orders=self.sorted_orders,
|
sorted_orders=self.sorted_orders,
|
||||||
main_model=model_cls,
|
main_model=self.next_model,
|
||||||
|
relation_name=related_name,
|
||||||
related_models=remainder,
|
related_models=remainder,
|
||||||
own_alias=alias,
|
own_alias=self.next_alias,
|
||||||
)
|
)
|
||||||
(
|
(
|
||||||
self.used_aliases,
|
self.used_aliases,
|
||||||
self.select_from,
|
self.select_from,
|
||||||
self.columns,
|
self.columns,
|
||||||
self.sorted_orders,
|
self.sorted_orders,
|
||||||
) = sql_join.build_join(related_name)
|
) = sql_join.build_join()
|
||||||
|
|
||||||
def build_join( # noqa: CCR001
|
def process_m2m_through_table(self) -> None:
|
||||||
self, related: str
|
|
||||||
) -> Tuple[List, sqlalchemy.sql.select, List, OrderedDict]:
|
|
||||||
"""
|
"""
|
||||||
Main external access point for building a join.
|
Process Through table of the ManyToMany relation so that source table is
|
||||||
Splits the join definition, updates fields and exclude_fields if needed,
|
linked to the through table (one additional join)
|
||||||
handles switching to through models for m2m relations, returns updated lists of
|
|
||||||
used_aliases and sort_orders.
|
|
||||||
|
|
||||||
:param related: string with join definition
|
Replaces needed parameters like:
|
||||||
:type related: str
|
|
||||||
:return: list of used aliases, select from, list of aliased columns, sort orders
|
* self.next_model,
|
||||||
:rtype: Tuple[List[str], Join, List[TextClause], collections.OrderedDict]
|
* self.next_alias,
|
||||||
|
* self.relation_name,
|
||||||
|
* self.own_alias,
|
||||||
|
* self.target_field
|
||||||
|
|
||||||
|
To point to through model
|
||||||
"""
|
"""
|
||||||
target_field = self.main_model.Meta.model_fields[related]
|
new_part = self.process_m2m_related_name_change()
|
||||||
prev_model = self.main_model
|
self._replace_many_to_many_order_by_columns(self.relation_name, new_part)
|
||||||
# TODO: Finish refactoring here
|
|
||||||
if issubclass(target_field, ManyToManyField):
|
|
||||||
new_part = self.process_m2m_related_name_change(
|
|
||||||
target_field=target_field, related=related
|
|
||||||
)
|
|
||||||
self._replace_many_to_many_order_by_columns(related, new_part)
|
|
||||||
|
|
||||||
model_cls = target_field.through
|
self.next_model = self.target_field.through
|
||||||
alias = self.alias_manager.resolve_relation_alias(
|
self.next_alias = self.alias_manager.resolve_relation_alias(
|
||||||
from_model=prev_model, relation_name=related
|
from_model=self.target_field.owner, relation_name=self.relation_name
|
||||||
)
|
)
|
||||||
if alias not in self.used_aliases:
|
if self.next_alias not in self.used_aliases:
|
||||||
self._process_join(
|
self._process_join()
|
||||||
model_cls=model_cls,
|
self.relation_name = new_part
|
||||||
related=related,
|
self.own_alias = self.next_alias
|
||||||
alias=alias,
|
self.target_field = self.next_model.Meta.model_fields[self.relation_name]
|
||||||
target_field=target_field,
|
|
||||||
)
|
|
||||||
related = new_part
|
|
||||||
self.own_alias = alias
|
|
||||||
prev_model = model_cls
|
|
||||||
target_field = target_field.through.Meta.model_fields[related]
|
|
||||||
|
|
||||||
model_cls = target_field.to
|
def process_m2m_related_name_change(self, reverse: bool = False) -> str:
|
||||||
alias = model_cls.Meta.alias_manager.resolve_relation_alias(
|
|
||||||
from_model=prev_model, relation_name=related
|
|
||||||
)
|
|
||||||
if alias not in self.used_aliases:
|
|
||||||
self._process_join(
|
|
||||||
model_cls=model_cls,
|
|
||||||
prev_model=prev_model,
|
|
||||||
related=related,
|
|
||||||
alias=alias,
|
|
||||||
target_field=target_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
for related_name in self.related_models:
|
|
||||||
remainder = None
|
|
||||||
if (
|
|
||||||
isinstance(self.related_models, dict)
|
|
||||||
and self.related_models[related_name]
|
|
||||||
):
|
|
||||||
remainder = self.related_models[related_name]
|
|
||||||
self.process_deeper_join(
|
|
||||||
related_name=related_name,
|
|
||||||
model_cls=model_cls,
|
|
||||||
remainder=remainder,
|
|
||||||
alias=alias,
|
|
||||||
)
|
|
||||||
|
|
||||||
return (
|
|
||||||
self.used_aliases,
|
|
||||||
self.select_from,
|
|
||||||
self.columns,
|
|
||||||
self.sorted_orders,
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def process_m2m_related_name_change(
|
|
||||||
target_field: Type[ManyToManyField], related: str, reverse: bool = False
|
|
||||||
) -> str:
|
|
||||||
"""
|
"""
|
||||||
Extracts relation name to link join through the Through model declared on
|
Extracts relation name to link join through the Through model declared on
|
||||||
relation field.
|
relation field.
|
||||||
@ -188,16 +221,13 @@ class SqlJoin:
|
|||||||
|
|
||||||
:param reverse: flag if it's on_clause lookup - use reverse fields
|
:param reverse: flag if it's on_clause lookup - use reverse fields
|
||||||
:type reverse: bool
|
:type reverse: bool
|
||||||
:param target_field: relation field
|
|
||||||
:type target_field: Type[ManyToManyField]
|
|
||||||
:param related: name of the relation
|
|
||||||
:type related: str
|
|
||||||
:return: new relation name switched to through model field
|
:return: new relation name switched to through model field
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
|
target_field = self.target_field
|
||||||
is_primary_self_ref = (
|
is_primary_self_ref = (
|
||||||
target_field.self_reference
|
target_field.self_reference
|
||||||
and related == target_field.self_reference_primary
|
and self.relation_name == target_field.self_reference_primary
|
||||||
)
|
)
|
||||||
if (is_primary_self_ref and not reverse) or (
|
if (is_primary_self_ref and not reverse) or (
|
||||||
not is_primary_self_ref and reverse
|
not is_primary_self_ref and reverse
|
||||||
@ -207,14 +237,7 @@ class SqlJoin:
|
|||||||
new_part = target_field.default_target_field_name() # type: ignore
|
new_part = target_field.default_target_field_name() # type: ignore
|
||||||
return new_part
|
return new_part
|
||||||
|
|
||||||
def _process_join( # noqa: CFQ002
|
def _process_join(self,) -> None: # noqa: CFQ002
|
||||||
self,
|
|
||||||
model_cls: Type["Model"],
|
|
||||||
related: str,
|
|
||||||
alias: str,
|
|
||||||
target_field: Type[BaseField],
|
|
||||||
prev_model: Type["Model"] = None,
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Resolves to and from column names and table names.
|
Resolves to and from column names and table names.
|
||||||
|
|
||||||
@ -228,51 +251,38 @@ class SqlJoin:
|
|||||||
|
|
||||||
Process order_by causes for non m2m relations.
|
Process order_by causes for non m2m relations.
|
||||||
|
|
||||||
:param model_cls:
|
|
||||||
:type model_cls: ormar.models.metaclass.ModelMetaclass
|
|
||||||
:param related: name of the field used in join
|
|
||||||
:type related: str
|
|
||||||
:param alias: alias of the current join
|
|
||||||
:type alias: str
|
|
||||||
"""
|
"""
|
||||||
to_table = model_cls.Meta.table.name
|
to_table = self.next_model.Meta.table.name
|
||||||
to_key, from_key = self.get_to_and_from_keys(related, target_field)
|
to_key, from_key = self.get_to_and_from_keys()
|
||||||
|
|
||||||
prev_model = prev_model or self.main_model
|
|
||||||
|
|
||||||
on_clause = self.on_clause(
|
on_clause = self.on_clause(
|
||||||
previous_alias=self.own_alias,
|
previous_alias=self.own_alias,
|
||||||
alias=alias,
|
from_clause=f"{self.target_field.owner.Meta.tablename}.{from_key}",
|
||||||
from_clause=f"{prev_model.Meta.tablename}.{from_key}",
|
|
||||||
to_clause=f"{to_table}.{to_key}",
|
to_clause=f"{to_table}.{to_key}",
|
||||||
)
|
)
|
||||||
target_table = self.alias_manager.prefixed_table_name(alias, to_table)
|
target_table = self.alias_manager.prefixed_table_name(self.next_alias, to_table)
|
||||||
self.select_from = sqlalchemy.sql.outerjoin(
|
self.select_from = sqlalchemy.sql.outerjoin(
|
||||||
self.select_from, target_table, on_clause
|
self.select_from, target_table, on_clause
|
||||||
)
|
)
|
||||||
|
|
||||||
pkname_alias = model_cls.get_column_alias(model_cls.Meta.pkname)
|
pkname_alias = self.next_model.get_column_alias(self.next_model.Meta.pkname)
|
||||||
if not issubclass(target_field, ManyToManyField):
|
if not issubclass(self.target_field, ManyToManyField):
|
||||||
self.get_order_bys(
|
self.get_order_bys(
|
||||||
alias=alias,
|
to_table=to_table, pkname_alias=pkname_alias,
|
||||||
to_table=to_table,
|
|
||||||
pkname_alias=pkname_alias,
|
|
||||||
part=related,
|
|
||||||
model_cls=model_cls,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self_related_fields = model_cls.own_table_columns(
|
self_related_fields = self.next_model.own_table_columns(
|
||||||
model=model_cls,
|
model=self.next_model,
|
||||||
fields=self.fields,
|
fields=self.fields,
|
||||||
exclude_fields=self.exclude_fields,
|
exclude_fields=self.exclude_fields,
|
||||||
use_alias=True,
|
use_alias=True,
|
||||||
)
|
)
|
||||||
self.columns.extend(
|
self.columns.extend(
|
||||||
self.alias_manager.prefixed_columns(
|
self.alias_manager.prefixed_columns(
|
||||||
alias, model_cls.Meta.table, self_related_fields
|
self.next_alias, self.next_model.Meta.table, self_related_fields
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.used_aliases.append(alias)
|
self.used_aliases.append(self.next_alias)
|
||||||
|
|
||||||
def _replace_many_to_many_order_by_columns(self, part: str, new_part: str) -> None:
|
def _replace_many_to_many_order_by_columns(self, part: str, new_part: str) -> None:
|
||||||
"""
|
"""
|
||||||
@ -310,63 +320,42 @@ class SqlJoin:
|
|||||||
condition[-2] == part or condition[-2][1:] == part
|
condition[-2] == part or condition[-2][1:] == part
|
||||||
)
|
)
|
||||||
|
|
||||||
def set_aliased_order_by(
|
def set_aliased_order_by(self, condition: List[str], to_table: str,) -> None:
|
||||||
self, condition: List[str], alias: str, to_table: str, model_cls: Type["Model"],
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Substitute hyphens ('-') with descending order.
|
Substitute hyphens ('-') with descending order.
|
||||||
Construct actual sqlalchemy text clause using aliased table and column name.
|
Construct actual sqlalchemy text clause using aliased table and column name.
|
||||||
|
|
||||||
:param condition: list of parts of a current condition split by '__'
|
:param condition: list of parts of a current condition split by '__'
|
||||||
:type condition: List[str]
|
:type condition: List[str]
|
||||||
:param alias: alias of the table in current join
|
|
||||||
:type alias: str
|
|
||||||
:param to_table: target table
|
:param to_table: target table
|
||||||
:type to_table: sqlalchemy.sql.elements.quoted_name
|
:type to_table: sqlalchemy.sql.elements.quoted_name
|
||||||
:param model_cls: ormar model class
|
|
||||||
:type model_cls: ormar.models.metaclass.ModelMetaclass
|
|
||||||
"""
|
"""
|
||||||
direction = f"{'desc' if condition[0][0] == '-' else ''}"
|
direction = f"{'desc' if condition[0][0] == '-' else ''}"
|
||||||
column_alias = model_cls.get_column_alias(condition[-1])
|
column_alias = self.next_model.get_column_alias(condition[-1])
|
||||||
order = text(f"{alias}_{to_table}.{column_alias} {direction}")
|
order = text(f"{self.next_alias}_{to_table}.{column_alias} {direction}")
|
||||||
self.sorted_orders["__".join(condition)] = order
|
self.sorted_orders["__".join(condition)] = order
|
||||||
|
|
||||||
def get_order_bys( # noqa: CCR001
|
def get_order_bys(self, to_table: str, pkname_alias: str,) -> None: # noqa: CCR001
|
||||||
self,
|
|
||||||
alias: str,
|
|
||||||
to_table: str,
|
|
||||||
pkname_alias: str,
|
|
||||||
part: str,
|
|
||||||
model_cls: Type["Model"],
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Triggers construction of order bys if they are given.
|
Triggers construction of order bys if they are given.
|
||||||
Otherwise by default each table is sorted by a primary key column asc.
|
Otherwise by default each table is sorted by a primary key column asc.
|
||||||
|
|
||||||
:param alias: alias of current table in join
|
|
||||||
:type alias: str
|
|
||||||
:param to_table: target table
|
:param to_table: target table
|
||||||
:type to_table: sqlalchemy.sql.elements.quoted_name
|
:type to_table: sqlalchemy.sql.elements.quoted_name
|
||||||
:param pkname_alias: alias of the primary key column
|
:param pkname_alias: alias of the primary key column
|
||||||
:type pkname_alias: str
|
:type pkname_alias: str
|
||||||
:param part: name of the current relation join
|
|
||||||
:type part: str
|
|
||||||
:param model_cls: ormar model class
|
|
||||||
:type model_cls: Type[Model]
|
|
||||||
"""
|
"""
|
||||||
|
alias = self.next_alias
|
||||||
if self.order_columns:
|
if self.order_columns:
|
||||||
current_table_sorted = False
|
current_table_sorted = False
|
||||||
split_order_columns = [
|
split_order_columns = [
|
||||||
x.split("__") for x in self.order_columns if "__" in x
|
x.split("__") for x in self.order_columns if "__" in x
|
||||||
]
|
]
|
||||||
for condition in split_order_columns:
|
for condition in split_order_columns:
|
||||||
if self._check_if_condition_apply(condition, part):
|
if self._check_if_condition_apply(condition, self.relation_name):
|
||||||
current_table_sorted = True
|
current_table_sorted = True
|
||||||
self.set_aliased_order_by(
|
self.set_aliased_order_by(
|
||||||
condition=condition,
|
condition=condition, to_table=to_table,
|
||||||
alias=alias,
|
|
||||||
to_table=to_table,
|
|
||||||
model_cls=model_cls,
|
|
||||||
)
|
)
|
||||||
if not current_table_sorted:
|
if not current_table_sorted:
|
||||||
order = text(f"{alias}_{to_table}.{pkname_alias}")
|
order = text(f"{alias}_{to_table}.{pkname_alias}")
|
||||||
@ -376,34 +365,28 @@ class SqlJoin:
|
|||||||
order = text(f"{alias}_{to_table}.{pkname_alias}")
|
order = text(f"{alias}_{to_table}.{pkname_alias}")
|
||||||
self.sorted_orders[f"{alias}.{pkname_alias}"] = order
|
self.sorted_orders[f"{alias}.{pkname_alias}"] = order
|
||||||
|
|
||||||
def get_to_and_from_keys(
|
def get_to_and_from_keys(self) -> Tuple[str, str]:
|
||||||
self, related: str, target_field: Type[BaseField]
|
|
||||||
) -> Tuple[str, str]:
|
|
||||||
"""
|
"""
|
||||||
Based on the relation type, name of the relation and previous models and parts
|
Based on the relation type, name of the relation and previous models and parts
|
||||||
stored in JoinParameters it resolves the current to and from keys, which are
|
stored in JoinParameters it resolves the current to and from keys, which are
|
||||||
different for ManyToMany relation, ForeignKey and reverse related of relations.
|
different for ManyToMany relation, ForeignKey and reverse related of relations.
|
||||||
|
|
||||||
:param target_field: relation field
|
|
||||||
:type target_field: Type[ForeignKeyField]
|
|
||||||
:param related: name of the current relation join
|
|
||||||
:type related: str
|
|
||||||
:return: to key and from key
|
:return: to key and from key
|
||||||
:rtype: Tuple[str, str]
|
:rtype: Tuple[str, str]
|
||||||
"""
|
"""
|
||||||
if issubclass(target_field, ManyToManyField):
|
if issubclass(self.target_field, ManyToManyField):
|
||||||
to_key = self.process_m2m_related_name_change(
|
to_key = self.process_m2m_related_name_change(reverse=True)
|
||||||
target_field=target_field, related=related, reverse=True
|
|
||||||
)
|
|
||||||
from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)
|
from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)
|
||||||
|
|
||||||
elif target_field.virtual:
|
elif self.target_field.virtual:
|
||||||
to_field = target_field.get_related_name()
|
to_field = self.target_field.get_related_name()
|
||||||
to_key = target_field.to.get_column_alias(to_field)
|
to_key = self.target_field.to.get_column_alias(to_field)
|
||||||
from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)
|
from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
to_key = target_field.to.get_column_alias(target_field.to.Meta.pkname)
|
to_key = self.target_field.to.get_column_alias(
|
||||||
from_key = self.main_model.get_column_alias(related)
|
self.target_field.to.Meta.pkname
|
||||||
|
)
|
||||||
|
from_key = self.main_model.get_column_alias(self.relation_name)
|
||||||
|
|
||||||
return to_key, from_key
|
return to_key, from_key
|
||||||
|
|||||||
@ -141,8 +141,6 @@ class Query:
|
|||||||
else:
|
else:
|
||||||
self.select_from = self.table
|
self.select_from = self.table
|
||||||
|
|
||||||
# TODO: Refactor to convert to nested dict like in from_row in model
|
|
||||||
self._select_related.sort(key=lambda item: (item, -len(item)))
|
|
||||||
related_models = group_related_list(self._select_related)
|
related_models = group_related_list(self._select_related)
|
||||||
|
|
||||||
for related in related_models:
|
for related in related_models:
|
||||||
@ -160,6 +158,7 @@ class Query:
|
|||||||
order_columns=self.order_columns,
|
order_columns=self.order_columns,
|
||||||
sorted_orders=self.sorted_orders,
|
sorted_orders=self.sorted_orders,
|
||||||
main_model=self.model_cls,
|
main_model=self.model_cls,
|
||||||
|
relation_name=related,
|
||||||
related_models=remainder,
|
related_models=remainder,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -168,7 +167,7 @@ class Query:
|
|||||||
self.select_from,
|
self.select_from,
|
||||||
self.columns,
|
self.columns,
|
||||||
self.sorted_orders,
|
self.sorted_orders,
|
||||||
) = sql_join.build_join(related)
|
) = sql_join.build_join()
|
||||||
|
|
||||||
expr = sqlalchemy.sql.select(self.columns)
|
expr = sqlalchemy.sql.select(self.columns)
|
||||||
expr = expr.select_from(self.select_from)
|
expr = expr.select_from(self.select_from)
|
||||||
|
|||||||
@ -105,13 +105,13 @@ class AliasManager:
|
|||||||
"""
|
"""
|
||||||
parent_key = f"{source_model.get_name()}_{relation_name}"
|
parent_key = f"{source_model.get_name()}_{relation_name}"
|
||||||
if parent_key not in self._aliases_new:
|
if parent_key not in self._aliases_new:
|
||||||
self._aliases_new[parent_key] = get_table_alias()
|
self.add_alias(parent_key)
|
||||||
|
|
||||||
to_field = source_model.Meta.model_fields[relation_name]
|
to_field = source_model.Meta.model_fields[relation_name]
|
||||||
child_model = to_field.to
|
child_model = to_field.to
|
||||||
child_key = f"{child_model.get_name()}_{reverse_name}"
|
child_key = f"{child_model.get_name()}_{reverse_name}"
|
||||||
if child_key not in self._aliases_new:
|
if child_key not in self._aliases_new:
|
||||||
self._aliases_new[child_key] = get_table_alias()
|
self.add_alias(child_key)
|
||||||
|
|
||||||
def add_alias(self, alias_key: str) -> str:
|
def add_alias(self, alias_key: str) -> str:
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user