Python sqlite utility class with dynamic parameter capabilities

  • 2020-09-16 07:36:00
  • OfStack

This article gives an example of Python's sqlite utility class with dynamic parameter capabilities. To share for your reference, the details are as follows:

Working on sqlite and python recently

After referring to each tutorial on the Internet, combining with the previous java jdbc database tools class, write the following python tools class for connecting to sqlite

The main reason for the tedious writing is to preserve one kind of Object similar to java... args dynamic parameter writing is compatible with array /list passing an indefinite number of parameters and the return value is List dict dictionary in order to convert to JSON

Some of the differences in python are encapsulated by this utility class and can be used as follows:


db.executeQuery("s * f t w id=? and name=?", "id01", "name01");// Dynamic parameter form 
db.executeQuery("s * f t w id=? and name=?", ("id01", "name01"));//tuple The tuple type   Equivalent above   Parenthesis can be omitted 
db.executeQuery("s * f t w id=? and name=?", ["id01", "name01"]);//list An array 

The complete Python code is as follows:


#!/usr/bin/python
#-*- coding:utf-8 -*-  
import sqlite3
import os 
#
#  Connect to the database helper class 
# eg:
#  db = database()
#  count,listRes = db.executeQueryPage("select * from student where id=? and name like ? ", 2, 10, "id01", "%name%")
#  listRes = db.executeQuery("select * from student where id=? and name like ? ", "id01", "%name%")
#  db.execute("delete from student where id=? ", "id01")
#  count = db.getCount("select * from student ")
#  db.close()
#
class database :
  dbfile = "sqlite.db"
  memory = ":memory:"
  conn = None
  showsql = True
  def __init__(self):
    self.conn = self.getConn()
  # Output tool 
  def out(self, outStr, *args):
    if(self.showsql):
      for var in args:
        if(var):
          outStr = outStr + ", " + str(var)
      print("db. " + outStr)
    return 
  # Get connected 
  def getConn(self):
    if(self.conn is None):
      conn = sqlite3.connect(self.dbfile)
      if(conn is None):
        conn = sqlite3.connect(self.memory)
      if(conn is None):
        print("dbfile : " + self.dbfile + " is not found && the memory connect error ! ")
      else:
        conn.row_factory = self.dict_factory # Dictionary solution 
        self.conn = conn
      self.out("db init conn ok ! ")
    else:
      conn = self.conn
    return conn
  # Dictionary solution 
  def dict_factory(self, cursor, row): 
    d = {} 
    for idx, col in enumerate(cursor.description): 
      d[col[0]] = row[idx] 
    return d
  # Close the connection 
  def close(self, conn=None):
    res = 2
    if(not conn is None):
      conn.close()
      res = res - 1
    if(not self.conn is None):
      self.conn.close()
      res = res - 1
    self.out("db close res : " + str(res))
    return res
  # The processing parameters tuple or list  Get reasonable parameters list
  # Set the dynamic parameters tuple to list  And pass the dynamic parameters separately list from tuple As a parameter 
  def turnArray(self, args):
    #args (1, 2, 3)  Direct call type  exe("select x x", 1, 2, 3)
    #return [1, 2, 3] <- list(args)
    #args ([1, 2, 3], ) list The incoming type  exe("select x x",[ 1, 2, 3]) len(args)=1 && type(args[0])=list
    #return [1, 2, 3]
    if(args and len(args) == 1 and (type(args[0]) is list) ):
      res = args[0]
    else:
      res = list(args)
    return res
  # Paging query   The query page page   Each page num article   return   Total number of entries before pagination   and   A list of data for the current page  count,listR = db.executeQueryPage("select x x",1,10,(args))
  def executeQueryPage(self, sql, page, num, *args):
    args = self.turnArray(args)
    count = self.getCount(sql, args)
    pageSql = "select * from ( " + sql + " ) limit 5 offset 0 "
    #args.append(num)
    #args.append(int(num) * (int(page) - 1) )
    self.out(pageSql, args) 
    conn = self.getConn()
    cursor = conn.cursor()
    listRes = cursor.execute(sql, args).fetchall()
    return (count, listRes)  
  # Query list array[map] eg: [{'id': u'id02', 'birth': u'birth01', 'name': u'name02'}, {'id': u'id03', 'birth': u'birth01', 'name': u'name03'}]
  def executeQuery(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args) 
    conn = self.getConn()
    cursor = conn.cursor()
    res = cursor.execute(sql, args).fetchall()
    return res  
  # perform sql Or query list   And submit 
  def execute(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args) 
    conn = self.getConn()
    cursor = conn.cursor()
    #sql A placeholder   fill args  Can be tuple(1, 2)( Dynamic parameter array )  It can also be list[1, 2] list(tuple) tuple(list)
    res = cursor.execute(sql, args).fetchall()
    conn.commit()
    #self.close(conn)
    return res  
  # Query the list of column names array[str] eg: ['id', 'name', 'birth']
  def getColumnNames(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args) 
    conn = self.getConn()
    if(not conn is None):
      cursor = conn.cursor()
      cursor.execute(sql, args)
      res = [tuple[0] for tuple in cursor.description]
    return res  
  # The query results are single str eg: 'xxxx'
  def getString(self, sql, *args):
    args = self.turnArray(args)
    self.out(sql, args) 
    conn = self.getConn()
    cursor = conn.cursor()
    listRes = cursor.execute(sql, args).fetchall()
    columnNames = [tuple[0] for tuple in cursor.description]
    #print(columnNames)
    res = ""
    if(listRes and len(listRes) >= 1):
      res = listRes[0][columnNames[0]]
    return res   
  # Query record number   Automatically add count(*) eg: 3
  def getCount(self, sql, *args):
    args = self.turnArray(args)
    sql = "select count(*) cc from ( " + sql + " ) "
    resString = self.getString(sql, args)  
    res = 0   
    if(resString):
      res = int(resString)
    return res
#################################### test 
def main():
  db = database()
  db.execute(
    ''' 
    create table if not exists student(
      id   text primary key,
      name  text not null,
      birth  text 
    )
    ''' 
  )
  for i in range(10):
    db.execute("insert into student values('id1" + str(i) + "', 'name1" + str(i) + "', 'birth1" + str(i) + "')")
  db.execute("insert into student values('id01', 'name01', 'birth01')")
  db.execute("insert into student values('id02', 'name02', 'birth01')")
  db.execute("insert into student values('id03', 'name03', 'birth01')")
  print(db.getColumnNames("select * from student"))
  print(db.getCount("select * from student " ))
  print(db.getString("select name from student where id = ? ", "id02" ))
  print(db.executeQuery("select * from student where 1=? and 2=? ", 1, 2 ))
  print(db.executeQueryPage("select * from student where id like ? ", 1, 5, "id0%"))
  db.execute("update student set name='nameupdate' where id = ? ", "id02")
  db.execute("delete from student where id = ? or 1=1 ", "id01")
  db.close()
if __name__ == '__main__':
  main()

More about Python related topics: interested readers to view this site "Python SQLite database operation skills summary", "common database operations Python skills summary", "Python data structure and algorithm tutorial", "Python function using techniques", "Python string skills summary", "Python introduction and advanced tutorial" and "Python file and directory skills summary"

I hope this article has been helpful in Python programming.


Related articles: