check complex prefixes in groups, refactor limit queries, finish docstrings, refactors and cleanup in long methods
This commit is contained in:
@ -150,19 +150,68 @@ def sqlalchemy_columns_from_model_fields(
|
||||
"Integer primary key named `id` created."
|
||||
)
|
||||
validate_related_names_in_relations(model_fields, new_model)
|
||||
return _process_fields(model_fields=model_fields, new_model=new_model)
|
||||
|
||||
|
||||
def _process_fields(
|
||||
model_fields: Dict, new_model: Type["Model"]
|
||||
) -> Tuple[Optional[str], List[sqlalchemy.Column]]:
|
||||
"""
|
||||
Helper method.
|
||||
|
||||
Populates pkname and columns.
|
||||
Trigger validation of primary_key - only one and required pk can be set,
|
||||
cannot be pydantic_only.
|
||||
|
||||
Append fields to columns if it's not pydantic_only,
|
||||
virtual ForeignKey or ManyToMany field.
|
||||
|
||||
Sets `owner` on each model_field as reference to newly created Model.
|
||||
|
||||
:raises ModelDefinitionError: if validation of related_names fail,
|
||||
or pkname validation fails.
|
||||
:param model_fields: dictionary of declared ormar model fields
|
||||
:type model_fields: Dict[str, ormar.Field]
|
||||
:param new_model:
|
||||
:type new_model: Model class
|
||||
:return: pkname, list of sqlalchemy columns
|
||||
:rtype: Tuple[Optional[str], List[sqlalchemy.Column]]
|
||||
"""
|
||||
columns = []
|
||||
pkname = None
|
||||
for field_name, field in model_fields.items():
|
||||
field.owner = new_model
|
||||
if field.is_multi and not field.through:
|
||||
if _is_through_model_not_set(field):
|
||||
field.create_default_through_model()
|
||||
if field.primary_key:
|
||||
pkname = check_pk_column_validity(field_name, field, pkname)
|
||||
if not field.pydantic_only and not field.virtual and not field.is_multi:
|
||||
if _is_db_field(field):
|
||||
columns.append(field.get_column(field.get_alias()))
|
||||
return pkname, columns
|
||||
|
||||
|
||||
def _is_through_model_not_set(field: Type["BaseField"]) -> bool:
|
||||
"""
|
||||
Alias to if check that verifies if through model was created.
|
||||
:param field: field to check
|
||||
:type field: Type["BaseField"]
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return field.is_multi and not field.through
|
||||
|
||||
|
||||
def _is_db_field(field: Type["BaseField"]) -> bool:
|
||||
"""
|
||||
Alias to if check that verifies if field should be included in database.
|
||||
:param field: field to check
|
||||
:type field: Type["BaseField"]
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return not field.pydantic_only and not field.virtual and not field.is_multi
|
||||
|
||||
|
||||
def populate_meta_tablename_columns_and_pk(
|
||||
name: str, new_model: Type["Model"]
|
||||
) -> Type["Model"]:
|
||||
|
||||
@ -129,7 +129,7 @@ class RelationMixin:
|
||||
return related_names
|
||||
|
||||
@classmethod
|
||||
def _iterate_related_models(
|
||||
def _iterate_related_models( # noqa: CCR001
|
||||
cls,
|
||||
visited: Set[str] = None,
|
||||
source_visited: Set[str] = None,
|
||||
@ -149,14 +149,12 @@ class RelationMixin:
|
||||
:return: list of relation strings to be passed to select_related
|
||||
:rtype: List[str]
|
||||
"""
|
||||
source_visited = source_visited or set()
|
||||
if not source_model:
|
||||
source_visited = cls._populate_source_model_prefixes()
|
||||
source_visited = source_visited or cls._populate_source_model_prefixes()
|
||||
relations = cls.extract_related_names()
|
||||
processed_relations = []
|
||||
for relation in relations:
|
||||
target_model = cls.Meta.model_fields[relation].to
|
||||
if source_model and target_model == source_model:
|
||||
if cls._is_reverse_side_of_same_relation(source_model, target_model):
|
||||
continue
|
||||
if target_model not in source_visited or not source_model:
|
||||
deep_relations = target_model._iterate_related_models(
|
||||
@ -168,6 +166,39 @@ class RelationMixin:
|
||||
processed_relations.extend(deep_relations)
|
||||
else:
|
||||
processed_relations.append(relation)
|
||||
|
||||
return cls._get_final_relations(processed_relations, source_relation)
|
||||
|
||||
@staticmethod
|
||||
def _is_reverse_side_of_same_relation(
|
||||
source_model: Optional[Union[Type["Model"], Type["RelationMixin"]]],
|
||||
target_model: Type["Model"],
|
||||
) -> bool:
|
||||
"""
|
||||
Alias to check if source model is the same as target
|
||||
:param source_model: source model - relation comes from it
|
||||
:type source_model: Type["Model"]
|
||||
:param target_model: target model - relation leads to it
|
||||
:type target_model: Type["Model"]
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return bool(source_model and target_model == source_model)
|
||||
|
||||
@staticmethod
|
||||
def _get_final_relations(
|
||||
processed_relations: List, source_relation: Optional[str]
|
||||
) -> List[str]:
|
||||
"""
|
||||
Helper method to prefix nested relation strings with current source relation
|
||||
|
||||
:param processed_relations: list of already processed relation str
|
||||
:type processed_relations: List[str]
|
||||
:param source_relation: name of the current relation
|
||||
:type source_relation: str
|
||||
:return: list of relation strings to be passed to select_related
|
||||
:rtype: List[str]
|
||||
"""
|
||||
if processed_relations:
|
||||
final_relations = [
|
||||
f"{source_relation + '__' if source_relation else ''}{relation}"
|
||||
|
||||
@ -4,7 +4,9 @@ from typing import (
|
||||
List,
|
||||
Optional,
|
||||
TYPE_CHECKING,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
@ -78,21 +80,12 @@ class ModelRow(NewBaseModel):
|
||||
related_models = group_related_list(select_related)
|
||||
|
||||
if related_field:
|
||||
if related_field.is_multi:
|
||||
previous_model = related_field.through
|
||||
else:
|
||||
previous_model = related_field.owner
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
from_model=previous_model, relation_name=related_field.name
|
||||
table_prefix = cls._process_table_prefix(
|
||||
source_model=source_model,
|
||||
current_relation_str=current_relation_str,
|
||||
related_field=related_field,
|
||||
used_prefixes=used_prefixes,
|
||||
)
|
||||
if not table_prefix or table_prefix in used_prefixes:
|
||||
manager = cls.Meta.alias_manager
|
||||
table_prefix = manager.resolve_relation_alias_after_complex(
|
||||
source_model=source_model,
|
||||
relation_str=current_relation_str,
|
||||
relation_field=related_field,
|
||||
)
|
||||
used_prefixes.append(table_prefix)
|
||||
|
||||
item = cls._populate_nested_models_from_row(
|
||||
item=item,
|
||||
@ -118,6 +111,44 @@ class ModelRow(NewBaseModel):
|
||||
instance.set_save_status(True)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def _process_table_prefix(
|
||||
cls,
|
||||
source_model: Type["Model"],
|
||||
current_relation_str: str,
|
||||
related_field: Type["ForeignKeyField"],
|
||||
used_prefixes: List[str],
|
||||
) -> str:
|
||||
"""
|
||||
|
||||
:param source_model: model on which relation was defined
|
||||
:type source_model: Type[Model]
|
||||
:param current_relation_str: current relation string
|
||||
:type current_relation_str: str
|
||||
:param related_field: field with relation declaration
|
||||
:type related_field: Type["ForeignKeyField"]
|
||||
:param used_prefixes: list of already extracted prefixes
|
||||
:type used_prefixes: List[str]
|
||||
:return: table_prefix to use
|
||||
:rtype: str
|
||||
"""
|
||||
if related_field.is_multi:
|
||||
previous_model = related_field.through
|
||||
else:
|
||||
previous_model = related_field.owner
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
from_model=previous_model, relation_name=related_field.name
|
||||
)
|
||||
if not table_prefix or table_prefix in used_prefixes:
|
||||
manager = cls.Meta.alias_manager
|
||||
table_prefix = manager.resolve_relation_alias_after_complex(
|
||||
source_model=source_model,
|
||||
relation_str=current_relation_str,
|
||||
relation_field=related_field,
|
||||
)
|
||||
used_prefixes.append(table_prefix)
|
||||
return table_prefix
|
||||
|
||||
@classmethod
|
||||
def _populate_nested_models_from_row( # noqa: CFQ002
|
||||
cls,
|
||||
@ -170,14 +201,11 @@ class ModelRow(NewBaseModel):
|
||||
if model_excludable.is_excluded(related):
|
||||
return item
|
||||
|
||||
relation_str = (
|
||||
"__".join([current_relation_str, related])
|
||||
if current_relation_str
|
||||
else related
|
||||
relation_str, remainder = cls._process_remainder_and_relation_string(
|
||||
related_models=related_models,
|
||||
current_relation_str=current_relation_str,
|
||||
related=related,
|
||||
)
|
||||
remainder = None
|
||||
if isinstance(related_models, dict) and related_models[related]:
|
||||
remainder = related_models[related]
|
||||
child = model_cls.from_row(
|
||||
row,
|
||||
related_models=remainder,
|
||||
@ -190,24 +218,84 @@ class ModelRow(NewBaseModel):
|
||||
)
|
||||
item[model_cls.get_column_name_from_alias(related)] = child
|
||||
if field.is_multi and child:
|
||||
through_name = cls.Meta.model_fields[related].through.get_name()
|
||||
through_child = cls.populate_through_instance(
|
||||
cls._populate_through_instance(
|
||||
row=row,
|
||||
item=item,
|
||||
related=related,
|
||||
through_name=through_name,
|
||||
excludable=excludable,
|
||||
child=child,
|
||||
proxy_source_model=proxy_source_model,
|
||||
)
|
||||
|
||||
if child.__class__ != proxy_source_model:
|
||||
setattr(child, through_name, through_child)
|
||||
else:
|
||||
item[through_name] = through_child
|
||||
child.set_save_status(True)
|
||||
|
||||
return item
|
||||
|
||||
@staticmethod
|
||||
def _process_remainder_and_relation_string(
|
||||
related_models: Union[Dict, List],
|
||||
current_relation_str: Optional[str],
|
||||
related: str,
|
||||
) -> Tuple[str, Optional[Union[Dict, List]]]:
|
||||
"""
|
||||
Process remainder models and relation string
|
||||
|
||||
:param related_models: list or dict of related models
|
||||
:type related_models: Union[Dict, List]
|
||||
:param current_relation_str: current relation string
|
||||
:type current_relation_str: Optional[str]
|
||||
:param related: name of the relation
|
||||
:type related: str
|
||||
"""
|
||||
relation_str = (
|
||||
"__".join([current_relation_str, related])
|
||||
if current_relation_str
|
||||
else related
|
||||
)
|
||||
|
||||
remainder = None
|
||||
if isinstance(related_models, dict) and related_models[related]:
|
||||
remainder = related_models[related]
|
||||
return relation_str, remainder
|
||||
|
||||
@classmethod
|
||||
def populate_through_instance(
|
||||
def _populate_through_instance( # noqa: CFQ002
|
||||
cls,
|
||||
row: sqlalchemy.engine.ResultProxy,
|
||||
item: Dict,
|
||||
related: str,
|
||||
excludable: ExcludableItems,
|
||||
child: "Model",
|
||||
proxy_source_model: Optional[Type["Model"]],
|
||||
) -> None:
|
||||
"""
|
||||
Populates the through model on reverse side of current query.
|
||||
Normally it's child class, unless the query is from queryset.
|
||||
|
||||
:param row: row from db result
|
||||
:type row: sqlalchemy.engine.ResultProxy
|
||||
:param item: parent item dict
|
||||
:type item: Dict
|
||||
:param related: current relation name
|
||||
:type related: str
|
||||
:param excludable: structure of fields to include and exclude
|
||||
:type excludable: ExcludableItems
|
||||
:param child: child item of parent
|
||||
:type child: "Model"
|
||||
:param proxy_source_model: source model from which querysetproxy is constructed
|
||||
:type proxy_source_model: Type["Model"]
|
||||
"""
|
||||
through_name = cls.Meta.model_fields[related].through.get_name()
|
||||
through_child = cls._create_through_instance(
|
||||
row=row, related=related, through_name=through_name, excludable=excludable,
|
||||
)
|
||||
|
||||
if child.__class__ != proxy_source_model:
|
||||
setattr(child, through_name, through_child)
|
||||
else:
|
||||
item[through_name] = through_child
|
||||
child.set_save_status(True)
|
||||
|
||||
@classmethod
|
||||
def _create_through_instance(
|
||||
cls,
|
||||
row: sqlalchemy.engine.ResultProxy,
|
||||
through_name: str,
|
||||
@ -288,12 +376,11 @@ class ModelRow(NewBaseModel):
|
||||
model=cls, excludable=excludable, alias=table_prefix, use_alias=False,
|
||||
)
|
||||
|
||||
column_prefix = table_prefix + "_" if table_prefix else ""
|
||||
for column in cls.Meta.table.columns:
|
||||
alias = cls.get_column_name_from_alias(column.name)
|
||||
if alias not in item and alias in selected_columns:
|
||||
prefixed_name = (
|
||||
f'{table_prefix + "_" if table_prefix else ""}{column.name}'
|
||||
)
|
||||
prefixed_name = f"{column_prefix}{column.name}"
|
||||
item[alias] = source[prefixed_name]
|
||||
|
||||
return item
|
||||
|
||||
@ -227,7 +227,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
||||
super().__setattr__(name, value)
|
||||
self.set_save_status(False)
|
||||
|
||||
def __getattribute__(self, item: str) -> Any:
|
||||
def __getattribute__(self, item: str) -> Any: # noqa: CCR001
|
||||
"""
|
||||
Because we need to overwrite getting the attribute by ormar instead of pydantic
|
||||
as well as returning related models and not the value stored on the model the
|
||||
|
||||
Reference in New Issue
Block a user