add get_pydantic and basic tests

This commit is contained in:
collerek
2021-05-23 16:44:24 +02:00
parent 40f1076443
commit f93ab413de
13 changed files with 266 additions and 22 deletions

View File

@ -341,11 +341,11 @@ Calls the pydantic method to evaluate pydantic fields.
`(None)`: None `(None)`: None
<a name="models.newbasemodel.NewBaseModel._get_related_not_excluded_fields"></a> <a name="models.newbasemodel.NewBaseModel._get_not_excluded_fields"></a>
#### \_get\_related\_not\_excluded\_fields #### \_get\_related\_not\_excluded\_fields
```python ```python
| _get_related_not_excluded_fields(include: Optional[Dict], exclude: Optional[Dict]) -> List | _get_not_excluded_fields(include: Optional[Dict], exclude: Optional[Dict]) -> List
``` ```
Returns related field names applying on them include and exclude set. Returns related field names applying on them include and exclude set.

View File

@ -131,7 +131,7 @@ def generate_model_example(model: Type["Model"], relation_map: Dict = None) -> D
:type model: Type["Model"] :type model: Type["Model"]
:param relation_map: dict with relations to follow :param relation_map: dict with relations to follow
:type relation_map: Optional[Dict] :type relation_map: Optional[Dict]
:return: :return: dict with example values
:rtype: Dict[str, int] :rtype: Dict[str, int]
""" """
example: Dict[str, Any] = dict() example: Dict[str, Any] = dict()
@ -141,6 +141,31 @@ def generate_model_example(model: Type["Model"], relation_map: Dict = None) -> D
else translate_list_to_dict(model._iterate_related_models()) else translate_list_to_dict(model._iterate_related_models())
) )
for name, field in model.Meta.model_fields.items(): for name, field in model.Meta.model_fields.items():
populates_sample_fields_values(
example=example, name=name, field=field, relation_map=relation_map
)
to_exclude = {name for name in model.Meta.model_fields}
pydantic_repr = generate_pydantic_example(pydantic_model=model, exclude=to_exclude)
example.update(pydantic_repr)
return example
def populates_sample_fields_values(
example: Dict[str, Any], name: str, field: BaseField, relation_map: Dict = None
) -> None:
"""
Iterates the field and sets fields to sample values
:param field: ormar field
:type field: BaseField
:param name: name of the field
:type name: str
:param example: example dict
:type example: Dict[str, Any]
:param relation_map: dict with relations to follow
:type relation_map: Optional[Dict]
"""
if not field.is_relation: if not field.is_relation:
is_bytes_str = field.__type__ == bytes and field.represent_as_base64_str is_bytes_str = field.__type__ == bytes and field.represent_as_base64_str
example[name] = field.__sample__ if not is_bytes_str else "string" example[name] = field.__sample__ if not is_bytes_str else "string"
@ -148,11 +173,6 @@ def generate_model_example(model: Type["Model"], relation_map: Dict = None) -> D
example[name] = get_nested_model_example( example[name] = get_nested_model_example(
name=name, field=field, relation_map=relation_map name=name, field=field, relation_map=relation_map
) )
to_exclude = {name for name in model.Meta.model_fields}
pydantic_repr = generate_pydantic_example(pydantic_model=model, exclude=to_exclude)
example.update(pydantic_repr)
return example
def get_nested_model_example( def get_nested_model_example(

View File

@ -8,6 +8,7 @@ from ormar.models.mixins.alias_mixin import AliasMixin
from ormar.models.mixins.excludable_mixin import ExcludableMixin from ormar.models.mixins.excludable_mixin import ExcludableMixin
from ormar.models.mixins.merge_mixin import MergeModelMixin from ormar.models.mixins.merge_mixin import MergeModelMixin
from ormar.models.mixins.prefetch_mixin import PrefetchQueryMixin from ormar.models.mixins.prefetch_mixin import PrefetchQueryMixin
from ormar.models.mixins.pydantic_mixin import PydanticMixin
from ormar.models.mixins.save_mixin import SavePrepareMixin from ormar.models.mixins.save_mixin import SavePrepareMixin
__all__ = [ __all__ = [
@ -16,4 +17,5 @@ __all__ = [
"PrefetchQueryMixin", "PrefetchQueryMixin",
"SavePrepareMixin", "SavePrepareMixin",
"ExcludableMixin", "ExcludableMixin",
"PydanticMixin",
] ]

View File

@ -0,0 +1,95 @@
from typing import Any, Callable, Dict, List, Set, TYPE_CHECKING, Type, Union, cast
import pydantic
from pydantic.fields import ModelField
from ormar.models.mixins.relation_mixin import RelationMixin # noqa: I100, I202
from ormar.queryset.utils import translate_list_to_dict
class PydanticMixin(RelationMixin):
if TYPE_CHECKING: # pragma: no cover
__fields__: Dict[str, ModelField]
_skip_ellipsis: Callable
_get_not_excluded_fields: Callable
@classmethod
def get_pydantic(
cls, *, include: Union[Set, Dict] = None, exclude: Union[Set, Dict] = None,
) -> Type[pydantic.BaseModel]:
"""
Returns a pydantic model out of ormar model.
Converts also nested ormar models into pydantic models.
Can be used to fully exclude certain fields in fastapi response and requests.
:param include: fields of own and nested models to include
:type include: Union[Set, Dict, None]
:param exclude: fields of own and nested models to exclude
:type exclude: Union[Set, Dict, None]
"""
relation_map = translate_list_to_dict(cls._iterate_related_models())
return cls._convert_ormar_to_pydantic(
include=include, exclude=exclude, relation_map=relation_map
)
@classmethod
def _convert_ormar_to_pydantic(
cls,
relation_map: Dict[str, Any],
include: Union[Set, Dict] = None,
exclude: Union[Set, Dict] = None,
) -> Type[pydantic.BaseModel]:
if include and isinstance(include, Set):
include = translate_list_to_dict(include)
if exclude and isinstance(exclude, Set):
exclude = translate_list_to_dict(exclude)
fields_dict: Dict[str, Any] = dict()
defaults: Dict[str, Any] = dict()
fields_to_process = cls._get_not_excluded_fields(
fields={*cls.Meta.model_fields.keys()}, include=include, exclude=exclude
)
for name in fields_to_process:
field = cls._determine_pydantic_field_type(
name=name,
defaults=defaults,
include=include,
exclude=exclude,
relation_map=relation_map,
)
if field is not None:
fields_dict[name] = field
model = type(
cls.__name__,
(pydantic.BaseModel,),
{"__annotations__": fields_dict, **defaults},
)
return cast(Type[pydantic.BaseModel], model)
@classmethod
def _determine_pydantic_field_type(
cls,
name: str,
defaults: Dict,
include: Union[Set, Dict, None],
exclude: Union[Set, Dict, None],
relation_map: Dict[str, Any],
) -> Any:
field = cls.Meta.model_fields[name]
if field.is_relation and name in relation_map: # type: ignore
target = field.to._convert_ormar_to_pydantic(
include=cls._skip_ellipsis(include, name),
exclude=cls._skip_ellipsis(exclude, name),
relation_map=cls._skip_ellipsis(
relation_map, field, default_return=dict()
),
)
if field.is_multi or field.virtual:
return List[target] # type: ignore
return target
elif not field.is_relation:
defaults[name] = cls.__fields__[name].field_info
return field.__type__
return None

View File

@ -2,12 +2,17 @@ from ormar.models.mixins import (
ExcludableMixin, ExcludableMixin,
MergeModelMixin, MergeModelMixin,
PrefetchQueryMixin, PrefetchQueryMixin,
PydanticMixin,
SavePrepareMixin, SavePrepareMixin,
) )
class ModelTableProxy( class ModelTableProxy(
PrefetchQueryMixin, MergeModelMixin, SavePrepareMixin, ExcludableMixin PrefetchQueryMixin,
MergeModelMixin,
SavePrepareMixin,
ExcludableMixin,
PydanticMixin,
): ):
""" """
Used to combine all mixins with different set of functionalities. Used to combine all mixins with different set of functionalities.

View File

@ -454,8 +454,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
super().update_forward_refs(**localns) super().update_forward_refs(**localns)
cls.Meta.requires_ref_update = False cls.Meta.requires_ref_update = False
def _get_related_not_excluded_fields( @staticmethod
self, include: Optional[Dict], exclude: Optional[Dict], def _get_not_excluded_fields(
fields: Union[List, Set], include: Optional[Dict], exclude: Optional[Dict],
) -> List: ) -> List:
""" """
Returns related field names applying on them include and exclude set. Returns related field names applying on them include and exclude set.
@ -467,7 +468,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: :return:
:rtype: List of fields with relations that is not excluded :rtype: List of fields with relations that is not excluded
""" """
fields = [field for field in self.extract_related_names()] fields = [*fields] if not isinstance(fields, list) else fields
if include: if include:
fields = [field for field in fields if field in include] fields = [field for field in fields if field in include]
if exclude: if exclude:
@ -519,8 +520,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
continue continue
return result return result
@classmethod
def _skip_ellipsis( def _skip_ellipsis(
self, items: Union[Set, Dict, None], key: str, default_return: Any = None cls, items: Union[Set, Dict, None], key: str, default_return: Any = None
) -> Union[Set, Dict, None]: ) -> Union[Set, Dict, None]:
""" """
Helper to traverse the include/exclude dictionaries. Helper to traverse the include/exclude dictionaries.
@ -534,10 +536,11 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: nested value of the items :return: nested value of the items
:rtype: Union[Set, Dict, None] :rtype: Union[Set, Dict, None]
""" """
result = self.get_child(items, key) result = cls.get_child(items, key)
return result if result is not Ellipsis else default_return return result if result is not Ellipsis else default_return
def _convert_all(self, items: Union[Set, Dict, None]) -> Union[Set, Dict, None]: @staticmethod
def _convert_all(items: Union[Set, Dict, None]) -> Union[Set, Dict, None]:
""" """
Helper to convert __all__ pydantic special index to ormar which does not Helper to convert __all__ pydantic special index to ormar which does not
support index based exclusions. support index based exclusions.
@ -573,8 +576,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: current model dict with child models converted to dictionaries :return: current model dict with child models converted to dictionaries
:rtype: Dict :rtype: Dict
""" """
fields = self._get_not_excluded_fields(
fields = self._get_related_not_excluded_fields(include=include, exclude=exclude) fields=self.extract_related_names(), include=include, exclude=exclude
)
for field in fields: for field in fields:
if not relation_map or field not in relation_map: if not relation_map or field not in relation_map:

View File

@ -26,7 +26,7 @@ quick_access_set = {
"_extract_nested_models_from_list", "_extract_nested_models_from_list",
"_extract_own_model_fields", "_extract_own_model_fields",
"_extract_related_model_instead_of_field", "_extract_related_model_instead_of_field",
"_get_related_not_excluded_fields", "_get_not_excluded_fields",
"_get_value", "_get_value",
"_init_private_attributes", "_init_private_attributes",
"_is_conversion_to_json_needed", "_is_conversion_to_json_needed",

View File

@ -7,7 +7,7 @@ from fastapi import FastAPI
from starlette.testclient import TestClient from starlette.testclient import TestClient
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
from tests.test_inheritance.test_inheritance_concrete import ( # type: ignore from tests.test_inheritance_and_pydantic_generation.test_inheritance_concrete import ( # type: ignore
Category, Category,
Subject, Subject,
Person, Person,

View File

@ -6,7 +6,7 @@ from fastapi import FastAPI
from starlette.testclient import TestClient from starlette.testclient import TestClient
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
from tests.test_inheritance.test_inheritance_mixins import Category, Subject, metadata, db as database # type: ignore from tests.test_inheritance_and_pydantic_generation.test_inheritance_mixins import Category, Subject, metadata, db as database # type: ignore
app = FastAPI() app = FastAPI()
app.state.database = database app.state.database = database

View File

@ -0,0 +1,118 @@
from typing import List, Optional
import databases
import pydantic
import sqlalchemy
from pydantic import ConstrainedStr
import ormar
from tests.settings import DATABASE_URL
metadata = sqlalchemy.MetaData()
database = databases.Database(DATABASE_URL, force_rollback=True)
class Category(ormar.Model):
class Meta:
tablename = "categories"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Item(ormar.Model):
class Meta:
tablename = "items"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100, default="test")
category: Optional[Category] = ormar.ForeignKey(Category, nullable=True)
def test_getting_pydantic_model():
PydanticCategory = Category.get_pydantic()
assert issubclass(PydanticCategory, pydantic.BaseModel)
assert {*PydanticCategory.__fields__.keys()} == {"items", "id", "name"}
assert not PydanticCategory.__fields__["id"].required
assert PydanticCategory.__fields__["id"].outer_type_ == int
assert PydanticCategory.__fields__["id"].default is None
assert PydanticCategory.__fields__["name"].required
assert issubclass(PydanticCategory.__fields__["name"].outer_type_, ConstrainedStr)
assert PydanticCategory.__fields__["name"].default is None
PydanticItem = PydanticCategory.__fields__["items"].type_
assert PydanticCategory.__fields__["items"].outer_type_ == List[PydanticItem]
assert issubclass(PydanticItem, pydantic.BaseModel)
assert not PydanticItem.__fields__["name"].required
assert PydanticItem.__fields__["name"].default == "test"
assert issubclass(PydanticItem.__fields__["name"].outer_type_, ConstrainedStr)
assert "category" not in PydanticItem.__fields__
def test_getting_pydantic_model_include():
PydanticCategory = Category.get_pydantic(include={"id", "name"})
assert len(PydanticCategory.__fields__) == 2
assert "items" not in PydanticCategory.__fields__
def test_getting_pydantic_model_nested_include_set():
PydanticCategory = Category.get_pydantic(include={"id", "items__id"})
assert len(PydanticCategory.__fields__) == 2
assert "name" not in PydanticCategory.__fields__
PydanticItem = PydanticCategory.__fields__["items"].type_
assert len(PydanticItem.__fields__) == 1
assert "id" in PydanticItem.__fields__
def test_getting_pydantic_model_nested_include_dict():
PydanticCategory = Category.get_pydantic(include={"id": ..., "items": {"id"}})
assert len(PydanticCategory.__fields__) == 2
assert "name" not in PydanticCategory.__fields__
PydanticItem = PydanticCategory.__fields__["items"].type_
assert len(PydanticItem.__fields__) == 1
assert "id" in PydanticItem.__fields__
def test_getting_pydantic_model_nested_include_nested_dict():
PydanticCategory = Category.get_pydantic(include={"id": ..., "items": {"id": ...}})
assert len(PydanticCategory.__fields__) == 2
assert "name" not in PydanticCategory.__fields__
PydanticItem = PydanticCategory.__fields__["items"].type_
assert len(PydanticItem.__fields__) == 1
assert "id" in PydanticItem.__fields__
def test_getting_pydantic_model_include_exclude():
PydanticCategory = Category.get_pydantic(
include={"id": ..., "items": {"id", "name"}}, exclude={"items__name"}
)
assert len(PydanticCategory.__fields__) == 2
assert "name" not in PydanticCategory.__fields__
PydanticItem = PydanticCategory.__fields__["items"].type_
assert len(PydanticItem.__fields__) == 1
assert "id" in PydanticItem.__fields__
def test_getting_pydantic_model_exclude():
PydanticItem = Item.get_pydantic(exclude={"category__name"})
assert len(PydanticItem.__fields__) == 3
assert "category" in PydanticItem.__fields__
PydanticCategory = PydanticItem.__fields__["category"].type_
assert len(PydanticCategory.__fields__) == 1
assert "name" not in PydanticCategory.__fields__
def test_getting_pydantic_model_exclude_dict():
PydanticItem = Item.get_pydantic(exclude={"id": ..., "category": {"name"}})
assert len(PydanticItem.__fields__) == 2
assert "category" in PydanticItem.__fields__
assert "id" not in PydanticItem.__fields__
PydanticCategory = PydanticItem.__fields__["category"].type_
assert len(PydanticCategory.__fields__) == 1
assert "name" not in PydanticCategory.__fields__