some further cleanup and optim
This commit is contained in:
@ -5,6 +5,7 @@ import sqlalchemy
|
||||
from pydantic import Field, typing
|
||||
from pydantic.fields import FieldInfo
|
||||
|
||||
import ormar # noqa I101
|
||||
from ormar import ModelDefinitionError # noqa I101
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
@ -34,6 +35,10 @@ class BaseField(FieldInfo):
|
||||
default: Any
|
||||
server_default: Any
|
||||
|
||||
@classmethod
|
||||
def is_valid_uni_relation(cls) -> bool:
|
||||
return not issubclass(cls, ormar.fields.ManyToManyField) and not cls.virtual
|
||||
|
||||
@classmethod
|
||||
def get_alias(cls) -> str:
|
||||
return cls.alias if cls.alias else cls.name
|
||||
|
||||
@ -17,10 +17,10 @@ from ormar.exceptions import RelationshipInstanceError
|
||||
try:
|
||||
import orjson as json
|
||||
except ImportError: # pragma: nocover
|
||||
import json # type: ignore
|
||||
import json # type: ignore
|
||||
|
||||
import ormar
|
||||
from ormar.fields import BaseField, ManyToManyField
|
||||
import ormar # noqa: I100
|
||||
from ormar.fields import BaseField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.models.metaclass import ModelMeta
|
||||
|
||||
@ -111,29 +111,22 @@ class ModelTableProxy:
|
||||
|
||||
@classmethod
|
||||
def _extract_db_related_names(cls) -> Set:
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if (
|
||||
inspect.isclass(field)
|
||||
and issubclass(field, ForeignKeyField)
|
||||
and not issubclass(field, ManyToManyField)
|
||||
and not field.virtual
|
||||
):
|
||||
related_names.add(name)
|
||||
related_names = cls.extract_related_names()
|
||||
related_names = {
|
||||
name
|
||||
for name in related_names
|
||||
if cls.Meta.model_fields[name].is_valid_uni_relation()
|
||||
}
|
||||
return related_names
|
||||
|
||||
@classmethod
|
||||
def _exclude_related_names_not_required(cls, nested: bool = False) -> Set:
|
||||
if nested:
|
||||
return cls.extract_related_names()
|
||||
related_names = set()
|
||||
for name, field in cls.Meta.model_fields.items():
|
||||
if (
|
||||
inspect.isclass(field)
|
||||
and issubclass(field, ForeignKeyField)
|
||||
and field.nullable
|
||||
):
|
||||
related_names.add(name)
|
||||
related_names = cls.extract_related_names()
|
||||
related_names = {
|
||||
name for name in related_names if cls.Meta.model_fields[name].nullable
|
||||
}
|
||||
return related_names
|
||||
|
||||
def _extract_model_db_fields(self) -> Dict:
|
||||
@ -151,8 +144,8 @@ class ModelTableProxy:
|
||||
|
||||
@staticmethod
|
||||
def resolve_relation_name( # noqa CCR001
|
||||
item: Union["NewBaseModel", Type["NewBaseModel"]],
|
||||
related: Union["NewBaseModel", Type["NewBaseModel"]]
|
||||
item: Union["NewBaseModel", Type["NewBaseModel"]],
|
||||
related: Union["NewBaseModel", Type["NewBaseModel"]],
|
||||
) -> str:
|
||||
for name, field in item.Meta.model_fields.items():
|
||||
if issubclass(field, ForeignKeyField):
|
||||
@ -168,7 +161,7 @@ class ModelTableProxy:
|
||||
|
||||
@staticmethod
|
||||
def resolve_relation_field(
|
||||
item: Union["Model", Type["Model"]], related: Union["Model", Type["Model"]]
|
||||
item: Union["Model", Type["Model"]], related: Union["Model", Type["Model"]]
|
||||
) -> Type[BaseField]:
|
||||
name = ModelTableProxy.resolve_relation_name(item, related)
|
||||
to_field = item.Meta.model_fields.get(name)
|
||||
@ -215,12 +208,12 @@ class ModelTableProxy:
|
||||
for field in one.Meta.model_fields.keys():
|
||||
current_field = getattr(one, field)
|
||||
if isinstance(current_field, list) and not isinstance(
|
||||
current_field, ormar.Model
|
||||
current_field, ormar.Model
|
||||
):
|
||||
setattr(other, field, current_field + getattr(other, field))
|
||||
elif (
|
||||
isinstance(current_field, ormar.Model)
|
||||
and current_field.pk == getattr(other, field).pk
|
||||
isinstance(current_field, ormar.Model)
|
||||
and current_field.pk == getattr(other, field).pk
|
||||
):
|
||||
setattr(
|
||||
other,
|
||||
@ -231,7 +224,7 @@ class ModelTableProxy:
|
||||
|
||||
@staticmethod
|
||||
def _populate_pk_column(
|
||||
model: Type["Model"], columns: List[str], use_alias: bool = False,
|
||||
model: Type["Model"], columns: List[str], use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
pk_alias = (
|
||||
model.get_column_alias(model.Meta.pkname)
|
||||
@ -244,10 +237,10 @@ class ModelTableProxy:
|
||||
|
||||
@staticmethod
|
||||
def own_table_columns(
|
||||
model: Type["Model"],
|
||||
fields: Optional[Union[Set, Dict]],
|
||||
exclude_fields: Optional[Union[Set, Dict]],
|
||||
use_alias: bool = False,
|
||||
model: Type["Model"],
|
||||
fields: Optional[Union[Set, Dict]],
|
||||
exclude_fields: Optional[Union[Set, Dict]],
|
||||
use_alias: bool = False,
|
||||
) -> List[str]:
|
||||
columns = [
|
||||
model.get_column_name_from_alias(col.name) if not use_alias else col.name
|
||||
|
||||
@ -9,7 +9,8 @@ from typing import (
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set, TYPE_CHECKING,
|
||||
Set,
|
||||
TYPE_CHECKING,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
@ -43,7 +44,14 @@ if TYPE_CHECKING: # pragma no cover
|
||||
class NewBaseModel(
|
||||
pydantic.BaseModel, ModelTableProxy, Excludable, metaclass=ModelMetaclass
|
||||
):
|
||||
__slots__ = ("_orm_id", "_orm_saved", "_orm", "_related_names", "_related_names_hash", "_props")
|
||||
__slots__ = (
|
||||
"_orm_id",
|
||||
"_orm_saved",
|
||||
"_orm",
|
||||
"_related_names",
|
||||
"_related_names_hash",
|
||||
"_props",
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
__model_fields__: Dict[str, Type[BaseField]]
|
||||
@ -130,7 +138,14 @@ class NewBaseModel(
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def __getattribute__(self, item: str) -> Any:
|
||||
if item in ("_orm_id", "_orm_saved", "_orm", "__fields__", "_related_names", "_props"):
|
||||
if item in (
|
||||
"_orm_id",
|
||||
"_orm_saved",
|
||||
"_orm",
|
||||
"__fields__",
|
||||
"_related_names",
|
||||
"_props",
|
||||
):
|
||||
return object.__getattribute__(self, item)
|
||||
if item == "pk":
|
||||
return self.__dict__.get(self.Meta.pkname, None)
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import copy
|
||||
from typing import Any, Dict, List, Optional, Sequence, Set, TYPE_CHECKING, Type, Union
|
||||
|
||||
import databases
|
||||
@ -175,7 +174,7 @@ class QuerySet:
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
|
||||
current_excluded = copy.deepcopy(self._exclude_columns)
|
||||
current_excluded = self._exclude_columns
|
||||
if not isinstance(columns, dict):
|
||||
current_excluded = update_dict_from_list(current_excluded, columns)
|
||||
else:
|
||||
@ -197,7 +196,7 @@ class QuerySet:
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
|
||||
current_included = copy.deepcopy(self._exclude_columns)
|
||||
current_included = self._exclude_columns
|
||||
if not isinstance(columns, dict):
|
||||
current_included = update_dict_from_list(current_included, columns)
|
||||
else:
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
from ormar.relations.alias_manager import AliasManager
|
||||
from ormar.relations.relation import Relation, RelationType
|
||||
from ormar.relations.relation_manager import RelationsManager
|
||||
from ormar.relations.utils import (
|
||||
get_relations_sides_and_names,
|
||||
)
|
||||
from ormar.relations.utils import get_relations_sides_and_names
|
||||
|
||||
__all__ = [
|
||||
"AliasManager",
|
||||
|
||||
@ -5,9 +5,7 @@ from ormar.fields import BaseField
|
||||
from ormar.fields.foreign_key import ForeignKeyField
|
||||
from ormar.fields.many_to_many import ManyToManyField
|
||||
from ormar.relations.relation import Relation, RelationType
|
||||
from ormar.relations.utils import (
|
||||
get_relations_sides_and_names,
|
||||
)
|
||||
from ormar.relations.utils import get_relations_sides_and_names
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from ormar import Model
|
||||
|
||||
@ -9,11 +9,11 @@ if TYPE_CHECKING: # pragma no cover
|
||||
|
||||
|
||||
def get_relations_sides_and_names(
|
||||
to_field: Type[BaseField],
|
||||
parent: "Model",
|
||||
child: "Model",
|
||||
child_name: str,
|
||||
virtual: bool,
|
||||
to_field: Type[BaseField],
|
||||
parent: "Model",
|
||||
child: "Model",
|
||||
child_name: str,
|
||||
virtual: bool,
|
||||
) -> Tuple["Model", "Model", str, str]:
|
||||
to_name = to_field.name
|
||||
if issubclass(to_field, ManyToManyField):
|
||||
|
||||
Reference in New Issue
Block a user