liniting and missin type annots

This commit is contained in:
collerek
2020-08-27 11:04:49 +02:00
parent c5389023b8
commit 279d3966b1
7 changed files with 63 additions and 37 deletions

BIN
.coverage

Binary file not shown.

View File

@ -1,5 +1,5 @@
from ormar.models.newbasemodel import NewBaseModel
from ormar.models.model import Model
from ormar.models.metaclass import expand_reverse_relationships
from ormar.models.newbasemodel import NewBaseModel # noqa I100
from ormar.models.model import Model # noqa I100
from ormar.models.metaclass import expand_reverse_relationships # noqa I100
__all__ = ["NewBaseModel", "Model", "expand_reverse_relationships"]

View File

@ -7,7 +7,7 @@ import ormar.queryset # noqa I100
from ormar.models import NewBaseModel # noqa I100
def group_related_list(list_):
def group_related_list(list_: List) -> dict:
test_dict = dict()
grouped = itertools.groupby(list_, key=lambda x: x.split("__")[0])
for key, group in grouped:
@ -45,6 +45,23 @@ class Model(NewBaseModel):
)
previous_table = cls.Meta.table.name
item = cls.populate_nested_models_from_row(
item, row, related_models, previous_table
)
item = cls.extract_prefixed_table_columns(item, row, table_prefix)
instance = cls(**item) if item.get(cls.Meta.pkname, None) is not None else None
return instance
@classmethod
def populate_nested_models_from_row(
cls,
item: dict,
row: sqlalchemy.engine.ResultProxy,
related_models: Any,
previous_table: sqlalchemy.Table,
) -> dict:
for related in related_models:
if isinstance(related_models, dict) and related_models[related]:
first_part, remainder = related, related_models[related]
@ -58,14 +75,18 @@ class Model(NewBaseModel):
child = model_cls.from_row(row, previous_table=previous_table)
item[related] = child
return item
@classmethod
def extract_prefixed_table_columns(
cls, item: dict, row: sqlalchemy.engine.result.ResultProxy, table_prefix: str
) -> dict:
for column in cls.Meta.table.columns:
if column.name not in item:
item[column.name] = row[
f'{table_prefix + "_" if table_prefix else ""}{column.name}'
]
instance = cls(**item) if item.get(cls.Meta.pkname, None) is not None else None
return instance
return item
async def save(self) -> "Model":
self_fields = self._extract_model_db_fields()
@ -85,11 +106,9 @@ class Model(NewBaseModel):
self_fields = self._extract_model_db_fields()
self_fields.pop(self.Meta.pkname)
expr = (
self.Meta.table.update()
.values(**self_fields)
.where(self.pk_column == getattr(self, self.Meta.pkname))
)
expr = self.Meta.table.update().values(**self_fields)
expr = expr.where(self.pk_column == getattr(self, self.Meta.pkname))
await self.Meta.database.execute(expr)
return self

View File

@ -1,6 +1,5 @@
import copy
import inspect
from typing import List, Set, TYPE_CHECKING
from typing import List, Optional, Set, TYPE_CHECKING
import ormar
from ormar.fields.foreign_key import ForeignKeyField
@ -80,11 +79,12 @@ class ModelTableProxy:
return self_fields
@staticmethod
def resolve_relation_name(item: "Model", related: "Model"):
def resolve_relation_name(item: "Model", related: "Model") -> Optional[str]:
for name, field in item.Meta.model_fields.items():
if issubclass(field, ForeignKeyField):
# fastapi is creating clones of response model that's why it can be a subclass
# of the original one so we need to compare Meta too
# fastapi is creating clones of response model
# that's why it can be a subclass of the original model
# so we need to compare Meta too as this one is copied as is
if field.to == related.__class__ or field.to.Meta == related.Meta:
return name

View File

@ -153,7 +153,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
def pk_type(cls) -> Any:
return cls.Meta.model_fields[cls.Meta.pkname].__type__
def remove(self, name: "Model"):
def remove(self, name: "Model") -> None:
self._orm.remove_parent(self, name)
def dict( # noqa A003

View File

@ -46,9 +46,13 @@ class Query:
def relation_manager(self) -> AliasManager:
return self.model_cls.Meta.alias_manager
@property
def prefixed_pk_name(self) -> str:
return f"{self.table.name}.{self.model_cls.Meta.pkname}"
def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]:
self.columns = list(self.table.columns)
self.order_bys = [text(f"{self.table.name}.{self.model_cls.Meta.pkname}")]
self.order_bys = [text(self.prefixed_pk_name)]
self.select_from = self.table
self._select_related.sort(key=lambda item: (item, -len(item)))

