change queryset to work with column and table aliases
This commit is contained in:
@ -41,8 +41,8 @@ def sqlalchemy_columns_from_model_fields(name: str, object_dict: Dict, tablename
|
||||
if field.primary_key:
|
||||
pkname = field_name
|
||||
if isinstance(field, ForeignKey):
|
||||
reverse_name = field.related_name or field.to.__name__.title() + '_' + name.lower() + 's'
|
||||
relation_name = name + '_' + field.to.__name__.lower()
|
||||
reverse_name = field.related_name or field.to.__name__.lower().title() + '_' + name.lower() + 's'
|
||||
relation_name = name.lower().title() + '_' + field.to.__name__.lower()
|
||||
relationship_manager.add_relation_type(relation_name, reverse_name, field, tablename)
|
||||
columns.append(field.get_column(field_name))
|
||||
return pkname, columns, model_fields
|
||||
@ -88,6 +88,8 @@ class ModelMetaclass(type):
|
||||
attrs['__annotations__'] = copy.deepcopy(pydantic_model.__annotations__)
|
||||
attrs['__model_fields__'] = model_fields
|
||||
|
||||
attrs['_orm_relationship_manager'] = relationship_manager
|
||||
|
||||
new_model = super().__new__( # type: ignore
|
||||
mcs, name, bases, attrs
|
||||
)
|
||||
@ -105,13 +107,13 @@ class Model(list, metaclass=ModelMetaclass):
|
||||
__fields__: Dict[str, pydantic.fields.ModelField]
|
||||
__pydantic_model__: Type[BaseModel]
|
||||
__pkname__: str
|
||||
_orm_relationship_manager: RelationshipManager
|
||||
|
||||
objects = qry.QuerySet()
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
self._orm_id: str = uuid.uuid4().hex
|
||||
self._orm_saved: bool = False
|
||||
self._orm_relationship_manager: RelationshipManager = relationship_manager
|
||||
self.values: Optional[BaseModel] = None
|
||||
|
||||
if "pk" in kwargs:
|
||||
@ -129,8 +131,12 @@ class Model(list, metaclass=ModelMetaclass):
|
||||
value = json.dumps(value)
|
||||
except TypeError: # pragma no cover
|
||||
pass
|
||||
|
||||
value = self.__model_fields__[key].expand_relationship(value, self)
|
||||
setattr(self.values, key, value)
|
||||
|
||||
relation_key = self.__class__.__name__.title() + '_' + key
|
||||
if not self._orm_relationship_manager.contains(relation_key, self):
|
||||
setattr(self.values, key, value)
|
||||
else:
|
||||
super().__setattr__(key, value)
|
||||
|
||||
@ -152,25 +158,36 @@ class Model(list, metaclass=ModelMetaclass):
|
||||
def __eq__(self, other):
|
||||
return self.values.dict() == other.values.dict()
|
||||
|
||||
def __same__(self, other):
|
||||
assert self.__class__ == other.__class__
|
||||
return self._orm_id == other._orm_id or (
|
||||
self.values is not None and other.values is not None and self.pk == other.pk)
|
||||
|
||||
def __repr__(self): # pragma no cover
|
||||
return self.values.__repr__()
|
||||
|
||||
@classmethod
|
||||
def from_row(cls, row, select_related: List = None) -> 'Model':
|
||||
def from_row(cls, row, select_related: List = None, previous_table: str = None) -> 'Model':
|
||||
|
||||
item = {}
|
||||
select_related = select_related or []
|
||||
|
||||
table_prefix = cls._orm_relationship_manager.resolve_relation_join(previous_table, cls.__table__.name)
|
||||
previous_table = cls.__table__.name
|
||||
for related in select_related:
|
||||
if "__" in related:
|
||||
first_part, remainder = related.split("__", 1)
|
||||
model_cls = cls.__model_fields__[first_part].to
|
||||
item[first_part] = model_cls.from_row(row, select_related=[remainder])
|
||||
child = model_cls.from_row(row, select_related=[remainder], previous_table=previous_table)
|
||||
item[first_part] = child
|
||||
else:
|
||||
model_cls = cls.__model_fields__[related].to
|
||||
item[related] = model_cls.from_row(row)
|
||||
child = model_cls.from_row(row, previous_table=previous_table)
|
||||
item[related] = child
|
||||
|
||||
for column in cls.__table__.columns:
|
||||
if column.name not in item:
|
||||
item[column.name] = row[column]
|
||||
item[column.name] = row[f'{table_prefix + "_" if table_prefix else ""}{column.name}']
|
||||
|
||||
return cls(**item)
|
||||
|
||||
@ -202,7 +219,14 @@ class Model(list, metaclass=ModelMetaclass):
|
||||
return self.__table__.primary_key.columns.values()[0]
|
||||
|
||||
def dict(self) -> Dict:
|
||||
return self.values.dict()
|
||||
dict_instance = self.values.dict()
|
||||
for field in self.extract_related_names():
|
||||
nested_model = getattr(self, field)
|
||||
if isinstance(nested_model, list):
|
||||
dict_instance[field] = [x.dict() for x in nested_model]
|
||||
else:
|
||||
dict_instance[field] = nested_model.dict() if nested_model is not None else {}
|
||||
return dict_instance
|
||||
|
||||
def from_dict(self, value_dict: Dict) -> None:
|
||||
for key, value in value_dict.items():
|
||||
|
||||
Reference in New Issue
Block a user