add some more docstrings

This commit is contained in:
collerek
2020-12-09 17:44:31 +01:00
parent 8c917ddc41
commit fc710687e6
2 changed files with 147 additions and 26 deletions

View File

@ -41,6 +41,7 @@ class ModelMeta:
alias_manager: AliasManager alias_manager: AliasManager
property_fields: Set property_fields: Set
signals: SignalEmitter signals: SignalEmitter
abstract: bool
def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None: def register_relation_on_build(table_name: str, field: Type[ForeignKeyField]) -> None:
@ -269,6 +270,15 @@ def populate_meta_tablename_columns_and_pk(
def populate_meta_sqlalchemy_table_if_required( def populate_meta_sqlalchemy_table_if_required(
new_model: Type["Model"], new_model: Type["Model"],
) -> Type["Model"]: ) -> Type["Model"]:
"""
Constructs sqlalchemy table out of columns and parameters set on Meta class.
It populates name, metadata, columns and constraints.
:param new_model: class without sqlalchemy table constructed
:type new_model: Model class
:return: class with populated Meta.table
:rtype: Model class
"""
if not hasattr(new_model.Meta, "table"): if not hasattr(new_model.Meta, "table"):
new_model.Meta.table = sqlalchemy.Table( new_model.Meta.table = sqlalchemy.Table(
new_model.Meta.tablename, new_model.Meta.tablename,
@ -280,22 +290,45 @@ def populate_meta_sqlalchemy_table_if_required(
def get_pydantic_base_orm_config() -> Type[BaseConfig]: def get_pydantic_base_orm_config() -> Type[BaseConfig]:
"""
Returns empty pydantic Config with orm_mode set to True.
:return: empty default config with orm_mode set.
:rtype: pydantic Config
"""
class Config(BaseConfig): class Config(BaseConfig):
orm_mode = True orm_mode = True
# arbitrary_types_allowed = True
return Config return Config
def check_if_field_has_choices(field: Type[BaseField]) -> bool: def check_if_field_has_choices(field: Type[BaseField]) -> bool:
"""
Checks if given field has choices populated.
A if it has one, a validator for this field needs to be attached.
:param field: ormar field to check
:type field: BaseField
:return: result of the check
:rtype: bool
"""
return hasattr(field, "choices") and bool(field.choices) return hasattr(field, "choices") and bool(field.choices)
def model_initialized_and_has_model_fields(model: Type["Model"]) -> bool:
return hasattr(model, "Meta") and hasattr(model.Meta, "model_fields")
def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]: def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, Any]:
"""
Validator that is attached to pydantic model pre root validators.
Validator checks if field value is in field.choices list.
:raises: ValueError if field value is outside of allowed choices.
:param cls: constructed class
:type cls: Model class
:param values: dictionary of field values (pydantic side)
:type values: Dict[str, Any]
:return: values if pass validation, otherwise exception is raised
:rtype: Dict[str, Any]
"""
for field_name, field in cls.Meta.model_fields.items(): for field_name, field in cls.Meta.model_fields.items():
if check_if_field_has_choices(field): if check_if_field_has_choices(field):
value = values.get(field_name, ormar.Undefined) value = values.get(field_name, ormar.Undefined)
@ -309,7 +342,14 @@ def choices_validator(cls: Type["Model"], values: Dict[str, Any]) -> Dict[str, A
def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001 def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
if model_initialized_and_has_model_fields(model): """
Checks if Model has any fields with choices set.
If yes it adds choices validation into pre root validators.
:param model: newly constructed Model
:type model: Model class
"""
if not meta_field_not_set(model=model, field_name="model_fields"):
for _, field in model.Meta.model_fields.items(): for _, field in model.Meta.model_fields.items():
if check_if_field_has_choices(field): if check_if_field_has_choices(field):
validators = getattr(model, "__pre_root_validators__", []) validators = getattr(model, "__pre_root_validators__", [])
@ -321,13 +361,43 @@ def populate_choices_validators(model: Type["Model"]) -> None: # noqa CCR001
def populate_default_options_values( def populate_default_options_values(
new_model: Type["Model"], model_fields: Dict new_model: Type["Model"], model_fields: Dict
) -> None: ) -> None:
"""
Sets all optional Meta values to it's defaults
and set model_fields that were already previously extracted.
Here should live all options that are not overwritten/set for all models.
Current options are:
* constraints = []
* abstract = False
:param new_model: newly constructed Model
:type new_model: Model class
:param model_fields:
:type model_fields: Union[Dict[str, type], Dict]
"""
if not hasattr(new_model.Meta, "constraints"): if not hasattr(new_model.Meta, "constraints"):
new_model.Meta.constraints = [] new_model.Meta.constraints = []
if not hasattr(new_model.Meta, "model_fields"): if not hasattr(new_model.Meta, "model_fields"):
new_model.Meta.model_fields = model_fields new_model.Meta.model_fields = model_fields
if not hasattr(new_model.Meta, "abstract"):
new_model.Meta.abstract = False
def add_cached_properties(new_model: Type["Model"]) -> None: def add_cached_properties(new_model: Type["Model"]) -> None:
"""
Sets cached properties for both pydantic and ormar models.
Quick access fields are fields grabbed in getattribute to skip all checks.
Related fields and names are populated to None as they can change later.
When children models are constructed they can modify parent to register itself.
All properties here are used as "cache" to not recalculate them constantly.
:param new_model: newly constructed Model
:type new_model: Model class
"""
new_model._quick_access_fields = quick_access_set new_model._quick_access_fields = quick_access_set
new_model._related_names = None new_model._related_names = None
new_model._related_fields = None new_model._related_fields = None
@ -335,10 +405,34 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
def meta_field_not_set(model: Type["Model"], field_name: str) -> bool: def meta_field_not_set(model: Type["Model"], field_name: str) -> bool:
"""
Checks if field with given name is already present in model.Meta.
Then check if it's set to something truthful
(in practice meaning not None, as it's non or ormar Field only).
:param model: newly constructed model
:type model: Model class
:param field_name: name of the ormar field
:type field_name: str
:return: result of the check
:rtype: bool
"""
return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name) return not hasattr(model.Meta, field_name) or not getattr(model.Meta, field_name)
def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001 def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa: CCR001
"""
Checks class namespace for properties or functions with __property_field__.
If attribute have __property_field__ it was decorated with @property_field.
Functions like this are exposed in dict() (therefore also fastapi result).
Names of property fields are cached for quicker access / extraction.
:param new_model: newly constructed model
:type new_model: Model class
:param attrs:
:type attrs: Dict[str, str]
"""
if meta_field_not_set(model=new_model, field_name="property_fields"): if meta_field_not_set(model=new_model, field_name="property_fields"):
props = set() props = set()
for var_name, value in attrs.items(): for var_name, value in attrs.items():
@ -351,6 +445,15 @@ def add_property_fields(new_model: Type["Model"], attrs: Dict) -> None: # noqa:
def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001 def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001
"""
Registers on model's SignalEmmiter and sets pre defined signals.
Predefined signals are (pre/post) + (save/update/delete).
Signals are emitted in both model own methods and in selected queryset ones.
:param new_model: newly constructed model
:type new_model: Model class
"""
if meta_field_not_set(model=new_model, field_name="signals"): if meta_field_not_set(model=new_model, field_name="signals"):
signals = SignalEmitter() signals = SignalEmitter()
signals.pre_save = Signal() signals.pre_save = Signal()
@ -461,7 +564,14 @@ def extract_mixin_fields_from_dict(
:rtype: Tuple[Dict, Dict] :rtype: Tuple[Dict, Dict]
""" """
if hasattr(base_class, "Meta"): if hasattr(base_class, "Meta"):
# not a mixin base parent Model new_fields = set(base_class.Meta.model_fields.keys()) # type: ignore
check_conflicting_fields(
new_fields=new_fields,
attrs=attrs,
base_class=base_class,
curr_class=curr_class,
)
model_fields.update(base_class.Meta.model_fields) # type: ignore
return attrs, model_fields return attrs, model_fields
key = "__annotations__" key = "__annotations__"
@ -537,20 +647,23 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
if hasattr(new_model, "Meta"): if hasattr(new_model, "Meta"):
populate_default_options_values(new_model, model_fields) populate_default_options_values(new_model, model_fields)
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
new_model = populate_meta_sqlalchemy_table_if_required(new_model)
expand_reverse_relationships(new_model)
populate_choices_validators(new_model)
if new_model.Meta.pkname not in attrs["__annotations__"]:
field_name = new_model.Meta.pkname
attrs["__annotations__"][field_name] = Optional[int] # type: ignore
attrs[field_name] = None
new_model.__fields__[field_name] = get_pydantic_field(
field_name=field_name, model=new_model
)
new_model.Meta.alias_manager = alias_manager
new_model.objects = QuerySet(new_model)
add_property_fields(new_model, attrs) add_property_fields(new_model, attrs)
register_signals(new_model=new_model) register_signals(new_model=new_model)
populate_choices_validators(new_model)
if not new_model.Meta.abstract:
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
new_model = populate_meta_sqlalchemy_table_if_required(new_model)
expand_reverse_relationships(new_model)
if new_model.Meta.pkname not in attrs["__annotations__"]:
field_name = new_model.Meta.pkname
attrs["__annotations__"][field_name] = Optional[int] # type: ignore
attrs[field_name] = None
new_model.__fields__[field_name] = get_pydantic_field(
field_name=field_name, model=new_model
)
new_model.Meta.alias_manager = alias_manager
new_model.objects = QuerySet(new_model)
return new_model return new_model

View File

@ -57,6 +57,7 @@ def create_test_database():
def test_field_redefining_raises_error(): def test_field_redefining_raises_error():
with pytest.raises(ModelDefinitionError): with pytest.raises(ModelDefinitionError):
class WrongField(ormar.Model, DateFieldsMixins): # pragma: no cover class WrongField(ormar.Model, DateFieldsMixins): # pragma: no cover
class Meta(ormar.ModelMeta): class Meta(ormar.ModelMeta):
tablename = "wrongs" tablename = "wrongs"
@ -77,6 +78,7 @@ def test_field_redefining_in_second_raises_error():
id: int = ormar.Integer(primary_key=True) id: int = ormar.Integer(primary_key=True)
with pytest.raises(ModelDefinitionError): with pytest.raises(ModelDefinitionError):
class WrongField(ormar.Model, DateFieldsMixins): # pragma: no cover class WrongField(ormar.Model, DateFieldsMixins): # pragma: no cover
class Meta(ormar.ModelMeta): class Meta(ormar.ModelMeta):
tablename = "wrongs" tablename = "wrongs"
@ -87,7 +89,9 @@ def test_field_redefining_in_second_raises_error():
created_date: datetime.datetime = ormar.DateTime() created_date: datetime.datetime = ormar.DateTime()
def round_date_to_seconds(date: datetime.datetime) -> datetime.datetime: # pragma: no cover def round_date_to_seconds(
date: datetime.datetime,
) -> datetime.datetime: # pragma: no cover
if date.microsecond >= 500000: if date.microsecond >= 500000:
date = date + datetime.timedelta(seconds=1) date = date + datetime.timedelta(seconds=1)
return date.replace(microsecond=0) return date.replace(microsecond=0)
@ -126,13 +130,17 @@ async def test_fields_inherited_from_mixin():
sub2 = ( sub2 = (
await Subject.objects.select_related("category") await Subject.objects.select_related("category")
.order_by("-created_date") .order_by("-created_date")
.exclude_fields("updated_date") .exclude_fields("updated_date")
.get() .get()
)
assert round_date_to_seconds(sub2.created_date) == round_date_to_seconds(
sub.created_date
) )
assert round_date_to_seconds(sub2.created_date) == round_date_to_seconds(sub.created_date)
assert sub2.category.updated_date is not None assert sub2.category.updated_date is not None
assert round_date_to_seconds(sub2.category.created_date) == round_date_to_seconds(cat.created_date) assert round_date_to_seconds(
sub2.category.created_date
) == round_date_to_seconds(cat.created_date)
assert sub2.updated_date is None assert sub2.updated_date is None
assert sub2.category.created_by == "Sam" assert sub2.category.created_by == "Sam"
assert sub2.category.updated_by == cat.updated_by assert sub2.category.updated_by == cat.updated_by