refactor into descriptors, cleanup, docs update
This commit is contained in:
@ -10,7 +10,6 @@ from typing import (
|
||||
Mapping,
|
||||
MutableSequence,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Tuple,
|
||||
@ -39,7 +38,6 @@ from ormar.models.helpers.sqlalchemy import (
|
||||
populate_meta_sqlalchemy_table_if_required,
|
||||
update_column_definition,
|
||||
)
|
||||
from ormar.models.helpers.validation import validate_choices
|
||||
from ormar.models.metaclass import ModelMeta, ModelMetaclass
|
||||
from ormar.models.modelproxy import ModelTableProxy
|
||||
from ormar.queryset.utils import translate_list_to_dict
|
||||
@ -89,6 +87,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
_pydantic_fields: Set
|
||||
_quick_access_fields: Set
|
||||
_json_fields: Set
|
||||
_bytes_fields: Set
|
||||
Meta: ModelMeta
|
||||
|
||||
# noinspection PyMissingConstructor
|
||||
@ -157,23 +156,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
|
||||
"""
|
||||
Overwrites setattr in object to allow for special behaviour of certain params.
|
||||
|
||||
Parameter "pk" is translated into actual primary key field name.
|
||||
|
||||
Relations are expanded (child model constructed if needed) and registered on
|
||||
both ends of the relation. The related models are handled by RelationshipManager
|
||||
exposed at _orm param.
|
||||
|
||||
Json fields converted if needed.
|
||||
|
||||
Setting pk, foreign key value or any other field value sets Model save status
|
||||
to False. Setting a reverse relation or many to many relation does not as it
|
||||
does not modify the state of the model (but related model or through model).
|
||||
|
||||
To short circuit all checks and expansions the set of attribute names present
|
||||
on each model is gathered into _quick_access_fields that is looked first and
|
||||
if field is in this set the object setattr is called directly.
|
||||
Overwrites setattr in pydantic parent as otherwise descriptors are not called.
|
||||
|
||||
:param name: name of the attribute to set
|
||||
:type name: str
|
||||
@ -187,89 +170,30 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
else:
|
||||
# let pydantic handle errors for unknown fields
|
||||
super().__setattr__(name, value)
|
||||
# if name in object.__getattribute__(self, "_quick_access_fields"):
|
||||
# object.__setattr__(self, name, value)
|
||||
# elif name == "pk":
|
||||
# object.__setattr__(self, self.Meta.pkname, value)
|
||||
# object.__getattribute__(self, "set_save_status")(False)
|
||||
# elif name in object.__getattribute__(self, "_orm"):
|
||||
# model = (
|
||||
# object.__getattribute__(self, "Meta")
|
||||
# .model_fields[name]
|
||||
# .expand_relationship(value=value, child=self)
|
||||
# )
|
||||
# if isinstance(object.__getattribute__(self, "__dict__").get(name), list):
|
||||
# # virtual foreign key or many to many
|
||||
# # TODO: Fix double items in dict, no effect on real action ugly repr
|
||||
# # if model.pk not in [x.pk for x in related_list]:
|
||||
# object.__getattribute__(self, "__dict__")[name].append(model)
|
||||
# else:
|
||||
# # foreign key relation
|
||||
# object.__getattribute__(self, "__dict__")[name] = model
|
||||
# object.__getattribute__(self, "set_save_status")(False)
|
||||
# else:
|
||||
# if name in object.__getattribute__(self, "_choices_fields"):
|
||||
# validate_choices(field=self.Meta.model_fields[name], value=value)
|
||||
# value = object.__getattribute__(self, '_convert_bytes')(name, value, op="write")
|
||||
# value = object.__getattribute__(self, '_convert_json')(name, value, op="dumps")
|
||||
# super().__setattr__(name, value)
|
||||
# object.__getattribute__(self, "set_save_status")(False)
|
||||
|
||||
def _internal_set(self, name, value):
|
||||
def __getattr__(self, item: str) -> Any:
|
||||
"""
|
||||
Used only to silence mypy errors for Through models and reverse relations.
|
||||
Not used in real life as in practice calls are intercepted
|
||||
by RelationDescriptors
|
||||
|
||||
:param item: name of attribute
|
||||
:type item: str
|
||||
:return: Any
|
||||
:rtype: Any
|
||||
"""
|
||||
return super().__getattribute__(item)
|
||||
|
||||
def _internal_set(self, name: str, value: Any) -> None:
|
||||
"""
|
||||
Delegates call to pydantic.
|
||||
|
||||
:param name: name of param
|
||||
:type name: str
|
||||
:param value: value to set
|
||||
:type value: Any
|
||||
"""
|
||||
super().__setattr__(name, value)
|
||||
# def __getattribute__(self, item: str) -> Any: # noqa: CCR001
|
||||
# """
|
||||
# Because we need to overwrite getting the attribute by ormar instead of pydantic
|
||||
# as well as returning related models and not the value stored on the model the
|
||||
# __getattribute__ needs to be used not __getattr__.
|
||||
#
|
||||
# It's used to access all attributes so it can be a big overhead that's why a
|
||||
# number of short circuits is used.
|
||||
#
|
||||
# To short circuit all checks and expansions the set of attribute names present
|
||||
# on each model is gathered into _quick_access_fields that is looked first and
|
||||
# if field is in this set the object setattr is called directly.
|
||||
#
|
||||
# To avoid recursion object's getattribute is used to actually get the attribute
|
||||
# value from the model after the checks.
|
||||
#
|
||||
# Even the function calls are constructed with objects functions.
|
||||
#
|
||||
# Parameter "pk" is translated into actual primary key field name.
|
||||
#
|
||||
# Relations are returned so the actual related model is returned and not current
|
||||
# model's field. The related models are handled by RelationshipManager exposed
|
||||
# at _orm param.
|
||||
#
|
||||
# Json fields are converted if needed.
|
||||
#
|
||||
# :param item: name of the attribute to retrieve
|
||||
# :type item: str
|
||||
# :return: value of the attribute
|
||||
# :rtype: Any
|
||||
# """
|
||||
# if item in object.__getattribute__(self, "_quick_access_fields"):
|
||||
# return object.__getattribute__(self, item)
|
||||
# # if item == "pk":
|
||||
# # return object.__getattribute__(self, "__dict__").get(self.Meta.pkname, None)
|
||||
# # if item in object.__getattribute__(self, "extract_related_names")():
|
||||
# # return object.__getattribute__(
|
||||
# # self, "_extract_related_model_instead_of_field"
|
||||
# # )(item)
|
||||
# # if item in object.__getattribute__(self, "extract_through_names")():
|
||||
# # return object.__getattribute__(
|
||||
# # self, "_extract_related_model_instead_of_field"
|
||||
# # )(item)
|
||||
# # if item in object.__getattribute__(self, "Meta").property_fields:
|
||||
# # value = object.__getattribute__(self, item)
|
||||
# # return value() if callable(value) else value
|
||||
# # if item in object.__getattribute__(self, "_pydantic_fields"):
|
||||
# # value = object.__getattribute__(self, "__dict__").get(item, None)
|
||||
# # value = object.__getattribute__(self, "_convert_json")(item, value, "loads")
|
||||
# # value = object.__getattribute__(self, "_convert_bytes")(item, value, "read")
|
||||
# # return value
|
||||
#
|
||||
# return object.__getattribute__(self, item) # pragma: no cover
|
||||
|
||||
def _verify_model_can_be_initialized(self) -> None:
|
||||
"""
|
||||
@ -278,9 +202,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
if object.__getattribute__(self, "Meta").abstract:
|
||||
if self.Meta.abstract:
|
||||
raise ModelError(f"You cannot initialize abstract model {self.get_name()}")
|
||||
if object.__getattribute__(self, "Meta").requires_ref_update:
|
||||
if self.Meta.requires_ref_update:
|
||||
raise ModelError(
|
||||
f"Model {self.get_name()} has not updated "
|
||||
f"ForwardRefs. \nBefore using the model you "
|
||||
@ -304,11 +228,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return: modified kwargs
|
||||
:rtype: Tuple[Dict, Dict]
|
||||
"""
|
||||
meta = object.__getattribute__(self, "Meta")
|
||||
property_fields = meta.property_fields
|
||||
model_fields = meta.model_fields
|
||||
pydantic_fields = object.__getattribute__(self, "__fields__")
|
||||
bytes_fields = object.__getattribute__(self, '_bytes_fields')
|
||||
property_fields = self.Meta.property_fields
|
||||
model_fields = self.Meta.model_fields
|
||||
pydantic_fields = set(self.__fields__.keys())
|
||||
|
||||
# remove property fields
|
||||
for prop_filed in property_fields:
|
||||
@ -316,7 +238,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
excluded: Set[str] = kwargs.pop("__excluded__", set())
|
||||
if "pk" in kwargs:
|
||||
kwargs[meta.pkname] = kwargs.pop("pk")
|
||||
kwargs[self.Meta.pkname] = kwargs.pop("pk")
|
||||
|
||||
# extract through fields
|
||||
through_tmp_dict = dict()
|
||||
@ -325,12 +247,14 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
try:
|
||||
new_kwargs: Dict[str, Any] = {
|
||||
k: self._convert_json(
|
||||
k: self._convert_to_bytes(
|
||||
k,
|
||||
model_fields[k].expand_relationship(v, self, to_register=False,)
|
||||
if k in model_fields
|
||||
else (v if k in pydantic_fields else model_fields[k]),
|
||||
"dumps",
|
||||
self._convert_json(
|
||||
k,
|
||||
model_fields[k].expand_relationship(v, self, to_register=False,)
|
||||
if k in model_fields
|
||||
else (v if k in pydantic_fields else model_fields[k]),
|
||||
),
|
||||
)
|
||||
for k, v in kwargs.items()
|
||||
}
|
||||
@ -362,21 +286,6 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
),
|
||||
)
|
||||
|
||||
def _extract_related_model_instead_of_field(
|
||||
self, item: str
|
||||
) -> Optional[Union["Model", Sequence["Model"]]]:
|
||||
"""
|
||||
Retrieves the related model/models from RelationshipManager.
|
||||
|
||||
:param item: name of the relation
|
||||
:type item: str
|
||||
:return: related model, list of related models or None
|
||||
:rtype: Optional[Union[Model, List[Model]]]
|
||||
"""
|
||||
if item in self._orm:
|
||||
return self._orm.get(item) # type: ignore
|
||||
return None # pragma no cover
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
"""
|
||||
Compares other model to this model. when == is called.
|
||||
@ -758,6 +667,11 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
exclude_none=exclude_none,
|
||||
)
|
||||
|
||||
dict_instance = {
|
||||
k: self._convert_bytes_to_str(column_name=k, value=v)
|
||||
for k, v in dict_instance.items()
|
||||
}
|
||||
|
||||
if include and isinstance(include, Set):
|
||||
include = translate_list_to_dict(include)
|
||||
if exclude and isinstance(exclude, Set):
|
||||
@ -844,40 +758,46 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
setattr(self, key, value)
|
||||
return self
|
||||
|
||||
def _convert_bytes(self, column_name: str, value: Any, op: str) -> Union[str, Dict]:
|
||||
def _convert_to_bytes(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value to/from json if needed (for Json columns).
|
||||
Converts value to bytes from string
|
||||
|
||||
:param column_name: name of the field
|
||||
:type column_name: str
|
||||
:param value: value fo the field
|
||||
:type value: Any
|
||||
:param op: operator on json
|
||||
:type op: str
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in object.__getattribute__(self, "_bytes_fields"):
|
||||
if column_name not in self._bytes_fields:
|
||||
return value
|
||||
field = self.Meta.model_fields[column_name]
|
||||
condition = (
|
||||
isinstance(value, bytes) if op == "read" else not isinstance(value, bytes)
|
||||
)
|
||||
if op == "read" and condition:
|
||||
if field.use_base64:
|
||||
value = base64.b64encode(value)
|
||||
elif field.represent_as_base64_str:
|
||||
value = base64.b64encode(value).decode()
|
||||
else:
|
||||
value = value.decode("utf-8")
|
||||
elif condition:
|
||||
if field.use_base64 or field.represent_as_base64_str:
|
||||
if not isinstance(value, bytes):
|
||||
if field.represent_as_base64_str:
|
||||
value = base64.b64decode(value)
|
||||
else:
|
||||
value = value.encode("utf-8")
|
||||
return value
|
||||
|
||||
def _convert_json(self, column_name: str, value: Any, op: str) -> Union[str, Dict]:
|
||||
def _convert_bytes_to_str(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value to str from bytes for represent_as_base64_str columns.
|
||||
|
||||
:param column_name: name of the field
|
||||
:type column_name: str
|
||||
:param value: value fo the field
|
||||
:type value: Any
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in self._bytes_fields:
|
||||
return value
|
||||
field = self.Meta.model_fields[column_name]
|
||||
if not isinstance(value, str) and field.represent_as_base64_str:
|
||||
return base64.b64encode(value).decode()
|
||||
return value
|
||||
|
||||
def _convert_json(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value to/from json if needed (for Json columns).
|
||||
|
||||
@ -885,24 +805,14 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:type column_name: str
|
||||
:param value: value fo the field
|
||||
:type value: Any
|
||||
:param op: operator on json
|
||||
:type op: str
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in object.__getattribute__(self, "_json_fields"):
|
||||
if column_name not in self._json_fields:
|
||||
return value
|
||||
|
||||
condition = (
|
||||
isinstance(value, str) if op == "loads" else not isinstance(value, str)
|
||||
)
|
||||
operand: Callable[[Any], Any] = (
|
||||
json.loads if op == "loads" else json.dumps # type: ignore
|
||||
)
|
||||
|
||||
if condition:
|
||||
if not isinstance(value, str):
|
||||
try:
|
||||
value = operand(value)
|
||||
value = json.dumps(value)
|
||||
except TypeError: # pragma no cover
|
||||
pass
|
||||
return value.decode("utf-8") if isinstance(value, bytes) else value
|
||||
|
||||
Reference in New Issue
Block a user