Merge pull request #64 from collerek/pydantic_only

Intorduce property_field decorator and fix for pydantic_only plus optimization
This commit is contained in:
collerek
2020-12-04 21:23:50 +07:00
committed by GitHub
15 changed files with 680 additions and 97 deletions

View File

@ -17,6 +17,8 @@ To build an ormar model you simply need to inherit a `ormar.Model` class.
Next assign one or more of the [Fields][fields] as a class level variables. Next assign one or more of the [Fields][fields] as a class level variables.
#### Basic Field Types
Each table **has to** have a primary key column, which you specify by setting `primary_key=True` on selected field. Each table **has to** have a primary key column, which you specify by setting `primary_key=True` on selected field.
Only one primary key column is allowed. Only one primary key column is allowed.
@ -37,7 +39,212 @@ You can disable by passing `autoincremant=False`.
id: int = ormar.Integer(primary_key=True, autoincrement=False) id: int = ormar.Integer(primary_key=True, autoincrement=False)
``` ```
### Fields names vs Column names #### Non Database Fields
Note that if you need a normal pydantic field in your model (used to store value on model or pass around some value) you can define a
field with parameter `pydantic_only=True`.
Fields created like this are added to the `pydantic` model fields -> so are subject to validation according to `Field` type,
also appear in `dict()` and `json()` result.
The difference is that **those fields are not saved in the database**. So they won't be included in underlying sqlalchemy `columns`,
or `table` variables (check [Internals][Internals] section below to see how you can access those if you need).
Subsequently `pydantic_only` fields won't be included in migrations or any database operation (like `save`, `update` etc.)
Fields like those can be passed around into payload in `fastapi` request and will be returned in `fastapi` response
(of course only if you set their value somewhere in your code as the value is **not** fetched from the db.
If you pass a value in `fastapi` `request` and return the same instance that `fastapi` constructs for you in `request_model`
you should get back exactly same value in `response`.).
!!!warning
`pydantic_only=True` fields are always **Optional** and it cannot be changed (otherwise db load validation would fail)
!!!tip
`pydantic_only=True` fields are a good solution if you need to pass additional information from outside of your API
(i.e. frontend). They are not stored in db but you can access them in your `APIRoute` code and they also have `pydantic` validation.
```Python hl_lines="18"
--8<-- "../docs_src/models/docs014.py"
```
If you combine `pydantic_only=True` field with `default` parameter and do not pass actual value in request you will always get default value.
Since it can be a function you can set `default=datetime.datetime.now` and get current timestamp each time you call an endpoint etc.
!!!note
Note that both `pydantic_only` and `property_field` decorated field can be included/excluded in both `dict()` and `fastapi`
response with `include`/`exclude` and `response_model_include`/`response_model_exclude` accordingly.
```python
# <==part of code removed for clarity==>
class User(ormar.Model):
class Meta:
tablename: str = "users2"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
email: str = ormar.String(max_length=255, nullable=False)
password: str = ormar.String(max_length=255)
first_name: str = ormar.String(max_length=255)
last_name: str = ormar.String(max_length=255)
category: str = ormar.String(max_length=255, nullable=True)
timestamp: datetime.datetime = ormar.DateTime(
pydantic_only=True, default=datetime.datetime.now
)
# <==part of code removed for clarity==>
app =FastAPI()
@app.post("/users/")
async def create_user(user: User):
return await user.save()
# <==part of code removed for clarity==>
def test_excluding_fields_in_endpoints():
client = TestClient(app)
with client as client:
timestamp = datetime.datetime.now()
user = {
"email": "test@domain.com",
"password": "^*^%A*DA*IAAA",
"first_name": "John",
"last_name": "Doe",
"timestamp": str(timestamp),
}
response = client.post("/users/", json=user)
assert list(response.json().keys()) == [
"id",
"email",
"first_name",
"last_name",
"category",
"timestamp",
]
# returned is the same timestamp
assert response.json().get("timestamp") == str(timestamp).replace(" ", "T")
# <==part of code removed for clarity==>
```
#### Property fields
Sometimes it's desirable to do some kind of calculation on the model instance. One of the most common examples can be concatenating
two or more fields. Imagine you have `first_name` and `last_name` fields on your model, but would like to have `full_name` in the result
of the `fastapi` query.
You can create a new `pydantic` model with a `method` that accepts only `self` (so like default python `@property`)
and populate it in your code.
But it's so common that `ormar` has you covered. You can "materialize" a `property_field` on you `Model`.
!!!warning
`property_field` fields are always **Optional** and it cannot be changed (otherwise db load validation would fail)
```Python hl_lines="20-22"
--8<-- "../docs_src/models/docs015.py"
```
!!!warning
The decorated function has to accept only one parameter, and that parameter have to be `self`.
If you try to decorate a function with more parameters `ormar` will raise `ModelDefinitionError`.
Sample:
```python
# will raise ModelDefinitionError
@property_field
def prefixed_name(self, prefix="prefix_"):
return 'custom_prefix__' + self.name
# will raise ModelDefinitionError
# (calling first param something else than 'self' is a bad practice anyway)
@property_field
def prefixed_name(instance):
return 'custom_prefix__' + self.name
```
Note that `property_field` decorated methods do not go through verification (but that might change in future) and are only available
in the response from `fastapi` and `dict()` and `json()` methods. You cannot pass a value for this field in the request
(or rather you can but it will be discarded by ormar so really no point but no Exception will be raised).
!!!note
Note that both `pydantic_only` and `property_field` decorated field can be included/excluded in both `dict()` and `fastapi`
response with `include`/`exclude` and `response_model_include`/`response_model_exclude` accordingly.
!!!tip
Note that `@property_field` decorator is designed to replace the python `@property` decorator, you do not have to combine them.
In theory you can cause `ormar` have a failsafe mechanism, but note that i.e. `mypy` will complain about re-decorating a property.
```python
# valid and working but unnecessary and mypy will complain
@property_field
@property
def prefixed_name(self):
return 'custom_prefix__' + self.name
```
```python
# <==part of code removed for clarity==>
def gen_pass(): # note: NOT production ready
choices = string.ascii_letters + string.digits + "!@#$%^&*()"
return "".join(random.choice(choices) for _ in range(20))
class RandomModel(ormar.Model):
class Meta:
tablename: str = "random_users"
metadata = metadata
database = database
include_props_in_dict = True
id: int = ormar.Integer(primary_key=True)
password: str = ormar.String(max_length=255, default=gen_pass)
first_name: str = ormar.String(max_length=255, default="John")
last_name: str = ormar.String(max_length=255)
created_date: datetime.datetime = ormar.DateTime(
server_default=sqlalchemy.func.now()
)
@property_field
def full_name(self) -> str:
return " ".join([self.first_name, self.last_name])
# <==part of code removed for clarity==>
app =FastAPI()
# explicitly exclude property_field in this endpoint
@app.post("/random/", response_model=RandomModel, response_model_exclude={"full_name"})
async def create_user(user: RandomModel):
return await user.save()
# <==part of code removed for clarity==>
def test_excluding_property_field_in_endpoints2():
client = TestClient(app)
with client as client:
RandomModel.Meta.include_props_in_dict = True
user3 = {"last_name": "Test"}
response = client.post("/random3/", json=user3)
assert list(response.json().keys()) == [
"id",
"password",
"first_name",
"last_name",
"created_date",
]
# despite being decorated with property_field if you explictly exclude it it will be gone
assert response.json().get("full_name") is None
# <==part of code removed for clarity==>
```
#### Fields names vs Column names
By default names of the fields will be used for both the underlying `pydantic` model and `sqlalchemy` table. By default names of the fields will be used for both the underlying `pydantic` model and `sqlalchemy` table.
@ -330,7 +537,7 @@ You can set this parameter by providing `Meta` class `constraints` argument.
--8<-- "../docs_src/models/docs006.py" --8<-- "../docs_src/models/docs006.py"
``` ```
## Initialization ## Model Initialization
There are two ways to create and persist the `Model` instance in the database. There are two ways to create and persist the `Model` instance in the database.
@ -560,3 +767,4 @@ For example to list table model fields you can:
[sqlalchemy table creation]: https://docs.sqlalchemy.org/en/13/core/metadata.html#creating-and-dropping-database-tables [sqlalchemy table creation]: https://docs.sqlalchemy.org/en/13/core/metadata.html#creating-and-dropping-database-tables
[alembic]: https://alembic.sqlalchemy.org/en/latest/tutorial.html [alembic]: https://alembic.sqlalchemy.org/en/latest/tutorial.html
[save status]: ../models/#model-save-status [save status]: ../models/#model-save-status
[Internals]: #internals

