Add benchmarking test suite and greatly improve performance in a few cases (#948)

* Add benchmarking test suite

* Improve amortized time of model relation loads with a large number of rows

* Improve performance of loading models with many related models

* Improve performance of loading models with many related models to O(N)ish

* Fix bug where creating N models with a shared related model would build in O(N^2) time

* Lower blocking time for queryset results

* Add docstrings and streamline hash code

Co-authored-by: haydeec1 <Eric.Haydel@jhuapl.edu>
Author: erichaydel
Date: 2022-12-10 11:12:11 -05:00 (committed by GitHub)
Parent: 171ef2ffaa
Commit: 7c18fa55e7
25 changed files with 1250 additions and 230 deletions

View File

@@ -16,6 +16,31 @@ class MergeModelMixin:
in the end all parent (main) models should be unique.
"""
@classmethod
def _recursive_add(cls, model_group: List["Model"]) -> List["Model"]:
"""
        Instead of accumulating the model additions one by one, this recursively
        adds the models pairwise, e.g.
        [1, 2, 3, 4].accumulate_add() would give [3, 3, 4], then [6, 4], then [10],
        whereas this method does
        [1, 2, 3, 4].recursive_add(), giving [3, 7], then [10].
        It's the same number of adds, but the balanced pairing keeps each merge
        small, giving roughly O(N) performance on the sublists.
"""
if len(model_group) <= 1:
return model_group
added_values = []
iterable_group = iter(model_group)
for model in iterable_group:
next_model = next(iterable_group, None)
if next_model is not None:
combined = cls.merge_two_instances(next_model, model)
else:
combined = model
added_values.append(combined)
return cls._recursive_add(added_values)
@classmethod
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
"""
@@ -37,10 +62,7 @@ class MergeModelMixin:
grouped_instances.setdefault(model.pk, []).append(model)
for group in grouped_instances.values():
-            model = group.pop(0)
-            if group:
-                for next_model in group:
-                    model = cls.merge_two_instances(next_model, model)
+            model = cls._recursive_add(group)[0]
merged_rows.append(model)
return merged_rows
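
The effect of the two changes above, sketched with plain integers standing in for models and + standing in for merge_two_instances (a toy re-implementation for illustration, not the ormar code itself):

def recursive_add(group):
    # Merge neighbouring pairs, halving the list on every pass.
    if len(group) <= 1:
        return group
    it = iter(group)
    merged = []
    for first in it:
        second = next(it, None)
        merged.append(first if second is None else first + second)
    return recursive_add(merged)

print(recursive_add([1, 2, 3, 4]))  # [1, 2, 3, 4] -> [3, 7] -> [10]

Each pass merges disjoint pairs, so no single accumulated model keeps growing between merges.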

View File

@@ -18,6 +18,7 @@ from typing import (
Union,
cast,
)
import functools
import databases
import pydantic
@@ -41,6 +42,7 @@ from ormar.models.modelproxy import ModelTableProxy
from ormar.models.utils import Extra
from ormar.queryset.utils import translate_list_to_dict
from ormar.relations.alias_manager import AliasManager
from ormar.relations.relation import Relation
from ormar.relations.relation_manager import RelationsManager
if TYPE_CHECKING: # pragma no cover
@@ -66,7 +68,14 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
    the logic concerned with database connection and data persistence.
"""
__slots__ = ("_orm_id", "_orm_saved", "_orm", "_pk_column", "__pk_only__")
__slots__ = (
"_orm_id",
"_orm_saved",
"_orm",
"_pk_column",
"__pk_only__",
"__cached_hash__",
)
if TYPE_CHECKING: # pragma no cover
pk: Any
@@ -78,6 +87,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
__metadata__: sqlalchemy.MetaData
__database__: databases.Database
__relation_map__: Optional[List[str]]
__cached_hash__: Optional[int]
_orm_relationship_manager: AliasManager
_orm: RelationsManager
_orm_id: int
@@ -171,12 +181,22 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: None
:rtype: None
"""
prev_hash = hash(self)
if hasattr(self, name):
object.__setattr__(self, name, value)
else:
# let pydantic handle errors for unknown fields
super().__setattr__(name, value)
# In this case, the hash could have changed, so update it
if name == self.Meta.pkname or self.pk is None:
object.__setattr__(self, "__cached_hash__", None)
new_hash = hash(self)
if prev_hash != new_hash:
self._update_relation_cache(prev_hash, new_hash)
def __getattr__(self, item: str) -> Any:
"""
Used only to silence mypy errors for Through models and reverse relations.
@@ -213,6 +233,26 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
for name, value in relations.items():
setattr(self, name, value)
def _update_relation_cache(self, prev_hash: int, new_hash: int) -> None:
"""
Update all relation proxy caches when this model's hash has changed
:param prev_hash: The previous hash to replace
:type prev_hash: int
:param new_hash: The hash to update to
:type new_hash: int
"""
def _update_cache(relations: List[Relation], recurse: bool = True) -> None:
for relation in relations:
relation_proxy = relation.get()
if hasattr(relation_proxy, "update_cache"):
relation_proxy.update_cache(prev_hash, new_hash) # type: ignore
elif recurse and hasattr(relation_proxy, "_orm"):
_update_cache(relation_proxy._orm._relations.values(), recurse=False) # type: ignore
_update_cache(list(self._orm._relations.values()))
def _internal_set(self, name: str, value: Any) -> None:
"""
Delegates call to pydantic.
@@ -353,6 +393,23 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
return self.__same__(other)
return super().__eq__(other) # pragma no cover
def __hash__(self) -> int:
if getattr(self, "__cached_hash__", None) is not None:
return self.__cached_hash__ or 0
if self.pk is not None:
ret = hash(str(self.pk) + self.__class__.__name__)
else:
vals = {
k: v
for k, v in self.__dict__.items()
if k not in self.extract_related_names()
}
ret = hash(str(vals) + self.__class__.__name__)
object.__setattr__(self, "__cached_hash__", ret)
return ret
def __same__(self, other: "NewBaseModel") -> bool:
"""
Used by __eq__, compares other model to this model.
@@ -365,23 +422,12 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:return: result of comparison
:rtype: bool
"""
-        return (
-            # self._orm_id == other._orm_id
-            (self.pk == other.pk and self.pk is not None)
-            or (
-                (self.pk is None and other.pk is None)
-                and {
-                    k: v
-                    for k, v in self.__dict__.items()
-                    if k not in self.extract_related_names()
-                }
-                == {
-                    k: v
-                    for k, v in other.__dict__.items()
-                    if k not in other.extract_related_names()
-                }
-            )
-        )
+        if (self.pk is None and other.pk is not None) or (
+            self.pk is not None and other.pk is None
+        ):
+            return False
+        else:
+            return hash(self) == other.__hash__()
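
A standalone sketch of how the pieces above fit together (plain Python, not ormar; set_pk is a hypothetical stand-in for the pk assignment path in __setattr__): __hash__ caches its result, assigning the pk invalidates the cache, and both the old and new hashes are handed back for re-keying the relation caches.

class Sketch:
    def __init__(self, pk=None, **fields):
        self._cached = None
        self.pk = pk
        self.fields = fields

    def __hash__(self):
        if self._cached is not None:
            return self._cached  # O(1) on every call after the first
        if self.pk is not None:
            ret = hash(str(self.pk) + type(self).__name__)
        else:
            ret = hash(str(self.fields) + type(self).__name__)
        self._cached = ret
        return ret

    def set_pk(self, value):
        prev = hash(self)
        self.pk = value
        self._cached = None  # pk changed, so the cached hash is stale
        return prev, hash(self)  # caller re-keys relation caches with these

s = Sketch(name="a")
prev_hash, new_hash = s.set_pk(1)
print(prev_hash != new_hash)  # True: caches keyed on prev_hash must be moved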
def _copy_and_set_values(
self: "NewBaseModel", values: "DictStrAny", fields_set: "SetStr", *, deep: bool
@@ -391,7 +437,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
recursion through related fields.
"""
self_dict = values
-        self_dict.update(self.dict())
+        self_dict.update(self.dict(exclude_list=True))
return cast(
"NewBaseModel",
super()._copy_and_set_values(
@@ -630,6 +676,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
exclude: Optional[Dict],
exclude_primary_keys: bool,
exclude_through_models: bool,
exclude_list: bool,
) -> Dict:
"""
Traverses nested models and converts them into dictionaries.
@@ -643,6 +690,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:type include: Optional[Dict]
:param exclude: fields to exclude
:type exclude: Optional[Dict]
:param exclude_list: whether to exclude lists
:type exclude_list: bool
:return: current model dict with child models converted to dictionaries
:rtype: Dict
"""
@@ -656,6 +705,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
try:
nested_model = getattr(self, field)
if isinstance(nested_model, MutableSequence):
if exclude_list:
continue
dict_instance[field] = self._extract_nested_models_from_list(
relation_map=self._skip_ellipsis( # type: ignore
relation_map, field, default_return=dict()
@@ -695,6 +747,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
exclude_none: bool = False,
exclude_primary_keys: bool = False,
exclude_through_models: bool = False,
exclude_list: bool = False,
relation_map: Dict = None,
) -> "DictStrAny": # noqa: A003'
"""
@@ -724,6 +777,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
:type exclude_defaults: bool
:param exclude_none: flag to exclude None values - passed to pydantic
:type exclude_none: bool
:param exclude_list: flag to exclude lists of nested models from the dict
:type exclude_list: bool
:param relation_map: map of the relations to follow to avoid circular deps
:type relation_map: Dict
:return:
@@ -769,6 +824,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
exclude=exclude, # type: ignore
exclude_primary_keys=exclude_primary_keys,
exclude_through_models=exclude_through_models,
exclude_list=exclude_list,
)
# include model properties as fields in dict
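
Usage of the new exclude_list flag, with hypothetical Album/Track models for illustration (assuming the usual ormar model setup); list-valued reverse relations are skipped, which is what lets _copy_and_set_values above avoid serializing every child row during a copy:

import databases
import ormar
import sqlalchemy

database = databases.Database("sqlite:///test.db")
metadata = sqlalchemy.MetaData()

class Album(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)

class Track(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    album: Album = ormar.ForeignKey(Album)
    title: str = ormar.String(max_length=100)

album = Album(id=1, name="Demo")
Track(id=1, album=album, title="Intro")  # registers the reverse relation
print("tracks" in album.dict())  # True: the list relation is serialized
print("tracks" in album.dict(exclude_list=True))  # False: lists are skipped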

View File

@@ -1,3 +1,4 @@
import asyncio
from typing import (
Any,
Dict,
@@ -173,7 +174,7 @@ class QuerySet(Generic[T]):
)
return await query.prefetch_related(models=models, rows=rows) # type: ignore
-    def _process_query_result_rows(self, rows: List) -> List["T"]:
+    async def _process_query_result_rows(self, rows: List) -> List["T"]:
"""
Process database rows and initialize ormar Model from each of the rows.
@@ -182,16 +183,19 @@
:return: list of models
:rtype: List[Model]
"""
-        result_rows = [
-            self.model.from_row(
-                row=row,
-                select_related=self._select_related,
-                excludable=self._excludable,
-                source_model=self.model,
-                proxy_source_model=self.proxy_source_model,
-            )
-            for row in rows
-        ]
+        result_rows = []
+        for row in rows:
+            result_rows.append(
+                self.model.from_row(
+                    row=row,
+                    select_related=self._select_related,
+                    excludable=self._excludable,
+                    source_model=self.model,
+                    proxy_source_model=self.proxy_source_model,
+                )
+            )
+        await asyncio.sleep(0)
if result_rows:
return self.model.merge_instances_list(result_rows) # type: ignore
return cast(List["T"], result_rows)
@@ -914,7 +918,7 @@
+ self.order_bys,
)
rows = await self.database.fetch_all(expr)
-        processed_rows = self._process_query_result_rows(rows)
+        processed_rows = await self._process_query_result_rows(rows)
if self._prefetch_related and processed_rows:
processed_rows = await self._prefetch_related_models(processed_rows, rows)
self.check_single_result_rows_count(processed_rows)
@@ -979,7 +983,7 @@
expr = self.build_select_expression()
rows = await self.database.fetch_all(expr)
-        processed_rows = self._process_query_result_rows(rows)
+        processed_rows = await self._process_query_result_rows(rows)
if self._prefetch_related and processed_rows:
processed_rows = await self._prefetch_related_models(processed_rows, rows)
self.check_single_result_rows_count(processed_rows)
@@ -1050,7 +1054,7 @@
expr = self.build_select_expression()
rows = await self.database.fetch_all(expr)
-        result_rows = self._process_query_result_rows(rows)
+        result_rows = await self._process_query_result_rows(rows)
if self._prefetch_related and result_rows:
result_rows = await self._prefetch_related_models(result_rows, rows)
@@ -1098,12 +1102,12 @@
rows.append(row)
continue
-            yield self._process_query_result_rows(rows)[0]
+            yield (await self._process_query_result_rows(rows))[0]
last_primary_key = current_primary_key
rows = [row]
if rows:
-            yield self._process_query_result_rows(rows)[0]
+            yield (await self._process_query_result_rows(rows))[0]
async def create(self, **kwargs: Any) -> "T":
"""
@@ -1138,7 +1142,10 @@
if not objects:
raise ModelListEmptyError("Bulk create objects are empty!")
-        ready_objects = [obj.prepare_model_to_save(obj.dict()) for obj in objects]
+        ready_objects = []
+        for obj in objects:
+            ready_objects.append(obj.prepare_model_to_save(obj.dict()))
+        await asyncio.sleep(0)  # Allow context switching to prevent blocking
# don't use execute_many, as in databases it's executed in a loop
# instead of using execute_many from drivers
@@ -1196,6 +1203,7 @@
ready_objects.append(
{"new_" + k: v for k, v in new_kwargs.items() if k in columns}
)
await asyncio.sleep(0)
pk_column = self.model_meta.table.c.get(self.model.get_column_alias(pk_name))
pk_column_name = self.model.get_column_alias(pk_name)

View File

@@ -128,15 +128,20 @@
"""
if not isinstance(self.related_models, RelationProxy): # pragma nocover
raise ValueError("Cannot find existing models in parent relation type")
-        if self._to_remove:
-            self._clean_related()
-        for ind, relation_child in enumerate(self.related_models[:]):
-            try:
-                if relation_child == child:
-                    return ind
-            except ReferenceError:  # pragma no cover
-                self._to_remove.add(ind)
-        return None
+        if child not in self.related_models:
+            return None
+        else:
+            # We need to clear the weakrefs that don't point to anything anymore.
+            # There's an assumption here that if some of the related models went
+            # out of scope, then they all did, so we can just check the first one.
+            try:
+                self.related_models[0].__repr__.__self__
+                return self.related_models.index(child)
+            except ReferenceError:
+                missing = self.related_models._get_list_of_missing_weakrefs()
+                self._to_remove.update(missing)
+                return self.related_models.index(child)
def add(self, child: "Model") -> None:
"""
@@ -186,6 +191,8 @@
:return: related model/models if set
:rtype: Optional[Union[List[Model], Model]]
"""
if self._to_remove:
self._clean_related()
return self.related_models
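
The odd-looking self.related_models[0].__repr__.__self__ probe above works because related models are held behind weak references: attribute access on a proxy whose referent has been garbage-collected raises ReferenceError. A standalone sketch of the trick:

import weakref

class Child:
    pass

child = Child()
proxy = weakref.proxy(child)
proxy.__repr__.__self__  # fine: bound method of a live referent
del child  # drop the only strong reference
try:
    proxy.__repr__.__self__
except ReferenceError:
    print("referent is gone, prune it from the relation list")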
def __repr__(self) -> str: # pragma no cover

View File

@@ -1,4 +1,15 @@
-from typing import Any, Generic, List, Optional, TYPE_CHECKING, Type, TypeVar
+from typing import (
+    Any,
+    Dict,
+    Generic,
+    List,
+    Optional,
+    TYPE_CHECKING,
+    Set,
+    Type,
+    TypeVar,
+)
from typing_extensions import SupportsIndex
import ormar
from ormar.exceptions import NoMatch, RelationshipInstanceError
@@ -26,7 +37,6 @@ class RelationProxy(Generic[T], List[T]):
field_name: str,
data_: Any = None,
) -> None:
-        super().__init__(data_ or ())
self.relation: "Relation[T]" = relation
self.type_: "RelationType" = type_
self.field_name = field_name
@@ -36,6 +46,20 @@
)
self._related_field_name: Optional[str] = None
self._relation_cache: Dict[int, int] = {}
validated_data = []
if data_ is not None:
idx = 0
for d in data_:
try:
self._relation_cache[d.__hash__()] = idx
validated_data.append(d)
idx += 1
except ReferenceError:
pass
super().__init__(validated_data or ())
@property
def related_field_name(self) -> str:
"""
@@ -55,6 +79,101 @@
def __getitem__(self, item: Any) -> "T": # type: ignore
return super().__getitem__(item)
def append(self, item: "T") -> None:
"""
Appends an item to the list in place
:param item: The generic item of the list
:type item: T
"""
idx = len(self)
self._relation_cache[item.__hash__()] = idx
super().append(item)
def update_cache(self, prev_hash: int, new_hash: int) -> None:
"""
Updates the cache from the old hash to the new one.
This maintains the index cache, which allows O(1) indexing and
existence checks
:param prev_hash: The hash to update
:type prev_hash: int
:param new_hash: The new hash to update to
:type new_hash: int
"""
try:
idx = self._relation_cache.pop(prev_hash)
self._relation_cache[new_hash] = idx
except KeyError:
pass
def index(self, item: T, *args: Any) -> int:
"""
Gets the index of the item in the list
:param item: The item to get the index of
:type item: "T"
"""
return self._relation_cache[item.__hash__()]
def _get_list_of_missing_weakrefs(self) -> Set[int]:
"""
Iterates through the list and collects the indices of dead weakrefs.
:return: The set of missing weakref indices
:rtype: Set[int]
"""
to_remove = set()
for ind, relation_child in enumerate(self[:]):
try:
relation_child.__repr__.__self__ # type: ignore
except ReferenceError: # pragma no cover
to_remove.add(ind)
return to_remove
def pop(self, index: SupportsIndex = 0) -> T:
"""
Pops the element at the given index off the list and returns it.
By default, it pops the element at index 0.
This also clears the value from the relation cache.
:param index: The index to pop
:type index: SupportsIndex
:return: The item at the provided index
:rtype: "T"
"""
item = self[index]
# Try to delete it directly, but fall back to scanning the cache
# if the weakly-referenced item no longer exists
try:
self._relation_cache.pop(item.__hash__())
except ReferenceError:
for hash_, idx in self._relation_cache.items():
if idx == index:
self._relation_cache.pop(hash_)
break
index_int = int(index)
for idx in range(index_int + 1, len(self)):
self._relation_cache[self[idx].__hash__()] -= 1
return super().pop(index)
def __contains__(self, item: object) -> bool:
"""
Checks whether the item exists in self. This relies
on the relation cache, which is a hashmap of values
in the list. It runs in O(1) time.
:param item: The item to check if the list contains
:type item: object
"""
try:
return item.__hash__() in self._relation_cache
except ReferenceError:
return False
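
Taken together, append, index, pop, and __contains__ preserve one invariant: _relation_cache[hash(self[i])] == i for every position. A condensed standalone sketch of the same bookkeeping with plain values (the real RelationProxy additionally copes with dead weakrefs and hash updates):

class IndexedList(list):
    def __init__(self, data=()):
        super().__init__(data)
        self._cache = {hash(v): i for i, v in enumerate(self)}

    def append(self, item):
        self._cache[hash(item)] = len(self)
        super().append(item)

    def index(self, item, *args):
        return self._cache[hash(item)]  # O(1) instead of a list scan

    def pop(self, index=0):
        self._cache.pop(hash(self[index]))
        for i in range(index + 1, len(self)):
            self._cache[hash(self[i])] -= 1  # items after it shift left
        return super().pop(index)

    def __contains__(self, item):
        return hash(item) in self._cache  # O(1) membership test

xs = IndexedList(["a", "b", "c"])
print("b" in xs, xs.index("c"))  # True 2
xs.pop(0)
print(xs.index("c"))  # 1 after the shift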
def __getattribute__(self, item: str) -> Any:
"""
Since some QuerySetProxy methods overwrite builtin list methods we
@@ -83,6 +202,7 @@
return getattr(self.queryset_proxy, item)
def _clear(self) -> None:
self._relation_cache.clear()
super().clear()
def _initialize_queryset(self) -> None:
@@ -164,7 +284,10 @@
child=item,
relation_name=self.field_name,
)
-        super().remove(item)
+        index_to_remove = self._relation_cache[item.__hash__()]
+        self.pop(index_to_remove)
relation_name = self.related_field_name
relation = item._orm._get(relation_name)
# if relation is None: # pragma nocover
@@ -200,6 +323,7 @@
:param item: child to add to relation
:type item: Model
"""
new_idx = len(self)
relation_name = self.related_field_name
await self._owner.signals.pre_relation_add.send(
sender=self._owner.__class__,
@@ -215,6 +339,7 @@
else:
setattr(item, relation_name, self._owner)
await item.upsert()
self._relation_cache[item.__hash__()] = new_idx
await self._owner.signals.post_relation_add.send(
sender=self._owner.__class__,
instance=self._owner,