国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 數據庫 > SQL Server > 正文

基于Python的SQL Server數據庫實現對象同步輕量級

2020-07-25 12:36:24
字體:
來源:轉載
供稿:網友

緣由

日常工作中經常遇到類似的問題:把某個服務器上的某些指定的表同步到另外一臺服務器。

類似需求用SSIS或者其他ETL工作很容易實現,比如用SSIS的話,就會會存在相當一部分反復的手工操作。

建源的數據庫信息,目標的數據庫信息,如果是多個表,需要一個一個地拉source和target,然后一個一個地mapping,然后運行實現數據同步。

然后很可能,這個workflow使用也就這么一次,就壽終正寢了,卻一樣要浪費時間去做這個ETL。

快速數據同步實現

于是在想,可不可能快速實現類似需求,盡最大程度減少重復的手工操作?類似基于命令行的方式,簡單快捷,不需要太多的手動操作。

于是就有了本文,基于Python(目的是順便熟悉一下Python的語法),快速實現SQL Server的數據庫之間的數據同步操作,后面又稍微擴展了一下,可以實現不同服務器的數據庫之間的表結構,表對應的數據,存儲過程,函數,用戶自定義類型表(user define table type)的同步

目前支持在兩個SQL Server數據源之間:每次同步一張或者多張表/存儲過程,也可以同步整個數據庫的所有表/存儲過程(以及表/存儲過程依賴的其他數據庫對象)。

支持sqlserver2012以上版本

需要考慮到一些基本的校驗問題:在源服務器上,需要同步的對象是否存在,或者輸入的對象是否存在于源服務器的數據庫里。

在目標服務器上,對于表的同步:

1,表的存在依賴于schema,需要考慮到表的schema是否存在,如果不存在先在target庫上創建表對應的schema

2,target表中是否有數據?如果有數據,是否以覆蓋的方式執行

對于存儲過程的同步:

1,類似于表,需要考慮存儲過程的schema是否存在,如果不存在先在target庫上創建表對應的schema
2,類似于表,arget數據庫中是否已經存在對應的存儲過程,是否以覆蓋的方式執行
3,存儲過程可能依賴于b表,某些函數,用戶自定義表變量等等,同步存儲過程的時候需要先同步依賴的對象,這一點比較復雜,實現過程中遇到在很多很多的坑

  可能存在對象A依賴于對象B,對象B依賴于對象C……,這里有點遞歸的意思

  這一點導致了重構大量的代碼,一開始都是直來直去的同步,無法實現這個邏輯,切實體會到代碼的“單一職責”原則

參數說明

參數說明如下,大的包括四類:

1,源服務器信息 (服務器地址,實例名,數據庫名稱,用戶名,密碼),沒有用戶名密碼的情況下,使用windows身份認證模式

2,目標服務器信息(服務器地址,實例名,數據庫名稱,用戶名,密碼),沒有用戶名密碼的情況下,使用windows身份認證模式

3,同步的對象類型以及對象

4,同步的對象在目標服務器上存在的情況下,是否強制覆蓋

其實在同步數據的時候,也可以把需要同步的行數提取出來做參數,比較簡單,這里暫時沒有做。

比如需要快速搭建一個測試環境,需要同步所有的表結構和每個表的一部分數據即可。

表以及數據同步

表同步的原理是,創建目標表,遍歷源數據的表,生成insert into values(***),(***),(***)格式的sql,然后插入目標數據庫,這里大概步驟如下:

1,表依賴于schema,所以同步表之前先同步schema

2,強制覆蓋的情況下,會drop掉目標表(如果存在的話),防止目標表與源表結構不一致,非強制覆蓋的情況下,如果字段不一致,則拋出異常

3,同步表結構,包括字段,索引,約束等等,但是無法支持外鍵,刻意去掉了外鍵,想想為什么?因吹斯汀。

4,需要篩選出來非計算列字段,insert語句只能是非計算列字段(又導致重構了部分代碼)

5,轉義處理,在拼湊SQL的時候,需要進行轉義處理,否則會導致SQL語句錯誤,目前處理了字符串中的'字符,二進制字段,時間字段的轉義處理(最容易發生問題的地方)

6,鑒于insert into values(***),(***),(***)語法上允許的最大值是1000,因此每生成1000條數據,就同步一次

7,自增列的identity_insert 標識打開與關閉處理

使用如下參數,同步源數據庫的三張表到目標數據庫,因為這里是在本機命名實例下測試,因此實例名和端口號輸入

執行同步的效果

說明:

1,如果輸入obj_type="tab" 且-obj=為None的情況下,會同步源數據庫中的所有表。
2,這個效率取決于機器性能和網絡傳輸,本機測試的話,每秒中可以提交3到4次,也就是每秒鐘可以提交3000~4000行左右的數據。

已知的問題:

1,當表的索引為filter index的時候,無法生成包含where條件的索引創建語句,那個看起來蛋疼的表結構導出語句,暫時沒時間改它。
2,暫時不支持其他少用的類型字段,比如地理空間字段什么的。

存儲過程對象的同步

存儲過程同步的原理是,在源數據庫上生成創建存儲過程的語句,然后寫入目標庫,這里大概步驟如下:
1,存儲過程依賴于schema,所以同步存儲過程之前先同步schema(同表)
2,同步的過程會檢查依賴對象,如果依賴其他對象,暫停當前對象同步,先同步依賴對象
3,重復第二步驟,直至完成
4,對于存儲過程的同步,如果是強制覆蓋的話,強制覆蓋僅僅對存儲過程自己生效(刪除&重建),對依賴對象并不生效,如果依賴對象不存在,就創建,否則不做任何事情