View File

@ -1,3 +1,11 @@
# 0.6.2
* Performance optimization
* Fix for bug with `pydantic_only` fields being required
* Add `property_field` decorator that registers a function as a property that will
be included in `Model.dict()` and in `fastapi` response
* Update docs
# 0.6.1 # 0.6.1
* Explicitly set None to excluded nullable fields to avoid pydantic setting a default value (fix [#60][#60]). * Explicitly set None to excluded nullable fields to avoid pydantic setting a default value (fix [#60][#60]).

View File

@ -0,0 +1,18 @@
import databases
import sqlalchemy
import ormar
database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()
class Course(ormar.Model):
class Meta:
database = database
metadata = metadata
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
completed: bool = ormar.Boolean(default=False)
non_db_field: str = ormar.String(max_length=100, pydantic_only=True)

View File

@ -0,0 +1,22 @@
import databases
import sqlalchemy
import ormar
from ormar import property_field
database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()
class Course(ormar.Model):
class Meta:
database = database
metadata = metadata
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
completed: bool = ormar.Boolean(default=False)
@property_field
def prefixed_name(self):
return 'custom_prefix__' + self.name

View File

@ -1,3 +1,4 @@
from ormar.decorators import property_field
from ormar.exceptions import ModelDefinitionError, ModelNotSet, MultipleMatches, NoMatch from ormar.exceptions import ModelDefinitionError, ModelNotSet, MultipleMatches, NoMatch
from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100 from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
from ormar.fields import ( # noqa: I100 from ormar.fields import ( # noqa: I100
@ -30,7 +31,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType() Undefined = UndefinedType()
__version__ = "0.6.1" __version__ = "0.6.2"
__all__ = [ __all__ = [
"Integer", "Integer",
"BigInteger", "BigInteger",
@ -58,4 +59,5 @@ __all__ = [
"QuerySetProtocol", "QuerySetProtocol",
"RelationProtocol", "RelationProtocol",
"ModelMeta", "ModelMeta",
"property_field",
] ]

View File

@ -0,0 +1,5 @@
from ormar.decorators.property_field import property_field
__all__ = [
"property_field",
]

View File

@ -0,0 +1,19 @@
import inspect
from collections.abc import Callable
from typing import Union
from ormar.exceptions import ModelDefinitionError
def property_field(func: Callable) -> Union[property, Callable]:
if isinstance(func, property): # pragma: no cover
func.fget.__property_field__ = True
else:
arguments = list(inspect.signature(func).parameters.keys())
if len(arguments) > 1 or arguments[0] != "self":
raise ModelDefinitionError(
"property_field decorator can be used "
"only on class methods with no arguments"
)
func.__dict__["__property_field__"] = True
return func

View File

@ -12,13 +12,24 @@ from ormar.fields.base import BaseField # noqa I101
def is_field_nullable( def is_field_nullable(
nullable: Optional[bool], default: Any, server_default: Any nullable: Optional[bool],
default: Any,
server_default: Any,
pydantic_only: Optional[bool],
) -> bool: ) -> bool:
if nullable is None: if nullable is None:
return default is not None or server_default is not None return (
default is not None
or server_default is not None
or (pydantic_only is not None and pydantic_only)
)
return nullable return nullable
def is_auto_primary_key(primary_key: bool, autoincrement: bool) -> bool:
return primary_key and autoincrement
class ModelFieldFactory: class ModelFieldFactory:
_bases: Any = (BaseField,) _bases: Any = (BaseField,)
_type: Any = None _type: Any = None
@ -29,19 +40,24 @@ class ModelFieldFactory:
default = kwargs.pop("default", None) default = kwargs.pop("default", None)
server_default = kwargs.pop("server_default", None) server_default = kwargs.pop("server_default", None)
nullable = kwargs.pop("nullable", None) nullable = kwargs.pop("nullable", None)
pydantic_only = kwargs.pop("pydantic_only", False)
primary_key = kwargs.pop("primary_key", False)
autoincrement = kwargs.pop("autoincrement", False)
namespace = dict( namespace = dict(
__type__=cls._type, __type__=cls._type,
alias=kwargs.pop("name", None), alias=kwargs.pop("name", None),
name=None, name=None,
primary_key=kwargs.pop("primary_key", False), primary_key=primary_key,
default=default, default=default,
server_default=server_default, server_default=server_default,
nullable=is_field_nullable(nullable, default, server_default), nullable=is_field_nullable(nullable, default, server_default, pydantic_only)
or is_auto_primary_key(primary_key, autoincrement),
index=kwargs.pop("index", False), index=kwargs.pop("index", False),
unique=kwargs.pop("unique", False), unique=kwargs.pop("unique", False),
pydantic_only=kwargs.pop("pydantic_only", False), pydantic_only=pydantic_only,
autoincrement=kwargs.pop("autoincrement", False), autoincrement=autoincrement,
column_type=cls.get_column_type(**kwargs), column_type=cls.get_column_type(**kwargs),
choices=set(kwargs.pop("choices", [])), choices=set(kwargs.pop("choices", [])),
**kwargs **kwargs

View File

@ -1,6 +1,6 @@
import logging import logging
import warnings import warnings
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Type, Union
import databases import databases
import pydantic import pydantic
@ -11,10 +11,11 @@ from pydantic.utils import lenient_issubclass
from sqlalchemy.sql.schema import ColumnCollectionConstraint from sqlalchemy.sql.schema import ColumnCollectionConstraint
import ormar # noqa I100 import ormar # noqa I100
from ormar import ForeignKey, ModelDefinitionError, Integer # noqa I100 from ormar import ForeignKey, Integer, ModelDefinitionError # noqa I100
from ormar.fields import BaseField from ormar.fields import BaseField
from ormar.fields.foreign_key import ForeignKeyField from ormar.fields.foreign_key import ForeignKeyField
from ormar.fields.many_to_many import ManyToMany, ManyToManyField from ormar.fields.many_to_many import ManyToMany, ManyToManyField
from ormar.models.quick_access_views import quick_access_set
from ormar.queryset import QuerySet from ormar.queryset import QuerySet
from ormar.relations.alias_manager import AliasManager from ormar.relations.alias_manager import AliasManager
@ -36,6 +37,7 @@ class ModelMeta:
str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]] str, Union[Type[BaseField], Type[ForeignKeyField], Type[ManyToManyField]]
] ]
alias_manager: AliasManager alias_manager: AliasManager
property_fields: Set
def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None: def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None:
@ -121,6 +123,16 @@ def create_pydantic_field(
) )
def get_pydantic_field(field_name: str, model: Type["Model"]) -> "ModelField":
return ModelField(
name=field_name,
type_=model.Meta.model_fields[field_name].__type__, # type: ignore
model_config=model.__config__,
required=not model.Meta.model_fields[field_name].nullable,
class_validators={},
)
def create_and_append_m2m_fk( def create_and_append_m2m_fk(
model: Type["Model"], model_field: Type[ManyToManyField] model: Type["Model"], model_field: Type[ManyToManyField]
) -> None: ) -> None:
@ -295,16 +307,48 @@ def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, A
return values return values
def populate_choices_validators( # noqa CCR001 def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
model: Type["Model"], attrs: Dict
) -> None:
if model_initialized_and_has_model_fields(model): if model_initialized_and_has_model_fields(model):
for _, field in model.Meta.model_fields.items(): for _, field in model.Meta.model_fields.items():
if check_if_field_has_choices(field): if check_if_field_has_choices(field):
validators = attrs.get("__pre_root_validators__", []) validators = getattr(model, "__pre_root_validators__", [])
if choices_validator not in validators: if choices_validator not in validators:
validators.append(choices_validator) validators.append(choices_validator)
attrs["__pre_root_validators__"] = validators model.__pre_root_validators__ = validators
def populate_default_options_values(
new_model: Type["Model"], model_fields: Dict
) -> None:
if not hasattr(new_model.Meta, "constraints"):
new_model.Meta.constraints = []
if not hasattr(new_model.Meta, "model_fields"):
new_model.Meta.model_fields = model_fields
def add_cached_properties(new_model: Type["Model"]) -> None:
new_model._quick_access_fields = quick_access_set
new_model._related_names = None
new_model._pydantic_fields = {name for name in new_model.__fields__}
def property_fields_not_set(new_model: Type["Model"]) -> bool:
return (
not hasattr(new_model.Meta, "property_fields")
or not new_model.Meta.property_fields
)
def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001
if property_fields_not_set(new_model):
props = set()
for var_name, value in attrs.items():
if isinstance(value, property):
value = value.fget
field_config = getattr(value, "__property_field__", None)
if field_config:
props.add(var_name)
new_model.Meta.property_fields = props
class ModelMetaclass(pydantic.main.ModelMetaclass): class ModelMetaclass(pydantic.main.ModelMetaclass):
@ -317,30 +361,23 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
new_model = super().__new__( # type: ignore new_model = super().__new__( # type: ignore
mcs, name, bases, attrs mcs, name, bases, attrs
) )
add_cached_properties(new_model)
if hasattr(new_model, "Meta"): if hasattr(new_model, "Meta"):
if not hasattr(new_model.Meta, "constraints"): populate_default_options_values(new_model, model_fields)
new_model.Meta.constraints = []
if not hasattr(new_model.Meta, "model_fields"):
new_model.Meta.model_fields = model_fields
new_model = populate_meta_tablename_columns_and_pk(name, new_model) new_model = populate_meta_tablename_columns_and_pk(name, new_model)
new_model = populate_meta_sqlalchemy_table_if_required(new_model) new_model = populate_meta_sqlalchemy_table_if_required(new_model)
expand_reverse_relationships(new_model) expand_reverse_relationships(new_model)
populate_choices_validators(new_model, attrs) populate_choices_validators(new_model)
if new_model.Meta.pkname not in attrs["__annotations__"]: if new_model.Meta.pkname not in attrs["__annotations__"]:
field_name = new_model.Meta.pkname field_name = new_model.Meta.pkname
field = Integer(name=field_name, primary_key=True)
attrs["__annotations__"][field_name] = Optional[int] # type: ignore attrs["__annotations__"][field_name] = Optional[int] # type: ignore
populate_default_pydantic_field_value( attrs[field_name] = None
field, field_name, attrs # type: ignore new_model.__fields__[field_name] = get_pydantic_field(
field_name=field_name, model=new_model
) )
new_model = super().__new__( # type: ignore
mcs, name, bases, attrs
)
new_model.Meta.alias_manager = alias_manager new_model.Meta.alias_manager = alias_manager
new_model.objects = QuerySet(new_model) new_model.objects = QuerySet(new_model)
add_property_fields(new_model, attrs)
return new_model return new_model

