split tests into packages
This commit is contained in:
271
tests/test_encryption/test_encrypted_columns.py
Normal file
271
tests/test_encryption/test_encrypted_columns.py
Normal file
@ -0,0 +1,271 @@
|
||||
# type: ignore
|
||||
import base64
|
||||
import decimal
|
||||
import hashlib
|
||||
import uuid
|
||||
import datetime
|
||||
from typing import Any
|
||||
|
||||
import databases
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
|
||||
import ormar
|
||||
from ormar import ModelDefinitionError, NoMatch
|
||||
from ormar.fields.sqlalchemy_encrypted import EncryptedString
|
||||
from tests.settings import DATABASE_URL
|
||||
|
||||
database = databases.Database(DATABASE_URL)
|
||||
metadata = sqlalchemy.MetaData()
|
||||
|
||||
|
||||
class BaseMeta(ormar.ModelMeta):
|
||||
metadata = metadata
|
||||
database = database
|
||||
|
||||
|
||||
default_fernet = dict(
|
||||
encrypt_secret="asd123", encrypt_backend=ormar.EncryptBackends.FERNET,
|
||||
)
|
||||
|
||||
|
||||
class DummyBackend(ormar.fields.EncryptBackend):
|
||||
def _initialize_backend(self, secret_key: bytes) -> None:
|
||||
pass
|
||||
|
||||
def encrypt(self, value: Any) -> str:
|
||||
return value
|
||||
|
||||
def decrypt(self, value: Any) -> str:
|
||||
return value
|
||||
|
||||
|
||||
class Author(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "authors"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100, **default_fernet)
|
||||
uuid_test = ormar.UUID(default=uuid.uuid4, uuid_format="string")
|
||||
uuid_test2 = ormar.UUID(nullable=True, uuid_format="string")
|
||||
password: str = ormar.String(
|
||||
max_length=128,
|
||||
encrypt_secret="udxc32",
|
||||
encrypt_backend=ormar.EncryptBackends.HASH,
|
||||
)
|
||||
birth_year: int = ormar.Integer(
|
||||
nullable=True,
|
||||
encrypt_secret="secure89key%^&psdijfipew",
|
||||
encrypt_backend=ormar.EncryptBackends.FERNET,
|
||||
)
|
||||
test_text: str = ormar.Text(default="", **default_fernet)
|
||||
test_bool: bool = ormar.Boolean(nullable=False, **default_fernet)
|
||||
test_float: float = ormar.Float(**default_fernet)
|
||||
test_float2: float = ormar.Float(nullable=True, **default_fernet)
|
||||
test_datetime = ormar.DateTime(default=datetime.datetime.now, **default_fernet)
|
||||
test_date = ormar.Date(default=datetime.date.today, **default_fernet)
|
||||
test_time = ormar.Time(default=datetime.time, **default_fernet)
|
||||
test_json = ormar.JSON(default={}, **default_fernet)
|
||||
test_bigint: int = ormar.BigInteger(default=0, **default_fernet)
|
||||
test_decimal = ormar.Decimal(scale=2, precision=10, **default_fernet)
|
||||
test_decimal2 = ormar.Decimal(max_digits=10, decimal_places=2, **default_fernet)
|
||||
custom_backend: str = ormar.String(
|
||||
max_length=200,
|
||||
encrypt_secret="asda8",
|
||||
encrypt_backend=ormar.EncryptBackends.CUSTOM,
|
||||
encrypt_custom_backend=DummyBackend,
|
||||
)
|
||||
|
||||
|
||||
class Hash(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "hashes"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(
|
||||
max_length=128,
|
||||
encrypt_secret="udxc32",
|
||||
encrypt_backend=ormar.EncryptBackends.HASH,
|
||||
)
|
||||
|
||||
|
||||
class Filter(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "filters"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100, **default_fernet)
|
||||
hash = ormar.ForeignKey(Hash)
|
||||
|
||||
|
||||
class Report(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "reports"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
name: str = ormar.String(max_length=100)
|
||||
filters = ormar.ManyToMany(Filter)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def create_test_database():
|
||||
engine = sqlalchemy.create_engine(DATABASE_URL)
|
||||
metadata.drop_all(engine)
|
||||
metadata.create_all(engine)
|
||||
yield
|
||||
metadata.drop_all(engine)
|
||||
|
||||
|
||||
def test_error_on_encrypted_pk():
|
||||
with pytest.raises(ModelDefinitionError):
|
||||
|
||||
class Wrong(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "wrongs"
|
||||
|
||||
id: int = ormar.Integer(
|
||||
primary_key=True,
|
||||
encrypt_secret="asd123",
|
||||
encrypt_backend=ormar.EncryptBackends.FERNET,
|
||||
)
|
||||
|
||||
|
||||
def test_error_on_encrypted_relation():
|
||||
with pytest.raises(ModelDefinitionError):
|
||||
|
||||
class Wrong2(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "wrongs2"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
author = ormar.ForeignKey(
|
||||
Author,
|
||||
encrypt_secret="asd123",
|
||||
encrypt_backend=ormar.EncryptBackends.FERNET,
|
||||
)
|
||||
|
||||
|
||||
def test_error_on_encrypted_m2m_relation():
|
||||
with pytest.raises(ModelDefinitionError):
|
||||
|
||||
class Wrong3(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "wrongs3"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
author = ormar.ManyToMany(
|
||||
Author,
|
||||
encrypt_secret="asd123",
|
||||
encrypt_backend=ormar.EncryptBackends.FERNET,
|
||||
)
|
||||
|
||||
|
||||
def test_wrong_backend():
|
||||
with pytest.raises(ModelDefinitionError):
|
||||
|
||||
class Wrong3(ormar.Model):
|
||||
class Meta(BaseMeta):
|
||||
tablename = "wrongs3"
|
||||
|
||||
id: int = ormar.Integer(primary_key=True)
|
||||
author = ormar.Integer(
|
||||
encrypt_secret="asd123",
|
||||
encrypt_backend=ormar.EncryptBackends.CUSTOM,
|
||||
encrypt_custom_backend="aa",
|
||||
)
|
||||
|
||||
|
||||
def test_db_structure():
|
||||
assert Author.Meta.table.c.get("name").type.__class__ == EncryptedString
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_save_and_retrieve():
|
||||
async with database:
|
||||
test_uuid = uuid.uuid4()
|
||||
await Author(
|
||||
name="Test",
|
||||
birth_year=1988,
|
||||
password="test123",
|
||||
uuid_test=test_uuid,
|
||||
test_float=1.2,
|
||||
test_bool=True,
|
||||
test_decimal=decimal.Decimal(3.5),
|
||||
test_decimal2=decimal.Decimal(5.5),
|
||||
test_json=dict(aa=12),
|
||||
custom_backend="test12",
|
||||
).save()
|
||||
author = await Author.objects.get()
|
||||
|
||||
assert author.name == "Test"
|
||||
assert author.birth_year == 1988
|
||||
password = (
|
||||
"03e4a4d513e99cb3fe4ee3db282c053daa3f3572b849c3868939a306944ad5c08"
|
||||
"22b50d4886e10f4cd418c3f2df3ceb02e2e7ac6e920ae0c90f2dedfc8fa16e2"
|
||||
)
|
||||
assert author.password == password
|
||||
assert author.uuid_test == test_uuid
|
||||
assert author.uuid_test2 is None
|
||||
assert author.test_datetime.date() == datetime.date.today()
|
||||
assert author.test_date == datetime.date.today()
|
||||
assert author.test_text == ""
|
||||
assert author.test_float == 1.2
|
||||
assert author.test_float2 is None
|
||||
assert author.test_bigint == 0
|
||||
assert author.test_json == {"aa": 12}
|
||||
assert author.test_decimal == 3.5
|
||||
assert author.test_decimal2 == 5.5
|
||||
assert author.custom_backend == "test12"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fernet_filters_nomatch():
|
||||
async with database:
|
||||
await Filter(name="test1").save()
|
||||
await Filter(name="test1").save()
|
||||
|
||||
filters = await Filter.objects.all()
|
||||
assert filters[0].name == filters[1].name == "test1"
|
||||
|
||||
with pytest.raises(NoMatch):
|
||||
await Filter.objects.get(name="test1")
|
||||
|
||||
assert await Filter.objects.get_or_none(name="test1") is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_hash_filters_works():
|
||||
async with database:
|
||||
await Hash(name="test1").save()
|
||||
await Hash(name="test2").save()
|
||||
|
||||
secret = hashlib.sha256("udxc32".encode()).digest()
|
||||
secret = base64.urlsafe_b64encode(secret)
|
||||
hashed_test1 = hashlib.sha512(secret + "test1".encode()).hexdigest()
|
||||
|
||||
hash1 = await Hash.objects.get(name="test1")
|
||||
assert hash1.name == hashed_test1
|
||||
|
||||
with pytest.raises(NoMatch):
|
||||
await Filter.objects.get(name__icontains="test")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_related_model_fields_properly_decrypted():
|
||||
async with database:
|
||||
hash1 = await Hash(name="test1").save()
|
||||
report = await Report.objects.create(name="Report1")
|
||||
await report.filters.create(name="test1", hash=hash1)
|
||||
await report.filters.create(name="test2")
|
||||
|
||||
report2 = await Report.objects.select_related("filters").get()
|
||||
assert report2.filters[0].name == "test1"
|
||||
assert report2.filters[1].name == "test2"
|
||||
|
||||
secret = hashlib.sha256("udxc32".encode()).digest()
|
||||
secret = base64.urlsafe_b64encode(secret)
|
||||
hashed_test1 = hashlib.sha512(secret + "test1".encode()).hexdigest()
|
||||
|
||||
report2 = await Report.objects.select_related("filters__hash").get()
|
||||
assert report2.filters[0].name == "test1"
|
||||
assert report2.filters[0].hash.name == hashed_test1
|
||||
Reference in New Issue
Block a user