switch to decorator used to register property_fields and save it on Meta inner class to expose to cloned fastapi models
This commit is contained in:
@ -1,4 +1,3 @@
|
||||
import inspect
|
||||
import json
|
||||
import uuid
|
||||
from typing import (
|
||||
@ -68,14 +67,12 @@ class NewBaseModel(
|
||||
_orm_saved: bool
|
||||
_related_names: Optional[Set]
|
||||
_related_names_hash: str
|
||||
_props: Set
|
||||
_pydantic_fields: Set
|
||||
_quick_access_fields: Set
|
||||
Meta: ModelMeta
|
||||
|
||||
# noinspection PyMissingConstructor
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
caller_name = inspect.currentframe().f_back.f_code.co_name
|
||||
object.__setattr__(self, "_orm_id", uuid.uuid4().hex)
|
||||
object.__setattr__(self, "_orm_saved", False)
|
||||
object.__setattr__(
|
||||
@ -96,9 +93,14 @@ class NewBaseModel(
|
||||
|
||||
if "pk" in kwargs:
|
||||
kwargs[self.Meta.pkname] = kwargs.pop("pk")
|
||||
|
||||
# remove property fields values from validation
|
||||
kwargs = {
|
||||
k: v
|
||||
for k, v in kwargs.items()
|
||||
if k not in object.__getattribute__(self, "Meta").property_fields
|
||||
}
|
||||
# build the models to set them and validate but don't register
|
||||
if self.Meta.include_props_in_dict:
|
||||
kwargs = {k: v for k, v in kwargs.items() if k not in object.__getattribute__(self, '_props')}
|
||||
try:
|
||||
new_kwargs: Dict[str, Any] = {
|
||||
k: self._convert_json(
|
||||
@ -136,7 +138,7 @@ class NewBaseModel(
|
||||
)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
|
||||
if name in ("_orm_id", "_orm_saved", "_orm", "_related_names", "_props"):
|
||||
if name in object.__getattribute__(self, "_quick_access_fields"):
|
||||
object.__setattr__(self, name, value)
|
||||
elif name == "pk":
|
||||
object.__setattr__(self, self.Meta.pkname, value)
|
||||
@ -168,8 +170,9 @@ class NewBaseModel(
|
||||
return object.__getattribute__(
|
||||
self, "_extract_related_model_instead_of_field"
|
||||
)(item)
|
||||
if item in object.__getattribute__(self, "_props"):
|
||||
return object.__getattribute__(self, item)
|
||||
if item in object.__getattribute__(self, "Meta").property_fields:
|
||||
value = object.__getattribute__(self, item)
|
||||
return value() if callable(value) else value
|
||||
if item in object.__getattribute__(self, "_pydantic_fields"):
|
||||
value = object.__getattribute__(self, "__dict__").get(item, None)
|
||||
value = object.__getattribute__(self, "_convert_json")(item, value, "loads")
|
||||
@ -177,7 +180,7 @@ class NewBaseModel(
|
||||
return object.__getattribute__(self, item) # pragma: no cover
|
||||
|
||||
def _extract_related_model_instead_of_field(
|
||||
self, item: str
|
||||
self, item: str
|
||||
) -> Optional[Union["T", Sequence["T"]]]:
|
||||
if item in self._orm:
|
||||
return self._orm.get(item)
|
||||
@ -190,9 +193,9 @@ class NewBaseModel(
|
||||
|
||||
def __same__(self, other: "NewBaseModel") -> bool:
|
||||
return (
|
||||
self._orm_id == other._orm_id
|
||||
or (self.pk == other.pk and self.pk is not None)
|
||||
or self.dict() == other.dict()
|
||||
self._orm_id == other._orm_id
|
||||
or (self.pk == other.pk and self.pk is not None)
|
||||
or self.dict() == other.dict()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@ -226,10 +229,10 @@ class NewBaseModel(
|
||||
|
||||
@classmethod
|
||||
def get_properties(
|
||||
cls, include: Union[Set, Dict, None], exclude: Union[Set, Dict, None]
|
||||
cls, include: Union[Set, Dict, None], exclude: Union[Set, Dict, None]
|
||||
) -> Set[str]:
|
||||
|
||||
props = cls._props
|
||||
props = cls.Meta.property_fields
|
||||
if include:
|
||||
props = {prop for prop in props if prop in include}
|
||||
if exclude:
|
||||
@ -237,7 +240,7 @@ class NewBaseModel(
|
||||
return props
|
||||
|
||||
def _get_related_not_excluded_fields(
|
||||
self, include: Optional[Dict], exclude: Optional[Dict],
|
||||
self, include: Optional[Dict], exclude: Optional[Dict],
|
||||
) -> List:
|
||||
fields = [field for field in self.extract_related_names()]
|
||||
if include:
|
||||
@ -252,15 +255,15 @@ class NewBaseModel(
|
||||
|
||||
@staticmethod
|
||||
def _extract_nested_models_from_list(
|
||||
models: MutableSequence,
|
||||
include: Union[Set, Dict, None],
|
||||
exclude: Union[Set, Dict, None],
|
||||
models: MutableSequence,
|
||||
include: Union[Set, Dict, None],
|
||||
exclude: Union[Set, Dict, None],
|
||||
) -> List:
|
||||
result = []
|
||||
for model in models:
|
||||
try:
|
||||
result.append(
|
||||
model.dict(nested=True, include=include, exclude=exclude, )
|
||||
model.dict(nested=True, include=include, exclude=exclude,)
|
||||
)
|
||||
except ReferenceError: # pragma no cover
|
||||
continue
|
||||
@ -268,17 +271,17 @@ class NewBaseModel(
|
||||
|
||||
@staticmethod
|
||||
def _skip_ellipsis(
|
||||
items: Union[Set, Dict, None], key: str
|
||||
items: Union[Set, Dict, None], key: str
|
||||
) -> Union[Set, Dict, None]:
|
||||
result = Excludable.get_child(items, key)
|
||||
return result if result is not Ellipsis else None
|
||||
|
||||
def _extract_nested_models( # noqa: CCR001
|
||||
self,
|
||||
nested: bool,
|
||||
dict_instance: Dict,
|
||||
include: Optional[Dict],
|
||||
exclude: Optional[Dict],
|
||||
self,
|
||||
nested: bool,
|
||||
dict_instance: Dict,
|
||||
include: Optional[Dict],
|
||||
exclude: Optional[Dict],
|
||||
) -> Dict:
|
||||
|
||||
fields = self._get_related_not_excluded_fields(include=include, exclude=exclude)
|
||||
@ -304,16 +307,16 @@ class NewBaseModel(
|
||||
return dict_instance
|
||||
|
||||
def dict( # type: ignore # noqa A003
|
||||
self,
|
||||
*,
|
||||
include: Union[Set, Dict] = None,
|
||||
exclude: Union[Set, Dict] = None,
|
||||
by_alias: bool = False,
|
||||
skip_defaults: bool = None,
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
nested: bool = False,
|
||||
self,
|
||||
*,
|
||||
include: Union[Set, Dict] = None,
|
||||
exclude: Union[Set, Dict] = None,
|
||||
by_alias: bool = False,
|
||||
skip_defaults: bool = None,
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
nested: bool = False,
|
||||
) -> "DictStrAny": # noqa: A003'
|
||||
# callable_name = inspect.currentframe().f_back.f_code.co_name
|
||||
# print('dict', callable_name)
|
||||
@ -339,8 +342,8 @@ class NewBaseModel(
|
||||
exclude=exclude, # type: ignore
|
||||
)
|
||||
|
||||
# include model properties as fields
|
||||
if self.Meta.include_props_in_dict:
|
||||
# include model properties as fields in dict
|
||||
if object.__getattribute__(self, "Meta").property_fields:
|
||||
props = self.get_properties(include=include, exclude=exclude)
|
||||
if props:
|
||||
dict_instance.update({prop: getattr(self, prop) for prop in props})
|
||||
@ -372,6 +375,6 @@ class NewBaseModel(
|
||||
|
||||
def _is_conversion_to_json_needed(self, column_name: str) -> bool:
|
||||
return (
|
||||
column_name in self.Meta.model_fields
|
||||
and self.Meta.model_fields[column_name].__type__ == pydantic.Json
|
||||
column_name in self.Meta.model_fields
|
||||
and self.Meta.model_fields[column_name].__type__ == pydantic.Json
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user