rc for skip of literal binds
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
import datetime
|
||||
from typing import Any, Dict, TYPE_CHECKING, Type
|
||||
from typing import Any, TYPE_CHECKING, Type
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
@ -153,9 +153,8 @@ class FilterAction(QueryAction):
|
||||
else:
|
||||
aliased_column = self.column
|
||||
clause = getattr(aliased_column, op_attr)(filter_value)
|
||||
clause = self._compile_clause(
|
||||
clause, modifiers={"escape": "\\" if self.has_escaped_character else None}
|
||||
)
|
||||
if self.has_escaped_character:
|
||||
clause.modifiers["escape"] = "\\"
|
||||
return clause
|
||||
|
||||
def _convert_dates_if_required(self) -> None:
|
||||
@ -174,42 +173,3 @@ class FilterAction(QueryAction):
|
||||
else x
|
||||
for x in self.filter_value
|
||||
]
|
||||
|
||||
def _compile_clause(
|
||||
self, clause: sqlalchemy.sql.expression.BinaryExpression, modifiers: Dict
|
||||
) -> sqlalchemy.sql.expression.TextClause:
|
||||
"""
|
||||
Compiles the clause to str using appropriate database dialect, replace columns
|
||||
names with aliased names and converts it back to TextClause.
|
||||
|
||||
:param clause: original not compiled clause
|
||||
:type clause: sqlalchemy.sql.elements.BinaryExpression
|
||||
:param modifiers: sqlalchemy modifiers - used only to escape chars here
|
||||
:type modifiers: Dict[str, NoneType]
|
||||
:return: compiled and escaped clause
|
||||
:rtype: sqlalchemy.sql.elements.TextClause
|
||||
"""
|
||||
for modifier, modifier_value in modifiers.items():
|
||||
clause.modifiers[modifier] = modifier_value
|
||||
|
||||
# compiled_clause = clause.compile(
|
||||
# dialect=self.target_model.Meta.database._backend._dialect,
|
||||
# # compile_kwargs={"literal_binds": True},
|
||||
# )
|
||||
#
|
||||
# compiled_clause2 = clause.compile(
|
||||
# dialect=self.target_model.Meta.database._backend._dialect,
|
||||
# compile_kwargs={"literal_binds": True},
|
||||
# )
|
||||
#
|
||||
# alias = f"{self.table_prefix}_" if self.table_prefix else ""
|
||||
# aliased_name = f"{alias}{self.table.name}.{self.column.name}"
|
||||
# clause_text = self.compile_query(compiled_query=compiled_clause)
|
||||
# clause_text = clause_text.replace(
|
||||
# f"{self.table.name}.{self.column.name}", aliased_name
|
||||
# )
|
||||
# # dialect_name = self.target_model.Meta.database._backend._dialect.name
|
||||
# # if dialect_name != "sqlite": # pragma: no cover
|
||||
# # clause_text = clause_text.replace("%%", "%")
|
||||
# clause = text(clause_text)
|
||||
return clause
|
||||
|
||||
@ -121,21 +121,10 @@ class FilterGroup:
|
||||
:return: complied and escaped clause
|
||||
:rtype: sqlalchemy.sql.elements.TextClause
|
||||
"""
|
||||
# prefix = " NOT " if self.exclude else ""
|
||||
if self.filter_type == FilterType.AND:
|
||||
# clause = sqlalchemy.text(
|
||||
# f"{prefix}( "
|
||||
# + str(sqlalchemy.sql.and_(*self._get_text_clauses()))
|
||||
# + " )"
|
||||
# )
|
||||
clause = sqlalchemy.sql.and_(*self._get_text_clauses())
|
||||
clause = sqlalchemy.sql.and_(*self._get_text_clauses()).self_group()
|
||||
else:
|
||||
# clause = sqlalchemy.text(
|
||||
# f"{prefix}( "
|
||||
# + str(sqlalchemy.sql.or_(*self._get_text_clauses()))
|
||||
# + " )"
|
||||
# )
|
||||
clause = sqlalchemy.sql.or_(*self._get_text_clauses())
|
||||
clause = sqlalchemy.sql.or_(*self._get_text_clauses()).self_group()
|
||||
if self.exclude:
|
||||
clause = sqlalchemy.sql.not_(clause)
|
||||
return clause
|
||||
|
||||
@ -78,7 +78,7 @@ class AliasManager:
|
||||
:rtype: List[text]
|
||||
"""
|
||||
alias = f"{alias}_" if alias else ""
|
||||
aliased_fields = [f"{alias}{x}" for x in fields]
|
||||
aliased_fields = [f"{alias}{x}" for x in fields] if fields else []
|
||||
# TODO: check if normal fields still needed or only aliased one
|
||||
all_columns = (
|
||||
table.columns
|
||||
|
||||
Reference in New Issue
Block a user