Sharing a Python helper class for SQLite3 database operations
- 2020-04-02 13:42:50
- OfStack
I have not been using Python for very long. Recently I had a project that needed to analyze data, so I chose Python as the programming language. Apart from the language's own features, what mainly attracted me was Python's good support for SQLite3 databases, because I needed to handle a large amount of intermediate data flexibly.
At first I quite enjoyed writing SQL statements by hand in a few modules, but I gradually grew tired of it. It reminded me of the time I used reflection in C# to build a rudimentary SQL query builder, a plan I abandoned once I discovered LINQ. Microsoft later introduced the Entity Framework, of course, but that is another story, and these days I am not very interested in Microsoft's offerings anyway. But I digress; back to the topic.
By the way, Drupal, a good blogging program, uses a similar query builder for its database queries. This avoids the need to write SQL statements directly and has the added benefit of hiding platform differences to a certain degree, which helps with database migrations.
The query builder in the database helper class I am introducing today, however, is a very simple thing, and it is limited to SQLite databases. Interested readers are welcome to improve it; for now I only need it to make working with SQLite convenient. For a relatively large database application, just use an ORM directly.
First, take a look at the code:
import sqlite3
# ***************************************************
# *
# * Description: Helper class for working with SQLite3 databases from Python (query builder)
# * Author: wangye
# *
# ***************************************************
def _wrap_value(value):
return repr(value)
def _wrap_values(values):
    """
    Apply _wrap_value to every item of an iterable.

    parameter :
    values  Iterable of Python values.

    Returns a new list holding the wrapped form of each item, in order.
    """
    return [_wrap_value(item) for item in values]
def _wrap_fields(fields):
    """
    Apply _wrap_value to every value of a column-name -> value mapping.

    parameter :
    fields  Mapping of column names to Python values.

    Returns a new dict with the same keys and wrapped values. The mapping
    passed in is left untouched, so wrapping the same data again later
    cannot wrap values that are already wrapped.
    """
    return {key: _wrap_value(value) for key, value in fields.items()}
def _concat_keys(keys):
return "[" + "],[".join(keys) + "]"
def _concat_values(values):
return ",".join(values)
def _concat_fields(fields, operator = (None, ",")):
if operator:
unit_operator, group_operator = operator
# fields = _wrap_fields(fields)
compiled = []
for key,value in fields.items():
compiled.append("[" + key + "]")
if unit_operator:
compiled.append(unit_operator)
compiled.append(value)
compiled.append(group_operator)
compiled.pop() # pop last group_operator
return " ".join(compiled)
class DataCondition(object):
    """
    Helper class that builds one piece of the conditional part of a
    SQL statement.
    For example:
    DataCondition(("=", "AND"), id = 26)
    DataCondition(("=", "AND"), True, id = 26)
    """

    def __init__(self, operator=("=", "AND"), ingroup=True, **kwargs):
        """
        Constructor
        parameter :
        operator    A pair (expression operator, conditional operator).
        ingroup     Whether to group the result; if so it is enclosed
                    in parentheses.
        kwargs      Column names and values of the database table.
                    Note that the equals sign in the keyword arguments
                    is not the symbol used in the generated SQL; the
                    actual symbol is controlled by operator[0].
        For example:
        DataCondition(("=", "AND"), id = 26)
        ([id] = 26)
        DataCondition((">", "OR"), id = 26, age = 35)
        ([id] > 26 OR [age] > 35)
        DataCondition(("LIKE", "OR"), False, name = "John", company = "Google")
        [name] LIKE 'John' OR [company] LIKE 'Google'
        """
        self.ingroup = ingroup
        self.fields = kwargs
        self.operator = operator

    def __unicode__(self):
        """
        Render the condition as SQL text.
        """
        # Wrap a copy and keep the raw values in self.fields; storing the
        # wrapped values back would wrap them again on every later call.
        wrapped = _wrap_fields(dict(self.fields))
        result = _concat_fields(wrapped, self.operator)
        if self.ingroup:
            return "(" + result + ")"
        return result

    def __str__(self):
        return self.__unicode__()

    def toString(self):
        """
        Same as str(self).
        """
        return self.__unicode__()
