wip with m2m fields

This commit is contained in:
collerek
2021-02-28 08:19:02 +01:00
parent ad9d065c6d
commit fd38ae2a40
7 changed files with 231 additions and 304 deletions

View File

@ -12,6 +12,20 @@ class Excludable:
include: Set = field(default_factory=set)
exclude: Set = field(default_factory=set)
@property
def include_all(self):
return ... in self.include
@property
def exclude_all(self):
return ... in self.exclude
def get_copy(self) -> "Excludable":
_copy = self.__class__()
_copy.include = {x for x in self.include}
_copy.exclude = {x for x in self.exclude}
return _copy
def set_values(self, value: Set, is_exclude: bool) -> None:
prop = "exclude" if is_exclude else "include"
if ... in getattr(self, prop) or ... in value:
@ -38,15 +52,22 @@ class ExcludableItems:
def __init__(self) -> None:
self.items: Dict[str, Excludable] = dict()
@classmethod
def from_excludable(cls, other: "ExcludableItems") -> "ExcludableItems":
new_excludable = cls()
for key, value in other.items.items():
new_excludable.items[key] = value.get_copy()
return new_excludable
def get(self, model_cls: Type["Model"], alias: str = "") -> Excludable:
key = f"{alias + '_' if alias else ''}{model_cls.get_name(lower=True)}"
return self.items.get(key, Excludable())
def build(
self,
items: Union[List[str], str, Tuple[str], Set[str], Dict],
model_cls: Type["Model"],
is_exclude: bool = False,
self,
items: Union[List[str], str, Tuple[str], Set[str], Dict],
model_cls: Type["Model"],
is_exclude: bool = False,
) -> None:
if isinstance(items, str):
@ -75,7 +96,7 @@ class ExcludableItems:
)
def _set_excludes(
self, items: Set, model_name: str, is_exclude: bool, alias: str = ""
self, items: Set, model_name: str, is_exclude: bool, alias: str = ""
) -> None:
key = f"{alias + '_' if alias else ''}{model_name}"
@ -86,13 +107,13 @@ class ExcludableItems:
self.items[key] = excludable
def _traverse_dict( # noqa: CFQ002
self,
values: Dict,
source_model: Type["Model"],
model_cls: Type["Model"],
is_exclude: bool,
related_items: List = None,
alias: str = "",
self,
values: Dict,
source_model: Type["Model"],
model_cls: Type["Model"],
is_exclude: bool,
related_items: List = None,
alias: str = "",
) -> None:
self_fields = set()
@ -144,7 +165,7 @@ class ExcludableItems:
)
def _traverse_list(
self, values: Set[str], model_cls: Type["Model"], is_exclude: bool
self, values: Set[str], model_cls: Type["Model"], is_exclude: bool
) -> None:
# here we have only nested related keys

View File

