Generating database update scripts with Python

  • 2020-06-12 09:33:28
  • OfStack

At work, the database used in the test environment is not the same as the production database. Once testing against the test database is complete and the production database has to be updated to match, an update script is needed. It is easy to lose track of what was changed and what was added, which is a real headache. So I used Python to generate the update scripts automatically, so that I no longer have to rely on my memory.

The main operations are as follows:

1. Add the following method to the original ES6en.py. It reads the structure of a database in one call and is the basis for comparing the test database with the production database.


def select_database_struts(self):
    '''
    Read the column metadata of every table listed in self.__primaryKey_dict.

    One information_schema.COLUMNS query is run per table on self.__cursor.
    Returns a tuple (self.__config, struts), where struts maps
    table name -> column name -> dict with the keys COLUMN_NAME, IS_NULLABLE,
    COLUMN_TYPE, COLUMN_KEY and COLUMN_COMMENT.
    '''
    # The schema name is filled in once here; "{0}" remains as the per-table slot.
    query_template = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
        FROM information_schema.`COLUMNS` 
        WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" ''' % self.__database

    struts = {}
    for table in self.__primaryKey_dict:
        self.__cursor.execute(query_template.format(table))
        rows = self.__cursor.fetchall()
        # Rows are indexed in the order of the SELECT list above.
        struts[table] = {
            row[0]: {
                "COLUMN_NAME": row[0],
                "IS_NULLABLE": row[1],
                "COLUMN_TYPE": row[2],
                "COLUMN_KEY": row[3],
                "COLUMN_COMMENT": row[4],
            }
            for row in rows
        }
    return self.__config, struts

2. Write Python scripts for comparison


'''
Database migration script. The following functions are currently supported:
1. Generate SQL scripts for tables that do not exist in the old database (with or without table data). The generated scripts are placed in the temp directory as <table name>.sql.
2. Generate SQL for added columns. The generated statements are all appended to deploy.sql in the temp directory.
3. Generate SQL for modified column attributes. The generated statements are all appended to deploy.sql in the temp directory.
4. Generate SQL for deleted columns. The generated statements are all appended to deploy.sql in the temp directory.
'''
import json, os, sys
from basedao import BaseDao

# Output directory for the generated SQL files. sys.path[0] is normally the
# directory of the script that was started, so "temp" sits next to it.
temp_path = sys.path[0] + "/temp"
# exist_ok=True creates the directory only when it is missing, without the
# race between a separate existence check and the mkdir call.
os.makedirs(temp_path, exist_ok=True)

def main(old, new, has_data=False):
  '''
  Generate the SQL needed to bring the old database up to the new one.

  @old       (config, struts) pair of the old (target) database
  @new       (config, struts) pair of the latest (source) database
  @has_data  passed on to gc_sql for tables that exist only in the new database

  Tables missing from the old database are dumped through gc_sql using the
  new database's credentials; tables present in both are handed to cmp_table.
  '''
  clear_temp()  # start from an empty temp directory

  _, old_tables = old
  new_config, new_tables = new

  for table_name in new_tables:
    old_columns = old_tables.get(table_name)
    if old_columns is not None:
      # Table exists on both sides: compare it column by column.
      cmp_table(old_columns, new_tables[table_name], table_name)
      continue
    # Table is new: dump it as a whole.
    gc_sql(
      new_config["user"],
      new_config["password"],
      new_config["database"],
      table_name,
      has_data,
    )

