273 lines
8.3 KiB
Python
273 lines
8.3 KiB
Python
# type: ignore
|
|
import base64
|
|
import decimal
|
|
import hashlib
|
|
import uuid
|
|
import datetime
|
|
from typing import Any
|
|
|
|
import databases
|
|
import pytest
|
|
import sqlalchemy
|
|
|
|
import ormar
|
|
from ormar import ModelDefinitionError, NoMatch
|
|
from ormar.fields.sqlalchemy_encrypted import EncryptedString
|
|
from tests.settings import DATABASE_URL
|
|
|
|
database = databases.Database(DATABASE_URL)
|
|
metadata = sqlalchemy.MetaData()
|
|
|
|
|
|
class BaseMeta(ormar.ModelMeta):
|
|
metadata = metadata
|
|
database = database
|
|
|
|
|
|
default_fernet = dict(
|
|
encrypt_secret="asd123", encrypt_backend=ormar.EncryptBackends.FERNET,
|
|
)
|
|
|
|
|
|
class DummyBackend(ormar.fields.EncryptBackend):
|
|
def _initialize_backend(self, secret_key: bytes) -> None:
|
|
pass
|
|
|
|
def encrypt(self, value: Any) -> str:
|
|
return value
|
|
|
|
def decrypt(self, value: Any) -> str:
|
|
return value
|
|
|
|
|
|
class Author(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "authors"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
name: str = ormar.String(max_length=100, **default_fernet)
|
|
uuid_test = ormar.UUID(default=uuid.uuid4, uuid_format="string")
|
|
uuid_test2 = ormar.UUID(nullable=True, uuid_format="string")
|
|
password: str = ormar.String(
|
|
max_length=128,
|
|
encrypt_secret="udxc32",
|
|
encrypt_backend=ormar.EncryptBackends.HASH,
|
|
)
|
|
birth_year: int = ormar.Integer(
|
|
nullable=True,
|
|
encrypt_secret="secure89key%^&psdijfipew",
|
|
encrypt_backend=ormar.EncryptBackends.FERNET,
|
|
)
|
|
test_text: str = ormar.Text(default="", **default_fernet)
|
|
test_bool: bool = ormar.Boolean(nullable=False, **default_fernet)
|
|
test_float: float = ormar.Float(**default_fernet)
|
|
test_float2: float = ormar.Float(nullable=True, **default_fernet)
|
|
test_datetime = ormar.DateTime(default=datetime.datetime.now, **default_fernet)
|
|
test_date = ormar.Date(default=datetime.date.today, **default_fernet)
|
|
test_time = ormar.Time(default=datetime.time, **default_fernet)
|
|
test_json = ormar.JSON(default={}, **default_fernet)
|
|
test_bigint: int = ormar.BigInteger(default=0, **default_fernet)
|
|
test_smallint: int = ormar.SmallInteger(default=0, **default_fernet)
|
|
test_decimal = ormar.Decimal(scale=2, precision=10, **default_fernet)
|
|
test_decimal2 = ormar.Decimal(max_digits=10, decimal_places=2, **default_fernet)
|
|
custom_backend: str = ormar.String(
|
|
max_length=200,
|
|
encrypt_secret="asda8",
|
|
encrypt_backend=ormar.EncryptBackends.CUSTOM,
|
|
encrypt_custom_backend=DummyBackend,
|
|
)
|
|
|
|
|
|
class Hash(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "hashes"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
name: str = ormar.String(
|
|
max_length=128,
|
|
encrypt_secret="udxc32",
|
|
encrypt_backend=ormar.EncryptBackends.HASH,
|
|
)
|
|
|
|
|
|
class Filter(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "filters"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
name: str = ormar.String(max_length=100, **default_fernet)
|
|
hash = ormar.ForeignKey(Hash)
|
|
|
|
|
|
class Report(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "reports"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
name: str = ormar.String(max_length=100)
|
|
filters = ormar.ManyToMany(Filter)
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def create_test_database():
|
|
engine = sqlalchemy.create_engine(DATABASE_URL)
|
|
metadata.drop_all(engine)
|
|
metadata.create_all(engine)
|
|
yield
|
|
metadata.drop_all(engine)
|
|
|
|
|
|
def test_error_on_encrypted_pk():
|
|
with pytest.raises(ModelDefinitionError):
|
|
|
|
class Wrong(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "wrongs"
|
|
|
|
id: int = ormar.Integer(
|
|
primary_key=True,
|
|
encrypt_secret="asd123",
|
|
encrypt_backend=ormar.EncryptBackends.FERNET,
|
|
)
|
|
|
|
|
|
def test_error_on_encrypted_relation():
|
|
with pytest.raises(ModelDefinitionError):
|
|
|
|
class Wrong2(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "wrongs2"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
author = ormar.ForeignKey(
|
|
Author,
|
|
encrypt_secret="asd123",
|
|
encrypt_backend=ormar.EncryptBackends.FERNET,
|
|
)
|
|
|
|
|
|
def test_error_on_encrypted_m2m_relation():
|
|
with pytest.raises(ModelDefinitionError):
|
|
|
|
class Wrong3(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "wrongs3"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
author = ormar.ManyToMany(
|
|
Author,
|
|
encrypt_secret="asd123",
|
|
encrypt_backend=ormar.EncryptBackends.FERNET,
|
|
)
|
|
|
|
|
|
def test_wrong_backend():
|
|
with pytest.raises(ModelDefinitionError):
|
|
|
|
class Wrong3(ormar.Model):
|
|
class Meta(BaseMeta):
|
|
tablename = "wrongs3"
|
|
|
|
id: int = ormar.Integer(primary_key=True)
|
|
author = ormar.Integer(
|
|
encrypt_secret="asd123",
|
|
encrypt_backend=ormar.EncryptBackends.CUSTOM,
|
|
encrypt_custom_backend="aa",
|
|
)
|
|
|
|
|
|
def test_db_structure():
|
|
assert Author.Meta.table.c.get("name").type.__class__ == EncryptedString
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_and_retrieve():
|
|
async with database:
|
|
test_uuid = uuid.uuid4()
|
|
await Author(
|
|
name="Test",
|
|
birth_year=1988,
|
|
password="test123",
|
|
uuid_test=test_uuid,
|
|
test_float=1.2,
|
|
test_bool=True,
|
|
test_decimal=decimal.Decimal(3.5),
|
|
test_decimal2=decimal.Decimal(5.5),
|
|
test_json=dict(aa=12),
|
|
custom_backend="test12",
|
|
).save()
|
|
author = await Author.objects.get()
|
|
|
|
assert author.name == "Test"
|
|
assert author.birth_year == 1988
|
|
password = (
|
|
"03e4a4d513e99cb3fe4ee3db282c053daa3f3572b849c3868939a306944ad5c08"
|
|
"22b50d4886e10f4cd418c3f2df3ceb02e2e7ac6e920ae0c90f2dedfc8fa16e2"
|
|
)
|
|
assert author.password == password
|
|
assert author.uuid_test == test_uuid
|
|
assert author.uuid_test2 is None
|
|
assert author.test_datetime.date() == datetime.date.today()
|
|
assert author.test_date == datetime.date.today()
|
|
assert author.test_text == ""
|
|
assert author.test_float == 1.2
|
|
assert author.test_float2 is None
|
|
assert author.test_bigint == 0
|
|
assert author.test_json == {"aa": 12}
|
|
assert author.test_decimal == 3.5
|
|
assert author.test_decimal2 == 5.5
|
|
assert author.custom_backend == "test12"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fernet_filters_nomatch():
|
|
async with database:
|
|
await Filter(name="test1").save()
|
|
await Filter(name="test1").save()
|
|
|
|
filters = await Filter.objects.all()
|
|
assert filters[0].name == filters[1].name == "test1"
|
|
|
|
with pytest.raises(NoMatch):
|
|
await Filter.objects.get(name="test1")
|
|
|
|
assert await Filter.objects.get_or_none(name="test1") is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hash_filters_works():
|
|
async with database:
|
|
await Hash(name="test1").save()
|
|
await Hash(name="test2").save()
|
|
|
|
secret = hashlib.sha256("udxc32".encode()).digest()
|
|
secret = base64.urlsafe_b64encode(secret)
|
|
hashed_test1 = hashlib.sha512(secret + "test1".encode()).hexdigest()
|
|
|
|
hash1 = await Hash.objects.get(name="test1")
|
|
assert hash1.name == hashed_test1
|
|
|
|
with pytest.raises(NoMatch):
|
|
await Filter.objects.get(name__icontains="test")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_related_model_fields_properly_decrypted():
|
|
async with database:
|
|
hash1 = await Hash(name="test1").save()
|
|
report = await Report.objects.create(name="Report1")
|
|
await report.filters.create(name="test1", hash=hash1)
|
|
await report.filters.create(name="test2")
|
|
|
|
report2 = await Report.objects.select_related("filters").get()
|
|
assert report2.filters[0].name == "test1"
|
|
assert report2.filters[1].name == "test2"
|
|
|
|
secret = hashlib.sha256("udxc32".encode()).digest()
|
|
secret = base64.urlsafe_b64encode(secret)
|
|
hashed_test1 = hashlib.sha512(secret + "test1".encode()).hexdigest()
|
|
|
|
report2 = await Report.objects.select_related("filters__hash").get()
|
|
assert report2.filters[0].name == "test1"
|
|
assert report2.filters[0].hash.name == hashed_test1
|