Merge pull request #96 from collerek/m2m_fields

M2m fields
This commit is contained in:
collerek
2021-02-11 17:32:55 +07:00
committed by GitHub
21 changed files with 725 additions and 331 deletions

View File

@ -1,3 +1,13 @@
# 0.9.3
## Fixes
* Fix `JSON` field being double escaped when setting value after initialization
* Fix `JSON` field not respecting `nullable` field setting due to `pydantic` internals
* Fix `choices` verification for `JSON` field
* Fix `choices` not being verified when setting the attribute after initialization
* Fix `choices` not being verified during `update` call from `QuerySet`
# 0.9.2 # 0.9.2
## Other ## Other

View File

@ -43,6 +43,8 @@ nav:
- api/models/helpers/pydantic.md - api/models/helpers/pydantic.md
- api/models/helpers/relations.md - api/models/helpers/relations.md
- api/models/helpers/sqlalchemy.md - api/models/helpers/sqlalchemy.md
- api/models/helpers/validation.md
- api/models/helpers/related-names-validation.md
- Mixins: - Mixins:
- Alias Mixin: api/models/mixins/alias-mixin.md - Alias Mixin: api/models/mixins/alias-mixin.md
- Excludable Mixin: api/models/mixins/excludable-mixin.md - Excludable Mixin: api/models/mixins/excludable-mixin.md

View File

@ -19,7 +19,8 @@ snakes, and ormar(e) in italian which means cabinet.
And what's a better name for python ORM than snakes cabinet :) And what's a better name for python ORM than snakes cabinet :)
""" """
from ormar.decorators import ( from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
from ormar.decorators import ( # noqa: I100
post_delete, post_delete,
post_save, post_save,
post_update, post_update,
@ -28,13 +29,13 @@ from ormar.decorators import (
pre_update, pre_update,
property_field, property_field,
) )
from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
from ormar.exceptions import ( # noqa: I100 from ormar.exceptions import ( # noqa: I100
ModelDefinitionError, ModelDefinitionError,
MultipleMatches, MultipleMatches,
NoMatch, NoMatch,
) )
from ormar.fields import ( # noqa: I100 from ormar.fields import (
BaseField,
BigInteger, BigInteger,
Boolean, Boolean,
Date, Date,
@ -42,15 +43,17 @@ from ormar.fields import ( # noqa: I100
Decimal, Decimal,
Float, Float,
ForeignKey, ForeignKey,
ForeignKeyField,
Integer, Integer,
JSON, JSON,
ManyToMany, ManyToMany,
ManyToManyField,
String, String,
Text, Text,
Time, Time,
UUID, UUID,
UniqueColumns, UniqueColumns,
) ) # noqa: I100
from ormar.models import Model from ormar.models import Model
from ormar.models.metaclass import ModelMeta from ormar.models.metaclass import ModelMeta
from ormar.queryset import QuerySet from ormar.queryset import QuerySet
@ -65,7 +68,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType() Undefined = UndefinedType()
__version__ = "0.9.2" __version__ = "0.9.3"
__all__ = [ __all__ = [
"Integer", "Integer",
"BigInteger", "BigInteger",
@ -100,4 +103,7 @@ __all__ = [
"pre_save", "pre_save",
"pre_update", "pre_update",
"Signal", "Signal",
"BaseField",
"ManyToManyField",
"ForeignKeyField",
] ]

View File

@ -5,7 +5,7 @@ as well as relation Fields (ForeignKey, ManyToMany).
Also a definition for custom CHAR based sqlalchemy UUID field Also a definition for custom CHAR based sqlalchemy UUID field
""" """
from ormar.fields.base import BaseField from ormar.fields.base import BaseField
from ormar.fields.foreign_key import ForeignKey, UniqueColumns from ormar.fields.foreign_key import ForeignKey, ForeignKeyField, UniqueColumns
from ormar.fields.many_to_many import ManyToMany, ManyToManyField from ormar.fields.many_to_many import ManyToMany, ManyToManyField
from ormar.fields.model_fields import ( from ormar.fields.model_fields import (
BigInteger, BigInteger,
@ -40,4 +40,5 @@ __all__ = [
"ManyToManyField", "ManyToManyField",
"BaseField", "BaseField",
"UniqueColumns", "UniqueColumns",
"ForeignKeyField",
] ]

View File

@ -1,12 +1,10 @@
from typing import Any, List, Optional, TYPE_CHECKING, Type, Union from typing import Any, List, Optional, TYPE_CHECKING, Type, Union
import pydantic
import sqlalchemy import sqlalchemy
from pydantic import Field, typing from pydantic import Field, Json, typing
from pydantic.fields import FieldInfo from pydantic.fields import FieldInfo, Required, Undefined
import ormar # noqa I101 import ormar # noqa I101
from ormar import ModelDefinitionError # noqa I101
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
from ormar.models import Model from ormar.models import Model
@ -96,6 +94,30 @@ class BaseField(FieldInfo):
and hasattr(cls, field_name) and hasattr(cls, field_name)
) )
@classmethod
def get_base_pydantic_field_info(cls, allow_null: bool) -> FieldInfo:
"""
Generates base pydantic.FieldInfo with only default and optionally
required to fix pydantic Json field being set to required=False.
Used in an ormar Model Metaclass.
:param allow_null: flag if the default value can be None
or if it should be populated by pydantic Undefined
:type allow_null: bool
:return: instance of base pydantic.FieldInfo
:rtype: pydantic.FieldInfo
"""
base = cls.default_value()
if base is None:
base = (
FieldInfo(default=None)
if (cls.nullable or allow_null)
else FieldInfo(default=Undefined)
)
if cls.__type__ == Json and base.default is Undefined:
base.default = Required
return base
@classmethod @classmethod
def convert_to_pydantic_field_info(cls, allow_null: bool = False) -> FieldInfo: def convert_to_pydantic_field_info(cls, allow_null: bool = False) -> FieldInfo:
""" """
@ -109,13 +131,7 @@ class BaseField(FieldInfo):
:return: actual instance of pydantic.FieldInfo with all needed fields populated :return: actual instance of pydantic.FieldInfo with all needed fields populated
:rtype: pydantic.FieldInfo :rtype: pydantic.FieldInfo
""" """
base = cls.default_value() base = cls.get_base_pydantic_field_info(allow_null=allow_null)
if base is None:
base = (
FieldInfo(default=None)
if (cls.nullable or allow_null)
else FieldInfo(default=pydantic.fields.Undefined)
)
for attr_name in FieldInfo.__dict__.keys(): for attr_name in FieldInfo.__dict__.keys():
if cls.is_valid_field_info_field(attr_name): if cls.is_valid_field_info_field(attr_name):
setattr(base, attr_name, cls.__dict__.get(attr_name)) setattr(base, attr_name, cls.__dict__.get(attr_name))

