Add intial database support and tests

This commit is contained in:
Frank Matthieß 2024-08-31 23:13:14 +02:00
parent b40984d8b0
commit 60e2d3e044
8 changed files with 504 additions and 0 deletions

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
#
from .version import __version__
from .database import *

View file

@ -0,0 +1,48 @@
#!/usr/bin/env python3
#
import os
from .version import __version__
from .log import info, debug, error
class ItemsdbDatabaseException(Exception):
pass
database_type = os.getenv("ITEMSDB_DATABASE_TYPE", "sqlite3")
database_types = [
"sqlite3",
"postgresql",
]
if database_type not in database_types:
e_msg = f"Database type '' unknown. Use one of: {', '.join(database_types)}"
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
debug(f"Use database type '{database_type}'")
if database_type == "sqlite3":
try:
from itemsdb.sqlite3 import *
debug(f"itemsdb database extension '{database_type}' loaded")
except ModuleNotFoundError as i_e:
e_msg = f"Fail to load itemsdb.sqlite3 extension module"
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
elif database_type == "postgresql":
e_msg = f"Database extension module '{database_type}' not implemented yet."
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
else:
e_msg = (
f"No database extension module loaded. Possibly '{database_type}' is "
"not implemented yet."
)
error(e_msg)
raise ItemsdbDatabaseException(e_msg)

View file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
#
from .version import __version__
import sys
def __print__(msg, file=sys.stderr):
print(msg, file=file)
def info(msg, file=sys.stderr):
__print__(f"INFO: {msg}", file=file)
def warn(msg, file=sys.stderr):
__print__(f"WARNING: {msg}", file=file)
def error(msg, file=sys.stderr):
__print__(f"ERROR: {msg}", file=file)
def debug(msg, file=sys.stderr):
__print__(f"DEBUG: {msg}", file=file)

View file

@ -0,0 +1,12 @@
#!/usr/bin/env python3
#
from ..version import __version__
from .models import *
from .functions import *
class ItemsdbException(Exception):
pass

View file

@ -0,0 +1,232 @@
#!/usr/bin/env python3
#
import os
import os.path
from ..version import __version__
from ..log import (
info,
warn,
error,
debug,
)
from .models import (
item_types,
items,
link_types,
links,
itemsdb_models,
)
from peewee import (
ModelBase,
SqliteDatabase,
IntegrityError,
)
DATABASE_TYPE = "sqlite3"
def __check_model_type__(model):
if not isinstance(model, ModelBase):
e_msg = (
f"Wrong parameter type: '{type(model)}'. This should be derived "
"from peewee.Model"
)
error(e_msg)
raise Exception(e_msg)
def getTablename(model):
__check_model_type__(model)
return model._meta.table_name
def setTablename(model, table_name):
__check_model_type__(model)
debug(f"Set table name for '{model.__name__}' to '{table_name}'")
try:
model._meta.table_name = table_name
return True, table_name, ""
except Exception as e:
e_msg = (
f"Fail to set table name '{table_name}' for model "
f"'{model.__name__}': {e}"
)
error(e_msg)
return False, model._meta.table_name, e_msg
def createDatabase(*args, **kwargs):
debug(f"Create database from type '{DATABASE_TYPE}'")
if (len(args) == 0 and "filename" not in kwargs) or (
len(args) > 0 and not args[0]
):
e_msg = (
"You need to set the 'filename' parameter to create the sqlite3 "
"database"
)
error(e_msg)
return False, None, e_msg
if len(args) > 0 and args[0]:
debug("Use first parameter as filename")
filename = args[0]
else:
debug("Use keyword parameter 'filename' as filename")
filename = kwargs.get("filename")
if not filename:
e_msg = "'filename' parameter MUST NOT be empty"
error(e_msg)
return False, None, e_msg
if os.path.exists(filename):
debug(f"Sqlite3 database file '{filename}' already exist")
if not ("force" in kwargs and kwargs["force"] is True):
e_msg = (
f"Sqlite3 database file '{filename}' already exist and you "
"have not set 'force=True'. The database file will not be "
"recreated."
)
return False, None, e_msg
os.makedirs(os.path.dirname(filename), exist_ok=True)
pragmas = kwargs.get("pragmas", None)
try:
if pragmas:
if isinstance(pragmas, dict):
db = SqliteDatabase(filename, pragmas=pragmas)
success = True
else:
db = None
msg = f"Parameter 'pragmas' must be from type 'dict' but is: {pragmas}"
debug(msg)
success = False
else:
db = SqliteDatabase(filename)
success = True
if success:
db.connect()
db.close()
pragma_msg = f"{' with pragmas: {pragmas}' if pragmas else ''}"
msg = f"Sqlite3 database '{filename}'{pragma_msg} created"
debug(msg)
except Exception as c_e:
msg = f"Sqlite3 database('{filename}') creation failed: {c_e}"
debug(msg)
success = False
return success, db, msg
def createTables(db, table_models, prefix=""):
with db:
e_msg = ""
if prefix:
prefix_result = [
setTablename(m, f"{prefix}_{getTablename(m)}")
for m in table_models
if not getTablename(m).startswith(f"{prefix}_")
]
if False in [r[0] for r in prefix_result]:
prefix_errors = [
f"{r[1]}: {r[2]}" for r in prefix_result if r[0] is False
]
newline = "\n"
newline_tab = "\n\t"
e_msg = (
f"Fail to set table names:${newline_tab}"
f"{newline_tab.join(prefix_errors)}{newline}"
)
error(e_msg)
return False, db, e_msg
e_msg = "Setting table names successful"
debug(
"Database table names has created: "
f"{', '.join([m._meta.table_name for m in table_models])}"
)
try:
for m in table_models:
debug(f"Set active database for model '{m._meta.name}'")
m._meta.database = db
db.create_tables(table_models)
e_msg = "Creating tables successful "
except Exception as ct_e:
e_msg = f"Fail to create database tables: {ct_e}"
debug(e_msg)
return False, db, e_msg
return True, db, e_msg
def createItemType(db, name, schema):
if not isinstance(db, SqliteDatabase):
msg = (
"Parameter 'db' must be from type 'SqliteDatabase' but is "
f"'{type(db)}'"
)
return False, None, msg
elif not isinstance(name, str):
msg = f"Parameter 'name' must be from type 'str' but is '{type(name)}'"
return False, None, msg
elif not isinstance(schema, dict):
msg = (
"Parameter 'schema' must be from type 'dict' but is "
f"'{type(schema)}'"
)
return False, None, msg
item_type = None
with db:
try:
item_type = item_types.create(name=name, schema=schema)
msg = f"Item type '{name}' created"
success = True
except IntegrityError as i_e:
msg = f"Item type '{name}' already exist: {i_e}"
debug(msg)
success = False
except Exception as e:
msg = f"Fail to create item type '{name}': {e}"
debug(msg)
success = False
return success, item_type, msg
def removeItemType(dn, name):
if not isinstance(db, SqliteDatabase):
msg = (
"Parameter 'db' must be from type 'SqliteDatabase' but is "
f"'{type(db)}'"
)
return False, None, msg
elif not isinstance(name, str):
msg = f"Parameter 'name' must be from type 'str' but is '{type(name)}'"
return False, None, msg
with db:
try:
item_type = item_types.get(item_types.name == name)
item_type.delete_instance()
success = True
except item_types.DoesNotExist:
msg = f"Item type '{name}' does not exist"
debug(msg)
success = False
except Exception as e:
msg = f"Fail to remove item type '{name}': {e}"
debug(msg)
success = False
return success, name, msg

