'''
Created on 2020-08-24
@author: wf
'''
from tests.basetest import Basetest
import unittest
from datetime import datetime
import time
import os
import sys
from lodstorage.sample import Sample
from lodstorage.uml import UML
from lodstorage.schema import Schema
from lodstorage.sql import SQLDB, EntityInfo
[docs]class TestSQLDB(Basetest):
'''
Test the SQLDB database wrapper
'''
[docs] def checkListOfRecords(self,listOfRecords,entityName,primaryKey=None,executeMany=True,fixNone=False,fixDates=False,debug=False,doClose=True):
'''
check the handling of the given list of Records
Args:
listOfRecords(list): a list of dicts that contain the data to be stored
entityName(string): the name of the entity type to be used as a table name
primaryKey(string): the name of the key / column to be used as a primary key
executeMany(boolean): True if executeMany mode of sqlite3 should be used
fixNone(boolean): fix dict entries that are undefined to have a "None" entry
debug(boolean): True if debug information e.g. CREATE TABLE and INSERT INTO commands should be shown
doClose(boolean): True if the connection should be closed
'''
size=len(listOfRecords)
if self.debug:
print("%s size is %d fixNone is %r fixDates is: %r" % (entityName,size,fixNone,fixDates))
self.sqlDB=SQLDB(debug=debug,errorDebug=True)
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityName,primaryKey)
startTime=time.time()
self.sqlDB.store(listOfRecords,entityInfo,executeMany=executeMany,fixNone=fixNone)
elapsed=time.time()-startTime
if self.debug:
print ("adding %d %s records took %5.3f s => %5.f records/s" % (size,entityName,elapsed,size/elapsed))
resultList=self.sqlDB.queryAll(entityInfo,fixDates=fixDates)
if self.debug:
print ("selecting %d %s records took %5.3f s => %5.f records/s" % (len(resultList),entityName,elapsed,len(resultList)/elapsed))
if doClose:
self.sqlDB.close()
return resultList
[docs] def testEntityInfo(self):
'''
test creating entityInfo from the sample record
'''
listOfRecords=Sample.getRoyals()
entityInfo=EntityInfo(listOfRecords[:3],'Person','name',debug=True)
self.assertEqual("CREATE TABLE Person(name TEXT PRIMARY KEY,born DATE,numberInLine INTEGER,wikidataurl TEXT,age FLOAT,ofAge BOOLEAN,lastmodified TIMESTAMP)",entityInfo.createTableCmd)
self.assertEqual("INSERT INTO Person (name,born,numberInLine,wikidataurl,age,ofAge,lastmodified) values (:name,:born,:numberInLine,:wikidataurl,:age,:ofAge,:lastmodified)",entityInfo.insertCmd)
self.sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityInfo.name,entityInfo.primaryKey)
tableList=self.sqlDB.getTableList()
if self.debug:
print (tableList)
self.assertEqual(1,len(tableList))
personTable=tableList[0]
self.assertEqual("Person",personTable['name'])
self.assertEqual(7,len(personTable['columns']))
uml=UML()
plantUml=uml.tableListToPlantUml(tableList,packageName="Royals",withSkin=False)
if self.debug:
print(plantUml)
expected="""package Royals {
class Person << Entity >> {
age : FLOAT
born : DATE
lastmodified : TIMESTAMP
name : TEXT <<PK>>
numberInLine : INTEGER
ofAge : BOOLEAN
wikidataurl : TEXT
}
}
"""
self.assertEqual(expected,plantUml)
# testGeneralization
listOfRecords=[{'name': 'Royal family', 'country': 'UK', 'lastmodified':datetime.now()}]
entityInfo=self.sqlDB.createTable(listOfRecords[:10],'Family','name')
tableList=self.sqlDB.getTableList()
self.assertEqual(2,len(tableList))
uml=UML()
plantUml=uml.tableListToPlantUml(tableList,generalizeTo="PersonBase",withSkin=False)
if self.debug:
print(plantUml)
expected='''class PersonBase << Entity >> {
lastmodified : TIMESTAMP
name : TEXT <<PK>>
}
class Person << Entity >> {
age : FLOAT
born : DATE
numberInLine : INTEGER
ofAge : BOOLEAN
wikidataurl : TEXT
}
class Family << Entity >> {
country : TEXT
}
PersonBase <|-- Person
PersonBase <|-- Family
'''
self.assertEqual(expected,plantUml)
[docs] def testIssue15(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/15
auto create view ddl in mergeschema
'''
self.sqlDB=SQLDB(debug=self.debug,errorDebug=self.debug)
listOfRecords=Sample.getRoyals()
entityInfo=EntityInfo(listOfRecords[:3],'Person','name',debug=self.debug)
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityInfo.name,entityInfo.primaryKey)
listOfRecords=[{'name': 'Royal family', 'country': 'UK', 'lastmodified':datetime.now()}]
entityInfo=self.sqlDB.createTable(listOfRecords[:10],'Family','name')
tableList=self.sqlDB.getTableList()
viewDDL=Schema.getGeneralViewDDL(tableList,"PersonBase")
if self.debug:
print (viewDDL)
expected="""CREATE VIEW PersonBase AS
SELECT name,lastmodified FROM Person
UNION
SELECT name,lastmodified FROM Family"""
self.assertEqual(expected,viewDDL)
pass
[docs] def testUniqueConstraint(self):
'''
test for https://github.com/WolfgangFahl/pyLoDStorage/issues/4
sqlite3.IntegrityError: UNIQUE constraint failed: ... show debug info
'''
listOfDicts=[
{"name": "John Doe"},
{"name": "Frank Doe"},
{"name": "John Doe"},
{"name":"Tim Doe"}]
sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=sqlDB.createTable(listOfDicts[:10],'Does','name')
try:
sqlDB.store(listOfDicts,entityInfo,executeMany=False)
self.fail("There should be an exception")
except Exception as ex:
expected="""INSERT INTO Does (name) values (:name)
failed:UNIQUE constraint failed: Does.name
record #3={'name': 'John Doe'}"""
errMsg=str(ex)
self.assertEqual(expected,errMsg)
[docs] def testSqlite3(self):
'''
test sqlite3 with a few records from the royal family
'''
listOfRecords=Sample.getRoyals()
resultList=self.checkListOfRecords(listOfRecords, 'Person', 'name',debug=True)
if self.debug:
print(resultList)
self.assertEqual(listOfRecords,resultList)
[docs] def testIssue13_setNoneValue(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/13
set None value for undefined LoD entries
'''
listOfRecords=[
{ 'make': 'Ford','model':'Model T', 'color':'black'},
{ 'make': 'VW', 'model':'beetle'}
]
entityName="Car"
primaryKey="Model"
resultList=self.checkListOfRecords(listOfRecords, entityName, primaryKey,fixNone=True)
if self.debug:
print (resultList)
[docs] def testIssue14_execute(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/14
offer execute wrapper directly via sqlDB
'''
sqlDB=SQLDB()
ddl="""
CREATE TABLE contacts (
contact_id INTEGER PRIMARY KEY,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL
)
"""
sqlDB.execute(ddl)
tableList=sqlDB.getTableList()
if self.debug:
print(tableList)
self.assertEqual(1,len(tableList))
self.assertEqual("contacts",tableList[0]['name'])
[docs] def testIssue41(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/41
improve error message when create table command fails
'''
listOfRecords=[{
'name':'value',
'py/object': 'datetime.time'
}]
self.sqlDB=SQLDB(debug=self.debug,errorDebug=True)
try:
_entityInfo=self.sqlDB.createTable(listOfRecords[:1],'Invalid','name')
self.fail("There should be an exception")
except Exception as ex:
self.assertTrue("CREATE TABLE Invalid" in str(ex))
[docs] def testBindingError(self):
'''
test list of Records with incomplete record leading to
"You did not supply a value for binding 2"
see https://bugs.python.org/issue41638
'''
listOfRecords=[{'name':'Pikachu', 'type':'Electric'},{'name':'Raichu' }]
for executeMany in [True,False]:
try:
self.checkListOfRecords(listOfRecords,'Pokemon','name',executeMany=executeMany)
self.fail("There should be an exception")
except Exception as ex:
if self.debug:
print(str(ex))
self.assertTrue('no value supplied for column' in str(ex))
[docs] def testListOfCities(self):
'''
test sqlite3 with some 120000 city records
'''
listOfRecords=Sample.getCities()
for fixDates in [True,False]:
retrievedList=self.checkListOfRecords(listOfRecords,'City',fixDates=fixDates)
self.assertEqual(len(listOfRecords),len(retrievedList))
[docs] def testQueryParams(self):
'''
test Query Params
'''
listOfDicts=[
{"city": "New York", "country": "US"},
{"city": "Amsterdam", "country": "NL"},
{"city": "Paris", "country": "FR"}]
sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=sqlDB.createTable(listOfDicts[:10],'cities','city')
sqlDB.store(listOfDicts,entityInfo,executeMany=False)
query="SELECT * from cities WHERE country in (?)"
params=('FR',)
frCities=sqlDB.query(query,params)
if self.debug:
print (frCities);
self.assertEqual([{'city': 'Paris', 'country': 'FR'}],frCities)
[docs] def testSqllite3Speed(self):
'''
test sqlite3 speed with some 100000 artificial sample records
consisting of two columns with a running index
'''
limit=100000
listOfRecords=Sample.getSample(limit)
self.checkListOfRecords(listOfRecords, 'Sample', 'pKey')
[docs] def testIssue87AllowUsingQueryWithGenerator(self):
'''
test the query gen approach
'''
debug=self.debug
#debug=True
sqlDB=self.getSampleTableDB(sampleSize=5)
sqlQuery="select * FROM sample"
for cindex,record in enumerate(sqlDB.queryGen(sqlQuery)):
if debug:
print(record)
self.assertEqual(cindex,record["cindex"])
[docs] def testBackup(self):
'''
test creating a backup of the SQL database
'''
if sys.version_info >= (3, 7):
listOfRecords=Sample.getCities()
self.checkListOfRecords(listOfRecords,'City',fixDates=True,doClose=False)
backupDB="/tmp/testSqlite.db"
showProgress=200 if self.debug else 0
self.sqlDB.backup(backupDB,profile=self.debug,showProgress=showProgress)
size=os.stat(backupDB).st_size
if self.debug:
print ("size of backup DB is %d" % size)
self.assertTrue(size>600000)
self.sqlDB.close()
# restore
ramDB=SQLDB.restore(backupDB, SQLDB.RAM, profile=self.debug,showProgress=showProgress)
entityInfo=EntityInfo(listOfRecords[:50],'City',debug=self.debug)
allCities=ramDB.queryAll(entityInfo)
self.assertEqual(len(allCities),len(listOfRecords))
[docs] def testCopy(self):
'''
test copying databases into another database
'''
dbFile="/tmp/DAWT_Sample3x1000.db"
copyDB=SQLDB(dbFile)
for sampleNo in range(3):
listOfRecords=Sample.getSample(1000)
self.checkListOfRecords(listOfRecords, 'Sample_%d_1000' %sampleNo, 'pKey',doClose=False)
self.sqlDB.copyTo(copyDB)
size=os.stat(dbFile).st_size
if self.debug:
print ("size of copy DB is %d" % size)
self.assertTrue(size>70000)
tableList=copyDB.getTableList()
if self.debug:
print(tableList)
for sampleNo in range(3):
self.assertEqual('Sample_%d_1000' %sampleNo,tableList[sampleNo]['name'])
# check that database is writable
# https://stackoverflow.com/a/44707371/1497139
copyDB.execute("pragma user_version=0")
[docs] @staticmethod
def getSampleTableDB(withDrop=False,debug=False,failIfTooFew=False,sampleSize=1000):
listOfRecords=Sample.getSample(sampleSize)
sqlDB=SQLDB()
entityName="sample"
primaryKey='pKey'
sampleRecordCount=sampleSize*10
sqlDB.debug=debug
entityInfo=sqlDB.createTable(listOfRecords, entityName, primaryKey=primaryKey, withDrop=withDrop, sampleRecordCount=sampleRecordCount,failIfTooFew=failIfTooFew)
executeMany=True
fixNone=True
sqlDB.store(listOfRecords,entityInfo,executeMany=executeMany,fixNone=fixNone)
return sqlDB
[docs] def testIssue16(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/16
allow to only warn if samplerecordcount is higher than number of available records
'''
self.getSampleTableDB(withDrop=False, debug=True,failIfTooFew=False)
try:
self.getSampleTableDB(withDrop=True, debug=True,failIfTooFew=True)
self.fail("There should be an exception that too few sample records where provided")
except Exception as ex:
self.assertTrue("only 1000/10000 of needed sample records to createTable available" in str(ex))
[docs] def testIssue18(self):
'''
https://github.com/WolfgangFahl/pyLoDStorage/issues/18
'''
sqlDB=self.getSampleTableDB()
tableDict=sqlDB.getTableDict()
debug=self.debug
#debug=True
if debug:
print (tableDict)
self.assertTrue("sample" in tableDict)
cols=tableDict["sample"]["columns"]
self.assertTrue("pkey" in cols)
[docs] def testIssue110(self):
"""
https://github.com/WolfgangFahl/pyLoDStorage/issues/110
"""
sqlDB=self.getSampleTableDB()
sample1=Sample.getSample(10)
sample2=Sample.getSample(10)
sample1.extend(sample2)
entityInfo=sqlDB.createTable(sample1,"sample1","pkey")
sqlDB.store(sample1,entityInfo=entityInfo,replace=True)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testSqllit3']
unittest.main()