diff --git a/docs/releases.md b/docs/releases.md index 1cd5407..4b5a94e 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1,3 +1,13 @@ +# 0.9.3 + +## Fixes +* Fix `JSON` field being double escaped when setting value after initialization +* Fix `JSON` field not respecting `nullable` field setting due to `pydantic` internals +* Fix `choices` verification for `JSON` field +* Fix `choices` not being verified when setting the attribute after initialization +* Fix `choices` not being verified during `update` call from `QuerySet` + + # 0.9.2 ## Other diff --git a/mkdocs.yml b/mkdocs.yml index c6379f1..2bdafce 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -43,6 +43,8 @@ nav: - api/models/helpers/pydantic.md - api/models/helpers/relations.md - api/models/helpers/sqlalchemy.md + - api/models/helpers/validation.md + - api/models/helpers/related-names-validation.md - Mixins: - Alias Mixin: api/models/mixins/alias-mixin.md - Excludable Mixin: api/models/mixins/excludable-mixin.md diff --git a/ormar/__init__.py b/ormar/__init__.py index 147b40b..4ef50dd 100644 --- a/ormar/__init__.py +++ b/ormar/__init__.py @@ -19,7 +19,8 @@ snakes, and ormar(e) in italian which means cabinet. And what's a better name for python ORM than snakes cabinet :) """ -from ormar.decorators import ( +from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100 +from ormar.decorators import ( # noqa: I100 post_delete, post_save, post_update, @@ -28,13 +29,13 @@ from ormar.decorators import ( pre_update, property_field, ) -from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100 from ormar.exceptions import ( # noqa: I100 ModelDefinitionError, MultipleMatches, NoMatch, ) -from ormar.fields import ( # noqa: I100 +from ormar.fields import ( + BaseField, BigInteger, Boolean, Date, @@ -42,15 +43,17 @@ from ormar.fields import ( # noqa: I100 Decimal, Float, ForeignKey, + ForeignKeyField, Integer, JSON, ManyToMany, + ManyToManyField, String, Text, Time, UUID, UniqueColumns, -) +) # noqa: I100 from ormar.models import Model from ormar.models.metaclass import ModelMeta from ormar.queryset import QuerySet @@ -65,7 +68,7 @@ class UndefinedType: # pragma no cover Undefined = UndefinedType() -__version__ = "0.9.2" +__version__ = "0.9.3" __all__ = [ "Integer", "BigInteger", @@ -100,4 +103,7 @@ __all__ = [ "pre_save", "pre_update", "Signal", + "BaseField", + "ManyToManyField", + "ForeignKeyField", ] diff --git a/ormar/fields/__init__.py b/ormar/fields/__init__.py index 681cb40..2e4a013 100644 --- a/ormar/fields/__init__.py +++ b/ormar/fields/__init__.py @@ -5,7 +5,7 @@ as well as relation Fields (ForeignKey, ManyToMany). Also a definition for custom CHAR based sqlalchemy UUID field """ from ormar.fields.base import BaseField -from ormar.fields.foreign_key import ForeignKey, UniqueColumns +from ormar.fields.foreign_key import ForeignKey, ForeignKeyField, UniqueColumns from ormar.fields.many_to_many import ManyToMany, ManyToManyField from ormar.fields.model_fields import ( BigInteger, @@ -40,4 +40,5 @@ __all__ = [ "ManyToManyField", "BaseField", "UniqueColumns", + "ForeignKeyField", ] diff --git a/ormar/fields/base.py b/ormar/fields/base.py index d6c115d..08308d8 100644 --- a/ormar/fields/base.py +++ b/ormar/fields/base.py @@ -1,12 +1,10 @@ from typing import Any, List, Optional, TYPE_CHECKING, Type, Union -import pydantic import sqlalchemy -from pydantic import Field, typing -from pydantic.fields import FieldInfo +from pydantic import Field, Json, typing +from pydantic.fields import FieldInfo, Required, Undefined import ormar # noqa I101 -from ormar import ModelDefinitionError # noqa I101 if TYPE_CHECKING: # pragma no cover from ormar.models import Model @@ -96,6 +94,30 @@ class BaseField(FieldInfo): and hasattr(cls, field_name) ) + @classmethod + def get_base_pydantic_field_info(cls, allow_null: bool) -> FieldInfo: + """ + Generates base pydantic.FieldInfo with only default and optionally + required to fix pydantic Json field being set to required=False. + Used in an ormar Model Metaclass. + + :param allow_null: flag if the default value can be None + or if it should be populated by pydantic Undefined + :type allow_null: bool + :return: instance of base pydantic.FieldInfo + :rtype: pydantic.FieldInfo + """ + base = cls.default_value() + if base is None: + base = ( + FieldInfo(default=None) + if (cls.nullable or allow_null) + else FieldInfo(default=Undefined) + ) + if cls.__type__ == Json and base.default is Undefined: + base.default = Required + return base + @classmethod def convert_to_pydantic_field_info(cls, allow_null: bool = False) -> FieldInfo: """ @@ -109,13 +131,7 @@ class BaseField(FieldInfo): :return: actual instance of pydantic.FieldInfo with all needed fields populated :rtype: pydantic.FieldInfo """ - base = cls.default_value() - if base is None: - base = ( - FieldInfo(default=None) - if (cls.nullable or allow_null) - else FieldInfo(default=pydantic.fields.Undefined) - ) + base = cls.get_base_pydantic_field_info(allow_null=allow_null) for attr_name in FieldInfo.__dict__.keys(): if cls.is_valid_field_info_field(attr_name): setattr(base, attr_name, cls.__dict__.get(attr_name)) diff --git a/ormar/fields/foreign_key.py b/ormar/fields/foreign_key.py index 3ed222b..30602ae 100644 --- a/ormar/fields/foreign_key.py +++ b/ormar/fields/foreign_key.py @@ -222,7 +222,7 @@ class ForeignKeyField(BaseField): to: Type["Model"] name: str - related_name: str + related_name: str # type: ignore virtual: bool ondelete: str onupdate: str diff --git a/ormar/models/helpers/__init__.py b/ormar/models/helpers/__init__.py index ddc3987..af84d9e 100644 --- a/ormar/models/helpers/__init__.py +++ b/ormar/models/helpers/__init__.py @@ -1,5 +1,7 @@ from ormar.models.helpers.models import ( + check_required_meta_parameters, extract_annotations_and_default_vals, + meta_field_not_set, populate_default_options_values, ) from ormar.models.helpers.pydantic import ( @@ -15,7 +17,9 @@ from ormar.models.helpers.relations import expand_reverse_relationships from ormar.models.helpers.sqlalchemy import ( populate_meta_sqlalchemy_table_if_required, populate_meta_tablename_columns_and_pk, + sqlalchemy_columns_from_model_fields, ) +from ormar.models.helpers.validation import populate_choices_validators __all__ = [ "expand_reverse_relationships", @@ -28,4 +32,8 @@ __all__ = [ "get_pydantic_field", "get_potential_fields", "get_pydantic_base_orm_config", + "check_required_meta_parameters", + "sqlalchemy_columns_from_model_fields", + "populate_choices_validators", + "meta_field_not_set", ] diff --git a/ormar/models/helpers/models.py b/ormar/models/helpers/models.py index 8843bcc..449a920 100644 --- a/ormar/models/helpers/models.py +++ b/ormar/models/helpers/models.py @@ -1,10 +1,9 @@ import itertools import sqlite3 -from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type +from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type from pydantic.typing import ForwardRef import ormar # noqa: I100 -from ormar.fields.foreign_key import ForeignKeyField from ormar.models.helpers.pydantic import populate_pydantic_default_values if TYPE_CHECKING: # pragma no cover @@ -22,7 +21,7 @@ def is_field_an_forward_ref(field: Type["BaseField"]) -> bool: :return: result of the check :rtype: bool """ - return issubclass(field, ForeignKeyField) and ( + return issubclass(field, ormar.ForeignKeyField) and ( field.to.__class__ == ForwardRef or field.through.__class__ == ForwardRef ) @@ -124,43 +123,6 @@ def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]: return attrs, model_fields -# cannot be in relations helpers due to cyclical import -def validate_related_names_in_relations( # noqa CCR001 - model_fields: Dict, new_model: Type["Model"] -) -> None: - """ - Performs a validation of relation_names in relation fields. - If multiple fields are leading to the same related model - only one can have empty related_name param - (populated by default as model.name.lower()+'s'). - Also related_names have to be unique for given related model. - - :raises ModelDefinitionError: if validation of related_names fail - :param model_fields: dictionary of declared ormar model fields - :type model_fields: Dict[str, ormar.Field] - :param new_model: - :type new_model: Model class - """ - already_registered: Dict[str, List[Optional[str]]] = dict() - for field in model_fields.values(): - if issubclass(field, ForeignKeyField): - to_name = ( - field.to.get_name() - if not field.to.__class__ == ForwardRef - else str(field.to) - ) - previous_related_names = already_registered.setdefault(to_name, []) - if field.related_name in previous_related_names: - raise ormar.ModelDefinitionError( - f"Multiple fields declared on {new_model.get_name(lower=False)} " - f"model leading to {field.to.get_name(lower=False)} model without " - f"related_name property set. \nThere can be only one relation with " - f"default/empty name: '{new_model.get_name() + 's'}'" - f"\nTip: provide different related_name for FK and/or M2M fields" - ) - previous_related_names.append(field.related_name) - - def group_related_list(list_: List) -> Dict: """ Translates the list of related strings into a dictionary. @@ -191,3 +153,19 @@ def group_related_list(list_: List) -> Dict: else: result_dict.setdefault(key, []).extend(new) return {k: v for k, v in sorted(result_dict.items(), key=lambda item: len(item[1]))} + + +def meta_field_not_set(model: Type["Model"], field_name: str) -> bool: + """ + Checks if field with given name is already present in model.Meta. + Then check if it's set to something truthful + (in practice meaning not None, as it's non or ormar Field only). + + :param model: newly constructed model + :type model: Model class + :param field_name: name of the ormar field + :type field_name: str + :return: result of the check + :rtype: bool + """ + return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name) diff --git a/ormar/models/helpers/pydantic.py b/ormar/models/helpers/pydantic.py index 69c0496..6a8f8a5 100644 --- a/ormar/models/helpers/pydantic.py +++ b/ormar/models/helpers/pydantic.py @@ -1,7 +1,7 @@ import warnings from typing import Dict, Optional, TYPE_CHECKING, Tuple, Type -from pydantic import BaseConfig +import pydantic from pydantic.fields import ModelField from pydantic.utils import lenient_issubclass @@ -132,7 +132,7 @@ def populate_pydantic_default_values(attrs: Dict) -> Tuple[Dict, Dict]: return attrs, model_fields -def get_pydantic_base_orm_config() -> Type[BaseConfig]: +def get_pydantic_base_orm_config() -> Type[pydantic.BaseConfig]: """ Returns empty pydantic Config with orm_mode set to True. @@ -140,7 +140,7 @@ def get_pydantic_base_orm_config() -> Type[BaseConfig]: :rtype: pydantic Config """ - class Config(BaseConfig): + class Config(pydantic.BaseConfig): orm_mode = True return Config diff --git a/ormar/models/helpers/related_names_validation.py b/ormar/models/helpers/related_names_validation.py new file mode 100644 index 0000000..8bc32c1 --- /dev/null +++ b/ormar/models/helpers/related_names_validation.py @@ -0,0 +1,43 @@ +from typing import Dict, List, Optional, TYPE_CHECKING, Type + +from pydantic.typing import ForwardRef +import ormar # noqa: I100 + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +def validate_related_names_in_relations( # noqa CCR001 + model_fields: Dict, new_model: Type["Model"] +) -> None: + """ + Performs a validation of relation_names in relation fields. + If multiple fields are leading to the same related model + only one can have empty related_name param + (populated by default as model.name.lower()+'s'). + Also related_names have to be unique for given related model. + + :raises ModelDefinitionError: if validation of related_names fail + :param model_fields: dictionary of declared ormar model fields + :type model_fields: Dict[str, ormar.Field] + :param new_model: + :type new_model: Model class + """ + already_registered: Dict[str, List[Optional[str]]] = dict() + for field in model_fields.values(): + if issubclass(field, ormar.ForeignKeyField): + to_name = ( + field.to.get_name() + if not field.to.__class__ == ForwardRef + else str(field.to) + ) + previous_related_names = already_registered.setdefault(to_name, []) + if field.related_name in previous_related_names: + raise ormar.ModelDefinitionError( + f"Multiple fields declared on {new_model.get_name(lower=False)} " + f"model leading to {field.to.get_name(lower=False)} model without " + f"related_name property set. \nThere can be only one relation with " + f"default/empty name: '{new_model.get_name() + 's'}'" + f"\nTip: provide different related_name for FK and/or M2M fields" + ) + previous_related_names.append(field.related_name) diff --git a/ormar/models/helpers/sqlalchemy.py b/ormar/models/helpers/sqlalchemy.py index ed4a605..a17e786 100644 --- a/ormar/models/helpers/sqlalchemy.py +++ b/ormar/models/helpers/sqlalchemy.py @@ -3,18 +3,18 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union import sqlalchemy -from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202 -from ormar.fields import BaseField, ManyToManyField -from ormar.fields.foreign_key import ForeignKeyField -from ormar.models.helpers.models import validate_related_names_in_relations +import ormar # noqa: I100, I202 from ormar.models.helpers.pydantic import create_pydantic_field +from ormar.models.helpers.related_names_validation import ( + validate_related_names_in_relations, +) if TYPE_CHECKING: # pragma no cover - from ormar import Model, ModelMeta + from ormar import Model, ModelMeta, ManyToManyField, BaseField, ForeignKeyField from ormar.models import NewBaseModel -def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> None: +def adjust_through_many_to_many_model(model_field: Type["ManyToManyField"]) -> None: """ Registers m2m relation on through model. Sets ormar.ForeignKey from through model to both child and parent models. @@ -27,13 +27,13 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non parent_name = model_field.default_target_field_name() child_name = model_field.default_source_field_name() - model_field.through.Meta.model_fields[parent_name] = ForeignKey( + model_field.through.Meta.model_fields[parent_name] = ormar.ForeignKey( model_field.to, real_name=parent_name, ondelete="CASCADE", owner=model_field.through, ) - model_field.through.Meta.model_fields[child_name] = ForeignKey( + model_field.through.Meta.model_fields[child_name] = ormar.ForeignKey( model_field.owner, real_name=child_name, ondelete="CASCADE", @@ -52,7 +52,7 @@ def adjust_through_many_to_many_model(model_field: Type[ManyToManyField]) -> Non def create_and_append_m2m_fk( - model: Type["Model"], model_field: Type[ManyToManyField], field_name: str + model: Type["Model"], model_field: Type["ManyToManyField"], field_name: str ) -> None: """ Registers sqlalchemy Column with sqlalchemy.ForeignKey leading to the model. @@ -69,7 +69,7 @@ def create_and_append_m2m_fk( pk_alias = model.get_column_alias(model.Meta.pkname) pk_column = next((col for col in model.Meta.columns if col.name == pk_alias), None) if pk_column is None: # pragma: no cover - raise ModelDefinitionError( + raise ormar.ModelDefinitionError( "ManyToMany relation cannot lead to field without pk" ) column = sqlalchemy.Column( @@ -88,7 +88,7 @@ def create_and_append_m2m_fk( def check_pk_column_validity( - field_name: str, field: BaseField, pkname: Optional[str] + field_name: str, field: "BaseField", pkname: Optional[str] ) -> Optional[str]: """ Receives the field marked as primary key and verifies if the pkname @@ -106,9 +106,9 @@ def check_pk_column_validity( :rtype: str """ if pkname is not None: - raise ModelDefinitionError("Only one primary key column is allowed.") + raise ormar.ModelDefinitionError("Only one primary key column is allowed.") if field.pydantic_only: - raise ModelDefinitionError("Primary key column cannot be pydantic only") + raise ormar.ModelDefinitionError("Primary key column cannot be pydantic only") return field_name @@ -144,7 +144,7 @@ def sqlalchemy_columns_from_model_fields( :rtype: Tuple[Optional[str], List[sqlalchemy.Column]] """ if len(model_fields.keys()) == 0: - model_fields["id"] = Integer(name="id", primary_key=True) + model_fields["id"] = ormar.Integer(name="id", primary_key=True) logging.warning( "Table {table_name} had no fields so auto " "Integer primary key named `id` created." @@ -159,7 +159,7 @@ def sqlalchemy_columns_from_model_fields( if ( not field.pydantic_only and not field.virtual - and not issubclass(field, ManyToManyField) + and not issubclass(field, ormar.ManyToManyField) ): columns.append(field.get_column(field.get_alias())) return pkname, columns @@ -201,7 +201,7 @@ def populate_meta_tablename_columns_and_pk( ) if pkname is None: - raise ModelDefinitionError("Table has to have a primary key.") + raise ormar.ModelDefinitionError("Table has to have a primary key.") new_model.Meta.columns = columns new_model.Meta.pkname = pkname @@ -248,7 +248,7 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None: def update_column_definition( - model: Union[Type["Model"], Type["NewBaseModel"]], field: Type[ForeignKeyField] + model: Union[Type["Model"], Type["NewBaseModel"]], field: Type["ForeignKeyField"] ) -> None: """ Updates a column with a new type column based on updated parameters in FK fields. diff --git a/ormar/models/helpers/validation.py b/ormar/models/helpers/validation.py new file mode 100644 index 0000000..582c3fa --- /dev/null +++ b/ormar/models/helpers/validation.py @@ -0,0 +1,161 @@ +import datetime +import decimal +import uuid +from enum import Enum +from typing import Any, Dict, List, TYPE_CHECKING, Tuple, Type + +try: + import orjson as json +except ImportError: # pragma: no cover + import json # type: ignore + +import pydantic +from pydantic.main import SchemaExtraCallable + +import ormar # noqa: I100, I202 +from ormar.fields import BaseField +from ormar.models.helpers.models import meta_field_not_set + +if TYPE_CHECKING: # pragma no cover + from ormar import Model + + +def check_if_field_has_choices(field: Type[BaseField]) -> bool: + """ + Checks if given field has choices populated. + A if it has one, a validator for this field needs to be attached. + + :param field: ormar field to check + :type field: BaseField + :return: result of the check + :rtype: bool + """ + return hasattr(field, "choices") and bool(field.choices) + + +def convert_choices_if_needed( # noqa: CCR001 + field: Type["BaseField"], value: Any +) -> Tuple[Any, List]: + """ + Converts dates to isoformat as fastapi can check this condition in routes + and the fields are not yet parsed. + + Converts enums to list of it's values. + + Converts uuids to strings. + + Converts decimal to float with given scale. + + :param field: ormar field to check with choices + :type field: Type[BaseField] + :param values: current values of the model to verify + :type values: Dict + :return: value, choices list + :rtype: Tuple[Any, List] + """ + choices = [o.value if isinstance(o, Enum) else o for o in field.choices] + + if field.__type__ in [datetime.datetime, datetime.date, datetime.time]: + value = value.isoformat() if not isinstance(value, str) else value + choices = [o.isoformat() for o in field.choices] + elif field.__type__ == pydantic.Json: + value = json.dumps(value) if not isinstance(value, str) else value + value = value.decode("utf-8") if isinstance(value, bytes) else value + elif field.__type__ == uuid.UUID: + value = str(value) if not isinstance(value, str) else value + choices = [str(o) for o in field.choices] + elif field.__type__ == decimal.Decimal: + precision = field.scale # type: ignore + value = ( + round(float(value), precision) + if isinstance(value, decimal.Decimal) + else value + ) + choices = [round(float(o), precision) for o in choices] + + return value, choices + + +def validate_choices(field: Type["BaseField"], value: Any) -> None: + """ + Validates if given value is in provided choices. + + :raises ValueError: If value is not in choices. + :param field:field to validate + :type field: Type[BaseField] + :param value: value of the field + :type value: Any + """ + value, choices = convert_choices_if_needed(field=field, value=value) + if value is not ormar.Undefined and value not in choices: + raise ValueError( + f"{field.name}: '{value}' " f"not in allowed choices set:" f" {choices}" + ) + + +def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]: + """ + Validator that is attached to pydantic model pre root validators. + Validator checks if field value is in field.choices list. + + :raises ValueError: if field value is outside of allowed choices. + :param cls: constructed class + :type cls: Model class + :param values: dictionary of field values (pydantic side) + :type values: Dict[str, Any] + :return: values if pass validation, otherwise exception is raised + :rtype: Dict[str, Any] + """ + for field_name, field in cls.Meta.model_fields.items(): + if check_if_field_has_choices(field): + value = values.get(field_name, ormar.Undefined) + validate_choices(field=field, value=value) + return values + + +def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable: + """ + Modifies the schema to include fields with choices validator. + Those fields will be displayed in schema as Enum types with available choices + values listed next to them. + + :param fields_with_choices: list of fields with choices validation + :type fields_with_choices: List + :return: callable that will be run by pydantic to modify the schema + :rtype: Callable + """ + + def schema_extra(schema: Dict[str, Any], model: Type["Model"]) -> None: + for field_id, prop in schema.get("properties", {}).items(): + if field_id in fields_with_choices: + prop["enum"] = list(model.Meta.model_fields[field_id].choices) + prop["description"] = prop.get("description", "") + "An enumeration." + + return staticmethod(schema_extra) # type: ignore + + +def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001 + """ + Checks if Model has any fields with choices set. + If yes it adds choices validation into pre root validators. + + :param model: newly constructed Model + :type model: Model class + """ + fields_with_choices = [] + if not meta_field_not_set(model=model, field_name="model_fields"): + for name, field in model.Meta.model_fields.items(): + if check_if_field_has_choices(field): + fields_with_choices.append(name) + validators = getattr(model, "__pre_root_validators__", []) + if choices_validator not in validators: + validators.append(choices_validator) + model.__pre_root_validators__ = validators + if not model._choices_fields: + model._choices_fields = set() + model._choices_fields.add(name) + + if fields_with_choices: + model.Config.schema_extra = construct_modify_schema_function( + fields_with_choices=fields_with_choices + ) diff --git a/ormar/models/metaclass.py b/ormar/models/metaclass.py index 3ce2699..0fc6f5d 100644 --- a/ormar/models/metaclass.py +++ b/ormar/models/metaclass.py @@ -1,7 +1,3 @@ -import datetime -import decimal -import uuid -from enum import Enum from typing import ( Any, Dict, @@ -18,28 +14,29 @@ from typing import ( import databases import pydantic import sqlalchemy -from pydantic.main import SchemaExtraCallable from sqlalchemy.sql.schema import ColumnCollectionConstraint import ormar # noqa I100 -from ormar import ForeignKey, Integer, ModelDefinitionError # noqa I100 +from ormar import ModelDefinitionError # noqa I100 from ormar.fields import BaseField from ormar.fields.foreign_key import ForeignKeyField from ormar.fields.many_to_many import ManyToManyField from ormar.models.helpers import ( alias_manager, + check_required_meta_parameters, expand_reverse_relationships, extract_annotations_and_default_vals, get_potential_fields, get_pydantic_base_orm_config, get_pydantic_field, + meta_field_not_set, + populate_choices_validators, populate_default_options_values, populate_meta_sqlalchemy_table_if_required, populate_meta_tablename_columns_and_pk, register_relation_in_alias_manager, + sqlalchemy_columns_from_model_fields, ) -from ormar.models.helpers.models import check_required_meta_parameters -from ormar.models.helpers.sqlalchemy import sqlalchemy_columns_from_model_fields from ormar.models.quick_access_views import quick_access_set from ormar.queryset import QuerySet from ormar.relations.alias_manager import AliasManager @@ -48,7 +45,6 @@ from ormar.signals import Signal, SignalEmitter if TYPE_CHECKING: # pragma no cover from ormar import Model -PARSED_FIELDS_KEY = "__parsed_fields__" CONFIG_KEY = "Config" @@ -76,119 +72,6 @@ class ModelMeta: requires_ref_update: bool -def check_if_field_has_choices(field: Type[BaseField]) -> bool: - """ - Checks if given field has choices populated. - A if it has one, a validator for this field needs to be attached. - - :param field: ormar field to check - :type field: BaseField - :return: result of the check - :rtype: bool - """ - return hasattr(field, "choices") and bool(field.choices) - - -def convert_choices_if_needed( # noqa: CCR001 - field: Type["BaseField"], values: Dict -) -> Tuple[Any, List]: - """ - Converts dates to isoformat as fastapi can check this condition in routes - and the fields are not yet parsed. - - Converts enums to list of it's values. - - Converts uuids to strings. - - Converts decimal to float with given scale. - - :param field: ormar field to check with choices - :type field: Type[BaseField] - :param values: current values of the model to verify - :type values: Dict - :return: value, choices list - :rtype: Tuple[Any, List] - """ - value = values.get(field.name, ormar.Undefined) - choices = [o.value if isinstance(o, Enum) else o for o in field.choices] - - if field.__type__ in [datetime.datetime, datetime.date, datetime.time]: - value = value.isoformat() if not isinstance(value, str) else value - choices = [o.isoformat() for o in field.choices] - elif field.__type__ == uuid.UUID: - value = str(value) if not isinstance(value, str) else value - choices = [str(o) for o in field.choices] - elif field.__type__ == decimal.Decimal: - precision = field.scale # type: ignore - value = ( - round(float(value), precision) - if isinstance(value, decimal.Decimal) - else value - ) - choices = [round(float(o), precision) for o in choices] - - return value, choices - - -def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]: - """ - Validator that is attached to pydantic model pre root validators. - Validator checks if field value is in field.choices list. - - :raises ValueError: if field value is outside of allowed choices. - :param cls: constructed class - :type cls: Model class - :param values: dictionary of field values (pydantic side) - :type values: Dict[str, Any] - :return: values if pass validation, otherwise exception is raised - :rtype: Dict[str, Any] - """ - for field_name, field in cls.Meta.model_fields.items(): - if check_if_field_has_choices(field): - value, choices = convert_choices_if_needed(field=field, values=values) - if value is not ormar.Undefined and value not in choices: - raise ValueError( - f"{field_name}: '{values.get(field_name)}' " - f"not in allowed choices set:" - f" {choices}" - ) - return values - - -def construct_modify_schema_function(fields_with_choices: List) -> SchemaExtraCallable: - def schema_extra(schema: Dict[str, Any], model: Type["Model"]) -> None: - for field_id, prop in schema.get("properties", {}).items(): - if field_id in fields_with_choices: - prop["enum"] = list(model.Meta.model_fields[field_id].choices) - prop["description"] = prop.get("description", "") + "An enumeration." - - return staticmethod(schema_extra) # type: ignore - - -def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001 - """ - Checks if Model has any fields with choices set. - If yes it adds choices validation into pre root validators. - - :param model: newly constructed Model - :type model: Model class - """ - fields_with_choices = [] - if not meta_field_not_set(model=model, field_name="model_fields"): - for name, field in model.Meta.model_fields.items(): - if check_if_field_has_choices(field): - fields_with_choices.append(name) - validators = getattr(model, "__pre_root_validators__", []) - if choices_validator not in validators: - validators.append(choices_validator) - model.__pre_root_validators__ = validators - - if fields_with_choices: - model.Config.schema_extra = construct_modify_schema_function( - fields_with_choices=fields_with_choices - ) - - def add_cached_properties(new_model: Type["Model"]) -> None: """ Sets cached properties for both pydantic and ormar models. @@ -207,22 +90,7 @@ def add_cached_properties(new_model: Type["Model"]) -> None: new_model._related_names = None new_model._related_fields = None new_model._pydantic_fields = {name for name in new_model.__fields__} - - -def meta_field_not_set(model: Type["Model"], field_name: str) -> bool: - """ - Checks if field with given name is already present in model.Meta. - Then check if it's set to something truthful - (in practice meaning not None, as it's non or ormar Field only). - - :param model: newly constructed model - :type model: Model class - :param field_name: name of the ormar field - :type field_name: str - :return: result of the check - :rtype: bool - """ - return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name) + new_model._choices_fields = set() def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001 @@ -273,34 +141,81 @@ def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001 new_model.Meta.signals = signals -def update_attrs_and_fields( - attrs: Dict, - new_attrs: Dict, - model_fields: Dict, - new_model_fields: Dict, - new_fields: Set, -) -> Dict: - """ - Updates __annotations__, values of model fields (so pydantic FieldInfos) - as well as model.Meta.model_fields definitions from parents. +class ModelMetaclass(pydantic.main.ModelMetaclass): + def __new__( # type: ignore # noqa: CCR001 + mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict + ) -> "ModelMetaclass": + """ + Metaclass used by ormar Models that performs configuration + and build of ormar Models. - :param attrs: new namespace for class being constructed - :type attrs: Dict - :param new_attrs: related of the namespace extracted from parent class - :type new_attrs: Dict - :param model_fields: ormar fields in defined in current class - :type model_fields: Dict[str, BaseField] - :param new_model_fields: ormar fields defined in parent classes - :type new_model_fields: Dict[str, BaseField] - :param new_fields: set of new fields names - :type new_fields: Set[str] - """ - key = "__annotations__" - attrs[key].update(new_attrs[key]) - attrs.update({name: new_attrs[name] for name in new_fields}) - updated_model_fields = {k: v for k, v in new_model_fields.items()} - updated_model_fields.update(model_fields) - return updated_model_fields + + Sets pydantic configuration. + Extract model_fields and convert them to pydantic FieldInfo, + updates class namespace. + + Extracts settings and fields from parent classes. + Fetches methods decorated with @property_field decorator + to expose them later in dict(). + + Construct parent pydantic Metaclass/ Model. + + If class has Meta class declared (so actual ormar Models) it also: + + * populate sqlalchemy columns, pkname and tables from model_fields + * register reverse relationships on related models + * registers all relations in alias manager that populates table_prefixes + * exposes alias manager on each Model + * creates QuerySet for each model and exposes it on a class + + :param name: name of current class + :type name: str + :param bases: base classes + :type bases: Tuple + :param attrs: class namespace + :type attrs: Dict + """ + attrs["Config"] = get_pydantic_base_orm_config() + attrs["__name__"] = name + attrs, model_fields = extract_annotations_and_default_vals(attrs) + for base in reversed(bases): + mod = base.__module__ + if mod.startswith("ormar.models.") or mod.startswith("pydantic."): + continue + attrs, model_fields = extract_from_parents_definition( + base_class=base, curr_class=mcs, attrs=attrs, model_fields=model_fields + ) + new_model = super().__new__( # type: ignore + mcs, name, bases, attrs + ) + + add_cached_properties(new_model) + + if hasattr(new_model, "Meta"): + populate_default_options_values(new_model, model_fields) + check_required_meta_parameters(new_model) + add_property_fields(new_model, attrs) + register_signals(new_model=new_model) + populate_choices_validators(new_model) + + if not new_model.Meta.abstract: + new_model = populate_meta_tablename_columns_and_pk(name, new_model) + populate_meta_sqlalchemy_table_if_required(new_model.Meta) + expand_reverse_relationships(new_model) + for field in new_model.Meta.model_fields.values(): + register_relation_in_alias_manager(field=field) + + if new_model.Meta.pkname not in attrs["__annotations__"]: + field_name = new_model.Meta.pkname + attrs["__annotations__"][field_name] = Optional[int] # type: ignore + attrs[field_name] = None + new_model.__fields__[field_name] = get_pydantic_field( + field_name=field_name, model=new_model + ) + new_model.Meta.alias_manager = alias_manager + new_model.objects = QuerySet(new_model) + + return new_model def verify_constraint_names( @@ -594,78 +509,34 @@ def extract_from_parents_definition( # noqa: CCR001 return attrs, model_fields -class ModelMetaclass(pydantic.main.ModelMetaclass): - def __new__( # type: ignore # noqa: CCR001 - mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict - ) -> "ModelMetaclass": - """ - Metaclass used by ormar Models that performs configuration - and build of ormar Models. +def update_attrs_and_fields( + attrs: Dict, + new_attrs: Dict, + model_fields: Dict, + new_model_fields: Dict, + new_fields: Set, +) -> Dict: + """ + Updates __annotations__, values of model fields (so pydantic FieldInfos) + as well as model.Meta.model_fields definitions from parents. + + :param attrs: new namespace for class being constructed + :type attrs: Dict + :param new_attrs: related of the namespace extracted from parent class + :type new_attrs: Dict + :param model_fields: ormar fields in defined in current class + :type model_fields: Dict[str, BaseField] + :param new_model_fields: ormar fields defined in parent classes + :type new_model_fields: Dict[str, BaseField] + :param new_fields: set of new fields names + :type new_fields: Set[str] + """ + key = "__annotations__" + attrs[key].update(new_attrs[key]) + attrs.update({name: new_attrs[name] for name in new_fields}) + updated_model_fields = {k: v for k, v in new_model_fields.items()} + updated_model_fields.update(model_fields) + return updated_model_fields - Sets pydantic configuration. - Extract model_fields and convert them to pydantic FieldInfo, - updates class namespace. - - Extracts settings and fields from parent classes. - Fetches methods decorated with @property_field decorator - to expose them later in dict(). - - Construct parent pydantic Metaclass/ Model. - - If class has Meta class declared (so actual ormar Models) it also: - - * populate sqlalchemy columns, pkname and tables from model_fields - * register reverse relationships on related models - * registers all relations in alias manager that populates table_prefixes - * exposes alias manager on each Model - * creates QuerySet for each model and exposes it on a class - - :param name: name of current class - :type name: str - :param bases: base classes - :type bases: Tuple - :param attrs: class namespace - :type attrs: Dict - """ - attrs["Config"] = get_pydantic_base_orm_config() - attrs["__name__"] = name - attrs, model_fields = extract_annotations_and_default_vals(attrs) - for base in reversed(bases): - mod = base.__module__ - if mod.startswith("ormar.models.") or mod.startswith("pydantic."): - continue - attrs, model_fields = extract_from_parents_definition( - base_class=base, curr_class=mcs, attrs=attrs, model_fields=model_fields - ) - new_model = super().__new__( # type: ignore - mcs, name, bases, attrs - ) - - add_cached_properties(new_model) - - if hasattr(new_model, "Meta"): - populate_default_options_values(new_model, model_fields) - check_required_meta_parameters(new_model) - add_property_fields(new_model, attrs) - register_signals(new_model=new_model) - populate_choices_validators(new_model) - - if not new_model.Meta.abstract: - new_model = populate_meta_tablename_columns_and_pk(name, new_model) - populate_meta_sqlalchemy_table_if_required(new_model.Meta) - expand_reverse_relationships(new_model) - for field in new_model.Meta.model_fields.values(): - register_relation_in_alias_manager(field=field) - - if new_model.Meta.pkname not in attrs["__annotations__"]: - field_name = new_model.Meta.pkname - attrs["__annotations__"][field_name] = Optional[int] # type: ignore - attrs[field_name] = None - new_model.__fields__[field_name] = get_pydantic_field( - field_name=field_name, model=new_model - ) - new_model.Meta.alias_manager = alias_manager - new_model.objects = QuerySet(new_model) - - return new_model +PARSED_FIELDS_KEY = "__parsed_fields__" diff --git a/ormar/models/mixins/save_mixin.py b/ormar/models/mixins/save_mixin.py index 0e0884f..abdda94 100644 --- a/ormar/models/mixins/save_mixin.py +++ b/ormar/models/mixins/save_mixin.py @@ -1,7 +1,8 @@ -from typing import Dict +from typing import Dict, Optional, Set, TYPE_CHECKING import ormar from ormar.exceptions import ModelPersistenceError +from ormar.models.helpers.validation import validate_choices from ormar.models.mixins import AliasMixin from ormar.models.mixins.relation_mixin import RelationMixin @@ -11,6 +12,9 @@ class SavePrepareMixin(RelationMixin, AliasMixin): Used to prepare models to be saved in database """ + if TYPE_CHECKING: # pragma: nocover + _choices_fields: Optional[Set] + @classmethod def prepare_model_to_save(cls, new_kwargs: dict) -> dict: """ @@ -109,3 +113,22 @@ class SavePrepareMixin(RelationMixin, AliasMixin): if field.server_default is not None and not new_kwargs.get(field_name): new_kwargs.pop(field_name, None) return new_kwargs + + @classmethod + def validate_choices(cls, new_kwargs: Dict) -> Dict: + """ + Receives dictionary of model that is about to be saved and validates the + fields with choices set to see if the value is allowed. + + :param new_kwargs: dictionary of model that is about to be saved + :type new_kwargs: Dict + :return: dictionary of model that is about to be saved + :rtype: Dict + """ + if not cls._choices_fields: + return new_kwargs + + for field_name, field in cls.Meta.model_fields.items(): + if field_name in new_kwargs and field_name in cls._choices_fields: + validate_choices(field=field, value=new_kwargs.get(field_name)) + return new_kwargs diff --git a/ormar/models/newbasemodel.py b/ormar/models/newbasemodel.py index ed69e46..3aa359e 100644 --- a/ormar/models/newbasemodel.py +++ b/ormar/models/newbasemodel.py @@ -23,7 +23,6 @@ try: except ImportError: # pragma: no cover import json # type: ignore - import databases import pydantic import sqlalchemy @@ -39,6 +38,7 @@ from ormar.models.helpers.sqlalchemy import ( populate_meta_sqlalchemy_table_if_required, update_column_definition, ) +from ormar.models.helpers.validation import validate_choices from ormar.models.metaclass import ModelMeta, ModelMetaclass from ormar.models.modelproxy import ModelTableProxy from ormar.queryset.utils import translate_list_to_dict @@ -83,6 +83,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass _orm_saved: bool _related_names: Optional[Set] _related_names_hash: str + _choices_fields: Optional[Set] _pydantic_fields: Set _quick_access_fields: Set Meta: ModelMeta @@ -208,23 +209,22 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass elif name == "pk": object.__setattr__(self, self.Meta.pkname, value) self.set_save_status(False) - elif name in self._orm: - model = self.Meta.model_fields[name].expand_relationship( - value=value, child=self + elif name in object.__getattribute__(self, "_orm"): + model = ( + object.__getattribute__(self, "Meta") + .model_fields[name] + .expand_relationship(value=value, child=self) ) - if isinstance(self.__dict__.get(name), list): + if isinstance(object.__getattribute__(self, "__dict__").get(name), list): # virtual foreign key or many to many - self.__dict__[name].append(model) + object.__getattribute__(self, "__dict__")[name].append(model) else: # foreign key relation - self.__dict__[name] = model + object.__getattribute__(self, "__dict__")[name] = model self.set_save_status(False) else: - value = ( - self._convert_json(name, value, "dumps") - if name in self.__fields__ - else value - ) + if name in object.__getattribute__(self, "_choices_fields"): + validate_choices(field=self.Meta.model_fields[name], value=value) super().__setattr__(name, value) self.set_save_status(False) diff --git a/ormar/models/quick_access_views.py b/ormar/models/quick_access_views.py index 398c4b8..c09b672 100644 --- a/ormar/models/quick_access_views.py +++ b/ormar/models/quick_access_views.py @@ -16,6 +16,7 @@ quick_access_set = { "__pre_root_validators__", "__same__", "_calculate_keys", + "_choices_fields", "_convert_json", "_extract_db_related_names", "_extract_model_db_fields", diff --git a/ormar/queryset/queryset.py b/ormar/queryset/queryset.py index 8a03866..6adec49 100644 --- a/ormar/queryset/queryset.py +++ b/ormar/queryset/queryset.py @@ -555,6 +555,7 @@ class QuerySet: self.model.extract_related_names() ) updates = {k: v for k, v in kwargs.items() if k in self_fields} + updates = self.model.validate_choices(updates) updates = self.model.translate_columns_to_aliases(updates) if not each and not self.filter_clauses: raise QueryDefinitionError( diff --git a/ormar/relations/relation.py b/ormar/relations/relation.py index 9eb8e60..cf191e0 100644 --- a/ormar/relations/relation.py +++ b/ormar/relations/relation.py @@ -64,7 +64,7 @@ class Relation: self._to_remove: Set = set() self.to: Type["T"] = to self._through: Optional[Type["T"]] = through - self.field_name = field_name + self.field_name: str = field_name self.related_models: Optional[Union[RelationProxy, "T"]] = ( RelationProxy(relation=self, type_=type_, field_name=field_name) if type_ in (RelationType.REVERSE, RelationType.MULTIPLE) diff --git a/tests/test_json_field_fastapi.py b/tests/test_json_field_fastapi.py index ab7f22d..0569c0c 100644 --- a/tests/test_json_field_fastapi.py +++ b/tests/test_json_field_fastapi.py @@ -1,3 +1,4 @@ +# type: ignore import uuid from typing import List @@ -54,13 +55,13 @@ async def read_things(): @app.get("/things_with_sample", response_model=List[Thing]) async def read_things_sample(): await Thing(name="b", js=["asdf", "asdf", "bobby", "nigel"]).save() - await Thing(name="a", js="[\"lemon\", \"raspberry\", \"lime\", \"pumice\"]").save() + await Thing(name="a", js='["lemon", "raspberry", "lime", "pumice"]').save() return await Thing.objects.order_by("name").all() @app.get("/things_with_sample_after_init", response_model=Thing) async def read_things_init(): - thing1 = Thing() + thing1 = Thing(js="{}") thing1.name = "d" thing1.js = ["js", "set", "after", "constructor"] await thing1.save() @@ -80,6 +81,11 @@ async def create_things(thing: Thing): return thing +@app.get("/things_untyped") +async def read_things_untyped(): + return await Thing.objects.order_by("name").all() + + @pytest.fixture(autouse=True, scope="module") def create_test_database(): engine = sqlalchemy.create_engine(DATABASE_URL) @@ -88,6 +94,38 @@ def create_test_database(): metadata.drop_all(engine) +@pytest.mark.asyncio +async def test_json_is_required_if_not_nullable(): + with pytest.raises(pydantic.ValidationError): + Thing() + + +@pytest.mark.asyncio +async def test_json_is_not_required_if_nullable(): + class Thing2(ormar.Model): + class Meta(BaseMeta): + tablename = "things2" + + id: uuid.UUID = ormar.UUID(primary_key=True, default=uuid.uuid4) + name: str = ormar.Text(default="") + js: pydantic.Json = ormar.JSON(nullable=True) + + Thing2() + + +@pytest.mark.asyncio +async def test_setting_values_after_init(): + t1 = Thing(id="67a82813-d90c-45ff-b546-b4e38d7030d7", name="t1", js=["thing1"]) + assert '["thing1"]' in t1.json() + await t1.save() + t1.json() + assert '["thing1"]' in t1.json() + + assert '["thing1"]' in (await Thing.objects.get(id=t1.id)).json() + await t1.update() + assert '["thing1"]' in (await Thing.objects.get(id=t1.id)).json() + + def test_read_main(): client = TestClient(app) with client as client: @@ -134,7 +172,7 @@ def test_read_main(): assert resp.get("js") == ["js", "set", "after", "update"] # test new with after constructor - response = client.get("/things") + response = client.get("/things_untyped") resp = response.json() assert resp[0].get("js") == ["lemon", "raspberry", "lime", "pumice"] assert resp[1].get("js") == ["asdf", "asdf", "bobby", "nigel"] diff --git a/tests/test_m2m_through_fields.py b/tests/test_m2m_through_fields.py index 2bfd464..96c7f89 100644 --- a/tests/test_m2m_through_fields.py +++ b/tests/test_m2m_through_fields.py @@ -38,6 +38,7 @@ class Post(ormar.Model): title: str = ormar.String(max_length=200) categories = ormar.ManyToMany(Category, through=PostCategory) + # # @pytest.fixture(autouse=True, scope="module") # async def create_test_database(): diff --git a/tests/test_models.py b/tests/test_models.py index 0601f4d..2724eec 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -81,7 +81,7 @@ class Product(ormar.Model): last_delivery: datetime.date = ormar.Date(default=datetime.datetime.now) -country_name_choices = ("Canada", "Algeria", "United States") +country_name_choices = ("Canada", "Algeria", "United States", "Belize") country_taxed_choices = (True,) country_country_code_choices = (-10, 1, 213, 1200) @@ -449,6 +449,32 @@ async def test_model_choices(): name=name, taxed=taxed, country_code=country_code ) + # test setting after init also triggers validation + with pytest.raises(ValueError): + name, taxed, country_code = "Algeria", True, 967 + check_choices((name, taxed, country_code), ["in", "in", "out"]) + country = Country() + country.country_code = country_code + + with pytest.raises(ValueError): + name, taxed, country_code = "Saudi Arabia", True, 1 + check_choices((name, taxed, country_code), ["out", "in", "in"]) + country = Country() + country.name = name + + with pytest.raises(ValueError): + name, taxed, country_code = "Algeria", False, 1 + check_choices((name, taxed, country_code), ["in", "out", "in"]) + country = Country() + country.taxed = taxed + + # check also update from queryset + with pytest.raises(ValueError): + name, taxed, country_code = "Algeria", False, 1 + check_choices((name, taxed, country_code), ["in", "out", "in"]) + await Country(name="Belize").save() + await Country.objects.filter(name="Belize").update(name="Vietnam") + @pytest.mark.asyncio async def test_start_and_end_filters():