Merge pull request #346 from collerek/add_index
Add multi-column non-unique index and `sql_nullable` setting. Important performance boost for `dict()`.
@ -657,7 +657,8 @@ The following keyword arguments are supported on all field types.

All fields are required unless one of the following is set:

* `nullable` - Creates a nullable column. Sets the default to `None`.
* `nullable` - Creates a nullable column. Sets the default to `False`. Read the fields common parameters for details.
* `sql_nullable` - Used to set a different setting for pydantic and the database. Defaults to the `nullable` value. Read the fields common parameters for details.
* `default` - Set a default value for the field. **Not available for relation fields**
* `server_default` - Set a default value for the field on the server side (like sqlalchemy's `func.now()`). **Not available for relation fields**
* `primary_key` with `autoincrement` - When a column is set as a primary key and autoincrement is set on this column.
@ -22,18 +22,41 @@ Used both in sql and pydantic (changes pk field to optional for autoincrement).

## nullable

`nullable`: `bool` = `not primary_key` -> defaults to `False` for the primary key column and `True` for all others.
`nullable`: `bool` = `False` -> defaults to `False` for all fields except relation fields.

Automatically changed to `True` if the user provides one of the following:

* `default` value or function is provided
* `server_default` value or function is provided
* `autoincrement` is set on an `Integer` `primary_key` field
* **[DEPRECATED]** `pydantic_only=True` is set

Specifies if the field is optional or required, used both with sql and pydantic.

By default, used for both `pydantic` and `sqlalchemy` as those are the most common settings:

* `nullable=False` - means the database column is not null and the field is required in pydantic
* `nullable=True` - means the database column is null and the field is optional in pydantic

If you want to set different settings for pydantic and the database, see `sql_nullable` below.

!!!note
    By default all `ForeignKeys` are also nullable, meaning the related `Model` is not required.

    If you change the `ForeignKey` column to `nullable=False`, it becomes required.

!!!info
    If you want to know more about how you can preload related models during queries and how the relations work read the [queries][queries] and [relations][relations] sections.
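A minimal sketch of how `nullable` plays out on both sides (the `Reader` model is illustrative, not part of this PR):

```Python
import databases
import sqlalchemy

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Reader(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    # nullable=False (the default) -> NOT NULL column, required in pydantic
    email: str = ormar.String(max_length=100)
    # nullable=True -> nullable column, optional in pydantic with default None
    nickname: str = ormar.String(max_length=100, nullable=True)


# Reader(email="test@example.com") is valid and nickname defaults to None,
# while Reader() raises a pydantic validation error for the missing email.
```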
## sql_nullable

`sql_nullable`: `bool` = `nullable` -> defaults to the value of `nullable` (described above).

Specifies whether the column allows nulls in the database only.

Use this setting in combination with `nullable` only if you want to set different options on the pydantic model and in the database.

A sample usage might be making a field not null in the database, but allowing it to be nullable in pydantic (e.g. with a `server_default` value).
That will prevent updates of the field to null (with `server_default` set you cannot insert null values anyway, as the default value would be used).
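A minimal sketch of that combination, assuming an illustrative `Report` model that is not part of this PR:

```Python
import databases
import sqlalchemy
from sqlalchemy import text

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Report(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    # optional in pydantic (nullable=True) but NOT NULL in the database
    # (sql_nullable=False) - the server_default fills the value when omitted
    status: str = ormar.String(
        max_length=20, nullable=True, sql_nullable=False, server_default=text("'new'")
    )
```

You can skip `status` when creating a `Report` from Python, yet an explicit update of `status` to `None` is rejected by the database.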
## default

@ -660,7 +660,8 @@ The following keyword arguments are supported on all field types.

All fields are required unless one of the following is set:

* `nullable` - Creates a nullable column. Sets the default to `None`.
* `nullable` - Creates a nullable column. Sets the default to `False`. Read the fields common parameters for details.
* `sql_nullable` - Used to set a different setting for pydantic and the database. Defaults to the `nullable` value. Read the fields common parameters for details.
* `default` - Set a default value for the field. **Not available for relation fields**
* `server_default` - Set a default value for the field on the server side (like sqlalchemy's `func.now()`). **Not available for relation fields**
* `primary_key` with `autoincrement` - When a column is set as a primary key and autoincrement is set on this column.
@ -357,11 +357,16 @@ You can overwrite this parameter by providing `Meta` class `tablename` argument.

On a model level you can also set model-wise constraints on sql columns.

Right now only the `UniqueColumns` constraint is present.
Right now only `IndexColumns` and `UniqueColumns` constraints are supported.

!!!note
    Note that both constraints should be used only if you want to set a custom name on the constraint or want to set the constraint/index on multiple columns; otherwise the `index` and `unique` properties on ormar fields are preferred.

!!!tip
    To read more about column constraints like `primary_key`, `unique`, `ForeignKey` etc. visit [fields][fields].

#### UniqueColumns

You can set this parameter by providing the `Meta` class `constraints` argument.

```Python hl_lines="14-17"
@ -373,6 +378,20 @@ You can set this parameter by providing `Meta` class `constraints` argument.
To set one column as unique use the [`unique`](../fields/common-parameters.md#unique) common parameter.
Of course you can set many columns as unique with this parameter, but each of them will be checked separately.

#### IndexColumns

You can set this parameter by providing the `Meta` class `constraints` argument.

```Python hl_lines="14-17"
--8<-- "../docs_src/models/docs017.py"
```

!!!note
    Note that constraints are meant for a combination of columns that should be in the index.
    To set an index on a single column use the [`index`](../fields/common-parameters.md#index) common parameter.
    Of course, you can set many columns as indexed with this parameter, but each of them will be a separate index.
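A short additional sketch (not part of the documented snippets) combining both constraint types, including a custom index name:

```Python
import databases
import sqlalchemy

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Supplier(ormar.Model):
    class Meta:
        database = database
        metadata = metadata
        constraints = [
            # compound unique constraint - the pair (name, city) must be unique
            ormar.UniqueColumns("name", "city"),
            # compound non-unique index with an explicit name
            ormar.IndexColumns("city", "category", name="ix_supplier_city_category"),
            # without a name ormar generates one from the table and column names
            ormar.IndexColumns("category"),
        ]

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    city: str = ormar.String(max_length=100)
    category: str = ormar.String(max_length=100)
```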
### Pydantic configuration

As each `ormar.Model` is also a `pydantic` model, you might want to tweak the settings of the pydantic configuration.
@ -1,3 +1,21 @@

# 0.10.19

## ✨ Features

* Add support for multi-column non-unique `IndexColumns` in `Meta.constraints` [#307](https://github.com/collerek/ormar/issues/307)
* Add `sql_nullable` field attribute that allows setting a different nullable setting for the pydantic model and the underlying sql column [#308](https://github.com/collerek/ormar/issues/308)

## 🐛 Fixes

* Enable caching of the relation map to increase performance [#337](https://github.com/collerek/ormar/issues/337)
* Clarify and fix documentation in regard to nullable fields [#339](https://github.com/collerek/ormar/issues/339)

## 💬 Other

* Bump supported `databases` version to `<=0.5.2`.


# 0.10.18

## 🐛 Fixes
21
docs_src/models/docs017.py
Normal file
@ -0,0 +1,21 @@
import databases
import sqlalchemy

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Course(ormar.Model):
    class Meta:
        database = database
        metadata = metadata
        # define your constraints in the Meta class of the model
        # it's a list that can contain multiple constraints
        # here the combination of the name and completed columns will have a compound index in the db
        constraints = [ormar.IndexColumns("name", "completed")]

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    completed: bool = ormar.Boolean(default=False)
@ -51,6 +51,7 @@ from ormar.fields import (
    Float,
    ForeignKey,
    ForeignKeyField,
    IndexColumns,
    Integer,
    JSON,
    LargeBinary,
@ -77,7 +78,7 @@ class UndefinedType:  # pragma no cover

Undefined = UndefinedType()

__version__ = "0.10.18"
__version__ = "0.10.19"
__all__ = [
    "Integer",
    "BigInteger",
@ -102,6 +103,7 @@ __all__ = [
    "Undefined",
    "UUID",
    "UniqueColumns",
    "IndexColumns",
    "QuerySetProtocol",
    "RelationProtocol",
    "ModelMeta",
@ -5,7 +5,8 @@ as well as relation Fields (ForeignKey, ManyToMany).
Also a definition for custom CHAR based sqlalchemy UUID field
"""
from ormar.fields.base import BaseField
from ormar.fields.foreign_key import ForeignKey, ForeignKeyField, UniqueColumns
from ormar.fields.constraints import IndexColumns, UniqueColumns
from ormar.fields.foreign_key import ForeignKey, ForeignKeyField
from ormar.fields.many_to_many import ManyToMany, ManyToManyField
from ormar.fields.model_fields import (
    BigInteger,
@ -36,6 +37,7 @@ __all__ = [
    "DateTime",
    "String",
    "JSON",
    "IndexColumns",
    "Integer",
    "Text",
    "Float",
@ -45,7 +47,6 @@ __all__ = [
    "ManyToMany",
    "ManyToManyField",
    "BaseField",
    "UniqueColumns",
    "ForeignKeyField",
    "ThroughField",
    "Through",
@ -54,4 +55,5 @@ __all__ = [
    "DECODERS_MAP",
    "ENCODERS_MAP",
    "LargeBinary",
    "UniqueColumns",
]
@ -43,6 +43,7 @@ class BaseField(FieldInfo):
        self.primary_key: bool = kwargs.pop("primary_key", False)
        self.autoincrement: bool = kwargs.pop("autoincrement", False)
        self.nullable: bool = kwargs.pop("nullable", False)
        self.sql_nullable: bool = kwargs.pop("sql_nullable", False)
        self.index: bool = kwargs.pop("index", False)
        self.unique: bool = kwargs.pop("unique", False)
        self.pydantic_only: bool = kwargs.pop("pydantic_only", False)
@ -265,7 +266,7 @@ class BaseField(FieldInfo):
            self.column_type,
            *self.construct_constraints(),
            primary_key=self.primary_key,
            nullable=self.nullable and not self.primary_key,
            nullable=self.sql_nullable,
            index=self.index,
            unique=self.unique,
            default=self.ormar_default,
22
ormar/fields/constraints.py
Normal file
@ -0,0 +1,22 @@
from typing import Any

from sqlalchemy import Index, UniqueConstraint


class UniqueColumns(UniqueConstraint):
    """
    Subclass of sqlalchemy.UniqueConstraint.
    Used to avoid importing anything from sqlalchemy by user.
    """


class IndexColumns(Index):
    def __init__(self, *args: Any, name: str = None) -> None:
        if not name:
            name = "TEMPORARY_NAME"
        super().__init__(name, *args)

    """
    Subclass of sqlalchemy.Index.
    Used to avoid importing anything from sqlalchemy by user.
    """
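A small usage sketch (the `Album` model is illustrative, not from this PR): when no `name` is passed, the `"TEMPORARY_NAME"` placeholder set here is later replaced by `set_constraint_names()`, shown further down in this diff.

```Python
import databases
import sqlalchemy

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Album(ormar.Model):
    class Meta:
        database = database
        metadata = metadata
        constraints = [ormar.IndexColumns("artist", "year")]  # no explicit name

    id: int = ormar.Integer(primary_key=True)
    artist: str = ormar.String(max_length=100)
    year: int = ormar.Integer()


# the placeholder name is swapped for a generated one when the table is built
index = list(Album.Meta.table.indexes)[0]
print(index.name)  # expected: ix_albums_artist_year
```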
@ -18,7 +18,6 @@ from typing import (
import sqlalchemy
from pydantic import BaseModel, create_model
from pydantic.typing import ForwardRef, evaluate_forwardref
from sqlalchemy import UniqueConstraint

import ormar  # noqa I101
from ormar.exceptions import ModelDefinitionError, RelationshipInstanceError
@ -160,13 +159,6 @@ def validate_not_allowed_fields(kwargs: Dict) -> None:
        )


class UniqueColumns(UniqueConstraint):
    """
    Subclass of sqlalchemy.UniqueConstraint.
    Used to avoid importing anything from sqlalchemy by user.
    """


@dataclass
class ForeignKeyConstraint:
    """
@ -242,6 +234,9 @@ def ForeignKey(  # noqa CFQ002
    skip_reverse = kwargs.pop("skip_reverse", False)
    skip_field = kwargs.pop("skip_field", False)

    sql_nullable = kwargs.pop("sql_nullable", None)
    sql_nullable = nullable if sql_nullable is None else sql_nullable

    validate_not_allowed_fields(kwargs)

    if to.__class__ == ForwardRef:
@ -263,6 +258,7 @@ def ForeignKey(  # noqa CFQ002
        alias=name,
        name=kwargs.pop("real_name", None),
        nullable=nullable,
        sql_nullable=sql_nullable,
        constraints=constraints,
        unique=unique,
        column_type=column_type,
@ -75,6 +75,7 @@ class ModelFieldFactory:
        default = kwargs.pop("default", None)
        server_default = kwargs.pop("server_default", None)
        nullable = kwargs.pop("nullable", None)
        sql_nullable = kwargs.pop("sql_nullable", None)
        pydantic_only = kwargs.pop("pydantic_only", False)

        primary_key = kwargs.pop("primary_key", False)
@ -86,6 +87,13 @@ class ModelFieldFactory:

        overwrite_pydantic_type = kwargs.pop("overwrite_pydantic_type", None)

        nullable = is_field_nullable(
            nullable, default, server_default, pydantic_only
        ) or is_auto_primary_key(primary_key, autoincrement)
        sql_nullable = (
            nullable if sql_nullable is None else (sql_nullable and not primary_key)
        )

        namespace = dict(
            __type__=cls._type,
            __pydantic_type__=overwrite_pydantic_type
@ -97,8 +105,8 @@ class ModelFieldFactory:
            primary_key=primary_key,
            default=default,
            server_default=server_default,
            nullable=is_field_nullable(nullable, default, server_default, pydantic_only)
            or is_auto_primary_key(primary_key, autoincrement),
            nullable=nullable,
            sql_nullable=sql_nullable,
            index=kwargs.pop("index", False),
            unique=kwargs.pop("unique", False),
            pydantic_only=pydantic_only,
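The nullability resolution above can be read as a small truth table; a standalone sketch of just that step (the helper name is mine, not ormar's API, and it assumes `is_field_nullable` has already been applied to `nullable`):

```Python
from typing import Optional, Tuple


def resolve_nullability(
    nullable: bool, sql_nullable: Optional[bool], primary_key: bool
) -> Tuple[bool, bool]:
    """Return (pydantic nullability, sql nullability) the way the factory does."""
    # sql_nullable falls back to nullable; primary keys are never sql-nullable
    sql = nullable if sql_nullable is None else (sql_nullable and not primary_key)
    return nullable, sql


print(resolve_nullability(True, False, False))  # (True, False): optional in pydantic, NOT NULL in db
print(resolve_nullability(False, None, False))  # (False, False): sql_nullable follows nullable
print(resolve_nullability(False, True, True))   # (False, False): primary keys stay NOT NULL
```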
@ -75,6 +75,8 @@ def populate_default_options_values(
        if field.__type__ == bytes
    }

    new_model.__relation_map__ = None


class Connection(sqlite3.Connection):
    def __init__(self, *args: Any, **kwargs: Any) -> None:  # pragma: no cover
@ -285,24 +285,40 @@ def populate_meta_sqlalchemy_table_if_required(meta: "ModelMeta") -> None:
    :param meta: Meta class of the Model without sqlalchemy table constructed
    :type meta: Model class Meta
    :return: class with populated Meta.table
    :rtype: Model class
    """
    if not hasattr(meta, "table") and check_for_null_type_columns_from_forward_refs(
        meta
    ):
        for constraint in meta.constraints:
            if isinstance(constraint, sqlalchemy.UniqueConstraint):
                constraint.name = (
                    f"uc_{meta.tablename}_"
                    f'{"_".join([str(col) for col in constraint._pending_colargs])}'
                )
        set_constraint_names(meta=meta)
        table = sqlalchemy.Table(
            meta.tablename, meta.metadata, *meta.columns, *meta.constraints
        )
        meta.table = table


def set_constraint_names(meta: "ModelMeta") -> None:
    """
    Populates the names on IndexColumns and UniqueColumns constraints.

    :param meta: Meta class of the Model without sqlalchemy table constructed
    :type meta: Model class Meta
    """
    for constraint in meta.constraints:
        if isinstance(constraint, sqlalchemy.UniqueConstraint) and not constraint.name:
            constraint.name = (
                f"uc_{meta.tablename}_"
                f'{"_".join([str(col) for col in constraint._pending_colargs])}'
            )
        elif (
            isinstance(constraint, sqlalchemy.Index)
            and constraint.name == "TEMPORARY_NAME"
        ):
            constraint.name = (
                f"ix_{meta.tablename}_"
                f'{"_".join([col for col in constraint._pending_colargs])}'
            )


def update_column_definition(
    model: Union[Type["Model"], Type["NewBaseModel"]], field: "ForeignKeyField"
) -> None:
@ -17,6 +17,7 @@ import sqlalchemy
from sqlalchemy.sql.schema import ColumnCollectionConstraint

import ormar  # noqa I100
import ormar.fields.constraints
from ormar import ModelDefinitionError  # noqa I100
from ormar.exceptions import ModelError
from ormar.fields import BaseField
@ -219,7 +220,8 @@ def update_attrs_from_base_meta(  # noqa: CCR001
        parent_value=parent_value,
    )
    parent_value = [
        ormar.UniqueColumns(*x._pending_colargs) for x in parent_value
        ormar.fields.constraints.UniqueColumns(*x._pending_colargs)
        for x in parent_value
    ]
    if isinstance(current_value, list):
        current_value.extend(parent_value)
@ -1,5 +1,6 @@
from typing import (
    Callable,
    Dict,
    List,
    Optional,
    Set,
@ -20,6 +21,7 @@ class RelationMixin:
        from ormar import ModelMeta

        Meta: ModelMeta
        __relation_map__: Optional[List[str]]
        _related_names: Optional[Set]
        _through_names: Optional[Set]
        _related_fields: Optional[List]
@ -120,7 +122,11 @@ class RelationMixin:

    @classmethod
    def _iterate_related_models(  # noqa: CCR001
        cls, node_list: NodeList = None, source_relation: str = None
        cls,
        node_list: NodeList = None,
        parsed_map: Dict = None,
        source_relation: str = None,
        recurrent: bool = False,
    ) -> List[str]:
        """
        Iterates related models recursively to extract relation strings of
@ -130,12 +136,15 @@ class RelationMixin:
        :rtype: List[str]
        """
        if not node_list:
            if cls.__relation_map__:
                return cls.__relation_map__
            node_list = NodeList()
            parsed_map = dict()
            current_node = node_list.add(node_class=cls)
        else:
            current_node = node_list[-1]
        relations = cls.extract_related_names()
        processed_relations = []
        relations = sorted(cls.extract_related_names())
        processed_relations: List[str] = []
        for relation in relations:
            if not current_node.visited(relation):
                target_model = cls.Meta.model_fields[relation].to
@ -144,12 +153,23 @@ class RelationMixin:
                    relation_name=relation,
                    parent_node=current_node,
                )
                deep_relations = target_model._iterate_related_models(
                    source_relation=relation, node_list=node_list
                )
                relation_key = f"{cls.get_name()}_{relation}"
                parsed_map = cast(Dict, parsed_map)
                deep_relations = parsed_map.get(relation_key)
                if not deep_relations:
                    deep_relations = target_model._iterate_related_models(
                        source_relation=relation,
                        node_list=node_list,
                        recurrent=True,
                        parsed_map=parsed_map,
                    )
                    parsed_map[relation_key] = deep_relations
                processed_relations.extend(deep_relations)

        return cls._get_final_relations(processed_relations, source_relation)
        result = cls._get_final_relations(processed_relations, source_relation)
        if not recurrent:
            cls.__relation_map__ = result
        return result

    @staticmethod
    def _get_final_relations(
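This caching is what the commit message calls a performance booster for `dict()`: the relation map is computed once per model class and reused. A rough sketch of the effect, with illustrative `Author`/`Book` models that are not part of this PR:

```Python
import databases
import sqlalchemy

import ormar

database = databases.Database("sqlite:///db.sqlite")
metadata = sqlalchemy.MetaData()


class Author(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


class Book(ormar.Model):
    class Meta:
        database = database
        metadata = metadata

    id: int = ormar.Integer(primary_key=True)
    author: Author = ormar.ForeignKey(Author)


# the first call walks the relation graph and caches the result on the class;
# later calls, made internally while serializing rows (e.g. in dict()),
# should return the cached list instead of re-walking the graph
first = Book._iterate_related_models()
assert first == Book.__relation_map__
```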
@ -76,6 +76,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
    __tablename__: str
    __metadata__: sqlalchemy.MetaData
    __database__: databases.Database
    __relation_map__: Optional[List[str]]
    _orm_relationship_manager: AliasManager
    _orm: RelationsManager
    _orm_id: int
@ -1,6 +1,5 @@
databases[sqlite]>=0.3.2,<0.5.3
databases[postgresql]>=0.3.2,<0.5.3
databases[mysql]>=0.3.2,<0.5.3
databases>=0.3.2,<0.5.3

pydantic >=1.6.1,!=1.7,!=1.7.1,!=1.7.2,!=1.7.3,!=1.8,!=1.8.1,<=1.8.2
sqlalchemy>=1.3.18,<=1.4.23
typing_extensions>=3.7,<3.10.0.3
@ -0,0 +1,420 @@
from datetime import datetime
from typing import List, Optional, Union

import databases
import pydantic
import pytest
import sqlalchemy

import ormar as orm

from tests.settings import DATABASE_URL

database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()


class MainMeta(orm.ModelMeta):
    database = database
    metadata = metadata


class ChagenlogRelease(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "changelog_release"


class CommitIssue(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "commit_issues"


class CommitLabel(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "commit_label"


class MergeRequestCommit(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "merge_request_commits"


class MergeRequestIssue(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "merge_request_issues"


class MergeRequestLabel(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "merge_request_labels"


class ProjectLabel(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "project_label"


class PushCommit(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "push_commit"


class PushLabel(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "push_label"


class TagCommit(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "tag_commits"


class TagIssue(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "tag_issue"


class TagLabel(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)

    class Meta(MainMeta):
        tablename = "tag_label"


class UserProject(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    access_level: int = orm.Integer(default=0)

    class Meta(MainMeta):
        tablename = "user_project"


class Label(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    title: str = orm.String(max_length=100)
    description: str = orm.Text(default="")
    type: str = orm.String(max_length=100, default="")

    class Meta(MainMeta):
        tablename = "labels"


class Project(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    name: str = orm.String(max_length=100)
    description: str = orm.Text(default="")
    git_url: str = orm.String(max_length=500, default="")
    labels: Optional[Union[List[Label], Label]] = orm.ManyToMany(
        Label, through=ProjectLabel, ondelete="CASCADE", onupdate="CASCADE"
    )
    changelog_jira_tag: str = orm.String(max_length=100, default="")
    change_type_jira_tag: str = orm.String(max_length=100, default="")
    jira_prefix: str = orm.String(max_length=10, default="SAN")
    type: str = orm.String(max_length=10, default="cs")
    target_branch_name: str = orm.String(max_length=100, default="master")
    header: str = orm.String(max_length=250, default="")
    jira_url: str = orm.String(max_length=500,)
    changelog_file: str = orm.String(max_length=250, default="")
    version_file: str = orm.String(max_length=250, default="")

    class Meta(MainMeta):
        tablename = "projects"


class Issue(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    summary: str = orm.Text(default="")
    description: str = orm.Text(default="")
    changelog: str = orm.Text(default="")
    link: str = orm.String(max_length=500)
    issue_type: str = orm.String(max_length=100)
    key: str = orm.String(max_length=100)
    change_type: str = orm.String(max_length=100, default="")
    data: pydantic.Json = orm.JSON(default={})

    class Meta(MainMeta):
        tablename = "issues"


class User(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    username: str = orm.String(max_length=100, unique=True)
    name: str = orm.String(max_length=200, default="")

    class Meta(MainMeta):
        tablename = "users"


class Branch(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    name: str = orm.String(max_length=200)
    description: str = orm.Text(default="")
    automatic_tags: bool = orm.Boolean(default=False)
    is_it_locked: bool = orm.Boolean(default=True)
    prefix_tag: str = orm.String(max_length=50, default="")
    postfix_tag: str = orm.String(max_length=50, default="")
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")

    class Meta(MainMeta):
        tablename = "branches"


class Changelog(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    content: str = orm.Text(default="")
    version: str = orm.Text(default="")
    past_changelog: int = orm.Integer(default=0)
    label: Label = orm.ForeignKey(
        Label, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")
    created_date: datetime = orm.DateTime(default=datetime.utcnow())

    class Meta(MainMeta):
        tablename = "changelogs"


class Commit(orm.Model):
    id: str = orm.String(max_length=500, primary_key=True)
    short_id: str = orm.String(max_length=500)
    title: str = orm.String(max_length=500)
    message: str = orm.Text(default="")
    url = orm.String(max_length=500, default="")
    author_name = orm.String(max_length=500, default="")
    labels: Optional[Union[List[Label], Label]] = orm.ManyToMany(
        Label, through=CommitLabel, ondelete="CASCADE", onupdate="CASCADE"
    )
    issues: Optional[Union[List[Issue], Issue]] = orm.ManyToMany(
        Issue, through=CommitIssue, ondelete="CASCADE", onupdate="CASCADE"
    )

    class Meta(MainMeta):
        tablename = "commits"


class MergeRequest(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    idd: int = orm.Integer(default=0)
    title: str = orm.String(max_length=500)
    state: str = orm.String(max_length=100)
    merge_status: str = orm.String(max_length=100)
    description: str = orm.Text(default="")
    source: Branch = orm.ForeignKey(Branch, related_name="source")
    target: Branch = orm.ForeignKey(Branch, related_name="target")
    labels: Optional[Union[List[Label], Label]] = orm.ManyToMany(
        Label, through=MergeRequestLabel, ondelete="CASCADE", onupdate="CASCADE"
    )
    commits: Optional[Union[List[Commit], Commit]] = orm.ManyToMany(
        Commit, through=MergeRequestCommit, ondelete="CASCADE", onupdate="CASCADE"
    )
    issues: Optional[Union[List[Issue], Issue]] = orm.ManyToMany(
        Issue, through=MergeRequestIssue, ondelete="CASCADE", onupdate="CASCADE"
    )
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")

    class Meta(MainMeta):
        tablename = "merge_requests"


class Push(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    branch: Branch = orm.ForeignKey(
        Branch, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    has_locking_changes: bool = orm.Boolean(default=False)
    sha: str = orm.String(max_length=200)
    labels: Optional[Union[List[Label], Label]] = orm.ManyToMany(
        Label, through=PushLabel, ondelete="CASCADE", onupdate="CASCADE"
    )
    commits: Optional[Union[List[Commit], Commit]] = orm.ManyToMany(
        Commit,
        through=PushCommit,
        through_relation_name="push",
        through_reverse_relation_name="commit_id",
        ondelete="CASCADE",
        onupdate="CASCADE",
    )
    author: User = orm.ForeignKey(User, ondelete="CASCADE", onupdate="CASCADE")
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")

    class Meta(MainMeta):
        tablename = "pushes"


class Tag(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    name: str = orm.String(max_length=200)
    ref: str = orm.String(max_length=200)
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")
    title: str = orm.String(max_length=200, default="")
    description: str = orm.Text(default="")
    commits: Optional[Union[List[Commit], Commit]] = orm.ManyToMany(
        Commit,
        through=TagCommit,
        through_relation_name="tag",
        through_reverse_relation_name="commit_id",
        ondelete="CASCADE",
        onupdate="CASCADE",
    )
    issues: Optional[Union[List[Issue], Issue]] = orm.ManyToMany(
        Issue, through=TagIssue, ondelete="CASCADE", onupdate="CASCADE"
    )
    labels: Optional[Union[List[Label], Label]] = orm.ManyToMany(
        Label, through=TagLabel, ondelete="CASCADE", onupdate="CASCADE"
    )
    user: User = orm.ForeignKey(
        User, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    branch: Branch = orm.ForeignKey(
        Branch, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )

    class Meta(MainMeta):
        tablename = "tags"


class Release(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    title: str = orm.String(max_length=200, default="")
    description: str = orm.Text(default="")
    tag: Tag = orm.ForeignKey(Tag, ondelete="CASCADE", onupdate="CASCADE")
    changelogs: List[Changelog] = orm.ManyToMany(
        Changelog, through=ChagenlogRelease, ondelete="CASCADE", onupdate="CASCADE"
    )
    data: pydantic.Json = orm.JSON(default={})

    class Meta(MainMeta):
        tablename = "releases"


class Webhook(orm.Model):
    id: int = orm.Integer(name="id", primary_key=True)
    object_kind = orm.String(max_length=100)
    project: Project = orm.ForeignKey(Project, ondelete="CASCADE", onupdate="CASCADE")
    merge_request: MergeRequest = orm.ForeignKey(
        MergeRequest, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    tag: Tag = orm.ForeignKey(
        Tag, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    push: Push = orm.ForeignKey(
        Push, nullable=True, ondelete="CASCADE", onupdate="CASCADE"
    )
    created_at: datetime = orm.DateTime(default=datetime.now())
    data: pydantic.Json = orm.JSON(default={})
    status: int = orm.Integer(default=200)
    error: str = orm.Text(default="")


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.drop_all(engine)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


@pytest.mark.asyncio
async def test_very_complex_relation_map():
    async with database:
        tags = [
            {"id": 18, "name": "name-18", "ref": "ref-18"},
            {"id": 17, "name": "name-17", "ref": "ref-17"},
            {"id": 12, "name": "name-12", "ref": "ref-12"},
        ]
        payload = [
            {
                "id": 9,
                "title": "prueba-2321",
                "description": "\n<!--- start changelog ver.v.1.3.0.0 -->\n### [v.1.3.0.0] - 2021-08-19\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.3.0.0 -->\n",
                "data": {},
            },
            {
                "id": 8,
                "title": "prueba-123-prod",
                "description": "\n<!--- start changelog ver.v.1.2.0.0 -->\n### [v.1.2.0.0] - 2021-08-19\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.2.0.0 -->\n",
                "data": {},
            },
            {
                "id": 6,
                "title": "prueba-3-2",
                "description": "\n<!--- start changelog ver.v.1.1.0.0 -->\n### [v.1.1.0.0] - 2021-07-29\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.1.0.0 -->\n",
                "data": {},
            },
        ]
        saved_tags = []
        for tag in tags:
            saved_tags.append(await Tag(**tag).save())

        for ind, pay in enumerate(payload):
            await Release(**pay, tag=saved_tags[ind]).save()

        releases = await Release.objects.order_by(Release.id.desc()).all()
        dicts = [release.dict() for release in releases]

        result = [
            {
                "id": 9,
                "title": "prueba-2321",
                "description": "\n<!--- start changelog ver.v.1.3.0.0 -->\n### [v.1.3.0.0] - 2021-08-19\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.3.0.0 -->\n",
                "data": {},
                "tag": {
                    "id": 18,
                    "taglabel": None,
                    "tagcommit": None,
                    "tagissue": None,
                },
                "changelogs": [],
            },
            {
                "id": 8,
                "title": "prueba-123-prod",
                "description": "\n<!--- start changelog ver.v.1.2.0.0 -->\n### [v.1.2.0.0] - 2021-08-19\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.2.0.0 -->\n",
                "data": {},
                "tag": {
                    "id": 17,
                    "taglabel": None,
                    "tagcommit": None,
                    "tagissue": None,
                },
                "changelogs": [],
            },
            {
                "id": 6,
                "title": "prueba-3-2",
                "description": "\n<!--- start changelog ver.v.1.1.0.0 -->\n### [v.1.1.0.0] - 2021-07-29\n#### Resolved Issues\n\n#### Task\n\n- Probar flujo de changelog Automatic Jira: [SAN-86](https://htech.atlassian.net/browse/SAN-86)\n\n Description: Se probara el flujo de changelog automatic. \n\n Changelog: Se agrega función para extraer texto del campo changelog del dashboard de Sanval y ponerlo directamente en el changelog.md del repositorio. \n\n\n \n<!--- end changelog ver.v.1.1.0.0 -->\n",
                "data": {},
                "tag": {
                    "id": 12,
                    "taglabel": None,
                    "tagcommit": None,
                    "tagissue": None,
                },
                "changelogs": [],
            },
        ]

        assert dicts == result
@ -8,6 +8,7 @@ import sqlalchemy as sa
from sqlalchemy import create_engine

import ormar
import ormar.fields.constraints
from ormar import ModelDefinitionError, property_field
from ormar.exceptions import ModelError
from tests.settings import DATABASE_URL
@ -45,7 +46,9 @@ class DateFieldsModel(ormar.Model):
        abstract = True
        metadata = metadata
        database = db
        constraints = [ormar.UniqueColumns("creation_date", "modification_date")]
        constraints = [
            ormar.fields.constraints.UniqueColumns("creation_date", "modification_date")
        ]

    created_date: datetime.datetime = ormar.DateTime(
        default=datetime.datetime.now, name="creation_date"
@ -58,7 +61,7 @@ class DateFieldsModel(ormar.Model):
class Category(DateFieldsModel, AuditModel):
    class Meta(ormar.ModelMeta):
        tablename = "categories"
        constraints = [ormar.UniqueColumns("name", "code")]
        constraints = [ormar.fields.constraints.UniqueColumns("name", "code")]

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=50, unique=True, index=True)
68
tests/test_meta_constraints/test_index_constraints.py
Normal file
@ -0,0 +1,68 @@
import asyncpg  # type: ignore
import databases
import pytest
import sqlalchemy

import ormar.fields.constraints
from tests.settings import DATABASE_URL

database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()


class Product(ormar.Model):
    class Meta:
        tablename = "products"
        metadata = metadata
        database = database
        constraints = [
            ormar.fields.constraints.IndexColumns("company", "name", name="my_index"),
            ormar.fields.constraints.IndexColumns("location", "company_type"),
        ]

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    company: str = ormar.String(max_length=200)
    location: str = ormar.String(max_length=200)
    company_type: str = ormar.String(max_length=200)


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.drop_all(engine)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


def test_table_structure():
    assert len(Product.Meta.table.indexes) > 0
    indexes = sorted(
        list(Product.Meta.table.indexes), key=lambda x: x.name, reverse=True
    )
    test_index = indexes[0]
    assert test_index.name == "my_index"
    assert [col.name for col in test_index.columns] == ["company", "name"]

    test_index = indexes[1]
    assert test_index.name == "ix_products_location_company_type"
    assert [col.name for col in test_index.columns] == ["location", "company_type"]


@pytest.mark.asyncio
async def test_index_is_not_unique():
    async with database:
        async with database.transaction(force_rollback=True):
            await Product.objects.create(
                name="Cookies", company="Nestle", location="A", company_type="B"
            )
            await Product.objects.create(
                name="Mars", company="Mars", location="B", company_type="Z"
            )
            await Product.objects.create(
                name="Mars", company="Nestle", location="C", company_type="X"
            )
            await Product.objects.create(
                name="Mars", company="Mars", location="D", company_type="Y"
            )
@ -1,4 +1,3 @@
import asyncio
import sqlite3

import asyncpg  # type: ignore
@ -7,7 +6,7 @@ import pymysql
import pytest
import sqlalchemy

import ormar
import ormar.fields.constraints
from tests.settings import DATABASE_URL

database = databases.Database(DATABASE_URL, force_rollback=True)
@ -19,22 +18,15 @@ class Product(ormar.Model):
        tablename = "products"
        metadata = metadata
        database = database
        constraints = [ormar.UniqueColumns("name", "company")]
        constraints = [ormar.fields.constraints.UniqueColumns("name", "company")]

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    company: str = ormar.String(max_length=200)


@pytest.fixture(scope="module")
def event_loop():
    loop = asyncio.get_event_loop()
    yield loop
    loop.close()


@pytest.fixture(autouse=True, scope="module")
async def create_test_database():
def create_test_database():
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.drop_all(engine)
    metadata.create_all(engine)
62
tests/test_model_definition/test_overwriting_sql_nullable.py
Normal file
@ -0,0 +1,62 @@
import sqlite3
from typing import Optional

import asyncpg
import databases
import pymysql
import sqlalchemy
from sqlalchemy import create_engine, text

import ormar
import pytest

from tests.settings import DATABASE_URL

db = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()


class BaseMeta(ormar.ModelMeta):
    metadata = metadata
    database = db


class PrimaryModel(ormar.Model):
    class Meta(BaseMeta):
        tablename = "primary_models"

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=255, index=True)
    some_text: Optional[str] = ormar.Text(nullable=True, sql_nullable=False)
    some_other_text: Optional[str] = ormar.String(
        max_length=255, nullable=True, sql_nullable=False, server_default=text("''")
    )


@pytest.fixture(autouse=True, scope="module")
def create_test_database():
    engine = create_engine(DATABASE_URL)
    metadata.create_all(engine)
    yield
    metadata.drop_all(engine)


@pytest.mark.asyncio
async def test_create_models():
    async with db:
        primary = await PrimaryModel(
            name="Foo", some_text="Bar", some_other_text="Baz"
        ).save()
        assert primary.id == 1

        primary2 = await PrimaryModel(name="Foo2", some_text="Bar2").save()
        assert primary2.id == 2

        with pytest.raises(
            (
                sqlite3.IntegrityError,
                pymysql.IntegrityError,
                asyncpg.exceptions.NotNullViolationError,
            )
        ):
            await PrimaryModel(name="Foo3").save()