View File

@ -222,7 +222,7 @@ class ForeignKeyField(BaseField):
to: Type["Model"] to: Type["Model"]
name: str name: str
related_name: str related_name: str # type: ignore
virtual: bool virtual: bool
ondelete: str ondelete: str
onupdate: str onupdate: str

View File

@ -1,5 +1,7 @@
from ormar.models.helpers.models import ( from ormar.models.helpers.models import (
check_required_meta_parameters,
extract_annotations_and_default_vals, extract_annotations_and_default_vals,
meta_field_not_set,
populate_default_options_values, populate_default_options_values,
) )
from ormar.models.helpers.pydantic import ( from ormar.models.helpers.pydantic import (
@ -15,7 +17,9 @@ from ormar.models.helpers.relations import expand_reverse_relationships
from ormar.models.helpers.sqlalchemy import ( from ormar.models.helpers.sqlalchemy import (
populate_meta_sqlalchemy_table_if_required, populate_meta_sqlalchemy_table_if_required,
populate_meta_tablename_columns_and_pk, populate_meta_tablename_columns_and_pk,
sqlalchemy_columns_from_model_fields,
) )
from ormar.models.helpers.validation import populate_choices_validators
__all__ = [ __all__ = [
"expand_reverse_relationships", "expand_reverse_relationships",
@ -28,4 +32,8 @@ __all__ = [
"get_pydantic_field", "get_pydantic_field",
"get_potential_fields", "get_potential_fields",
"get_pydantic_base_orm_config", "get_pydantic_base_orm_config",
"check_required_meta_parameters",
"sqlalchemy_columns_from_model_fields",
"populate_choices_validators",
"meta_field_not_set",
] ]

View File

@ -1,10 +1,9 @@
import itertools import itertools
import sqlite3 import sqlite3
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type
from pydantic.typing import ForwardRef from pydantic.typing import ForwardRef
import ormar # noqa: I100 import ormar # noqa: I100
from ormar.fields.foreign_key import ForeignKeyField
from ormar.models.helpers.pydantic import populate_pydantic_default_values from ormar.models.helpers.pydantic import populate_pydantic_default_values
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
@ -22,7 +21,7 @@ def is_field_an_forward_ref(field: Type["BaseField"]) -> bool:
:return: result of the check :return: result of the check
:rtype: bool :rtype: bool
""" """
return issubclass(field, ForeignKeyField) and ( return issubclass(field, ormar.ForeignKeyField) and (
field.to.__class__ == ForwardRef or field.through.__class__ == ForwardRef field.to.__class__ == ForwardRef or field.through.__class__ == ForwardRef
) )
@ -124,43 +123,6 @@ def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]:
return attrs, model_fields return attrs, model_fields
# cannot be in relations helpers due to cyclical import
def validate_related_names_in_relations( # noqa CCR001
model_fields: Dict, new_model: Type["Model"]
) -> None:
"""
Performs a validation of relation_names in relation fields.
If multiple fields are leading to the same related model
only one can have empty related_name param
(populated by default as model.name.lower()+'s').
Also related_names have to be unique for given related model.
:raises ModelDefinitionError: if validation of related_names fail
:param model_fields: dictionary of declared ormar model fields
:type model_fields: Dict[str, ormar.Field]
:param new_model:
:type new_model: Model class
"""
already_registered: Dict[str, List[Optional[str]]] = dict()
for field in model_fields.values():
if issubclass(field, ForeignKeyField):
to_name = (
field.to.get_name()
if not field.to.__class__ == ForwardRef
else str(field.to)
)
previous_related_names = already_registered.setdefault(to_name, [])
if field.related_name in previous_related_names:
raise ormar.ModelDefinitionError(
f"Multiple fields declared on {new_model.get_name(lower=False)} "
f"model leading to {field.to.get_name(lower=False)} model without "
f"related_name property set. \nThere can be only one relation with "
f"default/empty name: '{new_model.get_name() + 's'}'"
f"\nTip: provide different related_name for FK and/or M2M fields"
)
previous_related_names.append(field.related_name)
def group_related_list(list_: List) -> Dict: def group_related_list(list_: List) -> Dict:
""" """
Translates the list of related strings into a dictionary. Translates the list of related strings into a dictionary.
@ -191,3 +153,19 @@ def group_related_list(list_: List) -> Dict:
else: else:
result_dict.setdefault(key, []).extend(new) result_dict.setdefault(key, []).extend(new)
return {k: v for k, v in sorted(result_dict.items(), key=lambda item: len(item[1]))} return {k: v for k, v in sorted(result_dict.items(), key=lambda item: len(item[1]))}
def meta_field_not_set(model: Type["Model"], field_name: str) -> bool:
"""
Checks if field with given name is already present in model.Meta.
Then check if it's set to something truthful
(in practice meaning not None, as it's non or ormar Field only).
:param model: newly constructed model
:type model: Model class
:param field_name: name of the ormar field
:type field_name: str
:return: result of the check
:rtype: bool
"""
return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name)

View File

@ -1,7 +1,7 @@
import warnings import warnings
from typing import Dict, Optional, TYPE_CHECKING, Tuple, Type from typing import Dict, Optional, TYPE_CHECKING, Tuple, Type
from pydantic import BaseConfig import pydantic
from pydantic.fields import ModelField from pydantic.fields import ModelField
from pydantic.utils import lenient_issubclass from pydantic.utils import lenient_issubclass
@ -132,7 +132,7 @@ def populate_pydantic_default_values(attrs: Dict) -> Tuple[Dict, Dict]:
return attrs, model_fields return attrs, model_fields
def get_pydantic_base_orm_config() -> Type[BaseConfig]: def get_pydantic_base_orm_config() -> Type[pydantic.BaseConfig]:
""" """
Returns empty pydantic Config with orm_mode set to True. Returns empty pydantic Config with orm_mode set to True.
@ -140,7 +140,7 @@ def get_pydantic_base_orm_config() -> Type[BaseConfig]:
:rtype: pydantic Config :rtype: pydantic Config
""" """
class Config(BaseConfig): class Config(pydantic.BaseConfig):
orm_mode = True orm_mode = True
return Config return Config

View File

@ -0,0 +1,43 @@
from typing import Dict, List, Optional, TYPE_CHECKING, Type
from pydantic.typing import ForwardRef
import ormar # noqa: I100
if TYPE_CHECKING: # pragma no cover
from ormar import Model
def validate_related_names_in_relations( # noqa CCR001
model_fields: Dict, new_model: Type["Model"]
) -> None:
"""
Performs a validation of relation_names in relation fields.
If multiple fields are leading to the same related model
only one can have empty related_name param
(populated by default as model.name.lower()+'s').
Also related_names have to be unique for given related model.
:raises ModelDefinitionError: if validation of related_names fail
:param model_fields: dictionary of declared ormar model fields
:type model_fields: Dict[str, ormar.Field]
:param new_model:
:type new_model: Model class
"""
already_registered: Dict[str, List[Optional[str]]] = dict()
for field in model_fields.values():
if issubclass(field, ormar.ForeignKeyField):
to_name = (
field.to.get_name()
if not field.to.__class__ == ForwardRef
else str(field.to)
)
previous_related_names = already_registered.setdefault(to_name, [])
if field.related_name in previous_related_names:
raise ormar.ModelDefinitionError(
f"Multiple fields declared on {new_model.get_name(lower=False)} "
f"model leading to {field.to.get_name(lower=False)} model without "
f"related_name property set. \nThere can be only one relation with "
f"default/empty name: '{new_model.get_name() + 's'}'"
f"\nTip: provide different related_name for FK and/or M2M fields"
)
previous_related_names.append(field.related_name)

View File

@ -3,18 +3,18 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
import sqlalchemy import sqlalchemy
from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202 import ormar # noqa: I100, I202
from ormar.fields import BaseField, ManyToManyField
from ormar.fields.foreign_key import ForeignKeyField
from ormar.models.helpers.models import validate_related_names_in_relations
from ormar.models.helpers.pydantic import create_pydantic_field from ormar.models.helpers.pydantic import create_pydantic_field
from ormar.models.helpers.related_names_validation import (
validate_related_names_in_relations,
)
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
from ormar import Model, ModelMeta from ormar import Model, ModelMeta, ManyToManyField, BaseField, ForeignKeyField
from ormar.models import NewBaseModel from ormar.models import NewBaseModel
def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> None: def adjust_through_many_to_many_model(model_field: Type["ManyToManyField"]) -> None:
""" """
Registers m2m relation on through model. Registers m2m relation on through model.
Sets ormar.ForeignKey from through model to both child and parent models. Sets ormar.ForeignKey from through model to both child and parent models.
@ -27,13 +27,13 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non
parent_name = model_field.default_target_field_name() parent_name = model_field.default_target_field_name()
child_name = model_field.default_source_field_name() child_name = model_field.default_source_field_name()
model_field.through.Meta.model_fields[parent_name] = ForeignKey( model_field.through.Meta.model_fields[parent_name] = ormar.ForeignKey(
model_field.to, model_field.to,
real_name=parent_name, real_name=parent_name,
ondelete="CASCADE", ondelete="CASCADE",
owner=model_field.through, owner=model_field.through,
) )
model_field.through.Meta.model_fields[child_name] = ForeignKey( model_field.through.Meta.model_fields[child_name] = ormar.ForeignKey(
model_field.owner, model_field.owner,
real_name=child_name, real_name=child_name,
ondelete="CASCADE", ondelete="CASCADE",
@ -52,7 +52,7 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non
def create_and_append_m2m_fk( def create_and_append_m2m_fk(
model: Type["Model"], model_field: Type[ManyToManyField], field_name: str model: Type["Model"], model_field: Type["ManyToManyField"], field_name: str
) -> None: ) -> None:
""" """
Registers sqlalchemy Column with sqlalchemy.ForeignKey leading to the model. Registers sqlalchemy Column with sqlalchemy.ForeignKey leading to the model.
@ -69,7 +69,7 @@ def create_and_append_m2m_fk(
pk_alias = model.get_column_alias(model.Meta.pkname) pk_alias = model.get_column_alias(model.Meta.pkname)
pk_column = next((col for col in model.Meta.columns if col.name == pk_alias), None) pk_column = next((col for col in model.Meta.columns if col.name == pk_alias), None)
if pk_column is None: # pragma: no cover if pk_column is None: # pragma: no cover
raise ModelDefinitionError( raise ormar.ModelDefinitionError(
"ManyToMany relation cannot lead to field without pk" "ManyToMany relation cannot lead to field without pk"
) )
column = sqlalchemy.Column( column = sqlalchemy.Column(
@ -88,7 +88,7 @@ def create_and_append_m2m_fk(
def check_pk_column_validity( def check_pk_column_validity(
field_name: str, field: BaseField, pkname: Optional[str] field_name: str, field: "BaseField", pkname: Optional[str]
) -> Optional[str]: ) -> Optional[str]:
""" """
Receives the field marked as primary key and verifies if the pkname Receives the field marked as primary key and verifies if the pkname
@ -106,9 +106,9 @@ def check_pk_column_validity(
:rtype: str :rtype: str
""" """
if pkname is not None: if pkname is not None:
raise ModelDefinitionError("Only one primary key column is allowed.") raise ormar.ModelDefinitionError("Only one primary key column is allowed.")
if field.pydantic_only: if field.pydantic_only:
raise ModelDefinitionError("Primary key column cannot be pydantic only") raise ormar.ModelDefinitionError("Primary key column cannot be pydantic only")
return field_name return field_name
@ -144,7 +144,7 @@ def sqlalchemy_columns_from_model_fields(
:rtype: Tuple[Optional[str], List[sqlalchemy.Column]] :rtype: Tuple[Optional[str], List[sqlalchemy.Column]]
""" """
if len(model_fields.keys()) == 0: if len(model_fields.keys()) == 0:
model_fields["id"] = Integer(name="id", primary_key=True) model_fields["id"] = ormar.Integer(name="id", primary_key=True)
logging.warning( logging.warning(
"Table {table_name} had no fields so auto " "Table {table_name} had no fields so auto "
"Integer primary key named `id` created." "Integer primary key named `id` created."
@ -159,7 +159,7 @@ def sqlalchemy_columns_from_model_fields(
if ( if (
not field.pydantic_only not field.pydantic_only
and not field.virtual and not field.virtual
and not issubclass(field, ManyToManyField) and not issubclass(field, ormar.ManyToManyField)
): ):
columns.append(field.get_column(field.get_alias())) columns.append(field.get_column(field.get_alias()))
return pkname, columns return pkname, columns
@ -201,7 +201,7 @@ def populate_meta_tablename_columns_and_pk(
) )
if pkname is None: if pkname is None:
raise ModelDefinitionError("Table has to have a primary key.") raise ormar.ModelDefinitionError("Table has to have a primary key.")
new_model.Meta.columns = columns new_model.Meta.columns = columns
new_model.Meta.pkname = pkname new_model.Meta.pkname = pkname
@ -248,7 +248,7 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
def update_column_definition( def update_column_definition(
model: Union[Type["Model"], Type["NewBaseModel"]], field: Type[ForeignKeyField] model: Union[Type["Model"], Type["NewBaseModel"]], field: Type["ForeignKeyField"]
) -> None: ) -> None:
""" """
Updates a column with a new type column based on updated parameters in FK fields. Updates a column with a new type column based on updated parameters in FK fields.

View File

@ -0,0 +1,161 @@
import datetime
import decimal
import uuid
from enum import Enum
from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type
try:
import orjson as json
except ImportError: # pragma: no cover
import json # type: ignore
import pydantic
from pydantic.main import SchemaExtraCallable
import ormar # noqa: I100, I202
from ormar.fields import BaseField
from ormar.models.helpers.models import meta_field_not_set
if TYPE_CHECKING: # pragma no cover
from ormar import Model
def check_if_field_has_choices(field: Type[BaseField]) -> bool:
"""
Checks if given field has choices populated.
A if it has one, a validator for this field needs to be attached.
:param field: ormar field to check
:type field: BaseField
:return: result of the check
:rtype: bool
"""
return hasattr(field, "choices") and bool(field.choices)
def convert_choices_if_needed( # noqa: CCR001
field: Type["BaseField"], value: Any
) -> Tuple[Any, List]:
"""
Converts dates to isoformat as fastapi can check this condition in routes
and the fields are not yet parsed.
Converts enums to list of it's values.
Converts uuids to strings.
Converts decimal to float with given scale.
:param field: ormar field to check with choices
:type field: Type[BaseField]
:param values: current values of the model to verify
:type values: Dict
:return: value, choices list
:rtype: Tuple[Any, List]
"""
choices = [o.value if isinstance(o, Enum) else o for o in field.choices]
if field.__type__ in [datetime.datetime, datetime.date, datetime.time]:
value = value.isoformat() if not isinstance(value, str) else value
choices = [o.isoformat() for o in field.choices]
elif field.__type__ == pydantic.Json:
value = json.dumps(value) if not isinstance(value, str) else value
value = value.decode("utf-8") if isinstance(value, bytes) else value
elif field.__type__ == uuid.UUID:
value = str(value) if not isinstance(value, str) else value
choices = [str(o) for o in field.choices]
elif field.__type__ == decimal.Decimal:
precision = field.scale # type: ignore
value = (
round(float(value), precision)
if isinstance(value, decimal.Decimal)
else value
)
choices = [round(float(o), precision) for o in choices]
return value, choices
def validate_choices(field: Type["BaseField"], value: Any) -> None:
"""
Validates if given value is in provided choices.
:raises ValueError: If value is not in choices.
:param field:field to validate
:type field: Type[BaseField]
:param value: value of the field
:type value: Any
"""
value, choices = convert_choices_if_needed(field=field, value=value)
if value is not ormar.Undefined and value not in choices:
raise ValueError(
f"{field.name}: '{value}' " f"not in allowed choices set:" f" {choices}"
)
def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]:
"""
Validator that is attached to pydantic model pre root validators.
Validator checks if field value is in field.choices list.
:raises ValueError: if field value is outside of allowed choices.
:param cls: constructed class
:type cls: Model class
:param values: dictionary of field values (pydantic side)
:type values: Dict[str, Any]
:return: values if pass validation, otherwise exception is raised
:rtype: Dict[str, Any]
"""
for field_name, field in cls.Meta.model_fields.items():
if check_if_field_has_choices(field):
value = values.get(field_name, ormar.Undefined)
validate_choices(field=field, value=value)
return values
def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable:
"""
Modifies the schema to include fields with choices validator.
Those fields will be displayed in schema as Enum types with available choices
values listed next to them.
:param fields_with_choices: list of fields with choices validation
:type fields_with_choices: List
:return: callable that will be run by pydantic to modify the schema
:rtype: Callable
"""
def schema_extra(schema: Dict[str, Any], model: Type["Model"]) -> None:
for field_id, prop in schema.get("properties", {}).items():
if field_id in fields_with_choices:
prop["enum"] = list(model.Meta.model_fields[field_id].choices)
prop["description"] = prop.get("description", "") + "An enumeration."
return staticmethod(schema_extra) # type: ignore
def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
"""
Checks if Model has any fields with choices set.
If yes it adds choices validation into pre root validators.
:param model: newly constructed Model
:type model: Model class
"""
fields_with_choices = []
if not meta_field_not_set(model=model, field_name="model_fields"):
for name, field in model.Meta.model_fields.items():
if check_if_field_has_choices(field):
fields_with_choices.append(name)
validators = getattr(model, "__pre_root_validators__", [])
if choices_validator not in validators:
validators.append(choices_validator)
model.__pre_root_validators__ = validators
if not model._choices_fields:
model._choices_fields = set()
model._choices_fields.add(name)
if fields_with_choices:
model.Config.schema_extra = construct_modify_schema_function(
fields_with_choices=fields_with_choices
)

View File

@ -1,7 +1,3 @@
import datetime
import decimal
import uuid
from enum import Enum
from typing import ( from typing import (
Any, Any,
Dict, Dict,
@ -18,28 +14,29 @@ from typing import (
import databases import databases
import pydantic import pydantic
import sqlalchemy import sqlalchemy
from pydantic.main import SchemaExtraCallable
from sqlalchemy.sql.schema import ColumnCollectionConstraint from sqlalchemy.sql.schema import ColumnCollectionConstraint
import ormar # noqa I100 import ormar # noqa I100
from ormar import ForeignKey, Integer, ModelDefinitionError # noqa I100 from ormar import ModelDefinitionError # noqa I100
from ormar.fields import BaseField from ormar.fields import BaseField
from ormar.fields.foreign_key import ForeignKeyField from ormar.fields.foreign_key import ForeignKeyField
from ormar.fields.many_to_many import ManyToManyField from ormar.fields.many_to_many import ManyToManyField
from ormar.models.helpers import ( from ormar.models.helpers import (
alias_manager, alias_manager,
check_required_meta_parameters,
expand_reverse_relationships, expand_reverse_relationships,
extract_annotations_and_default_vals, extract_annotations_and_default_vals,
get_potential_fields, get_potential_fields,
get_pydantic_base_orm_config, get_pydantic_base_orm_config,
get_pydantic_field, get_pydantic_field,
meta_field_not_set,
populate_choices_validators,
populate_default_options_values, populate_default_options_values,
populate_meta_sqlalchemy_table_if_required, populate_meta_sqlalchemy_table_if_required,
populate_meta_tablename_columns_and_pk, populate_meta_tablename_columns_and_pk,
register_relation_in_alias_manager, register_relation_in_alias_manager,
sqlalchemy_columns_from_model_fields,
) )
from ormar.models.helpers.models import check_required_meta_parameters
from ormar.models.helpers.sqlalchemy import sqlalchemy_columns_from_model_fields
from ormar.models.quick_access_views import quick_access_set from ormar.models.quick_access_views import quick_access_set
from ormar.queryset import QuerySet from ormar.queryset import QuerySet
from ormar.relations.alias_manager import AliasManager from ormar.relations.alias_manager import AliasManager
@ -48,7 +45,6 @@ from ormar.signals import Signal, SignalEmitter
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
from ormar import Model from ormar import Model
PARSED_FIELDS_KEY = "__parsed_fields__"
CONFIG_KEY = "Config" CONFIG_KEY = "Config"
@ -76,119 +72,6 @@ class ModelMeta:
requires_ref_update: bool requires_ref_update: bool
def check_if_field_has_choices(field: Type[BaseField]) -> bool:
"""
Checks if given field has choices populated.
A if it has one, a validator for this field needs to be attached.
:param field: ormar field to check
:type field: BaseField
:return: result of the check
:rtype: bool
"""
return hasattr(field, "choices") and bool(field.choices)
def convert_choices_if_needed( # noqa: CCR001
field: Type["BaseField"], values: Dict
) -> Tuple[Any, List]:
"""
Converts dates to isoformat as fastapi can check this condition in routes
and the fields are not yet parsed.
Converts enums to list of it's values.
Converts uuids to strings.
Converts decimal to float with given scale.
:param field: ormar field to check with choices
:type field: Type[BaseField]
:param values: current values of the model to verify
:type values: Dict
:return: value, choices list
:rtype: Tuple[Any, List]
"""
value = values.get(field.name, ormar.Undefined)
choices = [o.value if isinstance(o, Enum) else o for o in field.choices]
if field.__type__ in [datetime.datetime, datetime.date, datetime.time]:
value = value.isoformat() if not isinstance(value, str) else value
choices = [o.isoformat() for o in field.choices]
elif field.__type__ == uuid.UUID:
value = str(value) if not isinstance(value, str) else value
choices = [str(o) for o in field.choices]
elif field.__type__ == decimal.Decimal:
precision = field.scale # type: ignore
value = (
round(float(value), precision)
if isinstance(value, decimal.Decimal)
else value
)
choices = [round(float(o), precision) for o in choices]
return value, choices
def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]:
"""
Validator that is attached to pydantic model pre root validators.
Validator checks if field value is in field.choices list.
:raises ValueError: if field value is outside of allowed choices.
:param cls: constructed class
:type cls: Model class
:param values: dictionary of field values (pydantic side)
:type values: Dict[str, Any]
:return: values if pass validation, otherwise exception is raised
:rtype: Dict[str, Any]
"""
for field_name, field in cls.Meta.model_fields.items():
if check_if_field_has_choices(field):
value, choices = convert_choices_if_needed(field=field, values=values)
if value is not ormar.Undefined and value not in choices:
raise ValueError(
f"{field_name}: '{values.get(field_name)}' "
f"not in allowed choices set:"
f" {choices}"
)
return values
def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable:
def schema_extra(schema: Dict[str, Any], model: Type["Model"]) -> None:
for field_id, prop in schema.get("properties", {}).items():
if field_id in fields_with_choices:
prop["enum"] = list(model.Meta.model_fields[field_id].choices)
prop["description"] = prop.get("description", "") + "An enumeration."
return staticmethod(schema_extra) # type: ignore
def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
"""
Checks if Model has any fields with choices set.
If yes it adds choices validation into pre root validators.
:param model: newly constructed Model
:type model: Model class
"""
fields_with_choices = []
if not meta_field_not_set(model=model, field_name="model_fields"):
for name, field in model.Meta.model_fields.items():
if check_if_field_has_choices(field):
fields_with_choices.append(name)
validators = getattr(model, "__pre_root_validators__", [])
if choices_validator not in validators:
validators.append(choices_validator)
model.__pre_root_validators__ = validators
if fields_with_choices:
model.Config.schema_extra = construct_modify_schema_function(
fields_with_choices=fields_with_choices
)
def add_cached_properties(new_model: Type["Model"]) -> None: def add_cached_properties(new_model: Type["Model"]) -> None:
""" """
Sets cached properties for both pydantic and ormar models. Sets cached properties for both pydantic and ormar models.
@ -207,22 +90,7 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
new_model._related_names = None new_model._related_names = None
new_model._related_fields = None new_model._related_fields = None
new_model._pydantic_fields = {name for name in new_model.__fields__} new_model._pydantic_fields = {name for name in new_model.__fields__}
new_model._choices_fields = set()
def meta_field_not_set(model: Type["Model"], field_name: str) -> bool:
"""
Checks if field with given name is already present in model.Meta.
Then check if it's set to something truthful
(in practice meaning not None, as it's non or ormar Field only).
:param model: newly constructed model
:type model: Model class
:param field_name: name of the ormar field
:type field_name: str
:return: result of the check
:rtype: bool
"""
return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name)
def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001 def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001
@ -273,34 +141,81 @@ def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001
new_model.Meta.signals = signals new_model.Meta.signals = signals
def update_attrs_and_fields( class ModelMetaclass(pydantic.main.ModelMetaclass):
attrs: Dict, def __new__( # type: ignore # noqa: CCR001
new_attrs: Dict, mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict
model_fields: Dict, ) -> "ModelMetaclass":
new_model_fields: Dict,
new_fields: Set,
) -> Dict:
""" """
Updates __annotations__, values of model fields (so pydantic FieldInfos) Metaclass used by ormar Models that performs configuration
as well as model.Meta.model_fields definitions from parents. and build of ormar Models.
:param attrs: new namespace for class being constructed
Sets pydantic configuration.
Extract model_fields and convert them to pydantic FieldInfo,
updates class namespace.
Extracts settings and fields from parent classes.
Fetches methods decorated with @property_field decorator
to expose them later in dict().
Construct parent pydantic Metaclass/ Model.
If class has Meta class declared (so actual ormar Models) it also:
* populate sqlalchemy columns, pkname and tables from model_fields
* register reverse relationships on related models
* registers all relations in alias manager that populates table_prefixes
* exposes alias manager on each Model
* creates QuerySet for each model and exposes it on a class
:param name: name of current class
:type name: str
:param bases: base classes
:type bases: Tuple
:param attrs: class namespace
:type attrs: Dict :type attrs: Dict
:param new_attrs: related of the namespace extracted from parent class
:type new_attrs: Dict
:param model_fields: ormar fields in defined in current class
:type model_fields: Dict[str, BaseField]
:param new_model_fields: ormar fields defined in parent classes
:type new_model_fields: Dict[str, BaseField]
:param new_fields: set of new fields names
:type new_fields: Set[str]
""" """
key = "__annotations__" attrs["Config"] = get_pydantic_base_orm_config()
attrs[key].update(new_attrs[key]) attrs["__name__"] = name
attrs.update({name: new_attrs[name] for name in new_fields}) attrs, model_fields = extract_annotations_and_default_vals(attrs)
updated_model_fields = {k: v for k, v in new_model_fields.items()} for base in reversed(bases):
updated_model_fields.update(model_fields) mod = base.__module__
return updated_model_fields if mod.startswith("ormar.models.") or mod.startswith("pydantic."):
continue
attrs, model_fields = extract_from_parents_definition(
base_class=base, curr_class=mcs, attrs=attrs, model_fields=model_fields
)
new_model = super().__new__( # type: ignore
mcs, name, bases, attrs
)
add_cached_properties(new_model)
if hasattr(new_model, "Meta"):
populate_default_options_values(new_model, model_fields)
check_required_meta_parameters(new_model)
add_property_fields(new_model, attrs)
register_signals(new_model=new_model)
populate_choices_validators(new_model)
if not new_model.Meta.abstract:
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
populate_meta_sqlalchemy_table_if_required(new_model.Meta)
expand_reverse_relationships(new_model)
for field in new_model.Meta.model_fields.values():
register_relation_in_alias_manager(field=field)
if new_model.Meta.pkname not in attrs["__annotations__"]:
field_name = new_model.Meta.pkname
attrs["__annotations__"][field_name] = Optional[int] # type: ignore
attrs[field_name] = None
new_model.__fields__[field_name] = get_pydantic_field(
field_name=field_name, model=new_model
)
new_model.Meta.alias_manager = alias_manager
new_model.objects = QuerySet(new_model)
return new_model
def verify_constraint_names( def verify_constraint_names(
@ -594,78 +509,34 @@ def extract_from_parents_definition( # noqa: CCR001
return attrs, model_fields return attrs, model_fields
class ModelMetaclass(pydantic.main.ModelMetaclass): def update_attrs_and_fields(
def __new__( # type: ignore # noqa: CCR001 attrs: Dict,
mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict new_attrs: Dict,
) -> "ModelMetaclass": model_fields: Dict,
new_model_fields: Dict,
new_fields: Set,
) -> Dict:
""" """
Metaclass used by ormar Models that performs configuration Updates __annotations__, values of model fields (so pydantic FieldInfos)
and build of ormar Models. as well as model.Meta.model_fields definitions from parents.
:param attrs: new namespace for class being constructed
Sets pydantic configuration.
Extract model_fields and convert them to pydantic FieldInfo,
updates class namespace.
Extracts settings and fields from parent classes.
Fetches methods decorated with @property_field decorator
to expose them later in dict().
Construct parent pydantic Metaclass/ Model.
If class has Meta class declared (so actual ormar Models) it also:
* populate sqlalchemy columns, pkname and tables from model_fields
* register reverse relationships on related models
* registers all relations in alias manager that populates table_prefixes
* exposes alias manager on each Model
* creates QuerySet for each model and exposes it on a class
:param name: name of current class
:type name: str
:param bases: base classes
:type bases: Tuple
:param attrs: class namespace
:type attrs: Dict :type attrs: Dict
:param new_attrs: related of the namespace extracted from parent class
:type new_attrs: Dict
:param model_fields: ormar fields in defined in current class
:type model_fields: Dict[str, BaseField]
:param new_model_fields: ormar fields defined in parent classes
:type new_model_fields: Dict[str, BaseField]
:param new_fields: set of new fields names
:type new_fields: Set[str]
""" """
attrs["Config"] = get_pydantic_base_orm_config() key = "__annotations__"
attrs["__name__"] = name attrs[key].update(new_attrs[key])
attrs, model_fields = extract_annotations_and_default_vals(attrs) attrs.update({name: new_attrs[name] for name in new_fields})
for base in reversed(bases): updated_model_fields = {k: v for k, v in new_model_fields.items()}
mod = base.__module__ updated_model_fields.update(model_fields)
if mod.startswith("ormar.models.") or mod.startswith("pydantic."): return updated_model_fields
continue
attrs, model_fields = extract_from_parents_definition(
base_class=base, curr_class=mcs, attrs=attrs, model_fields=model_fields
)
new_model = super().__new__( # type: ignore
mcs, name, bases, attrs
)
add_cached_properties(new_model)
if hasattr(new_model, "Meta"): PARSED_FIELDS_KEY = "__parsed_fields__"
populate_default_options_values(new_model, model_fields)
check_required_meta_parameters(new_model)
add_property_fields(new_model, attrs)
register_signals(new_model=new_model)
populate_choices_validators(new_model)
if not new_model.Meta.abstract:
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
populate_meta_sqlalchemy_table_if_required(new_model.Meta)
expand_reverse_relationships(new_model)
for field in new_model.Meta.model_fields.values():
register_relation_in_alias_manager(field=field)
if new_model.Meta.pkname not in attrs["__annotations__"]:
field_name = new_model.Meta.pkname
attrs["__annotations__"][field_name] = Optional[int] # type: ignore
attrs[field_name] = None
new_model.__fields__[field_name] = get_pydantic_field(
field_name=field_name, model=new_model
)
new_model.Meta.alias_manager = alias_manager
new_model.objects = QuerySet(new_model)
return new_model

View File

@ -1,7 +1,8 @@
from typing import Dict from typing import Dict, Optional, Set, TYPE_CHECKING
import ormar import ormar
from ormar.exceptions import ModelPersistenceError from ormar.exceptions import ModelPersistenceError
from ormar.models.helpers.validation import validate_choices
from ormar.models.mixins import AliasMixin from ormar.models.mixins import AliasMixin
from ormar.models.mixins.relation_mixin import RelationMixin from ormar.models.mixins.relation_mixin import RelationMixin
@ -11,6 +12,9 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
Used to prepare models to be saved in database Used to prepare models to be saved in database
""" """
if TYPE_CHECKING: # pragma: nocover
_choices_fields: Optional[Set]
@classmethod @classmethod
def prepare_model_to_save(cls, new_kwargs: dict) -> dict: def prepare_model_to_save(cls, new_kwargs: dict) -> dict:
""" """
@ -109,3 +113,22 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
if field.server_default is not None and not new_kwargs.get(field_name): if field.server_default is not None and not new_kwargs.get(field_name):
new_kwargs.pop(field_name, None) new_kwargs.pop(field_name, None)
return new_kwargs return new_kwargs
@classmethod
def validate_choices(cls, new_kwargs: Dict) -> Dict:
"""
Receives dictionary of model that is about to be saved and validates the
fields with choices set to see if the value is allowed.
:param new_kwargs: dictionary of model that is about to be saved
:type new_kwargs: Dict
:return: dictionary of model that is about to be saved
:rtype: Dict
"""
if not cls._choices_fields:
return new_kwargs
for field_name, field in cls.Meta.model_fields.items():
if field_name in new_kwargs and field_name in cls._choices_fields:
validate_choices(field=field, value=new_kwargs.get(field_name))
return new_kwargs

View File

@ -23,7 +23,6 @@ try:
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
import json # type: ignore import json # type: ignore
import databases import databases
import pydantic import pydantic
import sqlalchemy import sqlalchemy
@ -39,6 +38,7 @@ from ormar.models.helpers.sqlalchemy import (
populate_meta_sqlalchemy_table_if_required, populate_meta_sqlalchemy_table_if_required,
update_column_definition, update_column_definition,
) )
from ormar.models.helpers.validation import validate_choices
from ormar.models.metaclass import ModelMeta, ModelMetaclass from ormar.models.metaclass import ModelMeta, ModelMetaclass
from ormar.models.modelproxy import ModelTableProxy from ormar.models.modelproxy import ModelTableProxy
from ormar.queryset.utils import translate_list_to_dict from ormar.queryset.utils import translate_list_to_dict
@ -83,6 +83,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
_orm_saved: bool _orm_saved: bool
_related_names: Optional[Set] _related_names: Optional[Set]
_related_names_hash: str _related_names_hash: str
_choices_fields: Optional[Set]
_pydantic_fields: Set _pydantic_fields: Set
_quick_access_fields: Set _quick_access_fields: Set
Meta: ModelMeta Meta: ModelMeta
@ -208,23 +209,22 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
elif name == "pk": elif name == "pk":
object.__setattr__(self, self.Meta.pkname, value) object.__setattr__(self, self.Meta.pkname, value)
self.set_save_status(False) self.set_save_status(False)
elif name in self._orm: elif name in object.__getattribute__(self, "_orm"):
model = self.Meta.model_fields[name].expand_relationship( model = (
value=value, child=self object.__getattribute__(self, "Meta")
.model_fields[name]
.expand_relationship(value=value, child=self)
) )
if isinstance(self.__dict__.get(name), list): if isinstance(object.__getattribute__(self, "__dict__").get(name), list):
# virtual foreign key or many to many # virtual foreign key or many to many
self.__dict__[name].append(model) object.__getattribute__(self, "__dict__")[name].append(model)
else: else:
# foreign key relation # foreign key relation
self.__dict__[name] = model object.__getattribute__(self, "__dict__")[name] = model
self.set_save_status(False) self.set_save_status(False)
else: else:
value = ( if name in object.__getattribute__(self, "_choices_fields"):
self._convert_json(name, value, "dumps") validate_choices(field=self.Meta.model_fields[name], value=value)
if name in self.__fields__
else value
)
super().__setattr__(name, value) super().__setattr__(name, value)
self.set_save_status(False) self.set_save_status(False)

View File

@ -16,6 +16,7 @@ quick_access_set = {
"__pre_root_validators__", "__pre_root_validators__",
"__same__", "__same__",
"_calculate_keys", "_calculate_keys",
"_choices_fields",
"_convert_json", "_convert_json",
"_extract_db_related_names", "_extract_db_related_names",
"_extract_model_db_fields", "_extract_model_db_fields",

View File

@ -555,6 +555,7 @@ class QuerySet:
self.model.extract_related_names() self.model.extract_related_names()
) )
updates = {k: v for k, v in kwargs.items() if k in self_fields} updates = {k: v for k, v in kwargs.items() if k in self_fields}
updates = self.model.validate_choices(updates)
updates = self.model.translate_columns_to_aliases(updates) updates = self.model.translate_columns_to_aliases(updates)
if not each and not self.filter_clauses: if not each and not self.filter_clauses:
raise QueryDefinitionError( raise QueryDefinitionError(

View File

@ -64,7 +64,7 @@ class Relation:
self._to_remove: Set = set() self._to_remove: Set = set()
self.to: Type["T"] = to self.to: Type["T"] = to
self._through: Optional[Type["T"]] = through self._through: Optional[Type["T"]] = through
self.field_name = field_name self.field_name: str = field_name
self.related_models: Optional[Union[RelationProxy, "T"]] = ( self.related_models: Optional[Union[RelationProxy, "T"]] = (
RelationProxy(relation=self, type_=type_, field_name=field_name) RelationProxy(relation=self, type_=type_, field_name=field_name)
if type_ in (RelationType.REVERSE, RelationType.MULTIPLE) if type_ in (RelationType.REVERSE, RelationType.MULTIPLE)

View File

@ -0,0 +1,181 @@
# type: ignore
import uuid
from typing import List
import databases
import pydantic
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient
import ormar
from tests.settings import DATABASE_URL
app = FastAPI()
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
app.state.database = database
@app.on_event("startup")
async def startup() -> None:
database_ = app.state.database
if not database_.is_connected:
await database_.connect()
@app.on_event("shutdown")
async def shutdown() -> None:
database_ = app.state.database
if database_.is_connected:
await database_.disconnect()
class BaseMeta(ormar.ModelMeta):
metadata = metadata
database = database
class Thing(ormar.Model):
class Meta(BaseMeta):
tablename = "things"
id: uuid.UUID = ormar.UUID(primary_key=True, default=uuid.uuid4)
name: str = ormar.Text(default="")
js: pydantic.Json = ormar.JSON()
@app.get("/things", response_model=List[Thing])
async def read_things():
return await Thing.objects.order_by("name").all()
@app.get("/things_with_sample", response_model=List[Thing])
async def read_things_sample():
await Thing(name="b", js=["asdf", "asdf", "bobby", "nigel"]).save()
await Thing(name="a", js='["lemon", "raspberry", "lime", "pumice"]').save()
return await Thing.objects.order_by("name").all()
@app.get("/things_with_sample_after_init", response_model=Thing)
async def read_things_init():
thing1 = Thing(js="{}")
thing1.name = "d"
thing1.js = ["js", "set", "after", "constructor"]
await thing1.save()
return thing1
@app.put("/update_thing", response_model=Thing)
async def update_things(thing: Thing):
thing.js = ["js", "set", "after", "update"] # type: ignore
await thing.update()
return thing
@app.post("/things", response_model=Thing)
async def create_things(thing: Thing):
thing = await thing.save()
return thing
@app.get("/things_untyped")
async def read_things_untyped():
return await Thing.objects.order_by("name").all()
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.mark.asyncio
async def test_json_is_required_if_not_nullable():
with pytest.raises(pydantic.ValidationError):
Thing()
@pytest.mark.asyncio
async def test_json_is_not_required_if_nullable():
class Thing2(ormar.Model):
class Meta(BaseMeta):
tablename = "things2"
id: uuid.UUID = ormar.UUID(primary_key=True, default=uuid.uuid4)
name: str = ormar.Text(default="")
js: pydantic.Json = ormar.JSON(nullable=True)
Thing2()
@pytest.mark.asyncio
async def test_setting_values_after_init():
async with database:
t1 = Thing(id="67a82813-d90c-45ff-b546-b4e38d7030d7", name="t1", js=["thing1"])
assert '["thing1"]' in t1.json()
await t1.save()
t1.json()
assert '["thing1"]' in t1.json()
assert '["thing1"]' in (await Thing.objects.get(id=t1.id)).json()
await t1.update()
assert '["thing1"]' in (await Thing.objects.get(id=t1.id)).json()
def test_read_main():
client = TestClient(app)
with client as client:
response = client.get("/things_with_sample")
assert response.status_code == 200
# check if raw response not double encoded
assert '["lemon","raspberry","lime","pumice"]' in response.text
# parse json and check that we get lists not strings
resp = response.json()
assert resp[0].get("js") == ["lemon", "raspberry", "lime", "pumice"]
assert resp[1].get("js") == ["asdf", "asdf", "bobby", "nigel"]
# create a new one
response = client.post("/things", json={"js": ["test", "test2"], "name": "c"})
assert response.json().get("js") == ["test", "test2"]
# get all with new one
response = client.get("/things")
assert response.status_code == 200
assert '["test","test2"]' in response.text
resp = response.json()
assert resp[0].get("js") == ["lemon", "raspberry", "lime", "pumice"]
assert resp[1].get("js") == ["asdf", "asdf", "bobby", "nigel"]
assert resp[2].get("js") == ["test", "test2"]
response = client.get("/things_with_sample_after_init")
assert response.status_code == 200
resp = response.json()
assert resp.get("js") == ["js", "set", "after", "constructor"]
# test new with after constructor
response = client.get("/things")
resp = response.json()
assert resp[0].get("js") == ["lemon", "raspberry", "lime", "pumice"]
assert resp[1].get("js") == ["asdf", "asdf", "bobby", "nigel"]
assert resp[2].get("js") == ["test", "test2"]
assert resp[3].get("js") == ["js", "set", "after", "constructor"]
response = client.put("/update_thing", json=resp[3])
assert response.status_code == 200
resp = response.json()
assert resp.get("js") == ["js", "set", "after", "update"]
# test new with after constructor
response = client.get("/things_untyped")
resp = response.json()
assert resp[0].get("js") == ["lemon", "raspberry", "lime", "pumice"]
assert resp[1].get("js") == ["asdf", "asdf", "bobby", "nigel"]
assert resp[2].get("js") == ["test", "test2"]
assert resp[3].get("js") == ["js", "set", "after", "update"]

View File

@ -0,0 +1,66 @@
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
database = database
metadata = metadata
class Category(ormar.Model):
class Meta(BaseMeta):
tablename = "categories"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=40)
class PostCategory(ormar.Model):
class Meta(BaseMeta):
tablename = "posts_x_categories"
id: int = ormar.Integer(primary_key=True)
sort_order: int = ormar.Integer(nullable=True)
class Post(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
title: str = ormar.String(max_length=200)
categories = ormar.ManyToMany(Category, through=PostCategory)
#
# @pytest.fixture(autouse=True, scope="module")
# async def create_test_database():
# engine = sqlalchemy.create_engine(DATABASE_URL)
# metadata.create_all(engine)
# yield
# metadata.drop_all(engine)
#
#
# @pytest.mark.asyncio
# async def test_setting_fields_on_through_model():
# async with database:
# # TODO: check/ modify following
# # loading the data into model instance of though model?
# # <- attach to other side? both sides? access by through, or add to fields?
# # creating while adding to relation (kwargs in add?)
# # creating in query (dividing kwargs between final and through)
# # updating in query
# # sorting in filter (special __through__<field_name> notation?)
# # ordering by in order_by
# # accessing from instance (both sides?)
# # modifying from instance (both sides?)
# # including/excluding in fields?
# # allowing to change fk fields names in through model?
# pass

View File

@ -81,7 +81,7 @@ class Product(ormar.Model):
last_delivery: datetime.date = ormar.Date(default=datetime.datetime.now) last_delivery: datetime.date = ormar.Date(default=datetime.datetime.now)
country_name_choices = ("Canada", "Algeria", "United States") country_name_choices = ("Canada", "Algeria", "United States", "Belize")
country_taxed_choices = (True,) country_taxed_choices = (True,)
country_country_code_choices = (-10, 1, 213, 1200) country_country_code_choices = (-10, 1, 213, 1200)
@ -449,6 +449,32 @@ async def test_model_choices():
name=name, taxed=taxed, country_code=country_code name=name, taxed=taxed, country_code=country_code
) )
# test setting after init also triggers validation
with pytest.raises(ValueError):
name, taxed, country_code = "Algeria", True, 967
check_choices((name, taxed, country_code), ["in", "in", "out"])
country = Country()
country.country_code = country_code
with pytest.raises(ValueError):
name, taxed, country_code = "Saudi Arabia", True, 1
check_choices((name, taxed, country_code), ["out", "in", "in"])
country = Country()
country.name = name
with pytest.raises(ValueError):
name, taxed, country_code = "Algeria", False, 1
check_choices((name, taxed, country_code), ["in", "out", "in"])
country = Country()
country.taxed = taxed
# check also update from queryset
with pytest.raises(ValueError):
name, taxed, country_code = "Algeria", False, 1
check_choices((name, taxed, country_code), ["in", "out", "in"])
await Country(name="Belize").save()
await Country.objects.filter(name="Belize").update(name="Vietnam")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_and_end_filters(): async def test_start_and_end_filters():