Merge pull request #58 from collerek/check_uuid

fix uuid column to char(36)
This commit is contained in:
collerek
2020-11-28 17:31:52 +07:00
committed by GitHub
8 changed files with 113 additions and 32 deletions

View File

@ -251,11 +251,21 @@ You can use either `length` and `precision` parameters or `max_digits` and `deci
### UUID
`UUID()` has no required parameters.
`UUID(uuid_format: str = 'hex')` has no required parameters.
* Sqlalchemy column: `ormar.UUID` based on `sqlalchemy.CHAR` field
* Sqlalchemy column: `ormar.UUID` based on `sqlalchemy.CHAR(36)` or `sqlalchemy.CHAR(32)` field (for string or hex format respectively)
* Type (used for pydantic): `uuid.UUID`
`uuid_format` parameters allow 'hex'(default) or 'string' values.
Depending on the format either 32 or 36 char is used in the database.
Sample:
* 'hex' format value = "c616ab438cce49dbbf4380d109251dce" (CHAR(32))
* 'string' value = "c616ab43-8cce-49db-bf43-80d109251dce" (CHAR(36))
When loaded it's always python UUID so you can compare it and compare two formats values between each other.
[relations]: ./relations.md
[queries]: ./queries.md
[pydantic]: https://pydantic-docs.helpmanual.io/usage/types/#constrained-types

View File

@ -1,3 +1,13 @@
# 0.5.4
* Allow to pass `uuid_format` (allowed 'hex'(default) or 'string') to `UUID` field to change the format in which it's saved.
By default field is saved in hex format (trimmed to 32 chars (without dashes)), but you can pass
format='string' to use 36 (with dashes) instead to adjust to existing db or other libraries.
Sample:
* hex value = c616ab438cce49dbbf4380d109251dce
* string value = c616ab43-8cce-49db-bf43-80d109251dce
# 0.5.3
* Fixed bug in `Model.dict()` method that was ignoring exclude parameter and not include dictionary argument.

View File

@ -30,7 +30,7 @@ class UndefinedType: # pragma no cover
Undefined = UndefinedType()
__version__ = "0.5.3"
__version__ = "0.5.4"
__all__ = [
"Integer",
"BigInteger",

View File

@ -318,6 +318,21 @@ class Decimal(ModelFieldFactory, decimal.Decimal):
class UUID(ModelFieldFactory, uuid.UUID):
_type = uuid.UUID
def __new__( # type: ignore # noqa CFQ002
cls, *, uuid_format: str = "hex", **kwargs: Any
) -> Type[BaseField]:
kwargs = {
**kwargs,
**{
k: v
for k, v in locals().items()
if k not in ["cls", "__class__", "kwargs"]
},
}
return super().__new__(cls, **kwargs)
@classmethod
def get_column_type(cls, **kwargs: Any) -> Any:
return sqlalchemy_uuid.UUID()
uuid_format = kwargs.get("uuid_format", "hex")
return sqlalchemy_uuid.UUID(uuid_format=uuid_format)

View File

@ -15,6 +15,10 @@ class UUID(TypeDecorator): # pragma nocover
impl = CHAR
def __init__(self, *args: Any, uuid_format: str = "hex", **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.uuid_format = uuid_format
def _cast_to_uuid(self, value: Union[str, int, bytes]) -> uuid.UUID:
if not isinstance(value, uuid.UUID):
if isinstance(value, bytes):
@ -28,7 +32,11 @@ class UUID(TypeDecorator): # pragma nocover
return ret_value
def load_dialect_impl(self, dialect: DefaultDialect) -> Any:
return dialect.type_descriptor(CHAR(32))
return (
dialect.type_descriptor(CHAR(36))
if self.uuid_format == "string"
else dialect.type_descriptor(CHAR(32))
)
def process_bind_param(
self, value: Union[str, int, bytes, uuid.UUID, None], dialect: DefaultDialect
@ -37,7 +45,7 @@ class UUID(TypeDecorator): # pragma nocover
return value
if not isinstance(value, uuid.UUID):
value = self._cast_to_uuid(value)
return "%.32x" % value.int
return str(value) if self.uuid_format == "string" else "%.32x" % value.int
def process_result_value(
self, value: Optional[str], dialect: DefaultDialect

View File

@ -73,7 +73,7 @@ def test_dumping_to_dict_no_exclusion(sample_data):
dict1 = item1.dict()
assert dict1["name"] == "Teddy Bear"
assert dict1["category"]["name"] == "Toys"
assert dict1["category"]["tier"]['name'] == "Tier I"
assert dict1["category"]["tier"]["name"] == "Tier I"
assert dict1["created_by"]["email"] == "test@test.com"
dict2 = item2.dict()
@ -114,26 +114,29 @@ def test_dumping_to_dict_exclude_nested_dict(sample_data):
dict1 = item2.dict(exclude={"category": {"tier": {"name"}}, "name": ...})
assert "name" not in dict1
assert "category" in dict1
assert dict1["category"]['name'] == 'Weapons'
assert dict1["category"]["name"] == "Weapons"
assert dict1["created_by"]["email"] == "test@test.com"
assert dict1["category"]["tier"].get('name') is None
assert dict1["category"]["tier"].get("name") is None
def test_dumping_to_dict_exclude_and_include_nested_dict(sample_data):
item1, item2 = sample_data
dict1 = item2.dict(exclude={"category": {"tier": {"name"}}},
include={'name', 'category'})
assert dict1.get('name') == 'M16'
dict1 = item2.dict(
exclude={"category": {"tier": {"name"}}}, include={"name", "category"}
)
assert dict1.get("name") == "M16"
assert "category" in dict1
assert dict1["category"]['name'] == 'Weapons'
assert dict1["category"]["name"] == "Weapons"
assert "created_by" not in dict1
assert dict1["category"]["tier"].get('name') is None
assert dict1["category"]["tier"].get("name") is None
dict2 = item1.dict(exclude={"id": ...},
include={'name': ..., 'category': {'name': ..., 'tier': {'id'}}})
assert dict2.get('name') == 'Teddy Bear'
assert dict2.get('id') is None # models not saved
assert dict2["category"]['name'] == 'Toys'
dict2 = item1.dict(
exclude={"id": ...},
include={"name": ..., "category": {"name": ..., "tier": {"id"}}},
)
assert dict2.get("name") == "Teddy Bear"
assert dict2.get("id") is None # models not saved
assert dict2["category"]["name"] == "Toys"
assert "created_by" not in dict1
assert dict1["category"]["tier"].get('name') is None
assert dict1["category"]["tier"]['id'] is None
assert dict1["category"]["tier"].get("name") is None
assert dict1["category"]["tier"]["id"] is None

View File

@ -66,9 +66,11 @@ class RandomModel(ormar.Model):
id: int = ormar.Integer(primary_key=True)
password: str = ormar.String(max_length=255, default=gen_pass)
first_name: str = ormar.String(max_length=255, default='John')
first_name: str = ormar.String(max_length=255, default="John")
last_name: str = ormar.String(max_length=255)
created_date: datetime.datetime = ormar.DateTime(server_default=sqlalchemy.func.now())
created_date: datetime.datetime = ormar.DateTime(
server_default=sqlalchemy.func.now()
)
class User(ormar.Model):
@ -115,7 +117,7 @@ async def create_user(user: User):
@app.post("/users2/", response_model=User)
async def create_user2(user: User):
user = await user.save()
return user.dict(exclude={'password'})
return user.dict(exclude={"password"})
@app.post("/users3/", response_model=UserBase)
@ -126,7 +128,7 @@ async def create_user3(user: User2):
@app.post("/users4/")
async def create_user4(user: User2):
user = await user.save()
return user.dict(exclude={'password'})
return user.dict(exclude={"password"})
@app.post("/random/", response_model=RandomModel)
@ -166,13 +168,23 @@ def test_all_endpoints():
# response has only 3 fields from UserBase
response = client.post("/users3/", json=user)
assert list(response.json().keys()) == ['email', 'first_name', 'last_name']
assert list(response.json().keys()) == ["email", "first_name", "last_name"]
response = client.post("/users4/", json=user)
assert list(response.json().keys()) == ['id', 'email', 'first_name', 'last_name', 'category']
assert list(response.json().keys()) == [
"id",
"email",
"first_name",
"last_name",
"category",
]
user3 = {
'last_name': 'Test'
}
user3 = {"last_name": "Test"}
response = client.post("/random/", json=user3)
assert list(response.json().keys()) == ['id', 'password', 'first_name', 'last_name', 'created_date']
assert list(response.json().keys()) == [
"id",
"password",
"first_name",
"last_name",
"created_date",
]

View File

@ -36,6 +36,16 @@ class UUIDSample(ormar.Model):
test_text: str = ormar.Text()
class UUIDSample2(ormar.Model):
class Meta:
tablename = "uuids2"
metadata = metadata
database = database
id: uuid.UUID = ormar.UUID(primary_key=True, default=uuid.uuid4, uuid_format='string')
test_text: str = ormar.Text()
class User(ormar.Model):
class Meta:
tablename = "users"
@ -145,6 +155,19 @@ async def test_uuid_column():
item = await UUIDSample.objects.filter(id=u1.id).get()
assert item.id == u1.id
item2 = await UUIDSample.objects.first()
item3 = await UUIDSample.objects.get(pk=item2.id)
assert item2.id == item3.id
assert isinstance(item3.id, uuid.UUID)
u3 = await UUIDSample2(**u1.dict()).save()
u1_2 = await UUIDSample.objects.get(pk=u3.id)
assert u1_2 == u1
u4 = await UUIDSample2.objects.get(pk=u3.id)
assert u3 == u4
@pytest.mark.asyncio
async def test_model_crud():