# MIT License
#
# Copyright (c) 2024 Eugenio Parodi <ceccopierangiolieugenio AT googlemail DOT com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__=['TTkTableModelSQLite3']
import sqlite3
import threading
from typing import Any
from TermTk.TTkCore.log import TTkLog
from TermTk.TTkCore.constant import TTkK
from TermTk.TTkAbstract.abstracttablemodel import TTkAbstractTableModel, TTkModelIndex
class _TTkModelIndexSQLite3(TTkModelIndex):
__slots__ = ('_col','_rowId','_sqModel')
def __init__(self, col:int, rowId:str, sqModel) -> None:
self._col = col
self._rowId = rowId
self._sqModel = sqModel
super().__init__()
def row(self) -> int:
return self._sqModel._getRow(self._rowId)-1
def col(self) -> int:
return self._col
def data(self) -> object:
return self._sqModel.data(row=self.row(),col=self.col())
def setData(self, data: object) -> None:
return self._sqModel.setData(row=self.row(),col=self.col(),data=data)
[docs]
class TTkTableModelSQLite3(TTkAbstractTableModel):
'''
:py:class:`TTkTableModelSQLite3` extends :py:class:`TTkAbstractTableModel`,
allowing to map an sqlite3 table to this table model
Quickstart:
In This example i assume i have a database named **sqlite.database.db** which contain a table **users**
Please refer to `test.ui.032.table.10.sqlite.py <https://github.com/ceccopierangiolieugenio/pyTermTk/blob/main/tests/t.ui/test.ui.032.table.10.sqlite.py>`_ for working eample.
.. code-block:: python
import TermTk as ttk
filename='sqlite.database.db'
tablename='users'
root = ttk.TTk(mouseTrack=True, layout=ttk.TTkGridLayout())
basicTableModel = ttk.TTkTableModelSQLite3(fileName=filename, table=tablename)
table = ttk.TTkTable(parent=root, tableModel=basicTableModel, sortingEnabled=True)
table.resizeColumnsToContents()
root.mainloop()
'''
__slots__ = (
'_conn', '_cur', '_table',
'_key', '_columns', '_columnTypes', '_count',
'_sort', '_sortColumn',
'_sqliteMutex',
'_idMap')
def __init__(self, *,
fileName:str,
table:str,
# header:list[str]=None
) -> None:
'''
:param fileName: the sqlite3 file database
:type fileName: str
:param table: the name of the sqlite3 table to be mapped
:type table: str
'''
self._sqliteMutex = threading.Lock()
self._table = table
self._columns = []
self._columnTypes = [] # Add this to store column types
self._idMap = {}
self._sort = ''
self._sortColumn = -1
with self._sqliteMutex:
self._conn = conn = sqlite3.connect(fileName, check_same_thread=False)
self._cur = cur = self._conn.cursor()
res = cur.execute(f"SELECT COUNT(*) FROM {table}")
self._count = res.fetchone()[0]
for row in cur.execute(f"PRAGMA table_info({table})"):
if row[-1] == 0:
self._columns.append(row[1]) # column name
self._columnTypes.append(row[2]) # column type
if row[-1] == 1:
self._key = row[1]
self._refreshIdMap()
super().__init__()
def _refreshIdMap(self):
self._idMap = {
_id : _rn
for _rn,_id in self._cur.execute(f"SELECT ROW_NUMBER() OVER ({self._sort}) RN, {self._key} from {self._table}")}
[docs]
def rowCount(self) -> int:
return self._count
[docs]
def columnCount(self) -> int:
return len(self._columns)
def _getRow(self, key:str) -> int:
return self._idMap[key]
[docs]
def index(self, row:int, col:int) -> TTkModelIndex:
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key = None if not (_fetch:=res.fetchone()) else _fetch[0]
return _TTkModelIndexSQLite3(col=col,rowId=key,sqModel=self)
[docs]
def data(self, row:int, col:int) -> Any:
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._columns[col]} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
return None if not (_fetch:=res.fetchone()) else _fetch[0]
[docs]
def setData(self, row:int, col:int, data:object) -> None:
with self._sqliteMutex:
res = self._cur.execute(
f"SELECT {self._key} FROM {self._table} "
f"{self._sort} "
f"LIMIT 1 OFFSET {row}")
key=res.fetchone()[0]
res = self._cur.execute(
f"UPDATE {self._table} "
f"SET {self._columns[col]} = '{data}' "
f"WHERE {self._key} = {key} ")
self._conn.commit()
if col == self._sortColumn:
self._refreshIdMap()
return True
[docs]
def flags(self, row:int, col:int) -> TTkK.ItemFlag:
return (
TTkK.ItemFlag.ItemIsEnabled |
TTkK.ItemFlag.ItemIsEditable |
TTkK.ItemFlag.ItemIsSelectable )
[docs]
def sort(self, column:int, order:TTkK.SortOrder) -> None:
self._sortColumn = column
if column == -1:
self._sort = ''
else:
if order ==TTkK.SortOrder.AscendingOrder:
self._sort = f"ORDER BY {self._columns[column]} ASC"
else:
self._sort = f"ORDER BY {self._columns[column]} DESC"
with self._sqliteMutex:
self._refreshIdMap()
def _getDefaultValueForType(self, sqlite_type: str) -> str:
"""Get appropriate default value based on SQLite column type"""
sqlite_type = sqlite_type.upper()
if 'INT' in sqlite_type:
return '0'
elif 'REAL' in sqlite_type or 'FLOAT' in sqlite_type or 'DOUBLE' in sqlite_type:
return '0.0'
elif 'TEXT' in sqlite_type or 'CHAR' in sqlite_type or 'VARCHAR' in sqlite_type:
return "''" # Empty string
elif 'BLOB' in sqlite_type:
return 'NULL'
elif 'BOOL' in sqlite_type:
return '0' # False
else:
return 'NULL' # Default fallback
[docs]
def insertRows(self, row: int, count: int = 1) -> bool:
if row < 0 or count <= 0 or row > self._count:
return False
try:
with self._sqliteMutex:
# Create appropriate default values based on column types
placeholders = [self._getDefaultValueForType(_ct) for _ct in self._columnTypes]
placeholders_str = f"({', '.join(placeholders)})"
columns_str = ', '.join(self._columns)
# Insert the specified number of rows
self._cur.execute(f"""
INSERT INTO {self._table} ({columns_str})
VALUES {','.join([placeholders_str]*count)}
""")
self._conn.commit()
self._count += count
self._refreshIdMap()
except sqlite3.Error as e:
TTkLog.error(f"Error adding rows: {e}")
return False
self.dataChanged.emit((row,0),(self._count - row, self.columnCount()))
self.modelChanged.emit()
return True
[docs]
def removeRows(self, row: int, count: int = 1) -> bool:
if row < 0 or count <= 0 or row >= self._count or row + count > self._count:
return False
try:
with self._sqliteMutex:
self._cur.execute(f"""
DELETE FROM {self._table}
WHERE {self._key} in (
SELECT {self._key} from {self._table}
{self._sort}
LIMIT {count} OFFSET {row}
)
""")
self._conn.commit()
# Update The rows Count
res = self._cur.execute(f"SELECT COUNT(*) FROM {self._table}")
self._count = res.fetchone()[0]
self._refreshIdMap()
except sqlite3.Error as e:
TTkLog.error(f"Error removing rows: {e}")
return False
self.dataChanged.emit((row,0),(self._count - row, self.columnCount()))
self.modelChanged.emit()
return True
[docs]
def insertColumns(self, column:int, count:int) -> bool:
'''
.. attention:: The current implementation of :py:class:`TTkTableModelSQLite3` does not supports columns operations
'''
TTkLog.warn("The current implementation of ModelSQLite3 does not supports columns operations")
return False
[docs]
def removeColumns(self, column:int, count:int) -> bool:
'''
.. attention:: The current implementation of :py:class:`TTkTableModelSQLite3` does not supports columns operations
'''
TTkLog.warn("The current implementation of ModelSQLite3 does not supports columns operations")
return False