Merge pull request #92 from collerek/cascades

Cascades
This commit is contained in:
collerek
2021-02-02 18:52:37 +07:00
committed by GitHub
14 changed files with 402 additions and 27 deletions

View File

@ -51,10 +51,6 @@ jobs:
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Run sqlite
env:
DATABASE_URL: "sqlite:///testsuite"
run: bash scripts/test.sh
- name: Run postgres
env:
DATABASE_URL: "postgresql://username:password@localhost:5432/testsuite"
@ -63,6 +59,10 @@ jobs:
env:
DATABASE_URL: "mysql://username:password@127.0.0.1:3306/testsuite"
run: bash scripts/test.sh
- name: Run sqlite
env:
DATABASE_URL: "sqlite:///testsuite"
run: bash scripts/test.sh
- run: mypy --config-file mypy.ini ormar tests
- name: Upload coverage
uses: codecov/codecov-action@v1

View File

@ -38,7 +38,36 @@ Current options are:
**Arguments**:
- `new_model (Model class)`: newly constructed Model
- `model_fields (Union[Dict[str, type], Dict])`:
- `model_fields (Union[Dict[str, type], Dict])`: dict of model fields
<a name="models.helpers.models.substitue_backend_pool_for_sqlite"></a>
#### substitue\_backend\_pool\_for\_sqlite
```python
substitue_backend_pool_for_sqlite(new_model: Type["Model"]) -> None
```
Recreates Connection pool for sqlite3 with new factory that
executes "PRAGMA foreign_keys=1; on initialization to enable foreign keys.
**Arguments**:
- `new_model (Model class)`: newly declared ormar Model
<a name="models.helpers.models.check_required_meta_parameters"></a>
#### check\_required\_meta\_parameters
```python
check_required_meta_parameters(new_model: Type["Model"]) -> None
```
Verifies if ormar.Model has database and metadata set.
Recreates Connection pool for sqlite3
**Arguments**:
- `new_model (Model class)`: newly declared ormar Model
<a name="models.helpers.models.extract_annotations_and_default_vals"></a>
#### extract\_annotations\_and\_default\_vals

View File

@ -30,6 +30,8 @@ nested models in result.
**Arguments**:
- `current_relation_str (str)`: name of the relation field
- `source_model (Type[Model])`: model on which relation was defined
- `row (sqlalchemy.engine.result.ResultProxy)`: raw result row from the database
- `select_related (List)`: list of names of related models fetched from database
- `related_models (Union[List, Dict])`: list or dict of related models

View File

@ -1,3 +1,31 @@
# 0.9.0
## Important
* **Braking Fix:** Version 0.8.0 introduced a bug that prevents generation of foreign_keys constraint in the database,
both in alembic and during creation through sqlalchemy.engine, this is fixed now.
* **THEREFORE IF YOU USE VERSION >=0.8.0 YOU ARE STRONGLY ADVISED TO UPDATE** cause despite
that most of the `ormar` functions are working your database **CREATED with ormar (or ormar + alembic)**
does not have relations and suffer from perspective of performance and data integrity.
* If you were using `ormar` to connect to existing database your performance and integrity
should be fine nevertheless you should update to reflect all future schema updates in your models.
## Breaking
* **Breaking:** All foreign_keys and unique constraints now have a name so `alembic`
can identify them in db and not depend on db
* **Breaking:** During model construction if `Meta` class of the `Model` does not
include `metadata` or `database` now `ModelDefinitionError` will be raised instead of generic `AttributeError`.
* **Breaking:** `encode/databases` used for running the queries does not have a connection pool
for sqlite backend, meaning that each querry is run with a new connection and there is no way to
enable enforcing ForeignKeys constraints as those are by default turned off on every connection.
This is changed in `ormar` since >=0.9.0 and by default each sqlite3 query has `"PRAGMA foreign_keys=1;"`
run so now each sqlite3 connection by default enforces ForeignKey constraints including cascades.
## Other
* Update api docs.
* Add tests for fk creation in db and for cascades in db
# 0.8.1
## Features

