Compare commits

...

4 commits

Author SHA1 Message Date
Frank Matthieß
831b801696 Extend sqlite3 database, item and link support 2024-09-15 22:49:23 +02:00
Frank Matthieß
60e2d3e044 Add intial database support and tests 2024-08-31 23:13:14 +02:00
Frank Matthieß
b40984d8b0 Add .gitignore 2024-08-31 23:12:01 +02:00
Frank Matthieß
5bc5b810e0 Ad pipenv Pipfile 2024-08-31 23:09:31 +02:00
13 changed files with 1025 additions and 0 deletions

9
.env Normal file
View file

@ -0,0 +1,9 @@
PYTHONPATH=${PWD}/src/modules/
ITEMSDB_SQLITE3_DATABASEDIR=${PWD}/build/database
ITEMSDB_SQLITE3_FILENAME=itemsdb.db
ITEMSDB_PREFIX=test2_
ITEMSDB_LOGLEVEL=debug
ITEMSDB_DEBUG_MODULES=itemsdb.sqlite3

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
#
build/
.venv/
**/__pycache__/

13
Pipfile Normal file
View file

@ -0,0 +1,13 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
pytest = "*"
[packages]
peewee = "*"
[requires]
python_version = "3.11"

View file

@ -8,3 +8,57 @@ dependency graph.
For further information see the [Items database
documentation](https://code.matthiess.it/collaboration/itemsdb-doc).
## Requirements
* Python3 >= 3.9
* python packages
* pip
* pipenv
* [See pipenv install inctructions](https://pipenv.pypa.io/en/latest/installation.html#make-sure-you-have-python-and-pip)
* sqlite3
* git
* Access to the project git repository
## Install Python requirements
* The required python packages will be installed with the `pipenv` command.
* Install this python package with your native os package installer:
```bash
apt-get install pipenv
```
* Change to the root of your git repository worktree and execute:
```bash
pipenv install
```
This will install all python package dependencies for running this
application.
* To install the development requirements you should execute this command:
```bash
pipenv install --dev
```
* To get an overview of the installed packages and theire depenmmdencies run
"`pipenv graph`"
```bash
pipenv graph
```
## Activate the pipenv environment
Change to the project git repository worktree and execute "`pipenv shell`" to
activate the previously installed python virtual environment.
```bash
pipenv shell
```
This is similar to execute "`. $venv_base_dir/bin/activate`".

View file

@ -0,0 +1,118 @@
#!/usr/bin/env python3
#
from enum import StrEnum, auto, verify, UNIQUE
from uuid import uuid4 as new_uuid
from .version import __version__
from .log import info, debug, error, pf
version = __version__
from .database import *
from .sqlite3 import *
@verify(UNIQUE)
class ItemsDBError(StrEnum):
PARAMETER_TYPE = auto()
ITEM_TYPE = auto()
LINK_TYPE = auto()
SCHEMA_TYPE = auto()
ITEM = auto()
LINK = auto()
SCHEMA = auto()
VERSION = auto()
class ItemsDBException(Exception):
def __init__(self, msg, error_code, **exc_args):
self.msg = msg
self.error_code = error_code
for arg_k, arg_v in exc_args.items():
if arg_k in ["msg", "error_code"]:
continue
self.__setattr__(arg_k, arg_v)
def __str__(self):
msg = ""
for n in self.__dir__():
if n in ["add_note", "args", "with_traceback"]:
continue
msg = f"{n}: {pf(self.__getattribute__(n))}\n"
return msg
class BaseObject():
def __init__(self, item_type, item_data, **item_args):
self.type = item_type
self.data = item_data
self.id = item_args.get("id", None)
self.version = item_args.get("version", None)
def __new_id__(self):
try:
item_id = self.id
if not item_id:
self.id = new_uuid()
except AttributeError:
self.id = new_uuid()
def __new_version__(self):
try:
item_version = self.version
if not item_version:
self.version = new_uuid()
except AttributeError:
self.version = new_uuid()
class Item(BaseObject):
pass
class Link(BaseObject):
pass
class BaseSchema():
def __init_(self, name, schema):
e_msg = ""
if not isinstance(name, str):
e_msg += (
f"Schema name must be from type 'str' but is '{type(name)}'"
)
if not isinstance(schema, dict):
e_msg += (
f"Schema data must be from type 'dict' but is '{type(schema)}'"
)
if e_msg:
raise ItemsDBException(e_msg, ItemsDBError.PARAMETER_TYPE))
self.name = name
self.schema = schema
class ItemSchema(BaseSchema):
pass
class LinkSchema(BaseSchema):
pass
__all__ = [
"DBError",
"DBException",
"DBSqlite3",
"version",
"Item",
"Link",
"ItemSchema",
"LinkSchema",
]

View file

@ -0,0 +1,120 @@
#!/usr/bin/env python3
#
from enum import StrEnum, auto, verify, UNIQUE
from .version import __version__
from .log import info, debug, error
@verify(UNIQUE)
class DBError(StrEnum):
CONNECTION = auto()
CREATION = auto()
CURSOR = auto()
FILE_EXIST = auto()
NOTIMPLEMENTED = auto()
PARAMETER_NEEDED = auto()
PARAMETER_TYPE = auto()
TYPE_NOTIMPLEMENTED = auto()
TYPE_UNKNOWN = auto()
UNKNOWN = auto()
class DBException(Exception):
def __init__(self, msg="", error_code=None, db=None, con=None, cur=None):
self.msg = msg
self.db = db
self.con = con
self.cur = cur
self.error_code = error_code
def __str__(self):
msg = f"msg : {self.msg}\n" if self.msg else ""
msg += f"error: {self.error_code}\n" if self.error_code else ""
msg += f"db : {str(self.db)}\n" if self.db else ""
msg += f"con : {str(self.con)}\n" if self.con else ""
msg += f"cur : {str(self.cur)}\n" if self.cur else ""
return msg
class DBBase:
db_types = [
"sqlite3",
"postgresql",
]
def __init__(self, db_type, parameter=None):
if db_type not in self.db_types:
raise DBException(
msg=(
f"Database type '{db_type}' unknown. Use one of: "
f"{', '.join(self.db_types)}"
),
error_code=DBError("type_unknown"),
)
elif db_type == "postgresql":
raise DBException(
msg=f"Database type '{db_type}' not implemented yet",
error_code=DBError("type_notimplemented"),
)
self.type = db_type
self.parameter = parameter
def __throw_db_exception__(
self,
msg="Not implemented by database driver",
error_code=DBError.NOTIMPLEMENTED,
):
raise DBException(
msg=msg,
error_code=error_code,
)
def connect(self):
self.__throw_db_exception__()
def createDatabase(self):
self.__throw_db_exception__()
def createTables(self):
self.__throw_db_exception__()
def createItem(self):
self.__throw_db_exception__()
def createLink(self):
self.__throw_db_exception__()
def createItemSchema(self):
self.__throw_db_exception__()
def createLinkSchema(self):
self.__throw_db_exception__()
def getItem(self, item):
self.__throw_db_exception__()
def insertItem(self, item):
self.__throw_db_exception__()
def updateItem(self, item):
self.__throw_db_exception__()
def insertLink(self, link_type, fromItem, toItem):
self.__throw_db_exception__()
def updateLink(self, link_type, fromItem, toItem):
self.__throw_db_exception__()
def __str__(self):
return f"Not implemented by database driver: {id(self)}"
__all__ = [
"DBError",
"DBException",
"DBBase",
]

View file

@ -0,0 +1,64 @@
#!/usr/bin/env python3
#
from .version import __version__
import sys
from os.path import basename
from inspect import (
getouterframes,
currentframe,
isfunction,
ismethod,
isclass,
ismodule,
)
from pprint import pformat as pf
def __print__(msg, file=sys.stderr):
fn, ln = __getCallerData__()
debug_prefix = f"{fn}:{ln}: "
print(f"{debug_prefix}{msg}", file=file)
def info(msg, file=sys.stderr):
__print__(f"INFO: {msg}", file=file)
def warn(msg, file=sys.stderr):
__print__(f"WARNING: {msg}", file=file)
def error(msg, file=sys.stderr):
__print__(f"ERROR: {msg}", file=file)
def debug(msg, file=sys.stderr):
__print__(f"DEBUG: {msg}", file=file)
def __getCallerData__(level=4):
# return:
#
# FrameInfo(frame, filename, lineno, function, code_context, index)
# https://docs.python.org/3/library/inspect.html#inspect.FrameInfo
#
outer_frames = getouterframes(currentframe())
len_outer_frames = len(outer_frames)
if len_outer_frames < level:
outer_frame = outer_frames[-1:]
else:
outer_frame = outer_frames[level - 1]
(
frame,
filename,
lineno,
function,
_,
_,
) = outer_frame
return basename(filename), lineno

View file

@ -0,0 +1,225 @@
#!/usr/bin/env python3
#
import os
import os.path
from ..version import __version__
from ..log import info, debug, error, pf
from ..database import DBError, DBException, DBBase
from playhouse.sqlite_ext import SqliteExtDatabase
from .models import *
# from .functions import *
__all__ = [
"DBSqlite3",
"DBError",
"DBException",
]
class DBSqlite3(DBBase):
type = "sqlite3"
def __init__(self, parameter=None):
debug(f"type: '{self.type}', parameter={pf(parameter)}")
super().__init__(self.type, parameter)
if not parameter:
self.pragmas = self.__pragma_parameter__({})
elif not isinstance(parameter, dict):
e_msg = (
"Database parameters mut be from type 'dict' but is "
f"'{type(parameter)}'"
)
raise DBException(e_msg, DBError("parameter_type"))
else:
for pk, pv in parameter.items():
self.__setattr__(pk, pv)
self.pragmas = self.__pragma_parameter__(
parameter.get("pragmas", {})
)
def __pragma_parameter__(self, pragmas):
# See:http://docs.peewee-orm.com/en/latest/peewee/sqlite_ext.html#getting-started
db_pragmas = dict(
cache_size=-1024 * 64, journal_mode="wal", foreign_keys=1
)
for pk, pv in pragmas.items():
db_pragmas[pk] = pv
return [(k, v) for k, v in db_pragmas.items()]
def __str__(self):
msg = f"Database type: {self.type}\n"
msg += (
f" parameter : {pf(self.parameter)}\n"
if self.parameter
else ""
)
msg += (
f" pragmas : {pf(self.pragmas)}\n"
if self.pragmas
else ""
)
return msg
def __try_database_init(self):
filename = self.parameter["filename"]
try:
self.db = SqliteExtDatabase(filename, pragmas=self.pragmas)
return True, f"Database '{filename}' initialzed"
except Exception as e:
raise DBException(e, DBError.CREATION)
def __try_connect__(self):
if self.db.is_closed():
try:
self.db.connect()
debug("Open")
except Exception as e:
raise DBException(e, DBError.CONNECTION)
def __isInitialized(self):
try:
_ = self.db
return True
except AttributeError:
return False
def __get_prefix__(self):
if "prefix" not in self.parameter:
return ""
return self.parameter["prefix"]
def create(self, filename=None):
if filename:
debug(f"Create database file '{filename}'")
self.parameter.update(filename=filename)
elif not self.parameter["filename"]:
e_msg = f"Sqlite3 database filename not given"
debug(e_msg)
raise DBException(e_msg, DBError.PARAMETER_NEEDED)
debug(f"{self.parameter['filename']=}")
debug(f"{os.path.exists(self.parameter['filename'])=}")
if os.path.exists(self.parameter["filename"]):
e_msg = (
"Sqlite3 database file already exist: "
f"'{self.parameter['filename']}'"
)
debug(e_msg)
return False, e_msg
debug("If not exist, create database directory")
os.makedirs(os.path.dirname(self.parameter["filename"]), exist_ok=True)
try:
self.db = SqliteExtDatabase(
self.parameter["filename"], pragmas=self.pragmas
)
# Create the sqlite3 database file by open/connect and close it.
self.db.connect()
self.connection = self.db.connection()
except Exception as e:
raise DBException(e, DBError.CREATION)
return True, f"Sqlite3 database '{self.parameter['filename']}' created"
def open(self):
db_filename = self.parameter["filename"]
debug(f"Try to open database '{db_filename}'")
if not self.__isInitialized():
debug("Database not initialized")
self.__try_database_init()
debug("Try to connect to database")
self.db.connect(reuse_if_open=True)
self.connection = self.db.connection()
return True, f"Sqlite3 database '{db_filename}' open"
def close(self):
if self.db.is_closed():
return True
else:
try:
self.db.close()
self.connection = None
return True
except Exception as db_e:
return False, str(db_e)
def createTables(self, models=itemsdb_models):
self.__try_connect__()
debug(f"Create {len(models)} tables in database")
with self.db:
try:
prefix = self.__get_prefix__()
if prefix:
debug(f"Use database table prefix: '{prefix}'")
for model in models:
table_name = f"{prefix}{model.__name__}"
debug(
f"Set table name for '{model.__name__}' to "
f"'{table_name}'"
)
model._meta.table_name = table_name
debug(
f"Assign the model '{model.__name__}' to the "
"database."
)
model._meta.database = self.db
debug("Try to create database tables")
tables_not_exist = [
model for model in models if not self.tableExist(model)
]
if tables_not_exist:
table_names_not_exist = ", ".join(
[model._meta.table_name for model in tables_not_exist]
)
debug("Create database tables: " f"{table_names_not_exist}")
self.db.create_tables(tables_not_exist)
else:
debug("Database tables already exists")
self.models = models
return True, "Sqlite3 database tables created"
except Exception as ct_e:
raise DBException(ct_e, DBError.CREATION)
def tableExist(self, model):
sql_query = (
"select tbl_name from sqlite_schema where type = 'table' "
"and tbl_name = ?"
)
table_name = model._meta.table_name
query_result = model.raw(sql_query, table_name)
query_result_len = len(query_result)
if query_result_len > 0:
debug(f"Database table exist: '{table_name}'")
return True
else:
debug(f"Database table does not exist: '{table_name}'")
return False
def getItem(self, item):
return item
def insertItem(self, item):
pass
def updateItem(self, item):
pass
def insertLink(self, link_type, fromItem, toItem):
pass
def updateLink(self, link_type, fromItem, toItem):
pass

View file

@ -0,0 +1,232 @@
#!/usr/bin/env python3
#
import os
import os.path
from ..version import __version__
from ..log import (
info,
warn,
error,
debug,
)
from .models import (
item_types,
items,
link_types,
links,
itemsdb_models,
)
from peewee import (
ModelBase,
SqliteDatabase,
IntegrityError,
)
DATABASE_TYPE = "sqlite3"
def __check_model_type__(model):
if not isinstance(model, ModelBase):
e_msg = (
f"Wrong parameter type: '{type(model)}'. This should be derived "
"from peewee.Model"
)
error(e_msg)
raise Exception(e_msg)
def getTablename(model):
__check_model_type__(model)
return model._meta.table_name
def setTablename(model, table_name):
__check_model_type__(model)
debug(f"Set table name for '{model.__name__}' to '{table_name}'")
try:
model._meta.table_name = table_name
return True, table_name, ""
except Exception as e:
e_msg = (
f"Fail to set table name '{table_name}' for model "
f"'{model.__name__}': {e}"
)
error(e_msg)
return False, model._meta.table_name, e_msg
def createDatabase(*args, **kwargs):
debug(f"Create database from type '{DATABASE_TYPE}'")
if (len(args) == 0 and "filename" not in kwargs) or (
len(args) > 0 and not args[0]
):
e_msg = (
"You need to set the 'filename' parameter to create the sqlite3 "
"database"
)
error(e_msg)
return False, None, e_msg
if len(args) > 0 and args[0]:
debug("Use first parameter as filename")
filename = args[0]
else:
debug("Use keyword parameter 'filename' as filename")
filename = kwargs.get("filename")
if not filename:
e_msg = "'filename' parameter MUST NOT be empty"
error(e_msg)
return False, None, e_msg
if os.path.exists(filename):
debug(f"Sqlite3 database file '{filename}' already exist")
if not ("force" in kwargs and kwargs["force"] is True):
e_msg = (
f"Sqlite3 database file '{filename}' already exist and you "
"have not set 'force=True'. The database file will not be "
"recreated."
)
return False, None, e_msg
os.makedirs(os.path.dirname(filename), exist_ok=True)
pragmas = kwargs.get("pragmas", None)
try:
if pragmas:
if isinstance(pragmas, dict):
db = SqliteDatabase(filename, pragmas=pragmas)
success = True
else:
db = None
msg = f"Parameter 'pragmas' must be from type 'dict' but is: {pragmas}"
debug(msg)
success = False
else:
db = SqliteDatabase(filename)
success = True
if success:
db.connect()
db.close()
pragma_msg = f"{' with pragmas: {pragmas}' if pragmas else ''}"
msg = f"Sqlite3 database '{filename}'{pragma_msg} created"
debug(msg)
except Exception as c_e:
msg = f"Sqlite3 database('{filename}') creation failed: {c_e}"
debug(msg)
success = False
return success, db, msg
def createTables(db, table_models, prefix=""):
with db:
e_msg = ""
if prefix:
prefix_result = [
setTablename(m, f"{prefix}_{getTablename(m)}")
for m in table_models
if not getTablename(m).startswith(f"{prefix}_")
]
if False in [r[0] for r in prefix_result]:
prefix_errors = [
f"{r[1]}: {r[2]}" for r in prefix_result if r[0] is False
]
newline = "\n"
newline_tab = "\n\t"
e_msg = (
f"Fail to set table names:${newline_tab}"
f"{newline_tab.join(prefix_errors)}{newline}"
)
error(e_msg)
return False, db, e_msg
e_msg = "Setting table names successful"
debug(
"Database table names has created: "
f"{', '.join([m._meta.table_name for m in table_models])}"
)
try:
for m in table_models:
debug(f"Set active database for model '{m._meta.name}'")
m._meta.database = db
db.create_tables(table_models)
e_msg = "Creating tables successful "
except Exception as ct_e:
e_msg = f"Fail to create database tables: {ct_e}"
debug(e_msg)
return False, db, e_msg
return True, db, e_msg
def createItemType(db, name, schema):
if not isinstance(db, SqliteDatabase):
msg = (
"Parameter 'db' must be from type 'SqliteDatabase' but is "
f"'{type(db)}'"
)
return False, None, msg
elif not isinstance(name, str):
msg = f"Parameter 'name' must be from type 'str' but is '{type(name)}'"
return False, None, msg
elif not isinstance(schema, dict):
msg = (
"Parameter 'schema' must be from type 'dict' but is "
f"'{type(schema)}'"
)
return False, None, msg
item_type = None
with db:
try:
item_type = item_types.create(name=name, schema=schema)
msg = f"Item type '{name}' created"
success = True
except IntegrityError as i_e:
msg = f"Item type '{name}' already exist: {i_e}"
debug(msg)
success = False
except Exception as e:
msg = f"Fail to create item type '{name}': {e}"
debug(msg)
success = False
return success, item_type, msg
def removeItemType(dn, name):
if not isinstance(db, SqliteDatabase):
msg = (
"Parameter 'db' must be from type 'SqliteDatabase' but is "
f"'{type(db)}'"
)
return False, None, msg
elif not isinstance(name, str):
msg = f"Parameter 'name' must be from type 'str' but is '{type(name)}'"
return False, None, msg
with db:
try:
item_type = item_types.get(item_types.name == name)
item_type.delete_instance()
success = True
except item_types.DoesNotExist:
msg = f"Item type '{name}' does not exist"
debug(msg)
success = False
except Exception as e:
msg = f"Fail to remove item type '{name}': {e}"
debug(msg)
success = False
return success, name, msg

View file

@ -0,0 +1,82 @@
#!/usr/bin/env python3
#
import inspect
from ..version import __version__
from ..log import info, warn, error, debug
from peewee import (
Model,
TextField,
BinaryUUIDField,
ForeignKeyField,
IntegerField,
)
class BaseModel(Model):
class Meta:
database = None
class BaseModelTypes(BaseModel):
name = TextField(unique=True, null=False)
schema = TextField()
class Version(BaseModel):
class Meta:
table_name = "itemsdb"
version = IntegerField(primary_key=True)
state = TextField(null=False)
class link_types(BaseModelTypes):
class Meta:
table_name = "link_types"
pass
class item_types(BaseModelTypes):
class Meta:
table_name = "item_types"
pass
class items(BaseModel):
class Meta:
table_name = "items"
id = BinaryUUIDField(primary_key=True)
version = BinaryUUIDField(null=False)
type = ForeignKeyField(item_types)
data = TextField(null=False, default={})
class links(BaseModel):
class Meta:
table_name = "links"
id = BinaryUUIDField(primary_key=True)
version = BinaryUUIDField(null=False)
type = ForeignKeyField(link_types)
from_item = ForeignKeyField(items)
to_item = ForeignKeyField(items)
itemsdb_models = [link_types, links, item_types, items]
__all = [
"Version",
"link_types",
"item_types",
"links",
"items",
"itemsdb_models",
]

View file

@ -0,0 +1,4 @@
#!/usr/bin/env python3
#
__version__ = "0.0.1"

39
test/create-database.py Normal file
View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
#
from itemsdb.sqlite3 import *
from itemsdb.log import *
import os.path
dbdir = os.getenv("ITEMSDB_SQLITE3_DATABASEDIR", ".")
dbname = os.getenv("ITEMSDB_SQLITE3_FILENAME", "itemsdb.db")
dbprefix = os.getenv("ITEMSDB_PREFIX", "")
dbfilename = os.path.join(dbdir, f"{dbprefix}{dbname}")
parameter = dict(filename=dbfilename, prefix=dbprefix)
db = DBSqlite3(parameter=parameter)
debug(f"Database: '{type(db)}'")
try:
db_created, msg = db.create()
debug(f"Database after create(): '{type(db)}'")
if db_created:
print(f"Database '{dbfilename}' created")
else:
print(msg)
open, open_msg = db.open()
print(open_msg)
debug(f"Database after open(): '{type(db)}'")
tables_created, msg = db.createTables()
if tables_created:
print(f"{msg}: {', '.join([model.__name__ for model in db.models])}")
else:
print(msg)
except DBException as db_e:
print(db_e)

61
test/init_sqlite3.py Executable file
View file

@ -0,0 +1,61 @@
#!/usr/bin/env python3
#
import sys
import os
import os.path
print(os.path.abspath(os.curdir))
print(__file__)
module_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../src/modules")
)
print(f"Add python module path: '{module_path}'")
sys.path.insert(0, module_path)
try:
from itemsdb.log import info, warn, error, debug
except Exception as i_e:
e_msg = f"Fail to import itemsdb.log: {i_e}"
print(e_msg, file=sys.stderr)
raise Exception(e_msg)
from itemsdb import *
database_filename = os.getenv("ITEMSDB_DATABASE_NAME", "itemsdb_test.db")
database_path_module = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../build", database_filename)
)
database_path = os.getenv("ITEMSDB_DATABASE_PATH", database_path_module)
database_table_prefix = os.getenv("ITEMSDB_DATABASE_TABLE_PREFIX", "test")
database_pragmas = dict(
journal_mode="wal", foreign_keys=1, ignore_check_constraints=0
)
success, db, e_msg = createDatabase(
database_path, force=True, pragmas=database_pragmas
)
if not success:
error(e_msg)
sys.exit(1)
success, db, e_msg = createTables(
db, itemsdb_models, prefix=database_table_prefix
)
if not success:
error(e_msg)
sys.exit(2)
item_name = "server"
item_schema = dict(name=item_name, properties=[1, 2, 3, 4, 5])
success, item_type, e_msg = createItemType(db, item_name, item_schema)
if not success:
error(e_msg)
sys.exit(3)