refactor merging of instances from queryset to fakepydantic

This commit is contained in:
collerek
2020-08-09 12:53:28 +02:00
parent 3f2568b27e
commit 836836c136
5 changed files with 49 additions and 37 deletions

View File

@ -249,8 +249,10 @@ class ForeignKey(BaseField):
else:
if not isinstance(value, self.to.pk_type()):
raise RelationshipInstanceError(
f"Relationship error - ForeignKey {self.to.__name__} is of type {self.to.pk_type()} "
f"of type {self.__type__} while {type(value)} passed as a parameter."
f"Relationship error - ForeignKey {self.to.__name__} "
f"is of type {self.to.pk_type()} "
f"of type {self.__type__} "
f"while {type(value)} passed as a parameter."
)
model = create_dummy_instance(fk=self.to, pk=value)

View File

@ -210,7 +210,7 @@ class FakePydantic(list, metaclass=ModelMetaclass):
return self.__table__.primary_key.columns.values()[0]
@classmethod
def pk_type(cls):
def pk_type(cls) -> Any:
return cls.__model_fields__[cls.__pkname__].__type__
def dict(self) -> Dict: # noqa: A003
@ -259,6 +259,35 @@ class FakePydantic(list, metaclass=ModelMetaclass):
)
return self_fields
@classmethod
def merge_instances_list(cls, result_rows: List["Model"]) -> List["Model"]:
merged_rows = []
for index, model in enumerate(result_rows):
if index > 0 and model.pk == result_rows[index - 1].pk:
result_rows[-1] = cls.merge_two_instances(model, merged_rows[-1])
else:
merged_rows.append(model)
return merged_rows
@classmethod
def merge_two_instances(cls, one: "Model", other: "Model") -> "Model":
for field in one.__model_fields__.keys():
# print(field, one.dict(), other.dict())
if isinstance(getattr(one, field), list) and not isinstance(
getattr(one, field), Model
):
setattr(other, field, getattr(one, field) + getattr(other, field))
elif isinstance(getattr(one, field), Model):
if getattr(one, field).pk == getattr(other, field).pk:
setattr(
other,
field,
cls.merge_two_instances(
getattr(one, field), getattr(other, field)
),
)
return other
class Model(FakePydantic):
__abstract__ = True

View File

@ -479,39 +479,10 @@ class QuerySet:
for row in rows
]
result_rows = self.merge_result_rows(result_rows)
result_rows = self.model_cls.merge_instances_list(result_rows)
return result_rows
@classmethod
def merge_result_rows(cls, result_rows: List["Model"]) -> List["Model"]:
merged_rows = []
for index, model in enumerate(result_rows):
if index > 0 and model.pk == result_rows[index - 1].pk:
result_rows[-1] = cls.merge_two_instances(model, merged_rows[-1])
else:
merged_rows.append(model)
return merged_rows
@classmethod
def merge_two_instances(cls, one: "Model", other: "Model") -> "Model":
for field in one.__model_fields__.keys():
# print(field, one.dict(), other.dict())
if isinstance(getattr(one, field), list) and not isinstance(
getattr(one, field), orm.models.Model
):
setattr(other, field, getattr(one, field) + getattr(other, field))
elif isinstance(getattr(one, field), orm.models.Model):
if getattr(one, field).pk == getattr(other, field).pk:
setattr(
other,
field,
cls.merge_two_instances(
getattr(one, field), getattr(other, field)
),
)
return other
async def create(self, **kwargs: Any) -> "Model":
new_kwargs = dict(**kwargs)