finish mypy checks add reqs and linting

This commit is contained in:
collerek
2020-09-29 17:06:41 +02:00
parent 3caa87057e
commit d9aa269f7a
16 changed files with 122 additions and 81 deletions

BIN
.coverage

Binary file not shown.

View File

@ -27,6 +27,7 @@ script:
- DATABASE_URL=postgresql://localhost/test_database scripts/test.sh - DATABASE_URL=postgresql://localhost/test_database scripts/test.sh
- DATABASE_URL=mysql://localhost/test_database scripts/test.sh - DATABASE_URL=mysql://localhost/test_database scripts/test.sh
- DATABASE_URL=sqlite:///test.db scripts/test.sh - DATABASE_URL=sqlite:///test.db scripts/test.sh
- mypy --config-file mypy.ini ormar
after_script: after_script:
- codecov - codecov

View File

@ -1,4 +1,4 @@
from typing import Any, List, Optional, TYPE_CHECKING, Union, Type from typing import Any, List, Optional, TYPE_CHECKING, Type, Union
import sqlalchemy import sqlalchemy
from pydantic import Field, typing from pydantic import Field, typing

View File

@ -1,4 +1,4 @@
from typing import Any, List, Optional, TYPE_CHECKING, Type, Union, Generator from typing import Any, Generator, List, Optional, TYPE_CHECKING, Type, Union
import sqlalchemy import sqlalchemy
@ -23,15 +23,15 @@ def create_dummy_instance(fk: Type["Model"], pk: Any = None) -> "Model":
def ForeignKey( # noqa CFQ002 def ForeignKey( # noqa CFQ002
to: Type["Model"], to: Type["Model"],
*, *,
name: str = None, name: str = None,
unique: bool = False, unique: bool = False,
nullable: bool = True, nullable: bool = True,
related_name: str = None, related_name: str = None,
virtual: bool = False, virtual: bool = False,
onupdate: str = None, onupdate: str = None,
ondelete: str = None, ondelete: str = None,
) -> Type["ForeignKeyField"]: ) -> Type["ForeignKeyField"]:
fk_string = to.Meta.tablename + "." + to.Meta.pkname fk_string = to.Meta.tablename + "." + to.Meta.pkname
to_field = to.__fields__[to.Meta.pkname] to_field = to.__fields__[to.Meta.pkname]
@ -74,13 +74,16 @@ class ForeignKeyField(BaseField):
@classmethod @classmethod
def _extract_model_from_sequence( def _extract_model_from_sequence(
cls, value: List, child: "Model", to_register: bool cls, value: List, child: "Model", to_register: bool
) -> List["Model"]: ) -> List["Model"]:
return [cls.expand_relationship(val, child, to_register) for val in value] # type: ignore return [
cls.expand_relationship(val, child, to_register) # type: ignore
for val in value
]
@classmethod @classmethod
def _register_existing_model( def _register_existing_model(
cls, value: "Model", child: "Model", to_register: bool cls, value: "Model", child: "Model", to_register: bool
) -> "Model": ) -> "Model":
if to_register: if to_register:
cls.register_relation(value, child) cls.register_relation(value, child)
@ -88,7 +91,7 @@ class ForeignKeyField(BaseField):
@classmethod @classmethod
def _construct_model_from_dict( def _construct_model_from_dict(
cls, value: dict, child: "Model", to_register: bool cls, value: dict, child: "Model", to_register: bool
) -> "Model": ) -> "Model":
if len(value.keys()) == 1 and list(value.keys())[0] == cls.to.Meta.pkname: if len(value.keys()) == 1 and list(value.keys())[0] == cls.to.Meta.pkname:
value["__pk_only__"] = True value["__pk_only__"] = True
@ -99,7 +102,7 @@ class ForeignKeyField(BaseField):
@classmethod @classmethod
def _construct_model_from_pk( def _construct_model_from_pk(
cls, value: Any, child: "Model", to_register: bool cls, value: Any, child: "Model", to_register: bool
) -> "Model": ) -> "Model":
if not isinstance(value, cls.to.pk_type()): if not isinstance(value, cls.to.pk_type()):
raise RelationshipInstanceError( raise RelationshipInstanceError(
@ -120,7 +123,7 @@ class ForeignKeyField(BaseField):
@classmethod @classmethod
def expand_relationship( def expand_relationship(
cls, value: Any, child: Union["Model", "NewBaseModel"], to_register: bool = True cls, value: Any, child: Union["Model", "NewBaseModel"], to_register: bool = True
) -> Optional[Union["Model", List["Model"]]]: ) -> Optional[Union["Model", List["Model"]]]:
if value is None: if value is None:
return None if not cls.virtual else [] return None if not cls.virtual else []
@ -131,7 +134,7 @@ class ForeignKeyField(BaseField):
"list": cls._extract_model_from_sequence, "list": cls._extract_model_from_sequence,
} }
model = constructors.get( # type: ignore model = constructors.get( # type: ignore
value.__class__.__name__, cls._construct_model_from_pk value.__class__.__name__, cls._construct_model_from_pk
)(value, child, to_register) )(value, child, to_register)
return model return model

