add examples to openapi schema, some perf improvements

This commit is contained in:
collerek
2021-04-30 16:46:41 +02:00
parent 734c33920b
commit 12c002776b
11 changed files with 194 additions and 78 deletions

View File

@ -1,5 +1,4 @@
import sys
import uuid
from typing import (
AbstractSet,
Any,
@ -87,6 +86,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
_choices_fields: Optional[Set]
_pydantic_fields: Set
_quick_access_fields: Set
_json_fields: Set
Meta: ModelMeta
# noinspection PyMissingConstructor
@ -124,60 +124,12 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:type kwargs: Any
"""
self._verify_model_can_be_initialized()
object.__setattr__(self, "_orm_id", uuid.uuid4().hex)
object.__setattr__(self, "_orm_saved", False)
object.__setattr__(self, "_pk_column", None)
object.__setattr__(
self,
"_orm",
RelationsManager(
related_fields=self.extract_related_fields(), owner=cast("Model", self),
),
)
self._initialize_internal_attributes()
pk_only = kwargs.pop("__pk_only__", False)
object.__setattr__(self, "__pk_only__", pk_only)
excluded: Set[str] = kwargs.pop("__excluded__", set())
if "pk" in kwargs:
kwargs[self.Meta.pkname] = kwargs.pop("pk")
# build the models to set them and validate but don't register
# also remove property fields values from validation
try:
new_kwargs: Dict[str, Any] = {
k: self._convert_json(
k,
self.Meta.model_fields[k].expand_relationship(
v, self, to_register=False,
)
if k in self.Meta.model_fields
else (
v
if k in self.__fields__
# some random key will raise KeyError
else self.__fields__["_Q*DHPQ(JAS*((JA)###*(&"]
),
"dumps",
)
for k, v in kwargs.items()
if k not in object.__getattribute__(self, "Meta").property_fields
}
except KeyError as e:
raise ModelError(
f"Unknown field '{e.args[0]}' for model {self.get_name(lower=False)}"
)
# explicitly set None to excluded fields
# as pydantic populates them with default if set
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None
# extract through fields
through_tmp_dict = dict()
for field_name in self.extract_through_names():
through_tmp_dict[field_name] = new_kwargs.pop(field_name, None)
new_kwargs, through_tmp_dict = self._process_kwargs(kwargs)
values, fields_set, validation_error = pydantic.validate_model(
self, new_kwargs # type: ignore
@ -190,10 +142,10 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
# add back through fields
new_kwargs.update(through_tmp_dict)
model_fields = object.__getattribute__(self, "Meta").model_fields
# register the columns models after initialization
for related in self.extract_related_names().union(self.extract_through_names()):
self.Meta.model_fields[related].expand_relationship(
model_fields[related].expand_relationship(
new_kwargs.get(related), self, to_register=True,
)
@ -314,15 +266,93 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: None
:rtype: None
"""
if self.Meta.abstract:
if object.__getattribute__(self, "Meta").abstract:
raise ModelError(f"You cannot initialize abstract model {self.get_name()}")
if self.Meta.requires_ref_update:
if object.__getattribute__(self, "Meta").requires_ref_update:
raise ModelError(
f"Model {self.get_name()} has not updated "
f"ForwardRefs. \nBefore using the model you "
f"need to call update_forward_refs()."
)
def _process_kwargs(self, kwargs: Dict) -> Tuple[Dict, Dict]:
"""
Initializes nested models.
Removes property_fields
Checks if field is in the model fields or pydatnic fields.
Nullifies fields that should be excluded.
Extracts through models from kwargs into temporary dict.
:param kwargs: passed to init keyword arguments
:type kwargs: Dict
:return: modified kwargs
:rtype: Tuple[Dict, Dict]
"""
meta = object.__getattribute__(self, "Meta")
property_fields = meta.property_fields
model_fields = meta.model_fields
pydantic_fields = object.__getattribute__(self, "__fields__")
# remove property fields
for prop_filed in property_fields:
kwargs.pop(prop_filed, None)
excluded: Set[str] = kwargs.pop("__excluded__", set())
if "pk" in kwargs:
kwargs[meta.pkname] = kwargs.pop("pk")
# extract through fields
through_tmp_dict = dict()
for field_name in self.extract_through_names():
through_tmp_dict[field_name] = kwargs.pop(field_name, None)
try:
new_kwargs: Dict[str, Any] = {
k: self._convert_json(
k,
model_fields[k].expand_relationship(v, self, to_register=False,)
if k in model_fields
else (
v
if k in pydantic_fields
else model_fields["HAP&*YA^)*GW^&QT6567q56gGG%$%"]
),
"dumps",
)
for k, v in kwargs.items()
}
except KeyError as e:
raise ModelError(
f"Unknown field '{e.args[0]}' for model {self.get_name(lower=False)}"
)
# explicitly set None to excluded fields
# as pydantic populates them with default if set
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None
return new_kwargs, through_tmp_dict
def _initialize_internal_attributes(self) -> None:
"""
Initializes internal attributes during __init__()
:rtype: None
"""
# object.__setattr__(self, "_orm_id", uuid.uuid4().hex)
object.__setattr__(self, "_orm_saved", False)
object.__setattr__(self, "_pk_column", None)
object.__setattr__(
self,
"_orm",
RelationsManager(
related_fields=self.extract_related_fields(), owner=cast("Model", self),
),
)
def _extract_related_model_instead_of_field(
self, item: str
) -> Optional[Union["Model", Sequence["Model"]]]:
@ -363,8 +393,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:rtype: bool
"""
return (
self._orm_id == other._orm_id
or (self.pk == other.pk and self.pk is not None)
# self._orm_id == other._orm_id
(self.pk == other.pk and self.pk is not None)
or (
(self.pk is None and other.pk is None)
and {
@ -748,7 +778,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: converted value if needed, else original value
:rtype: Any
"""
if not self._is_conversion_to_json_needed(column_name):
if column_name not in object.__getattribute__(self, "_json_fields"):
return value
condition = (
@ -765,20 +795,6 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
pass
return value.decode("utf-8") if isinstance(value, bytes) else value
def _is_conversion_to_json_needed(self, column_name: str) -> bool:
"""
Checks if given column name is related to JSON field.
:param column_name: name of the field
:type column_name: str
:return: result of the check
:rtype: bool
"""
return (
column_name in self.Meta.model_fields
and self.Meta.model_fields[column_name].__type__ == pydantic.Json
)
def _extract_own_model_fields(self) -> Dict:
"""
Returns a dictionary with field names and values for fields that are not