Extend sqlite3 database, item and link support

This commit is contained in:
Frank Matthieß 2024-09-15 22:49:23 +02:00
parent 60e2d3e044
commit 831b801696
10 changed files with 597 additions and 92 deletions

9
.env Normal file
View file

@ -0,0 +1,9 @@
PYTHONPATH=${PWD}/src/modules/
ITEMSDB_SQLITE3_DATABASEDIR=${PWD}/build/database
ITEMSDB_SQLITE3_FILENAME=itemsdb.db
ITEMSDB_PREFIX=test2_
ITEMSDB_LOGLEVEL=debug
ITEMSDB_DEBUG_MODULES=itemsdb.sqlite3

View file

@ -4,9 +4,10 @@ url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
pytest = "*"
[packages]
peewee = "*"
[requires]
python_version = "3.9"
python_version = "3.11"

View file

@ -8,3 +8,57 @@ dependency graph.
For further information see the [Items database
documentation](https://code.matthiess.it/collaboration/itemsdb-doc).
## Requirements
* Python3 >= 3.9
* python packages
* pip
* pipenv
* [See pipenv install inctructions](https://pipenv.pypa.io/en/latest/installation.html#make-sure-you-have-python-and-pip)
* sqlite3
* git
* Access to the project git repository
## Install Python requirements
* The required python packages will be installed with the `pipenv` command.
* Install this python package with your native os package installer:
```bash
apt-get install pipenv
```
* Change to the root of your git repository worktree and execute:
```bash
pipenv install
```
This will install all python package dependencies for running this
application.
* To install the development requirements you should execute this command:
```bash
pipenv install --dev
```
* To get an overview of the installed packages and theire depenmmdencies run
"`pipenv graph`"
```bash
pipenv graph
```
## Activate the pipenv environment
Change to the project git repository worktree and execute "`pipenv shell`" to
activate the previously installed python virtual environment.
```bash
pipenv shell
```
This is similar to execute "`. $venv_base_dir/bin/activate`".

View file

@ -1,7 +1,118 @@
#!/usr/bin/env python3
#
from enum import StrEnum, auto, verify, UNIQUE
from uuid import uuid4 as new_uuid
from .version import __version__
from .log import info, debug, error, pf
version = __version__
from .database import *
from .sqlite3 import *
@verify(UNIQUE)
class ItemsDBError(StrEnum):
PARAMETER_TYPE = auto()
ITEM_TYPE = auto()
LINK_TYPE = auto()
SCHEMA_TYPE = auto()
ITEM = auto()
LINK = auto()
SCHEMA = auto()
VERSION = auto()
class ItemsDBException(Exception):
def __init__(self, msg, error_code, **exc_args):
self.msg = msg
self.error_code = error_code
for arg_k, arg_v in exc_args.items():
if arg_k in ["msg", "error_code"]:
continue
self.__setattr__(arg_k, arg_v)
def __str__(self):
msg = ""
for n in self.__dir__():
if n in ["add_note", "args", "with_traceback"]:
continue
msg = f"{n}: {pf(self.__getattribute__(n))}\n"
return msg
class BaseObject():
def __init__(self, item_type, item_data, **item_args):
self.type = item_type
self.data = item_data
self.id = item_args.get("id", None)
self.version = item_args.get("version", None)
def __new_id__(self):
try:
item_id = self.id
if not item_id:
self.id = new_uuid()
except AttributeError:
self.id = new_uuid()
def __new_version__(self):
try:
item_version = self.version
if not item_version:
self.version = new_uuid()
except AttributeError:
self.version = new_uuid()
class Item(BaseObject):
pass
class Link(BaseObject):
pass
class BaseSchema():
def __init_(self, name, schema):
e_msg = ""
if not isinstance(name, str):
e_msg += (
f"Schema name must be from type 'str' but is '{type(name)}'"
)
if not isinstance(schema, dict):
e_msg += (
f"Schema data must be from type 'dict' but is '{type(schema)}'"
)
if e_msg:
raise ItemsDBException(e_msg, ItemsDBError.PARAMETER_TYPE))
self.name = name
self.schema = schema
class ItemSchema(BaseSchema):
pass
class LinkSchema(BaseSchema):
pass
__all__ = [
"DBError",
"DBException",
"DBSqlite3",
"version",
"Item",
"Link",
"ItemSchema",
"LinkSchema",
]

View file

@ -2,47 +2,119 @@
#
import os
from enum import StrEnum, auto, verify, UNIQUE
from .version import __version__
from .log import info, debug, error
class ItemsdbDatabaseException(Exception):
pass
@verify(UNIQUE)
class DBError(StrEnum):
CONNECTION = auto()
CREATION = auto()
CURSOR = auto()
FILE_EXIST = auto()
NOTIMPLEMENTED = auto()
PARAMETER_NEEDED = auto()
PARAMETER_TYPE = auto()
TYPE_NOTIMPLEMENTED = auto()
TYPE_UNKNOWN = auto()
UNKNOWN = auto()
database_type = os.getenv("ITEMSDB_DATABASE_TYPE", "sqlite3")
class DBException(Exception):
def __init__(self, msg="", error_code=None, db=None, con=None, cur=None):
self.msg = msg
self.db = db
self.con = con
self.cur = cur
self.error_code = error_code
database_types = [
"sqlite3",
"postgresql",
def __str__(self):
msg = f"msg : {self.msg}\n" if self.msg else ""
msg += f"error: {self.error_code}\n" if self.error_code else ""
msg += f"db : {str(self.db)}\n" if self.db else ""
msg += f"con : {str(self.con)}\n" if self.con else ""
msg += f"cur : {str(self.cur)}\n" if self.cur else ""
return msg
class DBBase:
db_types = [
"sqlite3",
"postgresql",
]
def __init__(self, db_type, parameter=None):
if db_type not in self.db_types:
raise DBException(
msg=(
f"Database type '{db_type}' unknown. Use one of: "
f"{', '.join(self.db_types)}"
),
error_code=DBError("type_unknown"),
)
elif db_type == "postgresql":
raise DBException(
msg=f"Database type '{db_type}' not implemented yet",
error_code=DBError("type_notimplemented"),
)
self.type = db_type
self.parameter = parameter
def __throw_db_exception__(
self,
msg="Not implemented by database driver",
error_code=DBError.NOTIMPLEMENTED,
):
raise DBException(
msg=msg,
error_code=error_code,
)
def connect(self):
self.__throw_db_exception__()
def createDatabase(self):
self.__throw_db_exception__()
def createTables(self):
self.__throw_db_exception__()
def createItem(self):
self.__throw_db_exception__()
def createLink(self):
self.__throw_db_exception__()
def createItemSchema(self):
self.__throw_db_exception__()
def createLinkSchema(self):
self.__throw_db_exception__()
def getItem(self, item):
self.__throw_db_exception__()
def insertItem(self, item):
self.__throw_db_exception__()
def updateItem(self, item):
self.__throw_db_exception__()
def insertLink(self, link_type, fromItem, toItem):
self.__throw_db_exception__()
def updateLink(self, link_type, fromItem, toItem):
self.__throw_db_exception__()
def __str__(self):
return f"Not implemented by database driver: {id(self)}"
__all__ = [
"DBError",
"DBException",
"DBBase",
]
if database_type not in database_types:
e_msg = f"Database type '' unknown. Use one of: {', '.join(database_types)}"
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
debug(f"Use database type '{database_type}'")
if database_type == "sqlite3":
try:
from itemsdb.sqlite3 import *
debug(f"itemsdb database extension '{database_type}' loaded")
except ModuleNotFoundError as i_e:
e_msg = f"Fail to load itemsdb.sqlite3 extension module"
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
elif database_type == "postgresql":
e_msg = f"Database extension module '{database_type}' not implemented yet."
error(e_msg)
raise ItemsdbDatabaseException(e_msg)
else:
e_msg = (
f"No database extension module loaded. Possibly '{database_type}' is "
"not implemented yet."
)
error(e_msg)
raise ItemsdbDatabaseException(e_msg)

View file

@ -4,10 +4,23 @@
from .version import __version__
import sys
from os.path import basename
from inspect import (
getouterframes,
currentframe,
isfunction,
ismethod,
isclass,
ismodule,
)
from pprint import pformat as pf
def __print__(msg, file=sys.stderr):
print(msg, file=file)
fn, ln = __getCallerData__()
debug_prefix = f"{fn}:{ln}: "
print(f"{debug_prefix}{msg}", file=file)
def info(msg, file=sys.stderr):
@ -24,3 +37,28 @@ def error(msg, file=sys.stderr):
def debug(msg, file=sys.stderr):
__print__(f"DEBUG: {msg}", file=file)
def __getCallerData__(level=4):
# return:
#
# FrameInfo(frame, filename, lineno, function, code_context, index)
# https://docs.python.org/3/library/inspect.html#inspect.FrameInfo
#
outer_frames = getouterframes(currentframe())
len_outer_frames = len(outer_frames)
if len_outer_frames < level:
outer_frame = outer_frames[-1:]
else:
outer_frame = outer_frames[level - 1]
(
frame,
filename,
lineno,
function,
_,
_,
) = outer_frame
return basename(filename), lineno

View file

@ -1,12 +1,225 @@
#!/usr/bin/env python3
#
import os
import os.path
from ..version import __version__
from ..log import info, debug, error, pf
from ..database import DBError, DBException, DBBase
from playhouse.sqlite_ext import SqliteExtDatabase
from .models import *
from .functions import *
# from .functions import *
__all__ = [
"DBSqlite3",
"DBError",
"DBException",
]
class ItemsdbException(Exception):
pass
class DBSqlite3(DBBase):
type = "sqlite3"
def __init__(self, parameter=None):
debug(f"type: '{self.type}', parameter={pf(parameter)}")
super().__init__(self.type, parameter)
if not parameter:
self.pragmas = self.__pragma_parameter__({})
elif not isinstance(parameter, dict):
e_msg = (
"Database parameters mut be from type 'dict' but is "
f"'{type(parameter)}'"
)
raise DBException(e_msg, DBError("parameter_type"))
else:
for pk, pv in parameter.items():
self.__setattr__(pk, pv)
self.pragmas = self.__pragma_parameter__(
parameter.get("pragmas", {})
)
def __pragma_parameter__(self, pragmas):
# See:http://docs.peewee-orm.com/en/latest/peewee/sqlite_ext.html#getting-started
db_pragmas = dict(
cache_size=-1024 * 64, journal_mode="wal", foreign_keys=1
)
for pk, pv in pragmas.items():
db_pragmas[pk] = pv
return [(k, v) for k, v in db_pragmas.items()]
def __str__(self):
msg = f"Database type: {self.type}\n"
msg += (
f" parameter : {pf(self.parameter)}\n"
if self.parameter
else ""
)
msg += (
f" pragmas : {pf(self.pragmas)}\n"
if self.pragmas
else ""
)
return msg
def __try_database_init(self):
filename = self.parameter["filename"]
try:
self.db = SqliteExtDatabase(filename, pragmas=self.pragmas)
return True, f"Database '{filename}' initialzed"
except Exception as e:
raise DBException(e, DBError.CREATION)
def __try_connect__(self):
if self.db.is_closed():
try:
self.db.connect()
debug("Open")
except Exception as e:
raise DBException(e, DBError.CONNECTION)
def __isInitialized(self):
try:
_ = self.db
return True
except AttributeError:
return False
def __get_prefix__(self):
if "prefix" not in self.parameter:
return ""
return self.parameter["prefix"]
def create(self, filename=None):
if filename:
debug(f"Create database file '{filename}'")
self.parameter.update(filename=filename)
elif not self.parameter["filename"]:
e_msg = f"Sqlite3 database filename not given"
debug(e_msg)
raise DBException(e_msg, DBError.PARAMETER_NEEDED)
debug(f"{self.parameter['filename']=}")
debug(f"{os.path.exists(self.parameter['filename'])=}")
if os.path.exists(self.parameter["filename"]):
e_msg = (
"Sqlite3 database file already exist: "
f"'{self.parameter['filename']}'"
)
debug(e_msg)
return False, e_msg
debug("If not exist, create database directory")
os.makedirs(os.path.dirname(self.parameter["filename"]), exist_ok=True)
try:
self.db = SqliteExtDatabase(
self.parameter["filename"], pragmas=self.pragmas
)
# Create the sqlite3 database file by open/connect and close it.
self.db.connect()
self.connection = self.db.connection()
except Exception as e:
raise DBException(e, DBError.CREATION)
return True, f"Sqlite3 database '{self.parameter['filename']}' created"
def open(self):
db_filename = self.parameter["filename"]
debug(f"Try to open database '{db_filename}'")
if not self.__isInitialized():
debug("Database not initialized")
self.__try_database_init()
debug("Try to connect to database")
self.db.connect(reuse_if_open=True)
self.connection = self.db.connection()
return True, f"Sqlite3 database '{db_filename}' open"
def close(self):
if self.db.is_closed():
return True
else:
try:
self.db.close()
self.connection = None
return True
except Exception as db_e:
return False, str(db_e)
def createTables(self, models=itemsdb_models):
self.__try_connect__()
debug(f"Create {len(models)} tables in database")
with self.db:
try:
prefix = self.__get_prefix__()
if prefix:
debug(f"Use database table prefix: '{prefix}'")
for model in models:
table_name = f"{prefix}{model.__name__}"
debug(
f"Set table name for '{model.__name__}' to "
f"'{table_name}'"
)
model._meta.table_name = table_name
debug(
f"Assign the model '{model.__name__}' to the "
"database."
)
model._meta.database = self.db
debug("Try to create database tables")
tables_not_exist = [
model for model in models if not self.tableExist(model)
]
if tables_not_exist:
table_names_not_exist = ", ".join(
[model._meta.table_name for model in tables_not_exist]
)
debug("Create database tables: " f"{table_names_not_exist}")
self.db.create_tables(tables_not_exist)
else:
debug("Database tables already exists")
self.models = models
return True, "Sqlite3 database tables created"
except Exception as ct_e:
raise DBException(ct_e, DBError.CREATION)
def tableExist(self, model):
sql_query = (
"select tbl_name from sqlite_schema where type = 'table' "
"and tbl_name = ?"
)
table_name = model._meta.table_name
query_result = model.raw(sql_query, table_name)
query_result_len = len(query_result)
if query_result_len > 0:
debug(f"Database table exist: '{table_name}'")
return True
else:
debug(f"Database table does not exist: '{table_name}'")
return False
def getItem(self, item):
return item
def insertItem(self, item):
pass
def updateItem(self, item):
pass
def insertLink(self, link_type, fromItem, toItem):
pass
def updateLink(self, link_type, fromItem, toItem):
pass

View file

@ -13,6 +13,7 @@ from peewee import (
TextField,
BinaryUUIDField,
ForeignKeyField,
IntegerField,
)
@ -26,6 +27,14 @@ class BaseModelTypes(BaseModel):
schema = TextField()
class Version(BaseModel):
class Meta:
table_name = "itemsdb"
version = IntegerField(primary_key=True)
state = TextField(null=False)
class link_types(BaseModelTypes):
class Meta:
table_name = "link_types"
@ -45,6 +54,7 @@ class items(BaseModel):
table_name = "items"
id = BinaryUUIDField(primary_key=True)
version = BinaryUUIDField(null=False)
type = ForeignKeyField(item_types)
data = TextField(null=False, default={})
@ -54,9 +64,19 @@ class links(BaseModel):
table_name = "links"
id = BinaryUUIDField(primary_key=True)
version = BinaryUUIDField(null=False)
type = ForeignKeyField(link_types)
from_item = ForeignKeyField(items)
to_item = ForeignKeyField(items)
itemsdb_models = [link_types, links, item_types, items]
__all = [
"Version",
"link_types",
"item_types",
"links",
"items",
"itemsdb_models",
]

39
test/create-database.py Normal file
View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
#
from itemsdb.sqlite3 import *
from itemsdb.log import *
import os.path
dbdir = os.getenv("ITEMSDB_SQLITE3_DATABASEDIR", ".")
dbname = os.getenv("ITEMSDB_SQLITE3_FILENAME", "itemsdb.db")
dbprefix = os.getenv("ITEMSDB_PREFIX", "")
dbfilename = os.path.join(dbdir, f"{dbprefix}{dbname}")
parameter = dict(filename=dbfilename, prefix=dbprefix)
db = DBSqlite3(parameter=parameter)
debug(f"Database: '{type(db)}'")
try:
db_created, msg = db.create()
debug(f"Database after create(): '{type(db)}'")
if db_created:
print(f"Database '{dbfilename}' created")
else:
print(msg)
open, open_msg = db.open()
print(open_msg)
debug(f"Database after open(): '{type(db)}'")
tables_created, msg = db.createTables()
if tables_created:
print(f"{msg}: {', '.join([model.__name__ for model in db.models])}")
else:
print(msg)
except DBException as db_e:
print(db_e)

View file

@ -59,55 +59,3 @@ success, item_type, e_msg = createItemType(db, item_name, item_schema)
if not success:
error(e_msg)
sys.exit(3)
# DATABASEDIR = getenv("ITEMSDB_DATABASEDIR", "./build")
# database_filename = f"{DATABASEDIR}/itemsdb_test.db"
# makedirs(DATABASEDIR, exist_ok=True)
# db = SqliteDatabase(database_filename)
# class BaseModel(Model):
# class Meta:
# database = db
# class item_types(BaseModel):
# name = TextField(unique=True, null=False)
# schema = TextField()
# class items(BaseModel):
# id = BinaryUUIDField(primary_key=True)
# type = ForeignKeyField(item_types)
# data = TextField(null=False, default={})
# print("Open db, do the work and close it again")
# db.connect()
# db.create_tables([item_types, items])
# type_server = item_types.select().where(item_types.name == "Server")
# if type_server.count() == 0:
# it_server = item_types()
# it_server.name = "Server"
# it_server.schema = {}
# it_server.save()
# try:
# it_n = "Server1"
# it_s = item_types.get(item_types.name == it_n)
# i_server = items.create(
# id=uuid(),
# type=it_s,
# data={},
# )
# except item_types.DoesNotExist:
# print(f"item type '{it_n}' does not exist.")
# db.close()