work in progres pydantic_only and properties
This commit is contained in:
@ -12,13 +12,20 @@ from ormar.fields.base import BaseField # noqa I101
|
||||
|
||||
|
||||
def is_field_nullable(
|
||||
nullable: Optional[bool], default: Any, server_default: Any
|
||||
nullable: Optional[bool],
|
||||
default: Any,
|
||||
server_default: Any,
|
||||
pydantic_only: Optional[bool],
|
||||
) -> bool:
|
||||
if nullable is None:
|
||||
return default is not None or server_default is not None
|
||||
return default is not None or server_default is not None or pydantic_only
|
||||
return nullable
|
||||
|
||||
|
||||
def is_auto_primary_key(primary_key: bool, autoincrement: bool):
|
||||
return primary_key and autoincrement
|
||||
|
||||
|
||||
class ModelFieldFactory:
|
||||
_bases: Any = (BaseField,)
|
||||
_type: Any = None
|
||||
@ -29,19 +36,24 @@ class ModelFieldFactory:
|
||||
default = kwargs.pop("default", None)
|
||||
server_default = kwargs.pop("server_default", None)
|
||||
nullable = kwargs.pop("nullable", None)
|
||||
pydantic_only = kwargs.pop("pydantic_only", False)
|
||||
|
||||
primary_key = kwargs.pop("primary_key", False)
|
||||
autoincrement = kwargs.pop("autoincrement", False)
|
||||
|
||||
namespace = dict(
|
||||
__type__=cls._type,
|
||||
alias=kwargs.pop("name", None),
|
||||
name=None,
|
||||
primary_key=kwargs.pop("primary_key", False),
|
||||
primary_key=primary_key,
|
||||
default=default,
|
||||
server_default=server_default,
|
||||
nullable=is_field_nullable(nullable, default, server_default),
|
||||
nullable=is_field_nullable(nullable, default, server_default, pydantic_only)
|
||||
or is_auto_primary_key(primary_key, autoincrement),
|
||||
index=kwargs.pop("index", False),
|
||||
unique=kwargs.pop("unique", False),
|
||||
pydantic_only=kwargs.pop("pydantic_only", False),
|
||||
autoincrement=kwargs.pop("autoincrement", False),
|
||||
pydantic_only=pydantic_only,
|
||||
autoincrement=autoincrement,
|
||||
column_type=cls.get_column_type(**kwargs),
|
||||
choices=set(kwargs.pop("choices", [])),
|
||||
**kwargs
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import inspect
|
||||
import logging
|
||||
import warnings
|
||||
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Type, Union
|
||||
|
||||
import databases
|
||||
import pydantic
|
||||
@ -15,6 +16,7 @@ from ormar import ForeignKey, ModelDefinitionError, Integer # noqa I100
|
||||
from ormar.fields import BaseField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.fields.many_to_many import ManyToMany, ManyToManyField
|
||||
from ormar.fields.model_fields import ModelFieldFactory
|
||||
from ormar.queryset import QuerySet
|
||||
from ormar.relations.alias_manager import AliasManager
|
||||
|
||||
@ -36,6 +38,8 @@ class ModelMeta:
|
||||
str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]]
|
||||
]
|
||||
alias_manager: AliasManager
|
||||
include_props_in_dict: bool
|
||||
include_props_in_fields: bool
|
||||
|
||||
|
||||
def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None:
|
||||
@ -121,6 +125,16 @@ def create_pydantic_field(
|
||||
)
|
||||
|
||||
|
||||
def get_pydantic_field(field_name: str, model: Type["Model"]) -> "ModelField":
|
||||
return ModelField(
|
||||
name=field_name,
|
||||
type_=model.Meta.model_fields[field_name].__type__,
|
||||
model_config=model.__config__,
|
||||
required=not model.Meta.model_fields[field_name].nullable,
|
||||
class_validators={},
|
||||
)
|
||||
|
||||
|
||||
def create_and_append_m2m_fk(
|
||||
model: Type["Model"], model_field: Type[ManyToManyField]
|
||||
) -> None:
|
||||
@ -295,16 +309,61 @@ def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, A
|
||||
return values
|
||||
|
||||
|
||||
def populate_choices_validators( # noqa CCR001
|
||||
model: Type["Model"], attrs: Dict
|
||||
) -> None:
|
||||
def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
|
||||
if model_initialized_and_has_model_fields(model):
|
||||
for _, field in model.Meta.model_fields.items():
|
||||
if check_if_field_has_choices(field):
|
||||
validators = attrs.get("__pre_root_validators__", [])
|
||||
validators = getattr(model, "__pre_root_validators__", [])
|
||||
if choices_validator not in validators:
|
||||
validators.append(choices_validator)
|
||||
attrs["__pre_root_validators__"] = validators
|
||||
setattr(model, "__pre_root_validators__", validators)
|
||||
|
||||
|
||||
def populate_default_options_values(new_model: Type["Model"], model_fields: Dict):
|
||||
if not hasattr(new_model.Meta, "constraints"):
|
||||
new_model.Meta.constraints = []
|
||||
if not hasattr(new_model.Meta, "model_fields"):
|
||||
new_model.Meta.model_fields = model_fields
|
||||
|
||||
if not hasattr(new_model.Meta, "include_props_in_dict"):
|
||||
new_model.Meta.include_props_in_dict = True
|
||||
if not hasattr(new_model.Meta, "include_props_in_fields"):
|
||||
new_model.Meta.include_props_in_fields = False
|
||||
|
||||
|
||||
def add_cached_properties(new_model):
|
||||
new_model._props = {
|
||||
prop
|
||||
for prop in vars(new_model)
|
||||
if isinstance(getattr(new_model, prop), property)
|
||||
and prop
|
||||
not in ("__values__", "__fields__", "fields", "pk_column", "saved")
|
||||
}
|
||||
new_model._quick_access_fields = {
|
||||
"_orm_id",
|
||||
"_orm_saved",
|
||||
"_orm",
|
||||
"__fields__",
|
||||
"_related_names",
|
||||
"_props",
|
||||
"__class__",
|
||||
"extract_related_names",
|
||||
}
|
||||
new_model._related_names = None
|
||||
|
||||
|
||||
def add_property_fields(new_model):
|
||||
if new_model.Meta.include_props_in_fields:
|
||||
for prop in new_model._props:
|
||||
field_type = getattr(new_model, prop).fget.__annotations__.get('return')
|
||||
new_model.Meta.model_fields[prop] = ModelFieldFactory(nullable=True, pydantic_only=True)
|
||||
new_model.__fields__[prop] = ModelField(
|
||||
name=prop,
|
||||
type_=Optional[field_type] if field_type else Any,
|
||||
model_config=new_model.__config__,
|
||||
required=False,
|
||||
class_validators={},
|
||||
)
|
||||
|
||||
|
||||
class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
@ -319,28 +378,21 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
)
|
||||
|
||||
if hasattr(new_model, "Meta"):
|
||||
if not hasattr(new_model.Meta, "constraints"):
|
||||
new_model.Meta.constraints = []
|
||||
if not hasattr(new_model.Meta, "model_fields"):
|
||||
new_model.Meta.model_fields = model_fields
|
||||
populate_default_options_values(new_model, model_fields)
|
||||
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
|
||||
new_model = populate_meta_sqlalchemy_table_if_required(new_model)
|
||||
expand_reverse_relationships(new_model)
|
||||
populate_choices_validators(new_model, attrs)
|
||||
|
||||
populate_choices_validators(new_model)
|
||||
if new_model.Meta.pkname not in attrs["__annotations__"]:
|
||||
field_name = new_model.Meta.pkname
|
||||
field = Integer(name=field_name, primary_key=True)
|
||||
attrs["__annotations__"][field_name] = Optional[int] # type: ignore
|
||||
populate_default_pydantic_field_value(
|
||||
field, field_name, attrs # type: ignore
|
||||
attrs[field_name] = None
|
||||
new_model.__fields__[field_name] = get_pydantic_field(
|
||||
field_name=field_name, model=new_model
|
||||
)
|
||||
|
||||
new_model = super().__new__( # type: ignore
|
||||
mcs, name, bases, attrs
|
||||
)
|
||||
|
||||
new_model.Meta.alias_manager = alias_manager
|
||||
new_model.objects = QuerySet(new_model)
|
||||
add_cached_properties(new_model)
|
||||
add_property_fields(new_model)
|
||||
|
||||
return new_model
|
||||
|
||||
@ -49,6 +49,7 @@ class ModelTableProxy:
|
||||
_related_names_hash: Union[str, bytes]
|
||||
pk: Any
|
||||
get_name: Callable
|
||||
_props: Set
|
||||
|
||||
def dict(self): # noqa A003
|
||||
raise NotImplementedError # pragma no cover
|
||||
@ -207,14 +208,13 @@ class ModelTableProxy:
|
||||
@classmethod
|
||||
def extract_related_names(cls) -> Set:
|
||||
|
||||
if isinstance(cls._related_names_hash, (str, bytes)):
|
||||
if isinstance(cls._related_names, Set):
|
||||
return cls._related_names
|
||||
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if inspect.isclass(field) and issubclass(field, ForeignKeyField):
|
||||
related_names.add(name)
|
||||
cls._related_names_hash = json.dumps(list(cls.Meta.model_fields.keys()))
|
||||
cls._related_names = related_names
|
||||
|
||||
return related_names
|
||||
|
||||
@ -51,9 +51,6 @@ class NewBaseModel(
|
||||
"_orm_id",
|
||||
"_orm_saved",
|
||||
"_orm",
|
||||
"_related_names",
|
||||
"_related_names_hash",
|
||||
"_props",
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
@ -70,7 +67,7 @@ class NewBaseModel(
|
||||
_orm_saved: bool
|
||||
_related_names: Set
|
||||
_related_names_hash: str
|
||||
_props: List[str]
|
||||
_props: Set
|
||||
Meta: ModelMeta
|
||||
|
||||
# noinspection PyMissingConstructor
|
||||
@ -158,20 +155,15 @@ class NewBaseModel(
|
||||
self.set_save_status(False)
|
||||
|
||||
def __getattribute__(self, item: str) -> Any:
|
||||
if item in (
|
||||
"_orm_id",
|
||||
"_orm_saved",
|
||||
"_orm",
|
||||
"__fields__",
|
||||
"_related_names",
|
||||
"_props",
|
||||
):
|
||||
if item in object.__getattribute__(self, '_quick_access_fields'):
|
||||
return object.__getattribute__(self, item)
|
||||
if item == "pk":
|
||||
return self.__dict__.get(self.Meta.pkname, None)
|
||||
if item != "extract_related_names" and item in self.extract_related_names():
|
||||
if item in self.extract_related_names():
|
||||
return self._extract_related_model_instead_of_field(item)
|
||||
if item != "__fields__" and item in self.__fields__:
|
||||
if item in self._props:
|
||||
return object.__getattribute__(self, item)
|
||||
if item in self.__fields__:
|
||||
value = self.__dict__.get(item, None)
|
||||
value = self._convert_json(item, value, "loads")
|
||||
return value
|
||||
@ -230,17 +222,8 @@ class NewBaseModel(
|
||||
def get_properties(
|
||||
cls, include: Union[Set, Dict, None], exclude: Union[Set, Dict, None]
|
||||
) -> List[str]:
|
||||
if isinstance(cls._props, list):
|
||||
|
||||
props = cls._props
|
||||
else:
|
||||
props = [
|
||||
prop
|
||||
for prop in dir(cls)
|
||||
if isinstance(getattr(cls, prop), property)
|
||||
and prop
|
||||
not in ("__values__", "__fields__", "fields", "pk_column", "saved")
|
||||
]
|
||||
cls._props = props
|
||||
if include:
|
||||
props = [prop for prop in props if prop in include]
|
||||
if exclude:
|
||||
@ -271,7 +254,7 @@ class NewBaseModel(
|
||||
for model in models:
|
||||
try:
|
||||
result.append(
|
||||
model.dict(nested=True, include=include, exclude=exclude,)
|
||||
model.dict(nested=True, include=include, exclude=exclude, )
|
||||
)
|
||||
except ReferenceError: # pragma no cover
|
||||
continue
|
||||
@ -349,6 +332,7 @@ class NewBaseModel(
|
||||
)
|
||||
|
||||
# include model properties as fields
|
||||
if self.Meta.include_props_in_dict:
|
||||
props = self.get_properties(include=include, exclude=exclude)
|
||||
if props:
|
||||
dict_instance.update({prop: getattr(self, prop) for prop in props})
|
||||
@ -379,4 +363,4 @@ class NewBaseModel(
|
||||
return value
|
||||
|
||||
def _is_conversion_to_json_needed(self, column_name: str) -> bool:
|
||||
return self.Meta.model_fields[column_name].__type__ == pydantic.Json
|
||||
return column_name in self.Meta.model_fields and self.Meta.model_fields[column_name].__type__ == pydantic.Json
|
||||
|
||||
@ -64,6 +64,8 @@ class RandomModel(ormar.Model):
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
include_props_in_fields = True
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
password: str = ormar.String(max_length=255, default=gen_pass)
|
||||
first_name: str = ormar.String(max_length=255, default="John")
|
||||
@ -72,6 +74,10 @@ class RandomModel(ormar.Model):
|
||||
server_default=sqlalchemy.func.now()
|
||||
)
|
||||
|
||||
@property
|
||||
def full_name(self):
|
||||
return ' '.join([self.first_name, self.last_name])
|
||||
|
||||
|
||||
class User(ormar.Model):
|
||||
class Meta:
|
||||
@ -136,6 +142,12 @@ async def create_user5(user: RandomModel):
|
||||
return await user.save()
|
||||
|
||||
|
||||
@app.post("/random2/", response_model=RandomModel)
|
||||
async def create_user6(user: RandomModel):
|
||||
user = await user.save()
|
||||
return user.dict()
|
||||
|
||||
|
||||
def test_all_endpoints():
|
||||
client = TestClient(app)
|
||||
with client as client:
|
||||
@ -187,4 +199,30 @@ def test_all_endpoints():
|
||||
"first_name",
|
||||
"last_name",
|
||||
"created_date",
|
||||
"full_name"
|
||||
]
|
||||
assert response.json().get("full_name") == "John Test"
|
||||
|
||||
RandomModel.Meta.include_props_in_fields = False
|
||||
user3 = {"last_name": "Test"}
|
||||
response = client.post("/random/", json=user3)
|
||||
assert list(response.json().keys()) == [
|
||||
"id",
|
||||
"password",
|
||||
"first_name",
|
||||
"last_name",
|
||||
"created_date",
|
||||
"full_name"
|
||||
]
|
||||
|
||||
RandomModel.Meta.include_props_in_dict = True
|
||||
user3 = {"last_name": "Test"}
|
||||
response = client.post("/random2/", json=user3)
|
||||
assert list(response.json().keys()) == [
|
||||
"id",
|
||||
"password",
|
||||
"first_name",
|
||||
"last_name",
|
||||
"created_date",
|
||||
"full_name"
|
||||
]
|
||||
|
||||
75
tests/test_pydantic_only_fields.py
Normal file
75
tests/test_pydantic_only_fields.py
Normal file
@ -0,0 +1,75 @@
|
||||
import datetime
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
from pydantic import validator
|
||||
|
||||
import ormar
|
||||
from tests.settings import DATABASE_URL
|
||||
|
||||
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||
metadata = sqlalchemy.MetaData()
|
||||
|
||||
|
||||
class Album(ormar.Model):
|
||||
class Meta:
|
||||
tablename = "albums"
|
||||
metadata = metadata
|
||||
database = database
|
||||
include_props_in_dict = True
|
||||
include_props_in_fields = True
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100)
|
||||
timestamp: datetime.datetime = ormar.DateTime(pydantic_only=True)
|
||||
|
||||
@property
|
||||
def name10(self) -> str:
|
||||
return self.name + '_10'
|
||||
|
||||
@validator('name')
|
||||
def test(cls, v):
|
||||
return v
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def create_test_database():
|
||||
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||
metadata.drop_all(engine)
|
||||
metadata.create_all(engine)
|
||||
yield
|
||||
metadata.drop_all(engine)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pydantic_only_fields():
|
||||
async with database:
|
||||
async with database.transaction(force_rollback=True):
|
||||
album = await Album.objects.create(name='Hitchcock')
|
||||
assert album.pk is not None
|
||||
assert album.saved
|
||||
assert album.timestamp is None
|
||||
|
||||
album = await Album.objects.exclude_fields('timestamp').get()
|
||||
assert album.timestamp is None
|
||||
|
||||
album = await Album.objects.fields({'name', 'timestamp'}).get()
|
||||
assert album.timestamp is None
|
||||
|
||||
test_dict = album.dict()
|
||||
assert 'timestamp' in test_dict
|
||||
assert test_dict['timestamp'] is None
|
||||
|
||||
album.timestamp = datetime.datetime.now()
|
||||
test_dict = album.dict()
|
||||
assert 'timestamp' in test_dict
|
||||
assert test_dict['timestamp'] is not None
|
||||
assert test_dict.get('name10') == 'Hitchcock_10'
|
||||
|
||||
Album.Meta.include_props_in_dict = False
|
||||
test_dict = album.dict()
|
||||
assert 'timestamp' in test_dict
|
||||
assert test_dict['timestamp'] is not None
|
||||
# key is still there as now it's a field
|
||||
assert test_dict['name10'] is None
|
||||
Reference in New Issue
Block a user