fix recursion limit for complicated models structures with many loops
This commit is contained in:
@ -4,11 +4,10 @@ from typing import (
|
||||
Optional,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from ormar import BaseField
|
||||
from ormar.models.traversible import NodeList
|
||||
|
||||
|
||||
class RelationMixin:
|
||||
@ -17,7 +16,7 @@ class RelationMixin:
|
||||
"""
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import ModelMeta, Model
|
||||
from ormar import ModelMeta
|
||||
|
||||
Meta: ModelMeta
|
||||
_related_names: Optional[Set]
|
||||
@ -135,61 +134,37 @@ class RelationMixin:
|
||||
|
||||
@classmethod
|
||||
def _iterate_related_models( # noqa: CCR001
|
||||
cls,
|
||||
visited: Set[str] = None,
|
||||
source_visited: Set[str] = None,
|
||||
source_relation: str = None,
|
||||
source_model: Union[Type["Model"], Type["RelationMixin"]] = None,
|
||||
cls, node_list: NodeList = None, source_relation: str = None
|
||||
) -> List[str]:
|
||||
"""
|
||||
Iterates related models recursively to extract relation strings of
|
||||
nested not visited models.
|
||||
|
||||
:param visited: set of already visited models
|
||||
:type visited: Set[str]
|
||||
:param source_relation: name of the current relation
|
||||
:type source_relation: str
|
||||
:param source_model: model from which relation comes in nested relations
|
||||
:type source_model: Type["Model"]
|
||||
:return: list of relation strings to be passed to select_related
|
||||
:rtype: List[str]
|
||||
"""
|
||||
source_visited = source_visited or cls._populate_source_model_prefixes()
|
||||
if not node_list:
|
||||
node_list = NodeList()
|
||||
current_node = node_list.add(node_class=cls)
|
||||
else:
|
||||
current_node = node_list[-1]
|
||||
relations = cls.extract_related_names()
|
||||
processed_relations = []
|
||||
for relation in relations:
|
||||
target_model = cls.Meta.model_fields[relation].to
|
||||
if cls._is_reverse_side_of_same_relation(source_model, target_model):
|
||||
continue
|
||||
if target_model not in source_visited or not source_model:
|
||||
if not current_node.visited(relation):
|
||||
target_model = cls.Meta.model_fields[relation].to
|
||||
node_list.add(
|
||||
node_class=target_model,
|
||||
relation_name=relation,
|
||||
parent_node=current_node,
|
||||
)
|
||||
deep_relations = target_model._iterate_related_models(
|
||||
visited=visited,
|
||||
source_visited=source_visited,
|
||||
source_relation=relation,
|
||||
source_model=cls,
|
||||
source_relation=relation, node_list=node_list
|
||||
)
|
||||
processed_relations.extend(deep_relations)
|
||||
else:
|
||||
processed_relations.append(relation)
|
||||
|
||||
return cls._get_final_relations(processed_relations, source_relation)
|
||||
|
||||
@staticmethod
|
||||
def _is_reverse_side_of_same_relation(
|
||||
source_model: Optional[Union[Type["Model"], Type["RelationMixin"]]],
|
||||
target_model: Type["Model"],
|
||||
) -> bool:
|
||||
"""
|
||||
Alias to check if source model is the same as target
|
||||
:param source_model: source model - relation comes from it
|
||||
:type source_model: Type["Model"]
|
||||
:param target_model: target model - relation leads to it
|
||||
:type target_model: Type["Model"]
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
return bool(source_model and target_model == source_model)
|
||||
|
||||
@staticmethod
|
||||
def _get_final_relations(
|
||||
processed_relations: List, source_relation: Optional[str]
|
||||
@ -212,12 +187,3 @@ class RelationMixin:
|
||||
else:
|
||||
final_relations = [source_relation] if source_relation else []
|
||||
return final_relations
|
||||
|
||||
@classmethod
|
||||
def _populate_source_model_prefixes(cls) -> Set:
|
||||
relations = cls.extract_related_names()
|
||||
visited = {cls}
|
||||
for relation in relations:
|
||||
target_model = cls.Meta.model_fields[relation].to
|
||||
visited.add(target_model)
|
||||
return visited
|
||||
|
||||
118
ormar/models/traversible.py
Normal file
118
ormar/models/traversible.py
Normal file
@ -0,0 +1,118 @@
|
||||
from typing import Any, List, Optional, TYPE_CHECKING, Type
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar.models.mixins.relation_mixin import RelationMixin
|
||||
|
||||
|
||||
class NodeList:
|
||||
"""
|
||||
Helper class that helps with iterating nested models
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.node_list: List["Node"] = []
|
||||
|
||||
def __getitem__(self, item: Any) -> Any:
|
||||
return self.node_list.__getitem__(item)
|
||||
|
||||
def add(
|
||||
self,
|
||||
node_class: Type["RelationMixin"],
|
||||
relation_name: str = None,
|
||||
parent_node: "Node" = None,
|
||||
) -> "Node":
|
||||
"""
|
||||
Adds new Node or returns the existing one
|
||||
|
||||
:param node_class: Model in current node
|
||||
:type node_class: ormar.models.metaclass.ModelMetaclass
|
||||
:param relation_name: name of the current relation
|
||||
:type relation_name: str
|
||||
:param parent_node: parent node
|
||||
:type parent_node: Optional[Node]
|
||||
:return: returns new or already existing node
|
||||
:rtype: Node
|
||||
"""
|
||||
existing_node = self.find(
|
||||
relation_name=relation_name, node_class=node_class, parent_node=parent_node
|
||||
)
|
||||
if not existing_node:
|
||||
current_node = Node(
|
||||
node_class=node_class,
|
||||
relation_name=relation_name,
|
||||
parent_node=parent_node,
|
||||
)
|
||||
self.node_list.append(current_node)
|
||||
return current_node
|
||||
return existing_node # pragma: no cover
|
||||
|
||||
def find(
|
||||
self,
|
||||
node_class: Type["RelationMixin"],
|
||||
relation_name: Optional[str] = None,
|
||||
parent_node: "Node" = None,
|
||||
) -> Optional["Node"]:
|
||||
"""
|
||||
Searches for existing node with given parameters
|
||||
|
||||
:param node_class: Model in current node
|
||||
:type node_class: ormar.models.metaclass.ModelMetaclass
|
||||
:param relation_name: name of the current relation
|
||||
:type relation_name: str
|
||||
:param parent_node: parent node
|
||||
:type parent_node: Optional[Node]
|
||||
:return: returns already existing node or None
|
||||
:rtype: Optional[Node]
|
||||
"""
|
||||
for node in self.node_list:
|
||||
if (
|
||||
node.node_class == node_class
|
||||
and node.parent_node == parent_node
|
||||
and node.relation_name == relation_name
|
||||
):
|
||||
return node # pragma: no cover
|
||||
return None
|
||||
|
||||
|
||||
class Node:
|
||||
def __init__(
|
||||
self,
|
||||
node_class: Type["RelationMixin"],
|
||||
relation_name: str = None,
|
||||
parent_node: "Node" = None,
|
||||
) -> None:
|
||||
self.relation_name = relation_name
|
||||
self.node_class = node_class
|
||||
self.parent_node = parent_node
|
||||
self.visited_children: List["Node"] = []
|
||||
if self.parent_node:
|
||||
self.parent_node.visited_children.append(self)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover
|
||||
return (
|
||||
f"{self.node_class.get_name(lower=False)}, "
|
||||
f"relation:{self.relation_name}, "
|
||||
f"parent: {self.parent_node}"
|
||||
)
|
||||
|
||||
def visited(self, relation_name: str) -> bool:
|
||||
"""
|
||||
Checks if given relation was already visited.
|
||||
|
||||
Relation was visited if it's name is in current node children.
|
||||
|
||||
Relation was visited if one of the parent node had the same Model class
|
||||
|
||||
:param relation_name: name of relation
|
||||
:type relation_name: str
|
||||
:return: result of the check
|
||||
:rtype: bool
|
||||
"""
|
||||
target_model = self.node_class.Meta.model_fields[relation_name].to
|
||||
if self.parent_node:
|
||||
node = self
|
||||
while node.parent_node:
|
||||
node = node.parent_node
|
||||
if node.node_class == target_model:
|
||||
return True
|
||||
return False
|
||||
Reference in New Issue
Block a user