add possibility to exclude/include fields (refactor to excludableitems), fix for through model only on related side of the relation, fix for exclude of through model related models
This commit is contained in:
@ -7,5 +7,6 @@ ass well as vast number of helper functions for pydantic, sqlalchemy and relatio
|
||||
from ormar.models.newbasemodel import NewBaseModel # noqa I100
|
||||
from ormar.models.model_row import ModelRow # noqa I100
|
||||
from ormar.models.model import Model, T # noqa I100
|
||||
from ormar.models.excludable import ExcludableItems # noqa I100
|
||||
|
||||
__all__ = ["T", "NewBaseModel", "Model", "ModelRow"]
|
||||
__all__ = ["T", "NewBaseModel", "Model", "ModelRow", "ExcludableItems"]
|
||||
|
||||
@ -7,19 +7,12 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
from ormar import Model
|
||||
|
||||
|
||||
# TODO: Add docstrings
|
||||
@dataclass
|
||||
class Excludable:
|
||||
include: Set = field(default_factory=set)
|
||||
exclude: Set = field(default_factory=set)
|
||||
|
||||
@property
|
||||
def include_all(self):
|
||||
return ... in self.include
|
||||
|
||||
@property
|
||||
def exclude_all(self):
|
||||
return ... in self.exclude
|
||||
|
||||
def get_copy(self) -> "Excludable":
|
||||
_copy = self.__class__()
|
||||
_copy.include = {x for x in self.include}
|
||||
@ -28,12 +21,9 @@ class Excludable:
|
||||
|
||||
def set_values(self, value: Set, is_exclude: bool) -> None:
|
||||
prop = "exclude" if is_exclude else "include"
|
||||
if ... in getattr(self, prop) or ... in value:
|
||||
setattr(self, prop, {...})
|
||||
else:
|
||||
current_value = getattr(self, prop)
|
||||
current_value.update(value)
|
||||
setattr(self, prop, current_value)
|
||||
current_value = getattr(self, prop)
|
||||
current_value.update(value)
|
||||
setattr(self, prop, current_value)
|
||||
|
||||
def is_included(self, key: str) -> bool:
|
||||
return (... in self.include or key in self.include) if self.include else True
|
||||
@ -61,13 +51,17 @@ class ExcludableItems:
|
||||
|
||||
def get(self, model_cls: Type["Model"], alias: str = "") -> Excludable:
|
||||
key = f"{alias + '_' if alias else ''}{model_cls.get_name(lower=True)}"
|
||||
return self.items.get(key, Excludable())
|
||||
excludable = self.items.get(key)
|
||||
if not excludable:
|
||||
excludable = Excludable()
|
||||
self.items[key] = excludable
|
||||
return excludable
|
||||
|
||||
def build(
|
||||
self,
|
||||
items: Union[List[str], str, Tuple[str], Set[str], Dict],
|
||||
model_cls: Type["Model"],
|
||||
is_exclude: bool = False,
|
||||
self,
|
||||
items: Union[List[str], str, Tuple[str], Set[str], Dict],
|
||||
model_cls: Type["Model"],
|
||||
is_exclude: bool = False,
|
||||
) -> None:
|
||||
|
||||
if isinstance(items, str):
|
||||
@ -96,7 +90,7 @@ class ExcludableItems:
|
||||
)
|
||||
|
||||
def _set_excludes(
|
||||
self, items: Set, model_name: str, is_exclude: bool, alias: str = ""
|
||||
self, items: Set, model_name: str, is_exclude: bool, alias: str = ""
|
||||
) -> None:
|
||||
|
||||
key = f"{alias + '_' if alias else ''}{model_name}"
|
||||
@ -107,13 +101,13 @@ class ExcludableItems:
|
||||
self.items[key] = excludable
|
||||
|
||||
def _traverse_dict( # noqa: CFQ002
|
||||
self,
|
||||
values: Dict,
|
||||
source_model: Type["Model"],
|
||||
model_cls: Type["Model"],
|
||||
is_exclude: bool,
|
||||
related_items: List = None,
|
||||
alias: str = "",
|
||||
self,
|
||||
values: Dict,
|
||||
source_model: Type["Model"],
|
||||
model_cls: Type["Model"],
|
||||
is_exclude: bool,
|
||||
related_items: List = None,
|
||||
alias: str = "",
|
||||
) -> None:
|
||||
|
||||
self_fields = set()
|
||||
@ -122,14 +116,13 @@ class ExcludableItems:
|
||||
if value is ...:
|
||||
self_fields.add(key)
|
||||
elif isinstance(value, set):
|
||||
related_items.append(key)
|
||||
(
|
||||
table_prefix,
|
||||
target_model,
|
||||
_,
|
||||
_,
|
||||
) = get_relationship_alias_model_and_str(
|
||||
source_model=source_model, related_parts=related_items
|
||||
source_model=source_model, related_parts=related_items + [key]
|
||||
)
|
||||
self._set_excludes(
|
||||
items=value,
|
||||
@ -165,7 +158,7 @@ class ExcludableItems:
|
||||
)
|
||||
|
||||
def _traverse_list(
|
||||
self, values: Set[str], model_cls: Type["Model"], is_exclude: bool
|
||||
self, values: Set[str], model_cls: Type["Model"], is_exclude: bool
|
||||
) -> None:
|
||||
|
||||
# here we have only nested related keys
|
||||
|
||||
@ -4,12 +4,12 @@ from typing import (
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union, cast,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
from ormar.models.excludable import ExcludableItems
|
||||
@ -36,7 +36,7 @@ class ExcludableMixin(RelationMixin):
|
||||
|
||||
@staticmethod
|
||||
def get_child(
|
||||
items: Union[Set, Dict, None], key: str = None
|
||||
items: Union[Set, Dict, None], key: str = None
|
||||
) -> Union[Set, Dict, None]:
|
||||
"""
|
||||
Used to get nested dictionaries keys if they exists otherwise returns
|
||||
@ -52,89 +52,11 @@ class ExcludableMixin(RelationMixin):
|
||||
return items.get(key, {})
|
||||
return items
|
||||
|
||||
@staticmethod
|
||||
def get_excluded(
|
||||
exclude: Union[Set, Dict, None], key: str = None
|
||||
) -> Union[Set, Dict, None]:
|
||||
"""
|
||||
Proxy to ExcludableMixin.get_child for exclusions.
|
||||
|
||||
:param exclude: bag of items to exclude
|
||||
:type exclude: Union[Set, Dict, None]
|
||||
:param key: name of the child to extract
|
||||
:type key: str
|
||||
:return: child extracted from items if exists
|
||||
:rtype: Union[Set, Dict, None]
|
||||
"""
|
||||
return ExcludableMixin.get_child(items=exclude, key=key)
|
||||
|
||||
@staticmethod
|
||||
def get_included(
|
||||
include: Union[Set, Dict, None], key: str = None
|
||||
) -> Union[Set, Dict, None]:
|
||||
"""
|
||||
Proxy to ExcludableMixin.get_child for inclusions.
|
||||
|
||||
:param include: bag of items to include
|
||||
:type include: Union[Set, Dict, None]
|
||||
:param key: name of the child to extract
|
||||
:type key: str
|
||||
:return: child extracted from items if exists
|
||||
:rtype: Union[Set, Dict, None]
|
||||
"""
|
||||
return ExcludableMixin.get_child(items=include, key=key)
|
||||
|
||||
@staticmethod
|
||||
def is_excluded(exclude: Union[Set, Dict, None], key: str = None) -> bool:
|
||||
"""
|
||||
Checks if given key should be excluded on model/ dict.
|
||||
|
||||
:param exclude: bag of items to exclude
|
||||
:type exclude: Union[Set, Dict, None]
|
||||
:param key: name of the child to extract
|
||||
:type key: str
|
||||
:return: child extracted from items if exists
|
||||
:rtype: Union[Set, Dict, None]
|
||||
"""
|
||||
if exclude is None:
|
||||
return False
|
||||
if exclude is Ellipsis: # pragma: nocover
|
||||
return True
|
||||
to_exclude = ExcludableMixin.get_excluded(exclude=exclude, key=key)
|
||||
if isinstance(to_exclude, Set):
|
||||
return key in to_exclude
|
||||
if to_exclude is ...:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def is_included(include: Union[Set, Dict, None], key: str = None) -> bool:
|
||||
"""
|
||||
Checks if given key should be included on model/ dict.
|
||||
|
||||
:param include: bag of items to include
|
||||
:type include: Union[Set, Dict, None]
|
||||
:param key: name of the child to extract
|
||||
:type key: str
|
||||
:return: child extracted from items if exists
|
||||
:rtype: Union[Set, Dict, None]
|
||||
"""
|
||||
if include is None:
|
||||
return True
|
||||
if include is Ellipsis:
|
||||
return True
|
||||
to_include = ExcludableMixin.get_included(include=include, key=key)
|
||||
if isinstance(to_include, Set):
|
||||
return key in to_include
|
||||
if to_include is ...:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _populate_pk_column(
|
||||
model: Union[Type["Model"], Type["ModelRow"]],
|
||||
columns: List[str],
|
||||
use_alias: bool = False,
|
||||
model: Union[Type["Model"], Type["ModelRow"]],
|
||||
columns: List[str],
|
||||
use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Adds primary key column/alias (depends on use_alias flag) to list of
|
||||
@ -160,13 +82,12 @@ class ExcludableMixin(RelationMixin):
|
||||
|
||||
@classmethod
|
||||
def own_table_columns(
|
||||
cls,
|
||||
model: Union[Type["Model"], Type["ModelRow"]],
|
||||
excludable: ExcludableItems,
|
||||
alias: str = '',
|
||||
use_alias: bool = False,
|
||||
cls,
|
||||
model: Union[Type["Model"], Type["ModelRow"]],
|
||||
excludable: ExcludableItems,
|
||||
alias: str = "",
|
||||
use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
# TODO update docstring
|
||||
"""
|
||||
Returns list of aliases or field names for given model.
|
||||
Aliases/names switch is use_alias flag.
|
||||
@ -176,6 +97,10 @@ class ExcludableMixin(RelationMixin):
|
||||
|
||||
Primary key field is always added and cannot be excluded (will be added anyway).
|
||||
|
||||
:param alias: relation prefix
|
||||
:type alias: str
|
||||
:param excludable: structure of fields to include and exclude
|
||||
:type excludable: ExcludableItems
|
||||
:param model: model on columns are selected
|
||||
:type model: Type["Model"]
|
||||
:param use_alias: flag if aliases or field names should be used
|
||||
@ -183,7 +108,7 @@ class ExcludableMixin(RelationMixin):
|
||||
:return: list of column field names or aliases
|
||||
:rtype: List[str]
|
||||
"""
|
||||
model_excludable = excludable.get(model_cls=model, alias=alias)
|
||||
model_excludable = excludable.get(model_cls=model, alias=alias) # type: ignore
|
||||
columns = [
|
||||
model.get_column_name_from_alias(col.name) if not use_alias else col.name
|
||||
for col in model.Meta.table.columns
|
||||
@ -214,9 +139,9 @@ class ExcludableMixin(RelationMixin):
|
||||
|
||||
@classmethod
|
||||
def _update_excluded_with_related_not_required(
|
||||
cls,
|
||||
exclude: Union["AbstractSetIntStr", "MappingIntStrAny", None],
|
||||
nested: bool = False,
|
||||
cls,
|
||||
exclude: Union["AbstractSetIntStr", "MappingIntStrAny", None],
|
||||
nested: bool = False,
|
||||
) -> Union[Set, Dict]:
|
||||
"""
|
||||
Used during generation of the dict().
|
||||
@ -243,11 +168,7 @@ class ExcludableMixin(RelationMixin):
|
||||
return exclude
|
||||
|
||||
@classmethod
|
||||
def get_names_to_exclude(
|
||||
cls,
|
||||
excludable: ExcludableItems,
|
||||
alias: str
|
||||
) -> Set:
|
||||
def get_names_to_exclude(cls, excludable: ExcludableItems, alias: str) -> Set:
|
||||
"""
|
||||
Returns a set of models field names that should be explicitly excluded
|
||||
during model initialization.
|
||||
@ -268,7 +189,7 @@ class ExcludableMixin(RelationMixin):
|
||||
model = cast(Type["Model"], cls)
|
||||
model_excludable = excludable.get(model_cls=model, alias=alias)
|
||||
fields_names = cls.extract_db_own_fields()
|
||||
if model_excludable.include and model_excludable.include_all:
|
||||
if model_excludable.include:
|
||||
fields_to_keep = model_excludable.include.intersection(fields_names)
|
||||
else:
|
||||
fields_to_keep = fields_names
|
||||
|
||||
@ -3,11 +3,9 @@ from typing import (
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
@ -17,7 +15,6 @@ from ormar.models import NewBaseModel # noqa: I202
|
||||
from ormar.models.excludable import ExcludableItems
|
||||
from ormar.models.helpers.models import group_related_list
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from ormar.fields import ForeignKeyField
|
||||
from ormar.models import T
|
||||
@ -36,6 +33,7 @@ class ModelRow(NewBaseModel):
|
||||
related_field: Type["ForeignKeyField"] = None,
|
||||
excludable: ExcludableItems = None,
|
||||
current_relation_str: str = "",
|
||||
proxy_source_model: Optional[Type["ModelRow"]] = None,
|
||||
) -> Optional[T]:
|
||||
"""
|
||||
Model method to convert raw sql row from database into ormar.Model instance.
|
||||
@ -91,12 +89,10 @@ class ModelRow(NewBaseModel):
|
||||
excludable=excludable,
|
||||
current_relation_str=current_relation_str,
|
||||
source_model=source_model,
|
||||
proxy_source_model=proxy_source_model, # type: ignore
|
||||
)
|
||||
item = cls.extract_prefixed_table_columns(
|
||||
item=item,
|
||||
row=row,
|
||||
table_prefix=table_prefix,
|
||||
excludable=excludable
|
||||
item=item, row=row, table_prefix=table_prefix, excludable=excludable
|
||||
)
|
||||
|
||||
instance: Optional[T] = None
|
||||
@ -117,6 +113,7 @@ class ModelRow(NewBaseModel):
|
||||
related_models: Any,
|
||||
excludable: ExcludableItems,
|
||||
current_relation_str: str = None,
|
||||
proxy_source_model: Type[T] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Traverses structure of related models and populates the nested models
|
||||
@ -165,20 +162,22 @@ class ModelRow(NewBaseModel):
|
||||
excludable=excludable,
|
||||
current_relation_str=relation_str,
|
||||
source_model=source_model,
|
||||
proxy_source_model=proxy_source_model,
|
||||
)
|
||||
item[model_cls.get_column_name_from_alias(related)] = child
|
||||
if field.is_multi and child:
|
||||
# TODO: way to figure out which side should be populated?
|
||||
through_name = cls.Meta.model_fields[related].through.get_name()
|
||||
# for now it's nested dict, should be instance?
|
||||
through_child = cls.populate_through_instance(
|
||||
row=row,
|
||||
related=related,
|
||||
through_name=through_name,
|
||||
excludable=excludable
|
||||
excludable=excludable,
|
||||
)
|
||||
item[through_name] = through_child
|
||||
setattr(child, through_name, through_child)
|
||||
|
||||
if child.__class__ != proxy_source_model:
|
||||
setattr(child, through_name, through_child)
|
||||
else:
|
||||
item[through_name] = through_child
|
||||
child.set_save_status(True)
|
||||
|
||||
return item
|
||||
@ -189,19 +188,24 @@ class ModelRow(NewBaseModel):
|
||||
row: sqlalchemy.engine.ResultProxy,
|
||||
through_name: str,
|
||||
related: str,
|
||||
excludable: ExcludableItems
|
||||
) -> Dict:
|
||||
# TODO: fix excludes and includes and docstring
|
||||
excludable: ExcludableItems,
|
||||
) -> "ModelRow":
|
||||
model_cls = cls.Meta.model_fields[through_name].to
|
||||
table_prefix = cls.Meta.alias_manager.resolve_relation_alias(
|
||||
from_model=cls, relation_name=related
|
||||
)
|
||||
child = model_cls.extract_prefixed_table_columns(
|
||||
item={},
|
||||
row=row,
|
||||
excludable=excludable,
|
||||
table_prefix=table_prefix
|
||||
# remove relations on through field
|
||||
model_excludable = excludable.get(model_cls=model_cls, alias=table_prefix)
|
||||
model_excludable.set_values(
|
||||
value=model_cls.extract_related_names(), is_exclude=True
|
||||
)
|
||||
child_dict = model_cls.extract_prefixed_table_columns(
|
||||
item={}, row=row, excludable=excludable, table_prefix=table_prefix
|
||||
)
|
||||
child_dict["__excluded__"] = model_cls.get_names_to_exclude(
|
||||
excludable=excludable, alias=table_prefix
|
||||
)
|
||||
child = model_cls(**child_dict) # type: ignore
|
||||
return child
|
||||
|
||||
@classmethod
|
||||
@ -210,7 +214,7 @@ class ModelRow(NewBaseModel):
|
||||
item: dict,
|
||||
row: sqlalchemy.engine.result.ResultProxy,
|
||||
table_prefix: str,
|
||||
excludable: ExcludableItems
|
||||
excludable: ExcludableItems,
|
||||
) -> Dict:
|
||||
"""
|
||||
Extracts own fields from raw sql result, using a given prefix.
|
||||
@ -242,10 +246,7 @@ class ModelRow(NewBaseModel):
|
||||
source = row._row if cls.db_backend_name() == "postgresql" else row
|
||||
|
||||
selected_columns = cls.own_table_columns(
|
||||
model=cls,
|
||||
excludable=excludable,
|
||||
alias=table_prefix,
|
||||
use_alias=False,
|
||||
model=cls, excludable=excludable, alias=table_prefix, use_alias=False,
|
||||
)
|
||||
|
||||
for column in cls.Meta.table.columns:
|
||||
|
||||
Reference in New Issue
Block a user