add update to queryset, add update_through_instance, start to update docs

This commit is contained in:
collerek
2021-02-26 11:28:44 +01:00
parent 503f589fa7
commit 7bf781098f
9 changed files with 339 additions and 14 deletions

View File

@ -235,6 +235,64 @@ async def test_ordering_by_through_model() -> Any:
assert post3.categories[2].postcategory.param_name == "volume"
@pytest.mark.asyncio
async def test_update_through_models_from_queryset_on_through() -> Any:
async with database:
post = await Post(title="Test post").save()
await post.categories.create(
name="Test category1",
postcategory={"sort_order": 2, "param_name": "volume"},
)
await post.categories.create(
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
)
await post.categories.create(
name="Test category3",
postcategory={"sort_order": 3, "param_name": "velocity"},
)
await PostCategory.objects.filter(param_name="volume", post=post.id).update(
sort_order=4
)
post2 = (
await Post.objects.select_related("categories")
.order_by("-postcategory__sort_order")
.get()
)
assert len(post2.categories) == 3
assert post2.categories[0].postcategory.param_name == "volume"
assert post2.categories[2].postcategory.param_name == "area"
@pytest.mark.asyncio
async def test_update_through_from_related() -> Any:
async with database:
post = await Post(title="Test post").save()
await post.categories.create(
name="Test category1",
postcategory={"sort_order": 2, "param_name": "volume"},
)
await post.categories.create(
name="Test category2", postcategory={"sort_order": 1, "param_name": "area"}
)
await post.categories.create(
name="Test category3",
postcategory={"sort_order": 3, "param_name": "velocity"},
)
await post.categories.filter(name="Test category3").update(
postcategory={"sort_order": 4}
)
post2 = (
await Post.objects.select_related("categories")
.order_by("postcategory__sort_order")
.get()
)
assert len(post2.categories) == 3
assert post2.categories[2].postcategory.sort_order == 4
# TODO: check/ modify following
# add to fields with class lower name (V)
@ -245,11 +303,12 @@ async def test_ordering_by_through_model() -> Any:
# accessing from instance (V) <- no both sides only nested one is relevant, fix one side
# filtering in filter (through name normally) (V) < - table prefix from normal relation,
# check if is_through needed, resolved side of relation
# ordering by in order_by
# ordering by in order_by (V)
# updating in query (V)
# updating from querysetproxy (V)
# modifying from instance (both sides?) (X) <= no, the loaded one doesn't have relations
# updating in query
# modifying from instance (both sides?)
# including/excluding in fields?
# allowing to change fk fields names in through model?
# make through optional? auto-generated for cases other fields are missing?

View File

@ -6,6 +6,7 @@ import pytest
import sqlalchemy
import ormar
from ormar.exceptions import QueryDefinitionError
from tests.settings import DATABASE_URL
database = databases.Database(DATABASE_URL, force_rollback=True)
@ -180,3 +181,42 @@ async def test_queryset_methods():
assert len(categories) == 3 == len(post.categories)
for cat in post.categories:
assert cat.subject.name is not None
@pytest.mark.asyncio
async def test_queryset_update():
async with database:
async with database.transaction(force_rollback=True):
guido = await Author.objects.create(
first_name="Guido", last_name="Van Rossum"
)
subject = await Subject(name="Random").save()
post = await Post.objects.create(title="Hello, M2M", author=guido)
await post.categories.create(name="News", sort_order=1, subject=subject)
await post.categories.create(name="Breaking", sort_order=3, subject=subject)
await post.categories.order_by("sort_order").all()
assert len(post.categories) == 2
assert post.categories[0].sort_order == 1
assert post.categories[0].name == "News"
assert post.categories[1].sort_order == 3
assert post.categories[1].name == "Breaking"
updated = await post.categories.update(each=True, name="Test")
assert updated == 2
await post.categories.order_by("sort_order").all()
assert len(post.categories) == 2
assert post.categories[0].name == "Test"
assert post.categories[1].name == "Test"
updated = await post.categories.filter(sort_order=3).update(name="Test 2")
assert updated == 1
await post.categories.order_by("sort_order").all()
assert len(post.categories) == 2
assert post.categories[0].name == "Test"
assert post.categories[1].name == "Test 2"
with pytest.raises(QueryDefinitionError):
await post.categories.update(name="Test WRONG")