使用如下參數,同步源數據庫的兩個存儲過程到目標數據庫,因為這里是在本機命名實例下測試,因此實例名和端口號輸入

說明:測試要同步的存儲過程之一為[dbo].[sp_test01],它依賴于其他兩個對象:dbo.table01和dbo.fn_test01()

create proc [dbo].[sp_test01]asbegin set no count on; delete from dbo.table01 where id = 1000 select dbo.fn_test01()end

而dbo.fn_test01()的如下,依賴于另外一個對象:dbo.table02

create function [dbo].[fn_test01]()RETURNS intASBEGIN  declare @count int = 0 select @count = count(1) from dbo.table02 return @countEND

因此,這個測試的[dbo].[sp_test01]就依賴于其他對象,如果其依賴的對象不存在,同步的時候,僅僅同步這個存儲過程本身,是沒有意義的

同步某一個對象的依賴對象,使用如下SQL查出來對象依賴信息,因此這里就層層深入,同步依賴對象。
這里就類似于同步A的時候,A依賴于B和C,然后停止同步A,先同步B和C,同步B或者C的時候,可能又依賴于其他對象,然后繼續先同步其依賴對象。

效果如下

如果輸入obj_type="sp" 且-obj=為None的情況下,會同步源數據庫中的所有存儲過程以及其依賴對象

已知的問題:

1,加密的存儲過程或者函數是無法實現同步的,因為無法生成創建對象的腳本
1,table type的同步也是一個蛋疼的過程,目前支持,但是支持的并不好,原因是創建table type之前,先刪除依賴于table type的對象,否則無法刪除與創建。

特別說明

依賴對象的解決,還是比較蛋疼的

如果在默認schema為dbo的對象,在存儲過程或者函數中沒有寫schema(參考如下修改后的sp,不寫相關表的schema dbo,dbo.test01==>test01),
使用 sys.dm_sql_referenced_entities這個系統函數是無法找到其依賴的對象的,奇葩的是可以找到schema的類型,卻沒有返回對象本身。

這一點導致在代碼中層層深入,進行了長時間的debug,完全沒有想到這個函數是這個鳥樣子,因為這里找到依賴對象的類型,卻找不到對象本身,次奧!!!
另外一種情況就是動態SQL了,無法使用 sys.dm_sql_referenced_entities這個系統函數找到其依賴的對象。

其他對象的同步

  支持其他數據庫對象的同步,比如function,table type等,因為可以在同步其他存儲過程對象的時候附帶的同步function,table type,這個與表或者存儲過程類似,不做過多說明。 

已知問題:

1,201906122030:經測試,目前暫時不支持Sequence對象的同步。

需要改進的地方

1,代碼結構優化,更加清晰和條例的結構(一開始用最直接簡單粗暴的方式快速實現,后面重構了很多代碼,現在自己看起來還有很多不舒服的痕跡)
2,數據同步的效率問題,對于多表的導入導出操作,依賴于單線程,多個大表導出串行的話,可能存在效率上的瓶頸,如何根據表的數據量,盡可能平均地分配多多個線程中,提升效率
3,更加友好清晰的異常提示以及日志記錄,生成導出日志信息。
4,異構數據同步,MySQL《==》SQL Server《==》Oracle《==》PGSQL

代碼端午節寫好了,這幾天抽空進行了一些測試以及bug fix,應該還潛在不少未知的bug,工作量比想象中的大的多了去了。

