initial working solution for aliases/different db column names in basic operations
This commit is contained in:
@ -64,7 +64,7 @@ class BaseField:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get_column(cls, name: str) -> sqlalchemy.Column:
|
def get_column(cls, name: str) -> sqlalchemy.Column:
|
||||||
return sqlalchemy.Column(
|
return sqlalchemy.Column(
|
||||||
name,
|
cls.name or name,
|
||||||
cls.column_type,
|
cls.column_type,
|
||||||
*cls.constraints,
|
*cls.constraints,
|
||||||
primary_key=cls.primary_key,
|
primary_key=cls.primary_key,
|
||||||
|
|||||||
@ -38,7 +38,7 @@ def ForeignKey( # noqa CFQ002
|
|||||||
onupdate: str = None,
|
onupdate: str = None,
|
||||||
ondelete: str = None,
|
ondelete: str = None,
|
||||||
) -> Type["ForeignKeyField"]:
|
) -> Type["ForeignKeyField"]:
|
||||||
fk_string = to.Meta.tablename + "." + to.Meta.pkname
|
fk_string = to.Meta.tablename + "." + to.get_column_alias(to.Meta.pkname)
|
||||||
to_field = to.__fields__[to.Meta.pkname]
|
to_field = to.__fields__[to.Meta.pkname]
|
||||||
namespace = dict(
|
namespace = dict(
|
||||||
to=to,
|
to=to,
|
||||||
|
|||||||
@ -79,7 +79,7 @@ class String(ModelFieldFactory):
|
|||||||
if k not in ["cls", "__class__", "kwargs"]
|
if k not in ["cls", "__class__", "kwargs"]
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
kwargs['allow_blank'] = kwargs.get('nullable', True)
|
kwargs["allow_blank"] = kwargs.get("nullable", True)
|
||||||
return super().__new__(cls, **kwargs)
|
return super().__new__(cls, **kwargs)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -145,7 +145,7 @@ class Text(ModelFieldFactory):
|
|||||||
if k not in ["cls", "__class__", "kwargs"]
|
if k not in ["cls", "__class__", "kwargs"]
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
kwargs['allow_blank'] = kwargs.get('nullable', True)
|
kwargs["allow_blank"] = kwargs.get("nullable", True)
|
||||||
return super().__new__(cls, **kwargs)
|
return super().__new__(cls, **kwargs)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@ -124,9 +124,9 @@ def create_and_append_m2m_fk(
|
|||||||
) -> None:
|
) -> None:
|
||||||
column = sqlalchemy.Column(
|
column = sqlalchemy.Column(
|
||||||
model.get_name(),
|
model.get_name(),
|
||||||
model.Meta.table.columns.get(model.Meta.pkname).type,
|
model.Meta.table.columns.get(model.get_column_alias(model.Meta.pkname)).type,
|
||||||
sqlalchemy.schema.ForeignKey(
|
sqlalchemy.schema.ForeignKey(
|
||||||
model.Meta.tablename + "." + model.Meta.pkname,
|
model.Meta.tablename + "." + model.get_column_alias(model.Meta.pkname),
|
||||||
ondelete="CASCADE",
|
ondelete="CASCADE",
|
||||||
onupdate="CASCADE",
|
onupdate="CASCADE",
|
||||||
),
|
),
|
||||||
@ -223,7 +223,9 @@ def populate_meta_tablename_columns_and_pk(
|
|||||||
name: str, new_model: Type["Model"]
|
name: str, new_model: Type["Model"]
|
||||||
) -> Type["Model"]:
|
) -> Type["Model"]:
|
||||||
tablename = name.lower() + "s"
|
tablename = name.lower() + "s"
|
||||||
new_model.Meta.tablename = new_model.Meta.tablename if hasattr(new_model.Meta, 'tablename') else tablename
|
new_model.Meta.tablename = (
|
||||||
|
new_model.Meta.tablename if hasattr(new_model.Meta, "tablename") else tablename
|
||||||
|
)
|
||||||
pkname: Optional[str]
|
pkname: Optional[str]
|
||||||
|
|
||||||
if hasattr(new_model.Meta, "columns"):
|
if hasattr(new_model.Meta, "columns"):
|
||||||
|
|||||||
@ -90,13 +90,13 @@ class Model(NewBaseModel):
|
|||||||
previous_table=previous_table,
|
previous_table=previous_table,
|
||||||
fields=fields,
|
fields=fields,
|
||||||
)
|
)
|
||||||
item[first_part] = child
|
item[model_cls.get_column_name_from_alias(first_part)] = child
|
||||||
else:
|
else:
|
||||||
model_cls = cls.Meta.model_fields[related].to
|
model_cls = cls.Meta.model_fields[related].to
|
||||||
child = model_cls.from_row(
|
child = model_cls.from_row(
|
||||||
row, previous_table=previous_table, fields=fields
|
row, previous_table=previous_table, fields=fields
|
||||||
)
|
)
|
||||||
item[related] = child
|
item[model_cls.get_column_name_from_alias(related)] = child
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
|
||||||
@ -113,13 +113,16 @@ class Model(NewBaseModel):
|
|||||||
# databases does not keep aliases in Record for postgres, change to raw row
|
# databases does not keep aliases in Record for postgres, change to raw row
|
||||||
source = row._row if isinstance(row, Record) else row
|
source = row._row if isinstance(row, Record) else row
|
||||||
|
|
||||||
selected_columns = cls.own_table_columns(cls, fields or [], nested=nested)
|
selected_columns = cls.own_table_columns(
|
||||||
|
cls, fields or [], nested=nested, use_alias=True
|
||||||
|
)
|
||||||
for column in cls.Meta.table.columns:
|
for column in cls.Meta.table.columns:
|
||||||
if column.name not in item and column.name in selected_columns:
|
alias = cls.get_column_name_from_alias(column.name)
|
||||||
|
if alias not in item and alias in selected_columns:
|
||||||
prefixed_name = (
|
prefixed_name = (
|
||||||
f'{table_prefix + "_" if table_prefix else ""}{column.name}'
|
f'{table_prefix + "_" if table_prefix else ""}{column.name}'
|
||||||
)
|
)
|
||||||
item[column.name] = source[prefixed_name]
|
item[alias] = source[prefixed_name]
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
|
||||||
|
|||||||
@ -47,6 +47,20 @@ class ModelTableProxy:
|
|||||||
model_dict[field] = field_value.get(target_pkname)
|
model_dict[field] = field_value.get(target_pkname)
|
||||||
return model_dict
|
return model_dict
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_column_alias(cls, field_name: str) -> str:
|
||||||
|
field = cls.Meta.model_fields.get(field_name)
|
||||||
|
if field and field.name is not None and field.name != field_name:
|
||||||
|
return field.name
|
||||||
|
return field_name
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_column_name_from_alias(cls, alias: str) -> str:
|
||||||
|
for field_name, field in cls.Meta.model_fields.items():
|
||||||
|
if field and field.name == alias:
|
||||||
|
return field_name
|
||||||
|
return alias # if not found it's not an alias but actual name
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def extract_related_names(cls) -> Set:
|
def extract_related_names(cls) -> Set:
|
||||||
related_names = set()
|
related_names = set()
|
||||||
@ -151,10 +165,16 @@ class ModelTableProxy:
|
|||||||
return other
|
return other
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def own_table_columns(
|
def own_table_columns( # noqa: CCR001
|
||||||
model: Type["Model"], fields: List, nested: bool = False
|
model: Type["Model"],
|
||||||
|
fields: List,
|
||||||
|
nested: bool = False,
|
||||||
|
use_alias: bool = False,
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
column_names = [col.name for col in model.Meta.table.columns]
|
column_names = [
|
||||||
|
model.get_column_name_from_alias(col.name) if use_alias else col.name
|
||||||
|
for col in model.Meta.table.columns
|
||||||
|
]
|
||||||
if not fields:
|
if not fields:
|
||||||
return column_names
|
return column_names
|
||||||
|
|
||||||
@ -175,6 +195,11 @@ class ModelTableProxy:
|
|||||||
columns = column_names
|
columns = column_names
|
||||||
|
|
||||||
# always has to return pk column
|
# always has to return pk column
|
||||||
if model.Meta.pkname not in columns:
|
pk_alias = (
|
||||||
columns.append(model.Meta.pkname)
|
model.get_column_alias(model.Meta.pkname)
|
||||||
|
if use_alias
|
||||||
|
else model.Meta.pkname
|
||||||
|
)
|
||||||
|
if pk_alias not in columns:
|
||||||
|
columns.append(pk_alias)
|
||||||
return columns
|
return columns
|
||||||
|
|||||||
@ -134,8 +134,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
def _extract_related_model_instead_of_field(
|
def _extract_related_model_instead_of_field(
|
||||||
self, item: str
|
self, item: str
|
||||||
) -> Optional[Union["Model", List["Model"]]]:
|
) -> Optional[Union["Model", List["Model"]]]:
|
||||||
if item in self._orm:
|
alias = self.get_column_alias(item)
|
||||||
return self._orm.get(item)
|
if alias in self._orm:
|
||||||
|
return self._orm.get(alias)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def __eq__(self, other: object) -> bool:
|
def __eq__(self, other: object) -> bool:
|
||||||
|
|||||||
@ -44,7 +44,7 @@ class QueryClause:
|
|||||||
) -> Tuple[List[sqlalchemy.sql.expression.TextClause], List[str]]:
|
) -> Tuple[List[sqlalchemy.sql.expression.TextClause], List[str]]:
|
||||||
|
|
||||||
if kwargs.get("pk"):
|
if kwargs.get("pk"):
|
||||||
pk_name = self.model_cls.Meta.pkname
|
pk_name = self.model_cls.get_column_alias(self.model_cls.Meta.pkname)
|
||||||
kwargs[pk_name] = kwargs.pop("pk")
|
kwargs[pk_name] = kwargs.pop("pk")
|
||||||
|
|
||||||
filter_clauses, select_related = self._populate_filter_clauses(**kwargs)
|
filter_clauses, select_related = self._populate_filter_clauses(**kwargs)
|
||||||
|
|||||||
@ -106,7 +106,9 @@ class SqlJoin:
|
|||||||
self.select_from = sqlalchemy.sql.outerjoin(
|
self.select_from = sqlalchemy.sql.outerjoin(
|
||||||
self.select_from, target_table, on_clause
|
self.select_from, target_table, on_clause
|
||||||
)
|
)
|
||||||
self.order_bys.append(text(f"{alias}_{to_table}.{model_cls.Meta.pkname}"))
|
|
||||||
|
pkname_alias = model_cls.get_column_alias(model_cls.Meta.pkname)
|
||||||
|
self.order_bys.append(text(f"{alias}_{to_table}.{pkname_alias}"))
|
||||||
self_related_fields = model_cls.own_table_columns(
|
self_related_fields = model_cls.own_table_columns(
|
||||||
model_cls, self.fields, nested=True
|
model_cls, self.fields, nested=True
|
||||||
)
|
)
|
||||||
@ -125,12 +127,13 @@ class SqlJoin:
|
|||||||
part: str,
|
part: str,
|
||||||
) -> Tuple[str, str]:
|
) -> Tuple[str, str]:
|
||||||
if join_params.prev_model.Meta.model_fields[part].virtual or is_multi:
|
if join_params.prev_model.Meta.model_fields[part].virtual or is_multi:
|
||||||
to_field = model_cls.resolve_relation_field(
|
to_field = model_cls.resolve_relation_name(
|
||||||
model_cls, join_params.prev_model
|
model_cls, join_params.prev_model
|
||||||
)
|
)
|
||||||
to_key = to_field.name
|
to_key = model_cls.get_column_alias(to_field)
|
||||||
from_key = model_cls.Meta.pkname
|
from_key = join_params.prev_model.get_column_alias(model_cls.Meta.pkname)
|
||||||
else:
|
else:
|
||||||
to_key = model_cls.Meta.pkname
|
to_key = model_cls.get_column_alias(model_cls.Meta.pkname)
|
||||||
from_key = part
|
from_key = join_params.prev_model.get_column_alias(part)
|
||||||
|
|
||||||
return to_key, from_key
|
return to_key, from_key
|
||||||
|
|||||||
@ -40,7 +40,8 @@ class Query:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def prefixed_pk_name(self) -> str:
|
def prefixed_pk_name(self) -> str:
|
||||||
return f"{self.table.name}.{self.model_cls.Meta.pkname}"
|
pkname_alias = self.model_cls.get_column_alias(self.model_cls.Meta.pkname)
|
||||||
|
return f"{self.table.name}.{pkname_alias}"
|
||||||
|
|
||||||
def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]:
|
def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]:
|
||||||
self_related_fields = self.model_cls.own_table_columns(
|
self_related_fields = self.model_cls.own_table_columns(
|
||||||
|
|||||||
@ -70,12 +70,29 @@ class QuerySet:
|
|||||||
return self.model.merge_instances_list(result_rows) # type: ignore
|
return self.model.merge_instances_list(result_rows) # type: ignore
|
||||||
return result_rows
|
return result_rows
|
||||||
|
|
||||||
|
def _prepare_model_to_save(self, new_kwargs: dict) -> dict:
|
||||||
|
new_kwargs = self._remove_pk_from_kwargs(new_kwargs)
|
||||||
|
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
||||||
|
new_kwargs = self._populate_default_values(new_kwargs)
|
||||||
|
new_kwargs = self._translate_columns_to_aliases(new_kwargs)
|
||||||
|
return new_kwargs
|
||||||
|
|
||||||
def _populate_default_values(self, new_kwargs: dict) -> dict:
|
def _populate_default_values(self, new_kwargs: dict) -> dict:
|
||||||
for field_name, field in self.model_meta.model_fields.items():
|
for field_name, field in self.model_meta.model_fields.items():
|
||||||
if field_name not in new_kwargs and field.has_default():
|
if field_name not in new_kwargs and field.has_default():
|
||||||
new_kwargs[field_name] = field.get_default()
|
new_kwargs[field_name] = field.get_default()
|
||||||
return new_kwargs
|
return new_kwargs
|
||||||
|
|
||||||
|
def _translate_columns_to_aliases(self, new_kwargs: dict) -> dict:
|
||||||
|
for field_name, field in self.model_meta.model_fields.items():
|
||||||
|
if (
|
||||||
|
field_name in new_kwargs
|
||||||
|
and field.name is not None
|
||||||
|
and field.name != field_name
|
||||||
|
):
|
||||||
|
new_kwargs[field.name] = new_kwargs.pop(field_name)
|
||||||
|
return new_kwargs
|
||||||
|
|
||||||
def _remove_pk_from_kwargs(self, new_kwargs: dict) -> dict:
|
def _remove_pk_from_kwargs(self, new_kwargs: dict) -> dict:
|
||||||
pkname = self.model_meta.pkname
|
pkname = self.model_meta.pkname
|
||||||
pk = self.model_meta.model_fields[pkname]
|
pk = self.model_meta.model_fields[pkname]
|
||||||
@ -278,9 +295,7 @@ class QuerySet:
|
|||||||
async def create(self, **kwargs: Any) -> "Model":
|
async def create(self, **kwargs: Any) -> "Model":
|
||||||
|
|
||||||
new_kwargs = dict(**kwargs)
|
new_kwargs = dict(**kwargs)
|
||||||
new_kwargs = self._remove_pk_from_kwargs(new_kwargs)
|
new_kwargs = self._prepare_model_to_save(new_kwargs)
|
||||||
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
|
||||||
new_kwargs = self._populate_default_values(new_kwargs)
|
|
||||||
|
|
||||||
expr = self.table.insert()
|
expr = self.table.insert()
|
||||||
expr = expr.values(**new_kwargs)
|
expr = expr.values(**new_kwargs)
|
||||||
@ -288,7 +303,7 @@ class QuerySet:
|
|||||||
instance = self.model(**kwargs)
|
instance = self.model(**kwargs)
|
||||||
pk = await self.database.execute(expr)
|
pk = await self.database.execute(expr)
|
||||||
|
|
||||||
pk_name = self.model_meta.pkname
|
pk_name = self.model.get_column_alias(self.model_meta.pkname)
|
||||||
if pk_name not in kwargs and pk_name in new_kwargs:
|
if pk_name not in kwargs and pk_name in new_kwargs:
|
||||||
instance.pk = new_kwargs[self.model_meta.pkname]
|
instance.pk = new_kwargs[self.model_meta.pkname]
|
||||||
if pk and isinstance(pk, self.model.pk_type()):
|
if pk and isinstance(pk, self.model.pk_type()):
|
||||||
@ -300,9 +315,7 @@ class QuerySet:
|
|||||||
ready_objects = []
|
ready_objects = []
|
||||||
for objt in objects:
|
for objt in objects:
|
||||||
new_kwargs = objt.dict()
|
new_kwargs = objt.dict()
|
||||||
new_kwargs = self._remove_pk_from_kwargs(new_kwargs)
|
new_kwargs = self._prepare_model_to_save(new_kwargs)
|
||||||
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
|
||||||
new_kwargs = self._populate_default_values(new_kwargs)
|
|
||||||
ready_objects.append(new_kwargs)
|
ready_objects.append(new_kwargs)
|
||||||
|
|
||||||
expr = self.table.insert()
|
expr = self.table.insert()
|
||||||
|
|||||||
@ -39,7 +39,7 @@ class RelationProxy(list):
|
|||||||
|
|
||||||
def _set_queryset(self) -> "QuerySet":
|
def _set_queryset(self) -> "QuerySet":
|
||||||
owner_table = self.relation._owner.Meta.tablename
|
owner_table = self.relation._owner.Meta.tablename
|
||||||
pkname = self.relation._owner.Meta.pkname
|
pkname = self.relation._owner.get_column_alias(self.relation._owner.Meta.pkname)
|
||||||
pk_value = self.relation._owner.pk
|
pk_value = self.relation._owner.pk
|
||||||
if not pk_value:
|
if not pk_value:
|
||||||
raise RelationshipInstanceError(
|
raise RelationshipInstanceError(
|
||||||
|
|||||||
95
tests/test_aliases.py
Normal file
95
tests/test_aliases.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class Child(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "children"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: ormar.Integer(name='child_id', primary_key=True)
|
||||||
|
first_name: ormar.String(name='fname', max_length=100)
|
||||||
|
last_name: ormar.String(name='lname', max_length=100)
|
||||||
|
born_year: ormar.Integer(name='year_born')
|
||||||
|
|
||||||
|
|
||||||
|
class ArtistChildren(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "children_x_artists"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Artist(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "artists"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: ormar.Integer(name='artist_id', primary_key=True)
|
||||||
|
first_name: ormar.String(name='fname', max_length=100)
|
||||||
|
last_name: ormar.String(name='lname', max_length=100)
|
||||||
|
born_year: ormar.Integer(name='year')
|
||||||
|
children: ormar.ManyToMany(Child, through=ArtistChildren)
|
||||||
|
|
||||||
|
|
||||||
|
class Album(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "music_albums"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: ormar.Integer(name='album_id', primary_key=True)
|
||||||
|
name: ormar.String(name='album_name', max_length=100)
|
||||||
|
artist: ormar.ForeignKey(Artist, name='artist_id')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
def test_table_structure():
|
||||||
|
assert 'album_id' in [x.name for x in Album.Meta.table.columns]
|
||||||
|
assert 'album_name' in [x.name for x in Album.Meta.table.columns]
|
||||||
|
assert 'fname' in [x.name for x in Artist.Meta.table.columns]
|
||||||
|
assert 'lname' in [x.name for x in Artist.Meta.table.columns]
|
||||||
|
assert 'year' in [x.name for x in Artist.Meta.table.columns]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_working_with_aliases():
|
||||||
|
async with database:
|
||||||
|
async with database.transaction(force_rollback=True):
|
||||||
|
artist = await Artist.objects.create(first_name='Ted', last_name='Mosbey', born_year=1975)
|
||||||
|
await Album.objects.create(name="Aunt Robin", artist=artist)
|
||||||
|
|
||||||
|
await artist.children.create(first_name='Son', last_name='1', born_year=1990)
|
||||||
|
await artist.children.create(first_name='Son', last_name='2', born_year=1995)
|
||||||
|
|
||||||
|
album = await Album.objects.select_related('artist').first()
|
||||||
|
assert album.artist.last_name == 'Mosbey'
|
||||||
|
|
||||||
|
assert album.artist.id is not None
|
||||||
|
assert album.artist.first_name == 'Ted'
|
||||||
|
assert album.artist.born_year == 1975
|
||||||
|
|
||||||
|
assert album.name == 'Aunt Robin'
|
||||||
|
|
||||||
|
artist = await Artist.objects.select_related('children').get()
|
||||||
|
assert len(artist.children) == 2
|
||||||
|
assert artist.children[0].first_name == 'Son'
|
||||||
|
assert artist.children[1].last_name == '2'
|
||||||
|
|
||||||
Reference in New Issue
Block a user