@ -9,9 +9,10 @@ from typing import (
TYPE_CHECKING,
Type,
TypeVar,
Union,
Union, cast,
)
from ormar.models.excludable import ExcludableItems
from ormar.models.mixins.relation_mixin import RelationMixin
from ormar.queryset.utils import translate_list_to_dict, update
@ -35,7 +36,7 @@ class ExcludableMixin(RelationMixin):
@staticmethod
def get_child(
items: Union[Set, Dict, None], key: str = None
items: Union[Set, Dict, None], key: str = None
) -> Union[Set, Dict, None]:
"""
Used to get nested dictionaries keys if they exists otherwise returns
@ -53,7 +54,7 @@ class ExcludableMixin(RelationMixin):
@staticmethod
def get_excluded(
exclude: Union[Set, Dict, None], key: str = None
exclude: Union[Set, Dict, None], key: str = None
) -> Union[Set, Dict, None]:
"""
Proxy to ExcludableMixin.get_child for exclusions.
@ -69,7 +70,7 @@ class ExcludableMixin(RelationMixin):
@staticmethod
def get_included(
include: Union[Set, Dict, None], key: str = None
include: Union[Set, Dict, None], key: str = None
) -> Union[Set, Dict, None]:
"""
Proxy to ExcludableMixin.get_child for inclusions.
@ -131,9 +132,9 @@ class ExcludableMixin(RelationMixin):
@staticmethod
def _populate_pk_column(
model: Union[Type["Model"], Type["ModelRow"]],
columns: List[str],
use_alias: bool = False,
model: Union[Type["Model"], Type["ModelRow"]],
columns: List[str],
use_alias: bool = False,
) -> List[str]:
"""
Adds primary key column/alias (depends on use_alias flag) to list of
@ -159,12 +160,13 @@ class ExcludableMixin(RelationMixin):
@classmethod
def own_table_columns(
cls,
model: Union[Type["Model"], Type["ModelRow"]],
fields: Optional[Union[Set, Dict]],
exclude_fields: Optional[Union[Set, Dict]],
use_alias: bool = False,
cls,
model: Union[Type["Model"], Type["ModelRow"]],
excludable: ExcludableItems,
alias: str = '',
use_alias: bool = False,
) -> List[str]:
# TODO update docstring
"""
Returns list of aliases or field names for given model.
Aliases/names switch is use_alias flag.
@ -176,15 +178,12 @@ class ExcludableMixin(RelationMixin):
:param model: model on columns are selected
:type model: Type["Model"]
:param fields: set/dict of fields to include
:type fields: Optional[Union[Set, Dict]]
:param exclude_fields: set/dict of fields to exclude
:type exclude_fields: Optional[Union[Set, Dict]]
:param use_alias: flag if aliases or field names should be used
:type use_alias: bool
:return: list of column field names or aliases
:rtype: List[str]
"""
model_excludable = excludable.get(model_cls=model, alias=alias)
columns = [
model.get_column_name_from_alias(col.name) if not use_alias else col.name
for col in model.Meta.table.columns
@ -193,17 +192,17 @@ class ExcludableMixin(RelationMixin):
model.get_column_name_from_alias(col.name)
for col in model.Meta.table.columns
]
if fields:
if model_excludable.include:
columns = [
col
for col, name in zip(columns, field_names)
if model.is_included(fields, name)
if model_excludable.is_included(name)
]
if exclude_fields:
if model_excludable.exclude:
columns = [
col
for col, name in zip(columns, field_names)
if not model.is_excluded(exclude_fields, name)
if not model_excludable.is_excluded(name)
]
# always has to return pk column for ormar to work
@ -215,9 +214,9 @@ class ExcludableMixin(RelationMixin):
@classmethod
def _update_excluded_with_related_not_required(
cls,
exclude: Union["AbstractSetIntStr", "MappingIntStrAny", None],
nested: bool = False,
cls,
exclude: Union["AbstractSetIntStr", "MappingIntStrAny", None],
nested: bool = False,
) -> Union[Set, Dict]:
"""
Used during generation of the dict().
@ -245,9 +244,9 @@ class ExcludableMixin(RelationMixin):
@classmethod
def get_names_to_exclude(
cls,
fields: Optional[Union[Dict, Set]] = None,
exclude_fields: Optional[Union[Dict, Set]] = None,
cls,
excludable: ExcludableItems,
alias: str
) -> Set:
"""
Returns a set of models field names that should be explicitly excluded
@ -259,33 +258,27 @@ class ExcludableMixin(RelationMixin):
Used in parsing data from database rows that construct Models by initializing
them with dicts constructed from those db rows.
:param fields: set/dict of fields to include
:type fields: Optional[Union[Set, Dict]]
:param exclude_fields: set/dict of fields to exclude
:type exclude_fields: Optional[Union[Set, Dict]]
:param alias: alias of current relation
:type alias: str
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:return: set of field names that should be excluded
:rtype: Set
"""
model = cast(Type["Model"], cls)
model_excludable = excludable.get(model_cls=model, alias=alias)
fields_names = cls.extract_db_own_fields()
if fields and fields is not Ellipsis:
fields_to_keep = {name for name in fields if name in fields_names}
if model_excludable.include and model_excludable.include_all:
fields_to_keep = model_excludable.include.intersection(fields_names)
else:
fields_to_keep = fields_names
fields_to_exclude = fields_names - fields_to_keep
if isinstance(exclude_fields, Set):
if model_excludable.exclude:
fields_to_exclude = fields_to_exclude.union(
{name for name in exclude_fields if name in fields_names}
model_excludable.exclude.intersection(fields_names)
)
elif isinstance(exclude_fields, Dict):
new_to_exclude = {
name
for name in exclude_fields
if name in fields_names and exclude_fields[name] is Ellipsis
}
fields_to_exclude = fields_to_exclude.union(new_to_exclude)
fields_to_exclude = fields_to_exclude - {cls.Meta.pkname}
return fields_to_exclude

View File

@ -14,6 +14,7 @@ from typing import (
import sqlalchemy
from ormar.models import NewBaseModel # noqa: I202
from ormar.models.excludable import ExcludableItems
from ormar.models.helpers.models import group_related_list
@ -33,8 +34,7 @@ class ModelRow(NewBaseModel):
select_related: List = None,
related_models: Any = None,
related_field: Type["ForeignKeyField"] = None,
fields: Optional[Union[Dict, Set]] = None,
exclude_fields: Optional[Union[Dict, Set]] = None,
excludable: ExcludableItems = None,
current_relation_str: str = "",
) -> Optional[T]:
"""
@ -50,6 +50,8 @@ class ModelRow(NewBaseModel):
where rows are populated in a different way as they do not have
nested models in result.
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:param current_relation_str: name of the relation field
:type current_relation_str: str
:param source_model: model on which relation was defined
@ -62,12 +64,6 @@ class ModelRow(NewBaseModel):
:type related_models: Union[List, Dict]
:param related_field: field with relation declaration
:type related_field: Type[ForeignKeyField]
:param fields: fields and related model fields to include
if provided only those are included
:type fields: Optional[Union[Dict, Set]]
:param exclude_fields: fields and related model fields to exclude
excludes the fields even if they are provided in fields
:type exclude_fields: Optional[Union[Dict, Set]]
:return: returns model if model is populated from database
:rtype: Optional[Model]
"""
@ -75,6 +71,7 @@ class ModelRow(NewBaseModel):
select_related = select_related or []
related_models = related_models or []
table_prefix = ""
excludable = excludable or ExcludableItems()
if select_related:
source_model = cast(Type[T], cls)
@ -87,12 +84,11 @@ class ModelRow(NewBaseModel):
relation_field=related_field,
)
item = cls.populate_nested_models_from_row(
item = cls._populate_nested_models_from_row(
item=item,
row=row,
related_models=related_models,
fields=fields,
exclude_fields=exclude_fields,
excludable=excludable,
current_relation_str=current_relation_str,
source_model=source_model,
)
@ -100,28 +96,26 @@ class ModelRow(NewBaseModel):
item=item,
row=row,
table_prefix=table_prefix,
fields=fields,
exclude_fields=exclude_fields,
excludable=excludable
)
instance: Optional[T] = None
if item.get(cls.Meta.pkname, None) is not None:
item["__excluded__"] = cls.get_names_to_exclude(
fields=fields, exclude_fields=exclude_fields
excludable=excludable, alias=table_prefix
)
instance = cast(T, cls(**item))
instance.set_save_status(True)
return instance
@classmethod
def populate_nested_models_from_row( # noqa: CFQ002
def _populate_nested_models_from_row( # noqa: CFQ002
cls,
item: dict,
row: sqlalchemy.engine.ResultProxy,
source_model: Type[T],
related_models: Any,
fields: Optional[Union[Dict, Set]] = None,
exclude_fields: Optional[Union[Dict, Set]] = None,
excludable: ExcludableItems,
current_relation_str: str = None,
) -> dict:
"""
@ -134,6 +128,8 @@ class ModelRow(NewBaseModel):
Recurrently calls from_row method on nested instances and create nested
instances. In the end those instances are added to the final model dictionary.
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:param source_model: source model from which relation started
:type source_model: Type[Model]
:param current_relation_str: joined related parts into one string
@ -144,12 +140,6 @@ class ModelRow(NewBaseModel):
:type row: sqlalchemy.engine.result.ResultProxy
:param related_models: list or dict of related models
:type related_models: Union[Dict, List]
:param fields: fields and related model fields to include -
if provided only those are included
:type fields: Optional[Union[Dict, Set]]
:param exclude_fields: fields and related model fields to exclude
excludes the fields even if they are provided in fields
:type exclude_fields: Optional[Union[Dict, Set]]
:return: dictionary with keys corresponding to model fields names
and values are database values
:rtype: Dict
@ -163,8 +153,6 @@ class ModelRow(NewBaseModel):
)
field = cls.Meta.model_fields[related]
field = cast(Type["ForeignKeyField"], field)
fields = cls.get_included(fields, related)
exclude_fields = cls.get_excluded(exclude_fields, related)
model_cls = field.to
remainder = None
@ -174,8 +162,7 @@ class ModelRow(NewBaseModel):
row,
related_models=remainder,
related_field=field,
fields=fields,
exclude_fields=exclude_fields,
excludable=excludable,
current_relation_str=relation_str,
source_model=source_model,
)
@ -188,8 +175,7 @@ class ModelRow(NewBaseModel):
row=row,
related=related,
through_name=through_name,
fields=fields,
exclude_fields=exclude_fields,
excludable=excludable
)
item[through_name] = through_child
setattr(child, through_name, through_child)
@ -203,35 +189,29 @@ class ModelRow(NewBaseModel):
row: sqlalchemy.engine.ResultProxy,
through_name: str,
related: str,
fields: Optional[Union[Dict, Set]] = None,
exclude_fields: Optional[Union[Dict, Set]] = None,
excludable: ExcludableItems
) -> Dict:
# TODO: fix excludes and includes
fields = cls.get_included(fields, through_name)
# exclude_fields = cls.get_excluded(exclude_fields, through_name)
# TODO: fix excludes and includes and docstring
model_cls = cls.Meta.model_fields[through_name].to
exclude_fields = model_cls.extract_related_names()
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
from_model=cls, relation_name=related
)
child = model_cls.extract_prefixed_table_columns(
item={},
row=row,
table_prefix=table_prefix,
fields=fields,
exclude_fields=exclude_fields,
excludable=excludable,
table_prefix=table_prefix
)
return child
@classmethod
def extract_prefixed_table_columns( # noqa CCR001
def extract_prefixed_table_columns(
cls,
item: dict,
row: sqlalchemy.engine.result.ResultProxy,
table_prefix: str,
fields: Optional[Union[Dict, Set]] = None,
exclude_fields: Optional[Union[Dict, Set]] = None,
) -> dict:
excludable: ExcludableItems
) -> Dict:
"""
Extracts own fields from raw sql result, using a given prefix.
Prefix changes depending on the table's position in a join.
@ -244,6 +224,8 @@ class ModelRow(NewBaseModel):
Used in Model.from_row and PrefetchQuery._populate_rows methods.
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:param item: dictionary of already populated nested models, otherwise empty dict
:type item: Dict
:param row: raw result row from the database
@ -252,12 +234,6 @@ class ModelRow(NewBaseModel):
each pair of tables have own prefix (two of them depending on direction) -
used in joins to allow multiple joins to the same table.
:type table_prefix: str
:param fields: fields and related model fields to include -
if provided only those are included
:type fields: Optional[Union[Dict, Set]]
:param exclude_fields: fields and related model fields to exclude
excludes the fields even if they are provided in fields
:type exclude_fields: Optional[Union[Dict, Set]]
:return: dictionary with keys corresponding to model fields names
and values are database values
:rtype: Dict
@ -267,8 +243,8 @@ class ModelRow(NewBaseModel):
selected_columns = cls.own_table_columns(
model=cls,
fields=fields or {},
exclude_fields=exclude_fields or {},
excludable=excludable,
alias=table_prefix,
use_alias=False,
)