add skip_reverse parameter, add links to related libs, fix weakref error, fix through error with extra=forbid

This commit is contained in:
collerek
2021-04-11 18:43:23 +02:00
parent e553885221
commit b3b1c156b5
19 changed files with 675 additions and 48 deletions

View File

@ -47,15 +47,35 @@ since they actually have to create and connect to database in most of the tests.
Yet remember that those are - well - tests and not all solutions are suitable to be used in real life applications.
### Part of the `fastapi` ecosystem
As part of the fastapi ecosystem `ormar` is supported in libraries that somehow work with databases.
As of now `ormar` is supported by:
* [`fastapi-users`](https://github.com/frankie567/fastapi-users)
* [`fastapi-crudrouter`](https://github.com/awtkns/fastapi-crudrouter)
* [`fastapi-pagination`](https://github.com/uriyyo/fastapi-pagination)
If you maintain or use different library and would like it to support `ormar` let us know how we can help.
### Dependencies
Ormar is built with:
* [`SQLAlchemy core`][sqlalchemy-core] for query building.
* [`sqlalchemy core`][sqlalchemy-core] for query building.
* [`databases`][databases] for cross-database async support.
* [`pydantic`][pydantic] for data validation.
* `typing_extensions` for python 3.6 - 3.7
### Migrating from `sqlalchemy`
If you currently use `sqlalchemy` and would like to switch to `ormar` check out the auto-translation
tool that can help you with translating existing sqlalchemy orm models so you do not have to do it manually.
**Beta** versions available at github: [`sqlalchemy-to-ormar`](https://github.com/collerek/sqlalchemy-to-ormar)
or simply `pip install sqlalchemy-to-ormar`
### Migrations & Database creation
Because ormar is built on SQLAlchemy core, you can use [`alembic`][alembic] to provide

View File

@ -47,15 +47,35 @@ since they actually have to create and connect to database in most of the tests.
Yet remember that those are - well - tests and not all solutions are suitable to be used in real life applications.
### Part of the `fastapi` ecosystem
As part of the fastapi ecosystem `ormar` is supported in libraries that somehow work with databases.
As of now `ormar` is supported by:
* [`fastapi-users`](https://github.com/frankie567/fastapi-users)
* [`fastapi-crudrouter`](https://github.com/awtkns/fastapi-crudrouter)
* [`fastapi-pagination`](https://github.com/uriyyo/fastapi-pagination)
If you maintain or use different library and would like it to support `ormar` let us know how we can help.
### Dependencies
Ormar is built with:
* [`SQLAlchemy core`][sqlalchemy-core] for query building.
* [`sqlalchemy core`][sqlalchemy-core] for query building.
* [`databases`][databases] for cross-database async support.
* [`pydantic`][pydantic] for data validation.
* `typing_extensions` for python 3.6 - 3.7
### Migrating from `sqlalchemy`
If you currently use `sqlalchemy` and would like to switch to `ormar` check out the auto-translation
tool that can help you with translating existing sqlalchemy orm models so you do not have to do it manually.
**Beta** versions available at github: [`sqlalchemy-to-ormar`](https://github.com/collerek/sqlalchemy-to-ormar)
or simply `pip install sqlalchemy-to-ormar`
### Migrations & Database creation
Because ormar is built on SQLAlchemy core, you can use [`alembic`][alembic] to provide

View File

@ -1,3 +1,29 @@
# 0.10.3
## ✨ Features
* `ForeignKey` and `ManyToMany` now support `skip_reverse: bool = False` flag [#118](https://github.com/collerek/ormar/issues/118).
If you set `skip_reverse` flag internally the field is still registered on the other
side of the relationship so you can:
* `filter` by related models fields from reverse model
* `order_by` by related models fields from reverse model
But you cannot:
* access the related field from reverse model with `related_name`
* even if you `select_related` from reverse side of the model the returned models won't be populated in reversed instance (the join is not prevented so you still can `filter` and `order_by`)
* the relation won't be populated in `dict()` and `json()`
* you cannot pass the nested related objects when populating from `dict()` or `json()` (also through `fastapi`). It will be either ignored or raise error depending on `extra` setting in pydantic `Config`.
## 🐛 Fixes
* Fix weakref `ReferenceError` error [#118](https://github.com/collerek/ormar/issues/118)
* Fix error raised by Through fields when pydantic `Config.extra="forbid"` is set
## 💬 Other
* Introduce link to `sqlalchemy-to-ormar` auto-translator for models
* Provide links to fastapi ecosystem libraries that support `ormar`
# 0.10.2
## ✨ Features

View File

@ -75,7 +75,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType()
__version__ = "0.10.2"
__version__ = "0.10.3"
__all__ = [
"Integer",
"BigInteger",

View File

@ -53,6 +53,8 @@ class BaseField(FieldInfo):
"is_relation", None
) # ForeignKeyField + subclasses
self.is_through: bool = kwargs.pop("is_through", False) # ThroughFields
self.skip_reverse: bool = kwargs.pop("skip_reverse", False)
self.skip_field: bool = kwargs.pop("skip_field", False)
self.owner: Type["Model"] = kwargs.pop("owner", None)
self.to: Type["Model"] = kwargs.pop("to", None)

View File

@ -233,9 +233,13 @@ def ForeignKey( # noqa CFQ002
owner = kwargs.pop("owner", None)
self_reference = kwargs.pop("self_reference", False)
orders_by = kwargs.pop("orders_by", None)
related_orders_by = kwargs.pop("related_orders_by", None)
skip_reverse = kwargs.pop("skip_reverse", False)
skip_field = kwargs.pop("skip_field", False)
validate_not_allowed_fields(kwargs)
if to.__class__ == ForwardRef:
@ -274,6 +278,8 @@ def ForeignKey( # noqa CFQ002
is_relation=True,
orders_by=orders_by,
related_orders_by=related_orders_by,
skip_reverse=skip_reverse,
skip_field=skip_field,
)
Field = type("ForeignKey", (ForeignKeyField, BaseField), {})
@ -312,6 +318,30 @@ class ForeignKeyField(BaseField):
"""
return self.related_name or self.owner.get_name() + "s"
def default_target_field_name(self, reverse: bool = False) -> str:
"""
Returns default target model name on through model.
:param reverse: flag to grab name without accessing related field
:type reverse: bool
:return: name of the field
:rtype: str
"""
self_rel_prefix = "from_" if not reverse else "to_"
prefix = self_rel_prefix if self.self_reference else ""
return f"{prefix}{self.to.get_name()}"
def default_source_field_name(self, reverse: bool = False) -> str:
"""
Returns default target model name on through model.
:param reverse: flag to grab name without accessing related field
:type reverse: bool
:return: name of the field
:rtype: str
"""
self_rel_prefix = "to_" if not reverse else "from_"
prefix = self_rel_prefix if self.self_reference else ""
return f"{prefix}{self.owner.get_name()}"
def evaluate_forward_ref(self, globalns: Any, localns: Any) -> None:
"""
Evaluates the ForwardRef to actual Field based on global and local namespaces

View File

@ -112,11 +112,16 @@ def ManyToMany(
"""
related_name = kwargs.pop("related_name", None)
nullable = kwargs.pop("nullable", True)
owner = kwargs.pop("owner", None)
self_reference = kwargs.pop("self_reference", False)
orders_by = kwargs.pop("orders_by", None)
related_orders_by = kwargs.pop("related_orders_by", None)
skip_reverse = kwargs.pop("skip_reverse", False)
skip_field = kwargs.pop("skip_field", False)
if through is not None and through.__class__ != ForwardRef:
forbid_through_relations(cast(Type["Model"], through))
@ -151,6 +156,8 @@ def ManyToMany(
is_multi=True,
orders_by=orders_by,
related_orders_by=related_orders_by,
skip_reverse=skip_reverse,
skip_field=skip_field,
)
Field = type("ManyToMany", (ManyToManyField, BaseField), {})
@ -184,24 +191,6 @@ class ManyToManyField(ForeignKeyField, ormar.QuerySetProtocol, ormar.RelationPro
or self.name
)
def default_target_field_name(self) -> str:
"""
Returns default target model name on through model.
:return: name of the field
:rtype: str
"""
prefix = "from_" if self.self_reference else ""
return f"{prefix}{self.to.get_name()}"
def default_source_field_name(self) -> str:
"""
Returns default target model name on through model.
:return: name of the field
:rtype: str
"""
prefix = "to_" if self.self_reference else ""
return f"{prefix}{self.owner.get_name()}"
def has_unresolved_forward_refs(self) -> bool:
"""
Verifies if the filed has any ForwardRefs that require updating before the

View File

@ -111,6 +111,7 @@ def register_reverse_model_fields(model_field: "ForeignKeyField") -> None:
self_reference=model_field.self_reference,
self_reference_primary=model_field.self_reference_primary,
orders_by=model_field.related_orders_by,
skip_field=model_field.skip_reverse,
)
# register foreign keys on through model
model_field = cast("ManyToManyField", model_field)
@ -125,6 +126,7 @@ def register_reverse_model_fields(model_field: "ForeignKeyField") -> None:
owner=model_field.to,
self_reference=model_field.self_reference,
orders_by=model_field.related_orders_by,
skip_field=model_field.skip_reverse,
)
@ -145,6 +147,7 @@ def register_through_shortcut_fields(model_field: "ManyToManyField") -> None:
virtual=True,
related_name=model_field.name,
owner=model_field.owner,
nullable=True,
)
model_field.to.Meta.model_fields[through_name] = Through(
@ -153,6 +156,7 @@ def register_through_shortcut_fields(model_field: "ManyToManyField") -> None:
virtual=True,
related_name=related_name,
owner=model_field.to,
nullable=True,
)

View File

@ -90,6 +90,7 @@ def add_cached_properties(new_model: Type["Model"]) -> None:
"""
new_model._quick_access_fields = quick_access_set
new_model._related_names = None
new_model._through_names = None
new_model._related_fields = None
new_model._pydantic_fields = {name for name in new_model.__fields__}
new_model._choices_fields = set()
@ -536,6 +537,7 @@ class ModelMetaclass(pydantic.main.ModelMetaclass):
new_model = populate_meta_tablename_columns_and_pk(name, new_model)
populate_meta_sqlalchemy_table_if_required(new_model.Meta)
expand_reverse_relationships(new_model)
# TODO: iterate only related fields
for field in new_model.Meta.model_fields.values():
register_relation_in_alias_manager(field=field)

View File

@ -20,6 +20,7 @@ class RelationMixin:
Meta: ModelMeta
_related_names: Optional[Set]
_through_names: Optional[Set]
_related_fields: Optional[List]
get_name: Callable
@ -57,19 +58,23 @@ class RelationMixin:
return related_fields
@classmethod
def extract_through_names(cls) -> Set:
def extract_through_names(cls) -> Set[str]:
"""
Extracts related fields through names which are shortcuts to through models.
:return: set of related through fields names
:rtype: Set
"""
related_fields = set()
for name in cls.extract_related_names():
field = cls.Meta.model_fields[name]
if field.is_multi:
related_fields.add(field.through.get_name(lower=True))
return related_fields
if isinstance(cls._through_names, Set):
return cls._through_names
related_names = set()
for name, field in cls.Meta.model_fields.items():
if isinstance(field, BaseField) and field.is_through:
related_names.add(name)
cls._through_names = related_names
return related_names
@classmethod
def extract_related_names(cls) -> Set[str]:
@ -89,6 +94,7 @@ class RelationMixin:
isinstance(field, BaseField)
and field.is_relation
and not field.is_through
and not field.skip_field
):
related_names.add(name)
cls._related_names = related_names

View File

@ -24,7 +24,11 @@ class Model(ModelRow):
Meta: ModelMeta
def __repr__(self) -> str: # pragma nocover
_repr = {k: getattr(self, k) for k, v in self.Meta.model_fields.items()}
_repr = {
k: getattr(self, k)
for k, v in self.Meta.model_fields.items()
if not v.skip_field
}
return f"{self.__class__.__name__}({str(_repr)})"
async def upsert(self: T, **kwargs: Any) -> T:

View File

@ -81,6 +81,7 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
_orm_id: int
_orm_saved: bool
_related_names: Optional[Set]
_through_names: Optional[Set]
_related_names_hash: str
_choices_fields: Optional[Set]
_pydantic_fields: Set
@ -165,6 +166,11 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None
# extract through fields
through_tmp_dict = dict()
for field_name in self.extract_through_names():
through_tmp_dict[field_name] = new_kwargs.pop(field_name, None)
values, fields_set, validation_error = pydantic.validate_model(
self, new_kwargs # type: ignore
)
@ -174,6 +180,9 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
object.__setattr__(self, "__dict__", values)
object.__setattr__(self, "__fields_set__", fields_set)
# add back through fields
new_kwargs.update(through_tmp_dict)
# register the columns models after initialization
for related in self.extract_related_names().union(self.extract_through_names()):
self.Meta.model_fields[related].expand_relationship(
@ -592,13 +601,16 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
exclude=self._skip_ellipsis(exclude, field),
)
elif nested_model is not None:
dict_instance[field] = nested_model.dict(
relation_map=self._skip_ellipsis(
relation_map, field, default_return=dict()
),
include=self._skip_ellipsis(include, field),
exclude=self._skip_ellipsis(exclude, field),
)
try:
dict_instance[field] = nested_model.dict(
relation_map=self._skip_ellipsis(
relation_map, field, default_return=dict()
),
include=self._skip_ellipsis(include, field),
exclude=self._skip_ellipsis(exclude, field),
)
except ReferenceError:
dict_instance[field] = None
else:
dict_instance[field] = None
return dict_instance

View File

@ -22,7 +22,7 @@ if TYPE_CHECKING: # pragma no cover
from ormar.relations import Relation
from ormar.models import Model, T
from ormar.queryset import QuerySet
from ormar import RelationType
from ormar import RelationType, ForeignKeyField
else:
T = TypeVar("T", bound="Model")
@ -251,7 +251,7 @@ class QuerysetProxy(Generic[T]):
owner_column = self._owner.get_name()
else:
queryset = ormar.QuerySet(model_cls=self.relation.to) # type: ignore
owner_column = self.related_field.name
owner_column = self.related_field_name
kwargs = {owner_column: self._owner}
self._clean_items_on_load()
if keep_reversed and self.type_ == ormar.RelationType.REVERSE:
@ -367,7 +367,7 @@ class QuerysetProxy(Generic[T]):
"""
through_kwargs = kwargs.pop(self.through_model_name, {})
if self.type_ == ormar.RelationType.REVERSE:
kwargs[self.related_field.name] = self._owner
kwargs[self.related_field_name] = self._owner
created = await self.queryset.create(**kwargs)
self._register_related(created)
if self.type_ == ormar.RelationType.MULTIPLE:

View File

@ -124,15 +124,14 @@ class RelationProxy(Generic[T], list):
:rtype: QuerySet
"""
related_field_name = self.related_field_name
related_field = self.relation.to.Meta.model_fields[related_field_name]
pkname = self._owner.get_column_alias(self._owner.Meta.pkname)
self._check_if_model_saved()
kwargs = {f"{related_field.name}__{pkname}": self._owner.pk}
kwargs = {f"{related_field_name}__{pkname}": self._owner.pk}
queryset = (
ormar.QuerySet(
model_cls=self.relation.to, proxy_source_model=self._owner.__class__
)
.select_related(related_field.name)
.select_related(related_field_name)
.filter(**kwargs)
)
return queryset
@ -168,11 +167,12 @@ class RelationProxy(Generic[T], list):
super().remove(item)
relation_name = self.related_field_name
relation = item._orm._get(relation_name)
if relation is None: # pragma nocover
raise ValueError(
f"{self._owner.get_name()} does not have relation {relation_name}"
)
relation.remove(self._owner)
# if relation is None: # pragma nocover
# raise ValueError(
# f"{self._owner.get_name()} does not have relation {relation_name}"
# )
if relation:
relation.remove(self._owner)
self.relation.remove(item)
if self.type_ == ormar.RelationType.MULTIPLE:
await self.queryset_proxy.delete_through_instance(item)
@ -211,7 +211,7 @@ class RelationProxy(Generic[T], list):
self._check_if_model_saved()
if self.type_ == ormar.RelationType.MULTIPLE:
await self.queryset_proxy.create_through_instance(item, **kwargs)
setattr(item, relation_name, self._owner)
setattr(self._owner, self.field_name, item)
else:
setattr(item, relation_name, self._owner)
await item.update()

View File

@ -1,4 +1,5 @@
import datetime
from typing import List
import pytest
import sqlalchemy
@ -59,6 +60,12 @@ async def get_bus(item_id: int):
return bus
@app.get("/buses/", response_model=List[Bus])
async def get_buses():
buses = await Bus.objects.select_related(["owner", "co_owner"]).all()
return buses
@app.post("/trucks/", response_model=Truck)
async def create_truck(truck: Truck):
await truck.save()
@ -84,6 +91,12 @@ async def add_bus_coowner(item_id: int, person: Person):
return bus
@app.get("/buses2/", response_model=List[Bus2])
async def get_buses2():
buses = await Bus2.objects.select_related(["owner", "co_owners"]).all()
return buses
@app.post("/trucks2/", response_model=Truck2)
async def create_truck2(truck: Truck2):
await truck.save()
@ -172,6 +185,10 @@ def test_inheritance_with_relation():
assert unicorn2.co_owner.name == "Joe"
assert unicorn2.max_persons == 50
buses = [Bus(**x) for x in client.get("/buses/").json()]
assert len(buses) == 1
assert buses[0].name == "Unicorn"
def test_inheritance_with_m2m_relation():
client = TestClient(app)
@ -217,3 +234,7 @@ def test_inheritance_with_m2m_relation():
assert shelby.co_owners[0] == alex
assert shelby.co_owners[1] == joe
assert shelby.max_capacity == 2000
buses = [Bus2(**x) for x in client.get("/buses2/").json()]
assert len(buses) == 1
assert buses[0].name == "Unicorn"

View File

@ -0,0 +1,106 @@
import json
from typing import List, Optional
import databases
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient
import ormar
from tests.settings import DATABASE_URL
app = FastAPI()
metadata = sqlalchemy.MetaData()
database = databases.Database(DATABASE_URL, force_rollback=True)
app.state.database = database
@app.on_event("startup")
async def startup() -> None:
database_ = app.state.database
if not database_.is_connected:
await database_.connect()
@app.on_event("shutdown")
async def shutdown() -> None:
database_ = app.state.database
if database_.is_connected:
await database_.disconnect()
class Department(ormar.Model):
class Meta:
database = database
metadata = metadata
id: int = ormar.Integer(primary_key=True)
department_name: str = ormar.String(max_length=100)
class Course(ormar.Model):
class Meta:
database = database
metadata = metadata
id: int = ormar.Integer(primary_key=True)
course_name: str = ormar.String(max_length=100)
completed: bool = ormar.Boolean()
department: Optional[Department] = ormar.ForeignKey(Department)
# create db and tables
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@app.post("/DepartmentWithCourses/", response_model=Department)
async def create_department(department: Department):
# there is no save all - you need to split into save and save_related
await department.save()
await department.save_related(follow=True, save_all=True)
return department
@app.get("/DepartmentsAll/", response_model=List[Department])
async def get_Courses():
# if you don't provide default name it related model name + s so courses not course
departmentall = await Department.objects.select_related("courses").all()
return departmentall
def test_saving_related_in_fastapi():
client = TestClient(app)
with client as client:
payload = {
"department_name": "Ormar",
"courses": [
{"course_name": "basic1", "completed": True},
{"course_name": "basic2", "completed": True},
],
}
response = client.post("/DepartmentWithCourses/", data=json.dumps(payload))
department = Department(**response.json())
assert department.id is not None
assert len(department.courses) == 2
assert department.department_name == "Ormar"
assert department.courses[0].course_name == "basic1"
assert department.courses[0].completed
assert department.courses[1].course_name == "basic2"
assert department.courses[1].completed
response = client.get("/DepartmentsAll/")
departments = [Department(**x) for x in response.json()]
assert departments[0].id is not None
assert len(departments[0].courses) == 2
assert departments[0].department_name == "Ormar"
assert departments[0].courses[0].course_name == "basic1"
assert departments[0].courses[0].completed
assert departments[0].courses[1].course_name == "basic2"
assert departments[0].courses[1].completed

View File

@ -0,0 +1,148 @@
import json
from typing import List, Optional
import databases
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient
import ormar
from tests.settings import DATABASE_URL
app = FastAPI()
metadata = sqlalchemy.MetaData()
database = databases.Database(DATABASE_URL, force_rollback=True)
app.state.database = database
@app.on_event("startup")
async def startup() -> None:
database_ = app.state.database
if not database_.is_connected:
await database_.connect()
@app.on_event("shutdown")
async def shutdown() -> None:
database_ = app.state.database
if database_.is_connected:
await database_.disconnect()
class BaseMeta(ormar.ModelMeta):
database = database
metadata = metadata
class Author(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
first_name: str = ormar.String(max_length=80)
last_name: str = ormar.String(max_length=80)
class Category(ormar.Model):
class Meta(BaseMeta):
tablename = "categories"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=40)
class Post(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
title: str = ormar.String(max_length=200)
categories = ormar.ManyToMany(Category, skip_reverse=True)
author: Optional[Author] = ormar.ForeignKey(Author, skip_reverse=True)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@app.post("/categories/", response_model=Category)
async def create_category(category: Category):
await category.save()
await category.save_related(follow=True, save_all=True)
return category
@app.post("/posts/", response_model=Post)
async def create_post(post: Post):
if post.author:
await post.author.save()
await post.save()
await post.save_related(follow=True, save_all=True)
for category in [cat for cat in post.categories]:
await post.categories.add(category)
return post
@app.get("/categories/", response_model=List[Category])
async def get_categories():
return await Category.objects.select_related("posts").all()
@app.get("/posts/", response_model=List[Post])
async def get_posts():
posts = await Post.objects.select_related(["categories", "author"]).all()
return posts
def test_queries():
client = TestClient(app)
with client as client:
right_category = {"name": "Test category"}
wrong_category = {"name": "Test category2", "posts": [{"title": "Test Post"}]}
# cannot add posts if skipped, will be ignored (with extra=ignore by default)
response = client.post("/categories/", data=json.dumps(wrong_category))
assert response.status_code == 200
response = client.get("/categories/")
assert response.status_code == 200
assert not "posts" in response.json()
categories = [Category(**x) for x in response.json()]
assert categories[0] is not None
assert categories[0].name == "Test category2"
response = client.post("/categories/", data=json.dumps(right_category))
assert response.status_code == 200
response = client.get("/categories/")
assert response.status_code == 200
categories = [Category(**x) for x in response.json()]
assert categories[1] is not None
assert categories[1].name == "Test category"
right_post = {
"title": "ok post",
"author": {"first_name": "John", "last_name": "Smith"},
"categories": [{"name": "New cat"}],
}
response = client.post("/posts/", data=json.dumps(right_post))
assert response.status_code == 200
Category.__config__.extra = "allow"
response = client.get("/posts/")
assert response.status_code == 200
posts = [Post(**x) for x in response.json()]
assert posts[0].title == "ok post"
assert posts[0].author.first_name == "John"
assert posts[0].categories[0].name == "New cat"
wrong_category = {"name": "Test category3", "posts": [{"title": "Test Post"}]}
# cannot add posts if skipped, will be error with extra forbid
Category.__config__.extra = "forbid"
response = client.post("/categories/", data=json.dumps(wrong_category))
assert response.status_code == 422

View File

@ -123,6 +123,16 @@ async def get_test_5(thing_id: UUID):
return await Thing.objects.all(other_thing__id=thing_id)
@app.get(
"/test/error", response_model=List[Thing], response_model_exclude={"other_thing"}
)
async def get_weakref():
ots = await OtherThing.objects.all()
ot = ots[0]
ts = await ot.things.all()
return ts
def test_endpoints():
client = TestClient(app)
with client:
@ -145,3 +155,7 @@ def test_endpoints():
resp5 = client.get(f"/test/5/{ot.id}")
assert resp5.status_code == 200
assert len(resp5.json()) == 3
resp6 = client.get("/test/error")
assert resp6.status_code == 200
assert len(resp6.json()) == 3

View File

@ -0,0 +1,223 @@
from typing import List, Optional
import databases
import pytest
import sqlalchemy
import ormar
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
class BaseMeta(ormar.ModelMeta):
database = database
metadata = metadata
class Author(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
first_name: str = ormar.String(max_length=80)
last_name: str = ormar.String(max_length=80)
class Category(ormar.Model):
class Meta(BaseMeta):
tablename = "categories"
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=40)
class Post(ormar.Model):
class Meta(BaseMeta):
pass
id: int = ormar.Integer(primary_key=True)
title: str = ormar.String(max_length=200)
categories: Optional[List[Category]] = ormar.ManyToMany(Category, skip_reverse=True)
author: Optional[Author] = ormar.ForeignKey(Author, skip_reverse=True)
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
@pytest.fixture(scope="function")
async def cleanup():
yield
async with database:
PostCategory = Post.Meta.model_fields["categories"].through
await PostCategory.objects.delete(each=True)
await Post.objects.delete(each=True)
await Category.objects.delete(each=True)
await Author.objects.delete(each=True)
def test_model_definition():
category = Category(name="Test")
author = Author(first_name="Test", last_name="Author")
post = Post(title="Test Post", author=author)
post.categories = category
assert post.categories[0] == category
assert post.author == author
with pytest.raises(AttributeError):
assert author.posts
with pytest.raises(AttributeError):
assert category.posts
assert "posts" not in category._orm
@pytest.mark.asyncio
async def test_assigning_related_objects(cleanup):
async with database:
guido = await Author.objects.create(first_name="Guido", last_name="Van Rossum")
post = await Post.objects.create(title="Hello, M2M", author=guido)
news = await Category.objects.create(name="News")
# Add a category to a post.
await post.categories.add(news)
# other way is disabled
with pytest.raises(AttributeError):
await news.posts.add(post)
assert await post.categories.get_or_none(name="no exist") is None
assert await post.categories.get_or_none(name="News") == news
# Creating columns object from instance:
await post.categories.create(name="Tips")
assert len(post.categories) == 2
post_categories = await post.categories.all()
assert len(post_categories) == 2
category = await Category.objects.select_related("posts").get(name="News")
with pytest.raises(AttributeError):
assert category.posts
@pytest.mark.asyncio
async def test_quering_of_related_model_works_but_no_result(cleanup):
async with database:
guido = await Author.objects.create(first_name="Guido", last_name="Van Rossum")
post = await Post.objects.create(title="Hello, M2M", author=guido)
news = await Category.objects.create(name="News")
await post.categories.add(news)
post_categories = await post.categories.all()
assert len(post_categories) == 1
assert "posts" not in post.dict().get("categories", [])[0]
assert news == await post.categories.get(name="News")
posts_about_python = await Post.objects.filter(categories__name="python").all()
assert len(posts_about_python) == 0
# relation not in dict
category = (
await Category.objects.select_related("posts")
.filter(posts__author=guido)
.get()
)
assert category == news
assert "posts" not in category.dict()
# relation not in json
category2 = (
await Category.objects.select_related("posts")
.filter(posts__author__first_name="Guido")
.get()
)
assert category2 == news
assert "posts" not in category2.json()
assert "posts" not in Category.schema().get("properties")
@pytest.mark.asyncio
async def test_removal_of_the_relations(cleanup):
async with database:
guido = await Author.objects.create(first_name="Guido", last_name="Van Rossum")
post = await Post.objects.create(title="Hello, M2M", author=guido)
news = await Category.objects.create(name="News")
await post.categories.add(news)
assert len(await post.categories.all()) == 1
await post.categories.remove(news)
assert len(await post.categories.all()) == 0
with pytest.raises(AttributeError):
await news.posts.add(post)
with pytest.raises(AttributeError):
await news.posts.remove(post)
await post.categories.add(news)
await post.categories.clear()
assert len(await post.categories.all()) == 0
await post.categories.add(news)
await news.delete()
assert len(await post.categories.all()) == 0
@pytest.mark.asyncio
async def test_selecting_related(cleanup):
async with database:
guido = await Author.objects.create(first_name="Guido", last_name="Van Rossum")
guido2 = await Author.objects.create(
first_name="Guido2", last_name="Van Rossum"
)
post = await Post.objects.create(title="Hello, M2M", author=guido)
post2 = await Post.objects.create(title="Bye, M2M", author=guido2)
news = await Category.objects.create(name="News")
recent = await Category.objects.create(name="Recent")
await post.categories.add(news)
await post.categories.add(recent)
await post2.categories.add(recent)
assert len(await post.categories.all()) == 2
assert (await post.categories.limit(1).all())[0] == news
assert (await post.categories.offset(1).limit(1).all())[0] == recent
assert await post.categories.first() == news
assert await post.categories.exists()
# still can order
categories = (
await Category.objects.select_related("posts")
.order_by("posts__title")
.all()
)
assert categories[0].name == "Recent"
assert categories[1].name == "News"
# still can filter
categories = await Category.objects.filter(posts__title="Bye, M2M").all()
assert categories[0].name == "Recent"
assert len(categories) == 1
# same for reverse fk
authors = (
await Author.objects.select_related("posts").order_by("posts__title").all()
)
assert authors[0].first_name == "Guido2"
assert authors[1].first_name == "Guido"
authors = await Author.objects.filter(posts__title="Bye, M2M").all()
assert authors[0].first_name == "Guido2"
assert len(authors) == 1