class DataHelper(object):
    """
    SQLite3 query helper: a thin wrapper around a connection and cursor
    plus a small chainable query builder.

    NOTE: the builder renders values into the SQL text itself instead of
    binding them as parameters, so do not pass untrusted input through
    the builder methods.
    """

    # True once where() has appended a WHERE clause to the statement that
    # is currently being built; reset whenever a new statement is built.
    _where_applied = False

    def __init__(self, filename):
        """
        Constructor
        parameter :
        filename    The SQLite3 database file name.
        """
        self.file_name = filename

    def open(self):
        """
        Open the database and create the cursor. Returns self.
        """
        # Close a connection left over from an earlier open() so that
        # reopening does not leak it.
        self.close()
        self.connection = sqlite3.connect(self.file_name)
        self.cursor = self.connection.cursor()
        return self

    def close(self):
        """
        Close the database. If this method is not called explicitly,
        it is also attempted when the object is reclaimed.
        """
        # getattr: the attribute does not exist until open() has run.
        connection = getattr(self, "connection", None)
        if connection:
            connection.close()

    def __del__(self):
        """
        Destructor, does the cleanup.
        """
        self.close()

    def commit(self):
        """
        Commit the transaction.
        SELECT statements do not need this. execute() calls it
        implicitly when commit_at_once is True (the default);
        otherwise this method has to be called explicitly.
        """
        self.connection.commit()

    def execute(self, sql=None, commit_at_once=True):
        """
        Execute a SQL statement.
        parameter :
        sql             The SQL statement to execute. If it is None (or
                        empty), the statement generated by the builder
                        methods is used.
        commit_at_once  Whether to commit the transaction immediately.
                        If not, non-query statements need an explicit
                        call to commit().
        """
        if not sql:
            sql = self.sql
        self.cursor.execute(sql)
        if commit_at_once:
            self.commit()

    def fetchone(self, sql=None):
        """
        Execute the statement (without committing) and return one record.
        """
        self.execute(sql, False)
        return self.cursor.fetchone()

    def fetchall(self, sql=None):
        """
        Execute the statement (without committing) and return all records.
        """
        self.execute(sql, False)
        return self.cursor.fetchall()

    def __concat_keys(self, keys):
        return _concat_keys(keys)

    def __concat_values(self, values):
        return _concat_values(values)

    def table(self, *args):
        """
        Set the table(s) for the query. Several table names are joined
        with commas. Returns self.
        """
        self.tables = args
        self.tables_snippet = self.__concat_keys(self.tables)
        return self

    def __wrap_value(self, value):
        return _wrap_value(value)

    def __wrap_values(self, values):
        return _wrap_values(values)

    def __wrap_fields(self, fields):
        return _wrap_fields(fields)

    def __select(self):
        template = "SELECT %(keys)s FROM %(tables)s"
        body_snippet_fields = {
            "tables": self.tables_snippet,
            "keys": self.__concat_keys(self.body_keys),
        }
        self.sql = template % body_snippet_fields

    def __insert(self):
        template = "INSERT INTO %(tables)s (%(keys)s) VALUES (%(values)s)"
        body_snippet_fields = {
            "tables": self.tables_snippet,
            "keys": self.__concat_keys(list(self.body_fields.keys())),
            "values": self.__concat_values(list(self.body_fields.values())),
        }
        self.sql = template % body_snippet_fields

    def __update(self):
        template = "UPDATE %(tables)s SET %(fields)s"
        body_snippet_fields = {
            "tables": self.tables_snippet,
            "fields": _concat_fields(self.body_fields, ("=", ",")),
        }
        self.sql = template % body_snippet_fields

    def __delete(self):
        template = "DELETE FROM %(tables)s"
        body_snippet_fields = {
            "tables": self.tables_snippet,
        }
        self.sql = template % body_snippet_fields

    def __build(self):
        # Dispatch on the statement kind chosen by the public method.
        builders = {
            "SELECT": self.__select,
            "INSERT": self.__insert,
            "UPDATE": self.__update,
            "DELETE": self.__delete,
        }
        builders[self.current_token]()
        # A freshly built statement has no WHERE clause yet.
        self._where_applied = False

    def __unicode__(self):
        return self.sql

    def __str__(self):
        return self.__unicode__()

    def select(self, *args):
        """
        Build a SELECT statement for the given column names.
        The statement is only built, not executed. Returns self.
        """
        self.current_token = "SELECT"
        self.body_keys = args
        self.__build()
        return self

    def insert(self, **kwargs):
        """
        Build an INSERT statement from column = value keyword arguments.
        The statement is only built, not executed. Returns self.
        """
        self.current_token = "INSERT"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def update(self, **kwargs):
        """
        Build an UPDATE statement from column = value keyword arguments.
        The statement is only built, not executed. Returns self.
        """
        self.current_token = "UPDATE"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def delete(self, *conditions):
        """
        Build a DELETE statement, restricted by the given conditions.
        Without conditions the statement has no WHERE clause.
        The statement is only built, not executed. Returns self.
        """
        self.current_token = "DELETE"
        self.__build()
        self.where(*conditions)
        return self

    def where(self, *conditions):
        """
        Append conditions to the statement built so far.
        parameter :
        conditions  DataCondition objects (or anything str() turns into
                    a SQL condition); they are joined with AND.
        Calling where() again on the same statement adds further
        conditions with AND. Returns self.
        """
        rendered = [str(condition) for condition in conditions]
        if not rendered:
            # Nothing to filter on (e.g. delete() without arguments):
            # appending a bare " WHERE " would produce invalid SQL.
            return self
        snippet = " AND ".join(rendered)
        if self._where_applied:
            # Extend the existing clause instead of emitting a second
            # WHERE keyword.
            self.condition_snippet += " AND " + snippet
            self.sql += " AND " + snippet
        else:
            self.condition_snippet = snippet
            self.sql += " WHERE " + snippet
            self._where_applied = True
        self.where_snippet = " WHERE " + self.condition_snippet
        return self