View File

@ -293,7 +293,9 @@ def populate_choices_validators( # noqa CCR001
class ModelMetaclass(pydantic.main.ModelMetaclass): class ModelMetaclass(pydantic.main.ModelMetaclass):
def __new__(mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict) -> "ModelMetaclass": # type: ignore def __new__( # type: ignore
mcs: "ModelMetaclass", name: str, bases: Any, attrs: dict
) -> "ModelMetaclass":
attrs["Config"] = get_pydantic_base_orm_config() attrs["Config"] = get_pydantic_base_orm_config()
attrs["__name__"] = name attrs["__name__"] = name
attrs = extract_annotations_and_default_vals(attrs, bases) attrs = extract_annotations_and_default_vals(attrs, bases)
@ -312,7 +314,9 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
field_name = new_model.Meta.pkname field_name = new_model.Meta.pkname
field = Integer(name=field_name, primary_key=True) field = Integer(name=field_name, primary_key=True)
attrs["__annotations__"][field_name] = field attrs["__annotations__"][field_name] = field
populate_default_pydantic_field_value(field, field_name, attrs) # type: ignore populate_default_pydantic_field_value(
field, field_name, attrs # type: ignore
)
new_model = super().__new__( # type: ignore new_model = super().__new__( # type: ignore
mcs, name, bases, attrs mcs, name, bases, attrs

View File

@ -1,5 +1,5 @@
import itertools import itertools
from typing import Any, List, Dict, Optional from typing import Any, Dict, List, Optional
import sqlalchemy import sqlalchemy
from databases.backends.postgres import Record from databases.backends.postgres import Record
@ -28,12 +28,12 @@ class Model(NewBaseModel):
__abstract__ = False __abstract__ = False
@classmethod @classmethod
def from_row( def from_row( # noqa CCR001
cls, cls,
row: sqlalchemy.engine.ResultProxy, row: sqlalchemy.engine.ResultProxy,
select_related: List = None, select_related: List = None,
related_models: Any = None, related_models: Any = None,
previous_table: str = None, previous_table: str = None,
) -> Optional["Model"]: ) -> Optional["Model"]:
item: Dict[str, Any] = {} item: Dict[str, Any] = {}
@ -44,9 +44,9 @@ class Model(NewBaseModel):
# breakpoint() # breakpoint()
if ( if (
previous_table previous_table
and previous_table in cls.Meta.model_fields and previous_table in cls.Meta.model_fields
and issubclass(cls.Meta.model_fields[previous_table], ManyToManyField) and issubclass(cls.Meta.model_fields[previous_table], ManyToManyField)
): ):
previous_table = cls.Meta.model_fields[ previous_table = cls.Meta.model_fields[
previous_table previous_table
@ -57,7 +57,7 @@ class Model(NewBaseModel):
previous_table, cls.Meta.table.name previous_table, cls.Meta.table.name
) )
else: else:
table_prefix = '' table_prefix = ""
previous_table = cls.Meta.table.name previous_table = cls.Meta.table.name
item = cls.populate_nested_models_from_row( item = cls.populate_nested_models_from_row(
@ -70,11 +70,11 @@ class Model(NewBaseModel):
@classmethod @classmethod
def populate_nested_models_from_row( def populate_nested_models_from_row(
cls, cls,
item: dict, item: dict,
row: sqlalchemy.engine.ResultProxy, row: sqlalchemy.engine.ResultProxy,
related_models: Any, related_models: Any,
previous_table: sqlalchemy.Table, previous_table: sqlalchemy.Table,
) -> dict: ) -> dict:
for related in related_models: for related in related_models:
if isinstance(related_models, dict) and related_models[related]: if isinstance(related_models, dict) and related_models[related]:
@ -93,7 +93,7 @@ class Model(NewBaseModel):
@classmethod @classmethod
def extract_prefixed_table_columns( # noqa CCR001 def extract_prefixed_table_columns( # noqa CCR001
cls, item: dict, row: sqlalchemy.engine.result.ResultProxy, table_prefix: str cls, item: dict, row: sqlalchemy.engine.result.ResultProxy, table_prefix: str
) -> dict: ) -> dict:
for column in cls.Meta.table.columns: for column in cls.Meta.table.columns:
if column.name not in item: if column.name not in item:
@ -142,6 +142,8 @@ class Model(NewBaseModel):
expr = self.Meta.table.select().where(self.pk_column == self.pk) expr = self.Meta.table.select().where(self.pk_column == self.pk)
row = await self.Meta.database.fetch_one(expr) row = await self.Meta.database.fetch_one(expr)
if not row: # pragma nocover if not row: # pragma nocover
raise ValueError('Instance was deleted from database and cannot be refreshed') raise ValueError(
"Instance was deleted from database and cannot be refreshed"
)
self.from_dict(dict(row)) self.from_dict(dict(row))
return self return self

View File

@ -1,5 +1,5 @@
import inspect import inspect
from typing import List, Optional, Set, TYPE_CHECKING, Type, TypeVar, Union, Dict from typing import Dict, List, Set, TYPE_CHECKING, Type, TypeVar, Union
import ormar import ormar
from ormar.exceptions import RelationshipInstanceError from ormar.exceptions import RelationshipInstanceError
@ -94,7 +94,8 @@ class ModelTableProxy:
@staticmethod @staticmethod
def resolve_relation_name( def resolve_relation_name(
item: Union["NewBaseModel", Type["NewBaseModel"]], related: Union["NewBaseModel", Type["NewBaseModel"]] item: Union["NewBaseModel", Type["NewBaseModel"]],
related: Union["NewBaseModel", Type["NewBaseModel"]],
) -> str: ) -> str:
for name, field in item.Meta.model_fields.items(): for name, field in item.Meta.model_fields.items():
if issubclass(field, ForeignKeyField): if issubclass(field, ForeignKeyField):

View File

@ -84,7 +84,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
for k, v in kwargs.items() for k, v in kwargs.items()
} }
values, fields_set, validation_error = pydantic.validate_model(self, kwargs) # type: ignore values, fields_set, validation_error = pydantic.validate_model(
self, kwargs # type: ignore
)
if validation_error and not pk_only: if validation_error and not pk_only:
raise validation_error raise validation_error
@ -218,7 +220,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
condition = ( condition = (
isinstance(value, str) if op == "loads" else not isinstance(value, str) isinstance(value, str) if op == "loads" else not isinstance(value, str)
) )
operand: Callable[[Any], Any] = json.loads if op == "loads" else json.dumps operand: Callable[[Any], Any] = (
json.loads if op == "loads" else json.dumps # type: ignore
)
if condition: if condition:
try: try:

