Python implements the database update script generation method
- 2020-06-12 09:33:28
- OfStack
When I was at work, in the test environment used database with production database is not 1, when we test environment database to complete the test for updates to the database on the production environment, the need to update the script, is 1 down accidentally didn't will forget where is changed, where what is added, this is really a headache. So I tried Python to automate the generation of update scripts so I wouldn't have a bad memory.
The main operations are as follows:
1. Add the following method to the original ES6en.py, so as to obtain the data of the database as easily as possible and lay a foundation for the comparison between the test database and the production database.
def select_database_struts(self):
'''
Find the database structure in the current connection configuration for a dictionary collection
'''
sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
FROM information_schema.`COLUMNS`
WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database)
struts = {}
for k in self.__primaryKey_dict.keys():
self.__cursor.execute(sql.format(k))
results = self.__cursor.fetchall()
struts[k] = {}
for result in results:
struts[k][result[0]] = {}
struts[k][result[0]]["COLUMN_NAME"] = result[0]
struts[k][result[0]]["IS_NULLABLE"] = result[1]
struts[k][result[0]]["COLUMN_TYPE"] = result[2]
struts[k][result[0]]["COLUMN_KEY"] = result[3]
struts[k][result[0]]["COLUMN_COMMENT"] = result[4]
return self.__config, struts
2. Write Python scripts for comparison
'''
Database migration script , Currently supported 1 The following functions:
1. Generate database table executions that did not exist in the old database SQL Script (with or without table data supported) , The generated SQL The script in temp Under directory (table name .sql ).
2. Generate add column SQL Script, generated by SQL The script system 1 On the temp In the directory depoyed.sql In the.
3. Generate modified column attributes SQL Script, generated by SQL The script system 1 On the temp In the directory depoyed.sql In the.
4. Generate deleted columns SQL Script, generated by SQL The script system 1 On the temp In the directory depoyed.sql In the.
'''
import json, os, sys
from basedao import BaseDao
temp_path = sys.path[0] + "/temp"
if not os.path.exists(temp_path):
os.mkdir(temp_path)
def main(old, new, has_data=False):
'''
@old Old database (target database)
@new Latest database (source database)
@has_data Whether to generate structure or not + The data of sql The script
'''
clear_temp() # To clean up first temp directory
old_config, old_struts = old
new_config, new_struts = new
for new_table, new_fields in new_struts.items():
if old_struts.get(new_table) is None:
gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data)
else:
cmp_table(old_struts[new_table], new_struts[new_table], new_table)
def cmp_table(old, new, table):
'''
Contrast table structure generation sql
'''
old_fields = old
new_fields = new
sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
sql_del_column = "ALTER TABLE `{TABLE}` DROP {COLUMN_NAME};"
if old_fields != new_fields:
f = open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8")
content = ""
for new_field, new_field_dict in new_fields.items():
old_filed_dict = old_fields.get(new_field)
if old_filed_dict is None:
# Generate add column sql
content += sql_add_column.format(TABLE=table, **new_field_dict)
else:
# Generate modified column sql
if old_filed_dict != new_field_dict:
content += sql_change_column.format(TABLE=table, **new_field_dict)
pass
# Generate deleted columns sql
for old_field, old_field_dict in old_fields.items():
if new_fields.get(old_field) is None:
content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field)
f.write(content)
f.close()
def gc_sql(user, pwd, db, table, has_data):
'''
generate sql file
'''
if has_data:
sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
else:
sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
os.system(sys_order)
def clear_temp():
'''
Every time it executes, let's clean this up temp Old files under the directory
'''
if os.path.exists(temp_path):
files = os.listdir(temp_path)
for file in files:
f = os.path.join(temp_path, file)
if os.path.isfile(f):
os.remove(f)
print(" Temporary file directory cleanup completed ")
if __name__ == "__main__":
test1_config = {
"user" : "root",
"password" : "root",
"database" : "test1",
}
test2_config = {
"user" : "root",
"password" : "root",
"database" : "test2",
}
test1_dao = BaseDao(**test1_config)
test1_struts = test1_dao.select_database_struts()
test2_dao = BaseDao(**test2_config)
test2_struts = test2_dao.select_database_struts()
main(test2_struts, test1_struts)
Only four SQL scripts are currently supported for generation.