View File

@ -2,16 +2,17 @@ import string
import uuid
from enum import Enum
from random import choices
from typing import List, TYPE_CHECKING, Type, Union, Optional
from typing import List, Optional, TYPE_CHECKING, Type, Union
from weakref import proxy
import sqlalchemy
from sqlalchemy import text
import ormar
from ormar.exceptions import RelationshipInstanceError
import ormar # noqa I100
from ormar.exceptions import RelationshipInstanceError # noqa I100
from ormar.fields.foreign_key import ForeignKeyField # noqa I100
if TYPE_CHECKING: # pragma no cover
from ormar.models import Model
@ -51,20 +52,20 @@ class AliasManager:
class RelationProxy(list):
def __init__(self, relation: "Relation"):
def __init__(self, relation: "Relation") -> None:
super(RelationProxy, self).__init__()
self.relation = relation
self._owner = self.relation.manager.owner
def remove(self, item: "Model"):
def remove(self, item: "Model") -> None:
super().remove(item)
rel_name = item.resolve_relation_name(item, self._owner)
item._orm._get(rel_name).remove(self._owner)
def append(self, item: "Model"):
def append(self, item: "Model") -> None:
super().append(item)
def add(self, item):
def add(self, item: "Model") -> None:
rel_name = item.resolve_relation_name(item, self._owner)
setattr(item, rel_name, self._owner)
@ -78,7 +79,7 @@ class Relation:
RelationProxy(relation=self) if type_ == RelationType.REVERSE else None
)
def _find_existing(self, child) -> Optional[int]:
def _find_existing(self, child: "Model") -> Optional[int]:
for ind, relation_child in enumerate(self.related_models[:]):
try:
if relation_child.__same__(child):
@ -114,14 +115,14 @@ class Relation:
def get(self) -> Union[List["Model"], "Model"]:
return self.related_models
def __repr__(self): # pragma no cover
def __repr__(self) -> str: # pragma no cover
return str(self.related_models)
class RelationsManager:
def __init__(
self, related_fields: List[Type[ForeignKeyField]] = None, owner: "Model" = None
):
) -> None:
self.owner = owner
self._related_fields = related_fields or []
self._related_names = [field.name for field in self._related_fields]
@ -129,26 +130,27 @@ class RelationsManager:
for field in self._related_fields:
self._add_relation(field)
def _add_relation(self, field):
def _add_relation(self, field: Type[ForeignKeyField]) -> None:
self._relations[field.name] = Relation(
manager=self,
type_=RelationType.PRIMARY if not field.virtual else RelationType.REVERSE,
)
def __contains__(self, item):
def __contains__(self, item: str) -> bool:
return item in self._related_names
def get(self, name) -> Optional[Union[List["Model"], "Model"]]:
def get(self, name: str) -> Optional[Union[List["Model"], "Model"]]:
relation = self._relations.get(name, None)
if relation:
return relation.get()
def _get(self, name) -> Optional[Relation]:
def _get(self, name: str) -> Optional[Relation]:
relation = self._relations.get(name, None)
if relation:
return relation
def add(self, parent: "Model", child: "Model", child_name: str, virtual: bool):
@staticmethod
def add(parent: "Model", child: "Model", child_name: str, virtual: bool) -> None:
to_field = next(
(
field
@ -160,7 +162,8 @@ class RelationsManager:
if not to_field: # pragma no cover
raise RelationshipInstanceError(
f"Model {child.__class__} does not have reference to model {parent.__class__}"
f"Model {child.__class__} does not have "
f"reference to model {parent.__class__}"
)
to_name = to_field.name
@ -181,12 +184,12 @@ class RelationsManager:
parent_relation.add(child)
child._orm._get(to_name).add(parent)
def remove(self, name: str, child: "Model"):
def remove(self, name: str, child: "Model") -> None:
relation = self._get(name)
relation.remove(child)
@staticmethod
def remove_parent(item: "Model", name: Union[str, "Model"]):
def remove_parent(item: "Model", name: Union[str, "Model"]) -> None:
related_model = name
name = item.resolve_relation_name(item, related_model)
if name in item._orm: