refactor translating of aliases from queryset to modelproxy
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@ -2,6 +2,7 @@ p38venv
|
|||||||
.idea
|
.idea
|
||||||
.pytest_cache
|
.pytest_cache
|
||||||
.mypy_cache
|
.mypy_cache
|
||||||
|
.coverage
|
||||||
*.pyc
|
*.pyc
|
||||||
*.log
|
*.log
|
||||||
test.db
|
test.db
|
||||||
|
|||||||
@ -145,7 +145,7 @@ class Model(NewBaseModel):
|
|||||||
|
|
||||||
self_fields = self._extract_model_db_fields()
|
self_fields = self._extract_model_db_fields()
|
||||||
self_fields.pop(self.get_column_name_from_alias(self.Meta.pkname))
|
self_fields.pop(self.get_column_name_from_alias(self.Meta.pkname))
|
||||||
self_fields = self.objects._translate_columns_to_aliases(self_fields)
|
self_fields = self.translate_columns_to_aliases(self_fields)
|
||||||
expr = self.Meta.table.update().values(**self_fields)
|
expr = self.Meta.table.update().values(**self_fields)
|
||||||
expr = expr.where(self.pk_column == getattr(self, self.Meta.pkname))
|
expr = expr.where(self.pk_column == getattr(self, self.Meta.pkname))
|
||||||
|
|
||||||
@ -166,6 +166,6 @@ class Model(NewBaseModel):
|
|||||||
"Instance was deleted from database and cannot be refreshed"
|
"Instance was deleted from database and cannot be refreshed"
|
||||||
)
|
)
|
||||||
kwargs = dict(row)
|
kwargs = dict(row)
|
||||||
kwargs = self.objects._translate_aliases_to_columns(kwargs)
|
kwargs = self.translate_aliases_to_columns(kwargs)
|
||||||
self.from_dict(kwargs)
|
self.from_dict(kwargs)
|
||||||
return self
|
return self
|
||||||
|
|||||||
@ -140,6 +140,24 @@ class ModelTableProxy:
|
|||||||
)
|
)
|
||||||
return to_field
|
return to_field
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def translate_columns_to_aliases(cls, new_kwargs: dict) -> dict:
|
||||||
|
for field_name, field in cls.Meta.model_fields.items():
|
||||||
|
if (
|
||||||
|
field_name in new_kwargs
|
||||||
|
and field.name is not None
|
||||||
|
and field.name != field_name
|
||||||
|
):
|
||||||
|
new_kwargs[field.name] = new_kwargs.pop(field_name)
|
||||||
|
return new_kwargs
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def translate_aliases_to_columns(cls, new_kwargs: dict) -> dict:
|
||||||
|
for field_name, field in cls.Meta.model_fields.items():
|
||||||
|
if field.name in new_kwargs and field.name != field_name:
|
||||||
|
new_kwargs[field_name] = new_kwargs.pop(field.name)
|
||||||
|
return new_kwargs
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
|
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
|
||||||
merged_rows: List["Model"] = []
|
merged_rows: List["Model"] = []
|
||||||
|
|||||||
@ -74,7 +74,7 @@ class QuerySet:
|
|||||||
new_kwargs = self._remove_pk_from_kwargs(new_kwargs)
|
new_kwargs = self._remove_pk_from_kwargs(new_kwargs)
|
||||||
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
||||||
new_kwargs = self._populate_default_values(new_kwargs)
|
new_kwargs = self._populate_default_values(new_kwargs)
|
||||||
new_kwargs = self._translate_columns_to_aliases(new_kwargs)
|
new_kwargs = self.model.translate_columns_to_aliases(new_kwargs)
|
||||||
return new_kwargs
|
return new_kwargs
|
||||||
|
|
||||||
def _populate_default_values(self, new_kwargs: dict) -> dict:
|
def _populate_default_values(self, new_kwargs: dict) -> dict:
|
||||||
@ -83,22 +83,6 @@ class QuerySet:
|
|||||||
new_kwargs[field_name] = field.get_default()
|
new_kwargs[field_name] = field.get_default()
|
||||||
return new_kwargs
|
return new_kwargs
|
||||||
|
|
||||||
def _translate_columns_to_aliases(self, new_kwargs: dict) -> dict:
|
|
||||||
for field_name, field in self.model_meta.model_fields.items():
|
|
||||||
if (
|
|
||||||
field_name in new_kwargs
|
|
||||||
and field.name is not None
|
|
||||||
and field.name != field_name
|
|
||||||
):
|
|
||||||
new_kwargs[field.name] = new_kwargs.pop(field_name)
|
|
||||||
return new_kwargs
|
|
||||||
|
|
||||||
def _translate_aliases_to_columns(self, new_kwargs: dict) -> dict:
|
|
||||||
for field_name, field in self.model_meta.model_fields.items():
|
|
||||||
if field.name in new_kwargs and field.name != field_name:
|
|
||||||
new_kwargs[field_name] = new_kwargs.pop(field.name)
|
|
||||||
return new_kwargs
|
|
||||||
|
|
||||||
def _remove_pk_from_kwargs(self, new_kwargs: dict) -> dict:
|
def _remove_pk_from_kwargs(self, new_kwargs: dict) -> dict:
|
||||||
pkname = self.model_meta.pkname
|
pkname = self.model_meta.pkname
|
||||||
pk = self.model_meta.model_fields[pkname]
|
pk = self.model_meta.model_fields[pkname]
|
||||||
@ -207,7 +191,7 @@ class QuerySet:
|
|||||||
async def update(self, each: bool = False, **kwargs: Any) -> int:
|
async def update(self, each: bool = False, **kwargs: Any) -> int:
|
||||||
self_fields = self.model.extract_db_own_fields()
|
self_fields = self.model.extract_db_own_fields()
|
||||||
updates = {k: v for k, v in kwargs.items() if k in self_fields}
|
updates = {k: v for k, v in kwargs.items() if k in self_fields}
|
||||||
updates = self._translate_columns_to_aliases(updates)
|
updates = self.model.translate_columns_to_aliases(updates)
|
||||||
if not each and not self.filter_clauses:
|
if not each and not self.filter_clauses:
|
||||||
raise QueryDefinitionError(
|
raise QueryDefinitionError(
|
||||||
"You cannot update without filtering the queryset first. "
|
"You cannot update without filtering the queryset first. "
|
||||||
@ -353,7 +337,7 @@ class QuerySet:
|
|||||||
f"{self.model.__name__} has to have {pk_name} filled."
|
f"{self.model.__name__} has to have {pk_name} filled."
|
||||||
)
|
)
|
||||||
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
||||||
new_kwargs = self._translate_columns_to_aliases(new_kwargs)
|
new_kwargs = self.model.translate_columns_to_aliases(new_kwargs)
|
||||||
new_kwargs = {"new_" + k: v for k, v in new_kwargs.items() if k in columns}
|
new_kwargs = {"new_" + k: v for k, v in new_kwargs.items() if k in columns}
|
||||||
ready_objects.append(new_kwargs)
|
ready_objects.append(new_kwargs)
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
@ -6,7 +7,6 @@ import databases
|
|||||||
import pydantic
|
import pydantic
|
||||||
import pytest
|
import pytest
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
import uuid
|
|
||||||
|
|
||||||
import ormar
|
import ormar
|
||||||
from ormar.exceptions import QueryDefinitionError, NoMatch
|
from ormar.exceptions import QueryDefinitionError, NoMatch
|
||||||
@ -437,3 +437,4 @@ async def test_start_and_end_filters():
|
|||||||
|
|
||||||
users = await User.objects.filter(name__endswith="igo").all()
|
users = await User.objects.filter(name__endswith="igo").all()
|
||||||
assert len(users) == 2
|
assert len(users) == 2
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user