switch pool for sqlite with new factory, add tests if fks are reflected in db

This commit is contained in:
collerek
2021-02-02 11:33:49 +01:00
parent aea6200bfd
commit d436f54643
9 changed files with 154 additions and 37 deletions

View File

@ -214,7 +214,7 @@ class Integer(ModelFieldFactory, int):
:return: initialized column with proper options
:rtype: sqlalchemy Column
"""
return sqlalchemy.Integer
return sqlalchemy.Integer()
class Text(ModelFieldFactory, str):

View File

@ -1,10 +1,11 @@
import itertools
import sqlite3
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type
from pydantic.typing import ForwardRef
import ormar # noqa: I100
from ormar.fields.foreign_key import ForeignKeyField
from ormar.models.helpers.pydantic import populate_pydantic_default_values
from pydantic.typing import ForwardRef
if TYPE_CHECKING: # pragma no cover
from ormar import Model
@ -41,7 +42,7 @@ def populate_default_options_values(
:param new_model: newly constructed Model
:type new_model: Model class
:param model_fields:
:param model_fields: dict of model fields
:type model_fields: Union[Dict[str, type], Dict]
"""
if not hasattr(new_model.Meta, "constraints"):
@ -59,6 +60,54 @@ def populate_default_options_values(
new_model.Meta.requires_ref_update = False
class Connection(sqlite3.Connection):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.execute("PRAGMA foreign_keys=1;")
def substitue_backend_pool_for_sqlite(new_model: Type["Model"]) -> None:
"""
Recreates Connection pool for sqlite3 with new factory that
executes "PRAGMA foreign_keys=1; on initialization to enable foreign keys.
:param new_model: newly declared ormar Model
:type new_model: Model class
"""
backend = new_model.Meta.database._backend
if (
backend._dialect.name == "sqlite" and "factory" not in backend._options
): # pragma: no cover
backend._options["factory"] = Connection
old_pool = backend._pool
backend._pool = old_pool.__class__(backend._database_url, **backend._options)
def check_required_meta_parameters(new_model: Type["Model"]) -> None:
"""
Verifies if ormar.Model has database and metadata set.
Recreates Connection pool for sqlite3
:param new_model: newly declared ormar Model
:type new_model: Model class
"""
if not hasattr(new_model.Meta, "database"):
if not getattr(new_model.Meta, "abstract", False):
raise ormar.ModelDefinitionError(
f"{new_model.__name__} does not have database defined."
)
else:
substitue_backend_pool_for_sqlite(new_model=new_model)
if not hasattr(new_model.Meta, "metadata"):
if not getattr(new_model.Meta, "abstract", False):
raise ormar.ModelDefinitionError(
f"{new_model.__name__} does not have metadata defined."
)
def extract_annotations_and_default_vals(attrs: Dict) -> Tuple[Dict, Dict]:
"""
Extracts annotations from class namespace dict and triggers

View File

@ -33,6 +33,7 @@ from ormar.models.helpers import (
populate_meta_tablename_columns_and_pk,
register_relation_in_alias_manager,
)
from ormar.models.helpers.models import check_required_meta_parameters
from ormar.models.helpers.sqlalchemy import sqlalchemy_columns_from_model_fields
from ormar.models.quick_access_views import quick_access_set
from ormar.queryset import QuerySet
@ -582,6 +583,7 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
if hasattr(new_model, "Meta"):
populate_default_options_values(new_model, model_fields)
check_required_meta_parameters(new_model)
add_property_fields(new_model, attrs)
register_signals(new_model=new_model)
populate_choices_validators(new_model)

View File

@ -62,6 +62,10 @@ class Model(NewBaseModel):
where rows are populated in a different way as they do not have
nested models in result.
:param current_relation_str: name of the relation field
:type current_relation_str: str
:param source_model: model on which relation was defined
:type source_model: Type[Model]
:param row: raw result row from the database
:type row: sqlalchemy.engine.result.ResultProxy
:param select_related: list of names of related models fetched from database

View File

@ -590,7 +590,6 @@ class QuerySet:
expr = FilterQuery(filter_clauses=self.filter_clauses).apply(
self.table.delete()
)
print("\n", expr.compile(compile_kwargs={"literal_binds": True}))
return await self.database.execute(expr)
def paginate(self, page: int, page_size: int = 20) -> "QuerySet":