View File

@ -1,4 +1,4 @@
from typing import List, TYPE_CHECKING, Tuple, Type, Optional from typing import List, Optional, TYPE_CHECKING, Tuple, Type
import sqlalchemy import sqlalchemy
from sqlalchemy import text from sqlalchemy import text

View File

@ -1,4 +1,4 @@
from typing import Any, List, Mapping, TYPE_CHECKING, Type, Union, Optional from typing import Any, List, Optional, TYPE_CHECKING, Type, Union
import databases import databases
import sqlalchemy import sqlalchemy
@ -14,17 +14,18 @@ from ormar.queryset.query import Query
if TYPE_CHECKING: # pragma no cover if TYPE_CHECKING: # pragma no cover
from ormar import Model from ormar import Model
from ormar.models.metaclass import ModelMeta from ormar.models.metaclass import ModelMeta
from ormar.relations.querysetproxy import QuerysetProxy
class QuerySet: class QuerySet:
def __init__( # noqa CFQ002 def __init__( # noqa CFQ002
self, self,
model_cls: Type["Model"] = None, model_cls: Type["Model"] = None,
filter_clauses: List = None, filter_clauses: List = None,
exclude_clauses: List = None, exclude_clauses: List = None,
select_related: List = None, select_related: List = None,
limit_count: int = None, limit_count: int = None,
offset: int = None, offset: int = None,
) -> None: ) -> None:
self.model_cls = model_cls self.model_cls = model_cls
self.filter_clauses = [] if filter_clauses is None else filter_clauses self.filter_clauses = [] if filter_clauses is None else filter_clauses
@ -34,8 +35,15 @@ class QuerySet:
self.query_offset = offset self.query_offset = offset
self.order_bys = None self.order_bys = None
def __get__(self, instance: "QuerySet", owner: Type["Model"]) -> "QuerySet": def __get__(
return self.__class__(model_cls=owner) self,
instance: Union["QuerySet", "QuerysetProxy"],
owner: Union[Type["Model"], Type["QuerysetProxy"]],
) -> "QuerySet":
if issubclass(owner, ormar.Model):
return self.__class__(model_cls=owner)
else: # pragma nocover
return self.__class__()
@property @property
def model_meta(self) -> "ModelMeta": def model_meta(self) -> "ModelMeta":
@ -68,7 +76,7 @@ class QuerySet:
pkname = self.model_meta.pkname pkname = self.model_meta.pkname
pk = self.model_meta.model_fields[pkname] pk = self.model_meta.model_fields[pkname]
if new_kwargs.get(pkname, ormar.Undefined) is None and ( if new_kwargs.get(pkname, ormar.Undefined) is None and (
pk.nullable or pk.autoincrement pk.nullable or pk.autoincrement
): ):
del new_kwargs[pkname] del new_kwargs[pkname]
return new_kwargs return new_kwargs
@ -216,7 +224,7 @@ class QuerySet:
rows = await self.database.fetch_all(expr) rows = await self.database.fetch_all(expr)
processed_rows = self._process_query_result_rows(rows) processed_rows = self._process_query_result_rows(rows)
self.check_single_result_rows_count(processed_rows) self.check_single_result_rows_count(processed_rows)
return processed_rows[0] # type: ignore return processed_rows[0] # type: ignore
async def get_or_create(self, **kwargs: Any) -> "Model": async def get_or_create(self, **kwargs: Any) -> "Model":
try: try:
@ -276,7 +284,7 @@ class QuerySet:
await self.database.execute_many(expr, ready_objects) await self.database.execute_many(expr, ready_objects)
async def bulk_update( async def bulk_update(
self, objects: List["Model"], columns: List[str] = None self, objects: List["Model"], columns: List[str] = None
) -> None: ) -> None:
ready_objects = [] ready_objects = []
pk_name = self.model_meta.pkname pk_name = self.model_meta.pkname

