use existing encode_json to avoid code duplication, rename queryset customization param and move it to Meta, move docs to models from inheritance

This commit is contained in:
collerek
2022-01-26 17:59:00 +01:00
parent 33b492216d
commit 0e167dc89f
14 changed files with 74 additions and 112 deletions

View File

@ -1,5 +1,5 @@
[flake8] [flake8]
ignore = ANN101, ANN102, W503, S101, CFQ004 ignore = ANN101, ANN102, W503, S101, CFQ004, S311
max-complexity = 8 max-complexity = 8
max-line-length = 88 max-line-length = 88
import-order-style = pycharm import-order-style = pycharm

View File

@ -266,6 +266,44 @@ But for now you cannot change the ManyToMany column names as they go through oth
--8<-- "../docs_src/models/docs010.py" --8<-- "../docs_src/models/docs010.py"
``` ```
## Overwriting the default QuerySet
If you want to customize the queries run by ormar you can define your own queryset class (that extends the ormar `QuerySet`) in your model class, default one is simply the `QuerySet`
You can provide a new class in `Meta` configuration of your class as `queryset_class` parameter.
```python
import ormar
from ormar.queryset.queryset import QuerySet
from fastapi import HTTPException
class MyQuerySetClass(QuerySet):
async def first_or_404(self, *args, **kwargs):
entity = await self.get_or_none(*args, **kwargs)
if entity is None:
# in fastapi or starlette
raise HTTPException(404)
class Book(ormar.Model):
class Meta(ormar.ModelMeta):
metadata = metadata
database = database
tablename = "book"
queryset_class = MyQuerySetClass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=32)
# when book not found, raise `404` in your view.
book = await Book.objects.first_or_404(name="123")
```
### Type Hints & Legacy ### Type Hints & Legacy
Before version 0.4.0 `ormar` supported only one way of defining `Fields` on a `Model` using python type hints as pydantic. Before version 0.4.0 `ormar` supported only one way of defining `Fields` on a `Model` using python type hints as pydantic.

View File

@ -571,37 +571,3 @@ class Category(CreateDateFieldsModel, AuditCreateModel):
``` ```
That way you can inherit from both create and update classes if needed, and only one of them otherwise. That way you can inherit from both create and update classes if needed, and only one of them otherwise.
## __queryset_cls__
You can define your own queryset_class(extends the `Queryset`) in your model class, default is `QuerySet`
```python
import ormar
from ormar.queryset.queryset import QuerySet
from fastapi import HTTPException
class MyQuerySetClass(QuerySet):
async def first_or_404(self, *args, **kwargs):
entity = await self.get_or_none(*args, **kwargs)
if entity is None:
# in fastapi or starlette
raise HTTPException(404)
class Book(ormar.Model):
class Meta:
metadata = metadata
database = database
tablename = "book"
__queryset_cls__ = MyQuerySetClass
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=32)
# when book not found, raise `404` in your view.
book = await Book.objects.first_or_404(name="123")
```

View File