View file

@ -0,0 +1,62 @@
#!/usr/bin/env python3
#
import inspect
from ..version import __version__
from ..log import info, warn, error, debug
from peewee import (
Model,
TextField,
BinaryUUIDField,
ForeignKeyField,
)
class BaseModel(Model):
class Meta:
database = None
class BaseModelTypes(BaseModel):
name = TextField(unique=True, null=False)
schema = TextField()
class link_types(BaseModelTypes):
class Meta:
table_name = "link_types"
pass
class item_types(BaseModelTypes):
class Meta:
table_name = "item_types"
pass
class items(BaseModel):
class Meta:
table_name = "items"
id = BinaryUUIDField(primary_key=True)
type = ForeignKeyField(item_types)
data = TextField(null=False, default={})
class links(BaseModel):
class Meta:
table_name = "links"
id = BinaryUUIDField(primary_key=True)
type = ForeignKeyField(link_types)
from_item = ForeignKeyField(items)
to_item = ForeignKeyField(items)
itemsdb_models = [link_types, links, item_types, items]

View file

@ -0,0 +1,4 @@
#!/usr/bin/env python3
#
__version__ = "0.0.1"

113
test/init_sqlite3.py Executable file
View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
#
import sys
import os
import os.path
print(os.path.abspath(os.curdir))
print(__file__)
module_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../src/modules")
)
print(f"Add python module path: '{module_path}'")
sys.path.insert(0, module_path)
try:
from itemsdb.log import info, warn, error, debug
except Exception as i_e:
e_msg = f"Fail to import itemsdb.log: {i_e}"
print(e_msg, file=sys.stderr)
raise Exception(e_msg)
from itemsdb import *
database_filename = os.getenv("ITEMSDB_DATABASE_NAME", "itemsdb_test.db")
database_path_module = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../build", database_filename)
)
database_path = os.getenv("ITEMSDB_DATABASE_PATH", database_path_module)
database_table_prefix = os.getenv("ITEMSDB_DATABASE_TABLE_PREFIX", "test")
database_pragmas = dict(
journal_mode="wal", foreign_keys=1, ignore_check_constraints=0
)
success, db, e_msg = createDatabase(
database_path, force=True, pragmas=database_pragmas
)
if not success:
error(e_msg)
sys.exit(1)
success, db, e_msg = createTables(
db, itemsdb_models, prefix=database_table_prefix
)
if not success:
error(e_msg)
sys.exit(2)
item_name = "server"
item_schema = dict(name=item_name, properties=[1, 2, 3, 4, 5])
success, item_type, e_msg = createItemType(db, item_name, item_schema)
if not success:
error(e_msg)
sys.exit(3)
# DATABASEDIR = getenv("ITEMSDB_DATABASEDIR", "./build")
# database_filename = f"{DATABASEDIR}/itemsdb_test.db"
# makedirs(DATABASEDIR, exist_ok=True)
# db = SqliteDatabase(database_filename)
# class BaseModel(Model):
# class Meta:
# database = db
# class item_types(BaseModel):
# name = TextField(unique=True, null=False)
# schema = TextField()
# class items(BaseModel):
# id = BinaryUUIDField(primary_key=True)
# type = ForeignKeyField(item_types)
# data = TextField(null=False, default={})
# print("Open db, do the work and close it again")
# db.connect()
# db.create_tables([item_types, items])
# type_server = item_types.select().where(item_types.name == "Server")
# if type_server.count() == 0:
# it_server = item_types()
# it_server.name = "Server"
# it_server.schema = {}
# it_server.save()
# try:
# it_n = "Server1"
# it_s = item_types.get(item_types.name == it_n)
# i_server = items.create(
# id=uuid(),
# type=it_s,
# data={},
# )
# except item_types.DoesNotExist:
# print(f"item type '{it_n}' does not exist.")
# db.close()