def cmp_table(old, new, table):
  '''
  Compare two versions of one table and append the ALTER TABLE statements
  that turn the old column layout into the new one to temp/deploy.sql
  (under sys.path[0]).

  @old    column name -> attribute dict of the table in the old database
  @new    column name -> attribute dict of the table in the new database
  @table  table name used in the generated statements

  Every attribute dict must provide COLUMN_NAME, COLUMN_TYPE and
  COLUMN_COMMENT; IS_NULLABLE ("YES"/"NO") is honoured when present.
  Nothing is written when both layouts are equal.
  '''
  if old == new:
    return

  statements = []
  for column, new_attrs in new.items():
    old_attrs = old.get(column)
    if old_attrs is None:
      # Column exists only in the new layout: add it.
      statements.append(
        "ALTER TABLE `{0}` ADD COLUMN {1};\n".format(table, _column_definition(new_attrs)))
    elif old_attrs != new_attrs:
      # Column exists on both sides but some attribute differs: redefine it.
      statements.append(
        "ALTER TABLE `{0}` CHANGE `{1}` {2};\n".format(
          table, new_attrs["COLUMN_NAME"], _column_definition(new_attrs)))

  # Columns that exist only in the old layout are dropped.
  for column in old:
    if new.get(column) is None:
      # Quoted and newline-terminated like the ADD/CHANGE statements.
      statements.append("ALTER TABLE `{0}` DROP COLUMN `{1}`;\n".format(table, column))

  # "a": statements for several tables accumulate in the same file.
  # The with-block closes the file even if the write fails.
  with open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8") as f:
    f.write("".join(statements))

def _column_definition(attrs):
  '''Render "`name` type [NULL | NOT NULL] COMMENT '...'" for one column's attribute dict.'''
  # Without an explicit clause a redefined column would silently become nullable.
  null_clause = {"YES": " NULL", "NO": " NOT NULL"}.get(attrs.get("IS_NULLABLE"), "")
  # Double single quotes so a comment such as "user's name" stays one SQL string.
  comment = str(attrs["COLUMN_COMMENT"]).replace("'", "''")
  return "`{0}` {1}{2} COMMENT '{3}'".format(
    attrs["COLUMN_NAME"], attrs["COLUMN_TYPE"], null_clause, comment)

def gc_sql(user, pwd, db, table, has_data):
  '''
  Dump one table to <temp_path>/<table>.sql by running mysqldump.

  @user      MySQL user name
  @pwd       MySQL password
  @db        database that contains the table
  @table     table to dump; also used as the output file name
  @has_data  truthy: structure and rows; falsy: structure only (-d)

  Failures (mysqldump not runnable, output file not writable, non-zero exit
  status) are reported on stderr and not raised.
  '''
  import subprocess  # local import: only this function needs it

  # An argument list is passed straight to mysqldump without a shell, so
  # passwords or names containing spaces, quotes, "$", "&" or ";" are neither
  # split nor interpreted.
  command = ["mysqldump", "-u%s" % user, "-p%s" % pwd]
  if not has_data:
    command.append("-d")
  command.extend([str(db), str(table)])

  dump_file = "%s/%s.sql" % (temp_path, table)
  try:
    # mysqldump writes the dump to stdout; send it to the file directly.
    with open(dump_file, "wb") as out:
      exit_code = subprocess.run(command, stdout=out).returncode
  except OSError as err:
    print("mysqldump for table %s could not be run: %s" % (table, err), file=sys.stderr)
    return
  if exit_code != 0:
    print("mysqldump for table %s exited with status %s" % (table, exit_code), file=sys.stderr)

def clear_temp():
  '''
  Remove the regular files left in temp_path by an earlier run.

  Only direct children that are files are deleted; subdirectories are left
  alone. The completion message is printed even when temp_path does not exist.
  '''
  if os.path.exists(temp_path):
    entries = (os.path.join(temp_path, name) for name in os.listdir(temp_path))
    for entry in entries:
      if os.path.isfile(entry):
        os.remove(entry)
  print(" Temporary file directory cleanup completed ")

if __name__ == "__main__":
  # Example run: test1 holds the latest structure, test2 is the database to update.
  source_config = dict(user="root", password="root", database="test1")
  target_config = dict(user="root", password="root", database="test2")

  # Read the latest structure first, then the one to be updated.
  source_dao = BaseDao(**source_config)
  source_struts = source_dao.select_database_struts()

  target_dao = BaseDao(**target_config)
  target_struts = target_dao.select_database_struts()

  main(old=target_struts, new=source_struts)

Only the four kinds of SQL script listed above can currently be generated.


Related articles: