refactor and cleanup
This commit is contained in:
@ -1,4 +1,3 @@
|
||||
import inspect
|
||||
import json
|
||||
import uuid
|
||||
from typing import (
|
||||
@ -8,7 +7,6 @@ from typing import (
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar,
|
||||
@ -22,8 +20,8 @@ from pydantic import BaseModel
|
||||
|
||||
import ormar # noqa I100
|
||||
from ormar.fields import BaseField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.models.metaclass import ModelMeta, ModelMetaclass
|
||||
from ormar.models.modelproxy import ModelTableProxy
|
||||
from ormar.relations import RelationshipManager
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
@ -35,12 +33,8 @@ if TYPE_CHECKING: # pragma no cover
|
||||
MappingIntStrAny = Mapping[IntStr, Any]
|
||||
|
||||
|
||||
class FakePydantic(pydantic.BaseModel, metaclass=ModelMetaclass):
|
||||
# FakePydantic inherits from list in order to be treated as
|
||||
# request.Body parameter in fastapi routes,
|
||||
# inheriting from pydantic.BaseModel causes metaclass conflicts
|
||||
class FakePydantic(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass):
|
||||
__slots__ = ("_orm_id", "_orm_saved")
|
||||
__abstract__ = True
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
__model_fields__: Dict[str, TypeVar[BaseField]]
|
||||
@ -77,9 +71,6 @@ class FakePydantic(pydantic.BaseModel, metaclass=ModelMetaclass):
|
||||
object.__setattr__(self, "__dict__", values)
|
||||
object.__setattr__(self, "__fields_set__", fields_set)
|
||||
|
||||
# super().__init__(**kwargs)
|
||||
# self.values = self.__pydantic_model__(**kwargs)
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.Meta._orm_relationship_manager.deregister(self)
|
||||
|
||||
@ -175,9 +166,7 @@ class FakePydantic(pydantic.BaseModel, metaclass=ModelMetaclass):
|
||||
|
||||
if self.Meta.model_fields[field].virtual and nested:
|
||||
continue
|
||||
if isinstance(nested_model, list) and not isinstance(
|
||||
nested_model, ormar.Model
|
||||
):
|
||||
if isinstance(nested_model, list):
|
||||
dict_instance[field] = [x.dict(nested=True) for x in nested_model]
|
||||
elif nested_model is not None:
|
||||
dict_instance[field] = nested_model.dict(nested=True)
|
||||
@ -206,70 +195,3 @@ class FakePydantic(pydantic.BaseModel, metaclass=ModelMetaclass):
|
||||
|
||||
def _is_conversion_to_json_needed(self, column_name: str) -> bool:
|
||||
return self.Meta.model_fields.get(column_name).__type__ == pydantic.Json
|
||||
|
||||
def _extract_own_model_fields(self) -> Dict:
|
||||
related_names = self._extract_related_names()
|
||||
self_fields = {k: v for k, v in self.dict().items() if k not in related_names}
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def _extract_related_names(cls) -> Set:
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if inspect.isclass(field) and issubclass(field, ForeignKeyField):
|
||||
related_names.add(name)
|
||||
return related_names
|
||||
|
||||
@classmethod
|
||||
def _exclude_related_names_not_required(cls, nested: bool = False) -> Set:
|
||||
if nested:
|
||||
return cls._extract_related_names()
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if (
|
||||
inspect.isclass(field)
|
||||
and issubclass(field, ForeignKeyField)
|
||||
and field.nullable
|
||||
):
|
||||
related_names.add(name)
|
||||
return related_names
|
||||
|
||||
def _extract_model_db_fields(self) -> Dict:
|
||||
self_fields = self._extract_own_model_fields()
|
||||
self_fields = {
|
||||
k: v for k, v in self_fields.items() if k in self.Meta.table.columns
|
||||
}
|
||||
for field in self._extract_related_names():
|
||||
target_pk_name = self.Meta.model_fields[field].to.Meta.pkname
|
||||
if getattr(self, field) is not None:
|
||||
self_fields[field] = getattr(getattr(self, field), target_pk_name)
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
|
||||
merged_rows = []
|
||||
for index, model in enumerate(result_rows):
|
||||
if index > 0 and model.pk == result_rows[index - 1].pk:
|
||||
result_rows[-1] = cls.merge_two_instances(model, merged_rows[-1])
|
||||
else:
|
||||
merged_rows.append(model)
|
||||
return merged_rows
|
||||
|
||||
@classmethod
|
||||
def merge_two_instances(cls, one: "Model", other: "Model") -> "Model":
|
||||
for field in one.Meta.model_fields.keys():
|
||||
current_field = getattr(one, field)
|
||||
if isinstance(current_field, list) and not isinstance(
|
||||
current_field, ormar.Model
|
||||
):
|
||||
setattr(other, field, current_field + getattr(other, field))
|
||||
elif (
|
||||
isinstance(current_field, ormar.Model)
|
||||
and current_field.pk == getattr(other, field).pk
|
||||
):
|
||||
setattr(
|
||||
other,
|
||||
field,
|
||||
cls.merge_two_instances(current_field, getattr(other, field)),
|
||||
)
|
||||
return other
|
||||
|
||||
@ -161,5 +161,4 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
new_model.Meta._orm_relationship_manager = relationship_manager
|
||||
new_model.objects = QuerySet(new_model)
|
||||
|
||||
# breakpoint()
|
||||
return new_model
|
||||
|
||||
@ -9,8 +9,6 @@ from ormar.models import FakePydantic # noqa I100
|
||||
class Model(FakePydantic):
|
||||
__abstract__ = False
|
||||
|
||||
# objects = ormar.queryset.QuerySet()
|
||||
|
||||
@classmethod
|
||||
def from_row(
|
||||
cls,
|
||||
|
||||
98
ormar/models/modelproxy.py
Normal file
98
ormar/models/modelproxy.py
Normal file
@ -0,0 +1,98 @@
|
||||
import copy
|
||||
import inspect
|
||||
from typing import List, Set, TYPE_CHECKING
|
||||
|
||||
import ormar
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.models.metaclass import ModelMeta
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
|
||||
|
||||
class ModelTableProxy:
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
Meta: ModelMeta
|
||||
|
||||
def dict(): # noqa A003
|
||||
raise NotImplementedError # pragma no cover
|
||||
|
||||
def _extract_own_model_fields(self) -> dict:
|
||||
related_names = self._extract_related_names()
|
||||
self_fields = {k: v for k, v in self.dict().items() if k not in related_names}
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def substitute_models_with_pks(cls, model_dict: dict) -> dict:
|
||||
model_dict = copy.deepcopy(model_dict)
|
||||
for field in cls._extract_related_names():
|
||||
if field in model_dict and model_dict.get(field) is not None:
|
||||
target_field = cls.Meta.model_fields[field]
|
||||
target_pkname = target_field.to.Meta.pkname
|
||||
if isinstance(model_dict.get(field), ormar.Model):
|
||||
model_dict[field] = getattr(model_dict.get(field), target_pkname)
|
||||
else:
|
||||
model_dict[field] = model_dict.get(field).get(target_pkname)
|
||||
return model_dict
|
||||
|
||||
@classmethod
|
||||
def _extract_related_names(cls) -> Set:
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if inspect.isclass(field) and issubclass(field, ForeignKeyField):
|
||||
related_names.add(name)
|
||||
return related_names
|
||||
|
||||
@classmethod
|
||||
def _exclude_related_names_not_required(cls, nested: bool = False) -> Set:
|
||||
if nested:
|
||||
return cls._extract_related_names()
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if (
|
||||
inspect.isclass(field)
|
||||
and issubclass(field, ForeignKeyField)
|
||||
and field.nullable
|
||||
):
|
||||
related_names.add(name)
|
||||
return related_names
|
||||
|
||||
def _extract_model_db_fields(self) -> dict:
|
||||
self_fields = self._extract_own_model_fields()
|
||||
self_fields = {
|
||||
k: v for k, v in self_fields.items() if k in self.Meta.table.columns
|
||||
}
|
||||
for field in self._extract_related_names():
|
||||
target_pk_name = self.Meta.model_fields[field].to.Meta.pkname
|
||||
if getattr(self, field) is not None:
|
||||
self_fields[field] = getattr(getattr(self, field), target_pk_name)
|
||||
return self_fields
|
||||
|
||||
@classmethod
|
||||
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
|
||||
merged_rows = []
|
||||
for index, model in enumerate(result_rows):
|
||||
if index > 0 and model.pk == result_rows[index - 1].pk:
|
||||
result_rows[-1] = cls.merge_two_instances(model, merged_rows[-1])
|
||||
else:
|
||||
merged_rows.append(model)
|
||||
return merged_rows
|
||||
|
||||
@classmethod
|
||||
def merge_two_instances(cls, one: "Model", other: "Model") -> "Model":
|
||||
for field in one.Meta.model_fields.keys():
|
||||
current_field = getattr(one, field)
|
||||
if isinstance(current_field, list) and not isinstance(
|
||||
current_field, ormar.Model
|
||||
):
|
||||
setattr(other, field, current_field + getattr(other, field))
|
||||
elif (
|
||||
isinstance(current_field, ormar.Model)
|
||||
and current_field.pk == getattr(other, field).pk
|
||||
):
|
||||
setattr(
|
||||
other,
|
||||
field,
|
||||
cls.merge_two_instances(current_field, getattr(other, field)),
|
||||
)
|
||||
return other
|
||||
Reference in New Issue
Block a user