Sharing a Python SQLite3 database helper class

  • 2020-04-02 13:42:50
  • OfStack

I have not been working with Python for long. A recent project required data analysis, so I chose Python as the programming language. Beyond the language features, what attracted me most was Python's good support for the SQLite3 database, because I needed to handle a large amount of intermediate data flexibly.

In the first few modules I rather enjoyed writing SQL statements by hand, but I gradually grew tired of it. It reminded me of when I was first digging into C# and used reflection to build a rough SQL query builder, a plan I abandoned once I discovered LINQ. Microsoft later introduced the Entity Framework, of course, but that is another story, and these days I am not very interested in Microsoft's technologies. Enough digression; back to the topic.

By the way, Drupal, a good blogging program, uses a similar query builder for its database queries. It avoids writing SQL statements directly and has the added benefit of masking platform differences to some degree, which helps with database migrations.

The helper class (query builder) I am introducing today, however, is a very simple thing and is limited to SQLite. Anyone interested is welcome to improve it; for now I only need it to make SQLite operations convenient. For a larger database application, go straight to an ORM.


First look at the code:


import sqlite3

# ***************************************************
# *
# * Description: SQLite3 database helper class for Python (query builder)
# * Author: wangye
# *
# ***************************************************

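def _wrap_value(value):
    # repr() gives a quick SQL-like literal for trusted values (e.g. 'John', 26);
    # it is not an SQL escaper, so do not pass untrusted input through it.
    return repr(value)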

def _wrap_values(values):
    return list(map(_wrap_value, values))

def _wrap_fields(fields):
    # Return a new dict so the caller's mapping is not modified in place
    return {key: _wrap_value(value) for key, value in fields.items()}

def _concat_keys(keys):
    return "[" + "],[".join(keys) + "]"

def _concat_values(values):
    return ",".join(values)

def _concat_fields(fields, operator = (None, ",")):
    # operator is (unit_operator, group_operator), e.g. ("=", "AND")
    unit_operator, group_operator = operator or (None, ",")
    compiled = []
    for key,value in fields.items():
        compiled.append("[" + key + "]")
        if unit_operator:
            compiled.append(unit_operator)
            compiled.append(value)
        compiled.append(group_operator)
    if compiled:
        compiled.pop() # drop the trailing group_operator
    return " ".join(compiled)

class DataCondition(object):
    """
        Helper class that builds the condition part of an SQL statement.

        For example:
        DataCondition(("=", "AND"), id = 26)
        DataCondition(("=", "AND"), True, id = 26)
    """

    def __init__(self, operator = ("=", "AND"), ingroup = True, **kwargs):
        """
            Constructor.
            Parameters:
                operator  Operator pair: (expression operator, conditional operator)
                ingroup   Whether to group the result; if True it is enclosed in parentheses
                kwargs    Keyword arguments mapping column names to values.
                          Note that the equals sign in the call is not the operator used
                          in the generated SQL; that operator is given by operator[0].
            For example:
            DataCondition(("=", "AND"), id = 26)
            ([id] = 26)
            DataCondition((">", "OR"), id = 26, age = 35)
            ([id] > 26 OR [age] > 35)
            DataCondition(("LIKE", "OR"), False, name = "John", company = "Google")
            [name] LIKE 'John' OR [company] LIKE 'Google'
        """
        self.ingroup = ingroup
        self.fields = kwargs
        self.operator = operator

    def __unicode__(self):
        # Wrap a copy so that repeated calls do not quote the values twice
        fields = _wrap_fields(dict(self.fields))
        result = _concat_fields(fields, self.operator)
        if self.ingroup:
            return "(" + result + ")"
        return result

    def __str__(self):
        return self.__unicode__()

    def toString(self):
        return self.__unicode__()

class DataHelper(object):

    """
        SQLite3 query helper class
    """

    def __init__(self, filename):
        """
            Constructor.
            Parameter: filename is the SQLite3 database file name
        """
        self.file_name = filename

    def open(self):
        """
            Open the database and create a cursor
        """
        self.connection = sqlite3.connect(self.file_name)
        self.cursor = self.connection.cursor()
        return self

    def close(self):
        """
            Close the database. If this method is not called explicitly,
            closing is also attempted when the object is garbage-collected.
        """
        if hasattr(self, "connection") and self.connection:
            self.connection.close()

    def __del__(self):
        """
            Destructor; performs cleanup
        """
        self.close()

    def commit(self):
        """
            Commit the transaction.
            SELECT statements do not require this. By default, execute() is called
            with commit_at_once set to True and calls this method implicitly;
            otherwise you need to call this method explicitly.
        """
        self.connection.commit()

    def execute(self, sql = None, commit_at_once = True):
        """
            Execute an SQL statement.
            Parameters:
                sql   The SQL statement to execute; if None, the statement generated by the builder is used.
                commit_at_once  Whether to commit the transaction immediately; if not,
                non-query operations need an explicit call to commit().
        """
        if not sql:
            sql = self.sql
        self.cursor.execute(sql)
        if commit_at_once:
            self.commit()

    def fetchone(self, sql = None):
        """
            Fetch one record
        """
        self.execute(sql, False)
        return self.cursor.fetchone()

    def fetchall(self, sql = None):
        """
            Fetch all records
        """
        self.execute(sql, False)
        return self.cursor.fetchall()

    def __concat_keys(self, keys):
        return _concat_keys(keys)

    def __concat_values(self, values):
        return _concat_values(values)

    def table(self, *args):
        """
            Set the table(s) for the query; pass multiple table names as separate arguments
        """
        self.tables = args
        self.tables_snippet = self.__concat_keys(self.tables)
        return self

    def __wrap_value(self, value):
        return _wrap_value(value)

    def __wrap_values(self, values):
        return _wrap_values(values)

    def __wrap_fields(self, fields):
        return _wrap_fields(fields)

    def __where(self):
        # self.condition_snippet
        if hasattr(self, "condition_snippet"):
            self.where_snippet = " WHERE " + self.condition_snippet

    def __select(self):
        template = "SELECT %(keys)s FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(self.body_keys), 
        }
        self.sql = template % body_snippet_fields

    def __insert(self):
        template = "INSERT INTO %(tables)s (%(keys)s) VALUES (%(values)s)"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(list(self.body_fields.keys())),
            "values" : self.__concat_values(list(self.body_fields.values()))
        }
        self.sql = template % body_snippet_fields

    def __update(self):
        template = "UPDATE %(tables)s SET %(fields)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "fields" : _concat_fields(self.body_fields, ("=",","))
        }
        self.sql = template % body_snippet_fields

    def __delete(self):
        template = "DELETE FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet
        }
        self.sql = template % body_snippet_fields

    def __build(self):
        {
            "SELECT": self.__select,
            "INSERT": self.__insert,
            "UPDATE": self.__update,
            "DELETE": self.__delete
        }[self.current_token]()

    def __unicode__(self):
        return self.sql

    def __str__(self):
        return self.__unicode__()

    def select(self, *args):
        self.current_token = "SELECT"
        self.body_keys = args
        self.__build()
        return self

    def insert(self, **kwargs):
        self.current_token = "INSERT"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def update(self, **kwargs):
        self.current_token = "UPDATE"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def delete(self, *conditions):
        self.current_token = "DELETE"
        self.__build()
        self.where(*conditions)
        return self

    def where(self, *conditions):
        # Nothing to append without conditions; avoids a dangling " WHERE "
        if not conditions:
            return self
        conditions = list(map(str, conditions))
        self.condition_snippet = " AND ".join(conditions)
        self.__where()
        if hasattr(self, "where_snippet"):
            self.sql += self.where_snippet
        return self
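
The helper above quotes values with repr(), which is convenient for trusted literals but is not an SQL escaper. As a minimal sketch of an alternative (not part of the class above), the standard sqlite3 module accepts ? placeholders and binds the values itself. The helper name build_insert and the in-memory table are assumptions made purely for illustration.

```python
import sqlite3

def build_insert(table, **fields):
    """Hypothetical helper: return (sql, params) using ? placeholders."""
    keys = list(fields)
    columns = ",".join("[" + key + "]" for key in keys)
    marks = ",".join("?" for _ in keys)
    # Identifiers cannot be bound as parameters, so the table and column
    # names are still formatted into the text and must come from trusted code.
    sql = "INSERT INTO [%s] (%s) VALUES (%s)" % (table, columns, marks)
    return sql, [fields[key] for key in keys]

connection = sqlite3.connect(":memory:")  # throwaway in-memory database
connection.execute("CREATE TABLE [staffs] ([staff_name] TEXT, [staff_cardnum] TEXT)")

sql, params = build_insert("staffs", staff_name="O'Brien", staff_cardnum="1001")
connection.execute(sql, params)  # sqlite3 binds the values; no manual quoting
connection.commit()

row = connection.execute("SELECT [staff_name] FROM [staffs]").fetchone()
print(sql)
print(row)
connection.close()
```

With this approach a value containing a quote character, such as the name used here, is stored as-is without any escaping logic in the builder.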

Here are some examples for your reference:


db = DataHelper("/home/wangye/sample.db3")
db.open() # Open the database
db.execute("""
    CREATE TABLE [staffs] (
      [staff_id] INTEGER PRIMARY KEY AUTOINCREMENT,
      [staff_name] TEXT NOT NULL,
      [staff_cardnum] TEXT NOT NULL,
      [staff_reserved] INTEGER NOT NULL
)
""") # Execute an SQL statement directly; note that commit_at_once defaults to True

db.table("staffs").insert(staff_name="John", staff_cardnum="1001", staff_reserved=0).execute()
# Insert a record; insert() only builds the statement, execute() runs and commits it

rs = db.table("staffs").select("staff_id", "staff_name").fetchall()
# Fetch only staff_id and staff_name

rs = db.table("staffs").select("staff_name").where(DataCondition(("=", "AND"), staff_id = 1)).fetchone()
# Fetch the staff_name of the record whose staff_id is 1

rs = db.table("staffs").select("staff_name").where(DataCondition(("<", "AND"), staff_id = 100), DataCondition(("=", "AND"), staff_reserved = 1)).fetchone()
# Fetch the staff_name of one record whose staff_id is less than 100 and whose staff_reserved is 1

db.close() # Close the database
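
The example relies on explicit open(), commit and close() calls. As a hedged aside that uses only the standard library (not the DataHelper class), contextlib.closing can guarantee that the connection is closed, and a sqlite3 connection used as a context manager commits when its block succeeds and rolls back when it raises. The table and names below are made up for illustration.

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(":memory:")) as connection:
    connection.execute(
        "CREATE TABLE [staffs] ([staff_id] INTEGER PRIMARY KEY, [staff_name] TEXT NOT NULL)"
    )

    # "with connection" commits if the block finishes normally...
    with connection:
        connection.execute("INSERT INTO [staffs] ([staff_name]) VALUES (?)", ("John",))

    # ...and rolls back if the block raises, so this insert is discarded.
    try:
        with connection:
            connection.execute("INSERT INTO [staffs] ([staff_name]) VALUES (?)", ("Mary",))
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass

    names = [row[0] for row in connection.execute("SELECT [staff_name] FROM [staffs]")]

# The connection is closed here, even if an error had escaped the block.
print(names)
```

Note that "with connection" only manages the transaction; it does not close the connection, which is why closing() wraps it in this sketch.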

The asterisk (*) is not currently supported, and multi-table queries and columns with the same name in different tables are not handled well. The class is only meant for simple day-to-day scripting and is best kept out of production environments, since it may have unknown issues.
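
The asterisk limitation follows from how the builder joins column names: every selected name is wrapped in square brackets, so select("*") produces [*], which SQLite reads as a column literally named *. The sketch below reproduces this with plain sqlite3 on a made-up in-memory table and shows that hand-written SQL with a bare asterisk still works; with the helper, such a statement could be passed directly to fetchall(sql).

```python
import sqlite3

connection = sqlite3.connect(":memory:")  # throwaway in-memory database
connection.execute("CREATE TABLE [staffs] ([staff_id] INTEGER PRIMARY KEY, [staff_name] TEXT)")
connection.execute("INSERT INTO [staffs] ([staff_name]) VALUES ('John')")

try:
    # The statement the builder would generate for select("*")
    connection.execute("SELECT [*] FROM [staffs]")
    bracketed_error = None
except sqlite3.OperationalError as error:
    bracketed_error = str(error)

# Hand-written SQL with a bare asterisk works as usual
rows = connection.execute("SELECT * FROM [staffs]").fetchall()
connection.close()

print(bracketed_error)
print(rows)
```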