View File

@ -0,0 +1,147 @@
from typing import List, Optional
from uuid import UUID, uuid4
import databases
import pydantic
import pytest
import sqlalchemy
from fastapi import FastAPI
from starlette.testclient import TestClient
import ormar
from tests.settings import DATABASE_URL
app = FastAPI()
database = databases.Database(DATABASE_URL, force_rollback=True)
metadata = sqlalchemy.MetaData()
app.state.database = database
@app.on_event("startup")
async def startup() -> None:
database_ = app.state.database
if not database_.is_connected:
await database_.connect()
@app.on_event("shutdown")
async def shutdown() -> None:
database_ = app.state.database
if database_.is_connected:
await database_.disconnect()
@pytest.fixture(autouse=True, scope="module")
def create_test_database():
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
yield
metadata.drop_all(engine)
class BaseMeta(ormar.ModelMeta):
database = database
metadata = metadata
class OtherThing(ormar.Model):
class Meta(BaseMeta):
tablename = "other_things"
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
name: str = ormar.Text(default="")
ot_contents: str = ormar.Text(default="")
class Thing(ormar.Model):
class Meta(BaseMeta):
tablename = "things"
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
name: str = ormar.Text(default="")
js: pydantic.Json = ormar.JSON(nullable=True)
other_thing: Optional[OtherThing] = ormar.ForeignKey(OtherThing, nullable=True)
@app.post("/test/1")
async def post_test_1():
# don't split initialization and attribute assignment
ot = await OtherThing(ot_contents="otc").save()
await Thing(other_thing=ot, name="t1").save()
await Thing(other_thing=ot, name="t2").save()
await Thing(other_thing=ot, name="t3").save()
# if you do not care about returned object you can even go with bulk_create
# all of them are created in one transaction
# things = [Thing(other_thing=ot, name='t1'),
# Thing(other_thing=ot, name="t2"),
# Thing(other_thing=ot, name="t3")]
# await Thing.objects.bulk_create(things)
@app.get("/test/2", response_model=List[Thing])
async def get_test_2():
# if you only query for one use get or first
ot = await OtherThing.objects.get()
ts = await ot.things.all()
# specifically null out the relation on things before return
for t in ts:
t.remove(ot, name="other_thing")
return ts
@app.get("/test/3", response_model=List[Thing])
async def get_test_3():
ot = await OtherThing.objects.select_related("things").get()
# exclude unwanted field while ot is still in scope
# in order not to pass it to fastapi
return [t.dict(exclude={"other_thing"}) for t in ot.things]
@app.get("/test/4", response_model=List[Thing], response_model_exclude={"other_thing"})
async def get_test_4():
ot = await OtherThing.objects.get()
# query from the active side
return await Thing.objects.all(other_thing=ot)
@app.get("/get_ot/", response_model=OtherThing)
async def get_ot():
return await OtherThing.objects.get()
# more real life (usually) is not getting some random OT and get it's Things
# but query for a specific one by some kind of id
@app.get(
"/test/5/{thing_id}",
response_model=List[Thing],
response_model_exclude={"other_thing"},
)
async def get_test_5(thing_id: UUID):
return await Thing.objects.all(other_thing__id=thing_id)
def test_endpoints():
client = TestClient(app)
with client:
resp = client.post("/test/1")
assert resp.status_code == 200
resp2 = client.get("/test/2")
assert resp2.status_code == 200
assert len(resp2.json()) == 3
resp3 = client.get("/test/3")
assert resp3.status_code == 200
assert len(resp3.json()) == 3
resp4 = client.get("/test/4")
assert resp4.status_code == 200
assert len(resp4.json()) == 3
ot = OtherThing(**client.get("/get_ot/").json())
resp5 = client.get(f"/test/5/{ot.id}")
assert resp5.status_code == 200
assert len(resp5.json()) == 3