View File

@ -20,11 +20,6 @@ from typing import (
from ormar.exceptions import ModelPersistenceError, RelationshipInstanceError from ormar.exceptions import ModelPersistenceError, RelationshipInstanceError
from ormar.queryset.utils import translate_list_to_dict, update from ormar.queryset.utils import translate_list_to_dict, update
try:
import orjson as json
except ImportError: # pragma: nocover
import json # type: ignore
import ormar # noqa: I100 import ormar # noqa: I100
from ormar.fields import BaseField from ormar.fields import BaseField
from ormar.fields.foreign_key import ForeignKeyField from ormar.fields.foreign_key import ForeignKeyField
@ -45,17 +40,16 @@ Field = TypeVar("Field", bound=BaseField)
class ModelTableProxy: class ModelTableProxy:
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
Meta: ModelMeta Meta: ModelMeta
_related_names: Set _related_names: Optional[Set]
_related_names_hash: Union[str, bytes] _related_names_hash: Union[str, bytes]
pk: Any pk: Any
get_name: Callable get_name: Callable
_props: Set
def dict(self): # noqa A003 dict: Callable # noqa: A001, VNE003
raise NotImplementedError # pragma no cover
def _extract_own_model_fields(self) -> Dict: def _extract_own_model_fields(self) -> Dict:
related_names = self.extract_related_names() related_names = self.extract_related_names()
self_fields = {k: v for k, v in self.dict().items() if k not in related_names} self_fields = self.dict(exclude=related_names)
return self_fields return self_fields
@classmethod @classmethod
@ -183,7 +177,11 @@ class ModelTableProxy:
@classmethod @classmethod
def populate_default_values(cls, new_kwargs: Dict) -> Dict: def populate_default_values(cls, new_kwargs: Dict) -> Dict:
for field_name, field in cls.Meta.model_fields.items(): for field_name, field in cls.Meta.model_fields.items():
if field_name not in new_kwargs and field.has_default(use_server=False): if (
field_name not in new_kwargs
and field.has_default(use_server=False)
and not field.pydantic_only
):
new_kwargs[field_name] = field.get_default() new_kwargs[field_name] = field.get_default()
# clear fields with server_default set as None # clear fields with server_default set as None
if field.server_default is not None and not new_kwargs.get(field_name): if field.server_default is not None and not new_kwargs.get(field_name):
@ -207,14 +205,13 @@ class ModelTableProxy:
@classmethod @classmethod
def extract_related_names(cls) -> Set: def extract_related_names(cls) -> Set:
if isinstance(cls._related_names_hash, (str, bytes)): if isinstance(cls._related_names, Set):
return cls._related_names return cls._related_names
related_names = set() related_names = set()
for name, field in cls.Meta.model_fields.items(): for name, field in cls.Meta.model_fields.items():
if inspect.isclass(field) and issubclass(field, ForeignKeyField): if inspect.isclass(field) and issubclass(field, ForeignKeyField):
related_names.add(name) related_names.add(name)
cls._related_names_hash = json.dumps(list(cls.Meta.model_fields.keys()))
cls._related_names = related_names cls._related_names = related_names
return related_names return related_names

View File

@ -51,9 +51,6 @@ class NewBaseModel(
"_orm_id", "_orm_id",
"_orm_saved", "_orm_saved",
"_orm", "_orm",
"_related_names",
"_related_names_hash",
"_props",
) )
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
@ -68,14 +65,14 @@ class NewBaseModel(
_orm_relationship_manager: AliasManager _orm_relationship_manager: AliasManager
_orm: RelationsManager _orm: RelationsManager
_orm_saved: bool _orm_saved: bool
_related_names: Set _related_names: Optional[Set]
_related_names_hash: str _related_names_hash: str
_props: List[str] _pydantic_fields: Set
_quick_access_fields: Set
Meta: ModelMeta Meta: ModelMeta
# noinspection PyMissingConstructor # noinspection PyMissingConstructor
def __init__(self, *args: Any, **kwargs: Any) -> None: # type: ignore def __init__(self, *args: Any, **kwargs: Any) -> None: # type: ignore
object.__setattr__(self, "_orm_id", uuid.uuid4().hex) object.__setattr__(self, "_orm_id", uuid.uuid4().hex)
object.__setattr__(self, "_orm_saved", False) object.__setattr__(self, "_orm_saved", False)
object.__setattr__( object.__setattr__(
@ -96,6 +93,13 @@ class NewBaseModel(
if "pk" in kwargs: if "pk" in kwargs:
kwargs[self.Meta.pkname] = kwargs.pop("pk") kwargs[self.Meta.pkname] = kwargs.pop("pk")
# remove property fields values from validation
kwargs = {
k: v
for k, v in kwargs.items()
if k not in object.__getattribute__(self, "Meta").property_fields
}
# build the models to set them and validate but don't register # build the models to set them and validate but don't register
try: try:
new_kwargs: Dict[str, Any] = { new_kwargs: Dict[str, Any] = {
@ -134,7 +138,7 @@ class NewBaseModel(
) )
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001 def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
if name in ("_orm_id", "_orm_saved", "_orm", "_related_names", "_props"): if name in object.__getattribute__(self, "_quick_access_fields"):
object.__setattr__(self, name, value) object.__setattr__(self, name, value)
elif name == "pk": elif name == "pk":
object.__setattr__(self, self.Meta.pkname, value) object.__setattr__(self, self.Meta.pkname, value)
@ -158,29 +162,26 @@ class NewBaseModel(
self.set_save_status(False) self.set_save_status(False)
def __getattribute__(self, item: str) -> Any: def __getattribute__(self, item: str) -> Any:
if item in ( if item in object.__getattribute__(self, "_quick_access_fields"):
"_orm_id",
"_orm_saved",
"_orm",
"__fields__",
"_related_names",
"_props",
):
return object.__getattribute__(self, item) return object.__getattribute__(self, item)
if item == "pk": if item == "pk":
return self.__dict__.get(self.Meta.pkname, None) return object.__getattribute__(self, "__dict__").get(self.Meta.pkname, None)
if item != "extract_related_names" and item in self.extract_related_names(): if item in object.__getattribute__(self, "extract_related_names")():
return self._extract_related_model_instead_of_field(item) return object.__getattribute__(
if item != "__fields__" and item in self.__fields__: self, "_extract_related_model_instead_of_field"
value = self.__dict__.get(item, None) )(item)
value = self._convert_json(item, value, "loads") if item in object.__getattribute__(self, "Meta").property_fields:
value = object.__getattribute__(self, item)
return value() if callable(value) else value
if item in object.__getattribute__(self, "_pydantic_fields"):
value = object.__getattribute__(self, "__dict__").get(item, None)
value = object.__getattribute__(self, "_convert_json")(item, value, "loads")
return value return value
return super().__getattribute__(item) return object.__getattribute__(self, item) # pragma: no cover
def _extract_related_model_instead_of_field( def _extract_related_model_instead_of_field(
self, item: str self, item: str
) -> Optional[Union["T", Sequence["T"]]]: ) -> Optional[Union["T", Sequence["T"]]]:
# alias = self.get_column_alias(item)
if item in self._orm: if item in self._orm:
return self._orm.get(item) return self._orm.get(item)
return None # pragma no cover return None # pragma no cover
@ -193,8 +194,8 @@ class NewBaseModel(
def __same__(self, other: "NewBaseModel") -> bool: def __same__(self, other: "NewBaseModel") -> bool:
return ( return (
self._orm_id == other._orm_id self._orm_id == other._orm_id
or self.dict() == other.dict()
or (self.pk == other.pk and self.pk is not None) or (self.pk == other.pk and self.pk is not None)
or self.dict() == other.dict()
) )
@classmethod @classmethod
@ -229,22 +230,13 @@ class NewBaseModel(
@classmethod @classmethod
def get_properties( def get_properties(
cls, include: Union[Set, Dict, None], exclude: Union[Set, Dict, None] cls, include: Union[Set, Dict, None], exclude: Union[Set, Dict, None]
) -> List[str]: ) -> Set[str]:
if isinstance(cls._props, list):
props = cls._props props = cls.Meta.property_fields
else:
props = [
prop
for prop in dir(cls)
if isinstance(getattr(cls, prop), property)
and prop
not in ("__values__", "__fields__", "fields", "pk_column", "saved")
]
cls._props = props
if include: if include:
props = [prop for prop in props if prop in include] props = {prop for prop in props if prop in include}
if exclude: if exclude:
props = [prop for prop in props if prop not in exclude] props = {prop for prop in props if prop not in exclude}
return props return props
def _get_related_not_excluded_fields( def _get_related_not_excluded_fields(
@ -348,10 +340,11 @@ class NewBaseModel(
exclude=exclude, # type: ignore exclude=exclude, # type: ignore
) )
# include model properties as fields # include model properties as fields in dict
props = self.get_properties(include=include, exclude=exclude) if object.__getattribute__(self, "Meta").property_fields:
if props: props = self.get_properties(include=include, exclude=exclude)
dict_instance.update({prop: getattr(self, prop) for prop in props}) if props:
dict_instance.update({prop: getattr(self, prop) for prop in props})
return dict_instance return dict_instance
@ -379,4 +372,7 @@ class NewBaseModel(
return value return value
def _is_conversion_to_json_needed(self, column_name: str) -> bool: def _is_conversion_to_json_needed(self, column_name: str) -> bool:
return self.Meta.model_fields[column_name].__type__ == pydantic.Json return (
column_name in self.Meta.model_fields
and self.Meta.model_fields[column_name].__type__ == pydantic.Json
)

View File

@ -0,0 +1,61 @@
quick_access_set = {
"Config",
"Meta",
"__class__",
"__config__",
"__custom_root_type__",
"__dict__",
"__fields__",
"__fields_set__",
"__json_encoder__",
"__post_root_validators__",
"__pre_root_validators__",
"__same__",
"_calculate_keys",
"_convert_json",
"_extract_db_related_names",
"_extract_model_db_fields",
"_extract_nested_models",
"_extract_nested_models_from_list",
"_extract_own_model_fields",
"_get_related_not_excluded_fields",
"_get_value",
"_is_conversion_to_json_needed",
"_iter",
"_orm",
"_orm_id",
"_orm_saved",
"_related_names",
"_skip_ellipsis",
"_update_and_follow",
"_update_excluded_with_related_not_required",
"copy",
"delete",
"dict",
"extract_related_names",
"from_dict",
"get_column_alias",
"get_column_name_from_alias",
"get_filtered_names_to_extract",
"get_name",
"get_properties",
"get_related_field_name",
"get_relation_model_id",
"json",
"keys",
"load",
"pk_column",
"pk_type",
"populate_default_values",
"remove",
"resolve_relation_field",
"resolve_relation_name",
"save",
"save_related",
"saved",
"set_save_status",
"translate_aliases_to_columns",
"translate_columns_to_aliases",
"update",
"upsert",
}

View File

@ -10,6 +10,7 @@ from fastapi import FastAPI
from starlette.testclient import TestClient from starlette.testclient import TestClient
import ormar import ormar
from ormar import property_field
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
app = FastAPI() app = FastAPI()
@ -64,6 +65,8 @@ class RandomModel(ormar.Model):
metadata = metadata metadata = metadata
database = database database = database
include_props_in_dict = True
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
password: str = ormar.String(max_length=255, default=gen_pass) password: str = ormar.String(max_length=255, default=gen_pass)
first_name: str = ormar.String(max_length=255, default="John") first_name: str = ormar.String(max_length=255, default="John")
@ -72,6 +75,10 @@ class RandomModel(ormar.Model):
server_default=sqlalchemy.func.now() server_default=sqlalchemy.func.now()
) )
@property_field
def full_name(self) -> str:
return " ".join([self.first_name, self.last_name])
class User(ormar.Model): class User(ormar.Model):
class Meta: class Meta:
@ -80,10 +87,10 @@ class User(ormar.Model):
database = database database = database
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
email: str = ormar.String(max_length=255, nullable=False) email: str = ormar.String(max_length=255)
password: str = ormar.String(max_length=255, nullable=True) password: str = ormar.String(max_length=255, nullable=True)
first_name: str = ormar.String(max_length=255, nullable=False) first_name: str = ormar.String(max_length=255)
last_name: str = ormar.String(max_length=255, nullable=False) last_name: str = ormar.String(max_length=255)
category: str = ormar.String(max_length=255, nullable=True) category: str = ormar.String(max_length=255, nullable=True)
@ -95,10 +102,13 @@ class User2(ormar.Model):
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
email: str = ormar.String(max_length=255, nullable=False) email: str = ormar.String(max_length=255, nullable=False)
password: str = ormar.String(max_length=255, nullable=False) password: str = ormar.String(max_length=255)
first_name: str = ormar.String(max_length=255, nullable=False) first_name: str = ormar.String(max_length=255)
last_name: str = ormar.String(max_length=255, nullable=False) last_name: str = ormar.String(max_length=255)
category: str = ormar.String(max_length=255, nullable=True) category: str = ormar.String(max_length=255, nullable=True)
timestamp: datetime.datetime = ormar.DateTime(
pydantic_only=True, default=datetime.datetime.now
)
@pytest.fixture(autouse=True, scope="module") @pytest.fixture(autouse=True, scope="module")
@ -133,10 +143,23 @@ async def create_user4(user: User2):
@app.post("/random/", response_model=RandomModel) @app.post("/random/", response_model=RandomModel)
async def create_user5(user: RandomModel): async def create_user5(user: RandomModel):
return await user.save() user = await user.save()
return user
def test_all_endpoints(): @app.post("/random2/", response_model=RandomModel)
async def create_user6(user: RandomModel):
user = await user.save()
return user.dict()
@app.post("/random3/", response_model=RandomModel, response_model_exclude={"full_name"})
async def create_user7(user: RandomModel):
user = await user.save()
return user.dict()
def test_excluding_fields_in_endpoints():
client = TestClient(app) client = TestClient(app)
with client as client: with client as client:
user = { user = {
@ -170,6 +193,32 @@ def test_all_endpoints():
response = client.post("/users3/", json=user) response = client.post("/users3/", json=user)
assert list(response.json().keys()) == ["email", "first_name", "last_name"] assert list(response.json().keys()) == ["email", "first_name", "last_name"]
timestamp = datetime.datetime.now()
user3 = {
"email": "test@domain.com",
"password": "^*^%A*DA*IAAA",
"first_name": "John",
"last_name": "Doe",
"timestamp": str(timestamp),
}
response = client.post("/users4/", json=user3)
assert list(response.json().keys()) == [
"id",
"email",
"first_name",
"last_name",
"category",
"timestamp",
]
assert response.json().get("timestamp") == str(timestamp).replace(" ", "T")
resp_dict = response.json()
resp_dict.update({"password": "random"})
user_instance = User2(**resp_dict)
assert user_instance.timestamp is not None
assert isinstance(user_instance.timestamp, datetime.datetime)
assert user_instance.timestamp == timestamp
response = client.post("/users4/", json=user) response = client.post("/users4/", json=user)
assert list(response.json().keys()) == [ assert list(response.json().keys()) == [
"id", "id",
@ -177,8 +226,28 @@ def test_all_endpoints():
"first_name", "first_name",
"last_name", "last_name",
"category", "category",
"timestamp",
] ]
assert response.json().get("timestamp") != str(timestamp).replace(" ", "T")
assert response.json().get("timestamp") is not None
def test_adding_fields_in_endpoints():
client = TestClient(app)
with client as client:
user3 = {"last_name": "Test", "full_name": "deleted"}
response = client.post("/random/", json=user3)
assert list(response.json().keys()) == [
"id",
"password",
"first_name",
"last_name",
"created_date",
"full_name",
]
assert response.json().get("full_name") == "John Test"
RandomModel.Meta.include_props_in_fields = False
user3 = {"last_name": "Test"} user3 = {"last_name": "Test"}
response = client.post("/random/", json=user3) response = client.post("/random/", json=user3)
assert list(response.json().keys()) == [ assert list(response.json().keys()) == [
@ -187,4 +256,39 @@ def test_all_endpoints():
"first_name", "first_name",
"last_name", "last_name",
"created_date", "created_date",
"full_name",
] ]
assert response.json().get("full_name") == "John Test"
def test_adding_fields_in_endpoints2():
client = TestClient(app)
with client as client:
RandomModel.Meta.include_props_in_dict = True
user3 = {"last_name": "Test"}
response = client.post("/random2/", json=user3)
assert list(response.json().keys()) == [
"id",
"password",
"first_name",
"last_name",
"created_date",
"full_name",
]
assert response.json().get("full_name") == "John Test"
def test_excluding_property_field_in_endpoints2():
client = TestClient(app)
with client as client:
RandomModel.Meta.include_props_in_dict = True
user3 = {"last_name": "Test"}
response = client.post("/random3/", json=user3)
assert list(response.json().keys()) == [
"id",
"password",
"first_name",
"last_name",
"created_date",
]
assert response.json().get("full_name") is None

View File

@ -1,8 +1,10 @@
# type: ignore
import databases import databases
import pytest import pytest
import sqlalchemy import sqlalchemy
import ormar import ormar
from ormar import ModelDefinitionError, property_field
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True) database = databases.Database(DATABASE_URL, force_rollback=True)
@ -19,15 +21,15 @@ class Song(ormar.Model):
name: str = ormar.String(max_length=100) name: str = ormar.String(max_length=100)
sort_order: int = ormar.Integer() sort_order: int = ormar.Integer()
@property @property_field
def sorted_name(self): def sorted_name(self):
return f"{self.sort_order}: {self.name}" return f"{self.sort_order}: {self.name}"
@property @property_field
def sample(self): def sample(self):
return "sample" return "sample"
@property @property_field
def sample2(self): def sample2(self):
return "sample2" return "sample2"
@ -66,3 +68,12 @@ async def test_sort_order_on_main_model():
assert "sample" not in check_include assert "sample" not in check_include
assert "sample2" in check_include assert "sample2" in check_include
assert "sorted_name" in check_include assert "sorted_name" in check_include
def test_wrong_definition():
with pytest.raises(ModelDefinitionError):
class WrongModel(ormar.Model): # pragma: no cover
@property_field
def test(self, aa=10, bb=30):
pass

View File

@ -0,0 +1,79 @@
import datetime
import databases
import pytest
import sqlalchemy
import ormar
from ormar import property_field
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
class Album(ormar.Model):
class Meta:
tablename = "albums"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
timestamp: datetime.datetime = ormar.DateTime(pydantic_only=True)
@property_field
def name10(self) -> str:
return self.name + "_10"
@property_field
def name20(self) -> str:
return self.name + "_20"
@property
def name30(self) -> str:
return self.name + "_30"
@property_field
def name40(self) -> str:
return self.name + "_40"
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_pydantic_only_fields():
async with database:
async with database.transaction(force_rollback=True):
album = await Album.objects.create(name="Hitchcock")
assert album.pk is not None
assert album.saved
assert album.timestamp is None
album = await Album.objects.exclude_fields("timestamp").get()
assert album.timestamp is None
album = await Album.objects.fields({"name", "timestamp"}).get()
assert album.timestamp is None
test_dict = album.dict()
assert "timestamp" in test_dict
assert test_dict["timestamp"] is None
assert album.name30 == "Hitchcock_30"
album.timestamp = datetime.datetime.now()
test_dict = album.dict()
assert "timestamp" in test_dict
assert test_dict["timestamp"] is not None
assert test_dict.get("name10") == "Hitchcock_10"
assert test_dict.get("name20") == "Hitchcock_20"
assert test_dict.get("name40") == "Hitchcock_40"
assert "name30" not in test_dict