Merge pull request #183 from collerek/large_binary

Add large binary, support for native pydantic fields, examples in openapi
This commit is contained in:
collerek
2021-05-02 15:11:46 +02:00
committed by GitHub
29 changed files with 790 additions and 110 deletions

1
.github/FUNDING.yml vendored Normal file
View File

@ -0,0 +1 @@
github: collerek

View File

@ -17,3 +17,15 @@ test_mysql:
test_sqlite: test_sqlite:
bash scripts/test.sh -svv bash scripts/test.sh -svv
test:
pytest
coverage:
pytest --cov=ormar --cov=tests --cov-fail-under=100 --cov-report=term-missing
black:
black ormar tests
mypy:
mypy ormar tests

View File

@ -452,6 +452,16 @@ async def aggregations():
# visit: https://collerek.github.io/ormar/queries/aggregations/ # visit: https://collerek.github.io/ormar/queries/aggregations/
async def with_connect(function):
# note that for any other backend than sqlite you actually need to
# connect to the database to perform db operations
async with database:
await function()
# note that if you use framework like `fastapi` you shouldn't connect
# in your endpoints but have a global connection pool
# check https://collerek.github.io/ormar/fastapi/ and section with db connection
# gather and execute all functions # gather and execute all functions
# note - normally import should be at the beginning of the file # note - normally import should be at the beginning of the file
import asyncio import asyncio
@ -462,7 +472,7 @@ for func in [create, read, update, delete, joins,
filter_and_sort, subset_of_columns, filter_and_sort, subset_of_columns,
pagination, aggregations]: pagination, aggregations]:
print(f"Executing: {func.__name__}") print(f"Executing: {func.__name__}")
asyncio.run(func()) asyncio.run(with_connect(func))
# drop the database tables # drop the database tables
metadata.drop_all(engine) metadata.drop_all(engine)
@ -521,6 +531,7 @@ Available Model Fields (with required args - optional ones in docs):
* `BigInteger()` * `BigInteger()`
* `Decimal(scale, precision)` * `Decimal(scale, precision)`
* `UUID()` * `UUID()`
* `LargeBinary(max_length)`
* `EnumField` - by passing `choices` to any other Field type * `EnumField` - by passing `choices` to any other Field type
* `EncryptedString` - by passing `encrypt_secret` and `encrypt_backend` * `EncryptedString` - by passing `encrypt_secret` and `encrypt_backend`
* `ForeignKey(to)` * `ForeignKey(to)`

View File

@ -98,7 +98,11 @@ Sets the unique constraint on a table's column.
Used in sql only. Used in sql only.
## pydantic_only ## pydantic_only (**DEPRECATED**)
**This parameter is deprecated and will be removed in one of the next releases!**
**To check how to declare pydantic-only fields that are not saved into the database, see the [pydantic fields section](pydantic-fields.md)**
`pydantic_only`: `bool` = `False` `pydantic_only`: `bool` = `False`

View File

@ -127,6 +127,17 @@ You can use either `length` and `precision` parameters or `max_digits` and `deci
* Sqlalchemy column: `sqlalchemy.JSON` * Sqlalchemy column: `sqlalchemy.JSON`
* Type (used for pydantic): `pydantic.Json` * Type (used for pydantic): `pydantic.Json`
### LargeBinary
`LargeBinary(max_length)` has a required `max_length` parameter.
* Sqlalchemy column: `sqlalchemy.LargeBinary`
* Type (used for pydantic): `bytes`
The LargeBinary `max_length` is used by some backends (e.g. MySQL) to determine the size of the column;
other backends simply ignore it, yet in ormar it is always required. It should be the maximum
size of the stored data in bytes.
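A minimal sketch of declaring a `LargeBinary` column follows; the model and field names are illustrative and not part of ormar's API:

```python
import databases
import ormar
import sqlalchemy

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Upload(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    # max_length is always required; here up to ~100 kB of raw bytes
    content: bytes = ormar.LargeBinary(max_length=100000)


# values are plain python bytes, e.g.:
# await Upload.objects.create(content=b"\x89PNG\r\n...")
```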
### UUID ### UUID
`UUID(uuid_format: str = 'hex')` has no required parameters. `UUID(uuid_format: str = 'hex')` has no required parameters.

View File

@ -0,0 +1,195 @@
# Pydantic only fields
Ormar allows you to declare normal `pydantic` fields in its models, so you have access to
all basic and custom pydantic field types like `str`, `int`, `HttpUrl`, `PaymentCardNumber` etc.
You can even declare fields pointing to nested pydantic-only models, not only single fields.
Since those fields are not stored in the database (that's the whole point of those fields),
you have to provide a meaningful value for them, either by setting a default or by
providing one during model initialization.
If `ormar` cannot resolve a value for a pydantic field, it will fail while loading data from the database
with a missing required value for the declared pydantic field.
The options to provide a value are described below.
Of course you can combine a few or all of them in one model.
## Optional field
If you set a field as `Optional`, it defaults to `None` if not provided and that's
exactly what's going to happen during loading from the database.
```python
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class ModelTest(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
number: Optional[PaymentCardNumber]
test = ModelTest(name="Test")
assert test.name == "Test"
assert test.number is None
test.number = "123456789015"
await test.save()
test_check = await ModelTest.objects.get()
assert test_check.name == "Test"
# after load it's back to None
assert test_check.number is None
```
## Field with default value
If you set a default value, it will be used both on initialization and on every load from the database.
Note that setting a default to `None` is the same as setting the field to `Optional`.
```python
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class ModelTest(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
url: HttpUrl = "https://www.example.com"
test = ModelTest(name="Test")
assert test.name == "Test"
assert test.url == "https://www.example.com"
test.url = "https://www.sdta.ada.pt"
assert test.url == "https://www.sdta.ada.pt"
await test.save()
test_check = await ModelTest.objects.get()
assert test_check.name == "Test"
# after load it's back to default
assert test_check.url == "https://www.example.com"
```
## Default factory function
If you set a `default_factory` function, the result of the function call will be set
on initialization and on each database load.
```python
from pydantic import Field, PaymentCardNumber
# ...
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
CARD_NUMBERS = [
"123456789007",
"123456789015",
"123456789023",
"123456789031",
"123456789049",
]
def get_number():
return random.choice(CARD_NUMBERS)
class ModelTest2(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
# note that you do not call the function, just pass reference
number: PaymentCardNumber = Field(default_factory=get_number)
# note that you still CAN provide a value
test = ModelTest2(name="Test2", number="4000000000000002")
assert test.name == "Test2"
assert test.number == "4000000000000002"
await test.save()
test_check = await ModelTest2.objects.get()
assert test_check.name == "Test2"
# after load value is set to be one of the CARD_NUMBERS
assert test_check.number in CARD_NUMBERS
assert test_check.number != test.number
```
## Custom setup in `__init__`
You can provide a value for the field in your own `__init__()` method before calling the `super()` init method.
```python
from pydantic import BaseModel
# ...
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class PydanticTest(BaseModel):
aa: str
bb: int
class ModelTest3(ormar.Model):
class Meta(BaseMeta):
pass
# provide your custom init function
def __init__(self, **kwargs):
# add value for required field without default value
kwargs["pydantic_test"] = PydanticTest(aa="random", bb=42)
# remember to call ormar.Model init!
super().__init__(**kwargs)
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
pydantic_test: PydanticTest
test = ModelTest3(name="Test3")
assert test.name == "Test3"
assert test.pydantic_test.bb == 42
test.pydantic_test.aa = "new value"
assert test.pydantic_test.aa == "new value"
await test.save()
test_check = await ModelTest3.objects.get()
assert test_check.name == "Test3"
# after load it's back to value provided in init
assert test_check.pydantic_test.aa == "random"
```
!!!warning
If you do not provide a value in one of the ways above, a `ValidationError` will be raised on load from the database.
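For illustration, a minimal sketch reusing the `BaseMeta` setup from the examples above (the model and field names are hypothetical):

```python
class ModelTest4(ormar.Model):
    class Meta(BaseMeta):
        pass

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=200)
    # required pydantic field: not Optional, no default,
    # no default_factory and no value set in __init__()
    score: int


# creating and saving works because the value is passed explicitly
test = ModelTest4(name="Test4", score=10)
await test.save()

# but loading from the database cannot resolve `score`,
# so pydantic raises a ValidationError
test_check = await ModelTest4.objects.get()
```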

View File

@ -452,6 +452,16 @@ async def aggregations():
# visit: https://collerek.github.io/ormar/queries/aggregations/ # visit: https://collerek.github.io/ormar/queries/aggregations/
async def with_connect(function):
# note that for any other backend than sqlite you actually need to
# connect to the database to perform db operations
async with database:
await function()
# note that if you use framework like `fastapi` you shouldn't connect
# in your endpoints but have a global connection pool
# check https://collerek.github.io/ormar/fastapi/ and section with db connection
# gather and execute all functions # gather and execute all functions
# note - normally import should be at the beginning of the file # note - normally import should be at the beginning of the file
import asyncio import asyncio
@ -462,7 +472,7 @@ for func in [create, read, update, delete, joins,
filter_and_sort, subset_of_columns, filter_and_sort, subset_of_columns,
pagination, aggregations]: pagination, aggregations]:
print(f"Executing: {func.__name__}") print(f"Executing: {func.__name__}")
asyncio.run(func()) asyncio.run(with_connect(func))
# drop the database tables # drop the database tables
metadata.drop_all(engine) metadata.drop_all(engine)
@ -521,6 +531,7 @@ Available Model Fields (with required args - optional ones in docs):
* `BigInteger()` * `BigInteger()`
* `Decimal(scale, precision)` * `Decimal(scale, precision)`
* `UUID()` * `UUID()`
* `LargeBinary(max_length)`
* `EnumField` - by passing `choices` to any other Field type * `EnumField` - by passing `choices` to any other Field type
* `EncryptedString` - by passing `encrypt_secret` and `encrypt_backend` * `EncryptedString` - by passing `encrypt_secret` and `encrypt_backend`
* `ForeignKey(to)` * `ForeignKey(to)`

View File

@ -1,3 +1,38 @@
# 0.10.6
## ✨ Features
* Add `LargeBinary(max_length)` field type [#166](https://github.com/collerek/ormar/issues/166)
* Add support for normal pydantic fields (including Models) instead of the `pydantic_only`
attribute, which is now deprecated [#160](https://github.com/collerek/ormar/issues/160).
Pydantic fields should be declared normally, as in a pydantic model, next to ormar fields.
Note that (obviously) `ormar` does not save and load the value of such a field in the
database, which means that **ONE** of the following has to be true:
* pydantic field declared on ormar model has to be `Optional` (defaults to None)
* pydantic field has to have a default value set
* pydantic field has `default_factory` function set
* ormar.Model with pydantic field has to overwrite `__init__()` and provide the value there
If none of the above applies, `ormar` (or rather pydantic) will fail while loading data from the database
with a missing required value for the declared pydantic field.
* Ormar now provides meaningful examples in the OpenAPI schema, including nested models.
The same algorithm as used in `dict()` and `select/load_all` is applied to iterate related models
without loops. The examples also appear in `fastapi`. [#157](https://github.com/collerek/ormar/issues/157)
## 🐛 Fixes
* By default `pydantic` does not validate fields during assignment,
which is not a desirable setting for an ORM; now all `ormar.Model`s
have validation turned on during assignment (like `model.column = 'value'`)
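A minimal sketch of the new behaviour (the model and field names are illustrative):

```python
import databases
import ormar
import sqlalchemy

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Track(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    position: int = ormar.Integer()


track = Track(position=1)
# assignment is now validated, so an invalid value raises
# pydantic.ValidationError instead of being silently accepted
track.position = "not a number"  # -> ValidationError
```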
## 💬 Other
* Add connecting to the database in the QuickStart section of the readme [#180](https://github.com/collerek/ormar/issues/180)
* The OpenAPI schema no longer includes the `ormar.Model` docstring as the description;
instead just the model name is provided if you do not provide your own docstring.
* Some performance improvements.
# 0.10.5 # 0.10.5
## 🐛 Fixes ## 🐛 Fixes

Binary file not shown.

View File

@ -319,6 +319,16 @@ async def aggregations():
# visit: https://collerek.github.io/ormar/queries/aggregations/ # visit: https://collerek.github.io/ormar/queries/aggregations/
async def with_connect(function):
# note that for any other backend than sqlite you actually need to
# connect to the database to perform db operations
async with database:
await function()
# note that if you use framework like `fastapi` you shouldn't connect
# in your endpoints but have a global connection pool
# check https://collerek.github.io/ormar/fastapi/ and section with db connection
# gather and execute all functions # gather and execute all functions
# note - normally import should be at the beginning of the file # note - normally import should be at the beginning of the file
import asyncio import asyncio
@ -329,7 +339,7 @@ for func in [create, read, update, delete, joins,
filter_and_sort, subset_of_columns, filter_and_sort, subset_of_columns,
pagination, aggregations]: pagination, aggregations]:
print(f"Executing: {func.__name__}") print(f"Executing: {func.__name__}")
asyncio.run(func()) asyncio.run(with_connect(func))
# drop the database tables # drop the database tables
metadata.drop_all(engine) metadata.drop_all(engine)

View File

@ -12,6 +12,7 @@ nav:
- Fields: - Fields:
- Common parameters: fields/common-parameters.md - Common parameters: fields/common-parameters.md
- Fields types: fields/field-types.md - Fields types: fields/field-types.md
- Pydantic only fields: fields/pydantic-fields.md
- Fields encryption: fields/encryption.md - Fields encryption: fields/encryption.md
- Relations: - Relations:
- relations/index.md - relations/index.md

View File

@ -53,6 +53,7 @@ from ormar.fields import (
ForeignKeyField, ForeignKeyField,
Integer, Integer,
JSON, JSON,
LargeBinary,
ManyToMany, ManyToMany,
ManyToManyField, ManyToManyField,
String, String,
@ -75,7 +76,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType() Undefined = UndefinedType()
__version__ = "0.10.5" __version__ = "0.10.6"
__all__ = [ __all__ = [
"Integer", "Integer",
"BigInteger", "BigInteger",
@ -124,4 +125,5 @@ __all__ = [
"EncryptBackends", "EncryptBackends",
"ENCODERS_MAP", "ENCODERS_MAP",
"DECODERS_MAP", "DECODERS_MAP",
"LargeBinary",
] ]

View File

@ -16,6 +16,7 @@ from ormar.fields.model_fields import (
Float, Float,
Integer, Integer,
JSON, JSON,
LargeBinary,
String, String,
Text, Text,
Time, Time,
@ -50,4 +51,5 @@ __all__ = [
"EncryptBackend", "EncryptBackend",
"DECODERS_MAP", "DECODERS_MAP",
"ENCODERS_MAP", "ENCODERS_MAP",
"LargeBinary",
] ]

View File

@ -1,3 +1,4 @@
import warnings
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Type, Union from typing import Any, Dict, List, Optional, TYPE_CHECKING, Type, Union
import sqlalchemy import sqlalchemy
@ -30,6 +31,7 @@ class BaseField(FieldInfo):
def __init__(self, **kwargs: Any) -> None: def __init__(self, **kwargs: Any) -> None:
self.__type__: type = kwargs.pop("__type__", None) self.__type__: type = kwargs.pop("__type__", None)
self.__sample__: type = kwargs.pop("__sample__", None)
self.related_name = kwargs.pop("related_name", None) self.related_name = kwargs.pop("related_name", None)
self.column_type: sqlalchemy.Column = kwargs.pop("column_type", None) self.column_type: sqlalchemy.Column = kwargs.pop("column_type", None)
@ -43,6 +45,14 @@ class BaseField(FieldInfo):
self.index: bool = kwargs.pop("index", False) self.index: bool = kwargs.pop("index", False)
self.unique: bool = kwargs.pop("unique", False) self.unique: bool = kwargs.pop("unique", False)
self.pydantic_only: bool = kwargs.pop("pydantic_only", False) self.pydantic_only: bool = kwargs.pop("pydantic_only", False)
if self.pydantic_only:
warnings.warn(
"Parameter `pydantic_only` is deprecated and will "
"be removed in one of the next releases.\n You can declare "
"pydantic fields in a normal way. \n Check documentation: "
"https://collerek.github.io/ormar/fields/pydantic-fields",
DeprecationWarning,
)
self.choices: typing.Sequence = kwargs.pop("choices", False) self.choices: typing.Sequence = kwargs.pop("choices", False)
self.virtual: bool = kwargs.pop( self.virtual: bool = kwargs.pop(

View File

@ -80,7 +80,7 @@ def create_dummy_model(
:rtype: pydantic.BaseModel :rtype: pydantic.BaseModel
""" """
alias = ( alias = (
"".join(choices(string.ascii_uppercase, k=2)) + uuid.uuid4().hex[:4] "".join(choices(string.ascii_uppercase, k=6)) # + uuid.uuid4().hex[:4]
).lower() ).lower()
fields = {f"{pk_field.name}": (pk_field.__type__, None)} fields = {f"{pk_field.name}": (pk_field.__type__, None)}

View File

@ -62,6 +62,7 @@ class ModelFieldFactory:
_bases: Any = (BaseField,) _bases: Any = (BaseField,)
_type: Any = None _type: Any = None
_sample: Any = None
def __new__(cls, *args: Any, **kwargs: Any) -> BaseField: # type: ignore def __new__(cls, *args: Any, **kwargs: Any) -> BaseField: # type: ignore
cls.validate(**kwargs) cls.validate(**kwargs)
@ -80,6 +81,7 @@ class ModelFieldFactory:
namespace = dict( namespace = dict(
__type__=cls._type, __type__=cls._type,
__sample__=cls._sample,
alias=kwargs.pop("name", None), alias=kwargs.pop("name", None),
name=None, name=None,
primary_key=primary_key, primary_key=primary_key,
@ -129,6 +131,7 @@ class String(ModelFieldFactory, str):
""" """
_type = str _type = str
_sample = "string"
def __new__( # type: ignore # noqa CFQ002 def __new__( # type: ignore # noqa CFQ002
cls, cls,
@ -185,6 +188,7 @@ class Integer(ModelFieldFactory, int):
""" """
_type = int _type = int
_sample = 0
def __new__( # type: ignore def __new__( # type: ignore
cls, cls,
@ -232,6 +236,7 @@ class Text(ModelFieldFactory, str):
""" """
_type = str _type = str
_sample = "text"
def __new__( # type: ignore def __new__( # type: ignore
cls, *, allow_blank: bool = True, strip_whitespace: bool = False, **kwargs: Any cls, *, allow_blank: bool = True, strip_whitespace: bool = False, **kwargs: Any
@ -267,6 +272,7 @@ class Float(ModelFieldFactory, float):
""" """
_type = float _type = float
_sample = 0.0
def __new__( # type: ignore def __new__( # type: ignore
cls, cls,
@ -316,6 +322,7 @@ else:
""" """
_type = bool _type = bool
_sample = True
@classmethod @classmethod
def get_column_type(cls, **kwargs: Any) -> Any: def get_column_type(cls, **kwargs: Any) -> Any:
@ -337,6 +344,7 @@ class DateTime(ModelFieldFactory, datetime.datetime):
""" """
_type = datetime.datetime _type = datetime.datetime
_sample = "datetime"
@classmethod @classmethod
def get_column_type(cls, **kwargs: Any) -> Any: def get_column_type(cls, **kwargs: Any) -> Any:
@ -358,6 +366,7 @@ class Date(ModelFieldFactory, datetime.date):
""" """
_type = datetime.date _type = datetime.date
_sample = "date"
@classmethod @classmethod
def get_column_type(cls, **kwargs: Any) -> Any: def get_column_type(cls, **kwargs: Any) -> Any:
@ -379,6 +388,7 @@ class Time(ModelFieldFactory, datetime.time):
""" """
_type = datetime.time _type = datetime.time
_sample = "time"
@classmethod @classmethod
def get_column_type(cls, **kwargs: Any) -> Any: def get_column_type(cls, **kwargs: Any) -> Any:
@ -400,6 +410,7 @@ class JSON(ModelFieldFactory, pydantic.Json):
""" """
_type = pydantic.Json _type = pydantic.Json
_sample = '{"json": "json"}'
@classmethod @classmethod
def get_column_type(cls, **kwargs: Any) -> Any: def get_column_type(cls, **kwargs: Any) -> Any:
@ -415,12 +426,61 @@ class JSON(ModelFieldFactory, pydantic.Json):
return sqlalchemy.JSON() return sqlalchemy.JSON()
class LargeBinary(ModelFieldFactory, bytes):
"""
LargeBinary field factory that construct Field classes and populated their values.
"""
_type = bytes
_sample = "bytes"
def __new__( # type: ignore # noqa CFQ002
cls, *, max_length: int = None, **kwargs: Any
) -> BaseField: # type: ignore
kwargs = {
**kwargs,
**{
k: v
for k, v in locals().items()
if k not in ["cls", "__class__", "kwargs"]
},
}
return super().__new__(cls, **kwargs)
@classmethod
def get_column_type(cls, **kwargs: Any) -> Any:
"""
Return proper type of db column for given field type.
Accepts required and optional parameters that each column type accepts.
:param kwargs: key, value pairs of sqlalchemy options
:type kwargs: Any
:return: initialized column with proper options
:rtype: sqlalchemy Column
"""
return sqlalchemy.LargeBinary(length=kwargs.get("max_length"))
@classmethod
def validate(cls, **kwargs: Any) -> None:
"""
Used to validate if all required parameters on a given field type are set.
:param kwargs: all params passed during construction
:type kwargs: Any
"""
max_length = kwargs.get("max_length", None)
if max_length is None or max_length <= 0:
raise ModelDefinitionError(
"Parameter max_length is required for field LargeBinary"
)
class BigInteger(Integer, int): class BigInteger(Integer, int):
""" """
BigInteger field factory that construct Field classes and populated their values. BigInteger field factory that construct Field classes and populated their values.
""" """
_type = int _type = int
_sample = 0
def __new__( # type: ignore def __new__( # type: ignore
cls, cls,
@ -468,6 +528,7 @@ class Decimal(ModelFieldFactory, decimal.Decimal):
""" """
_type = decimal.Decimal _type = decimal.Decimal
_sample = 0.0
def __new__( # type: ignore # noqa CFQ002 def __new__( # type: ignore # noqa CFQ002
cls, cls,
@ -540,6 +601,7 @@ class UUID(ModelFieldFactory, uuid.UUID):
""" """
_type = uuid.UUID _type = uuid.UUID
_sample = "uuid"
def __new__( # type: ignore # noqa CFQ002 def __new__( # type: ignore # noqa CFQ002
cls, *, uuid_format: str = "hex", **kwargs: Any cls, *, uuid_format: str = "hex", **kwargs: Any

View File

@ -3,6 +3,7 @@ import itertools
import sqlite3 import sqlite3
from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type
import pydantic
from pydantic.typing import ForwardRef from pydantic.typing import ForwardRef
import ormar # noqa: I100 import ormar # noqa: I100
from ormar.models.helpers.pydantic import populate_pydantic_default_values from ormar.models.helpers.pydantic import populate_pydantic_default_values
@ -61,6 +62,12 @@ def populate_default_options_values(
else: else:
new_model.Meta.requires_ref_update = False new_model.Meta.requires_ref_update = False
new_model._json_fields = {
name
for name, field in new_model.Meta.model_fields.items()
if field.__type__ == pydantic.Json
}
class Connection(sqlite3.Connection): class Connection(sqlite3.Connection):
def __init__(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover def __init__(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover

View File

@ -98,6 +98,7 @@ def get_pydantic_base_orm_config() -> Type[pydantic.BaseConfig]:
class Config(pydantic.BaseConfig): class Config(pydantic.BaseConfig):
orm_mode = True orm_mode = True
validate_assignment = True
return Config return Config

View File

@ -1,8 +1,9 @@
import datetime import datetime
import decimal import decimal
import numbers
import uuid import uuid
from enum import Enum from enum import Enum
from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type from typing import Any, Dict, List, Set, TYPE_CHECKING, Tuple, Type, Union
try: try:
import orjson as json import orjson as json
@ -10,11 +11,13 @@ except ImportError: # pragma: no cover
import json # type: ignore import json # type: ignore
import pydantic import pydantic
from pydantic.fields import SHAPE_LIST
from pydantic.main import SchemaExtraCallable from pydantic.main import SchemaExtraCallable
import ormar # noqa: I100, I202 import ormar # noqa: I100, I202
from ormar.fields import BaseField from ormar.fields import BaseField
from ormar.models.helpers.models import meta_field_not_set from ormar.models.helpers.models import meta_field_not_set
from ormar.queryset.utils import translate_list_to_dict
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
from ormar import Model from ormar import Model
@ -73,6 +76,8 @@ def convert_choices_if_needed( # noqa: CCR001
else value else value
) )
choices = [round(float(o), precision) for o in choices] choices = [round(float(o), precision) for o in choices]
elif field.__type__ == bytes:
value = value if isinstance(value, bytes) else value.encode("utf-8")
return value, choices return value, choices
@ -114,12 +119,109 @@ def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, A
return values return values
def generate_model_example(model: Type["Model"], relation_map: Dict = None) -> Dict:
"""
Generates example to be included in schema in fastapi.
:param model: ormar.Model
:type model: Type["Model"]
:param relation_map: dict with relations to follow
:type relation_map: Optional[Dict]
:return:
:rtype: Dict[str, int]
"""
example: Dict[str, Any] = dict()
relation_map = (
relation_map
if relation_map is not None
else translate_list_to_dict(model._iterate_related_models())
)
for name, field in model.Meta.model_fields.items():
if not field.is_relation:
example[name] = field.__sample__
elif isinstance(relation_map, dict) and name in relation_map:
example[name] = get_nested_model_example(
name=name, field=field, relation_map=relation_map
)
to_exclude = {name for name in model.Meta.model_fields}
pydantic_repr = generate_pydantic_example(pydantic_model=model, exclude=to_exclude)
example.update(pydantic_repr)
return example
def get_nested_model_example(
name: str, field: "BaseField", relation_map: Dict
) -> Union[List, Dict]:
"""
Gets representation of nested model.
:param name: name of the field to follow
:type name: str
:param field: ormar field
:type field: BaseField
:param relation_map: dict with relation map
:type relation_map: Dict
:return: nested model or list of nested model repr
:rtype: Union[List, Dict]
"""
value = generate_model_example(field.to, relation_map=relation_map.get(name, {}))
new_value: Union[List, Dict] = [value] if field.is_multi or field.virtual else value
return new_value
def generate_pydantic_example(
pydantic_model: Type[pydantic.BaseModel], exclude: Set = None
) -> Dict:
"""
Generates dict with example.
:param pydantic_model: model to parse
:type pydantic_model: Type[pydantic.BaseModel]
:param exclude: list of fields to exclude
:type exclude: Optional[Set]
:return: dict with fields and sample values
:rtype: Dict
"""
example: Dict[str, Any] = dict()
exclude = exclude or set()
name_to_check = [name for name in pydantic_model.__fields__ if name not in exclude]
for name in name_to_check:
field = pydantic_model.__fields__[name]
type_ = field.type_
if field.shape == SHAPE_LIST:
example[name] = [get_pydantic_example_repr(type_)]
else:
example[name] = get_pydantic_example_repr(type_)
return example
def get_pydantic_example_repr(type_: Any) -> Any:
"""
Gets sample representation of pydantic field for example dict.
:param type_: type of pydantic field
:type type_: Any
:return: representation to include in example
:rtype: Any
"""
if issubclass(type_, (numbers.Number, decimal.Decimal)):
return 0
elif issubclass(type_, pydantic.BaseModel):
return generate_pydantic_example(pydantic_model=type_)
else:
return "string"
def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable: def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable:
""" """
Modifies the schema to include fields with choices validator. Modifies the schema to include fields with choices validator.
Those fields will be displayed in schema as Enum types with available choices Those fields will be displayed in schema as Enum types with available choices
values listed next to them. values listed next to them.
Note that schema extra has to be a function, otherwise it's called too soon,
before all the relations are expanded.
:param fields_with_choices: list of fields with choices validation :param fields_with_choices: list of fields with choices validation
:type fields_with_choices: List :type fields_with_choices: List
:return: callable that will be run by pydantic to modify the schema :return: callable that will be run by pydantic to modify the schema
@ -131,6 +233,28 @@ def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCa
if field_id in fields_with_choices: if field_id in fields_with_choices:
prop["enum"] = list(model.Meta.model_fields[field_id].choices) prop["enum"] = list(model.Meta.model_fields[field_id].choices)
prop["description"] = prop.get("description", "") + "An enumeration." prop["description"] = prop.get("description", "") + "An enumeration."
schema["example"] = generate_model_example(model=model)
if "Main base class of ormar Model." in schema.get("description", ""):
schema["description"] = f"{model.__name__}"
return staticmethod(schema_extra) # type: ignore
def construct_schema_function_without_choices() -> SchemaExtraCallable:
"""
Modifies model example and description if needed.
Note that schema extra has to be a function, otherwise it's called too soon,
before all the relations are expanded.
:return: callable that will be run by pydantic to modify the schema
:rtype: Callable
"""
def schema_extra(schema: Dict[str, Any], model: Type["Model"]) -> None:
schema["example"] = generate_model_example(model=model)
if "Main base class of ormar Model." in schema.get("description", ""):
schema["description"] = f"{model.__name__}"
return staticmethod(schema_extra) # type: ignore return staticmethod(schema_extra) # type: ignore
@ -160,3 +284,5 @@ def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
model.Config.schema_extra = construct_modify_schema_function( model.Config.schema_extra = construct_modify_schema_function(
fields_with_choices=fields_with_choices fields_with_choices=fields_with_choices
) )
else:
model.Config.schema_extra = construct_schema_function_without_choices()

View File

@ -94,6 +94,7 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
new_model._related_fields = None new_model._related_fields = None
new_model._pydantic_fields = {name for name in new_model.__fields__} new_model._pydantic_fields = {name for name in new_model.__fields__}
new_model._choices_fields = set() new_model._choices_fields = set()
new_model._json_fields = set()
def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001 def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001

View File

@ -48,7 +48,7 @@ class RelationMixin:
:return: list of related fields :return: list of related fields
:rtype: List :rtype: List
""" """
if isinstance(cls._related_fields, List): if cls._related_fields is not None:
return cls._related_fields return cls._related_fields
related_fields = [] related_fields = []
@ -66,7 +66,7 @@ class RelationMixin:
:return: set of related through fields names :return: set of related through fields names
:rtype: Set :rtype: Set
""" """
if isinstance(cls._through_names, Set): if cls._through_names is not None:
return cls._through_names return cls._through_names
related_names = set() related_names = set()
@ -86,7 +86,7 @@ class RelationMixin:
:return: set of related fields names :return: set of related fields names
:rtype: Set :rtype: Set
""" """
if isinstance(cls._related_names, Set): if cls._related_names is not None:
return cls._related_names return cls._related_names
related_names = set() related_names = set()

View File

@ -275,12 +275,12 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
:rtype: int :rtype: int
""" """
for field in fields_list: for field in fields_list:
value = getattr(self, field.name) or [] values = getattr(self, field.name) or []
if not isinstance(value, list): if not isinstance(values, list):
value = [value] values = [values]
for val in value: for value in values:
if follow: if follow:
update_count = await val.save_related( update_count = await value.save_related(
follow=follow, follow=follow,
save_all=save_all, save_all=save_all,
relation_map=self._skip_ellipsis( # type: ignore relation_map=self._skip_ellipsis( # type: ignore
@ -291,8 +291,8 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
relation_field=field, relation_field=field,
) )
else: else:
update_count = await val._upsert_model( update_count = await value._upsert_model(
instance=val, instance=value,
save_all=save_all, save_all=save_all,
previous_model=self, previous_model=self,
relation_field=field, relation_field=field,

View File

@ -1,5 +1,4 @@
import sys import sys
import uuid
from typing import ( from typing import (
AbstractSet, AbstractSet,
Any, Any,
@ -12,6 +11,7 @@ from typing import (
Sequence, Sequence,
Set, Set,
TYPE_CHECKING, TYPE_CHECKING,
Tuple,
Type, Type,
Union, Union,
cast, cast,
@ -86,6 +86,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
_choices_fields: Optional[Set] _choices_fields: Optional[Set]
_pydantic_fields: Set _pydantic_fields: Set
_quick_access_fields: Set _quick_access_fields: Set
_json_fields: Set
Meta: ModelMeta Meta: ModelMeta
# noinspection PyMissingConstructor # noinspection PyMissingConstructor
@ -123,53 +124,12 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:type kwargs: Any :type kwargs: Any
""" """
self._verify_model_can_be_initialized() self._verify_model_can_be_initialized()
object.__setattr__(self, "_orm_id", uuid.uuid4().hex) self._initialize_internal_attributes()
object.__setattr__(self, "_orm_saved", False)
object.__setattr__(self, "_pk_column", None)
object.__setattr__(
self,
"_orm",
RelationsManager(
related_fields=self.extract_related_fields(), owner=cast("Model", self),
),
)
pk_only = kwargs.pop("__pk_only__", False) pk_only = kwargs.pop("__pk_only__", False)
object.__setattr__(self, "__pk_only__", pk_only) object.__setattr__(self, "__pk_only__", pk_only)
excluded: Set[str] = kwargs.pop("__excluded__", set()) new_kwargs, through_tmp_dict = self._process_kwargs(kwargs)
if "pk" in kwargs:
kwargs[self.Meta.pkname] = kwargs.pop("pk")
# build the models to set them and validate but don't register
# also remove property fields values from validation
try:
new_kwargs: Dict[str, Any] = {
k: self._convert_json(
k,
self.Meta.model_fields[k].expand_relationship(
v, self, to_register=False,
),
"dumps",
)
for k, v in kwargs.items()
if k not in object.__getattribute__(self, "Meta").property_fields
}
except KeyError as e:
raise ModelError(
f"Unknown field '{e.args[0]}' for model {self.get_name(lower=False)}"
)
# explicitly set None to excluded fields
# as pydantic populates them with default if set
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None
# extract through fields
through_tmp_dict = dict()
for field_name in self.extract_through_names():
through_tmp_dict[field_name] = new_kwargs.pop(field_name, None)
values, fields_set, validation_error = pydantic.validate_model( values, fields_set, validation_error = pydantic.validate_model(
self, new_kwargs # type: ignore self, new_kwargs # type: ignore
@ -182,10 +142,10 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
# add back through fields # add back through fields
new_kwargs.update(through_tmp_dict) new_kwargs.update(through_tmp_dict)
model_fields = object.__getattribute__(self, "Meta").model_fields
# register the columns models after initialization # register the columns models after initialization
for related in self.extract_related_names().union(self.extract_through_names()): for related in self.extract_related_names().union(self.extract_through_names()):
self.Meta.model_fields[related].expand_relationship( model_fields[related].expand_relationship(
new_kwargs.get(related), self, to_register=True, new_kwargs.get(related), self, to_register=True,
) )
@ -243,7 +203,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
else: else:
if name in object.__getattribute__(self, "_choices_fields"): if name in object.__getattribute__(self, "_choices_fields"):
validate_choices(field=self.Meta.model_fields[name], value=value) validate_choices(field=self.Meta.model_fields[name], value=value)
super().__setattr__(name, value) super().__setattr__(name, self._convert_json(name, value, op="dumps"))
self.set_save_status(False) self.set_save_status(False)
def __getattribute__(self, item: str) -> Any: # noqa: CCR001 def __getattribute__(self, item: str) -> Any: # noqa: CCR001
@ -306,15 +266,89 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: None :return: None
:rtype: None :rtype: None
""" """
if self.Meta.abstract: if object.__getattribute__(self, "Meta").abstract:
raise ModelError(f"You cannot initialize abstract model {self.get_name()}") raise ModelError(f"You cannot initialize abstract model {self.get_name()}")
if self.Meta.requires_ref_update: if object.__getattribute__(self, "Meta").requires_ref_update:
raise ModelError( raise ModelError(
f"Model {self.get_name()} has not updated " f"Model {self.get_name()} has not updated "
f"ForwardRefs. \nBefore using the model you " f"ForwardRefs. \nBefore using the model you "
f"need to call update_forward_refs()." f"need to call update_forward_refs()."
) )
def _process_kwargs(self, kwargs: Dict) -> Tuple[Dict, Dict]:
"""
Initializes nested models.
Removes property_fields
Checks if field is in the model fields or pydantic fields.
Nullifies fields that should be excluded.
Extracts through models from kwargs into temporary dict.
:param kwargs: passed to init keyword arguments
:type kwargs: Dict
:return: modified kwargs
:rtype: Tuple[Dict, Dict]
"""
meta = object.__getattribute__(self, "Meta")
property_fields = meta.property_fields
model_fields = meta.model_fields
pydantic_fields = object.__getattribute__(self, "__fields__")
# remove property fields
for prop_field in property_fields:
kwargs.pop(prop_field, None)
excluded: Set[str] = kwargs.pop("__excluded__", set())
if "pk" in kwargs:
kwargs[meta.pkname] = kwargs.pop("pk")
# extract through fields
through_tmp_dict = dict()
for field_name in self.extract_through_names():
through_tmp_dict[field_name] = kwargs.pop(field_name, None)
try:
new_kwargs: Dict[str, Any] = {
k: self._convert_json(
k,
model_fields[k].expand_relationship(v, self, to_register=False,)
if k in model_fields
else (v if k in pydantic_fields else model_fields[k]),
"dumps",
)
for k, v in kwargs.items()
}
except KeyError as e:
raise ModelError(
f"Unknown field '{e.args[0]}' for model {self.get_name(lower=False)}"
)
# explicitly set None to excluded fields
# as pydantic populates them with default if set
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None
return new_kwargs, through_tmp_dict
def _initialize_internal_attributes(self) -> None:
"""
Initializes internal attributes during __init__()
:rtype: None
"""
# object.__setattr__(self, "_orm_id", uuid.uuid4().hex)
object.__setattr__(self, "_orm_saved", False)
object.__setattr__(self, "_pk_column", None)
object.__setattr__(
self,
"_orm",
RelationsManager(
related_fields=self.extract_related_fields(), owner=cast("Model", self),
),
)
def _extract_related_model_instead_of_field( def _extract_related_model_instead_of_field(
self, item: str self, item: str
) -> Optional[Union["Model", Sequence["Model"]]]: ) -> Optional[Union["Model", Sequence["Model"]]]:
@ -355,8 +389,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:rtype: bool :rtype: bool
""" """
return ( return (
self._orm_id == other._orm_id # self._orm_id == other._orm_id
or (self.pk == other.pk and self.pk is not None) (self.pk == other.pk and self.pk is not None)
or ( or (
(self.pk is None and other.pk is None) (self.pk is None and other.pk is None)
and { and {
@ -740,7 +774,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: converted value if needed, else original value :return: converted value if needed, else original value
:rtype: Any :rtype: Any
""" """
if not self._is_conversion_to_json_needed(column_name): if column_name not in object.__getattribute__(self, "_json_fields"):
return value return value
condition = ( condition = (
@ -757,20 +791,6 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
pass pass
return value.decode("utf-8") if isinstance(value, bytes) else value return value.decode("utf-8") if isinstance(value, bytes) else value
def _is_conversion_to_json_needed(self, column_name: str) -> bool:
"""
Checks if given column name is related to JSON field.
:param column_name: name of the field
:type column_name: str
:return: result of the check
:rtype: bool
"""
return (
column_name in self.Meta.model_fields
and self.Meta.model_fields[column_name].__type__ == pydantic.Json
)
def _extract_own_model_fields(self) -> Dict: def _extract_own_model_fields(self) -> Dict:
""" """
Returns a dictionary with field names and values for fields that are not Returns a dictionary with field names and values for fields that are not

View File

@ -1,6 +1,7 @@
import datetime import datetime
import decimal import decimal
import uuid import uuid
from base64 import b64encode
from enum import Enum from enum import Enum
import databases import databases
@ -22,6 +23,10 @@ uuid1 = uuid.uuid4()
uuid2 = uuid.uuid4() uuid2 = uuid.uuid4()
blob = b"test"
blob2 = b"test2icac89uc98"
class EnumTest(Enum): class EnumTest(Enum):
val1 = "Val1" val1 = "Val1"
val2 = "Val2" val2 = "Val2"
@ -57,6 +62,7 @@ class Organisation(ormar.Model):
random_json: pydantic.Json = ormar.JSON(choices=["aa", '{"aa":"bb"}']) random_json: pydantic.Json = ormar.JSON(choices=["aa", '{"aa":"bb"}'])
random_uuid: uuid.UUID = ormar.UUID(choices=[uuid1, uuid2]) random_uuid: uuid.UUID = ormar.UUID(choices=[uuid1, uuid2])
enum_string: str = ormar.String(max_length=100, choices=list(EnumTest)) enum_string: str = ormar.String(max_length=100, choices=list(EnumTest))
blob_col: bytes = ormar.LargeBinary(max_length=100000, choices=[blob, blob2])
@app.on_event("startup") @app.on_event("startup")
@ -111,6 +117,7 @@ def test_all_endpoints():
"random_json": '{"aa":"bb"}', "random_json": '{"aa":"bb"}',
"random_uuid": str(uuid1), "random_uuid": str(uuid1),
"enum_string": EnumTest.val1.value, "enum_string": EnumTest.val1.value,
"blob_col": blob.decode("utf-8"),
}, },
) )

View File

@ -1,6 +1,8 @@
from typing import List import datetime
from typing import List, Optional
import databases import databases
import pydantic
import pytest import pytest
import sqlalchemy import sqlalchemy
from fastapi import FastAPI from fastapi import FastAPI
@ -34,6 +36,17 @@ class LocalMeta:
database = database database = database
class PTestA(pydantic.BaseModel):
c: str
d: bytes
e: datetime.datetime
class PTestP(pydantic.BaseModel):
a: int
b: Optional[PTestA]
class Category(ormar.Model): class Category(ormar.Model):
class Meta(LocalMeta): class Meta(LocalMeta):
tablename = "categories" tablename = "categories"
@ -48,6 +61,8 @@ class Item(ormar.Model):
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100) name: str = ormar.String(max_length=100)
pydantic_int: Optional[int]
test_P: Optional[List[PTestP]]
categories = ormar.ManyToMany(Category) categories = ormar.ManyToMany(Category)
@ -124,6 +139,29 @@ def test_schema_modification():
x.get("type") == "array" for x in schema["properties"]["categories"]["anyOf"] x.get("type") == "array" for x in schema["properties"]["categories"]["anyOf"]
) )
assert schema["properties"]["categories"]["title"] == "Categories" assert schema["properties"]["categories"]["title"] == "Categories"
assert schema["example"] == {
"categories": [{"id": 0, "name": "string"}],
"id": 0,
"name": "string",
"pydantic_int": 0,
"test_P": [{"a": 0, "b": {"c": "string", "d": "string", "e": "string"}}],
}
schema = Category.schema()
assert schema["example"] == {
"id": 0,
"name": "string",
"items": [
{
"id": 0,
"name": "string",
"pydantic_int": 0,
"test_P": [
{"a": 0, "b": {"c": "string", "d": "string", "e": "string"}}
],
}
],
}
def test_schema_gen(): def test_schema_gen():

View File

@ -218,6 +218,19 @@ def test_decimal_error_in_model_definition():
test: decimal.Decimal = ormar.Decimal(primary_key=True) test: decimal.Decimal = ormar.Decimal(primary_key=True)
@typing.no_type_check
def test_binary_error_without_length_model_definition():
with pytest.raises(ModelDefinitionError):
class ExampleModel2(Model):
class Meta:
tablename = "example6"
database = database
metadata = metadata
test: bytes = ormar.LargeBinary(primary_key=True)
@typing.no_type_check @typing.no_type_check
def test_string_error_in_model_definition(): def test_string_error_in_model_definition():
with pytest.raises(ModelDefinitionError): with pytest.raises(ModelDefinitionError):

View File

@ -1,6 +1,6 @@
import asyncio import asyncio
import uuid
import datetime import datetime
import uuid
from typing import List from typing import List
import databases import databases
@ -9,7 +9,7 @@ import pytest
import sqlalchemy import sqlalchemy
import ormar import ormar
from ormar.exceptions import QueryDefinitionError, NoMatch, ModelError from ormar.exceptions import ModelError, NoMatch, QueryDefinitionError
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True) database = databases.Database(DATABASE_URL, force_rollback=True)
@ -26,6 +26,20 @@ class JsonSample(ormar.Model):
test_json = ormar.JSON(nullable=True) test_json = ormar.JSON(nullable=True)
blob = b"test"
blob2 = b"test2icac89uc98"
class LargeBinarySample(ormar.Model):
class Meta:
tablename = "my_bolbs"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
test_binary = ormar.LargeBinary(max_length=100000, choices=[blob, blob2])
class UUIDSample(ormar.Model): class UUIDSample(ormar.Model):
class Meta: class Meta:
tablename = "uuids" tablename = "uuids"
@ -102,15 +116,8 @@ class Country(ormar.Model):
) )
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
@pytest.fixture(autouse=True, scope="module") @pytest.fixture(autouse=True, scope="module")
async def create_test_database(): def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL) engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine) metadata.drop_all(engine)
metadata.create_all(engine) metadata.create_all(engine)
@ -151,6 +158,19 @@ async def test_json_column():
assert items[1].test_json == dict(aa=12) assert items[1].test_json == dict(aa=12)
@pytest.mark.asyncio
async def test_binary_column():
async with database:
async with database.transaction(force_rollback=True):
await LargeBinarySample.objects.create(test_binary=blob)
await LargeBinarySample.objects.create(test_binary=blob2)
items = await LargeBinarySample.objects.all()
assert len(items) == 2
assert items[0].test_binary == blob
assert items[1].test_binary == blob2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_uuid_column(): async def test_uuid_column():
async with database: async with database:

View File

@ -1,9 +1,10 @@
import random
from typing import Optional from typing import Optional
import databases import databases
import pytest import pytest
import sqlalchemy import sqlalchemy
from pydantic import HttpUrl from pydantic import BaseModel, Field, HttpUrl, PaymentCardNumber
import ormar import ormar
from tests.settings import DATABASE_URL from tests.settings import DATABASE_URL
@ -17,19 +18,58 @@ class BaseMeta(ormar.ModelMeta):
database = database database = database
class Test(ormar.Model): class ModelTest(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
url: HttpUrl = "https://www.example.com"
number: Optional[PaymentCardNumber]
CARD_NUMBERS = [
"123456789007",
"123456789015",
"123456789023",
"123456789031",
"123456789049",
]
def get_number():
return random.choice(CARD_NUMBERS)
class ModelTest2(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200)
url: HttpUrl = "https://www.example2.com"
number: PaymentCardNumber = Field(default_factory=get_number)
class PydanticTest(BaseModel):
aa: str
bb: int
class ModelTest3(ormar.Model):
class Meta(BaseMeta): class Meta(BaseMeta):
pass pass
def __init__(self, **kwargs): def __init__(self, **kwargs):
# you need to pop non - db fields as ormar will complain that it's unknown field kwargs["number"] = get_number()
url = kwargs.pop("url", self.__fields__["url"].get_default()) kwargs["pydantic_test"] = PydanticTest(aa="random", bb=42)
super().__init__(**kwargs) super().__init__(**kwargs)
self.url = url
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=200) name: str = ormar.String(max_length=200)
url: HttpUrl = "www.example.com" # field with default url: HttpUrl = "https://www.example3.com"
number: PaymentCardNumber
pydantic_test: PydanticTest
@pytest.fixture(autouse=True, scope="module") @pytest.fixture(autouse=True, scope="module")
@ -44,18 +84,58 @@ def create_test_database():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_working_with_pydantic_fields(): async def test_working_with_pydantic_fields():
async with database: async with database:
test = Test(name="Test") test = ModelTest(name="Test")
assert test.name == "Test" assert test.name == "Test"
assert test.url == "www.example.com" assert test.url == "https://www.example.com"
assert test.number is None
test.number = "123456789015"
test.url = "www.sdta.ada.pt" test.url = "https://www.sdta.ada.pt"
assert test.url == "www.sdta.ada.pt" assert test.url == "https://www.sdta.ada.pt"
await test.save() await test.save()
test_check = await Test.objects.get() test_check = await ModelTest.objects.get()
assert test_check.name == "Test" assert test_check.name == "Test"
assert test_check.url == "www.example.com" assert test_check.url == "https://www.example.com"
assert test_check.number is None
# TODO add validate assignment to pydantic config
# test_check.email = 1 @pytest.mark.asyncio
async def test_default_factory_for_pydantic_fields():
async with database:
test = ModelTest2(name="Test2", number="4000000000000002")
assert test.name == "Test2"
assert test.url == "https://www.example2.com"
assert test.number == "4000000000000002"
test.url = "http://www.sdta.ada.pt"
assert test.url == "http://www.sdta.ada.pt"
await test.save()
test_check = await ModelTest2.objects.get()
assert test_check.name == "Test2"
assert test_check.url == "https://www.example2.com"
assert test_check.number in CARD_NUMBERS
assert test_check.number != test.number
@pytest.mark.asyncio
async def test_init_setting_for_pydantic_fields():
async with database:
test = ModelTest3(name="Test3")
assert test.name == "Test3"
assert test.url == "https://www.example3.com"
assert test.pydantic_test.bb == 42
test.url = "http://www.sdta.ada.pt"
assert test.url == "http://www.sdta.ada.pt"
await test.save()
test_check = await ModelTest3.objects.get()
assert test_check.name == "Test3"
assert test_check.url == "https://www.example3.com"
assert test_check.number in CARD_NUMBERS
assert test_check.pydantic_test.aa == "random"

View File

@ -57,7 +57,7 @@ def create_test_database():
def assert_type(book: Book): def assert_type(book: Book):
print(book) _ = str(book)
@pytest.mark.asyncio @pytest.mark.asyncio