View File

@ -65,7 +65,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType()
__version__ = "0.8.1"
__version__ = "0.9.0"
__all__ = [
"Integer",
"BigInteger",

View File

@ -211,12 +211,17 @@ class BaseField(FieldInfo):
:return: List of sqlalchemy foreign keys - by default one.
:rtype: List[sqlalchemy.schema.ForeignKey]
"""
return [
sqlalchemy.schema.ForeignKey(
con.name, ondelete=con.ondelete, onupdate=con.onupdate
constraints = [
sqlalchemy.ForeignKey(
con.reference,
ondelete=con.ondelete,
onupdate=con.onupdate,
name=f"fk_{cls.owner.Meta.tablename}_{cls.to.Meta.tablename}"
f"_{cls.to.get_column_alias(cls.to.Meta.pkname)}_{cls.name}",
)
for con in cls.constraints
]
return constraints
@classmethod
def get_column(cls, name: str) -> sqlalchemy.Column:
@ -230,7 +235,7 @@ class BaseField(FieldInfo):
:return: actual definition of the database column as sqlalchemy requires.
:rtype: sqlalchemy.Column
"""
return sqlalchemy.Column(
column = sqlalchemy.Column(
cls.alias or name,
cls.column_type,
*cls.construct_constraints(),
@ -241,6 +246,7 @@ class BaseField(FieldInfo):
default=cls.default,
server_default=cls.server_default,
)
return column
@classmethod
def expand_relationship(

View File

@ -3,6 +3,7 @@ import uuid
from dataclasses import dataclass
from typing import Any, List, Optional, TYPE_CHECKING, Tuple, Type, Union
import sqlalchemy
from pydantic import BaseModel, create_model
from pydantic.typing import ForwardRef, evaluate_forwardref
from sqlalchemy import UniqueConstraint
@ -103,7 +104,7 @@ def populate_fk_params_based_on_to_model(
)
constraints = [
ForeignKeyConstraint(
name=fk_string, ondelete=ondelete, onupdate=onupdate # type: ignore
reference=fk_string, ondelete=ondelete, onupdate=onupdate, name=None
)
]
column_type = to_field.column_type
@ -124,9 +125,10 @@ class ForeignKeyConstraint:
to produce sqlalchemy.ForeignKeys
"""
name: str
ondelete: str
onupdate: str
reference: Union[str, sqlalchemy.Column]
name: Optional[str]
ondelete: Optional[str]
onupdate: Optional[str]
def ForeignKey( # noqa CFQ002

View File

@ -1,10 +1,11 @@
import itertools
import sqlite3
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type
from pydantic.typing import ForwardRef
import ormar # noqa: I100
from ormar.fields.foreign_key import ForeignKeyField
from ormar.models.helpers.pydantic import populate_pydantic_default_values
from pydantic.typing import ForwardRef
if TYPE_CHECKING: # pragma no cover
from ormar import Model
@ -41,7 +42,7 @@ def populate_default_options_values(
:param new_model: newly constructed Model
:type new_model: Model class
:param model_fields:
:param model_fields: dict of model fields
:type model_fields: Union[Dict[str, type], Dict]
"""
if not hasattr(new_model.Meta, "constraints"):
@ -59,6 +60,54 @@ def populate_default_options_values(
new_model.Meta.requires_ref_update = False
class Connection(sqlite3.Connection):
def __init__(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover
super().__init__(*args, **kwargs)
self.execute("PRAGMA foreign_keys=1;")
def substitue_backend_pool_for_sqlite(new_model: Type["Model"]) -> None:
"""
Recreates Connection pool for sqlite3 with new factory that
executes "PRAGMA foreign_keys=1; on initialization to enable foreign keys.
:param new_model: newly declared ormar Model
:type new_model: Model class
"""
backend = new_model.Meta.database._backend
if (
backend._dialect.name == "sqlite" and "factory" not in backend._options
): # pragma: no cover
backend._options["factory"] = Connection
old_pool = backend._pool
backend._pool = old_pool.__class__(backend._database_url, **backend._options)
def check_required_meta_parameters(new_model: Type["Model"]) -> None:
"""
Verifies if ormar.Model has database and metadata set.
Recreates Connection pool for sqlite3
:param new_model: newly declared ormar Model
:type new_model: Model class
"""
if not hasattr(new_model.Meta, "database"):
if not getattr(new_model.Meta, "abstract", False):
raise ormar.ModelDefinitionError(
f"{new_model.__name__} does not have database defined."
)
else:
substitue_backend_pool_for_sqlite(new_model=new_model)
if not hasattr(new_model.Meta, "metadata"):
if not getattr(new_model.Meta, "abstract", False):
raise ormar.ModelDefinitionError(
f"{new_model.__name__} does not have metadata defined."
)
def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]:
"""
Extracts annotations from class namespace dict and triggers

View File

@ -1,4 +1,3 @@
import copy
import logging
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
@ -80,10 +79,12 @@ def create_and_append_m2m_fk(
model.Meta.tablename + "." + pk_alias,
ondelete="CASCADE",
onupdate="CASCADE",
name=f"fk_{model_field.through.Meta.tablename}_{model.Meta.tablename}"
f"_{field_name}_{pk_alias}",
),
)
model_field.through.Meta.columns.append(column)
model_field.through.Meta.table.append_column(copy.deepcopy(column))
model_field.through.Meta.table.append_column(column)
def check_pk_column_validity(
@ -234,12 +235,16 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
if not hasattr(meta, "table") and check_for_null_type_columns_from_forward_refs(
meta
):
meta.table = sqlalchemy.Table(
meta.tablename,
meta.metadata,
*[copy.deepcopy(col) for col in meta.columns],
*meta.constraints,
for constraint in meta.constraints:
if isinstance(constraint, sqlalchemy.UniqueConstraint):
constraint.name = (
f"uc_{meta.tablename}_"
f'{"_".join([str(col) for col in constraint._pending_colargs])}'
)
table = sqlalchemy.Table(
meta.tablename, meta.metadata, *meta.columns, *meta.constraints,
)
meta.table = table
def update_column_definition(

View File

@ -33,6 +33,8 @@ from ormar.models.helpers import (
populate_meta_tablename_columns_and_pk,
register_relation_in_alias_manager,
)
from ormar.models.helpers.models import check_required_meta_parameters
from ormar.models.helpers.sqlalchemy import sqlalchemy_columns_from_model_fields
from ormar.models.quick_access_views import quick_access_set
from ormar.queryset import QuerySet
from ormar.relations.alias_manager import AliasManager
@ -348,20 +350,23 @@ def copy_and_replace_m2m_through_model(
new_meta: ormar.ModelMeta = type( # type: ignore
"Meta", (), dict(through_class.Meta.__dict__),
)
copy_name = through_class.__name__ + attrs.get("__name__", "")
copy_through = type(copy_name, (ormar.Model,), {"Meta": new_meta})
new_meta.tablename += "_" + meta.tablename
# create new table with copied columns but remove foreign keys
# they will be populated later in expanding reverse relation
if hasattr(new_meta, "table"):
del new_meta.table
new_meta.columns = [col for col in new_meta.columns if not col.foreign_keys]
new_meta.model_fields = {
name: field
for name, field in new_meta.model_fields.items()
if not issubclass(field, ForeignKeyField)
}
_, columns = sqlalchemy_columns_from_model_fields(
new_meta.model_fields, copy_through
) # type: ignore
new_meta.columns = columns
populate_meta_sqlalchemy_table_if_required(new_meta)
copy_name = through_class.__name__ + attrs.get("__name__", "")
copy_through = type(copy_name, (ormar.Model,), {"Meta": new_meta})
copy_field.through = copy_through
parent_fields[field_name] = copy_field
@ -578,6 +583,7 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
if hasattr(new_model, "Meta"):
populate_default_options_values(new_model, model_fields)
check_required_meta_parameters(new_model)
add_property_fields(new_model, attrs)
register_signals(new_model=new_model)
populate_choices_validators(new_model)

View File

@ -62,6 +62,10 @@ class Model(NewBaseModel):
where rows are populated in a different way as they do not have
nested models in result.
:param current_relation_str: name of the relation field
:type current_relation_str: str
:param source_model: model on which relation was defined
:type source_model: Type[Model]
:param row: raw result row from the database
:type row: sqlalchemy.engine.result.ResultProxy
:param select_related: list of names of related models fetched from database

156
tests/test_cascades.py Normal file
View File

@ -0,0 +1,156 @@
from typing import Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class Band(ormar.Model):
class Meta:
tablename = "bands"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class ArtistsBands(ormar.Model):
class Meta:
tablename = "artists_x_bands"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
class Artist(ormar.Model):
class Meta:
tablename = "artists"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
bands = ormar.ManyToMany(Band, through=ArtistsBands)
class Album(ormar.Model):
class Meta:
tablename = "albums"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
artist: Optional[Artist] = ormar.ForeignKey(Artist, ondelete="CASCADE")
class Track(ormar.Model):
class Meta:
tablename = "tracks"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
album: Optional[Album] = ormar.ForeignKey(Album, ondelete="CASCADE")
title: str = ormar.String(max_length=100)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.fixture(scope="function")
async def cleanup():
yield
async with database:
await Band.objects.delete(each=True)
await Artist.objects.delete(each=True)
@pytest.mark.asyncio
async def test_simple_cascade(cleanup):
async with database:
artist = await Artist(name="Dr Alban").save()
await Album(name="Jamaica", artist=artist).save()
await Artist.objects.delete(id=artist.id)
artists = await Artist.objects.all()
assert len(artists) == 0
albums = await Album.objects.all()
assert len(albums) == 0
@pytest.mark.asyncio
async def test_nested_cascade(cleanup):
async with database:
artist = await Artist(name="Dr Alban").save()
album = await Album(name="Jamaica", artist=artist).save()
await Track(title="Yuhu", album=album).save()
await Artist.objects.delete(id=artist.id)
artists = await Artist.objects.all()
assert len(artists) == 0
albums = await Album.objects.all()
assert len(albums) == 0
tracks = await Track.objects.all()
assert len(tracks) == 0
@pytest.mark.asyncio
async def test_many_to_many_cascade(cleanup):
async with database:
artist = await Artist(name="Dr Alban").save()
band = await Band(name="Scorpions").save()
await artist.bands.add(band)
check = await Artist.objects.select_related("bands").get()
assert check.bands[0].name == "Scorpions"
await Artist.objects.delete(id=artist.id)
artists = await Artist.objects.all()
assert len(artists) == 0
bands = await Band.objects.all()
assert len(bands) == 1
connections = await ArtistsBands.objects.all()
assert len(connections) == 0
@pytest.mark.asyncio
async def test_reverse_many_to_many_cascade(cleanup):
async with database:
artist = await Artist(name="Dr Alban").save()
band = await Band(name="Scorpions").save()
await artist.bands.add(band)
check = await Artist.objects.select_related("bands").get()
assert check.bands[0].name == "Scorpions"
await Band.objects.delete(id=band.id)
artists = await Artist.objects.all()
assert len(artists) == 1
connections = await ArtistsBands.objects.all()
assert len(connections) == 0
bands = await Band.objects.all()
assert len(bands) == 0

View File

@ -0,0 +1,55 @@
from typing import Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
engine = sqlalchemy.create_engine(DATABASE_URL)
class Artist(ormar.Model):
class Meta:
tablename = "artists"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
class Album(ormar.Model):
class Meta:
tablename = "albums"
metadata = metadata
database = database
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
artist: Optional[Artist] = ormar.ForeignKey(Artist, ondelete="CASCADE")
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
metadata.drop_all(engine)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
def test_simple_cascade():
inspector = sqlalchemy.inspect(engine)
columns = inspector.get_columns("albums")
assert len(columns) == 3
col_names = [col.get("name") for col in columns]
assert sorted(["id", "name", "artist"]) == sorted(col_names)
fks = inspector.get_foreign_keys("albums")
assert len(fks) == 1
assert fks[0]["name"] == "fk_albums_artists_id_artist"
assert fks[0]["constrained_columns"][0] == "artist"
assert fks[0]["referred_columns"][0] == "id"
assert fks[0]["options"].get("ondelete") == "CASCADE"

View File

@ -3,6 +3,7 @@ import asyncio
import datetime
import decimal
import databases
import pydantic
import pytest
import sqlalchemy
@ -15,11 +16,14 @@ from tests.settings import DATABASE_URL
metadata = sqlalchemy.MetaData()
database = databases.Database(DATABASE_URL, force_rollback=True)
class ExampleModel(Model):
class Meta:
tablename = "example"
metadata = metadata
database = database
test: int = ormar.Integer(primary_key=True)
test_string: str = ormar.String(max_length=250)
@ -52,6 +56,7 @@ class ExampleModel2(Model):
class Meta:
tablename = "examples"
metadata = metadata
database = database
test: int = ormar.Integer(primary_key=True)
test_string: str = ormar.String(max_length=250)
@ -112,6 +117,29 @@ def test_model_attribute_json_access(example):
assert example.test_json == dict(aa=12)
def test_missing_metadata():
with pytest.raises(ModelDefinitionError):
class JsonSample2(ormar.Model):
class Meta:
tablename = "jsons2"
database = database
id: int = ormar.Integer(primary_key=True)
test_json = ormar.JSON(nullable=True)
def test_missing_database():
with pytest.raises(ModelDefinitionError):
class JsonSample3(ormar.Model):
class Meta:
tablename = "jsons3"
id: int = ormar.Integer(primary_key=True)
test_json = ormar.JSON(nullable=True)
def test_non_existing_attr(example):
with pytest.raises(ValueError):
example.new_attr = 12
@ -137,12 +165,13 @@ def test_sqlalchemy_table_is_created(example):
@typing.no_type_check
def test_no_pk_in_model_definition(): # type: ignore
def test_no_pk_in_model_definition():
with pytest.raises(ModelDefinitionError): # type: ignore
class ExampleModel2(Model): # type: ignore
class Meta:
tablename = "example2"
database = database
metadata = metadata
test_string: str = ormar.String(max_length=250) # type: ignore
@ -156,6 +185,7 @@ def test_two_pks_in_model_definition():
class ExampleModel2(Model):
class Meta:
tablename = "example3"
database = database
metadata = metadata
id: int = ormar.Integer(primary_key=True)
@ -169,6 +199,7 @@ def test_setting_pk_column_as_pydantic_only_in_model_definition():
class ExampleModel2(Model):
class Meta:
tablename = "example4"
database = database
metadata = metadata
test: int = ormar.Integer(primary_key=True, pydantic_only=True)
@ -181,6 +212,7 @@ def test_decimal_error_in_model_definition():
class ExampleModel2(Model):
class Meta:
tablename = "example5"
database = database
metadata = metadata
test: decimal.Decimal = ormar.Decimal(primary_key=True)
@ -193,6 +225,7 @@ def test_string_error_in_model_definition():
class ExampleModel2(Model):
class Meta:
tablename = "example6"
database = database
metadata = metadata
test: str = ormar.String(primary_key=True)