@ -368,6 +368,51 @@ You can set this parameter by providing `Meta` class `constraints` argument.
|
|||||||
--8<-- "../docs_src/models/docs006.py"
|
--8<-- "../docs_src/models/docs006.py"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Model sort order
|
||||||
|
|
||||||
|
When querying the database with given model by default the Model is ordered by the `primary_key`
|
||||||
|
column ascending. If you wish to change the default behaviour you can do it by providing `orders_by`
|
||||||
|
parameter to model `Meta` class.
|
||||||
|
|
||||||
|
Sample default ordering:
|
||||||
|
```python
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
# default sort by column id ascending
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
```
|
||||||
|
Modified
|
||||||
|
```python
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
# now default sort by name descending
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
orders_by = ["-name"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
```
|
||||||
|
|
||||||
## Model Initialization
|
## Model Initialization
|
||||||
|
|
||||||
There are two ways to create and persist the `Model` instance in the database.
|
There are two ways to create and persist the `Model` instance in the database.
|
||||||
|
|||||||
@ -1,15 +1,23 @@
|
|||||||
# Aggregation functions
|
# Aggregation functions
|
||||||
|
|
||||||
Currently 2 aggregation functions are supported.
|
Currently 6 aggregation functions are supported.
|
||||||
|
|
||||||
|
|
||||||
* `count() -> int`
|
* `count() -> int`
|
||||||
* `exists() -> bool`
|
* `exists() -> bool`
|
||||||
|
* `sum(columns) -> Any`
|
||||||
|
* `avg(columns) -> Any`
|
||||||
|
* `min(columns) -> Any`
|
||||||
|
* `max(columns) -> Any`
|
||||||
|
|
||||||
|
|
||||||
* `QuerysetProxy`
|
* `QuerysetProxy`
|
||||||
* `QuerysetProxy.count()` method
|
* `QuerysetProxy.count()` method
|
||||||
* `QuerysetProxy.exists()` method
|
* `QuerysetProxy.exists()` method
|
||||||
|
* `QuerysetProxy.sum(columns)` method
|
||||||
|
* `QuerysetProxy.avg(columns)` method
|
||||||
|
* `QuerysetProxy.min(column)` method
|
||||||
|
* `QuerysetProxy.max(columns)` method
|
||||||
|
|
||||||
|
|
||||||
## count
|
## count
|
||||||
@ -68,6 +76,209 @@ class Book(ormar.Model):
|
|||||||
has_sample = await Book.objects.filter(title='Sample').exists()
|
has_sample = await Book.objects.filter(title='Sample').exists()
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## sum
|
||||||
|
|
||||||
|
`sum(columns) -> Any`
|
||||||
|
|
||||||
|
Returns sum value of columns for rows matching the given criteria (applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
You can pass one or many column names including related columns.
|
||||||
|
|
||||||
|
As of now each column passed is aggregated separately (so `sum(col1+col2)` is not possible,
|
||||||
|
you can have `sum(col1, col2)` and later add 2 returned sums in python)
|
||||||
|
|
||||||
|
You cannot `sum` non numeric columns.
|
||||||
|
|
||||||
|
If you aggregate on one column, the single value is directly returned as a result
|
||||||
|
If you aggregate on multiple columns a dictionary with column: result pairs is returned
|
||||||
|
|
||||||
|
Given models like follows
|
||||||
|
|
||||||
|
```Python
|
||||||
|
--8<-- "../docs_src/aggregations/docs001.py"
|
||||||
|
```
|
||||||
|
|
||||||
|
A sample usage might look like following
|
||||||
|
|
||||||
|
```python
|
||||||
|
author = await Author(name="Author 1").save()
|
||||||
|
await Book(title="Book 1", year=1920, ranking=3, author=author).save()
|
||||||
|
await Book(title="Book 2", year=1930, ranking=1, author=author).save()
|
||||||
|
await Book(title="Book 3", year=1923, ranking=5, author=author).save()
|
||||||
|
|
||||||
|
assert await Book.objects.sum("year") == 5773
|
||||||
|
result = await Book.objects.sum(["year", "ranking"])
|
||||||
|
assert result == dict(year=5773, ranking=9)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# cannot sum string column
|
||||||
|
await Book.objects.sum("title")
|
||||||
|
except ormar.QueryDefinitionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").sum("books__year") == 5773
|
||||||
|
result = await Author.objects.select_related("books").sum(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=5773, books__ranking=9)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.sum("books__year")
|
||||||
|
== 3843
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
## avg
|
||||||
|
|
||||||
|
`avg(columns) -> Any`
|
||||||
|
|
||||||
|
Returns avg value of columns for rows matching the given criteria (applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
You can pass one or many column names including related columns.
|
||||||
|
|
||||||
|
As of now each column passed is aggregated separately (so `sum(col1+col2)` is not possible,
|
||||||
|
you can have `sum(col1, col2)` and later add 2 returned sums in python)
|
||||||
|
|
||||||
|
You cannot `avg` non numeric columns.
|
||||||
|
|
||||||
|
If you aggregate on one column, the single value is directly returned as a result
|
||||||
|
If you aggregate on multiple columns a dictionary with column: result pairs is returned
|
||||||
|
|
||||||
|
```Python
|
||||||
|
--8<-- "../docs_src/aggregations/docs001.py"
|
||||||
|
```
|
||||||
|
|
||||||
|
A sample usage might look like following
|
||||||
|
|
||||||
|
```python
|
||||||
|
author = await Author(name="Author 1").save()
|
||||||
|
await Book(title="Book 1", year=1920, ranking=3, author=author).save()
|
||||||
|
await Book(title="Book 2", year=1930, ranking=1, author=author).save()
|
||||||
|
await Book(title="Book 3", year=1923, ranking=5, author=author).save()
|
||||||
|
|
||||||
|
assert round(float(await Book.objects.avg("year")), 2) == 1924.33
|
||||||
|
result = await Book.objects.avg(["year", "ranking"])
|
||||||
|
assert round(float(result.get("year")), 2) == 1924.33
|
||||||
|
assert result.get("ranking") == 3.0
|
||||||
|
|
||||||
|
try:
|
||||||
|
# cannot avg string column
|
||||||
|
await Book.objects.avg("title")
|
||||||
|
except ormar.QueryDefinitionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
result = await Author.objects.select_related("books").avg("books__year")
|
||||||
|
assert round(float(result), 2) == 1924.33
|
||||||
|
result = await Author.objects.select_related("books").avg(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert round(float(result.get("books__year")), 2) == 1924.33
|
||||||
|
assert result.get("books__ranking") == 3.0
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.avg("books__year")
|
||||||
|
== 1921.5
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
## min
|
||||||
|
|
||||||
|
`min(columns) -> Any`
|
||||||
|
|
||||||
|
Returns min value of columns for rows matching the given criteria (applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
You can pass one or many column names including related columns.
|
||||||
|
|
||||||
|
As of now each column passed is aggregated separately (so `sum(col1+col2)` is not possible,
|
||||||
|
you can have `sum(col1, col2)` and later add 2 returned sums in python)
|
||||||
|
|
||||||
|
If you aggregate on one column, the single value is directly returned as a result
|
||||||
|
If you aggregate on multiple columns a dictionary with column: result pairs is returned
|
||||||
|
|
||||||
|
```Python
|
||||||
|
--8<-- "../docs_src/aggregations/docs001.py"
|
||||||
|
```
|
||||||
|
|
||||||
|
A sample usage might look like following
|
||||||
|
|
||||||
|
```python
|
||||||
|
author = await Author(name="Author 1").save()
|
||||||
|
await Book(title="Book 1", year=1920, ranking=3, author=author).save()
|
||||||
|
await Book(title="Book 2", year=1930, ranking=1, author=author).save()
|
||||||
|
await Book(title="Book 3", year=1923, ranking=5, author=author).save()
|
||||||
|
|
||||||
|
assert await Book.objects.min("year") == 1920
|
||||||
|
result = await Book.objects.min(["year", "ranking"])
|
||||||
|
assert result == dict(year=1920, ranking=1)
|
||||||
|
|
||||||
|
assert await Book.objects.min("title") == "Book 1"
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").min("books__year") == 1920
|
||||||
|
result = await Author.objects.select_related("books").min(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=1920, books__ranking=1)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__gt=1925)
|
||||||
|
.min("books__year")
|
||||||
|
== 1930
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
## max
|
||||||
|
|
||||||
|
`max(columns) -> Any`
|
||||||
|
|
||||||
|
Returns max value of columns for rows matching the given criteria (applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
Returns min value of columns for rows matching the given criteria (applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
You can pass one or many column names including related columns.
|
||||||
|
|
||||||
|
As of now each column passed is aggregated separately (so `sum(col1+col2)` is not possible,
|
||||||
|
you can have `sum(col1, col2)` and later add 2 returned sums in python)
|
||||||
|
|
||||||
|
If you aggregate on one column, the single value is directly returned as a result
|
||||||
|
If you aggregate on multiple columns a dictionary with column: result pairs is returned
|
||||||
|
|
||||||
|
```Python
|
||||||
|
--8<-- "../docs_src/aggregations/docs001.py"
|
||||||
|
```
|
||||||
|
|
||||||
|
A sample usage might look like following
|
||||||
|
|
||||||
|
```python
|
||||||
|
author = await Author(name="Author 1").save()
|
||||||
|
await Book(title="Book 1", year=1920, ranking=3, author=author).save()
|
||||||
|
await Book(title="Book 2", year=1930, ranking=1, author=author).save()
|
||||||
|
await Book(title="Book 3", year=1923, ranking=5, author=author).save()
|
||||||
|
|
||||||
|
assert await Book.objects.max("year") == 1930
|
||||||
|
result = await Book.objects.max(["year", "ranking"])
|
||||||
|
assert result == dict(year=1930, ranking=5)
|
||||||
|
|
||||||
|
assert await Book.objects.max("title") == "Book 3"
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").max("books__year") == 1930
|
||||||
|
result = await Author.objects.select_related("books").max(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=1930, books__ranking=5)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.max("books__year")
|
||||||
|
== 1923
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
## QuerysetProxy methods
|
## QuerysetProxy methods
|
||||||
|
|
||||||
When access directly the related `ManyToMany` field as well as `ReverseForeignKey`
|
When access directly the related `ManyToMany` field as well as `ReverseForeignKey`
|
||||||
@ -89,6 +300,26 @@ objects from other side of the relation.
|
|||||||
Works exactly the same as [exists](./#exists) function above but allows you to select columns from related
|
Works exactly the same as [exists](./#exists) function above but allows you to select columns from related
|
||||||
objects from other side of the relation.
|
objects from other side of the relation.
|
||||||
|
|
||||||
|
### sum
|
||||||
|
|
||||||
|
Works exactly the same as [sum](./#sum) function above but allows you to sum columns from related
|
||||||
|
objects from other side of the relation.
|
||||||
|
|
||||||
|
### avg
|
||||||
|
|
||||||
|
Works exactly the same as [avg](./#avg) function above but allows you to average columns from related
|
||||||
|
objects from other side of the relation.
|
||||||
|
|
||||||
|
### min
|
||||||
|
|
||||||
|
Works exactly the same as [min](./#min) function above but allows you to select minimum of columns from related
|
||||||
|
objects from other side of the relation.
|
||||||
|
|
||||||
|
### max
|
||||||
|
|
||||||
|
Works exactly the same as [max](./#max) function above but allows you to select maximum of columns from related
|
||||||
|
objects from other side of the relation.
|
||||||
|
|
||||||
!!!tip
|
!!!tip
|
||||||
To read more about `QuerysetProxy` visit [querysetproxy][querysetproxy] section
|
To read more about `QuerysetProxy` visit [querysetproxy][querysetproxy] section
|
||||||
|
|
||||||
|
|||||||
@ -289,7 +289,7 @@ books = (
|
|||||||
```
|
```
|
||||||
|
|
||||||
If you want or need to you can nest deeper conditions as deep as you want, in example to
|
If you want or need to you can nest deeper conditions as deep as you want, in example to
|
||||||
acheive a query like this:
|
achieve a query like this:
|
||||||
|
|
||||||
sql:
|
sql:
|
||||||
```
|
```
|
||||||
@ -564,6 +564,38 @@ assert owner.toys[1].name == "Toy 1"
|
|||||||
|
|
||||||
Something like `Track.object.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()`
|
Something like `Track.object.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()`
|
||||||
|
|
||||||
|
### Default sorting in ormar
|
||||||
|
|
||||||
|
Since order of rows in a database is not guaranteed, `ormar` **always** issues an `order by` sql clause to each (part of) query even if you do not provide order yourself.
|
||||||
|
|
||||||
|
When querying the database with given model by default the `Model` is ordered by the `primary_key`
|
||||||
|
column ascending. If you wish to change the default behaviour you can do it by providing `orders_by`
|
||||||
|
parameter to model `Meta` class.
|
||||||
|
|
||||||
|
!!!tip
|
||||||
|
To read more about models sort order visit [models](../models/index.md#model-sort-order) section of documentation
|
||||||
|
|
||||||
|
By default the relations follow the same ordering, but you can modify the order in which related models are loaded during query by providing `orders_by` and `related_orders_by`
|
||||||
|
parameters to relations.
|
||||||
|
|
||||||
|
!!!tip
|
||||||
|
To read more about models sort order visit [relations](../relations/index.md#relationship-default-sort-order) section of documentation
|
||||||
|
|
||||||
|
Order in which order_by clauses are applied is as follows:
|
||||||
|
|
||||||
|
* Explicitly passed `order_by()` calls in query
|
||||||
|
* Relation passed `orders_by` and `related_orders_by` if exists
|
||||||
|
* Model `Meta` class `orders_by`
|
||||||
|
* Model `primary_key` column ascending (fallback, used if none of above provided)
|
||||||
|
|
||||||
|
**Order from only one source is applied to each `Model` (so that you can always overwrite it in a single query).**
|
||||||
|
|
||||||
|
That means that if you provide explicit `order_by` for a model in a query, the `Relation` and `Model` sort orders are skipped.
|
||||||
|
|
||||||
|
If you provide a `Relation` one, the `Model` sort is skipped.
|
||||||
|
|
||||||
|
Finally, if you provide one for `Model` the default one by `primary_key` is skipped.
|
||||||
|
|
||||||
### QuerysetProxy methods
|
### QuerysetProxy methods
|
||||||
|
|
||||||
When access directly the related `ManyToMany` field as well as `ReverseForeignKey`
|
When access directly the related `ManyToMany` field as well as `ReverseForeignKey`
|
||||||
|
|||||||
@ -128,6 +128,58 @@ class Post(ormar.Model):
|
|||||||
|
|
||||||
It allows you to use `await post.categories.all()` but also `await category.posts.all()` to fetch data related only to specific post, category etc.
|
It allows you to use `await post.categories.all()` but also `await category.posts.all()` to fetch data related only to specific post, category etc.
|
||||||
|
|
||||||
|
## Relationship default sort order
|
||||||
|
|
||||||
|
By default relations follow model default sort order so `primary_key` column ascending, or any sort order se in `Meta` class.
|
||||||
|
|
||||||
|
!!!tip
|
||||||
|
To read more about models sort order visit [models](../models/index.md#model-sort-order) section of documentation
|
||||||
|
|
||||||
|
But you can modify the order in which related models are loaded during query by providing `orders_by` and `related_orders_by`
|
||||||
|
parameters to relations.
|
||||||
|
|
||||||
|
In relations you can sort only by directly related model columns or for `ManyToMany`
|
||||||
|
columns also `Through` model columns `{through_field_name}__{column_name}`
|
||||||
|
|
||||||
|
Sample configuration might look like this:
|
||||||
|
|
||||||
|
```python hl_lines="24"
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(
|
||||||
|
Author, orders_by=["name"], related_orders_by=["-year"]
|
||||||
|
)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
|
```
|
||||||
|
|
||||||
|
Now calls:
|
||||||
|
|
||||||
|
`await Author.objects.select_related("books").get()` - the books will be sorted by the book year descending
|
||||||
|
|
||||||
|
`await Book.objects.select_related("author").all()` - the authors will be sorted by author name ascending
|
||||||
|
|
||||||
## Self-reference and postponed references
|
## Self-reference and postponed references
|
||||||
|
|
||||||
In order to create auto-relation or create two models that reference each other in at least two
|
In order to create auto-relation or create two models that reference each other in at least two
|
||||||
|
|||||||
@ -1,3 +1,48 @@
|
|||||||
|
# 0.9.9
|
||||||
|
|
||||||
|
## Features
|
||||||
|
* Add possibility to change default ordering of relations and models.
|
||||||
|
* To change model sorting pass `orders_by = [columns]` where `columns: List[str]` to model `Meta` class
|
||||||
|
* To change relation order_by pass `orders_by = [columns]` where `columns: List[str]`
|
||||||
|
* To change reverse relation order_by pass `related_orders_by = [columns]` where `columns: List[str]`
|
||||||
|
* Arguments can be column names or `-{col_name}` to sort descending
|
||||||
|
* In relations you can sort only by directly related model columns
|
||||||
|
or for `ManyToMany` columns also `Through` model columns `"{through_field_name}__{column_name}"`
|
||||||
|
* Order in which order_by clauses are applied is as follows:
|
||||||
|
* Explicitly passed `order_by()` calls in query
|
||||||
|
* Relation passed `orders_by` if exists
|
||||||
|
* Model `Meta` class `orders_by`
|
||||||
|
* Model primary key column asc (fallback, used if none of above provided)
|
||||||
|
* Add 4 new aggregated functions -> `min`, `max`, `sum` and `avg` that are their
|
||||||
|
corresponding sql equivalents.
|
||||||
|
* You can pass one or many column names including related columns.
|
||||||
|
* As of now each column passed is aggregated separately (so `sum(col1+col2)` is not possible,
|
||||||
|
you can have `sum(col1, col2)` and later add 2 returned sums in python)
|
||||||
|
* You cannot `sum` and `avg` non numeric columns
|
||||||
|
* If you aggregate on one column, the single value is directly returned as a result
|
||||||
|
* If you aggregate on multiple columns a dictionary with column: result pairs is returned
|
||||||
|
* Add 4 new signals -> `pre_relation_add`, `post_relation_add`, `pre_relation_remove` and `post_relation_remove`
|
||||||
|
* The newly added signals are emitted for `ManyToMany` relations (both sides)
|
||||||
|
and reverse side of `ForeignKey` relation (same as `QuerysetProxy` is exposed).
|
||||||
|
* Signals recieve following args: `sender: Type[Model]` - sender class,
|
||||||
|
`instance: Model` - instance to which related model is added, `child: Model` - model being added,
|
||||||
|
`relation_name: str` - name of the relation to which child is added,
|
||||||
|
for add signals also `passed_kwargs: Dict` - dict of kwargs passed to `add()`
|
||||||
|
|
||||||
|
## Changes
|
||||||
|
* `Through` models for ManyToMany relations are now instantiated on creation, deletion and update, so you can provide not only
|
||||||
|
autoincrement int as a primary key but any column type with default function provided.
|
||||||
|
* Since `Through` models are now instantiated you can also subscribe to `Through` model
|
||||||
|
pre/post save/update/delete signals
|
||||||
|
* `pre_update` signals receivers now get also passed_args argument which is a
|
||||||
|
dict of values passed to update function if any (else empty dict)
|
||||||
|
|
||||||
|
## Fixes
|
||||||
|
* `pre_update` signal now is sent before the extraction of values so you can modify the passed
|
||||||
|
instance in place and modified fields values will be reflected in database
|
||||||
|
* `bulk_update` now works correctly also with `UUID` primary key column type
|
||||||
|
|
||||||
|
|
||||||
# 0.9.8
|
# 0.9.8
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|||||||
@ -192,6 +192,47 @@ Send for `Model.update()` method.
|
|||||||
|
|
||||||
`sender` is a `ormar.Model` class and `instance` is the model that was deleted.
|
`sender` is a `ormar.Model` class and `instance` is the model that was deleted.
|
||||||
|
|
||||||
|
### pre_relation_add
|
||||||
|
|
||||||
|
`pre_relation_add(sender: Type["Model"], instance: "Model", child: "Model",
|
||||||
|
relation_name: str, passed_args: Dict)`
|
||||||
|
|
||||||
|
Send for `Model.relation_name.add()` method for `ManyToMany` relations and reverse side of `ForeignKey` relation.
|
||||||
|
|
||||||
|
`sender` - sender class, `instance` - instance to which related model is added, `child` - model being added,
|
||||||
|
`relation_name` - name of the relation to which child is added, for add signals also `passed_kwargs` - dict of kwargs passed to `add()`
|
||||||
|
|
||||||
|
### post_relation_add
|
||||||
|
|
||||||
|
`post_relation_add(sender: Type["Model"], instance: "Model", child: "Model",
|
||||||
|
relation_name: str, passed_args: Dict)`
|
||||||
|
|
||||||
|
Send for `Model.relation_name.add()` method for `ManyToMany` relations and reverse side of `ForeignKey` relation.
|
||||||
|
|
||||||
|
`sender` - sender class, `instance` - instance to which related model is added, `child` - model being added,
|
||||||
|
`relation_name` - name of the relation to which child is added, for add signals also `passed_kwargs` - dict of kwargs passed to `add()`
|
||||||
|
|
||||||
|
### pre_relation_remove
|
||||||
|
|
||||||
|
`pre_relation_remove(sender: Type["Model"], instance: "Model", child: "Model",
|
||||||
|
relation_name: str)`
|
||||||
|
|
||||||
|
Send for `Model.relation_name.remove()` method for `ManyToMany` relations and reverse side of `ForeignKey` relation.
|
||||||
|
|
||||||
|
`sender` - sender class, `instance` - instance to which related model is added, `child` - model being added,
|
||||||
|
`relation_name` - name of the relation to which child is added.
|
||||||
|
|
||||||
|
### post_relation_remove
|
||||||
|
|
||||||
|
`post_relation_remove(sender: Type["Model"], instance: "Model", child: "Model",
|
||||||
|
relation_name: str, passed_args: Dict)`
|
||||||
|
|
||||||
|
Send for `Model.relation_name.remove()` method for `ManyToMany` relations and reverse side of `ForeignKey` relation.
|
||||||
|
|
||||||
|
`sender` - sender class, `instance` - instance to which related model is added, `child` - model being added,
|
||||||
|
`relation_name` - name of the relation to which child is added.
|
||||||
|
|
||||||
|
|
||||||
## Defining your own signals
|
## Defining your own signals
|
||||||
|
|
||||||
Note that you can create your own signals although you will have to send them manually in your code or subclass `ormar.Model`
|
Note that you can create your own signals although you will have to send them manually in your code or subclass `ormar.Model`
|
||||||
|
|||||||
0
docs_src/aggregations/__init__.py
Normal file
0
docs_src/aggregations/__init__.py
Normal file
36
docs_src/aggregations/docs001.py
Normal file
36
docs_src/aggregations/docs001.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
order_by = ["-name"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
order_by = ["year", "-ranking"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(Author)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
@ -22,9 +22,13 @@ And what's a better name for python ORM than snakes cabinet :)
|
|||||||
from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
|
from ormar.protocols import QuerySetProtocol, RelationProtocol # noqa: I100
|
||||||
from ormar.decorators import ( # noqa: I100
|
from ormar.decorators import ( # noqa: I100
|
||||||
post_delete,
|
post_delete,
|
||||||
|
post_relation_add,
|
||||||
|
post_relation_remove,
|
||||||
post_save,
|
post_save,
|
||||||
post_update,
|
post_update,
|
||||||
pre_delete,
|
pre_delete,
|
||||||
|
pre_relation_add,
|
||||||
|
pre_relation_remove,
|
||||||
pre_save,
|
pre_save,
|
||||||
pre_update,
|
pre_update,
|
||||||
property_field,
|
property_field,
|
||||||
@ -71,7 +75,7 @@ class UndefinedType: # pragma no cover
|
|||||||
|
|
||||||
Undefined = UndefinedType()
|
Undefined = UndefinedType()
|
||||||
|
|
||||||
__version__ = "0.9.8"
|
__version__ = "0.9.9"
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Integer",
|
"Integer",
|
||||||
"BigInteger",
|
"BigInteger",
|
||||||
@ -102,9 +106,13 @@ __all__ = [
|
|||||||
"post_delete",
|
"post_delete",
|
||||||
"post_save",
|
"post_save",
|
||||||
"post_update",
|
"post_update",
|
||||||
|
"post_relation_add",
|
||||||
|
"post_relation_remove",
|
||||||
"pre_delete",
|
"pre_delete",
|
||||||
"pre_save",
|
"pre_save",
|
||||||
"pre_update",
|
"pre_update",
|
||||||
|
"pre_relation_remove",
|
||||||
|
"pre_relation_add",
|
||||||
"Signal",
|
"Signal",
|
||||||
"BaseField",
|
"BaseField",
|
||||||
"ManyToManyField",
|
"ManyToManyField",
|
||||||
|
|||||||
@ -10,9 +10,13 @@ Currently only:
|
|||||||
from ormar.decorators.property_field import property_field
|
from ormar.decorators.property_field import property_field
|
||||||
from ormar.decorators.signals import (
|
from ormar.decorators.signals import (
|
||||||
post_delete,
|
post_delete,
|
||||||
|
post_relation_add,
|
||||||
|
post_relation_remove,
|
||||||
post_save,
|
post_save,
|
||||||
post_update,
|
post_update,
|
||||||
pre_delete,
|
pre_delete,
|
||||||
|
pre_relation_add,
|
||||||
|
pre_relation_remove,
|
||||||
pre_save,
|
pre_save,
|
||||||
pre_update,
|
pre_update,
|
||||||
)
|
)
|
||||||
@ -25,4 +29,8 @@ __all__ = [
|
|||||||
"pre_delete",
|
"pre_delete",
|
||||||
"pre_save",
|
"pre_save",
|
||||||
"pre_update",
|
"pre_update",
|
||||||
|
"post_relation_remove",
|
||||||
|
"post_relation_add",
|
||||||
|
"pre_relation_remove",
|
||||||
|
"pre_relation_add",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -22,7 +22,7 @@ def receiver(
|
|||||||
def _decorator(func: Callable) -> Callable:
|
def _decorator(func: Callable) -> Callable:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
Internal decorator that does all the registeriing.
|
Internal decorator that does all the registering.
|
||||||
|
|
||||||
:param func: function to register as receiver
|
:param func: function to register as receiver
|
||||||
:type func: Callable
|
:type func: Callable
|
||||||
@ -117,3 +117,57 @@ def pre_delete(senders: Union[Type["Model"], List[Type["Model"]]]) -> Callable:
|
|||||||
:rtype: Callable
|
:rtype: Callable
|
||||||
"""
|
"""
|
||||||
return receiver(signal="pre_delete", senders=senders)
|
return receiver(signal="pre_delete", senders=senders)
|
||||||
|
|
||||||
|
|
||||||
|
def pre_relation_add(senders: Union[Type["Model"], List[Type["Model"]]]) -> Callable:
|
||||||
|
"""
|
||||||
|
Connect given function to all senders for pre_relation_add signal.
|
||||||
|
|
||||||
|
:param senders: one or a list of "Model" classes
|
||||||
|
that should have the signal receiver registered
|
||||||
|
:type senders: Union[Type["Model"], List[Type["Model"]]]
|
||||||
|
:return: returns the original function untouched
|
||||||
|
:rtype: Callable
|
||||||
|
"""
|
||||||
|
return receiver(signal="pre_relation_add", senders=senders)
|
||||||
|
|
||||||
|
|
||||||
|
def post_relation_add(senders: Union[Type["Model"], List[Type["Model"]]]) -> Callable:
|
||||||
|
"""
|
||||||
|
Connect given function to all senders for post_relation_add signal.
|
||||||
|
|
||||||
|
:param senders: one or a list of "Model" classes
|
||||||
|
that should have the signal receiver registered
|
||||||
|
:type senders: Union[Type["Model"], List[Type["Model"]]]
|
||||||
|
:return: returns the original function untouched
|
||||||
|
:rtype: Callable
|
||||||
|
"""
|
||||||
|
return receiver(signal="post_relation_add", senders=senders)
|
||||||
|
|
||||||
|
|
||||||
|
def pre_relation_remove(senders: Union[Type["Model"], List[Type["Model"]]]) -> Callable:
|
||||||
|
"""
|
||||||
|
Connect given function to all senders for pre_relation_remove signal.
|
||||||
|
|
||||||
|
:param senders: one or a list of "Model" classes
|
||||||
|
that should have the signal receiver registered
|
||||||
|
:type senders: Union[Type["Model"], List[Type["Model"]]]
|
||||||
|
:return: returns the original function untouched
|
||||||
|
:rtype: Callable
|
||||||
|
"""
|
||||||
|
return receiver(signal="pre_relation_remove", senders=senders)
|
||||||
|
|
||||||
|
|
||||||
|
def post_relation_remove(
|
||||||
|
senders: Union[Type["Model"], List[Type["Model"]]]
|
||||||
|
) -> Callable:
|
||||||
|
"""
|
||||||
|
Connect given function to all senders for post_relation_remove signal.
|
||||||
|
|
||||||
|
:param senders: one or a list of "Model" classes
|
||||||
|
that should have the signal receiver registered
|
||||||
|
:type senders: Union[Type["Model"], List[Type["Model"]]]
|
||||||
|
:return: returns the original function untouched
|
||||||
|
:rtype: Callable
|
||||||
|
"""
|
||||||
|
return receiver(signal="post_relation_remove", senders=senders)
|
||||||
|
|||||||
@ -54,6 +54,8 @@ class BaseField(FieldInfo):
|
|||||||
through: Type["Model"]
|
through: Type["Model"]
|
||||||
self_reference: bool = False
|
self_reference: bool = False
|
||||||
self_reference_primary: Optional[str] = None
|
self_reference_primary: Optional[str] = None
|
||||||
|
orders_by: Optional[List[str]] = None
|
||||||
|
related_orders_by: Optional[List[str]] = None
|
||||||
|
|
||||||
encrypt_secret: str
|
encrypt_secret: str
|
||||||
encrypt_backend: EncryptBackends = EncryptBackends.NONE
|
encrypt_backend: EncryptBackends = EncryptBackends.NONE
|
||||||
|
|||||||
@ -3,7 +3,7 @@ import sys
|
|||||||
import uuid
|
import uuid
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from random import choices
|
from random import choices
|
||||||
from typing import Any, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple, Type, Union
|
||||||
|
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from pydantic import BaseModel, create_model
|
from pydantic import BaseModel, create_model
|
||||||
@ -119,6 +119,35 @@ def populate_fk_params_based_on_to_model(
|
|||||||
return __type__, constraints, column_type
|
return __type__, constraints, column_type
|
||||||
|
|
||||||
|
|
||||||
|
def validate_not_allowed_fields(kwargs: Dict) -> None:
|
||||||
|
"""
|
||||||
|
Verifies if not allowed parameters are set on relation models.
|
||||||
|
Usually they are omitted later anyway but this way it's explicitly
|
||||||
|
notify the user that it's not allowed/ supported.
|
||||||
|
|
||||||
|
:raises ModelDefinitionError: if any forbidden field is set
|
||||||
|
:param kwargs: dict of kwargs to verify passed to relation field
|
||||||
|
:type kwargs: Dict
|
||||||
|
"""
|
||||||
|
default = kwargs.pop("default", None)
|
||||||
|
encrypt_secret = kwargs.pop("encrypt_secret", None)
|
||||||
|
encrypt_backend = kwargs.pop("encrypt_backend", None)
|
||||||
|
encrypt_custom_backend = kwargs.pop("encrypt_custom_backend", None)
|
||||||
|
|
||||||
|
not_supported = [
|
||||||
|
default,
|
||||||
|
encrypt_secret,
|
||||||
|
encrypt_backend,
|
||||||
|
encrypt_custom_backend,
|
||||||
|
]
|
||||||
|
if any(x is not None for x in not_supported):
|
||||||
|
raise ModelDefinitionError(
|
||||||
|
f"Argument {next((x for x in not_supported if x is not None))} "
|
||||||
|
f"is not supported "
|
||||||
|
"on relation fields!"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class UniqueColumns(UniqueConstraint):
|
class UniqueColumns(UniqueConstraint):
|
||||||
"""
|
"""
|
||||||
Subclass of sqlalchemy.UniqueConstraint.
|
Subclass of sqlalchemy.UniqueConstraint.
|
||||||
@ -184,24 +213,10 @@ def ForeignKey( # noqa CFQ002
|
|||||||
|
|
||||||
owner = kwargs.pop("owner", None)
|
owner = kwargs.pop("owner", None)
|
||||||
self_reference = kwargs.pop("self_reference", False)
|
self_reference = kwargs.pop("self_reference", False)
|
||||||
|
orders_by = kwargs.pop("orders_by", None)
|
||||||
|
related_orders_by = kwargs.pop("related_orders_by", None)
|
||||||
|
|
||||||
default = kwargs.pop("default", None)
|
validate_not_allowed_fields(kwargs)
|
||||||
encrypt_secret = kwargs.pop("encrypt_secret", None)
|
|
||||||
encrypt_backend = kwargs.pop("encrypt_backend", None)
|
|
||||||
encrypt_custom_backend = kwargs.pop("encrypt_custom_backend", None)
|
|
||||||
|
|
||||||
not_supported = [
|
|
||||||
default,
|
|
||||||
encrypt_secret,
|
|
||||||
encrypt_backend,
|
|
||||||
encrypt_custom_backend,
|
|
||||||
]
|
|
||||||
if any(x is not None for x in not_supported):
|
|
||||||
raise ModelDefinitionError(
|
|
||||||
f"Argument {next((x for x in not_supported if x is not None))} "
|
|
||||||
f"is not supported "
|
|
||||||
"on relation fields!"
|
|
||||||
)
|
|
||||||
|
|
||||||
if to.__class__ == ForwardRef:
|
if to.__class__ == ForwardRef:
|
||||||
__type__ = to if not nullable else Optional[to]
|
__type__ = to if not nullable else Optional[to]
|
||||||
@ -237,6 +252,8 @@ def ForeignKey( # noqa CFQ002
|
|||||||
owner=owner,
|
owner=owner,
|
||||||
self_reference=self_reference,
|
self_reference=self_reference,
|
||||||
is_relation=True,
|
is_relation=True,
|
||||||
|
orders_by=orders_by,
|
||||||
|
related_orders_by=related_orders_by,
|
||||||
)
|
)
|
||||||
|
|
||||||
return type("ForeignKey", (ForeignKeyField, BaseField), namespace)
|
return type("ForeignKey", (ForeignKeyField, BaseField), namespace)
|
||||||
|
|||||||
@ -5,7 +5,7 @@ from pydantic.typing import ForwardRef, evaluate_forwardref
|
|||||||
import ormar # noqa: I100
|
import ormar # noqa: I100
|
||||||
from ormar import ModelDefinitionError
|
from ormar import ModelDefinitionError
|
||||||
from ormar.fields import BaseField
|
from ormar.fields import BaseField
|
||||||
from ormar.fields.foreign_key import ForeignKeyField
|
from ormar.fields.foreign_key import ForeignKeyField, validate_not_allowed_fields
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma no cover
|
if TYPE_CHECKING: # pragma no cover
|
||||||
from ormar.models import Model
|
from ormar.models import Model
|
||||||
@ -93,26 +93,13 @@ def ManyToMany(
|
|||||||
nullable = kwargs.pop("nullable", True)
|
nullable = kwargs.pop("nullable", True)
|
||||||
owner = kwargs.pop("owner", None)
|
owner = kwargs.pop("owner", None)
|
||||||
self_reference = kwargs.pop("self_reference", False)
|
self_reference = kwargs.pop("self_reference", False)
|
||||||
|
orders_by = kwargs.pop("orders_by", None)
|
||||||
|
related_orders_by = kwargs.pop("related_orders_by", None)
|
||||||
|
|
||||||
if through is not None and through.__class__ != ForwardRef:
|
if through is not None and through.__class__ != ForwardRef:
|
||||||
forbid_through_relations(cast(Type["Model"], through))
|
forbid_through_relations(cast(Type["Model"], through))
|
||||||
|
|
||||||
default = kwargs.pop("default", None)
|
validate_not_allowed_fields(kwargs)
|
||||||
encrypt_secret = kwargs.pop("encrypt_secret", None)
|
|
||||||
encrypt_backend = kwargs.pop("encrypt_backend", None)
|
|
||||||
encrypt_custom_backend = kwargs.pop("encrypt_custom_backend", None)
|
|
||||||
|
|
||||||
not_supported = [
|
|
||||||
default,
|
|
||||||
encrypt_secret,
|
|
||||||
encrypt_backend,
|
|
||||||
encrypt_custom_backend,
|
|
||||||
]
|
|
||||||
if any(x is not None for x in not_supported):
|
|
||||||
raise ModelDefinitionError(
|
|
||||||
f"Argument {next((x for x in not_supported if x is not None))} "
|
|
||||||
f"is not supported "
|
|
||||||
"on relation fields!"
|
|
||||||
)
|
|
||||||
|
|
||||||
if to.__class__ == ForwardRef:
|
if to.__class__ == ForwardRef:
|
||||||
__type__ = to if not nullable else Optional[to]
|
__type__ = to if not nullable else Optional[to]
|
||||||
@ -141,6 +128,8 @@ def ManyToMany(
|
|||||||
self_reference=self_reference,
|
self_reference=self_reference,
|
||||||
is_relation=True,
|
is_relation=True,
|
||||||
is_multi=True,
|
is_multi=True,
|
||||||
|
orders_by=orders_by,
|
||||||
|
related_orders_by=related_orders_by,
|
||||||
)
|
)
|
||||||
|
|
||||||
return type("ManyToMany", (ManyToManyField, BaseField), namespace)
|
return type("ManyToMany", (ManyToManyField, BaseField), namespace)
|
||||||
|
|||||||
@ -51,6 +51,8 @@ def populate_default_options_values(
|
|||||||
new_model.Meta.model_fields = model_fields
|
new_model.Meta.model_fields = model_fields
|
||||||
if not hasattr(new_model.Meta, "abstract"):
|
if not hasattr(new_model.Meta, "abstract"):
|
||||||
new_model.Meta.abstract = False
|
new_model.Meta.abstract = False
|
||||||
|
if not hasattr(new_model.Meta, "orders_by"):
|
||||||
|
new_model.Meta.orders_by = []
|
||||||
|
|
||||||
if any(
|
if any(
|
||||||
is_field_an_forward_ref(field) for field in new_model.Meta.model_fields.values()
|
is_field_an_forward_ref(field) for field in new_model.Meta.model_fields.values()
|
||||||
|
|||||||
@ -110,6 +110,7 @@ def register_reverse_model_fields(model_field: Type["ForeignKeyField"]) -> None:
|
|||||||
owner=model_field.to,
|
owner=model_field.to,
|
||||||
self_reference=model_field.self_reference,
|
self_reference=model_field.self_reference,
|
||||||
self_reference_primary=model_field.self_reference_primary,
|
self_reference_primary=model_field.self_reference_primary,
|
||||||
|
orders_by=model_field.related_orders_by,
|
||||||
)
|
)
|
||||||
# register foreign keys on through model
|
# register foreign keys on through model
|
||||||
model_field = cast(Type["ManyToManyField"], model_field)
|
model_field = cast(Type["ManyToManyField"], model_field)
|
||||||
@ -123,6 +124,7 @@ def register_reverse_model_fields(model_field: Type["ForeignKeyField"]) -> None:
|
|||||||
related_name=model_field.name,
|
related_name=model_field.name,
|
||||||
owner=model_field.to,
|
owner=model_field.to,
|
||||||
self_reference=model_field.self_reference,
|
self_reference=model_field.self_reference,
|
||||||
|
orders_by=model_field.related_orders_by,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -252,6 +252,9 @@ def populate_meta_tablename_columns_and_pk(
|
|||||||
|
|
||||||
new_model.Meta.columns = columns
|
new_model.Meta.columns = columns
|
||||||
new_model.Meta.pkname = pkname
|
new_model.Meta.pkname = pkname
|
||||||
|
if not new_model.Meta.orders_by:
|
||||||
|
# by default we sort by pk name if other option not provided
|
||||||
|
new_model.Meta.orders_by.append(pkname)
|
||||||
return new_model
|
return new_model
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -71,6 +71,7 @@ class ModelMeta:
|
|||||||
signals: SignalEmitter
|
signals: SignalEmitter
|
||||||
abstract: bool
|
abstract: bool
|
||||||
requires_ref_update: bool
|
requires_ref_update: bool
|
||||||
|
orders_by: List[str]
|
||||||
|
|
||||||
|
|
||||||
def add_cached_properties(new_model: Type["Model"]) -> None:
|
def add_cached_properties(new_model: Type["Model"]) -> None:
|
||||||
@ -139,6 +140,10 @@ def register_signals(new_model: Type["Model"]) -> None: # noqa: CCR001
|
|||||||
signals.post_save = Signal()
|
signals.post_save = Signal()
|
||||||
signals.post_update = Signal()
|
signals.post_update = Signal()
|
||||||
signals.post_delete = Signal()
|
signals.post_delete = Signal()
|
||||||
|
signals.pre_relation_add = Signal()
|
||||||
|
signals.post_relation_add = Signal()
|
||||||
|
signals.pre_relation_remove = Signal()
|
||||||
|
signals.post_relation_remove = Signal()
|
||||||
new_model.Meta.signals = signals
|
new_model.Meta.signals = signals
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
import uuid
|
||||||
from typing import Dict, Optional, Set, TYPE_CHECKING
|
from typing import Dict, Optional, Set, TYPE_CHECKING
|
||||||
|
|
||||||
import ormar
|
import ormar
|
||||||
@ -55,6 +56,25 @@ class SavePrepareMixin(RelationMixin, AliasMixin):
|
|||||||
del new_kwargs[pkname]
|
del new_kwargs[pkname]
|
||||||
return new_kwargs
|
return new_kwargs
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_non_db_fields(cls, model_dict: Dict) -> Dict:
|
||||||
|
"""
|
||||||
|
Receives dictionary of model that is about to be saved and changes uuid fields
|
||||||
|
to strings in bulk_update.
|
||||||
|
|
||||||
|
:param model_dict: dictionary of model that is about to be saved
|
||||||
|
:type model_dict: Dict
|
||||||
|
:return: dictionary of model that is about to be saved
|
||||||
|
:rtype: Dict
|
||||||
|
"""
|
||||||
|
for name, field in cls.Meta.model_fields.items():
|
||||||
|
if field.__type__ == uuid.UUID and name in model_dict:
|
||||||
|
parsers = {"string": lambda x: str(x), "hex": lambda x: "%.32x" % x.int}
|
||||||
|
uuid_format = field.column_type.uuid_format
|
||||||
|
parser = parsers.get(uuid_format, lambda x: x)
|
||||||
|
model_dict[name] = parser(model_dict[name])
|
||||||
|
return model_dict
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def substitute_models_with_pks(cls, model_dict: Dict) -> Dict: # noqa CCR001
|
def substitute_models_with_pks(cls, model_dict: Dict) -> Dict: # noqa CCR001
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -69,6 +69,7 @@ class Model(ModelRow):
|
|||||||
:return: saved Model
|
:return: saved Model
|
||||||
:rtype: Model
|
:rtype: Model
|
||||||
"""
|
"""
|
||||||
|
await self.signals.pre_save.send(sender=self.__class__, instance=self)
|
||||||
self_fields = self._extract_model_db_fields()
|
self_fields = self._extract_model_db_fields()
|
||||||
|
|
||||||
if not self.pk and self.Meta.model_fields[self.Meta.pkname].autoincrement:
|
if not self.pk and self.Meta.model_fields[self.Meta.pkname].autoincrement:
|
||||||
@ -82,8 +83,6 @@ class Model(ModelRow):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.signals.pre_save.send(sender=self.__class__, instance=self)
|
|
||||||
|
|
||||||
self_fields = self.translate_columns_to_aliases(self_fields)
|
self_fields = self.translate_columns_to_aliases(self_fields)
|
||||||
expr = self.Meta.table.insert()
|
expr = self.Meta.table.insert()
|
||||||
expr = expr.values(**self_fields)
|
expr = expr.values(**self_fields)
|
||||||
@ -216,7 +215,9 @@ class Model(ModelRow):
|
|||||||
"You cannot update not saved model! Use save or upsert method."
|
"You cannot update not saved model! Use save or upsert method."
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.signals.pre_update.send(sender=self.__class__, instance=self)
|
await self.signals.pre_update.send(
|
||||||
|
sender=self.__class__, instance=self, passed_args=kwargs
|
||||||
|
)
|
||||||
self_fields = self._extract_model_db_fields()
|
self_fields = self._extract_model_db_fields()
|
||||||
self_fields.pop(self.get_column_name_from_alias(self.Meta.pkname))
|
self_fields.pop(self.get_column_name_from_alias(self.Meta.pkname))
|
||||||
self_fields = self.translate_columns_to_aliases(self_fields)
|
self_fields = self.translate_columns_to_aliases(self_fields)
|
||||||
@ -273,7 +274,10 @@ class Model(ModelRow):
|
|||||||
return self
|
return self
|
||||||
|
|
||||||
async def load_all(
|
async def load_all(
|
||||||
self: T, follow: bool = False, exclude: Union[List, str, Set, Dict] = None
|
self: T,
|
||||||
|
follow: bool = False,
|
||||||
|
exclude: Union[List, str, Set, Dict] = None,
|
||||||
|
order_by: Union[List, str] = None,
|
||||||
) -> T:
|
) -> T:
|
||||||
"""
|
"""
|
||||||
Allow to refresh existing Models fields from database.
|
Allow to refresh existing Models fields from database.
|
||||||
@ -291,6 +295,8 @@ class Model(ModelRow):
|
|||||||
will load second Model A but will never follow into Model X.
|
will load second Model A but will never follow into Model X.
|
||||||
Nested relations of those kind need to be loaded manually.
|
Nested relations of those kind need to be loaded manually.
|
||||||
|
|
||||||
|
:param order_by: columns by which models should be sorted
|
||||||
|
:type order_by: Union[List, str]
|
||||||
:raises NoMatch: If given pk is not found in database.
|
:raises NoMatch: If given pk is not found in database.
|
||||||
|
|
||||||
:param exclude: related models to exclude
|
:param exclude: related models to exclude
|
||||||
@ -308,6 +314,8 @@ class Model(ModelRow):
|
|||||||
queryset = self.__class__.objects
|
queryset = self.__class__.objects
|
||||||
if exclude:
|
if exclude:
|
||||||
queryset = queryset.exclude_fields(exclude)
|
queryset = queryset.exclude_fields(exclude)
|
||||||
|
if order_by:
|
||||||
|
queryset = queryset.order_by(order_by)
|
||||||
instance = await queryset.select_related(relations).get(pk=self.pk)
|
instance = await queryset.select_related(relations).get(pk=self.pk)
|
||||||
self._orm.clear()
|
self._orm.clear()
|
||||||
self.update_from_dict(instance.dict())
|
self.update_from_dict(instance.dict())
|
||||||
|
|||||||
@ -216,6 +216,8 @@ class NewBaseModel(pydantic.BaseModel, ModelTableProxy, metaclass=ModelMetaclass
|
|||||||
)
|
)
|
||||||
if isinstance(object.__getattribute__(self, "__dict__").get(name), list):
|
if isinstance(object.__getattribute__(self, "__dict__").get(name), list):
|
||||||
# virtual foreign key or many to many
|
# virtual foreign key or many to many
|
||||||
|
# TODO: Fix double items in dict, no effect on real action ugly repr
|
||||||
|
# if model.pk not in [x.pk for x in related_list]:
|
||||||
object.__getattribute__(self, "__dict__")[name].append(model)
|
object.__getattribute__(self, "__dict__")[name].append(model)
|
||||||
else:
|
else:
|
||||||
# foreign key relation
|
# foreign key relation
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
Contains QuerySet and different Query classes to allow for constructing of sql queries.
|
Contains QuerySet and different Query classes to allow for constructing of sql queries.
|
||||||
"""
|
"""
|
||||||
from ormar.queryset.actions import FilterAction, OrderAction
|
from ormar.queryset.actions import FilterAction, OrderAction, SelectAction
|
||||||
from ormar.queryset.clause import and_, or_
|
from ormar.queryset.clause import and_, or_
|
||||||
from ormar.queryset.filter_query import FilterQuery
|
from ormar.queryset.filter_query import FilterQuery
|
||||||
from ormar.queryset.limit_query import LimitQuery
|
from ormar.queryset.limit_query import LimitQuery
|
||||||
@ -17,6 +17,7 @@ __all__ = [
|
|||||||
"OrderQuery",
|
"OrderQuery",
|
||||||
"FilterAction",
|
"FilterAction",
|
||||||
"OrderAction",
|
"OrderAction",
|
||||||
|
"SelectAction",
|
||||||
"and_",
|
"and_",
|
||||||
"or_",
|
"or_",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
from ormar.queryset.actions.filter_action import FilterAction
|
from ormar.queryset.actions.filter_action import FilterAction
|
||||||
from ormar.queryset.actions.order_action import OrderAction
|
from ormar.queryset.actions.order_action import OrderAction
|
||||||
|
from ormar.queryset.actions.select_action import SelectAction
|
||||||
|
|
||||||
__all__ = ["FilterAction", "OrderAction"]
|
__all__ = ["FilterAction", "OrderAction", "SelectAction"]
|
||||||
|
|||||||
52
ormar/queryset/actions/select_action.py
Normal file
52
ormar/queryset/actions/select_action.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
import decimal
|
||||||
|
from typing import Any, Callable, TYPE_CHECKING, Type
|
||||||
|
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
from ormar.queryset.actions.query_action import QueryAction # noqa: I202
|
||||||
|
|
||||||
|
if TYPE_CHECKING: # pragma: no cover
|
||||||
|
from ormar import Model
|
||||||
|
|
||||||
|
|
||||||
|
class SelectAction(QueryAction):
|
||||||
|
"""
|
||||||
|
Order Actions is populated by queryset when order_by() is called.
|
||||||
|
|
||||||
|
All required params are extracted but kept raw until actual filter clause value
|
||||||
|
is required -> then the action is converted into text() clause.
|
||||||
|
|
||||||
|
Extracted in order to easily change table prefixes on complex relations.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, select_str: str, model_cls: Type["Model"], alias: str = None
|
||||||
|
) -> None:
|
||||||
|
super().__init__(query_str=select_str, model_cls=model_cls)
|
||||||
|
if alias: # pragma: no cover
|
||||||
|
self.table_prefix = alias
|
||||||
|
|
||||||
|
def _split_value_into_parts(self, order_str: str) -> None:
|
||||||
|
parts = order_str.split("__")
|
||||||
|
self.field_name = parts[-1]
|
||||||
|
self.related_parts = parts[:-1]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_numeric(self) -> bool:
|
||||||
|
return self.get_target_field_type() in [int, float, decimal.Decimal]
|
||||||
|
|
||||||
|
def get_target_field_type(self) -> Any:
|
||||||
|
return self.target_model.Meta.model_fields[self.field_name].__type__
|
||||||
|
|
||||||
|
def get_text_clause(self) -> sqlalchemy.sql.expression.TextClause:
|
||||||
|
alias = f"{self.table_prefix}_" if self.table_prefix else ""
|
||||||
|
return sqlalchemy.text(f"{alias}{self.field_name}")
|
||||||
|
|
||||||
|
def apply_func(
|
||||||
|
self, func: Callable, use_label: bool = True
|
||||||
|
) -> sqlalchemy.sql.expression.TextClause:
|
||||||
|
result = func(self.get_text_clause())
|
||||||
|
if use_label:
|
||||||
|
rel_prefix = f"{self.related_str}__" if self.related_str else ""
|
||||||
|
result = result.label(f"{rel_prefix}{self.field_name}")
|
||||||
|
return result
|
||||||
@ -1,22 +1,24 @@
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
Dict,
|
||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Tuple,
|
Tuple,
|
||||||
Type,
|
Type,
|
||||||
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
import ormar # noqa I100
|
import ormar # noqa I100
|
||||||
from ormar.exceptions import RelationshipInstanceError
|
from ormar.exceptions import ModelDefinitionError, RelationshipInstanceError
|
||||||
from ormar.relations import AliasManager
|
from ormar.relations import AliasManager
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma no cover
|
if TYPE_CHECKING: # pragma no cover
|
||||||
from ormar import Model
|
from ormar import Model, ManyToManyField
|
||||||
from ormar.queryset import OrderAction
|
from ormar.queryset import OrderAction
|
||||||
from ormar.models.excludable import ExcludableItems
|
from ormar.models.excludable import ExcludableItems
|
||||||
|
|
||||||
@ -36,14 +38,18 @@ class SqlJoin:
|
|||||||
related_models: Any = None,
|
related_models: Any = None,
|
||||||
own_alias: str = "",
|
own_alias: str = "",
|
||||||
source_model: Type["Model"] = None,
|
source_model: Type["Model"] = None,
|
||||||
|
already_sorted: Dict = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.relation_name = relation_name
|
self.relation_name = relation_name
|
||||||
self.related_models = related_models or []
|
self.related_models = related_models or []
|
||||||
self.select_from = select_from
|
self.select_from = select_from
|
||||||
self.columns = columns
|
self.columns = columns
|
||||||
self.excludable = excludable
|
self.excludable = excludable
|
||||||
|
|
||||||
self.order_columns = order_columns
|
self.order_columns = order_columns
|
||||||
self.sorted_orders = sorted_orders
|
self.sorted_orders = sorted_orders
|
||||||
|
self.already_sorted = already_sorted or dict()
|
||||||
|
|
||||||
self.main_model = main_model
|
self.main_model = main_model
|
||||||
self.own_alias = own_alias
|
self.own_alias = own_alias
|
||||||
self.used_aliases = used_aliases
|
self.used_aliases = used_aliases
|
||||||
@ -205,6 +211,7 @@ class SqlJoin:
|
|||||||
relation_str="__".join([self.relation_str, related_name]),
|
relation_str="__".join([self.relation_str, related_name]),
|
||||||
own_alias=self.next_alias,
|
own_alias=self.next_alias,
|
||||||
source_model=self.source_model or self.main_model,
|
source_model=self.source_model or self.main_model,
|
||||||
|
already_sorted=self.already_sorted,
|
||||||
)
|
)
|
||||||
(
|
(
|
||||||
self.used_aliases,
|
self.used_aliases,
|
||||||
@ -307,31 +314,84 @@ class SqlJoin:
|
|||||||
self.used_aliases.append(self.next_alias)
|
self.used_aliases.append(self.next_alias)
|
||||||
|
|
||||||
def _set_default_primary_key_order_by(self) -> None:
|
def _set_default_primary_key_order_by(self) -> None:
|
||||||
|
for order_by in self.next_model.Meta.orders_by:
|
||||||
clause = ormar.OrderAction(
|
clause = ormar.OrderAction(
|
||||||
order_str=self.next_model.Meta.pkname,
|
order_str=order_by, model_cls=self.next_model, alias=self.next_alias,
|
||||||
model_cls=self.next_model,
|
|
||||||
alias=self.next_alias,
|
|
||||||
)
|
)
|
||||||
self.sorted_orders[clause] = clause.get_text_clause()
|
self.sorted_orders[clause] = clause.get_text_clause()
|
||||||
|
|
||||||
|
def _verify_allowed_order_field(self, order_by: str) -> None:
|
||||||
|
"""
|
||||||
|
Verifies if proper field string is used.
|
||||||
|
:param order_by: string with order by definition
|
||||||
|
:type order_by: str
|
||||||
|
"""
|
||||||
|
parts = order_by.split("__")
|
||||||
|
if len(parts) > 2 or parts[0] != self.target_field.through.get_name():
|
||||||
|
raise ModelDefinitionError(
|
||||||
|
"You can order the relation only " "by related or link table columns!"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_alias_and_model(self, order_by: str) -> Tuple[str, Type["Model"]]:
|
||||||
|
"""
|
||||||
|
Returns proper model and alias to be applied in the clause.
|
||||||
|
|
||||||
|
:param order_by: string with order by definition
|
||||||
|
:type order_by: str
|
||||||
|
:return: alias and model to be used in clause
|
||||||
|
:rtype: Tuple[str, Type["Model"]]
|
||||||
|
"""
|
||||||
|
if self.target_field.is_multi and "__" in order_by:
|
||||||
|
self._verify_allowed_order_field(order_by=order_by)
|
||||||
|
alias = self.next_alias
|
||||||
|
model = self.target_field.owner
|
||||||
|
elif self.target_field.is_multi:
|
||||||
|
alias = self.alias_manager.resolve_relation_alias(
|
||||||
|
from_model=self.target_field.through,
|
||||||
|
relation_name=cast(
|
||||||
|
"ManyToManyField", self.target_field
|
||||||
|
).default_target_field_name(),
|
||||||
|
)
|
||||||
|
model = self.target_field.to
|
||||||
|
else:
|
||||||
|
alias = self.alias_manager.resolve_relation_alias(
|
||||||
|
from_model=self.target_field.owner,
|
||||||
|
relation_name=self.target_field.name,
|
||||||
|
)
|
||||||
|
model = self.target_field.to
|
||||||
|
|
||||||
|
return alias, model
|
||||||
|
|
||||||
def _get_order_bys(self) -> None: # noqa: CCR001
|
def _get_order_bys(self) -> None: # noqa: CCR001
|
||||||
"""
|
"""
|
||||||
Triggers construction of order bys if they are given.
|
Triggers construction of order bys if they are given.
|
||||||
Otherwise by default each table is sorted by a primary key column asc.
|
Otherwise by default each table is sorted by a primary key column asc.
|
||||||
"""
|
"""
|
||||||
alias = self.next_alias
|
alias = self.next_alias
|
||||||
if self.order_columns:
|
|
||||||
current_table_sorted = False
|
current_table_sorted = False
|
||||||
|
if f"{alias}_{self.next_model.get_name()}" in self.already_sorted:
|
||||||
|
current_table_sorted = True
|
||||||
|
if self.order_columns:
|
||||||
for condition in self.order_columns:
|
for condition in self.order_columns:
|
||||||
if condition.check_if_filter_apply(
|
if condition.check_if_filter_apply(
|
||||||
target_model=self.next_model, alias=alias
|
target_model=self.next_model, alias=alias
|
||||||
):
|
):
|
||||||
current_table_sorted = True
|
current_table_sorted = True
|
||||||
self.sorted_orders[condition] = condition.get_text_clause()
|
self.sorted_orders[condition] = condition.get_text_clause()
|
||||||
if not current_table_sorted and not self.target_field.is_multi:
|
self.already_sorted[
|
||||||
self._set_default_primary_key_order_by()
|
f"{self.next_alias}_{self.next_model.get_name()}"
|
||||||
|
] = condition
|
||||||
|
if self.target_field.orders_by and not current_table_sorted:
|
||||||
|
current_table_sorted = True
|
||||||
|
for order_by in self.target_field.orders_by:
|
||||||
|
alias, model = self._get_alias_and_model(order_by=order_by)
|
||||||
|
clause = ormar.OrderAction(
|
||||||
|
order_str=order_by, model_cls=model, alias=alias
|
||||||
|
)
|
||||||
|
self.sorted_orders[clause] = clause.get_text_clause()
|
||||||
|
self.already_sorted[f"{alias}_{model.get_name()}"] = clause
|
||||||
|
|
||||||
elif not self.target_field.is_multi:
|
if not current_table_sorted and not self.target_field.is_multi:
|
||||||
self._set_default_primary_key_order_by()
|
self._set_default_primary_key_order_by()
|
||||||
|
|
||||||
def _get_to_and_from_keys(self) -> Tuple[str, str]:
|
def _get_to_and_from_keys(self) -> Tuple[str, str]:
|
||||||
|
|||||||
@ -63,14 +63,23 @@ class Query:
|
|||||||
That way the subquery with limit and offset only on main model has proper
|
That way the subquery with limit and offset only on main model has proper
|
||||||
sorting applied and correct models are fetched.
|
sorting applied and correct models are fetched.
|
||||||
"""
|
"""
|
||||||
|
current_table_sorted = False
|
||||||
if self.order_columns:
|
if self.order_columns:
|
||||||
for clause in self.order_columns:
|
for clause in self.order_columns:
|
||||||
if clause.is_source_model_order:
|
if clause.is_source_model_order:
|
||||||
|
current_table_sorted = True
|
||||||
self.sorted_orders[clause] = clause.get_text_clause()
|
self.sorted_orders[clause] = clause.get_text_clause()
|
||||||
else:
|
|
||||||
clause = ormar.OrderAction(
|
if not current_table_sorted:
|
||||||
order_str=self.model_cls.Meta.pkname, model_cls=self.model_cls
|
self._apply_default_model_sorting()
|
||||||
)
|
|
||||||
|
def _apply_default_model_sorting(self) -> None:
|
||||||
|
"""
|
||||||
|
Applies orders_by from model Meta class (if provided), if it was not provided
|
||||||
|
it was filled by metaclass so it's always there and falls back to pk column
|
||||||
|
"""
|
||||||
|
for order_by in self.model_cls.Meta.orders_by:
|
||||||
|
clause = ormar.OrderAction(order_str=order_by, model_cls=self.model_cls)
|
||||||
self.sorted_orders[clause] = clause.get_text_clause()
|
self.sorted_orders[clause] = clause.get_text_clause()
|
||||||
|
|
||||||
def _pagination_query_required(self) -> bool:
|
def _pagination_query_required(self) -> bool:
|
||||||
|
|||||||
@ -18,7 +18,7 @@ from sqlalchemy import bindparam
|
|||||||
import ormar # noqa I100
|
import ormar # noqa I100
|
||||||
from ormar import MultipleMatches, NoMatch
|
from ormar import MultipleMatches, NoMatch
|
||||||
from ormar.exceptions import ModelError, ModelPersistenceError, QueryDefinitionError
|
from ormar.exceptions import ModelError, ModelPersistenceError, QueryDefinitionError
|
||||||
from ormar.queryset import FilterQuery
|
from ormar.queryset import FilterQuery, SelectAction
|
||||||
from ormar.queryset.actions.order_action import OrderAction
|
from ormar.queryset.actions.order_action import OrderAction
|
||||||
from ormar.queryset.clause import FilterGroup, QueryClause
|
from ormar.queryset.clause import FilterGroup, QueryClause
|
||||||
from ormar.queryset.prefetch_query import PrefetchQuery
|
from ormar.queryset.prefetch_query import PrefetchQuery
|
||||||
@ -557,6 +557,71 @@ class QuerySet:
|
|||||||
expr = sqlalchemy.func.count().select().select_from(expr)
|
expr = sqlalchemy.func.count().select().select_from(expr)
|
||||||
return await self.database.fetch_val(expr)
|
return await self.database.fetch_val(expr)
|
||||||
|
|
||||||
|
async def _query_aggr_function(self, func_name: str, columns: List) -> Any:
|
||||||
|
func = getattr(sqlalchemy.func, func_name)
|
||||||
|
select_actions = [
|
||||||
|
SelectAction(select_str=column, model_cls=self.model) for column in columns
|
||||||
|
]
|
||||||
|
if func_name in ["sum", "avg"]:
|
||||||
|
if any(not x.is_numeric for x in select_actions):
|
||||||
|
raise QueryDefinitionError(
|
||||||
|
"You can use sum and svg only with" "numeric types of columns"
|
||||||
|
)
|
||||||
|
select_columns = [x.apply_func(func, use_label=True) for x in select_actions]
|
||||||
|
expr = self.build_select_expression().alias(f"subquery_for_{func_name}")
|
||||||
|
expr = sqlalchemy.select(select_columns).select_from(expr)
|
||||||
|
# print("\n", expr.compile(compile_kwargs={"literal_binds": True}))
|
||||||
|
result = await self.database.fetch_one(expr)
|
||||||
|
return dict(result) if len(result) > 1 else result[0] # type: ignore
|
||||||
|
|
||||||
|
async def max(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns max value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: max value of column(s)
|
||||||
|
:rtype: Any
|
||||||
|
"""
|
||||||
|
if not isinstance(columns, list):
|
||||||
|
columns = [columns]
|
||||||
|
return await self._query_aggr_function(func_name="max", columns=columns)
|
||||||
|
|
||||||
|
async def min(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns min value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: min value of column(s)
|
||||||
|
:rtype: Any
|
||||||
|
"""
|
||||||
|
if not isinstance(columns, list):
|
||||||
|
columns = [columns]
|
||||||
|
return await self._query_aggr_function(func_name="min", columns=columns)
|
||||||
|
|
||||||
|
async def sum(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns sum value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: sum value of columns
|
||||||
|
:rtype: int
|
||||||
|
"""
|
||||||
|
if not isinstance(columns, list):
|
||||||
|
columns = [columns]
|
||||||
|
return await self._query_aggr_function(func_name="sum", columns=columns)
|
||||||
|
|
||||||
|
async def avg(self, columns: Union[str, List[str]]) -> Any:
|
||||||
|
"""
|
||||||
|
Returns avg value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: avg value of columns
|
||||||
|
:rtype: Union[int, float, List]
|
||||||
|
"""
|
||||||
|
if not isinstance(columns, list):
|
||||||
|
columns = [columns]
|
||||||
|
return await self._query_aggr_function(func_name="avg", columns=columns)
|
||||||
|
|
||||||
async def update(self, each: bool = False, **kwargs: Any) -> int:
|
async def update(self, each: bool = False, **kwargs: Any) -> int:
|
||||||
"""
|
"""
|
||||||
Updates the model table after applying the filters from kwargs.
|
Updates the model table after applying the filters from kwargs.
|
||||||
@ -773,7 +838,7 @@ class QuerySet:
|
|||||||
model = await self.get(pk=kwargs[pk_name])
|
model = await self.get(pk=kwargs[pk_name])
|
||||||
return await model.update(**kwargs)
|
return await model.update(**kwargs)
|
||||||
|
|
||||||
async def all(self, **kwargs: Any) -> Sequence[Optional["Model"]]: # noqa: A003
|
async def all(self, **kwargs: Any) -> List[Optional["Model"]]: # noqa: A003
|
||||||
"""
|
"""
|
||||||
Returns all rows from a database for given model for set filter options.
|
Returns all rows from a database for given model for set filter options.
|
||||||
|
|
||||||
@ -906,6 +971,7 @@ class QuerySet:
|
|||||||
"You cannot update unsaved objects. "
|
"You cannot update unsaved objects. "
|
||||||
f"{self.model.__name__} has to have {pk_name} filled."
|
f"{self.model.__name__} has to have {pk_name} filled."
|
||||||
)
|
)
|
||||||
|
new_kwargs = self.model.parse_non_db_fields(new_kwargs)
|
||||||
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
new_kwargs = self.model.substitute_models_with_pks(new_kwargs)
|
||||||
new_kwargs = self.model.translate_columns_to_aliases(new_kwargs)
|
new_kwargs = self.model.translate_columns_to_aliases(new_kwargs)
|
||||||
new_kwargs = {"new_" + k: v for k, v in new_kwargs.items() if k in columns}
|
new_kwargs = {"new_" + k: v for k, v in new_kwargs.items() if k in columns}
|
||||||
|
|||||||
@ -12,7 +12,8 @@ from typing import ( # noqa: I100, I201
|
|||||||
cast,
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
import ormar
|
|
||||||
|
import ormar # noqa: I100, I202
|
||||||
from ormar.exceptions import ModelPersistenceError, QueryDefinitionError
|
from ormar.exceptions import ModelPersistenceError, QueryDefinitionError
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma no cover
|
if TYPE_CHECKING: # pragma no cover
|
||||||
@ -126,10 +127,7 @@ class QuerysetProxy:
|
|||||||
f"model without primary key set! \n"
|
f"model without primary key set! \n"
|
||||||
f"Save the child model first."
|
f"Save the child model first."
|
||||||
)
|
)
|
||||||
expr = model_cls.Meta.table.insert()
|
await model_cls(**final_kwargs).save()
|
||||||
expr = expr.values(**final_kwargs)
|
|
||||||
# print("\n", expr.compile(compile_kwargs={"literal_binds": True}))
|
|
||||||
await model_cls.Meta.database.execute(expr)
|
|
||||||
|
|
||||||
async def update_through_instance(self, child: "Model", **kwargs: Any) -> None:
|
async def update_through_instance(self, child: "Model", **kwargs: Any) -> None:
|
||||||
"""
|
"""
|
||||||
@ -185,6 +183,46 @@ class QuerysetProxy:
|
|||||||
"""
|
"""
|
||||||
return await self.queryset.count()
|
return await self.queryset.count()
|
||||||
|
|
||||||
|
async def max(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns max value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: max value of column(s)
|
||||||
|
:rtype: Any
|
||||||
|
"""
|
||||||
|
return await self.queryset.max(columns=columns)
|
||||||
|
|
||||||
|
async def min(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns min value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: min value of column(s)
|
||||||
|
:rtype: Any
|
||||||
|
"""
|
||||||
|
return await self.queryset.min(columns=columns)
|
||||||
|
|
||||||
|
async def sum(self, columns: Union[str, List[str]]) -> Any: # noqa: A003
|
||||||
|
"""
|
||||||
|
Returns sum value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: sum value of columns
|
||||||
|
:rtype: int
|
||||||
|
"""
|
||||||
|
return await self.queryset.sum(columns=columns)
|
||||||
|
|
||||||
|
async def avg(self, columns: Union[str, List[str]]) -> Any:
|
||||||
|
"""
|
||||||
|
Returns avg value of columns for rows matching the given criteria
|
||||||
|
(applied with `filter` and `exclude` if set before).
|
||||||
|
|
||||||
|
:return: avg value of columns
|
||||||
|
:rtype: Union[int, float, List]
|
||||||
|
"""
|
||||||
|
return await self.queryset.avg(columns=columns)
|
||||||
|
|
||||||
async def clear(self, keep_reversed: bool = True) -> int:
|
async def clear(self, keep_reversed: bool = True) -> int:
|
||||||
"""
|
"""
|
||||||
Removes all related models from given relation.
|
Removes all related models from given relation.
|
||||||
|
|||||||
@ -152,6 +152,12 @@ class RelationProxy(list):
|
|||||||
f"Object {self._owner.get_name()} has no "
|
f"Object {self._owner.get_name()} has no "
|
||||||
f"{item.get_name()} with given primary key!"
|
f"{item.get_name()} with given primary key!"
|
||||||
)
|
)
|
||||||
|
await self._owner.signals.pre_relation_remove.send(
|
||||||
|
sender=self._owner.__class__,
|
||||||
|
instance=self._owner,
|
||||||
|
child=item,
|
||||||
|
relation_name=self.field_name,
|
||||||
|
)
|
||||||
super().remove(item)
|
super().remove(item)
|
||||||
relation_name = self.related_field_name
|
relation_name = self.related_field_name
|
||||||
relation = item._orm._get(relation_name)
|
relation = item._orm._get(relation_name)
|
||||||
@ -169,6 +175,12 @@ class RelationProxy(list):
|
|||||||
await item.update()
|
await item.update()
|
||||||
else:
|
else:
|
||||||
await item.delete()
|
await item.delete()
|
||||||
|
await self._owner.signals.post_relation_remove.send(
|
||||||
|
sender=self._owner.__class__,
|
||||||
|
instance=self._owner,
|
||||||
|
child=item,
|
||||||
|
relation_name=self.field_name,
|
||||||
|
)
|
||||||
|
|
||||||
async def add(self, item: "Model", **kwargs: Any) -> None:
|
async def add(self, item: "Model", **kwargs: Any) -> None:
|
||||||
"""
|
"""
|
||||||
@ -182,6 +194,13 @@ class RelationProxy(list):
|
|||||||
:type item: Model
|
:type item: Model
|
||||||
"""
|
"""
|
||||||
relation_name = self.related_field_name
|
relation_name = self.related_field_name
|
||||||
|
await self._owner.signals.pre_relation_add.send(
|
||||||
|
sender=self._owner.__class__,
|
||||||
|
instance=self._owner,
|
||||||
|
child=item,
|
||||||
|
relation_name=self.field_name,
|
||||||
|
passed_kwargs=kwargs,
|
||||||
|
)
|
||||||
self._check_if_model_saved()
|
self._check_if_model_saved()
|
||||||
if self.type_ == ormar.RelationType.MULTIPLE:
|
if self.type_ == ormar.RelationType.MULTIPLE:
|
||||||
await self.queryset_proxy.create_through_instance(item, **kwargs)
|
await self.queryset_proxy.create_through_instance(item, **kwargs)
|
||||||
@ -189,3 +208,10 @@ class RelationProxy(list):
|
|||||||
else:
|
else:
|
||||||
setattr(item, relation_name, self._owner)
|
setattr(item, relation_name, self._owner)
|
||||||
await item.update()
|
await item.update()
|
||||||
|
await self._owner.signals.post_relation_add.send(
|
||||||
|
sender=self._owner.__class__,
|
||||||
|
instance=self._owner,
|
||||||
|
child=item,
|
||||||
|
relation_name=self.field_name,
|
||||||
|
passed_kwargs=kwargs,
|
||||||
|
)
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
databases[sqlite]
|
databases[sqlite]>=0.3.2,<=0.4.1
|
||||||
databases[postgresql]
|
databases[postgresql]>=0.3.2,<=0.4.1
|
||||||
databases[mysql]
|
databases[mysql]>=0.3.2,<=0.4.1
|
||||||
pydantic
|
pydantic>=1.6.1,<=1.8
|
||||||
sqlalchemy
|
sqlalchemy>=1.3.18,<=1.3.23
|
||||||
typing_extensions
|
typing_extensions>=3.7,<=3.7.4.3
|
||||||
orjson
|
orjson
|
||||||
cryptography
|
cryptography
|
||||||
|
|
||||||
|
|||||||
177
tests/test_aggr_functions.py
Normal file
177
tests/test_aggr_functions.py
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from ormar.exceptions import QueryDefinitionError
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
order_by = ["-name"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
order_by = ["year", "-ranking"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(Author)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="function")
|
||||||
|
async def cleanup():
|
||||||
|
yield
|
||||||
|
async with database:
|
||||||
|
await Book.objects.delete(each=True)
|
||||||
|
await Author.objects.delete(each=True)
|
||||||
|
|
||||||
|
|
||||||
|
async def sample_data():
|
||||||
|
author = await Author(name="Author 1").save()
|
||||||
|
await Book(title="Book 1", year=1920, ranking=3, author=author).save()
|
||||||
|
await Book(title="Book 2", year=1930, ranking=1, author=author).save()
|
||||||
|
await Book(title="Book 3", year=1923, ranking=5, author=author).save()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_min_method():
|
||||||
|
async with database:
|
||||||
|
await sample_data()
|
||||||
|
assert await Book.objects.min("year") == 1920
|
||||||
|
result = await Book.objects.min(["year", "ranking"])
|
||||||
|
assert result == dict(year=1920, ranking=1)
|
||||||
|
|
||||||
|
assert await Book.objects.min("title") == "Book 1"
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").min("books__year") == 1920
|
||||||
|
result = await Author.objects.select_related("books").min(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=1920, books__ranking=1)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__gt=1925)
|
||||||
|
.min("books__year")
|
||||||
|
== 1930
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_max_method():
|
||||||
|
async with database:
|
||||||
|
await sample_data()
|
||||||
|
assert await Book.objects.max("year") == 1930
|
||||||
|
result = await Book.objects.max(["year", "ranking"])
|
||||||
|
assert result == dict(year=1930, ranking=5)
|
||||||
|
|
||||||
|
assert await Book.objects.max("title") == "Book 3"
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").max("books__year") == 1930
|
||||||
|
result = await Author.objects.select_related("books").max(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=1930, books__ranking=5)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.max("books__year")
|
||||||
|
== 1923
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_sum_method():
|
||||||
|
async with database:
|
||||||
|
await sample_data()
|
||||||
|
assert await Book.objects.sum("year") == 5773
|
||||||
|
result = await Book.objects.sum(["year", "ranking"])
|
||||||
|
assert result == dict(year=5773, ranking=9)
|
||||||
|
|
||||||
|
with pytest.raises(QueryDefinitionError):
|
||||||
|
await Book.objects.sum("title")
|
||||||
|
|
||||||
|
assert await Author.objects.select_related("books").sum("books__year") == 5773
|
||||||
|
result = await Author.objects.select_related("books").sum(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert result == dict(books__year=5773, books__ranking=9)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.sum("books__year")
|
||||||
|
== 3843
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_avg_method():
|
||||||
|
async with database:
|
||||||
|
await sample_data()
|
||||||
|
assert round(float(await Book.objects.avg("year")), 2) == 1924.33
|
||||||
|
result = await Book.objects.avg(["year", "ranking"])
|
||||||
|
assert round(float(result.get("year")), 2) == 1924.33
|
||||||
|
assert result.get("ranking") == 3.0
|
||||||
|
|
||||||
|
with pytest.raises(QueryDefinitionError):
|
||||||
|
await Book.objects.avg("title")
|
||||||
|
|
||||||
|
result = await Author.objects.select_related("books").avg("books__year")
|
||||||
|
assert round(float(result), 2) == 1924.33
|
||||||
|
result = await Author.objects.select_related("books").avg(
|
||||||
|
["books__year", "books__ranking"]
|
||||||
|
)
|
||||||
|
assert round(float(result.get("books__year")), 2) == 1924.33
|
||||||
|
assert result.get("books__ranking") == 3.0
|
||||||
|
|
||||||
|
assert (
|
||||||
|
await Author.objects.select_related("books")
|
||||||
|
.filter(books__year__lt=1925)
|
||||||
|
.avg("books__year")
|
||||||
|
== 1921.5
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_queryset_method():
|
||||||
|
async with database:
|
||||||
|
await sample_data()
|
||||||
|
author = await Author.objects.select_related("books").get()
|
||||||
|
assert await author.books.min("year") == 1920
|
||||||
|
assert await author.books.max("year") == 1930
|
||||||
|
assert await author.books.sum("ranking") == 9
|
||||||
|
assert await author.books.avg("ranking") == 3.0
|
||||||
|
assert await author.books.max(["year", "title"]) == dict(
|
||||||
|
year=1930, title="Book 3"
|
||||||
|
)
|
||||||
@ -1,5 +1,4 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import os
|
|
||||||
|
|
||||||
import databases
|
import databases
|
||||||
import pydantic
|
import pydantic
|
||||||
|
|||||||
115
tests/test_default_model_order.py
Normal file
115
tests/test_default_model_order.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
orders_by = ["-name"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
orders_by = ["year", "-ranking"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(Author)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="function")
|
||||||
|
async def cleanup():
|
||||||
|
yield
|
||||||
|
async with database:
|
||||||
|
await Book.objects.delete(each=True)
|
||||||
|
await Author.objects.delete(each=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied():
|
||||||
|
async with database:
|
||||||
|
tolkien = await Author(name="J.R.R. Tolkien").save()
|
||||||
|
sapkowski = await Author(name="Andrzej Sapkowski").save()
|
||||||
|
king = await Author(name="Stephen King").save()
|
||||||
|
lewis = await Author(name="C.S Lewis").save()
|
||||||
|
|
||||||
|
authors = await Author.objects.all()
|
||||||
|
assert authors[0] == king
|
||||||
|
assert authors[1] == tolkien
|
||||||
|
assert authors[2] == lewis
|
||||||
|
assert authors[3] == sapkowski
|
||||||
|
|
||||||
|
authors = await Author.objects.order_by("name").all()
|
||||||
|
assert authors[3] == king
|
||||||
|
assert authors[2] == tolkien
|
||||||
|
assert authors[1] == lewis
|
||||||
|
assert authors[0] == sapkowski
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_on_related():
|
||||||
|
async with database:
|
||||||
|
tolkien = await Author(name="J.R.R. Tolkien").save()
|
||||||
|
silmarillion = await Book(
|
||||||
|
author=tolkien, title="The Silmarillion", year=1977
|
||||||
|
).save()
|
||||||
|
lotr = await Book(
|
||||||
|
author=tolkien, title="The Lord of the Rings", year=1955
|
||||||
|
).save()
|
||||||
|
hobbit = await Book(author=tolkien, title="The Hobbit", year=1933).save()
|
||||||
|
|
||||||
|
await tolkien.books.all()
|
||||||
|
assert tolkien.books[0] == hobbit
|
||||||
|
assert tolkien.books[1] == lotr
|
||||||
|
assert tolkien.books[2] == silmarillion
|
||||||
|
|
||||||
|
await tolkien.books.order_by("-title").all()
|
||||||
|
assert tolkien.books[2] == hobbit
|
||||||
|
assert tolkien.books[1] == lotr
|
||||||
|
assert tolkien.books[0] == silmarillion
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_on_related_two_fields():
|
||||||
|
async with database:
|
||||||
|
sanders = await Author(name="Brandon Sanderson").save()
|
||||||
|
twok = await Book(
|
||||||
|
author=sanders, title="The Way of Kings", year=2010, ranking=10
|
||||||
|
).save()
|
||||||
|
bret = await Author(name="Peter V. Bret").save()
|
||||||
|
tds = await Book(
|
||||||
|
author=bret, title="The Desert Spear", year=2010, ranking=9
|
||||||
|
).save()
|
||||||
|
|
||||||
|
books = await Book.objects.all()
|
||||||
|
assert books[0] == twok
|
||||||
|
assert books[1] == tds
|
||||||
151
tests/test_default_relation_order.py
Normal file
151
tests/test_default_relation_order.py
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
from typing import List, Optional
|
||||||
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(
|
||||||
|
Author, orders_by=["name"], related_orders_by=["-year"]
|
||||||
|
)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
class Animal(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "animals"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
name: str = ormar.String(max_length=200)
|
||||||
|
specie: str = ormar.String(max_length=200)
|
||||||
|
|
||||||
|
|
||||||
|
class Human(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "humans"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
name: str = ormar.Text(default="")
|
||||||
|
pets: List[Animal] = ormar.ManyToMany(
|
||||||
|
Animal,
|
||||||
|
related_name="care_takers",
|
||||||
|
orders_by=["specie", "-name"],
|
||||||
|
related_orders_by=["name"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="function")
|
||||||
|
async def cleanup():
|
||||||
|
yield
|
||||||
|
async with database:
|
||||||
|
await Book.objects.delete(each=True)
|
||||||
|
await Author.objects.delete(each=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_from_reverse_relation():
|
||||||
|
async with database:
|
||||||
|
tolkien = await Author(name="J.R.R. Tolkien").save()
|
||||||
|
hobbit = await Book(author=tolkien, title="The Hobbit", year=1933).save()
|
||||||
|
silmarillion = await Book(
|
||||||
|
author=tolkien, title="The Silmarillion", year=1977
|
||||||
|
).save()
|
||||||
|
lotr = await Book(
|
||||||
|
author=tolkien, title="The Lord of the Rings", year=1955
|
||||||
|
).save()
|
||||||
|
|
||||||
|
tolkien = await Author.objects.select_related("books").get()
|
||||||
|
assert tolkien.books[2] == hobbit
|
||||||
|
assert tolkien.books[1] == lotr
|
||||||
|
assert tolkien.books[0] == silmarillion
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_from_relation():
|
||||||
|
async with database:
|
||||||
|
bret = await Author(name="Peter V. Bret").save()
|
||||||
|
tds = await Book(
|
||||||
|
author=bret, title="The Desert Spear", year=2010, ranking=9
|
||||||
|
).save()
|
||||||
|
sanders = await Author(name="Brandon Sanderson").save()
|
||||||
|
twok = await Book(
|
||||||
|
author=sanders, title="The Way of Kings", year=2010, ranking=10
|
||||||
|
).save()
|
||||||
|
|
||||||
|
books = await Book.objects.order_by("year").select_related("author").all()
|
||||||
|
assert books[0] == twok
|
||||||
|
assert books[1] == tds
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_from_relation_on_m2m():
|
||||||
|
async with database:
|
||||||
|
alice = await Human(name="Alice").save()
|
||||||
|
|
||||||
|
spot = await Animal(name="Spot", specie="Cat").save()
|
||||||
|
zkitty = await Animal(name="ZKitty", specie="Cat").save()
|
||||||
|
noodle = await Animal(name="Noodle", specie="Anaconda").save()
|
||||||
|
|
||||||
|
await alice.pets.add(noodle)
|
||||||
|
await alice.pets.add(spot)
|
||||||
|
await alice.pets.add(zkitty)
|
||||||
|
|
||||||
|
await alice.load_all()
|
||||||
|
assert alice.pets[0] == noodle
|
||||||
|
assert alice.pets[1] == zkitty
|
||||||
|
assert alice.pets[2] == spot
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_from_reverse_relation_on_m2m():
|
||||||
|
async with database:
|
||||||
|
|
||||||
|
max = await Animal(name="Max", specie="Dog").save()
|
||||||
|
joe = await Human(name="Joe").save()
|
||||||
|
zack = await Human(name="Zack").save()
|
||||||
|
julia = await Human(name="Julia").save()
|
||||||
|
|
||||||
|
await max.care_takers.add(joe)
|
||||||
|
await max.care_takers.add(zack)
|
||||||
|
await max.care_takers.add(julia)
|
||||||
|
|
||||||
|
await max.load_all()
|
||||||
|
assert max.care_takers[0] == joe
|
||||||
|
assert max.care_takers[1] == julia
|
||||||
|
assert max.care_takers[2] == zack
|
||||||
332
tests/test_default_through_relation_order.py
Normal file
332
tests/test_default_through_relation_order.py
Normal file
@ -0,0 +1,332 @@
|
|||||||
|
from typing import Any, Dict, List, Tuple, Type, cast
|
||||||
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from ormar import ModelDefinitionError, Model, QuerySet, pre_relation_remove, pre_update
|
||||||
|
from ormar import pre_save
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Animal(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "animals"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
name: str = ormar.Text(default="")
|
||||||
|
# favoriteHumans
|
||||||
|
|
||||||
|
|
||||||
|
class Link(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "link_table"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
animal_order: int = ormar.Integer(nullable=True)
|
||||||
|
human_order: int = ormar.Integer(nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
class Human(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "humans"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
name: str = ormar.Text(default="")
|
||||||
|
favoriteAnimals: List[Animal] = ormar.ManyToMany(
|
||||||
|
Animal,
|
||||||
|
through=Link,
|
||||||
|
related_name="favoriteHumans",
|
||||||
|
orders_by=["link__animal_order"],
|
||||||
|
related_orders_by=["link__human_order"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Human2(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "humans2"
|
||||||
|
|
||||||
|
id: UUID = ormar.UUID(primary_key=True, default=uuid4)
|
||||||
|
name: str = ormar.Text(default="")
|
||||||
|
favoriteAnimals: List[Animal] = ormar.ManyToMany(
|
||||||
|
Animal, related_name="favoriteHumans2", orders_by=["link__animal_order__fail"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ordering_by_through_fail():
|
||||||
|
async with database:
|
||||||
|
alice = await Human2(name="Alice").save()
|
||||||
|
spot = await Animal(name="Spot").save()
|
||||||
|
await alice.favoriteAnimals.add(spot)
|
||||||
|
with pytest.raises(ModelDefinitionError):
|
||||||
|
await alice.load_all()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_filtered_query(
|
||||||
|
sender: Type[Model], instance: Model, to_class: Type[Model]
|
||||||
|
) -> QuerySet:
|
||||||
|
"""
|
||||||
|
Helper function.
|
||||||
|
Gets the query filtered by the appropriate class name.
|
||||||
|
"""
|
||||||
|
pk = getattr(instance, f"{to_class.get_name()}").pk
|
||||||
|
filter_kwargs = {f"{to_class.get_name()}": pk}
|
||||||
|
query = sender.objects.filter(**filter_kwargs)
|
||||||
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
def _get_through_model_relations(
|
||||||
|
sender: Type[Model], instance: Model
|
||||||
|
) -> Tuple[Type[Model], Type[Model]]:
|
||||||
|
relations = list(instance.extract_related_names())
|
||||||
|
rel_one = sender.Meta.model_fields[relations[0]].to
|
||||||
|
rel_two = sender.Meta.model_fields[relations[1]].to
|
||||||
|
return rel_one, rel_two
|
||||||
|
|
||||||
|
|
||||||
|
async def _populate_order_on_insert(
|
||||||
|
sender: Type[Model], instance: Model, from_class: Type[Model], to_class: Type[Model]
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Helper function.
|
||||||
|
|
||||||
|
Get max values from database for both orders and adds 1 (0 if max is None) if the
|
||||||
|
order is not provided. If the order is provided it reorders the existing links
|
||||||
|
to match the newly defined order.
|
||||||
|
|
||||||
|
Assumes names f"{model.get_name()}_order" like for Animal: animal_order.
|
||||||
|
"""
|
||||||
|
order_column = f"{from_class.get_name()}_order"
|
||||||
|
if getattr(instance, order_column) is None:
|
||||||
|
query = _get_filtered_query(sender, instance, to_class)
|
||||||
|
max_order = await query.max(order_column)
|
||||||
|
max_order = max_order + 1 if max_order is not None else 0
|
||||||
|
setattr(instance, order_column, max_order)
|
||||||
|
else:
|
||||||
|
await _reorder_on_update(
|
||||||
|
sender=sender,
|
||||||
|
instance=instance,
|
||||||
|
from_class=from_class,
|
||||||
|
to_class=to_class,
|
||||||
|
passed_args={order_column: getattr(instance, order_column)},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _reorder_on_update(
|
||||||
|
sender: Type[Model],
|
||||||
|
instance: Model,
|
||||||
|
from_class: Type[Model],
|
||||||
|
to_class: Type[Model],
|
||||||
|
passed_args: Dict,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Helper function.
|
||||||
|
Actually reorders links by given order passed in add/update query to the link
|
||||||
|
model.
|
||||||
|
|
||||||
|
Assumes names f"{model.get_name()}_order" like for Animal: animal_order.
|
||||||
|
"""
|
||||||
|
order = f"{from_class.get_name()}_order"
|
||||||
|
if order in passed_args:
|
||||||
|
query = _get_filtered_query(sender, instance, to_class)
|
||||||
|
to_reorder = await query.exclude(pk=instance.pk).order_by(order).all()
|
||||||
|
new_order = passed_args.get(order)
|
||||||
|
if to_reorder and new_order is not None:
|
||||||
|
# can be more efficient - here we renumber all even if not needed.
|
||||||
|
for ind, link in enumerate(to_reorder):
|
||||||
|
if ind < new_order:
|
||||||
|
setattr(link, order, ind)
|
||||||
|
else:
|
||||||
|
setattr(link, order, ind + 1)
|
||||||
|
await sender.objects.bulk_update(
|
||||||
|
cast(List[Model], to_reorder), columns=[order]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pre_save(Link)
|
||||||
|
async def order_link_on_insert(sender: Type[Model], instance: Model, **kwargs: Any):
|
||||||
|
"""
|
||||||
|
Signal receiver registered on Link model, triggered every time before one is created
|
||||||
|
by calling save() on a model. Note that signal functions for pre_save signal accepts
|
||||||
|
sender class, instance and have to accept **kwargs even if it's empty as of now.
|
||||||
|
"""
|
||||||
|
rel_one, rel_two = _get_through_model_relations(sender, instance)
|
||||||
|
await _populate_order_on_insert(
|
||||||
|
sender=sender, instance=instance, from_class=rel_one, to_class=rel_two
|
||||||
|
)
|
||||||
|
await _populate_order_on_insert(
|
||||||
|
sender=sender, instance=instance, from_class=rel_two, to_class=rel_one
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pre_update(Link)
|
||||||
|
async def reorder_links_on_update(
|
||||||
|
sender: Type[ormar.Model], instance: ormar.Model, passed_args: Dict, **kwargs: Any
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Signal receiver registered on Link model, triggered every time before one is updated
|
||||||
|
by calling update() on a model. Note that signal functions for pre_update signal
|
||||||
|
accepts sender class, instance, passed_args which is a dict of kwargs passed to
|
||||||
|
update and have to accept **kwargs even if it's empty as of now.
|
||||||
|
"""
|
||||||
|
|
||||||
|
rel_one, rel_two = _get_through_model_relations(sender, instance)
|
||||||
|
await _reorder_on_update(
|
||||||
|
sender=sender,
|
||||||
|
instance=instance,
|
||||||
|
from_class=rel_one,
|
||||||
|
to_class=rel_two,
|
||||||
|
passed_args=passed_args,
|
||||||
|
)
|
||||||
|
await _reorder_on_update(
|
||||||
|
sender=sender,
|
||||||
|
instance=instance,
|
||||||
|
from_class=rel_two,
|
||||||
|
to_class=rel_one,
|
||||||
|
passed_args=passed_args,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pre_relation_remove([Animal, Human])
|
||||||
|
async def reorder_links_on_remove(
|
||||||
|
sender: Type[ormar.Model],
|
||||||
|
instance: ormar.Model,
|
||||||
|
child: ormar.Model,
|
||||||
|
relation_name: str,
|
||||||
|
**kwargs: Any,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Signal receiver registered on Anima and Human models, triggered every time before
|
||||||
|
relation on a model is removed. Note that signal functions for pre_relation_remove
|
||||||
|
signal accepts sender class, instance, child, relation_name and have to accept
|
||||||
|
**kwargs even if it's empty as of now.
|
||||||
|
|
||||||
|
Note that if classes have many relations you need to check if current one is ordered
|
||||||
|
"""
|
||||||
|
through_class = sender.Meta.model_fields[relation_name].through
|
||||||
|
through_instance = getattr(instance, through_class.get_name())
|
||||||
|
if not through_instance:
|
||||||
|
parent_pk = instance.pk
|
||||||
|
child_pk = child.pk
|
||||||
|
filter_kwargs = {f"{sender.get_name()}": parent_pk, child.get_name(): child_pk}
|
||||||
|
through_instance = await through_class.objects.get(**filter_kwargs)
|
||||||
|
rel_one, rel_two = _get_through_model_relations(through_class, through_instance)
|
||||||
|
await _reorder_on_update(
|
||||||
|
sender=through_class,
|
||||||
|
instance=through_instance,
|
||||||
|
from_class=rel_one,
|
||||||
|
to_class=rel_two,
|
||||||
|
passed_args={f"{rel_one.get_name()}_order": 999999},
|
||||||
|
)
|
||||||
|
await _reorder_on_update(
|
||||||
|
sender=through_class,
|
||||||
|
instance=through_instance,
|
||||||
|
from_class=rel_two,
|
||||||
|
to_class=rel_one,
|
||||||
|
passed_args={f"{rel_two.get_name()}_order": 999999},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ordering_by_through_on_m2m_field():
|
||||||
|
async with database:
|
||||||
|
|
||||||
|
def verify_order(instance, expected):
|
||||||
|
field_name = (
|
||||||
|
"favoriteAnimals" if isinstance(instance, Human) else "favoriteHumans"
|
||||||
|
)
|
||||||
|
order_field_name = (
|
||||||
|
"animal_order" if isinstance(instance, Human) else "human_order"
|
||||||
|
)
|
||||||
|
assert [x.name for x in getattr(instance, field_name)] == expected
|
||||||
|
assert [
|
||||||
|
getattr(x.link, order_field_name) for x in getattr(instance, field_name)
|
||||||
|
] == [i for i in range(len(expected))]
|
||||||
|
|
||||||
|
alice = await Human(name="Alice").save()
|
||||||
|
bob = await Human(name="Bob").save()
|
||||||
|
charlie = await Human(name="Charlie").save()
|
||||||
|
|
||||||
|
spot = await Animal(name="Spot").save()
|
||||||
|
kitty = await Animal(name="Kitty").save()
|
||||||
|
noodle = await Animal(name="Noodle").save()
|
||||||
|
|
||||||
|
await alice.favoriteAnimals.add(noodle)
|
||||||
|
await alice.favoriteAnimals.add(spot)
|
||||||
|
await alice.favoriteAnimals.add(kitty)
|
||||||
|
|
||||||
|
await alice.load_all()
|
||||||
|
verify_order(alice, ["Noodle", "Spot", "Kitty"])
|
||||||
|
|
||||||
|
await bob.favoriteAnimals.add(noodle)
|
||||||
|
await bob.favoriteAnimals.add(kitty)
|
||||||
|
await bob.favoriteAnimals.add(spot)
|
||||||
|
|
||||||
|
await bob.load_all()
|
||||||
|
verify_order(bob, ["Noodle", "Kitty", "Spot"])
|
||||||
|
|
||||||
|
await charlie.favoriteAnimals.add(kitty)
|
||||||
|
await charlie.favoriteAnimals.add(noodle)
|
||||||
|
await charlie.favoriteAnimals.add(spot)
|
||||||
|
|
||||||
|
await charlie.load_all()
|
||||||
|
verify_order(charlie, ["Kitty", "Noodle", "Spot"])
|
||||||
|
|
||||||
|
animals = [noodle, kitty, spot]
|
||||||
|
for animal in animals:
|
||||||
|
await animal.load_all()
|
||||||
|
verify_order(animal, ["Alice", "Bob", "Charlie"])
|
||||||
|
|
||||||
|
zack = await Human(name="Zack").save()
|
||||||
|
|
||||||
|
await noodle.favoriteHumans.add(zack, human_order=0)
|
||||||
|
await noodle.load_all()
|
||||||
|
verify_order(noodle, ["Zack", "Alice", "Bob", "Charlie"])
|
||||||
|
|
||||||
|
await zack.load_all()
|
||||||
|
verify_order(zack, ["Noodle"])
|
||||||
|
|
||||||
|
await noodle.favoriteHumans.filter(name="Zack").update(link=dict(human_order=1))
|
||||||
|
await noodle.load_all()
|
||||||
|
verify_order(noodle, ["Alice", "Zack", "Bob", "Charlie"])
|
||||||
|
|
||||||
|
await noodle.favoriteHumans.filter(name="Zack").update(link=dict(human_order=2))
|
||||||
|
await noodle.load_all()
|
||||||
|
verify_order(noodle, ["Alice", "Bob", "Zack", "Charlie"])
|
||||||
|
|
||||||
|
await noodle.favoriteHumans.filter(name="Zack").update(link=dict(human_order=3))
|
||||||
|
await noodle.load_all()
|
||||||
|
verify_order(noodle, ["Alice", "Bob", "Charlie", "Zack"])
|
||||||
|
|
||||||
|
await kitty.favoriteHumans.remove(bob)
|
||||||
|
await kitty.load_all()
|
||||||
|
assert [x.name for x in kitty.favoriteHumans] == ["Alice", "Charlie"]
|
||||||
|
|
||||||
|
bob = await noodle.favoriteHumans.get(pk=bob.pk)
|
||||||
|
assert bob.link.human_order == 1
|
||||||
|
|
||||||
|
await noodle.favoriteHumans.remove(
|
||||||
|
await noodle.favoriteHumans.filter(link__human_order=2).get()
|
||||||
|
)
|
||||||
|
await noodle.load_all()
|
||||||
|
verify_order(noodle, ["Alice", "Bob", "Zack"])
|
||||||
@ -107,6 +107,30 @@ async def test_load_all_many_to_many():
|
|||||||
assert hq.nicks[1].name == "Bazinga20"
|
assert hq.nicks[1].name == "Bazinga20"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_load_all_with_order():
|
||||||
|
async with database:
|
||||||
|
async with database.transaction(force_rollback=True):
|
||||||
|
nick1 = await NickName.objects.create(name="Barry", is_lame=False)
|
||||||
|
nick2 = await NickName.objects.create(name="Joe", is_lame=True)
|
||||||
|
hq = await HQ.objects.create(name="Main")
|
||||||
|
await hq.nicks.add(nick1)
|
||||||
|
await hq.nicks.add(nick2)
|
||||||
|
|
||||||
|
hq = await HQ.objects.get(name="Main")
|
||||||
|
await hq.load_all(order_by="-nicks__name")
|
||||||
|
|
||||||
|
assert hq.nicks[0] == nick2
|
||||||
|
assert hq.nicks[0].name == "Joe"
|
||||||
|
|
||||||
|
assert hq.nicks[1] == nick1
|
||||||
|
assert hq.nicks[1].name == "Barry"
|
||||||
|
|
||||||
|
await hq.load_all()
|
||||||
|
assert hq.nicks[0] == nick1
|
||||||
|
assert hq.nicks[1] == nick2
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_loading_reversed_relation():
|
async def test_loading_reversed_relation():
|
||||||
async with database:
|
async with database:
|
||||||
|
|||||||
@ -161,7 +161,7 @@ async def test_only_one_side_has_through() -> Any:
|
|||||||
assert post2.categories[0].postcategory is not None
|
assert post2.categories[0].postcategory is not None
|
||||||
|
|
||||||
categories = await Category.objects.select_related("posts").all()
|
categories = await Category.objects.select_related("posts").all()
|
||||||
categories = cast(Sequence[Category], categories)
|
assert isinstance(categories[0], Category)
|
||||||
assert categories[0].postcategory is None
|
assert categories[0].postcategory is None
|
||||||
assert categories[0].posts[0].postcategory is not None
|
assert categories[0].posts[0].postcategory is not None
|
||||||
|
|
||||||
|
|||||||
80
tests/test_proper_order_of_sorting_apply.py
Normal file
80
tests/test_proper_order_of_sorting_apply.py
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMeta(ormar.ModelMeta):
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
|
||||||
|
class Author(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "authors"
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
name: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Book(ormar.Model):
|
||||||
|
class Meta(BaseMeta):
|
||||||
|
tablename = "books"
|
||||||
|
orders_by = ["-ranking"]
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
author: Optional[Author] = ormar.ForeignKey(
|
||||||
|
Author, orders_by=["name"], related_orders_by=["-year"]
|
||||||
|
)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
year: int = ormar.Integer(nullable=True)
|
||||||
|
ranking: int = ormar.Integer(nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="function")
|
||||||
|
async def cleanup():
|
||||||
|
yield
|
||||||
|
async with database:
|
||||||
|
await Book.objects.delete(each=True)
|
||||||
|
await Author.objects.delete(each=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_default_orders_is_applied_from_reverse_relation():
|
||||||
|
async with database:
|
||||||
|
tolkien = await Author(name="J.R.R. Tolkien").save()
|
||||||
|
hobbit = await Book(author=tolkien, title="The Hobbit", year=1933).save()
|
||||||
|
silmarillion = await Book(
|
||||||
|
author=tolkien, title="The Silmarillion", year=1977
|
||||||
|
).save()
|
||||||
|
lotr = await Book(
|
||||||
|
author=tolkien, title="The Lord of the Rings", year=1955
|
||||||
|
).save()
|
||||||
|
|
||||||
|
tolkien = await Author.objects.select_related("books").get()
|
||||||
|
assert tolkien.books[2] == hobbit
|
||||||
|
assert tolkien.books[1] == lotr
|
||||||
|
assert tolkien.books[0] == silmarillion
|
||||||
|
|
||||||
|
tolkien = (
|
||||||
|
await Author.objects.select_related("books").order_by("books__title").get()
|
||||||
|
)
|
||||||
|
assert tolkien.books[0] == hobbit
|
||||||
|
assert tolkien.books[1] == lotr
|
||||||
|
assert tolkien.books[2] == silmarillion
|
||||||
217
tests/test_signals_for_relations.py
Normal file
217
tests/test_signals_for_relations.py
Normal file
@ -0,0 +1,217 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import databases
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
import ormar
|
||||||
|
from ormar import (
|
||||||
|
post_relation_add,
|
||||||
|
post_relation_remove,
|
||||||
|
pre_relation_add,
|
||||||
|
pre_relation_remove,
|
||||||
|
)
|
||||||
|
import pydantic
|
||||||
|
from tests.settings import DATABASE_URL
|
||||||
|
|
||||||
|
database = databases.Database(DATABASE_URL, force_rollback=True)
|
||||||
|
metadata = sqlalchemy.MetaData()
|
||||||
|
|
||||||
|
|
||||||
|
class AuditLog(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "audits"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
event_type: str = ormar.String(max_length=100)
|
||||||
|
event_log: pydantic.Json = ormar.JSON()
|
||||||
|
|
||||||
|
|
||||||
|
class Cover(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "covers"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Artist(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "artists"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: int = ormar.Integer(name="artist_id", primary_key=True)
|
||||||
|
name: str = ormar.String(name="fname", max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class Album(ormar.Model):
|
||||||
|
class Meta:
|
||||||
|
tablename = "albums"
|
||||||
|
metadata = metadata
|
||||||
|
database = database
|
||||||
|
|
||||||
|
id: int = ormar.Integer(primary_key=True)
|
||||||
|
title: str = ormar.String(max_length=100)
|
||||||
|
cover: Optional[Cover] = ormar.ForeignKey(Cover)
|
||||||
|
artists = ormar.ManyToMany(Artist)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="module")
|
||||||
|
def create_test_database():
|
||||||
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
metadata.create_all(engine)
|
||||||
|
yield
|
||||||
|
metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True, scope="function")
|
||||||
|
async def cleanup():
|
||||||
|
yield
|
||||||
|
async with database:
|
||||||
|
await AuditLog.objects.delete(each=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_relation_signal_functions():
|
||||||
|
async with database:
|
||||||
|
async with database.transaction(force_rollback=True):
|
||||||
|
|
||||||
|
@pre_relation_add([Album, Cover, Artist])
|
||||||
|
async def before_relation_add(
|
||||||
|
sender, instance, child, relation_name, passed_kwargs, **kwargs
|
||||||
|
):
|
||||||
|
await AuditLog.objects.create(
|
||||||
|
event_type="RELATION_PRE_ADD",
|
||||||
|
event_log=dict(
|
||||||
|
class_affected=sender.get_name(),
|
||||||
|
parent_id=instance.pk,
|
||||||
|
child_id=child.pk,
|
||||||
|
relation_name=relation_name,
|
||||||
|
kwargs=passed_kwargs,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
passed_kwargs.pop("dummy", None)
|
||||||
|
|
||||||
|
@post_relation_add([Album, Cover, Artist])
|
||||||
|
async def after_relation_add(
|
||||||
|
sender, instance, child, relation_name, passed_kwargs, **kwargs
|
||||||
|
):
|
||||||
|
await AuditLog.objects.create(
|
||||||
|
event_type="RELATION_POST_ADD",
|
||||||
|
event_log=dict(
|
||||||
|
class_affected=sender.get_name(),
|
||||||
|
parent_id=instance.pk,
|
||||||
|
child_id=child.pk,
|
||||||
|
relation_name=relation_name,
|
||||||
|
kwargs=passed_kwargs,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
@pre_relation_remove([Album, Cover, Artist])
|
||||||
|
async def before_relation_remove(
|
||||||
|
sender, instance, child, relation_name, **kwargs
|
||||||
|
):
|
||||||
|
await AuditLog.objects.create(
|
||||||
|
event_type="RELATION_PRE_REMOVE",
|
||||||
|
event_log=dict(
|
||||||
|
class_affected=sender.get_name(),
|
||||||
|
parent_id=instance.pk,
|
||||||
|
child_id=child.pk,
|
||||||
|
relation_name=relation_name,
|
||||||
|
kwargs=kwargs,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
@post_relation_remove([Album, Cover, Artist])
|
||||||
|
async def after_relation_remove(
|
||||||
|
sender, instance, child, relation_name, **kwargs
|
||||||
|
):
|
||||||
|
await AuditLog.objects.create(
|
||||||
|
event_type="RELATION_POST_REMOVE",
|
||||||
|
event_log=dict(
|
||||||
|
class_affected=sender.get_name(),
|
||||||
|
parent_id=instance.pk,
|
||||||
|
child_id=child.pk,
|
||||||
|
relation_name=relation_name,
|
||||||
|
kwargs=kwargs,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
cover = await Cover(title="New").save()
|
||||||
|
artist = await Artist(name="Artist").save()
|
||||||
|
album = await Album(title="New Album").save()
|
||||||
|
|
||||||
|
await cover.albums.add(album, index=0)
|
||||||
|
log = await AuditLog.objects.get(event_type="RELATION_PRE_ADD")
|
||||||
|
assert log.event_log.get("parent_id") == cover.pk
|
||||||
|
assert log.event_log.get("child_id") == album.pk
|
||||||
|
assert log.event_log.get("relation_name") == "albums"
|
||||||
|
assert log.event_log.get("kwargs") == dict(index=0)
|
||||||
|
|
||||||
|
log2 = await AuditLog.objects.get(event_type="RELATION_POST_ADD")
|
||||||
|
assert log2.event_log.get("parent_id") == cover.pk
|
||||||
|
assert log2.event_log.get("child_id") == album.pk
|
||||||
|
assert log2.event_log.get("relation_name") == "albums"
|
||||||
|
assert log2.event_log.get("kwargs") == dict(index=0)
|
||||||
|
|
||||||
|
await album.artists.add(artist, dummy="test")
|
||||||
|
|
||||||
|
log3 = await AuditLog.objects.filter(
|
||||||
|
event_type="RELATION_PRE_ADD", id__gt=log2.pk
|
||||||
|
).get()
|
||||||
|
assert log3.event_log.get("parent_id") == album.pk
|
||||||
|
assert log3.event_log.get("child_id") == artist.pk
|
||||||
|
assert log3.event_log.get("relation_name") == "artists"
|
||||||
|
assert log3.event_log.get("kwargs") == dict(dummy="test")
|
||||||
|
|
||||||
|
log4 = await AuditLog.objects.get(
|
||||||
|
event_type="RELATION_POST_ADD", id__gt=log3.pk
|
||||||
|
)
|
||||||
|
assert log4.event_log.get("parent_id") == album.pk
|
||||||
|
assert log4.event_log.get("child_id") == artist.pk
|
||||||
|
assert log4.event_log.get("relation_name") == "artists"
|
||||||
|
assert log4.event_log.get("kwargs") == dict()
|
||||||
|
|
||||||
|
assert album.cover == cover
|
||||||
|
assert len(album.artists) == 1
|
||||||
|
|
||||||
|
await cover.albums.remove(album)
|
||||||
|
log = await AuditLog.objects.get(event_type="RELATION_PRE_REMOVE")
|
||||||
|
assert log.event_log.get("parent_id") == cover.pk
|
||||||
|
assert log.event_log.get("child_id") == album.pk
|
||||||
|
assert log.event_log.get("relation_name") == "albums"
|
||||||
|
assert log.event_log.get("kwargs") == dict()
|
||||||
|
|
||||||
|
log2 = await AuditLog.objects.get(event_type="RELATION_POST_REMOVE")
|
||||||
|
assert log2.event_log.get("parent_id") == cover.pk
|
||||||
|
assert log2.event_log.get("child_id") == album.pk
|
||||||
|
assert log2.event_log.get("relation_name") == "albums"
|
||||||
|
assert log2.event_log.get("kwargs") == dict()
|
||||||
|
|
||||||
|
await album.artists.remove(artist)
|
||||||
|
log3 = await AuditLog.objects.filter(
|
||||||
|
event_type="RELATION_PRE_REMOVE", id__gt=log2.pk
|
||||||
|
).get()
|
||||||
|
assert log3.event_log.get("parent_id") == album.pk
|
||||||
|
assert log3.event_log.get("child_id") == artist.pk
|
||||||
|
assert log3.event_log.get("relation_name") == "artists"
|
||||||
|
assert log3.event_log.get("kwargs") == dict()
|
||||||
|
|
||||||
|
log4 = await AuditLog.objects.get(
|
||||||
|
event_type="RELATION_POST_REMOVE", id__gt=log3.pk
|
||||||
|
)
|
||||||
|
assert log4.event_log.get("parent_id") == album.pk
|
||||||
|
assert log4.event_log.get("child_id") == artist.pk
|
||||||
|
assert log4.event_log.get("relation_name") == "artists"
|
||||||
|
assert log4.event_log.get("kwargs") == dict()
|
||||||
|
|
||||||
|
await album.load_all()
|
||||||
|
assert len(album.artists) == 0
|
||||||
|
assert album.cover is None
|
||||||
Reference in New Issue
Block a user