check other backends trial1
This commit is contained in:
@ -211,12 +211,17 @@ class BaseField(FieldInfo):
|
||||
:return: List of sqlalchemy foreign keys - by default one.
|
||||
:rtype: List[sqlalchemy.schema.ForeignKey]
|
||||
"""
|
||||
return [
|
||||
sqlalchemy.schema.ForeignKey(
|
||||
con.name, ondelete=con.ondelete, onupdate=con.onupdate
|
||||
constraints = [
|
||||
sqlalchemy.ForeignKey(
|
||||
con.reference,
|
||||
ondelete=con.ondelete,
|
||||
onupdate=con.onupdate,
|
||||
name=f"fk_{cls.owner.Meta.tablename}_{cls.to.Meta.tablename}"
|
||||
f"_{cls.to.get_column_alias(cls.to.Meta.pkname)}_{cls.name}",
|
||||
)
|
||||
for con in cls.constraints
|
||||
]
|
||||
return constraints
|
||||
|
||||
@classmethod
|
||||
def get_column(cls, name: str) -> sqlalchemy.Column:
|
||||
@ -230,7 +235,7 @@ class BaseField(FieldInfo):
|
||||
:return: actual definition of the database column as sqlalchemy requires.
|
||||
:rtype: sqlalchemy.Column
|
||||
"""
|
||||
return sqlalchemy.Column(
|
||||
column = sqlalchemy.Column(
|
||||
cls.alias or name,
|
||||
cls.column_type,
|
||||
*cls.construct_constraints(),
|
||||
@ -241,6 +246,7 @@ class BaseField(FieldInfo):
|
||||
default=cls.default,
|
||||
server_default=cls.server_default,
|
||||
)
|
||||
return column
|
||||
|
||||
@classmethod
|
||||
def expand_relationship(
|
||||
|
||||
@ -3,6 +3,7 @@ import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||
|
||||
import sqlalchemy
|
||||
from pydantic import BaseModel, create_model
|
||||
from pydantic.typing import ForwardRef, evaluate_forwardref
|
||||
from sqlalchemy import UniqueConstraint
|
||||
@ -94,6 +95,7 @@ def populate_fk_params_based_on_to_model(
|
||||
:rtype: Tuple[Any, List, Any]
|
||||
"""
|
||||
fk_string = to.Meta.tablename + "." + to.get_column_alias(to.Meta.pkname)
|
||||
to.Meta.tablename + "." + to.get_column_alias(to.Meta.pkname)
|
||||
to_field = to.Meta.model_fields[to.Meta.pkname]
|
||||
pk_only_model = create_dummy_model(to, to_field)
|
||||
__type__ = (
|
||||
@ -103,7 +105,7 @@ def populate_fk_params_based_on_to_model(
|
||||
)
|
||||
constraints = [
|
||||
ForeignKeyConstraint(
|
||||
name=fk_string, ondelete=ondelete, onupdate=onupdate # type: ignore
|
||||
reference=fk_string, ondelete=ondelete, onupdate=onupdate, name=None
|
||||
)
|
||||
]
|
||||
column_type = to_field.column_type
|
||||
@ -124,9 +126,10 @@ class ForeignKeyConstraint:
|
||||
to produce sqlalchemy.ForeignKeys
|
||||
"""
|
||||
|
||||
name: str
|
||||
ondelete: str
|
||||
onupdate: str
|
||||
reference: Union[str, sqlalchemy.Column]
|
||||
name: Optional[str]
|
||||
ondelete: Optional[str]
|
||||
onupdate: Optional[str]
|
||||
|
||||
|
||||
def ForeignKey( # noqa CFQ002
|
||||
|
||||
@ -214,7 +214,7 @@ class Integer(ModelFieldFactory, int):
|
||||
:return: initialized column with proper options
|
||||
:rtype: sqlalchemy Column
|
||||
"""
|
||||
return sqlalchemy.Integer()
|
||||
return sqlalchemy.Integer
|
||||
|
||||
|
||||
class Text(ModelFieldFactory, str):
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
import copy
|
||||
import logging
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||
|
||||
import sqlalchemy
|
||||
from sqlalchemy import ForeignKeyConstraint
|
||||
|
||||
from ormar import ForeignKey, Integer, ModelDefinitionError # noqa: I202
|
||||
from ormar.fields import BaseField, ManyToManyField
|
||||
@ -81,10 +79,12 @@ def create_and_append_m2m_fk(
|
||||
model.Meta.tablename + "." + pk_alias,
|
||||
ondelete="CASCADE",
|
||||
onupdate="CASCADE",
|
||||
name=f"fk_{model_field.through.Meta.tablename}_{model.Meta.tablename}"
|
||||
f"_{field_name}_{pk_alias}",
|
||||
),
|
||||
)
|
||||
model_field.through.Meta.columns.append(column)
|
||||
model_field.through.Meta.table.append_column(copy.deepcopy(column))
|
||||
model_field.through.Meta.table.append_column(column)
|
||||
|
||||
|
||||
def check_pk_column_validity(
|
||||
@ -235,18 +235,15 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
|
||||
if not hasattr(meta, "table") and check_for_null_type_columns_from_forward_refs(
|
||||
meta
|
||||
):
|
||||
if meta.tablename == 'albums':
|
||||
meta.constraints.append(ForeignKeyConstraint(['artist'],['artists.id'],
|
||||
ondelete='CASCADE',
|
||||
onupdate='CASCADE'))
|
||||
for constraint in meta.constraints:
|
||||
if isinstance(constraint, sqlalchemy.UniqueConstraint):
|
||||
constraint.name = (
|
||||
f"uc_{meta.tablename}_"
|
||||
f'{"_".join([str(col) for col in constraint._pending_colargs])}'
|
||||
)
|
||||
table = sqlalchemy.Table(
|
||||
meta.tablename,
|
||||
meta.metadata,
|
||||
*[copy.deepcopy(col) for col in meta.columns],
|
||||
*meta.constraints,
|
||||
meta.tablename, meta.metadata, *meta.columns, *meta.constraints,
|
||||
)
|
||||
if meta.tablename == 'albums':
|
||||
pass
|
||||
meta.table = table
|
||||
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ from ormar.models.helpers import (
|
||||
populate_meta_tablename_columns_and_pk,
|
||||
register_relation_in_alias_manager,
|
||||
)
|
||||
from ormar.models.helpers.sqlalchemy import sqlalchemy_columns_from_model_fields
|
||||
from ormar.models.quick_access_views import quick_access_set
|
||||
from ormar.queryset import QuerySet
|
||||
from ormar.relations.alias_manager import AliasManager
|
||||
@ -348,20 +349,25 @@ def copy_and_replace_m2m_through_model(
|
||||
new_meta: ormar.ModelMeta = type( # type: ignore
|
||||
"Meta", (), dict(through_class.Meta.__dict__),
|
||||
)
|
||||
copy_name = through_class.__name__ + attrs.get("__name__", "")
|
||||
copy_through = type(copy_name, (ormar.Model,), {"Meta": new_meta})
|
||||
new_meta.tablename += "_" + meta.tablename
|
||||
# create new table with copied columns but remove foreign keys
|
||||
# they will be populated later in expanding reverse relation
|
||||
if hasattr(new_meta, "table"):
|
||||
del new_meta.table
|
||||
new_meta.columns = [col for col in new_meta.columns if not col.foreign_keys]
|
||||
new_meta.model_fields = {
|
||||
name: field
|
||||
for name, field in new_meta.model_fields.items()
|
||||
if not issubclass(field, ForeignKeyField)
|
||||
}
|
||||
if hasattr(new_meta, "column"):
|
||||
del new_meta.columns
|
||||
_, columns = sqlalchemy_columns_from_model_fields(
|
||||
new_meta.model_fields, copy_through
|
||||
) # type: ignore
|
||||
new_meta.columns = columns
|
||||
populate_meta_sqlalchemy_table_if_required(new_meta)
|
||||
copy_name = through_class.__name__ + attrs.get("__name__", "")
|
||||
copy_through = type(copy_name, (ormar.Model,), {"Meta": new_meta})
|
||||
copy_field.through = copy_through
|
||||
|
||||
parent_fields[field_name] = copy_field
|
||||
|
||||
@ -590,6 +590,7 @@ class QuerySet:
|
||||
expr = FilterQuery(filter_clauses=self.filter_clauses).apply(
|
||||
self.table.delete()
|
||||
)
|
||||
print("\n", expr.compile(compile_kwargs={"literal_binds": True}))
|
||||
return await self.database.execute(expr)
|
||||
|
||||
def paginate(self, page: int, page_size: int = 20) -> "QuerySet":
|
||||
|
||||
@ -1,15 +1,25 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
from sqlalchemy import ForeignKeyConstraint, create_engine, inspect
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
ForeignKey,
|
||||
ForeignKeyConstraint,
|
||||
Integer,
|
||||
String,
|
||||
Table,
|
||||
create_engine,
|
||||
inspect,
|
||||
)
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
import ormar
|
||||
from tests.settings import DATABASE_URL
|
||||
|
||||
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||
database = databases.Database(DATABASE_URL)
|
||||
metadata = sqlalchemy.MetaData()
|
||||
engine = sqlalchemy.create_engine(DATABASE_URL, echo=True)
|
||||
|
||||
@ -29,31 +39,34 @@ class Album(ormar.Model):
|
||||
tablename = "albums"
|
||||
metadata = metadata
|
||||
database = database
|
||||
constraint = []
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100)
|
||||
artist: Optional[Artist] = ormar.ForeignKey(Artist, ondelete='CASCADE')
|
||||
artist: Optional[Artist] = ormar.ForeignKey(Artist, ondelete="CASCADE")
|
||||
|
||||
|
||||
class Track(ormar.Model):
|
||||
class Meta:
|
||||
tablename = "tracks"
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
album: Optional[Album] = ormar.ForeignKey(Album, ondelete='CASCADE')
|
||||
title: str = ormar.String(max_length=100)
|
||||
#
|
||||
# class Track(ormar.Model):
|
||||
# class Meta:
|
||||
# tablename = "tracks"
|
||||
# metadata = metadata
|
||||
# database = database
|
||||
#
|
||||
# id: int = ormar.Integer(primary_key=True)
|
||||
# album: Optional[Album] = ormar.ForeignKey(Album, ondelete='CASCADE')
|
||||
# title: str = ormar.String(max_length=100)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def create_test_database():
|
||||
if "sqlite" in DATABASE_URL:
|
||||
with engine.connect() as connection:
|
||||
connection.execute("PRAGMA foreign_keys = ON;")
|
||||
# if "sqlite" in DATABASE_URL:
|
||||
# with engine.connect() as connection:
|
||||
# connection.execute("PRAGMA foreign_keys = ON;")
|
||||
metadata.drop_all(engine)
|
||||
metadata.create_all(engine)
|
||||
# tables = list(metadata.tables.values())
|
||||
# tab = Album.Meta.table
|
||||
# breakpoint()
|
||||
yield
|
||||
# metadata.drop_all(engine)
|
||||
|
||||
@ -62,11 +75,13 @@ def create_test_database():
|
||||
async def test_simple_cascade():
|
||||
async with database:
|
||||
# async with database.transaction(force_rollback=True):
|
||||
artist = await Artist(name='Dr Alban').save()
|
||||
artist = await Artist(name="Dr Alban").save()
|
||||
await Album(name="Jamaica", artist=artist).save()
|
||||
await Artist.objects.delete(id=artist.id)
|
||||
|
||||
artists = await Artist.objects.all()
|
||||
assert len(artists) == 0
|
||||
# breakpoint()
|
||||
|
||||
async with database:
|
||||
albums = await Album.objects.all()
|
||||
assert len(albums) == 0
|
||||
|
||||
Reference in New Issue
Block a user