attribute access and setting for pydantic_model uned the hood
This commit is contained in:
BIN
orm/__pycache__/__init__.cpython-38.pyc
Normal file
BIN
orm/__pycache__/__init__.cpython-38.pyc
Normal file
Binary file not shown.
BIN
orm/__pycache__/exceptions.cpython-38.pyc
Normal file
BIN
orm/__pycache__/exceptions.cpython-38.pyc
Normal file
Binary file not shown.
BIN
orm/__pycache__/fields.cpython-38.pyc
Normal file
BIN
orm/__pycache__/fields.cpython-38.pyc
Normal file
Binary file not shown.
BIN
orm/__pycache__/models.cpython-38.pyc
Normal file
BIN
orm/__pycache__/models.cpython-38.pyc
Normal file
Binary file not shown.
6
orm/exceptions.py
Normal file
6
orm/exceptions.py
Normal file
@ -0,0 +1,6 @@
|
||||
class AsyncOrmException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ModelDefinitionError(AsyncOrmException):
|
||||
pass
|
||||
73
orm/fields.py
Normal file
73
orm/fields.py
Normal file
@ -0,0 +1,73 @@
|
||||
import sqlalchemy
|
||||
|
||||
from orm.exceptions import ModelDefinitionError
|
||||
|
||||
|
||||
class BaseField:
|
||||
__type__ = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
name = kwargs.pop('name', None)
|
||||
args = list(args)
|
||||
if args:
|
||||
if isinstance(args[0], str):
|
||||
if name is not None:
|
||||
raise ModelDefinitionError(
|
||||
'Column name cannot be passed positionally and as a keyword.'
|
||||
)
|
||||
name = args.pop(0)
|
||||
|
||||
self.name = name
|
||||
self.primary_key = kwargs.pop('primary_key', False)
|
||||
self.autoincrement = kwargs.pop('autoincrement', 'auto')
|
||||
|
||||
self.nullable = kwargs.pop('nullable', not self.primary_key)
|
||||
self.default = kwargs.pop('default', None)
|
||||
self.server_default = kwargs.pop('server_default', None)
|
||||
|
||||
self.index = kwargs.pop('index', None)
|
||||
self.unique = kwargs.pop('unique', None)
|
||||
|
||||
def get_column(self, name=None) -> sqlalchemy.Column:
|
||||
name = self.name or name
|
||||
constraints = self.get_constraints()
|
||||
return sqlalchemy.Column(
|
||||
name,
|
||||
self.get_column_type(),
|
||||
*constraints,
|
||||
primary_key=self.primary_key,
|
||||
autoincrement=self.autoincrement,
|
||||
nullable=self.nullable,
|
||||
index=self.index,
|
||||
unique=self.unique,
|
||||
default=self.default,
|
||||
server_default=self.server_default
|
||||
)
|
||||
|
||||
def get_column_type(self) -> sqlalchemy.types.TypeEngine:
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
def get_constraints(self):
|
||||
return []
|
||||
|
||||
|
||||
class String(BaseField):
|
||||
__type__ = str
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
assert 'length' in kwargs, 'length is required'
|
||||
self.length = kwargs.pop('length')
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_column_type(self):
|
||||
return sqlalchemy.String(self.length)
|
||||
|
||||
|
||||
class Integer(BaseField):
|
||||
__type__ = int
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_column_type(self):
|
||||
return sqlalchemy.Integer()
|
||||
69
orm/models.py
Normal file
69
orm/models.py
Normal file
@ -0,0 +1,69 @@
|
||||
from typing import Any
|
||||
|
||||
import sqlalchemy
|
||||
from pydantic import create_model
|
||||
|
||||
from orm.fields import BaseField
|
||||
|
||||
|
||||
class ModelMetaclass(type):
|
||||
def __new__(
|
||||
mcs: type, name: str, bases: Any, attrs: dict
|
||||
) -> type:
|
||||
new_model = super().__new__( # type: ignore
|
||||
mcs, name, bases, attrs
|
||||
)
|
||||
|
||||
if attrs.get("__abstract__"):
|
||||
return new_model
|
||||
|
||||
tablename = attrs["__tablename__"]
|
||||
metadata = attrs["__metadata__"]
|
||||
pkname = None
|
||||
|
||||
columns = []
|
||||
for field_name, field in new_model.__dict__.items():
|
||||
if isinstance(field, BaseField):
|
||||
if field.primary_key:
|
||||
pkname = field_name
|
||||
columns.append(field.get_column(field_name))
|
||||
|
||||
pydantic_fields = {field_name: (base_field.__type__, base_field.default or ...)
|
||||
for field_name, base_field in new_model.__dict__.items()
|
||||
if isinstance(base_field, BaseField)}
|
||||
|
||||
new_model.__table__ = sqlalchemy.Table(tablename, metadata, *columns)
|
||||
new_model.__columns__ = columns
|
||||
new_model.__pkname__ = pkname
|
||||
new_model.__pydantic_fields__ = pydantic_fields
|
||||
new_model.__pydantic_model__ = create_model(name, **pydantic_fields)
|
||||
new_model.__fields__ = new_model.__pydantic_model__.__fields__
|
||||
|
||||
return new_model
|
||||
|
||||
|
||||
class Model(metaclass=ModelMetaclass):
|
||||
__abstract__ = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if "pk" in kwargs:
|
||||
kwargs[self.__pkname__] = kwargs.pop("pk")
|
||||
self.values = self.__pydantic_model__(**kwargs)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key in self.__fields__:
|
||||
setattr(self.values, key, value)
|
||||
super().__setattr__(key, value)
|
||||
|
||||
def __getattribute__(self, item):
|
||||
if item != '__fields__' and item in self.__fields__:
|
||||
return getattr(self.values, item)
|
||||
return super().__getattribute__(item)
|
||||
|
||||
@property
|
||||
def pk(self):
|
||||
return getattr(self.values, self.__pkname__)
|
||||
|
||||
@pk.setter
|
||||
def pk(self, value):
|
||||
setattr(self.values, self.__pkname__, value)
|
||||
Reference in New Issue
Block a user