View File

@ -1,7 +1,7 @@
import string import string
import uuid import uuid
from random import choices from random import choices
from typing import List, Dict from typing import Dict, List
import sqlalchemy import sqlalchemy
from sqlalchemy import text from sqlalchemy import text

View File

@ -1,4 +1,4 @@
from typing import Any, List, TYPE_CHECKING, Tuple, Union from typing import Any, List, Optional, TYPE_CHECKING, Union
import ormar import ormar
@ -14,14 +14,25 @@ class QuerysetProxy:
def __init__(self, relation: "Relation") -> None: def __init__(self, relation: "Relation") -> None:
self.relation: Relation = relation self.relation: Relation = relation
self.queryset: "QuerySet" self._queryset: Optional["QuerySet"] = None
def _assign_child_to_parent(self, child: "Model") -> None: @property
owner = self.relation._owner def queryset(self) -> "QuerySet":
rel_name = owner.resolve_relation_name(owner, child) if not self._queryset:
setattr(owner, rel_name, child) raise AttributeError
return self._queryset
def _register_related(self, child: Union["Model", List["Model"]]) -> None: @queryset.setter
def queryset(self, value: "QuerySet") -> None:
self._queryset = value
def _assign_child_to_parent(self, child: Optional["Model"]) -> None:
if child:
owner = self.relation._owner
rel_name = owner.resolve_relation_name(owner, child)
setattr(owner, rel_name, child)
def _register_related(self, child: Union["Model", List[Optional["Model"]]]) -> None:
if isinstance(child, list): if isinstance(child, list):
for subchild in child: for subchild in child:
self._assign_child_to_parent(subchild) self._assign_child_to_parent(subchild)
@ -40,13 +51,13 @@ class QuerysetProxy:
owner_column = self.relation._owner.get_name() owner_column = self.relation._owner.get_name()
child_column = child.get_name() child_column = child.get_name()
kwargs = {owner_column: self.relation._owner, child_column: child} kwargs = {owner_column: self.relation._owner, child_column: child}
link_instance = await queryset.filter(**kwargs).get() link_instance = await queryset.filter(**kwargs).get() # type: ignore
await link_instance.delete() await link_instance.delete()
def filter(self, **kwargs: Any) -> "QuerySet": # noqa: A003 def filter(self, **kwargs: Any) -> "QuerySet": # noqa: A003
return self.queryset.filter(**kwargs) return self.queryset.filter(**kwargs)
def select_related(self, related: Union[List, Tuple, str]) -> "QuerySet": def select_related(self, related: Union[List, str]) -> "QuerySet":
return self.queryset.select_related(related) return self.queryset.select_related(related)
async def exists(self) -> bool: async def exists(self) -> bool:
@ -59,7 +70,7 @@ class QuerysetProxy:
queryset = ormar.QuerySet(model_cls=self.relation.through) queryset = ormar.QuerySet(model_cls=self.relation.through)
owner_column = self.relation._owner.get_name() owner_column = self.relation._owner.get_name()
kwargs = {owner_column: self.relation._owner} kwargs = {owner_column: self.relation._owner}
return await queryset.delete(**kwargs) return await queryset.delete(**kwargs) # type: ignore
def limit(self, limit_count: int) -> "QuerySet": def limit(self, limit_count: int) -> "QuerySet":
return self.queryset.limit(limit_count) return self.queryset.limit(limit_count)
@ -77,7 +88,7 @@ class QuerysetProxy:
self._register_related(get) self._register_related(get)
return get return get
async def all(self, **kwargs: Any) -> List["Model"]: # noqa: A003 async def all(self, **kwargs: Any) -> List[Optional["Model"]]: # noqa: A003
all_items = await self.queryset.all(**kwargs) all_items = await self.queryset.all(**kwargs)
self._register_related(all_items) self._register_related(all_items)
return all_items return all_items