@ -25,12 +25,12 @@ except ImportError: # pragma: no cover
from importlib_metadata import version # type: ignore from importlib_metadata import version # type: ignore
from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100 from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
from ormar.decorators import ( # noqa: I100 from ormar.decorators import ( # noqa: I100
post_bulk_update,
post_delete, post_delete,
post_relation_add, post_relation_add,
post_relation_remove, post_relation_remove,
post_save, post_save,
post_update, post_update,
post_bulk_update,
pre_delete, pre_delete,
pre_relation_add, pre_relation_add,
pre_relation_remove, pre_relation_remove,

View File

@ -1,4 +1,4 @@
from typing import Callable, List, Type, TYPE_CHECKING, Union from typing import Callable, List, TYPE_CHECKING, Type, Union
if TYPE_CHECKING: # pragma: no cover if TYPE_CHECKING: # pragma: no cover
from ormar import Model from ormar import Model

View File

@ -1,12 +1,7 @@
import base64 import base64
from typing import Any, TYPE_CHECKING, Type from typing import Any, TYPE_CHECKING, Type
from ormar.queryset.utils import to_str from ormar.fields.parsers import encode_json
try:
import orjson as json
except ImportError: # pragma: no cover
import json # type: ignore
if TYPE_CHECKING: # pragma: no cover if TYPE_CHECKING: # pragma: no cover
from ormar import Model from ormar import Model
@ -42,9 +37,7 @@ class JsonDescriptor:
return value return value
def __set__(self, instance: "Model", value: Any) -> None: def __set__(self, instance: "Model", value: Any) -> None:
if not isinstance(value, str): value = encode_json(value)
value = json.dumps(value)
value = to_str(value)
instance._internal_set(self.name, value) instance._internal_set(self.name, value)
instance.set_save_status(False) instance.set_save_status(False)

View File

@ -47,18 +47,18 @@ def populate_default_options_values( # noqa: CCR001
:param model_fields: dict of model fields :param model_fields: dict of model fields
:type model_fields: Union[Dict[str, type], Dict] :type model_fields: Union[Dict[str, type], Dict]
""" """
if not hasattr(new_model.Meta, "constraints"): defaults = {
new_model.Meta.constraints = [] "queryset_class": ormar.QuerySet,
if not hasattr(new_model.Meta, "model_fields"): "constraints": [],
new_model.Meta.model_fields = model_fields "model_fields": model_fields,
if not hasattr(new_model.Meta, "abstract"): "abstract": False,
new_model.Meta.abstract = False "extra": Extra.forbid,
if not hasattr(new_model.Meta, "extra"): "orders_by": [],
new_model.Meta.extra = Extra.forbid "exclude_parent_fields": [],
if not hasattr(new_model.Meta, "orders_by"): }
new_model.Meta.orders_by = [] for key, value in defaults.items():
if not hasattr(new_model.Meta, "exclude_parent_fields"): if not hasattr(new_model.Meta, key):
new_model.Meta.exclude_parent_fields = [] setattr(new_model.Meta, key, value)
if any( if any(
is_field_an_forward_ref(field) for field in new_model.Meta.model_fields.values() is_field_an_forward_ref(field) for field in new_model.Meta.model_fields.values()

View File

@ -85,6 +85,7 @@ class ModelMeta:
orders_by: List[str] orders_by: List[str]
exclude_parent_fields: List[str] exclude_parent_fields: List[str]
extra: Extra extra: Extra
queryset_class: Type[QuerySet]
def add_cached_properties(new_model: Type["Model"]) -> None: def add_cached_properties(new_model: Type["Model"]) -> None:
@ -614,8 +615,6 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
return new_model return new_model
__queryset_cls__ = QuerySet
@property @property
def objects(cls: Type["T"]) -> "QuerySet[T]": # type: ignore def objects(cls: Type["T"]) -> "QuerySet[T]": # type: ignore
if cls.Meta.requires_ref_update: if cls.Meta.requires_ref_update:
@ -624,7 +623,7 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
f"ForwardRefs. \nBefore using the model you " f"ForwardRefs. \nBefore using the model you "
f"need to call update_forward_refs()." f"need to call update_forward_refs()."
) )
return cls.__queryset_cls__(model_cls=cls) return cls.Meta.queryset_class(model_cls=cls)
def __getattr__(self, item: str) -> Any: def __getattr__(self, item: str) -> Any:
""" """

View File

@ -12,18 +12,13 @@ from typing import (
cast, cast,
) )
try:
import orjson as json
except ImportError: # pragma: no cover
import json # type: ignore
import pydantic import pydantic
import ormar # noqa: I100, I202 import ormar # noqa: I100, I202
from ormar.exceptions import ModelPersistenceError from ormar.exceptions import ModelPersistenceError
from ormar.fields.parsers import encode_json
from ormar.models.mixins import AliasMixin from ormar.models.mixins import AliasMixin
from ormar.models.mixins.relation_mixin import RelationMixin from ormar.models.mixins.relation_mixin import RelationMixin
from ormar.queryset.utils import to_str
if TYPE_CHECKING: # pragma: no cover if TYPE_CHECKING: # pragma: no cover
from ormar import ForeignKeyField, Model from ormar import ForeignKeyField, Model
@ -208,8 +203,8 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
:rtype: Dict :rtype: Dict
""" """
for key, value in model_dict.items(): for key, value in model_dict.items():
if key in cls._json_fields and not isinstance(value, str): if key in cls._json_fields:
model_dict[key] = to_str(json.dumps(value)) model_dict[key] = encode_json(value)
return model_dict return model_dict
@classmethod @classmethod

View File

@ -30,9 +30,9 @@ except ImportError: # pragma: no cover
import ormar # noqa I100 import ormar # noqa I100
from ormar import MultipleMatches, NoMatch from ormar import MultipleMatches, NoMatch
from ormar.exceptions import ( from ormar.exceptions import (
ModelListEmptyError,
ModelPersistenceError, ModelPersistenceError,
QueryDefinitionError, QueryDefinitionError,
ModelListEmptyError,
) )
from ormar.queryset import FieldAccessor, FilterQuery, SelectAction from ormar.queryset import FieldAccessor, FilterQuery, SelectAction
from ormar.queryset.actions.order_action import OrderAction from ormar.queryset.actions.order_action import OrderAction
@ -1065,8 +1065,8 @@ class QuerySet(Generic[T]):
:type objects: List[Model] :type objects: List[Model]
""" """
ready_objects = [obj.prepare_model_to_save(obj.dict()) for obj in objects] ready_objects = [obj.prepare_model_to_save(obj.dict()) for obj in objects]
# don't use execute_many, as in databases it's executed in a loop # don't use execute_many, as in databases it's executed in a loop
# instead of using execute_many from drivers # instead of using execute_many from drivers
expr = self.table.insert().values(ready_objects) expr = self.table.insert().values(ready_objects)
await self.database.execute(expr) await self.database.execute(expr)

View File

@ -17,15 +17,6 @@ if TYPE_CHECKING: # pragma no cover
from ormar import Model, BaseField from ormar import Model, BaseField
def to_str(val: Union[bytes, str, int]) -> str:
""" convert bytes to str simply """
if isinstance(val, bytes):
return val.decode("utf-8")
elif isinstance(val, str):
return val
return str(val)
def check_node_not_dict_or_not_last_node( def check_node_not_dict_or_not_last_node(
part: str, is_last: bool, current_level: Any part: str, is_last: bool, current_level: Any
) -> bool: ) -> bool:

View File

@ -1,6 +1,6 @@
import asyncio import asyncio
import inspect import inspect
from typing import Any, Callable, Dict, Tuple, Type, TYPE_CHECKING, Union from typing import Any, Callable, Dict, TYPE_CHECKING, Tuple, Type, Union
from ormar.exceptions import SignalDefinitionError from ormar.exceptions import SignalDefinitionError

View File

@ -78,22 +78,21 @@ class ItemConfig(ormar.Model):
pairs: pydantic.Json = ormar.JSON(default=["2", "3"]) pairs: pydantic.Json = ormar.JSON(default=["2", "3"])
class QuerySetCls(QuerySet):
async def first_or_404(self, *args, **kwargs):
entity = await self.get_or_none(*args, **kwargs)
if not entity:
# maybe HTTPException in fastapi
raise ValueError("customer not found")
return entity
class Customer(ormar.Model): class Customer(ormar.Model):
class Meta: class Meta:
metadata = metadata metadata = metadata
database = database database = database
tablename = "customer" tablename = "customer"
queryset_class = QuerySetCls
class QuerySetCls(QuerySet):
async def first_or_404(self, *args, **kwargs):
entity = await self.get_or_none(*args, **kwargs)
if not entity:
# maybe HTTPException in fastapi
raise ValueError("customer not found")
return entity
__queryset_cls__ = QuerySetCls
id: Optional[int] = ormar.Integer(primary_key=True) id: Optional[int] = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=32) name: str = ormar.String(max_length=32)

View File

@ -1,19 +0,0 @@
import orjson
import json
from ormar.queryset.utils import to_str
def test_to_str():
expected_str = "[]"
val = orjson.dumps([])
assert expected_str == to_str(val)
val = json.dumps([])
assert expected_str == to_str(val)
expected_bytes = expected_str.encode()
assert isinstance(expected_bytes, bytes)
assert isinstance(to_str(expected_bytes), str)
assert "1" == to_str(1)