Here are some examples for your reference:
db = DataHelper("/home/wangye/sample.db3")
db.open()  # Open the database
db.execute("""
CREATE TABLE [staffs] (
[staff_id] INTEGER PRIMARY KEY AUTOINCREMENT,
[staff_name] TEXT NOT NULL,
[staff_cardnum] TEXT NOT NULL,
[staff_reserved] INTEGER NOT NULL
)
""")  # Execute a raw SQL statement directly; note that commit_at_once defaults to True
db.table("staffs").insert(staff_name="John", staff_cardnum="1001", staff_reserved=0).execute()
# Insert a record; insert() only builds the statement, execute() runs and commits it
rs = db.table("staffs").select("staff_id", "staff_name").fetchall()
# Fetch only staff_id and staff_name of every record
rs = db.table("staffs").select("staff_name").where(DataCondition(("=", "AND"), staff_id = 1)).fetchone()
# Fetch the staff_name of the record whose staff_id is 1
rs = db.table("staffs").select("staff_name").where(DataCondition(("<", "AND"), staff_id = 100), DataCondition(("=", "AND"), staff_reserved = 1)).fetchone()
# Fetch the staff_name of one record whose staff_id is less than 100 and whose staff_reserved is 1
db.close()  # Close the database
The asterisk (*) operator is not currently supported, and multi-table queries and same-named columns are not handled very well. This class is only meant for simple day-to-day scripting and is best not used in production environments, because there may be unknown issues.