View File

@ -20,11 +20,11 @@ class RelationType(Enum):
class Relation: class Relation:
def __init__( def __init__(
self, self,
manager: "RelationsManager", manager: "RelationsManager",
type_: RelationType, type_: RelationType,
to: Type["Model"], to: Type["Model"],
through: Type["Model"] = None, through: Type["Model"] = None,
) -> None: ) -> None:
self.manager = manager self.manager = manager
self._owner: "Model" = manager.owner self._owner: "Model" = manager.owner
@ -37,7 +37,9 @@ class Relation:
else None else None
) )
def _find_existing(self, child: "Model") -> Optional[int]: def _find_existing(
self, child: Union["NewBaseModel", Type["NewBaseModel"]]
) -> Optional[int]:
if not isinstance(self.related_models, RelationProxy): # pragma nocover if not isinstance(self.related_models, RelationProxy): # pragma nocover
raise ValueError("Cannot find existing models in parent relation type") raise ValueError("Cannot find existing models in parent relation type")
for ind, relation_child in enumerate(self.related_models[:]): for ind, relation_child in enumerate(self.related_models[:]):

View File

@ -1,4 +1,4 @@
from typing import List, Optional, TYPE_CHECKING, Type, Union, Dict from typing import Dict, List, Optional, TYPE_CHECKING, Type, Union
from weakref import proxy from weakref import proxy
from ormar.fields import BaseField from ormar.fields import BaseField
@ -17,9 +17,9 @@ if TYPE_CHECKING: # pragma no cover
class RelationsManager: class RelationsManager:
def __init__( def __init__(
self, self,
related_fields: List[Type[ForeignKeyField]] = None, related_fields: List[Type[ForeignKeyField]] = None,
owner: "NewBaseModel" = None, owner: "NewBaseModel" = None,
) -> None: ) -> None:
self.owner = proxy(owner) self.owner = proxy(owner)
self._related_fields = related_fields or [] self._related_fields = related_fields or []
@ -73,13 +73,17 @@ class RelationsManager:
if child_relation: if child_relation:
child_relation.add(parent) child_relation.add(parent)
def remove(self, name: str, child: Union["NewBaseModel", Type["NewBaseModel"]]) -> None: def remove(
self, name: str, child: Union["NewBaseModel", Type["NewBaseModel"]]
) -> None:
relation = self._get(name) relation = self._get(name)
if relation: if relation:
relation.remove(child) relation.remove(child)
@staticmethod @staticmethod
def remove_parent(item: Union["NewBaseModel", Type["NewBaseModel"]], name: "Model") -> None: def remove_parent(
item: Union["NewBaseModel", Type["NewBaseModel"]], name: "Model"
) -> None:
related_model = name related_model = name
rel_name = item.resolve_relation_name(item, related_model) rel_name = item.resolve_relation_name(item, related_model)
if rel_name in item._orm: if rel_name in item._orm:

View File

@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Tuple, Type, Optional from typing import Optional, TYPE_CHECKING, Tuple, Type
from weakref import proxy from weakref import proxy
import ormar import ormar

View File

@ -16,6 +16,7 @@ psycopg2-binary
mysqlclient mysqlclient
# Testing # Testing
mypy
pytest pytest
pytest-cov pytest-cov
codecov codecov