我在工作的時候,在測試環境下使用的數據庫跟生產環境的數據庫不一致,當我們的測試環境下的數據庫完成測試準備更新到生產環境上的數據庫時候,需要準備更新腳本,真是一不小心沒記下來就會忘了改了哪里,哪里添加了什么,這個真是非常讓人頭疼。因此我就試著用Python來實現自動的生成更新腳本,以免我這爛記性,記不住事。
主要操作如下:
1.在原先 basedao.py 中添加如下方法,這樣舊能很方便的獲取數據庫的數據,為測試數據庫和生產數據庫做對比打下了基礎。
def select_database_struts(self): ''' 查找當前連接配置中的數據庫結構以字典集合 ''' sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database) struts = {} for k in self.__primaryKey_dict.keys(): self.__cursor.execute(sql.format(k)) results = self.__cursor.fetchall() struts[k] = {} for result in results: struts[k][result[0]] = {} struts[k][result[0]]["COLUMN_NAME"] = result[0] struts[k][result[0]]["IS_NULLABLE"] = result[1] struts[k][result[0]]["COLUMN_TYPE"] = result[2] struts[k][result[0]]["COLUMN_KEY"] = result[3] struts[k][result[0]]["COLUMN_COMMENT"] = result[4] return self.__config, struts2.編寫對比的Python腳本
'''數據庫遷移腳本, 目前支持一下幾種功能:1.生成舊數據庫中沒有的數據庫表執行 SQL 腳本(支持是否帶表數據),生成的 SQL 腳本在 temp 目錄下(表名.sql)。2.生成添加列 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。3.生成修改列屬性 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。4.生成刪除列 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。'''import json, os, sysfrom basedao import BaseDaotemp_path = sys.path[0] + "/temp"if not os.path.exists(temp_path): os.mkdir(temp_path)def main(old, new, has_data=False): ''' @old 舊數據庫(目標數據庫) @new 最新的數據庫(源數據庫) @has_data 是否生成結構+數據的sql腳本 ''' clear_temp() # 先清理 temp 目錄 old_config, old_struts = old new_config, new_struts = new for new_table, new_fields in new_struts.items(): if old_struts.get(new_table) is None: gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data) else: cmp_table(old_struts[new_table], new_struts[new_table], new_table)def cmp_table(old, new, table): ''' 對比表結構生成 sql ''' old_fields = old new_fields = new sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';/n" sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';/n" sql_del_column = "ALTER TABLE `{TABLE}` DROP {COLUMN_NAME};" if old_fields != new_fields: f = open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8") content = "" for new_field, new_field_dict in new_fields.items(): old_filed_dict = old_fields.get(new_field) if old_filed_dict is None: # 生成添加列 sql content += sql_add_column.format(TABLE=table, **new_field_dict) else: # 生成修改列 sql if old_filed_dict != new_field_dict: content += sql_change_column.format(TABLE=table, **new_field_dict) pass # 生成刪除列 sql for old_field, old_field_dict in old_fields.items(): if new_fields.get(old_field) is None: content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field) f.write(content) f.close()def gc_sql(user, pwd, db, table, has_data): ''' 生成 sql 文件 ''' if has_data: sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table) else: sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table) os.system(sys_order)def clear_temp(): ''' 每次執行的時候調用這個,先清理下temp目錄下面的舊文件 ''' if os.path.exists(temp_path): files = os.listdir(temp_path) for file in files: f = os.path.join(temp_path, file) if os.path.isfile(f): os.remove(f) print("臨時文件目錄清理完成")if __name__ == "__main__": test1_config = { "user" : "root", "password" : "root", "database" : "test1", } test2_config = { "user" : "root", "password" : "root", "database" : "test2", } test1_dao = BaseDao(**test1_config) test1_struts = test1_dao.select_database_struts() test2_dao = BaseDao(**test2_config) test2_struts = test2_dao.select_database_struts() main(test2_struts, test1_struts)
目前只支持了4種SQL腳本的生成。
總結
以上所述是小編給大家介紹的Python 實現數據庫(SQL)更新腳本的生成方法,希望對大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會及時回復大家的!
新聞熱點
疑難解答