# -*- coding: utf-8 -*-# !/usr/bin/env python3__author__ = 'MSSQL123'__date__ = '2019-06-07 09:36'import osimport sysimport timeimport datetimeimport pymssqlfrom decimal import Decimalusage = '''     -----parameter explain-----     source database parameter     -s_h      : soure database host                     ----- must require parameter     -s_i      : soure database instace name                 ----- default instance name MSSQL     -s_d      : soure database name                     ----- must require parameter     -s_u      : soure database login                     ----- default windows identifier     -s_p      : soure database login password                ----- must require when s_u is not null     -s_P      : soure database instance port                 ----- default port 1433     target database parameter     -t_h      : target database host                     ----- must require parameter     -t_i      : target database instace name                 ----- default instance name MSSQL     -t_d      : target database name                     ----- must require parameter     -t_u      : target database login                    ----- default windows identifier     -t_p      : target database login password                ----- must require when s_u is not null     -t_P      : target database instance port                ----- default port 1433     sync object parameter     -obj_type   : table or sp or function or other databse object       ----- tab or sp or fn or tp     -obj      : table|sp|function|type name                 ----- whick table or sp sync     overwirte parameter     -f       : force overwirte target database object              ----- F or N      --help: help document     Example:     python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="tab" -obj="dbo.t1,dbo.t2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y"               python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="sp" -obj="dbo.sp1,dbo.sp2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y"     '''class SyncDatabaseObject(object):  # source databse  s_h = None  s_i = None  s_P = None  s_u = None  s_p = None  s_d = None  # obj type  s_obj_type = None  # sync objects  s_obj = None  # target database  t_h = None  t_i = None  t_P = None  t_u = None  t_p = None  t_d = None  f = None  file_path = None  def __init__(self, *args, **kwargs):    for k, v in kwargs.items():      setattr(self, k, v)  # connect to sqlserver  def get_connect(self, _h, _i, _P, _u, _p, _d):    cursor = False    try:      if (_u) and (_p):        conn = pymssql.connect(host=_h,                    server=_i,                    port=_P,                    user=_u,                    password=_p,                    database=_d)      else:        conn = pymssql.connect(host=_h,                    server=_i,                    port=_P,                    database=_d)      if (conn):        return conn    except:      raise    return conn  # check connection  def validated_connect(self, _h, _i, _P, _u, _p, _d):    if not (self.get_connect(_h, _i, _P, _u, _p, _d)):      print("connect to " + str(_h) + " failed,please check you parameter")      exit(0)  '''  this is supposed to be a valid object name just like xxx_name,or dbo.xxx_name,or [schema].xxx_name or schema.[xxx_name]  then transfer this kind of valid object name to format object name like [dbo].[xxx_name](give a default dbo schema name when no schema name)  other format object name consider as unvalid,will be rasie error in process  format object name    1,xxx_name       ======> [dbo].[xxx_name]    2,dbo.xxx_name     ======> [dbo].[xxx_name]    3,[schema].xxx_name   ======> [dbo].[xxx_name]    3,schema.xxx_name    ======> [schema].[xxx_name]    4,[schema].[xxx_name]  ======> [schema].[xxx_name]    5,[schema].[xxx_name  ======> rasie error format message  '''  @staticmethod  def format_object_name(name):    format_name = ""    if ("." in name):      schema_name = name[0:name.find(".")]      object_name = name[name.find(".") + 1:]      if not ("[" in schema_name):        schema_name = "[" + schema_name + "]"      if not ("[" in object_name):        object_name = "[" + object_name + "]"      format_name = schema_name + "." + object_name    else:      if ("[" in name):        format_name = "[dbo]." + name      else:        format_name = "[dbo]." + "[" + name + "]"    return format_name  '''  check user input object is a valid object  '''  def exits_object(self, conn, name):    conn = conn    cursor_source = conn.cursor()    # get object by name from source db    sql_script = r'''select top 1 1 from                (                  select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.objects                   union all                  select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.types                 )t where obj_name = '{0}'                 '''.format(self.format_object_name(name))    cursor_source.execute(sql_script)    result = cursor_source.fetchall()    if not result:      return 0    else:      return 1    conn.cursor.close()    conn.close()  # table variable sync  def sync_table_variable(self, tab_name, is_reference):    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    if (self.exits_object(conn_source, self.format_object_name(tab_name))) > 0:      pass    else:      print("----------------------- warning message -----------------------")      print("--------warning: object " + tab_name + " not existing in source database ------------")      print("----------------------- warning message -----------------------")      print()      return    exists_in_target = 0    sql_script = r'''select top 1 1               from sys.table_types tp               where is_user_defined = 1                and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}' ''' /      .format((self.format_object_name(tab_name)))    # if the table schema exists in target server,skip    cursor_target.execute(sql_script)    exists_in_target = cursor_target.fetchone()    # weather exists in target server database    if (self.f == "Y"):      if (is_reference != "Y"):        # skiped,table type can not drop when used by sp        sql_script = r'''                     if OBJECT_ID('{0}') is not null                       drop type {0}                   '''.format(self.format_object_name(tab_name))        cursor_target.execute(sql_script)        conn_target.commit()    else:      if exists_in_target:        print("----------------------- warning message -----------------------")        print("the target table type " + tab_name + " exists ,skiped sync table type from source")        print("----------------------- warning message -----------------------")        print()        return    sql_script = r'''                     DECLARE @SQL NVARCHAR(MAX) = ''                  SELECT @SQL =                  'CREATE TYPE ' + '{0}' + 'AS TABLE' + CHAR(13) + '(' + CHAR(13) +                  STUFF((                          SELECT CHAR(13) + '  , [' + c.name + '] ' +                            CASE WHEN c.is_computed = 1                              THEN 'AS ' + OBJECT_DEFINITION(c.[object_id], c.column_id)                              ELSE                                CASE WHEN c.system_type_id != c.user_type_id                                  THEN '[' + SCHEMA_NAME(tp.[schema_id]) + '].[' + tp.name + ']'                                  ELSE '[' + UPPER(y.name) + ']'                                END +                                CASE                                  WHEN y.name IN ('varchar', 'char', 'varbinary', 'binary')                                    THEN '(' + CASE WHEN c.max_length = -1                                            THEN 'MAX'                                            ELSE CAST(c.max_length AS VARCHAR(5))                                          END + ')'                                  WHEN y.name IN ('nvarchar', 'nchar')                                    THEN '(' + CASE WHEN c.max_length = -1                                            THEN 'MAX'                                            ELSE CAST(c.max_length / 2 AS VARCHAR(5))                                          END + ')'                                  WHEN y.name IN ('datetime2', 'time2', 'datetimeoffset')                                    THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')'                                  WHEN y.name = 'decimal'                                    THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')'                                  ELSE ''                                END +                                CASE WHEN c.collation_name IS NOT NULL AND c.system_type_id = c.user_type_id                                  THEN ' COLLATE ' + c.collation_name                                  ELSE ''                                END +                                CASE WHEN c.is_nullable = 1                                  THEN ' NULL'                                  ELSE ' NOT NULL'                                END +                                CASE WHEN c.default_object_id != 0                                  THEN ' CONSTRAINT [' + OBJECT_NAME(c.default_object_id) + ']' +                                      ' DEFAULT ' + OBJECT_DEFINITION(c.default_object_id)                                  ELSE ''                                END                            END                          From sys.table_types tp                          Inner join sys.columns c on c.object_id = tp.type_table_object_id                          Inner join sys.types y ON y.system_type_id = c.system_type_id                           WHERE tp.is_user_defined = 1 and y.name<>'sysname'                          and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}'                          ORDER BY c.column_id                  FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 7, '   ')                    + ');'                  select @SQL as script                '''.format(self.format_object_name(self.format_object_name((tab_name))))    cursor_target = conn_target.cursor()    cursor_source.execute(sql_script)    row = cursor_source.fetchone()    try:      if not exists_in_target:        # execute the script on target server        cursor_target.execute(str(row[0])) # drop current stored_procudre if exists        conn_target.commit()        print("*************table type " + self.format_object_name(tab_name) + " synced *********************")        print() # give a blank row when finish    except:      print("----------------------- error message -----------------------")      print("-----------table type " + self.format_object_name(tab_name) + " synced error ---------------")      print("----------------------- error message -----------------------")      print()      # raise    cursor_source.close()    conn_source.close()    cursor_target.close()    conn_target.close()  # schema sync  def sync_schema(self):    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    arr_schema = []    # get all table in database when not define table name    schema_result = cursor_source.execute(r'''                         select name from sys.schemas where schema_id>4 and schema_id<16384                        ''')    for row in cursor_source.fetchall():      cursor_target.execute(r''' if not exists(select * from sys.schemas where name = '{0}')                         begin                           exec('create schema [{0}]')                         end                       '''.format(str(row[0])))      conn_target.commit()    cursor_source.close()    conn_source.close()    cursor_target.close()    conn_target.close()  def sync_table_schema_byname(self, tab_name, is_reference):    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    if (self.exits_object(conn_source, self.format_object_name(tab_name)) == 0):      print("----------------------- warning message -----------------------")      print("---------------warning: object " + tab_name + " not existing in source database ----------------")      print("----------------------- warning message -----------------------")      print()      return    # if exists a reference table for sp,not sync the table agagin    if (self.exits_object(conn_target, self.format_object_name(tab_name)) > 0):      if (self.f != "Y"):        print("----------------------- warning message -----------------------")        print("---------------warning: object " + tab_name + " existing in target database ----------------")        print("----------------------- warning message -----------------------")        print()        return    sql_script = r''' select top 1 1 from sys.tables                    where type_desc = 'USER_TABLE'                    and concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) = '{0}'                  '''.format((self.format_object_name(tab_name)))    # if the table schema exists in target server,skip    cursor_target.execute(sql_script)    exists_in_target = cursor_target.fetchone()    if exists_in_target:      if (self.f == "Y"):        if (is_reference != "Y"):          cursor_target.execute("drop table {0}".format(tab_name))      else:        print("----------------------- warning message -----------------------")        print("the target table " + tab_name + " exists ,skiped sync table schema from source")        print("----------------------- warning message -----------------------")        print()        return    sql_script = r'''   DECLARE                    @object_name SYSNAME                  , @object_id INT                SELECT                    @object_name = '[' + s.name + '].[' + o.name + ']'                  , @object_id = o.[object_id]                FROM sys.objects o WITH (NOWAIT)                JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]                WHERE QUOTENAME(s.name) + '.' + QUOTENAME(o.name) = '{0}'                  AND o.[type] = 'U'                  AND o.is_ms_shipped = 0                DECLARE @SQL NVARCHAR(MAX) = ''                ;WITH index_column AS                 (                  SELECT                      ic.[object_id]                    , ic.index_id                    , ic.is_descending_key                    , ic.is_included_column                    , c.name                  FROM sys.index_columns ic WITH (NOWAIT)                  JOIN sys.columns c WITH (NOWAIT) ON ic.[object_id] = c.[object_id] AND ic.column_id = c.column_id                  WHERE ic.[object_id] = @object_id                ),                fk_columns AS                 (                   SELECT                      k.constraint_object_id                    , cname = c.name                    , rcname = rc.name                  FROM sys.foreign_key_columns k WITH (NOWAIT)                  JOIN sys.columns rc WITH (NOWAIT) ON rc.[object_id] = k.referenced_object_id AND rc.column_id = k.referenced_column_id                   JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = k.parent_object_id AND c.column_id = k.parent_column_id                  WHERE k.parent_object_id = @object_id                )                SELECT @SQL = 'CREATE TABLE ' + @object_name + '' + '(' + '' + STUFF((                  SELECT '' + ', [' + c.name + '] ' +                     CASE WHEN c.is_computed = 1                      THEN 'AS ' + cc.[definition]                       ELSE UPPER(tp.name) +                         CASE WHEN tp.name IN ('varchar', 'char', 'varbinary', 'binary', 'text')                            THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS VARCHAR(5)) END + ')'                           WHEN tp.name IN ('nvarchar', 'nchar')                            THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS VARCHAR(5)) END + ')'                           WHEN tp.name IN ('datetime2', 'time2', 'datetimeoffset')                             THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')'                           WHEN tp.name = 'decimal'                             THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')'                          ELSE ''                        END +                        CASE WHEN c.collation_name IS NOT NULL THEN ' COLLATE ' + c.collation_name ELSE '' END +                        CASE WHEN c.is_nullable = 1 THEN ' NULL' ELSE ' NOT NULL' END +                        CASE WHEN dc.[definition] IS NOT NULL THEN ' DEFAULT' + dc.[definition] ELSE '' END +                         CASE WHEN ic.is_identity = 1 THEN ' IDENTITY(' + CAST(ISNULL( /*ic.seed_value*/ 1, '0') AS CHAR(1)) + ',' + CAST(ISNULL(ic.increment_value, '1') AS CHAR(1)) + ')' ELSE '' END                     END + ''                  FROM sys.columns c WITH (NOWAIT)                  JOIN sys.types tp WITH (NOWAIT) ON c.user_type_id = tp.user_type_id                  LEFT JOIN sys.computed_columns cc WITH (NOWAIT) ON c.[object_id] = cc.[object_id] AND c.column_id = cc.column_id                  LEFT JOIN sys.default_constraints dc WITH (NOWAIT) ON c.default_object_id != 0 AND c.[object_id] = dc.parent_object_id AND c.column_id = dc.parent_column_id                  LEFT JOIN sys.identity_columns ic WITH (NOWAIT) ON c.is_identity = 1 AND c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id                  WHERE c.[object_id] = @object_id                  ORDER BY c.column_id                  FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '' + ' ')                  + ISNULL((SELECT '' + ', CONSTRAINT [' + k.name + '] PRIMARY KEY (' +                           (SELECT STUFF((                             SELECT ', [' + c.name + '] ' + CASE WHEN ic.is_descending_key = 1 THEN 'DESC' ELSE 'ASC' END                             FROM sys.index_columns ic WITH (NOWAIT)                             JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id                             WHERE ic.is_included_column = 0                               AND ic.[object_id] = k.parent_object_id                                AND ic.index_id = k.unique_index_id                                FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, ''))                      + ')' + ''                      FROM sys.key_constraints k WITH (NOWAIT)                      WHERE k.parent_object_id = @object_id                         AND k.[type] = 'PK'), '') + ')' + ''                  + ISNULL((SELECT (                    SELECT '' +                       'ALTER TABLE ' + @object_name + ' WITH'                       + CASE WHEN fk.is_not_trusted = 1                         THEN ' NOCHECK'                         ELSE ' CHECK'                        END +                        ' ADD CONSTRAINT [' + fk.name + '] FOREIGN KEY('                        + STUFF((                        SELECT ', [' + k.cname + ']'                        FROM fk_columns k                        WHERE k.constraint_object_id = fk.[object_id] and 1=2                        FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '')                        + ')' +                       ' REFERENCES [' + SCHEMA_NAME(ro.[schema_id]) + '].[' + ro.name + '] ('                       + STUFF((                        SELECT ', [' + k.rcname + ']'                        FROM fk_columns k                        WHERE k.constraint_object_id = fk.[object_id]                        FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '')                        + ')'                      + CASE                         WHEN fk.delete_referential_action = 1 THEN ' ON DELETE CASCADE'                         WHEN fk.delete_referential_action = 2 THEN ' ON DELETE SET NULL'                        WHEN fk.delete_referential_action = 3 THEN ' ON DELETE SET DEFAULT'                         ELSE ''                        END                      + CASE                         WHEN fk.update_referential_action = 1 THEN ' ON UPDATE CASCADE'                        WHEN fk.update_referential_action = 2 THEN ' ON UPDATE SET NULL'                        WHEN fk.update_referential_action = 3 THEN ' ON UPDATE SET DEFAULT'                         ELSE ''                        END                       + '' + 'ALTER TABLE ' + @object_name + ' CHECK CONSTRAINT [' + fk.name + ']' + ''                    FROM sys.foreign_keys fk WITH (NOWAIT)                    JOIN sys.objects ro WITH (NOWAIT) ON ro.[object_id] = fk.referenced_object_id                    WHERE fk.parent_object_id = @object_id                    FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)')), '')                  + ISNULL(((SELECT                     '' + 'CREATE' + CASE WHEN i.is_unique = 1 THEN ' UNIQUE' ELSE '' END                         + ' NONCLUSTERED INDEX [' + i.name + '] ON ' + @object_name + ' (' +                        STUFF((                        SELECT ', [' + c.name + ']' + CASE WHEN c.is_descending_key = 1 THEN ' DESC' ELSE ' ASC' END                        FROM index_column c                        WHERE c.is_included_column = 0                          AND c.index_id = i.index_id                        FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')'                         + ISNULL('' + 'INCLUDE (' +                           STUFF((                          SELECT ', [' + c.name + ']'                          FROM index_column c                          WHERE c.is_included_column = 1                            AND c.index_id = i.index_id                          FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')', '') + ''                    FROM sys.indexes i WITH (NOWAIT)                    WHERE i.[object_id] = @object_id                      AND i.is_primary_key = 0                      AND i.[type] = 2                    FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)')                  ), '')                select @SQL as script '''.format(self.format_object_name(tab_name))    cursor_target = conn_target.cursor()    cursor_source.execute(sql_script)    row = cursor_source.fetchone()    if not row[0]:      return    try:      cursor_target.execute(row[0]) # drop current table schema if exists      conn_target.commit()      print("*************schema " + self.format_object_name(tab_name) + " synced *************")      print() # give a blank row when finish    except:      print("----------------------- warning message -----------------------")      print("-----------schema " + self.format_object_name(tab_name) + " synced failed---------------")      print("----------------------- warning message -----------------------")      print()    cursor_source.close()    conn_source.close()    cursor_target.close()    conn_target.close()  def get_table_column(self, conn, tab_name):    column_names = ""    conn = conn    cursor_source = conn.cursor()    # get object by name from source db    sql_script = r'''select name from sys.columns              where object_id = object_id('{0}') and is_computed=0 order by object_id                 '''.format(self.format_object_name(tab_name))    cursor_source.execute(sql_script)    result = cursor_source.fetchall()    for row in result:      column_names = column_names + row[0] + ","    return column_names[0:len(column_names) - 1]    conn.cursor.close()    conn.close()  def sync_table_schema(self):    #default not sync by referenced other object    is_reference = "N"    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    arr_table = []    if (self.s_obj):      for tab_name in self.s_obj.split(","):        if (tab_name) and (self.exits_object(conn_source, tab_name)>0):          self.sync_table_schema_byname(tab_name, is_reference)        else:          print("----------------------- warning message -----------------------")          print("-----------schema " + self.format_object_name(tab_name) + " not existing in source database---------------")          print("----------------------- warning message -----------------------")          print()    else:      # sync all tables      # get all table in database when not define table name      sql_script = ''' SELECT QUOTENAME(s.name)+'.'+ QUOTENAME(o.name)               FROM sys.objects o WITH (NOWAIT)               JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]               WHERE o.[type] = 'U' AND o.is_ms_shipped = 0             '''      cursor_source.execute(sql_script)      for row in cursor_source.fetchall():        self.sync_table_schema_byname(str(row[0]), is_reference)  # sync data from soure table to target table  def sync_table_data(self):    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    arr_table = []    if (self.s_obj):      arr_table = self.s_obj.split(',')      for tab_name in arr_table:        if (self.exits_object(conn_target, self.format_object_name(tab_name)) == 0):          arr_table.remove(tab_name)          print("----------------- warning message -----------------------")          print("----------------- warning: table " + tab_name + " not existing in target database ---------------------")          print("----------------- warning message -----------------------")    else:      # get all table in database when not define table name      tab_result = cursor_source.execute(r''' SELECT QUOTENAME(s.name)+'.'+ QUOTENAME(o.name)                          FROM sys.objects o WITH (NOWAIT)                          JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]                          WHERE o.[type] = 'U'                            AND o.is_ms_shipped = 0                      ''')      for row in cursor_source.fetchall():        arr_table.append(str(row[0]))    insert_columns = ""    insert_columns = self.get_table_column(conn_source, tab_name)    for tab_name in arr_table:      if (self.f != "Y"):        sql_script = "select top 1 {0} from {1} ".format(insert_columns, tab_name)        # if exists data in target table,break        cursor_target.execute(sql_script)        exists = cursor_target.fetchone()        if exists:          print("----------------------- warning message -----------------------")          print("the target table " + tab_name + " exists data,skiped sync table type from source")          print("----------------------- warning message -----------------------")          print()          continue      else:        sql_script = "truncate table {0} ".format(tab_name)        # if exists data in target table,break        cursor_target.execute(sql_script)        conn_target.commit()      insert_columns = ""      insert_columns = self.get_table_column(conn_source, tab_name)      insert_prefix = ""      # weather has identity column      cursor_source.execute(r'''select 1 from sys.columns                   where object_id = OBJECT_ID('{0}') and is_identity =1                  '''.format(tab_name))      exists_identity = None      exists_identity = cursor_source.fetchone()      if (exists_identity):        insert_prefix = "set identity_insert {0} on; ".format(tab_name)      # data source      insert_sql = ""      values_sql = ""      current_row = ""      counter = 0      sql_script = r''' select {0} from {1}  '''.format(insert_columns, tab_name)      cursor_source.execute(sql_script)      # create insert columns      '''      for field in cursor_source.description:        insert_columns = insert_columns + str(field[0]) + ","      insert_columns = insert_columns[0:len(insert_columns) - 1]      '''      insert_prefix = insert_prefix + "insert into {0} ({1}) values ".format(tab_name, insert_columns)      for row in cursor_source.fetchall():        counter = counter + 1        for key in row:          if (str(key) == "None"):            current_row = current_row + r''' null, '''          else:            if (type(key) is datetime.datetime):              current_row = current_row + r''' '{0}', '''.format(str(key)[0:23])            elif (type(key) is str):              # 我槽!!!,這里又有一個坑:https://blog.csdn.net/dadaowuque/article/details/81016127              current_row = current_row + r''' '{0}', '''.format(                key.replace("'", "''").replace('/u0000', '').replace('/x00', ''))            elif (type(key) is Decimal):              d = Decimal(key)              s = '{0:f}'.format(d)              current_row = current_row + r''' '{0}', '''.format(s)            elif (type(key) is bytes):              # print(hex(int.from_bytes(key, 'big', signed=True) ))              current_row = current_row + r''' {0}, '''.format(                hex(int.from_bytes(key, 'big', signed=False)))            else:              current_row = current_row + r''' '{0}', '''.format(key)        current_row = current_row[0:len(current_row) - 2] # remove the the last one char ","        values_sql = values_sql + "(" + current_row + "),"        current_row = ""        # execute the one batch when        if (counter == 1000):          insert_sql = insert_prefix + values_sql          insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char ","          if (exists_identity):            insert_sql = insert_sql + " ;set identity_insert {0} off;".format(tab_name)          try:            cursor_target.execute(insert_sql)          except:            print(              "----------------------error " + tab_name + " data synced failed-------------------------")            raise          conn_target.commit()          insert_sql = ""          values_sql = ""          current_row = ""          counter = 0          print(time.strftime("%Y-%m-%d %H:%M:%S",                    time.localtime()) + "*************** " + self.format_object_name(            tab_name) + " " + str(1000) + " rows synced *************")      if (values_sql):        insert_sql = insert_prefix + values_sql        insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char ","        if (exists_identity):          insert_sql = insert_sql + " ; set identity_insert {0} off;".format(tab_name)        # execute the last batch        try:          cursor_target.execute(insert_sql)        except:          print("------------------error " + tab_name + " data synced failed------------------------")          raise        conn_target.commit()        insert_sql = ""        values_sql = ""        current_row = ""        print(time.strftime("%Y-%m-%d %H:%M:%S",                  time.localtime()) + "*************** " + self.format_object_name(          tab_name) + " " + str(          counter) + " rows synced *************")        print(time.strftime("%Y-%m-%d %H:%M:%S",                  time.localtime()) + "----------------synced " + self.format_object_name(          tab_name) + " data finished---------------")        print()    cursor_source.close()    conn_source.close()    cursor_target.close()    conn_target.close()  def sync_dependent_object(self, obj_name):    # 強制覆蓋,不需要對依賴對象生效,如果是因為屬于依賴對象而被同步的,先檢查target中是否存在,如果存在就不繼續同步,這里打一個標記來實現    is_refernece = "Y"    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    '''    find dependent objects    if exists dependent objects,sync Dependent objects objects in advance    '''    sql_check_dependent = r'''                      SELECT * FROM              (                SELECT                distinct rtrim(lower(s.type)) COLLATE Chinese_PRC_CI_AS as obj_type,                 QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj                FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d                  inner join sys.sysobjects s on s.id = d.referenced_id                union all                SELECT                distinct rtrim(lower(d.referenced_class_desc)) COLLATE Chinese_PRC_CI_AS as obj_type,                QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj                FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d                  inner join sys.types s on s.user_type_id = d.referenced_id              )t               '''.format(self.format_object_name(obj_name))    cursor_source.execute(sql_check_dependent)    result = cursor_source.fetchall()    for row in result:      if row[1]:        if (row[0] == "u"):          if (row[1]):            self.sync_table_schema_byname(row[1], is_refernece)        elif (row[0] == "fn" or row[0] == "if"):          if (row[1]):            self.sync_procudre_by_name("f", row[1], is_refernece)        elif (row[0] == "type"):          if (row[1]):            self.sync_table_variable(row[1], is_refernece)  def sync_procudre_by_name(self, type, obj_name, is_reference):    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    if (self.exits_object(conn_source, self.format_object_name(obj_name)) == 0):      print("---------------warning message----------------")      print("---------------warning: object " + obj_name + " not existing in source database ----------------")      print("---------------warning message----------------")      print()      return    if (self.exits_object(conn_target, self.format_object_name(obj_name)) > 0):      if (self.f != "Y"):        print("---------------warning message----------------")        print("---------------warning: object " + obj_name + " existing in target database ----------------")        print("---------------warning message----------------")        print()        return    '''     本來想直接生成刪除語句的:      這里有一個該死的轉義,怎么都弄不好,中午先去吃飯吧,    下午回來想了一下,換一種方式,不要死磕轉義問題了    sql_script =           select          'if object_id('+''''+QUOTENAME(schema_name(uid))+ '' + QUOTENAME(name)+''''+') is not null '          +' drop proc '+QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) ,          OBJECT_DEFINITION(id)          from sys.sysobjects where xtype = 'P' and uid not in (16,19)    '''    sql_script = r'''          select          QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),          OBJECT_DEFINITION(id)          from sys.sysobjects where xtype in ('P','IF','FN') and uid not in (16,19)         '''    if (obj_name):      sql_script = sql_script + " and QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) ='{0}' ".format(        self.format_object_name(obj_name))    cursor_source.execute(sql_script)    row = cursor_source.fetchone()    try:      if type == "f":        sql_script = r'''                if object_id('{0}') is not null                   drop function {0}              '''.format(self.format_object_name(row[0]))      elif type == "p":        sql_script = r'''                if object_id('{0}') is not null                   drop proc {0}              '''.format(self.format_object_name(row[0]))      cursor_target.execute(sql_script) # drop current stored_procudre if exists      conn_target.commit()      # sync dependent object      if (is_reference != "N"):        self.sync_dependent_object(self.format_object_name(row[0]))      # sync object it self      cursor_target.execute(str(row[1])) # execute create stored_procudre script      conn_target.commit()      print("*************sync sp: " + self.format_object_name(row[0]) + " finished *****************")      print()    except:      print("---------------error message----------------")      print("------------------ sync " + row[0] + "sp error --------------------------")      print("---------------error message----------------")      print()    cursor_source.close()    conn_source.close()    cursor_target.close()    conn_target.close()  def sync_procudre(self, type):    is_reference = "N"    conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)    conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)    cursor_source = conn_source.cursor()    cursor_target = conn_target.cursor()    if (self.s_obj):      for proc_name in self.s_obj.split(","):        self.sync_dependent_object(proc_name)        self.sync_procudre_by_name(type, proc_name, is_reference)    # sync all sp and function    else:      sql_script = r'''            select            QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),            OBJECT_DEFINITION(id)            from sys.sysobjects where xtype = upper('{0}') and uid not in (16,19)           '''.format(type)      cursor_source.execute(sql_script)      for row in cursor_source.fetchall():        self.sync_dependent_object(row[0])        self.sync_procudre_by_name(type, row[0], is_reference)if __name__ == "__main__":  '''  sync = SyncDatabaseObject(s_h="127.0.0.1",               s_i = "sql2017",               s_P = 49744,               s_d="DB01",               t_h="127.0.0.1",               t_i="sql2017",               t_P=49744,               t_d="DB02",               s_obj_type = "sp",               s_obj = "dbo.sp_test01",               f="Y")  sync.sync_procudre("p")  '''  p_s_h = ""  p_s_i = "MSSQL"  p_s_P = 1433  p_s_d = ""  p_s_u = None  p_s_p = None  p_s_obj = ""  p_type = ""  p_t_s = ""  p_t_i = "MSSQL"  p_t_P = "1433"  p_t_d = ""  p_t_u = None  p_t_p = None  # force conver target database object,default not force cover target database object  p_f = "N"  # sync obj type table|sp  p_obj_type = None  # sync whick database object  p_obj = None  if len(sys.argv) == 1:    print(usage)    sys.exit(1)  elif sys.argv[1] == '--help':    print(usage)    sys.exit()  elif len(sys.argv) >= 2:    for i in sys.argv[1:]:      _argv = i.split('=')      # source server name      if _argv[0] == '-s_h':        p_s_h = _argv[1]      # source server instance name      if _argv[0] == '-s_i':        if (_argv[1]):          p_s_i = _argv[1]      # source server instance PORT      if _argv[0] == '-s_P':        if (_argv[1]):          p_s_P = _argv[1]      # source database name      if _argv[0] == '-s_d':        p_s_d = _argv[1]      if _argv[0] == '-s_u':        p_s_u = _argv[1]      if _argv[0] == '-s_p':        p_s_p = _argv[1]      if _argv[0] == '-t_h':        p_t_h = _argv[1]      if _argv[0] == '-t_i':        if (_argv[1]):          p_t_i = _argv[1]      if _argv[0] == '-t_P':        if (_argv[1]):          p_t_P = _argv[1]      if _argv[0] == '-t_d':        p_t_d = _argv[1]      if _argv[0] == '-t_u':        p_t_u = _argv[1]      if _argv[0] == '-t_p':        p_t_p = _argv[1]      if _argv[0] == '-f':        if (_argv[1]):          p_f = _argv[1]      # object type      if _argv[0] == '-obj_type':        if not (_argv[1]):          print("-obj_type can not be null (-obj=tab|-obj=sp|-obj=fn|-obj=type)")          exit(0)        else:          p_obj_type = _argv[1]      # object name      if _argv[0] == '-obj':        if (_argv[1]):          p_obj = _argv[1]    # require para    if p_s_h.strip() == "":      print("source server host cannot be null")      exit(0)    if p_s_d.strip() == "":      print("source server host database name cannot be null")      exit(0)    if p_t_h.strip() == "":      print("target server host cannot be null")      exit(0)    if p_t_d.strip() == "":      print("target server host database name cannot be null")      exit(0)    sync = SyncDatabaseObject(s_h=p_s_h,                 s_i=p_s_i,                 s_P=p_s_P,                 s_d=p_s_d,                 s_u=p_s_u,                 s_p=p_s_p,                 s_obj=p_obj,                 t_h=p_t_h,                 t_i=p_t_i,                 t_P=p_t_P,                 t_d=p_t_d,                 t_u=p_t_u,                 t_p=p_t_p,                 f=p_f)    sync.validated_connect(p_s_h, p_s_i, p_s_P, p_s_d, p_s_u, p_s_p)    sync.validated_connect(p_t_h, p_t_i, p_t_P, p_t_d, p_t_u, p_t_p)    if (p_f.upper() == "Y"):      confirm = input("confirm you want to overwrite the target object? ")      if confirm.upper() != "Y":        exit(0)    print("-------------------------- sync begin ----------------------------------")    print()    if (p_obj_type == "tab"):      # sync schema      sync.sync_schema()      # sync table schema      sync.sync_table_schema()      # sync data      sync.sync_table_data()    elif (p_obj_type == "sp"):      # sync schema      sync.sync_schema()      # sync sp      sync.sync_procudre("p")    elif (p_obj_type == "fn"):      # sync schema      sync.sync_schema()      # sync sp      sync.sync_procudre("fn")    elif (p_obj_type == "tp"):      # sync schema      sync.sync_schema()      # sync sp      sync.sync_table_variable()    else:      print("-obj_type is not validated")    print()    print("-------------------------- sync finish ----------------------------------")

總結

以上所述是小編給大家介紹的基于Python的SQL Server數據庫實現對象同步輕量級,希望對大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會及時回復大家的!

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 郧西县| 宁晋县| 宁城县| 珲春市| 雷波县| 佛学| 无锡市| 昭苏县| 寿宁县| 安庆市| 大埔区| 手游| 封开县| 突泉县| 汉沽区| 栖霞市| 阿城市| 渭源县| 乡城县| 明溪县| 广灵县| 个旧市| 肇东市| 法库县| 聂荣县| 正定县| 马边| 饶平县| 黄大仙区| 景德镇市| 禄劝| 科技| 韶关市| 延川县| 宝兴县| 洛阳市| 通州区| 镇坪县| 长乐市| 昆明市| 和政县|