Merge pull request #195 from collerek/excludes_in_dict
Excludes in dict
README.md (17 changes)
@@ -35,6 +35,10 @@ Ormar - apart from the obvious ORM in the name - gets its name from ormar in swedish whi

And what's a better name for a python ORM than a snakes cabinet :)

**If you like ormar remember to star the repository in [github](https://github.com/collerek/ormar)!**

The bigger community we build, the easier it will be to catch bugs and attract contributors ;)

### Documentation

Check out the [documentation][documentation] for details.
@@ -68,7 +72,16 @@ Ormar is built with:

* [`pydantic`][pydantic] for data validation.
* `typing_extensions` for python 3.6 - 3.7

### Migrating from `sqlalchemy`
### License

`ormar` is built as open-source software and will remain completely free (MIT license).

As I write open-source code to solve everyday problems in my work or to promote and build a strong python
community, you can say thank you and buy me a coffee or sponsor me with a monthly amount to help ensure my work remains free and maintained.

<iframe src="https://github.com/sponsors/collerek/button" title="Sponsor collerek" height="35" width="116" style="border: 0;"></iframe>

### Migrating from `sqlalchemy` and existing databases

If you currently use `sqlalchemy` and would like to switch to `ormar` check out the auto-translation
tool that can help you with translating existing sqlalchemy orm models so you do not have to do it manually.
@@ -76,6 +89,8 @@ tool that can help you with translating existing sqlalchemy orm models so you do not have to do it manually.

**Beta** versions available at github: [`sqlalchemy-to-ormar`](https://github.com/collerek/sqlalchemy-to-ormar)
or simply `pip install sqlalchemy-to-ormar`

`sqlalchemy-to-ormar` can be used in pair with `sqlacodegen` to auto-map/generate `ormar` models from an existing database, even if you don't use `sqlalchemy` for your project.

### Migrations & Database creation

Because ormar is built on SQLAlchemy core, you can use [`alembic`][alembic] to provide
@@ -138,6 +138,35 @@ LargeBinary length is used in some backends (i.e. mysql) to determine the size of
in other backends it's simply ignored, yet in ormar it's always required. It should be the max
size of the file/bytes in bytes.

`LargeBinary` also has an optional `represent_as_base64_str: bool = False` flag.
When set to `True`, `ormar` will auto-convert the bytes value to a base64 encoded string;
you can also set the value by passing a base64 encoded string.

That way you can i.e. set the value by API, even if the value is not `utf-8` compatible and would otherwise fail during json conversion.

```python
import base64
... # other imports skipped for brevity

class LargeBinaryStr(ormar.Model):
    class Meta:
        tablename = "my_str_blobs"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    test_binary: str = ormar.LargeBinary(
        max_length=100000, represent_as_base64_str=True
    )

# set a non utf-8 compliant value - note this can be passed by api (i.e. fastapi) in json
item = LargeBinaryStr(test_binary=base64.b64encode(b"\xc3\x28").decode())

assert item.test_binary == base64.b64encode(b"\xc3\x28").decode()

# technical note: the underlying value is still bytes and will be saved as such
assert item.__dict__["test_binary"] == b"\xc3\x28"
```
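As a standalone illustration of the mechanism above (standard library only, no ormar models involved), this is why a non-`utf-8` byte value needs the base64 representation to survive json conversion — the byte pair mirrors the docs example:

```python
import base64
import json

raw = b"\xc3\x28"  # not valid utf-8: it cannot be decoded and placed in json directly

try:
    raw.decode("utf-8")
except UnicodeDecodeError:
    pass  # exactly the failure a plain json conversion would hit

# base64-encoding yields a plain ascii string that json can carry safely
encoded = base64.b64encode(raw).decode()
assert encoded == "wyg="
assert json.loads(json.dumps(encoded)) == encoded

# and the original bytes are recoverable on the other side
assert base64.b64decode(encoded) == raw
```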
### UUID

`UUID(uuid_format: str = 'hex')` has no required parameters.
@@ -17,6 +17,273 @@ especially `dict()` and `json()` methods that can also accept `exclude`, `include` and other parameters

To read more check the [pydantic][pydantic] documentation.

## dict

`dict` is a method inherited from `pydantic`, yet `ormar` adds its own parameters and has some nuances when working with default values,
therefore it's listed here for clarity.

`dict`, as the name suggests, exports the data from the model tree to a dictionary.

Explanation of dict parameters:
### include (`ormar` modified)

`include: Union[Set, Dict] = None`

Set or dictionary of field names to include in the returned dictionary.

Note that `pydantic` has an uncommon pattern of including/excluding fields in lists (so also nested models) by an index.
And if you want to include/exclude a field in all children you need to pass an `__all__` key to the dictionary.

You cannot include nested models in `Set`s in `pydantic` but you can in `ormar`
(by adding a double underscore on the relation name, i.e. to include only the name of the category for a book you can use `include={"book__category__name"}`)

`ormar` does not support by-index exclusions/inclusions and accepts a simplified, more user-friendly notation.

To check how you can include/exclude fields, including nested fields, check out the [fields](../queries/select-columns.md#fields) section that has an explanation and a lot of samples.

!!!note
    Since in `ormar` you can include/exclude nested models in sets, you can use sets against a whole model tree in fastapi's `response_model_exclude` and `response_model_include`!
### exclude (`ormar` modified)

`exclude: Union[Set, Dict] = None`

Set or dictionary of field names to exclude from the returned dictionary.

Note that `pydantic` has an uncommon pattern of including/excluding fields in lists (so also nested models) by an index.
And if you want to exclude a field in all children you need to pass an `__all__` key to the dictionary.

You cannot exclude nested models in `Set`s in `pydantic` but you can in `ormar`
(by adding a double underscore on the relation name, i.e. to exclude the name of the category for a book you can use `exclude={"book__category__name"}`)

`ormar` does not support by-index exclusions/inclusions and accepts a simplified, more user-friendly notation.

To check how you can include/exclude fields, including nested fields, check out the [fields](../queries/select-columns.md#fields) section that has an explanation and a lot of samples.

!!!note
    Since in `ormar` you can exclude nested models in sets, you can exclude from a whole model tree in fastapi's `response_model_exclude` and `response_model_include`!
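The double-underscore path notation can be sketched stand-alone; the helper below is a toy re-implementation of the semantics on plain dictionaries (not ormar's actual code, which resolves these paths against its relation metadata):

```python
def apply_excludes(data: dict, excludes: set) -> dict:
    """Drop keys named by double-underscore paths, e.g. 'category__name'."""
    result = {}
    for key, value in data.items():
        if key in excludes:
            continue  # excluded outright
        # paths targeting this key's children, with the leading segment stripped
        nested = {e.split("__", 1)[1] for e in excludes if e.startswith(key + "__")}
        if isinstance(value, dict) and nested:
            result[key] = apply_excludes(value, nested)
        else:
            result[key] = value
    return result

book = {"title": "Sample", "category": {"id": 1, "name": "Novels"}}
# exclude={"category__name"} keeps the relation but drops the nested field
assert apply_excludes(book, {"category__name"}) == {
    "title": "Sample", "category": {"id": 1}
}
```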
### exclude_unset

`exclude_unset: bool = False`

Flag indicating whether fields which were not explicitly set when creating the model should be excluded from the returned dictionary.

!!!warning
    Note that after you save data into the database each field has its own value -> either provided by you, a default, or `None`.

    That means that when you load the data from the database, **all** fields are set, and this flag basically stops working!

```python
class Category(ormar.Model):
    class Meta:
        tablename = "categories"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100, default="Test")
    visibility: bool = ormar.Boolean(default=True)


class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    price: float = ormar.Float(default=9.99)
    categories: List[Category] = ormar.ManyToMany(Category)


category = Category(name="Test 2")
assert category.dict() == {'id': None, 'items': [], 'name': 'Test 2',
                           'visibility': True}
assert category.dict(exclude_unset=True) == {'items': [], 'name': 'Test 2'}

await category.save()
category2 = await Category.objects.get()
assert category2.dict() == {'id': 1, 'items': [], 'name': 'Test 2',
                            'visibility': True}
# NOTE how after loading from db all fields are set explicitly
# as this is what happens when you populate a model from db
assert category2.dict(exclude_unset=True) == {'id': 1, 'items': [],
                                              'name': 'Test 2', 'visibility': True}
```
### exclude_defaults

`exclude_defaults: bool = False`

Flag indicating whether fields which are equal to their default values (whether set or otherwise) should be excluded from the returned dictionary.

```python
class Category(ormar.Model):
    class Meta:
        tablename = "categories"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100, default="Test")
    visibility: bool = ormar.Boolean(default=True)


class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    price: float = ormar.Float(default=9.99)
    categories: List[Category] = ormar.ManyToMany(Category)


category = Category()
# note that an Integer pk is by default autoincrement so optional
assert category.dict() == {'id': None, 'items': [], 'name': 'Test', 'visibility': True}
assert category.dict(exclude_defaults=True) == {'items': []}

# save and reload the data
await category.save()
category2 = await Category.objects.get()

assert category2.dict() == {'id': 1, 'items': [], 'name': 'Test', 'visibility': True}
assert category2.dict(exclude_defaults=True) == {'id': 1, 'items': []}
```
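Stripped of the model machinery, the flag's effect boils down to dropping keys whose value equals the field's declared default (a toy sketch of the semantics on plain dictionaries, not ormar's implementation):

```python
# defaults as declared on the hypothetical Category model above
defaults = {"id": None, "name": "Test", "visibility": True}
data = {"id": 1, "name": "Test", "visibility": True}

# keep only the keys that differ from their declared default
trimmed = {k: v for k, v in data.items() if k not in defaults or v != defaults[k]}
assert trimmed == {"id": 1}
```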
### exclude_none

`exclude_none: bool = False`

Flag indicating whether fields which are equal to `None` should be excluded from the returned dictionary.

```python
class Category(ormar.Model):
    class Meta:
        tablename = "categories"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100, default="Test", nullable=True)
    visibility: bool = ormar.Boolean(default=True)


class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    price: float = ormar.Float(default=9.99)
    categories: List[Category] = ormar.ManyToMany(Category)


category = Category(name=None)
assert category.dict() == {'id': None, 'items': [], 'name': None,
                           'visibility': True}
# note the id is not set yet, so it is None and excluded
assert category.dict(exclude_none=True) == {'items': [], 'visibility': True}

await category.save()
category2 = await Category.objects.get()
assert category2.dict() == {'id': 1, 'items': [], 'name': None,
                            'visibility': True}
assert category2.dict(exclude_none=True) == {'id': 1, 'items': [],
                                             'visibility': True}
```
### exclude_primary_keys (`ormar` only)

`exclude_primary_keys: bool = False`

Setting the flag to `True` will exclude all primary key columns in a tree, including nested models.

```python
class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


item1 = Item(id=1, name="Test Item")
assert item1.dict() == {"id": 1, "name": "Test Item"}
assert item1.dict(exclude_primary_keys=True) == {"name": "Test Item"}
```
### exclude_through_models (`ormar` only)

`exclude_through_models: bool = False`

`Through` models are auto-added for every `ManyToMany` relation, and they hold the additional parameters of the linking model/table.

Setting `exclude_through_models=True` will exclude all through models, including `Through` models of submodels.

```python
class Category(ormar.Model):
    class Meta:
        tablename = "categories"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    categories: List[Category] = ormar.ManyToMany(Category)


# tree defining the models
item_dict = {
    "name": "test",
    "categories": [{"name": "test cat"}, {"name": "test cat2"}],
}
# save whole tree
await Item(**item_dict).save_related(follow=True, save_all=True)

# get the saved values
item = await Item.objects.select_related("categories").get()

# by default you can see the through models (itemcategory)
assert item.dict() == {'id': 1, 'name': 'test',
                       'categories': [
                           {'id': 1, 'name': 'test cat',
                            'itemcategory': {'id': 1, 'category': None, 'item': None}},
                           {'id': 2, 'name': 'test cat2',
                            'itemcategory': {'id': 2, 'category': None, 'item': None}}
                       ]}

# you can exclude those fields/models
assert item.dict(exclude_through_models=True) == {
    'id': 1, 'name': 'test',
    'categories': [
        {'id': 1, 'name': 'test cat'},
        {'id': 2, 'name': 'test cat2'}
    ]}
```
## json

`json()` has exactly the same parameters as `dict()`, so check the descriptions above.

Of course the end result is a string with the json representation and not a dictionary.
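As a rough standard-library analogue of the relationship between the two methods (plain dictionaries here, no ormar models involved):

```python
import json

data = {'id': 1, 'name': 'Test', 'visibility': True}  # shape of a dict() result
as_json = json.dumps(data)                            # json() returns such a string

assert isinstance(as_json, str)
assert json.loads(as_json) == data  # same content, different representation
```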
## load

By default when you query a table without prefetching related models, ormar will still construct
@@ -1,3 +1,23 @@
# 0.10.7

## ✨ Features

* Add `exclude_primary_keys: bool = False` flag to the `dict()` method that allows excluding all primary key columns in the resulting dictionary. [#164](https://github.com/collerek/ormar/issues/164)
* Add `exclude_through_models: bool = False` flag to `dict()` that allows excluding all through models from `ManyToMany` relations. [#164](https://github.com/collerek/ormar/issues/164)
* Add `represent_as_base64_str: bool = False` parameter that allows conversion of a bytes `LargeBinary` field to a base64 encoded string. The string is returned in `dict()` and
on attribute access, and a string is converted to bytes on setting. Data in the database is stored as bytes. [#187](https://github.com/collerek/ormar/issues/187)
* Add `pk` alias to allow field access by `Model.pk` in filters and order by clauses (python style)

## 🐛 Fixes

* Remove default `None` option for `max_length` for `LargeBinary` field [#186](https://github.com/collerek/ormar/issues/186)
* Remove default `None` option for `max_length` for `String` field

## 💬 Other

* Provide a guide and samples of `dict()` parameters in the [docs](https://collerek.github.io/ormar/models/methods/)
* Major refactor of getting/setting attributes from magic methods into descriptors -> noticeable performance improvement

# 0.10.6

## ✨ Features
@@ -76,7 +76,7 @@ class UndefinedType:  # pragma no cover

Undefined = UndefinedType()

__version__ = "0.10.6"
__version__ = "0.10.7"
__all__ = [
    "Integer",
    "BigInteger",
@@ -95,6 +95,10 @@ class BaseField(FieldInfo):
        self.ormar_default: Any = kwargs.pop("default", None)
        self.server_default: Any = kwargs.pop("server_default", None)

        self.represent_as_base64_str: bool = kwargs.pop(
            "represent_as_base64_str", False
        )

        for name, value in kwargs.items():
            setattr(self, name, value)
@@ -136,10 +136,10 @@ class String(ModelFieldFactory, str):
    def __new__(  # type: ignore # noqa CFQ002
        cls,
        *,
        max_length: int,
        allow_blank: bool = True,
        strip_whitespace: bool = False,
        min_length: int = None,
        max_length: int = None,
        curtail_length: int = None,
        regex: str = None,
        **kwargs: Any
@@ -176,7 +176,7 @@ class String(ModelFieldFactory, str):
        :type kwargs: Any
        """
        max_length = kwargs.get("max_length", None)
        if max_length is None or max_length <= 0:
        if max_length <= 0:
            raise ModelDefinitionError(
                "Parameter max_length is required for field String"
            )
@@ -435,7 +435,7 @@ class LargeBinary(ModelFieldFactory, bytes):
    _sample = "bytes"

    def __new__(  # type: ignore # noqa CFQ002
        cls, *, max_length: int = None, **kwargs: Any
        cls, *, max_length: int, represent_as_base64_str: bool = False, **kwargs: Any
    ) -> BaseField:  # type: ignore
        kwargs = {
            **kwargs,
@@ -468,7 +468,7 @@ class LargeBinary(ModelFieldFactory, bytes):
        :type kwargs: Any
        """
        max_length = kwargs.get("max_length", None)
        if max_length is None or max_length <= 0:
        if max_length <= 0:
            raise ModelDefinitionError(
                "Parameter max_length is required for field LargeBinary"
            )
ormar/models/descriptors/__init__.py (new file, 17 lines)
@@ -0,0 +1,17 @@
from ormar.models.descriptors.descriptors import (
    BytesDescriptor,
    JsonDescriptor,
    PkDescriptor,
    PropertyDescriptor,
    PydanticDescriptor,
    RelationDescriptor,
)

__all__ = [
    "PydanticDescriptor",
    "RelationDescriptor",
    "PropertyDescriptor",
    "PkDescriptor",
    "JsonDescriptor",
    "BytesDescriptor",
]
ormar/models/descriptors/descriptors.py (new file, 143 lines)
@@ -0,0 +1,143 @@
import base64
from typing import Any, TYPE_CHECKING, Type

try:
    import orjson as json
except ImportError:  # pragma: no cover
    import json  # type: ignore

if TYPE_CHECKING:  # pragma: no cover
    from ormar import Model


class PydanticDescriptor:
    """
    Pydantic descriptor simply delegates everything to the pydantic model
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        value = instance.__dict__.get(self.name, None)
        return value

    def __set__(self, instance: "Model", value: Any) -> None:
        instance._internal_set(self.name, value)
        instance.set_save_status(False)


class JsonDescriptor:
    """
    Json descriptor dumps/loads strings to actual data on write/read
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        value = instance.__dict__.get(self.name, None)
        return value

    def __set__(self, instance: "Model", value: Any) -> None:
        if not isinstance(value, str):
            value = json.dumps(value)
        value = value.decode("utf-8") if isinstance(value, bytes) else value
        instance._internal_set(self.name, value)
        instance.set_save_status(False)


class BytesDescriptor:
    """
    Bytes descriptor converts strings to bytes on write and converts bytes to str
    if the represent_as_base64_str flag is set, so the value can be dumped to json
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        value = instance.__dict__.get(self.name, None)
        field = instance.Meta.model_fields[self.name]
        if field.represent_as_base64_str and not isinstance(value, str):
            value = base64.b64encode(value).decode()
        return value

    def __set__(self, instance: "Model", value: Any) -> None:
        field = instance.Meta.model_fields[self.name]
        if isinstance(value, str):
            if field.represent_as_base64_str:
                value = base64.b64decode(value)
            else:
                value = value.encode("utf-8")
        instance._internal_set(self.name, value)
        instance.set_save_status(False)
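The conversion in `BytesDescriptor` is plain descriptor-protocol mechanics; here is a minimal stand-alone sketch of the same idea (a toy, not ormar's implementation — it always uses the base64 representation and skips the field-metadata lookup and save-status bookkeeping):

```python
import base64

class ToyBytesDescriptor:
    """Data descriptor: stores raw bytes, presents them as a base64 string."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance, owner):
        value = instance.__dict__.get(self.name)
        # present stored bytes as a base64 string on read
        return base64.b64encode(value).decode() if isinstance(value, bytes) else value

    def __set__(self, instance, value):
        # accept a base64 string on write, store the raw bytes
        instance.__dict__[self.name] = (
            base64.b64decode(value) if isinstance(value, str) else value
        )

class Blob:
    data = ToyBytesDescriptor("data")

blob = Blob()
blob.data = "wyg="                           # set via a base64 string
assert blob.__dict__["data"] == b"\xc3\x28"  # raw bytes live in __dict__
assert blob.data == "wyg="                   # read back as a base64 string
```

Because the descriptor defines `__set__`, it is a data descriptor and takes precedence over the instance `__dict__`, which is what lets the real implementation keep raw bytes there while exposing strings.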
class PkDescriptor:
    """
    As of now it's basically a copy of PydanticDescriptor but that will
    change in the future with multi column primary keys
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        value = instance.__dict__.get(self.name, None)
        return value

    def __set__(self, instance: "Model", value: Any) -> None:
        instance._internal_set(self.name, value)
        instance.set_save_status(False)


class RelationDescriptor:
    """
    Relation descriptor expands the relation to initialize the related model
    before setting it to __dict__. Note that expanding also registers the
    related model in RelationManager.
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        if self.name in instance._orm:
            return instance._orm.get(self.name)  # type: ignore
        return None  # pragma no cover

    def __set__(self, instance: "Model", value: Any) -> None:
        model = instance.Meta.model_fields[self.name].expand_relationship(
            value=value, child=instance
        )
        if isinstance(instance.__dict__.get(self.name), list):
            # virtual foreign key or many to many
            # TODO: Fix double items in dict, no effect on real action just ugly repr
            instance.__dict__[self.name].append(model)
        else:
            # foreign key relation
            instance.__dict__[self.name] = model
        instance.set_save_status(False)


class PropertyDescriptor:
    """
    Property descriptor handles methods decorated with the @property_field decorator.
    They are read only.
    """

    def __init__(self, name: str, function: Any) -> None:
        self.name = name
        self.function = function

    def __get__(self, instance: "Model", owner: Type["Model"]) -> Any:
        if instance is None:
            return self
        if instance is not None and self.function is not None:
            bound = self.function.__get__(instance, instance.__class__)
            return bound() if callable(bound) else bound

    def __set__(self, instance: "Model", value: Any) -> None:  # pragma: no cover
        # kept here so it's a data-descriptor and precedes __dict__ lookup
        pass
@@ -67,6 +67,11 @@ def populate_default_options_values(
        for name, field in new_model.Meta.model_fields.items()
        if field.__type__ == pydantic.Json
    }
    new_model._bytes_fields = {
        name
        for name, field in new_model.Meta.model_fields.items()
        if field.__type__ == bytes
    }


class Connection(sqlite3.Connection):
@@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Type, cast
import ormar
from ormar import ForeignKey, ManyToMany
from ormar.fields import Through
from ormar.models.descriptors import RelationDescriptor
from ormar.models.helpers.sqlalchemy import adjust_through_many_to_many_model
from ormar.relations import AliasManager

@@ -130,6 +131,8 @@ def register_reverse_model_fields(model_field: "ForeignKeyField") -> None:
        orders_by=model_field.related_orders_by,
        skip_field=model_field.skip_reverse,
    )
    if not model_field.skip_reverse:
        setattr(model_field.to, related_name, RelationDescriptor(name=related_name))


def register_through_shortcut_fields(model_field: "ManyToManyField") -> None:
@@ -160,6 +163,8 @@ def register_through_shortcut_fields(model_field: "ManyToManyField") -> None:
        owner=model_field.to,
        nullable=True,
    )
    setattr(model_field.owner, through_name, RelationDescriptor(name=through_name))
    setattr(model_field.to, through_name, RelationDescriptor(name=through_name))


def register_relation_in_alias_manager(field: "ForeignKeyField") -> None:
@@ -4,6 +4,7 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
import sqlalchemy

import ormar  # noqa: I100, I202
from ormar.models.descriptors import RelationDescriptor
from ormar.models.helpers.pydantic import create_pydantic_field
from ormar.models.helpers.related_names_validation import (
    validate_related_names_in_relations,
@@ -33,6 +34,7 @@ def adjust_through_many_to_many_model(model_field: "ManyToManyField") -> None:
        ondelete="CASCADE",
        owner=model_field.through,
    )

    model_fields[child_name] = ormar.ForeignKey(  # type: ignore
        model_field.owner,
        real_name=child_name,
@@ -50,6 +52,9 @@ def adjust_through_many_to_many_model(model_field: "ManyToManyField") -> None:
    create_pydantic_field(parent_name, model_field.to, model_field)
    create_pydantic_field(child_name, model_field.owner, model_field)

    setattr(model_field.through, parent_name, RelationDescriptor(name=parent_name))
    setattr(model_field.through, child_name, RelationDescriptor(name=child_name))


def create_and_append_m2m_fk(
    model: Type["Model"], model_field: "ManyToManyField", field_name: str
@@ -1,3 +1,4 @@
import base64
import datetime
import decimal
import numbers
@@ -77,6 +78,9 @@ def convert_choices_if_needed(  # noqa: CCR001
            )
            choices = [round(float(o), precision) for o in choices]
        elif field.__type__ == bytes:
            if field.represent_as_base64_str:
                value = value if isinstance(value, bytes) else base64.b64decode(value)
            else:
                value = value if isinstance(value, bytes) else value.encode("utf-8")

    return value, choices
@@ -22,6 +22,14 @@ from ormar.exceptions import ModelError
from ormar.fields import BaseField
from ormar.fields.foreign_key import ForeignKeyField
from ormar.fields.many_to_many import ManyToManyField
from ormar.models.descriptors import (
    JsonDescriptor,
    PkDescriptor,
    PropertyDescriptor,
    PydanticDescriptor,
    RelationDescriptor,
)
from ormar.models.descriptors.descriptors import BytesDescriptor
from ormar.models.helpers import (
    alias_manager,
    check_required_meta_parameters,
@@ -95,6 +103,7 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
    new_model._pydantic_fields = {name for name in new_model.__fields__}
    new_model._choices_fields = set()
    new_model._json_fields = set()
    new_model._bytes_fields = set()


def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None:  # noqa: CCR001
@@ -477,6 +486,31 @@ def update_attrs_and_fields(
    return updated_model_fields


def add_field_descriptor(
    name: str, field: "BaseField", new_model: Type["Model"]
) -> None:
    """
    Sets the appropriate descriptor for each model field.
    There are 5 main types of descriptors: for bytes, json, pure pydantic fields,
    and 2 ormar ones - one for relations and one for the pk shortcut.

    :param name: name of the field
    :type name: str
    :param field: model field to add descriptor for
    :type field: BaseField
    :param new_model: model with fields
    :type new_model: Type["Model"]
    """
    if field.is_relation:
        setattr(new_model, name, RelationDescriptor(name=name))
    elif field.__type__ == pydantic.Json:
        setattr(new_model, name, JsonDescriptor(name=name))
    elif field.__type__ == bytes:
        setattr(new_model, name, BytesDescriptor(name=name))
    else:
        setattr(new_model, name, PydanticDescriptor(name=name))


class ModelMetaclass(pydantic.main.ModelMetaclass):
    def __new__(  # type: ignore # noqa: CCR001
        mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict
@@ -539,8 +573,9 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
            populate_meta_sqlalchemy_table_if_required(new_model.Meta)
            expand_reverse_relationships(new_model)
            # TODO: iterate only related fields
            for field in new_model.Meta.model_fields.values():
            for name, field in new_model.Meta.model_fields.items():
                register_relation_in_alias_manager(field=field)
                add_field_descriptor(name=name, field=field, new_model=new_model)

            if new_model.Meta.pkname not in attrs["__annotations__"]:
                field_name = new_model.Meta.pkname
@@ -551,6 +586,16 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
                )
            new_model.Meta.alias_manager = alias_manager

            for item in new_model.Meta.property_fields:
                function = getattr(new_model, item)
                setattr(
                    new_model,
                    item,
                    PropertyDescriptor(name=item, function=function),
                )

            new_model.pk = PkDescriptor(name=new_model.Meta.pkname)

        return new_model

    @property
@ -564,6 +609,17 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
|
||||
return QuerySet(model_cls=cls)
|
||||
|
||||
def __getattr__(self, item: str) -> Any:
|
||||
"""
|
||||
Returns FieldAccessors on access to model fields from a class,
that way they can be used in python style filters and order_by.
|
||||
|
||||
:param item: name of the field
|
||||
:type item: str
|
||||
:return: FieldAccessor for given field
|
||||
:rtype: FieldAccessor
|
||||
"""
|
||||
if item == "pk":
|
||||
item = self.Meta.pkname
|
||||
if item in object.__getattribute__(self, "Meta").model_fields:
|
||||
field = self.Meta.model_fields.get(item)
|
||||
if field.is_relation:
|
||||
|
||||
@ -151,7 +151,7 @@ class ExcludableMixin(RelationMixin):
|
||||
:return: set or dict with excluded fields added.
|
||||
:rtype: Union[Set, Dict]
|
||||
"""
|
||||
exclude = exclude or {}
|
||||
exclude = exclude or set()
|
||||
related_set = cls.extract_related_names()
|
||||
if isinstance(exclude, set):
|
||||
exclude = {s for s in exclude}
|
||||
@ -162,6 +162,26 @@ class ExcludableMixin(RelationMixin):
|
||||
exclude = exclude.union(related_set)
|
||||
return exclude
|
||||
|
||||
@classmethod
|
||||
def _update_excluded_with_pks_and_through(
|
||||
cls, exclude: Set, exclude_primary_keys: bool, exclude_through_models: bool
|
||||
) -> Set:
|
||||
"""
|
||||
Updates excluded names with pk column and through models if exclude flags are set.
|
||||
|
||||
:param exclude: set of names to exclude
|
||||
:type exclude: Set
|
||||
:param exclude_primary_keys: flag if the primary keys should be excluded
:type exclude_primary_keys: bool
:param exclude_through_models: flag if the through models should be excluded
:type exclude_through_models: bool
:return: set updated with pk and through names if flags are set
|
||||
:rtype: Set
|
||||
"""
|
||||
if exclude_primary_keys:
|
||||
exclude.add(cls.Meta.pkname)
|
||||
if exclude_through_models:
|
||||
exclude = exclude.union(cls.extract_through_names())
|
||||
return exclude
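Stripped of the class machinery, the update above is a set union guarded by the two flags. A standalone sketch (hypothetical field names, illustrative only):

```python
from typing import Set


def update_excluded(
    exclude: Set[str],
    pkname: str,
    through_names: Set[str],
    exclude_primary_keys: bool,
    exclude_through_models: bool,
) -> Set[str]:
    # mirrors _update_excluded_with_pks_and_through: widen the user-provided
    # exclude set with the pk name and/or the through-model names
    if exclude_primary_keys:
        exclude = exclude | {pkname}
    if exclude_through_models:
        exclude = exclude | through_names
    return exclude


result = update_excluded({"password"}, "id", {"users_roles"}, True, True)
```

With both flags set the result contains all three names; with both unset the input passes through unchanged.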
|
||||
|
||||
@classmethod
|
||||
def get_names_to_exclude(cls, excludable: ExcludableItems, alias: str) -> Set:
|
||||
"""
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import base64
|
||||
import sys
|
||||
import warnings
|
||||
from typing import (
|
||||
AbstractSet,
|
||||
Any,
|
||||
@ -8,7 +10,6 @@ from typing import (
|
||||
Mapping,
|
||||
MutableSequence,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Tuple,
|
||||
@ -37,7 +38,6 @@ from ormar.models.helpers.sqlalchemy import (
|
||||
populate_meta_sqlalchemy_table_if_required,
|
||||
update_column_definition,
|
||||
)
|
||||
from ormar.models.helpers.validation import validate_choices
|
||||
from ormar.models.metaclass import ModelMeta, ModelMetaclass
|
||||
from ormar.models.modelproxy import ModelTableProxy
|
||||
from ormar.queryset.utils import translate_list_to_dict
|
||||
@ -87,6 +87,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
_pydantic_fields: Set
|
||||
_quick_access_fields: Set
|
||||
_json_fields: Set
|
||||
_bytes_fields: Set
|
||||
Meta: ModelMeta
|
||||
|
||||
# noinspection PyMissingConstructor
|
||||
@ -155,23 +156,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
|
||||
"""
|
||||
Overwrites setattr in object to allow for special behaviour of certain params.
|
||||
|
||||
Parameter "pk" is translated into actual primary key field name.
|
||||
|
||||
Relations are expanded (child model constructed if needed) and registered on
|
||||
both ends of the relation. The related models are handled by RelationshipManager
|
||||
exposed at _orm param.
|
||||
|
||||
Json fields are converted if needed.
|
||||
|
||||
Setting pk, foreign key value or any other field value sets Model save status
to False. Setting a reverse relation or many to many relation does not, as it
does not modify the state of the model (only the related or through model).
|
||||
|
||||
To short circuit all checks and expansions the set of attribute names present
|
||||
on each model is gathered into _quick_access_fields that is looked first and
|
||||
if field is in this set the object setattr is called directly.
|
||||
Overwrites setattr in pydantic parent as otherwise descriptors are not called.
|
||||
|
||||
:param name: name of the attribute to set
|
||||
:type name: str
|
||||
@ -180,84 +165,35 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
if name in object.__getattribute__(self, "_quick_access_fields"):
|
||||
if hasattr(self, name):
|
||||
object.__setattr__(self, name, value)
|
||||
elif name == "pk":
|
||||
object.__setattr__(self, self.Meta.pkname, value)
|
||||
self.set_save_status(False)
|
||||
elif name in object.__getattribute__(self, "_orm"):
|
||||
model = (
|
||||
object.__getattribute__(self, "Meta")
|
||||
.model_fields[name]
|
||||
.expand_relationship(value=value, child=self)
|
||||
)
|
||||
if isinstance(object.__getattribute__(self, "__dict__").get(name), list):
|
||||
# virtual foreign key or many to many
|
||||
# TODO: Fix double items in dict, no effect on real action ugly repr
|
||||
# if model.pk not in [x.pk for x in related_list]:
|
||||
object.__getattribute__(self, "__dict__")[name].append(model)
|
||||
else:
|
||||
# foreign key relation
|
||||
object.__getattribute__(self, "__dict__")[name] = model
|
||||
self.set_save_status(False)
|
||||
else:
|
||||
if name in object.__getattribute__(self, "_choices_fields"):
|
||||
validate_choices(field=self.Meta.model_fields[name], value=value)
|
||||
super().__setattr__(name, self._convert_json(name, value, op="dumps"))
|
||||
self.set_save_status(False)
|
||||
# let pydantic handle errors for unknown fields
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def __getattribute__(self, item: str) -> Any: # noqa: CCR001
|
||||
def __getattr__(self, item: str) -> Any:
|
||||
"""
|
||||
Because we need to overwrite getting the attribute by ormar instead of pydantic
|
||||
as well as returning related models and not the value stored on the model the
|
||||
__getattribute__ needs to be used not __getattr__.
|
||||
Used only to silence mypy errors for Through models and reverse relations.
|
||||
Not used in real life as in practice calls are intercepted
|
||||
by RelationDescriptors
|
||||
|
||||
It's used to access all attributes so it can be a big overhead that's why a
|
||||
number of short circuits is used.
|
||||
|
||||
To short circuit all checks and expansions the set of attribute names present
|
||||
on each model is gathered into _quick_access_fields that is looked first and
|
||||
if field is in this set the object setattr is called directly.
|
||||
|
||||
To avoid recursion object's getattribute is used to actually get the attribute
|
||||
value from the model after the checks.
|
||||
|
||||
Even the function calls are constructed with objects functions.
|
||||
|
||||
Parameter "pk" is translated into actual primary key field name.
|
||||
|
||||
Relations are returned so the actual related model is returned and not current
|
||||
model's field. The related models are handled by RelationshipManager exposed
|
||||
at _orm param.
|
||||
|
||||
Json fields are converted if needed.
|
||||
|
||||
:param item: name of the attribute to retrieve
|
||||
:param item: name of attribute
|
||||
:type item: str
|
||||
:return: value of the attribute
|
||||
:return: Any
|
||||
:rtype: Any
|
||||
"""
|
||||
if item in object.__getattribute__(self, "_quick_access_fields"):
|
||||
return object.__getattribute__(self, item)
|
||||
if item == "pk":
|
||||
return object.__getattribute__(self, "__dict__").get(self.Meta.pkname, None)
|
||||
if item in object.__getattribute__(self, "extract_related_names")():
|
||||
return object.__getattribute__(
|
||||
self, "_extract_related_model_instead_of_field"
|
||||
)(item)
|
||||
if item in object.__getattribute__(self, "extract_through_names")():
|
||||
return object.__getattribute__(
|
||||
self, "_extract_related_model_instead_of_field"
|
||||
)(item)
|
||||
if item in object.__getattribute__(self, "Meta").property_fields:
|
||||
value = object.__getattribute__(self, item)
|
||||
return value() if callable(value) else value
|
||||
if item in object.__getattribute__(self, "_pydantic_fields"):
|
||||
value = object.__getattribute__(self, "__dict__").get(item, None)
|
||||
value = object.__getattribute__(self, "_convert_json")(item, value, "loads")
|
||||
return value
|
||||
return super().__getattribute__(item)
|
||||
|
||||
return object.__getattribute__(self, item) # pragma: no cover
|
||||
def _internal_set(self, name: str, value: Any) -> None:
|
||||
"""
|
||||
Delegates call to pydantic.
|
||||
|
||||
:param name: name of param
|
||||
:type name: str
|
||||
:param value: value to set
|
||||
:type value: Any
|
||||
"""
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def _verify_model_can_be_initialized(self) -> None:
|
||||
"""
|
||||
@ -266,9 +202,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
if object.__getattribute__(self, "Meta").abstract:
|
||||
if self.Meta.abstract:
|
||||
raise ModelError(f"You cannot initialize abstract model {self.get_name()}")
|
||||
if object.__getattribute__(self, "Meta").requires_ref_update:
|
||||
if self.Meta.requires_ref_update:
|
||||
raise ModelError(
|
||||
f"Model {self.get_name()} has not updated "
|
||||
f"ForwardRefs. \nBefore using the model you "
|
||||
@ -292,10 +228,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return: modified kwargs
|
||||
:rtype: Tuple[Dict, Dict]
|
||||
"""
|
||||
meta = object.__getattribute__(self, "Meta")
|
||||
property_fields = meta.property_fields
|
||||
model_fields = meta.model_fields
|
||||
pydantic_fields = object.__getattribute__(self, "__fields__")
|
||||
property_fields = self.Meta.property_fields
|
||||
model_fields = self.Meta.model_fields
|
||||
pydantic_fields = set(self.__fields__.keys())
|
||||
|
||||
# remove property fields
|
||||
for prop_filed in property_fields:
|
||||
@ -303,7 +238,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
excluded: Set[str] = kwargs.pop("__excluded__", set())
|
||||
if "pk" in kwargs:
|
||||
kwargs[meta.pkname] = kwargs.pop("pk")
|
||||
kwargs[self.Meta.pkname] = kwargs.pop("pk")
|
||||
|
||||
# extract through fields
|
||||
through_tmp_dict = dict()
|
||||
@ -312,12 +247,14 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
try:
|
||||
new_kwargs: Dict[str, Any] = {
|
||||
k: self._convert_json(
|
||||
k: self._convert_to_bytes(
|
||||
k,
|
||||
self._convert_json(
|
||||
k,
|
||||
model_fields[k].expand_relationship(v, self, to_register=False,)
|
||||
if k in model_fields
|
||||
else (v if k in pydantic_fields else model_fields[k]),
|
||||
"dumps",
|
||||
),
|
||||
)
|
||||
for k, v in kwargs.items()
|
||||
}
|
||||
@ -349,21 +286,6 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
),
|
||||
)
|
||||
|
||||
def _extract_related_model_instead_of_field(
|
||||
self, item: str
|
||||
) -> Optional[Union["Model", Sequence["Model"]]]:
|
||||
"""
|
||||
Retrieves the related model/models from RelationshipManager.
|
||||
|
||||
:param item: name of the relation
|
||||
:type item: str
|
||||
:return: related model, list of related models or None
|
||||
:rtype: Optional[Union[Model, List[Model]]]
|
||||
"""
|
||||
if item in self._orm:
|
||||
return self._orm.get(item) # type: ignore
|
||||
return None # pragma no cover
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
"""
|
||||
Compares other model to this model when == is called.
|
||||
@ -562,6 +484,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
models: MutableSequence,
|
||||
include: Union[Set, Dict, None],
|
||||
exclude: Union[Set, Dict, None],
|
||||
exclude_primary_keys: bool,
|
||||
exclude_through_models: bool,
|
||||
) -> List:
|
||||
"""
|
||||
Converts list of models into list of dictionaries.
|
||||
@ -580,7 +504,11 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
try:
|
||||
result.append(
|
||||
model.dict(
|
||||
relation_map=relation_map, include=include, exclude=exclude,
|
||||
relation_map=relation_map,
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
)
|
||||
except ReferenceError: # pragma no cover
|
||||
@ -623,6 +551,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
dict_instance: Dict,
|
||||
include: Optional[Dict],
|
||||
exclude: Optional[Dict],
|
||||
exclude_primary_keys: bool,
|
||||
exclude_through_models: bool,
|
||||
) -> Dict:
|
||||
"""
|
||||
Traverses nested models and converts them into dictionaries.
|
||||
@ -655,6 +585,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
models=nested_model,
|
||||
include=self._convert_all(self._skip_ellipsis(include, field)),
|
||||
exclude=self._convert_all(self._skip_ellipsis(exclude, field)),
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
elif nested_model is not None:
|
||||
|
||||
@ -664,6 +596,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
),
|
||||
include=self._convert_all(self._skip_ellipsis(include, field)),
|
||||
exclude=self._convert_all(self._skip_ellipsis(exclude, field)),
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
else:
|
||||
dict_instance[field] = None
|
||||
@ -681,6 +615,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
exclude_primary_keys: bool = False,
|
||||
exclude_through_models: bool = False,
|
||||
relation_map: Dict = None,
|
||||
) -> "DictStrAny": # noqa: A003
|
||||
"""
|
||||
@ -692,6 +628,10 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
Additionally, fields decorated with @property_field are added.
|
||||
|
||||
:param exclude_through_models: flag to exclude through models from dict
|
||||
:type exclude_through_models: bool
|
||||
:param exclude_primary_keys: flag to exclude primary keys from dict
|
||||
:type exclude_primary_keys: bool
|
||||
:param include: fields to include
|
||||
:type include: Union[Set, Dict, None]
|
||||
:param exclude: fields to exclude
|
||||
@ -711,9 +651,15 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
pydantic_exclude = self._update_excluded_with_related(exclude)
|
||||
pydantic_exclude = self._update_excluded_with_pks_and_through(
|
||||
exclude=pydantic_exclude,
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
dict_instance = super().dict(
|
||||
include=include,
|
||||
exclude=self._update_excluded_with_related(exclude),
|
||||
exclude=pydantic_exclude,
|
||||
by_alias=by_alias,
|
||||
skip_defaults=skip_defaults,
|
||||
exclude_unset=exclude_unset,
|
||||
@ -721,6 +667,11 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
exclude_none=exclude_none,
|
||||
)
|
||||
|
||||
dict_instance = {
|
||||
k: self._convert_bytes_to_str(column_name=k, value=v)
|
||||
for k, v in dict_instance.items()
|
||||
}
|
||||
|
||||
if include and isinstance(include, Set):
|
||||
include = translate_list_to_dict(include)
|
||||
if exclude and isinstance(exclude, Set):
|
||||
@ -738,6 +689,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
dict_instance=dict_instance,
|
||||
include=include, # type: ignore
|
||||
exclude=exclude, # type: ignore
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
|
||||
# include model properties as fields in dict
|
||||
@ -748,6 +701,50 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
|
||||
return dict_instance
|
||||
|
||||
def json( # type: ignore # noqa A003
|
||||
self,
|
||||
*,
|
||||
include: Union[Set, Dict] = None,
|
||||
exclude: Union[Set, Dict] = None,
|
||||
by_alias: bool = False,
|
||||
skip_defaults: bool = None,
|
||||
exclude_unset: bool = False,
|
||||
exclude_defaults: bool = False,
|
||||
exclude_none: bool = False,
|
||||
encoder: Optional[Callable[[Any], Any]] = None,
|
||||
exclude_primary_keys: bool = False,
|
||||
exclude_through_models: bool = False,
|
||||
**dumps_kwargs: Any,
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JSON representation of the model, `include` and `exclude`
|
||||
arguments as per `dict()`.
|
||||
|
||||
`encoder` is an optional function to supply as `default` to json.dumps(),
|
||||
other arguments as per `json.dumps()`.
|
||||
"""
|
||||
if skip_defaults is not None: # pragma: no cover
|
||||
warnings.warn(
|
||||
f'{self.__class__.__name__}.json(): "skip_defaults" is deprecated '
|
||||
f'and replaced by "exclude_unset"',
|
||||
DeprecationWarning,
|
||||
)
|
||||
exclude_unset = skip_defaults
|
||||
encoder = cast(Callable[[Any], Any], encoder or self.__json_encoder__)
|
||||
data = self.dict(
|
||||
include=include,
|
||||
exclude=exclude,
|
||||
by_alias=by_alias,
|
||||
exclude_unset=exclude_unset,
|
||||
exclude_defaults=exclude_defaults,
|
||||
exclude_none=exclude_none,
|
||||
exclude_primary_keys=exclude_primary_keys,
|
||||
exclude_through_models=exclude_through_models,
|
||||
)
|
||||
if self.__custom_root_type__: # pragma: no cover
|
||||
data = data["__root__"]
|
||||
return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
|
||||
|
||||
def update_from_dict(self, value_dict: Dict) -> "NewBaseModel":
|
||||
"""
|
||||
Updates self with values of fields passed in the dictionary.
|
||||
@ -761,7 +758,46 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
setattr(self, key, value)
|
||||
return self
|
||||
|
||||
def _convert_json(self, column_name: str, value: Any, op: str) -> Union[str, Dict]:
|
||||
def _convert_to_bytes(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value from string to bytes
|
||||
|
||||
:param column_name: name of the field
|
||||
:type column_name: str
|
||||
:param value: value of the field
|
||||
:type value: Any
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in self._bytes_fields:
|
||||
return value
|
||||
field = self.Meta.model_fields[column_name]
|
||||
if not isinstance(value, bytes):
|
||||
if field.represent_as_base64_str:
|
||||
value = base64.b64decode(value)
|
||||
else:
|
||||
value = value.encode("utf-8")
|
||||
return value
|
||||
|
||||
def _convert_bytes_to_str(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value to str from bytes for represent_as_base64_str columns.
|
||||
|
||||
:param column_name: name of the field
|
||||
:type column_name: str
|
||||
:param value: value of the field
|
||||
:type value: Any
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in self._bytes_fields:
|
||||
return value
|
||||
field = self.Meta.model_fields[column_name]
|
||||
if not isinstance(value, str) and field.represent_as_base64_str:
|
||||
return base64.b64encode(value).decode()
|
||||
return value
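Together, these two helpers implement a base64 round-trip for binary fields declared with `represent_as_base64_str`: bytes are stored raw on the model but serialized to a base64 string in `dict()`/`json()`. The underlying conversion (simplified sketch) is:

```python
import base64

raw = b"\xf0\x28\x8c\x28"                # bytes stored on the model
as_str = base64.b64encode(raw).decode()  # what dict()/json() emit for the field
back = base64.b64decode(as_str)          # what an incoming str is decoded back to
```

This is the same round-trip exercised by the `test_binary_fields.py` test added below.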
|
||||
|
||||
def _convert_json(self, column_name: str, value: Any) -> Union[str, Dict]:
|
||||
"""
|
||||
Converts value to/from json if needed (for Json columns).
|
||||
|
||||
@ -769,24 +805,14 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
:type column_name: str
|
||||
:param value: value of the field
|
||||
:type value: Any
|
||||
:param op: operator on json
|
||||
:type op: str
|
||||
:return: converted value if needed, else original value
|
||||
:rtype: Any
|
||||
"""
|
||||
if column_name not in object.__getattribute__(self, "_json_fields"):
|
||||
if column_name not in self._json_fields:
|
||||
return value
|
||||
|
||||
condition = (
|
||||
isinstance(value, str) if op == "loads" else not isinstance(value, str)
|
||||
)
|
||||
operand: Callable[[Any], Any] = (
|
||||
json.loads if op == "loads" else json.dumps # type: ignore
|
||||
)
|
||||
|
||||
if condition:
|
||||
if not isinstance(value, str):
|
||||
try:
|
||||
value = operand(value)
|
||||
value = json.dumps(value)
|
||||
except TypeError: # pragma no cover
|
||||
pass
|
||||
return value.decode("utf-8") if isinstance(value, bytes) else value
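After this refactor `_convert_json` only handles the dumps direction (the `op` switch is gone). A standalone sketch of the remaining logic, with a hypothetical helper name:

```python
import json
from typing import Any


def to_json_value(value: Any) -> Any:
    # non-string values of Json columns are serialized to a JSON string;
    # bytes results are decoded to str (mirrors the simplified helper above)
    if not isinstance(value, str):
        try:
            value = json.dumps(value)
        except TypeError:
            pass
    return value.decode("utf-8") if isinstance(value, bytes) else value


out = to_json_value({"a": 1})
```

String inputs pass through unchanged, so already-serialized values are not double-encoded.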
|
||||
|
||||
@ -282,6 +282,9 @@ class QuerysetProxy(Generic[T]):
|
||||
|
||||
Actual call delegated to QuerySet.
|
||||
|
||||
Passing args and/or kwargs is a shortcut and is equivalent to calling
|
||||
`filter(*args, **kwargs).first()`.
|
||||
|
||||
List of related models is cleared before the call.
|
||||
|
||||
:param kwargs:
|
||||
@ -300,7 +303,8 @@ class QuerysetProxy(Generic[T]):
|
||||
|
||||
If no criteria are set it will return the last row in db sorted by pk.
|
||||
|
||||
Passing a criteria is actually calling filter(**kwargs) method described below.
|
||||
Passing args and/or kwargs is a shortcut and is equivalent to calling
|
||||
`filter(*args, **kwargs).get_or_none()`.
|
||||
|
||||
If no match is found None will be returned.
|
||||
|
||||
@ -324,7 +328,8 @@ class QuerysetProxy(Generic[T]):
|
||||
|
||||
If no criteria are set it will return the last row in db sorted by pk.
|
||||
|
||||
Passing a criteria is actually calling filter(**kwargs) method described below.
|
||||
Passing args and/or kwargs is a shortcut and is equivalent to calling
|
||||
`filter(*args, **kwargs).get()`.
|
||||
|
||||
Actual call delegated to QuerySet.
|
||||
|
||||
@ -346,7 +351,8 @@ class QuerysetProxy(Generic[T]):
|
||||
"""
|
||||
Returns all rows from a database for given model for set filter options.
|
||||
|
||||
Passing kwargs is a shortcut and equals to calling `filter(**kwrags).all()`.
|
||||
Passing args and/or kwargs is a shortcut and is equivalent to calling
|
||||
`filter(*args, **kwargs).all()`.
|
||||
|
||||
If there are no rows meeting the criteria an empty list is returned.
|
||||
|
||||
|
||||
@ -76,6 +76,11 @@ renderer:
|
||||
- title: Save Prepare Mixin
|
||||
contents:
|
||||
- models.mixins.save_mixin.*
|
||||
- title: Descriptors
|
||||
children:
|
||||
- title: descriptors
|
||||
contents:
|
||||
- models.descriptors.descriptors.*
|
||||
- title: Fields
|
||||
children:
|
||||
- title: Base Field
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
@ -11,16 +11,28 @@ metadata = sqlalchemy.MetaData()
|
||||
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||
|
||||
|
||||
class User(ormar.Model):
|
||||
class Meta:
|
||||
tablename: str = "users"
|
||||
class MainMeta(ormar.ModelMeta):
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
|
||||
class Role(ormar.Model):
|
||||
class Meta(MainMeta):
|
||||
pass
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=255, nullable=False)
|
||||
|
||||
|
||||
class User(ormar.Model):
|
||||
class Meta(MainMeta):
|
||||
tablename: str = "users"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
email: str = ormar.String(max_length=255, nullable=False)
|
||||
password: str = ormar.String(max_length=255, nullable=True)
|
||||
first_name: str = ormar.String(max_length=255, nullable=False)
|
||||
roles: List[Role] = ormar.ManyToMany(Role)
|
||||
|
||||
|
||||
class Tier(ormar.Model):
|
||||
@ -58,12 +70,20 @@ class Item(ormar.Model):
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def sample_data():
|
||||
user = User(email="test@test.com", password="ijacids7^*&", first_name="Anna")
|
||||
tier = Tier(name="Tier I")
|
||||
category1 = Category(name="Toys", tier=tier)
|
||||
category2 = Category(name="Weapons", tier=tier)
|
||||
item1 = Item(name="Teddy Bear", category=category1, created_by=user)
|
||||
item2 = Item(name="M16", category=category2, created_by=user)
|
||||
role = Role(name="User", id=1)
|
||||
role2 = Role(name="Admin", id=2)
|
||||
user = User(
|
||||
id=1,
|
||||
email="test@test.com",
|
||||
password="ijacids7^*&",
|
||||
first_name="Anna",
|
||||
roles=[role, role2],
|
||||
)
|
||||
tier = Tier(id=1, name="Tier I")
|
||||
category1 = Category(id=1, name="Toys", tier=tier)
|
||||
category2 = Category(id=2, name="Weapons", tier=tier)
|
||||
item1 = Item(id=1, name="Teddy Bear", category=category1, created_by=user)
|
||||
item2 = Item(id=2, name="M16", category=category2, created_by=user)
|
||||
return item1, item2
|
||||
|
||||
|
||||
@ -139,4 +159,30 @@ def test_dumping_to_dict_exclude_and_include_nested_dict(sample_data):
|
||||
assert dict2["category"]["name"] == "Toys"
|
||||
assert "created_by" not in dict1
|
||||
assert dict1["category"]["tier"].get("name") is None
|
||||
assert dict1["category"]["tier"]["id"] is None
|
||||
assert dict1["category"]["tier"]["id"] == 1
|
||||
|
||||
|
||||
def test_dumping_dict_without_primary_keys(sample_data):
|
||||
item1, item2 = sample_data
|
||||
dict1 = item2.dict(exclude_primary_keys=True)
|
||||
assert dict1 == {
|
||||
"category": {"name": "Weapons", "tier": {"name": "Tier I"}},
|
||||
"created_by": {
|
||||
"email": "test@test.com",
|
||||
"first_name": "Anna",
|
||||
"password": "ijacids7^*&",
|
||||
"roles": [{"name": "User"}, {"name": "Admin"}],
|
||||
},
|
||||
"name": "M16",
|
||||
}
|
||||
dict2 = item1.dict(exclude_primary_keys=True)
|
||||
assert dict2 == {
|
||||
"category": {"name": "Toys", "tier": {"name": "Tier I"}},
|
||||
"created_by": {
|
||||
"email": "test@test.com",
|
||||
"first_name": "Anna",
|
||||
"password": "ijacids7^*&",
|
||||
"roles": [{"name": "User"}, {"name": "Admin"}],
|
||||
},
|
||||
"name": "Teddy Bear",
|
||||
}
|
||||
|
||||
135
tests/test_exclude_include_dict/test_pydantic_dict_params.py
Normal file
@ -0,0 +1,135 @@
|
||||
from typing import List
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
|
||||
import ormar
|
||||
from tests.settings import DATABASE_URL
|
||||
|
||||
metadata = sqlalchemy.MetaData()
|
||||
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||
|
||||
|
||||
class Category(ormar.Model):
|
||||
class Meta:
|
||||
tablename = "categories"
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100, default="Test", nullable=True)
|
||||
visibility: bool = ormar.Boolean(default=True)
|
||||
|
||||
|
||||
class Item(ormar.Model):
|
||||
class Meta:
|
||||
tablename = "items"
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100)
|
||||
price: float = ormar.Float(default=9.99)
|
||||
categories: List[Category] = ormar.ManyToMany(Category)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def create_test_database():
|
||||
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||
metadata.create_all(engine)
|
||||
yield
|
||||
metadata.drop_all(engine)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exclude_default():
|
||||
async with database:
|
||||
category = Category()
|
||||
assert category.dict() == {
|
||||
"id": None,
|
||||
"items": [],
|
||||
"name": "Test",
|
||||
"visibility": True,
|
||||
}
|
||||
assert category.dict(exclude_defaults=True) == {"items": []}
|
||||
|
||||
await category.save()
|
||||
category2 = await Category.objects.get()
|
||||
assert category2.dict() == {
|
||||
"id": 1,
|
||||
"items": [],
|
||||
"name": "Test",
|
||||
"visibility": True,
|
||||
}
|
||||
assert category2.dict(exclude_defaults=True) == {"id": 1, "items": []}
|
||||
assert category2.json(exclude_defaults=True) == '{"id": 1, "items": []}'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exclude_none():
|
||||
async with database:
|
||||
category = Category(id=2, name=None)
|
||||
assert category.dict() == {
|
||||
"id": 2,
|
||||
"items": [],
|
||||
"name": None,
|
||||
"visibility": True,
|
||||
}
|
||||
assert category.dict(exclude_none=True) == {
|
||||
"id": 2,
|
||||
"items": [],
|
||||
"visibility": True,
|
||||
}
|
||||
|
||||
await category.save()
|
||||
category2 = await Category.objects.get()
|
||||
assert category2.dict() == {
|
||||
"id": 2,
|
||||
"items": [],
|
||||
"name": None,
|
||||
"visibility": True,
|
||||
}
|
||||
assert category2.dict(exclude_none=True) == {
|
||||
"id": 2,
|
||||
"items": [],
|
||||
"visibility": True,
|
||||
}
|
||||
assert (
|
||||
category2.json(exclude_none=True)
|
||||
== '{"id": 2, "visibility": true, "items": []}'
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exclude_unset():
|
||||
async with database:
|
||||
category = Category(id=3, name="Test 2")
|
||||
assert category.dict() == {
|
||||
"id": 3,
|
||||
"items": [],
|
||||
"name": "Test 2",
|
||||
"visibility": True,
|
||||
}
|
||||
assert category.dict(exclude_unset=True) == {
|
||||
"id": 3,
|
||||
"items": [],
|
||||
"name": "Test 2",
|
||||
}
|
||||
|
||||
await category.save()
|
||||
category2 = await Category.objects.get()
|
||||
assert category2.dict() == {
|
||||
"id": 3,
|
||||
"items": [],
|
||||
"name": "Test 2",
|
||||
"visibility": True,
|
||||
}
|
||||
# NOTE how after loading from db all fields are set explicitly
|
||||
# as this is what happens when you populate a model from db
|
||||
assert category2.dict(exclude_unset=True) == {
|
||||
"id": 3,
|
||||
"items": [],
|
||||
"name": "Test 2",
|
||||
"visibility": True,
|
||||
}
|
||||
91
tests/test_fastapi/test_binary_fields.py
Normal file
@ -0,0 +1,91 @@
import base64
import json
import os
import uuid
from typing import List

import databases
import pydantic
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient

import ormar
from tests.settings import DATABASE_URL

app = FastAPI()

database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
app.state.database = database


@app.on_event("startup")
async def startup() -> None:
    database_ = app.state.database
    if not database_.is_connected:
        await database_.connect()


@app.on_event("shutdown")
async def shutdown() -> None:
    database_ = app.state.database
    if database_.is_connected:
        await database_.disconnect()


blob3 = b"\xc3\x28"
blob4 = b"\xf0\x28\x8c\x28"
blob5 = b"\xee"
blob6 = b"\xff"


class BaseMeta(ormar.ModelMeta):
    metadata = metadata
    database = database


class BinaryThing(ormar.Model):
    class Meta(BaseMeta):
        tablename = "things"

    id: uuid.UUID = ormar.UUID(primary_key=True, default=uuid.uuid4)
    name: str = ormar.Text(default="")
    bt: bytes = ormar.LargeBinary(
        max_length=1000,
        choices=[blob3, blob4, blob5, blob6],
        represent_as_base64_str=True,
    )


@app.get("/things", response_model=List[BinaryThing])
async def read_things():
    return await BinaryThing.objects.order_by("name").all()


@app.post("/things", response_model=BinaryThing)
async def create_things(thing: BinaryThing):
    thing = await thing.save()
    return thing


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


def test_read_main():
    client = TestClient(app)
    with client as client:
        response = client.post(
            "/things", data=json.dumps({"bt": base64.b64encode(blob3).decode()})
        )
        assert response.status_code == 200
        response = client.get("/things")
        assert response.json()[0]["bt"] == base64.b64encode(blob3).decode()
        thing = BinaryThing(**response.json()[0])
        assert thing.__dict__["bt"] == blob3
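`represent_as_base64_str=True` matters here because payloads such as `blob3 = b"\xc3\x28"` are deliberately invalid UTF-8 and could not travel as plain JSON strings; base64 keeps them ASCII-safe in both directions. The round-trip the endpoint relies on is plain stdlib:

```python
import base64

blob3 = b"\xc3\x28"  # invalid UTF-8 on purpose, same value as in the test

encoded = base64.b64encode(blob3).decode()  # JSON-safe ASCII string
decoded = base64.b64decode(encoded)         # raw bytes restored server-side

print(encoded)           # "wyg="
print(decoded == blob3)  # True
```

This is why the test posts `base64.b64encode(blob3).decode()` over the wire while `thing.__dict__["bt"]` still holds the raw `blob3` bytes internally.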
149 tests/test_fastapi/test_excluding_fields.py (new file)
@ -0,0 +1,149 @@
from typing import List

import databases
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient

import ormar
from tests.settings import DATABASE_URL

app = FastAPI()
metadata = sqlalchemy.MetaData()
database = databases.Database(DATABASE_URL, force_rollback=True)
app.state.database = database


@app.on_event("startup")
async def startup() -> None:
    database_ = app.state.database
    if not database_.is_connected:
        await database_.connect()


@app.on_event("shutdown")
async def shutdown() -> None:
    database_ = app.state.database
    if database_.is_connected:
        await database_.disconnect()


class Category(ormar.Model):
    class Meta:
        tablename = "categories"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


class Item(ormar.Model):
    class Meta:
        tablename = "items"
        metadata = metadata
        database = database

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    categories: List[Category] = ormar.ManyToMany(Category)


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


@app.post("/items/", response_model=Item)
async def create_item(item: Item):
    await item.save_related(follow=True, save_all=True)
    return item


@app.get("/items/{item_id}")
async def get_item(item_id: int):
    item = await Item.objects.select_related("categories").get(pk=item_id)
    return item.dict(exclude_primary_keys=True, exclude_through_models=True)


@app.get("/categories/{category_id}")
async def get_category(category_id: int):
    category = await Category.objects.select_related("items").get(pk=category_id)
    return category.dict(exclude_primary_keys=True)


@app.get("/categories/nt/{category_id}")
async def get_category_no_through(category_id: int):
    category = await Category.objects.select_related("items").get(pk=category_id)
    return category.dict(exclude_through_models=True)


@app.get("/categories/ntp/{category_id}")
async def get_category_no_pk_through(category_id: int):
    category = await Category.objects.select_related("items").get(pk=category_id)
    return category.dict(exclude_through_models=True, exclude_primary_keys=True)


@app.get(
    "/items/fex/{item_id}",
    response_model=Item,
    response_model_exclude={
        "id",
        "categories__id",
        "categories__itemcategory",
        "categories__items",
    },
)
async def get_item_excl(item_id: int):
    item = await Item.objects.select_all().get(pk=item_id)
    return item


def test_all_endpoints():
    client = TestClient(app)
    with client as client:
        item = {
            "name": "test",
            "categories": [{"name": "test cat"}, {"name": "test cat2"}],
        }
        response = client.post("/items/", json=item)
        item_check = Item(**response.json())
        assert item_check.id is not None
        assert item_check.categories[0].id is not None

        no_pk_item = client.get(f"/items/{item_check.id}", json=item).json()
        assert no_pk_item == item

        no_pk_item2 = client.get(f"/items/fex/{item_check.id}", json=item).json()
        assert no_pk_item2 == item

        no_pk_category = client.get(
            f"/categories/{item_check.categories[0].id}", json=item
        ).json()
        assert no_pk_category == {
            "items": [
                {
                    "itemcategory": {"category": None, "id": 1, "item": None},
                    "name": "test",
                }
            ],
            "name": "test cat",
        }

        no_through_category = client.get(
            f"/categories/nt/{item_check.categories[0].id}", json=item
        ).json()
        assert no_through_category == {
            "id": 1,
            "items": [{"id": 1, "name": "test"}],
            "name": "test cat",
        }

        no_through_category = client.get(
            f"/categories/ntp/{item_check.categories[0].id}", json=item
        ).json()
        assert no_through_category == {"items": [{"name": "test"}], "name": "test cat"}
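Conceptually, `exclude_primary_keys=True` and `exclude_through_models=True` act like a recursive key filter over the nested `dict()` output: the named keys are dropped at every level of the relation tree. A stdlib sketch of that pruning idea (the `prune` helper is hypothetical, not ormar's actual implementation):

```python
def prune(data, exclude):
    """Recursively drop the given keys from nested dicts/lists."""
    if isinstance(data, dict):
        return {k: prune(v, exclude) for k, v in data.items() if k not in exclude}
    if isinstance(data, list):
        return [prune(v, exclude) for v in data]
    return data


# Shape mirrors the category payload asserted in the test above.
category = {
    "id": 1,
    "name": "test cat",
    "items": [{"id": 1, "name": "test", "itemcategory": {"id": 1}}],
}
print(prune(category, {"id", "itemcategory"}))
# {'name': 'test cat', 'items': [{'name': 'test'}]}
```

Dropping `id` everywhere corresponds to `exclude_primary_keys=True`, and dropping `itemcategory` corresponds to `exclude_through_models=True`, which is why the `/categories/ntp/` endpoint returns only names.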
@ -55,6 +55,7 @@ def test_fields_access():
     # basic access
     assert Product.id._field == Product.Meta.model_fields["id"]
     assert Product.id.id == Product.Meta.model_fields["id"]
+    assert Product.pk.id == Product.id.id
     assert isinstance(Product.id._field, BaseField)
     assert Product.id._access_chain == "id"
     assert Product.id._source_model == Product
@ -228,7 +228,7 @@ def test_binary_error_without_length_model_definition():
                 database = database
                 metadata = metadata
 
-            test: bytes = ormar.LargeBinary(primary_key=True)
+            test: bytes = ormar.LargeBinary(primary_key=True, max_length=-1)
 
 
 @typing.no_type_check

@ -241,7 +241,7 @@ def test_string_error_in_model_definition():
                 database = database
                 metadata = metadata
 
-            test: str = ormar.String(primary_key=True)
+            test: str = ormar.String(primary_key=True, max_length=0)
 
 
 @typing.no_type_check
@ -1,5 +1,7 @@
 import asyncio
+import base64
 import datetime
+import os
 import uuid
 from typing import List
 

@ -37,7 +39,23 @@ class LargeBinarySample(ormar.Model):
         database = database
 
     id: int = ormar.Integer(primary_key=True)
-    test_binary = ormar.LargeBinary(max_length=100000, choices=[blob, blob2])
+    test_binary: bytes = ormar.LargeBinary(max_length=100000, choices=[blob, blob2])
+
+
+blob3 = os.urandom(64)
+blob4 = os.urandom(100)
+
+
+class LargeBinaryStr(ormar.Model):
+    class Meta:
+        tablename = "my_str_blobs"
+        metadata = metadata
+        database = database
+
+    id: int = ormar.Integer(primary_key=True)
+    test_binary: str = ormar.LargeBinary(
+        max_length=100000, choices=[blob3, blob4], represent_as_base64_str=True
+    )
 
 
 class UUIDSample(ormar.Model):
@ -157,6 +175,9 @@ async def test_json_column():
         assert items[0].test_json == dict(aa=12)
         assert items[1].test_json == dict(aa=12)
 
+        items[0].test_json = "[1, 2, 3]"
+        assert items[0].test_json == [1, 2, 3]
+
 
 @pytest.mark.asyncio
 async def test_binary_column():

@ -170,6 +191,25 @@ async def test_binary_column():
         assert items[0].test_binary == blob
         assert items[1].test_binary == blob2
 
+        items[0].test_binary = "test2icac89uc98"
+        assert items[0].test_binary == b"test2icac89uc98"
+
+
+@pytest.mark.asyncio
+async def test_binary_str_column():
+    async with database:
+        async with database.transaction(force_rollback=True):
+            await LargeBinaryStr.objects.create(test_binary=blob3)
+            await LargeBinaryStr.objects.create(test_binary=blob4)
+
+            items = await LargeBinaryStr.objects.all()
+            assert len(items) == 2
+            assert items[0].test_binary == base64.b64encode(blob3).decode()
+            items[0].test_binary = base64.b64encode(blob4).decode()
+            assert items[0].test_binary == base64.b64encode(blob4).decode()
+            assert items[1].test_binary == base64.b64encode(blob4).decode()
+            assert items[1].__dict__["test_binary"] == blob4
 
 
 @pytest.mark.asyncio
 async def test_uuid_column():