基于Python的SQL Server数据库实现对象同步轻量级

平面设计 2025-04-06 03:54www.168986.cn平面设计培训

Python实现SQL Server数据库对象同步的轻量级方法

在日常工作中,我们经常遇到一个棘手的问题:如何将一个服务器上的某些特定表同步到另一个服务器上?尽管SSIS或其他ETL工具可以解决这个问题,但它们常常需要进行大量的重复手工操作。这无疑增加了工作量,并可能导致工作效率低下。我们有必要寻找一种更快速、更简便的解决方案。于是,本文应运而生。我们将基于Python来实现SQL Server数据库之间的快速数据同步。该方法还具有同步表结构、数据、存储过程、函数和用户自定义表类型的功能。目前支持SQL Server 2012及以上版本。

该解决方案旨在通过简单的命令行操作实现数据同步,尽量减少重复的手工操作。其主要思想是通过Python脚本连接源数据库和目标数据库,然后按照一定的逻辑进行数据的同步操作。在这个过程中,我们需要考虑一些基本的问题,比如在源服务器上需要同步的对象是否存在,输入的对象是否存在于源服务器的数据库里等。我们还需要考虑目标服务器上表的存在性、数据的覆盖方式以及存储过程的依赖关系等问题。在实现过程中,我们遇到了一些挑战,如对象之间的依赖关系可能导致递归的问题等。但通过重构代码并遵循单一职责原则,我们成功地解决了这些问题。

针对数据库同步任务的深入

在进行数据库同步操作时,我们需要注意一些关键细节以确保数据的完整性和准确性。本文将详细介绍在执行数据库同步时需要考虑的几个重要因素,并阐述其背后的原理和逻辑。

二、SQL语句的转义处理

在构建SQL语句时,转义处理是非常关键的步骤。如果不进行正确的转义处理,可能会导致SQL语句错误,进而影响数据同步的效果。特别地,我们需要关注字符串中的引号字符、二进制字段以及时间字段的转义处理,因为这些是最容易出现问题的部分。

三、Insert语句的限制与处理策略

Insert into values(),(),()语法有一定的限制,特别是针对允许的最大值。当生成的数据量达到或超过这个限制时,我们需要采取适当的策略来处理。通常,每生成1000条数据,我们会进行一次同步,以确保数据的完整性和准确性。

四、自增列的identity_insert标识管理

五、表结构与存储过程的同步

除了基本的表数据同步外,我们还需要考虑表结构和存储过程的同步。在同步表结构时,我们需要注意过滤索引和特殊的表结构导出语句的处理。对于存储过程的同步,我们需要先同步依赖的schema和对象,然后再同步存储过程本身。如果存储过程依赖于其他对象,我们需要先同步这些依赖对象,然后再进行存储过程的同步。我们还需要注意加密的存储过程和table type的同步问题。

六、测试与效果展示

在实际测试过程中,我们展示了同步源数据库的三张表和两个存储过程到目标数据库的效果。我们还详细说明了在同步过程中遇到的一些问题和挑战,例如处理索引为filter index的表、支持其他少用的类型字段等。通过展示同步某一个对象的依赖对象的过程,我们进一步阐述了依赖对象的管理和同步的重要性。

处理依赖对象的问题确实是一个不小的挑战,有时真让人头疼。在默认schema为dbo的环境中,如果不明确指定表的schema,如从dbo.test01简化为test01,那么使用sys.dm_sql_referenced_entities系统函数时,竟然无法准确找到依赖的对象。更奇怪的是,它虽然能识别schema的类型,却无法返回具体的对象。这一问题在代码中层层渗透,导致了我长时间的debug过程。这函数的“表现”让人有些措手不及,能够找到依赖对象的类型,却找不到对象本身,这确实是个难题!

遇到动态SQL的情况更是如此,sys.dm_sql_referenced_entities系统函数在寻找依赖对象时似乎遭遇了瓶颈。

其他对象的同步

除了基础的数据库对象如表,我们还支持function、table type等其他数据库对象的同步。在同步存储过程对象时,可以附带同步function和table type。这方面的操作与表和存储过程的同步类似,就不做过多阐述了。

已知问题

经过测试,我们发现暂时不支持Sequence对象的同步,这是目前的一个明显短板。

需要改进的地方

1. 代码结构优化:我们希望能够进一步优化代码结构,使其更加清晰、有条理。虽然一开始我们采用了直接且快速的方式实现功能,但后续进行了大量代码重构,目前看来仍有许多需要改进的地方。

2. 数据同步效率问题:对于涉及多表的导入导出操作,当前的操作主要依赖于单线程。当多个大表需要串行导出时,可能存在效率上的瓶颈。我们需要如何根据表的数据量,更智能地分配多线程,以平衡负载并提高效率。

3. 异常提示与日志记录的优化:我们需要提供更友好、更清晰的异常提示信息,并加强日志记录功能,特别是在导出过程中生成详细的日志信息。

4. 异构数据同步的拓展:目前我们的系统主要支持SQL Server,但我们希望能够进一步拓展,实现与MySQL、Oracle以及PGSQL等数据库的异构数据同步。

端午节的钟声已经远去,代码也完成了初步的编写。这几天我进行了一系列的测试及bug修复工作,尽管取得了一些进展,但仍然可能存在未知的bug。工作量远超我的预期,但我相信通过不断的努力和改进,我们能够克服所有挑战。您的代码主要关于数据库的同步操作,从源数据库同步数据或表结构到目标数据库。代码以Python编写,使用了pymssql库来连接和操作SQL Server数据库。这部分代码提供了一个类`SyncDatabaseObject`,该类包含了一些方法,如连接数据库、检查连接、格式化对象名、检查对象是否存在、同步表变量、同步表结构、同步数据等。

```python

__author__ = 'MSSQL123'

__date__ = '2019-06-07 09:36'

import os

import sys

import time

import datetime

import pymssql

from decimal import Decimal

命令行参数解释

print("--参数解释--")

print("源数据库参数:")

print("-s_h : 源数据库主机名 -- 必须参数")

print("-s_i : 源数据库实例名 -- 默认实例名MSSQL")

print("-s_d : 源数据库名 -- 必须参数")

print("-s_u : 源数据库登录 -- 默认Windows标识符")

print("-s_p : 源数据库登录密码 -- 当s_u不为空时必须提供")

print("-s_P : 源数据库实例端口 -- 默认端口1433")

print("目标数据库参数:")

print("-t_h : 目标数据库主机名 -- 必须参数")

print("-t_i : 目标数据库实例名 -- 默认实例名MSSQL")

print("-t_d : 目标数据库名 -- 必须参数")

print("-t_u : 目标数据库登录 -- 默认Windows标识符")

print("-t_p : 目标数据库登录密码 -- 当s_u不为空时必须提供")

print("-t_P : 目标数据库实例端口 -- 默认端口1433")

print("同步对象参数:")

print("-obj : 表|sp|函数|类型名称 -- 哪个表或存储过程同步")

print("覆盖参数:")

print("-f : 强制覆盖目标数据库对象 -- Y或N")

print("--帮助文档--")

示例

print("示例用法:")

print("python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i=\"MSSQL\" -s_d=\"DB01\" -obj_type=\"tab\" -obj=\"dbo.t1,dbo.t2\" -t_h=127.0.0.1 -t_P=1433 -t_i=\"MSSQL\" -t_d=\"DB02\" -f=\"Y\"")

print("python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i=\"MSSQL\" -s_d=\"DB01\" -obj_type=\"sp\" -obj=\"dbo.sp1,dbo.sp2\" -t_h=127.0.0.1 -t_P=1433 -t_i=\"MSSQL\" -t_d=\"DB02\" -f=\"Y\"")

class SyncDatabaseObject(object):

源数据库

s_h = None

s_i = None

s_P = None

s_u = None

s_p = None

s_d = None

对象类型

s_obj_type = None

同步对象

s_obj = None

目标数据库

t_h = None

t_i = None

t_P = None

t_u = None

t_p = None

t_d = None

f = None

file_path = None

def __init__(self, args, kwargs):

for k, v in kwargs.items():

setattr(self, k, v)

连接到SQL Server

def get_connect(self, _h, _i, _P, _u, _p, _d):

cursor = False

try:

if (_u) and (_p):

conn = pymssql.connect(host=_h, server=_i, port=_P, user=_u, password=_p, database=_d)

else:

conn = pymssql.connect(host=_h, server=_i, port=_P, database=_d)

if (conn):

return conn

except:

raise

return conn

检查连接

def validated_connect(self, _h, _i, _P, _u, _p, _d):

if not (self.get_connect(_h, _i, _P, _u, _p, _d)):

print("连接到" + str(_h) + "失败,请检查您的参数")

exit(0)

格式化对象名

@staticmethod

def format_object_name(name):

format_name = ""

if ("." in name):

schema_name = name[0:name.find(".")]

object_name = name[name.find(".") + 1:]

if not ("[" in schema_name):

schema_name = "[" + schema_name + "]"

if not ("[" in object_name):

object_name = "[" + object_name + "]"

format_name = schema_name + "." + object_name

else:

if ("[" in name):

format_name = "[dbo]." + name

else:

format_name = "[dbo]." + "[" + name + "]"

return format_name

检查用户输入的对象是否为有效对象

def exits_object(self, conn, name):

cursor_source = conn.cursor()

从源数据库获取对象

sql_script = r'''select 1 1 from

(

select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.objects

union all

select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.types

)t where obj_name = '{0}'

'''.format(self.format_object_name(name))

cursor_source.execute(sql_script)

result = cursor_source.fetchall()

if not result:

return 0

else:

return 1

cursor_source.close()

表变量同步

def sync_table_variable(self, tab_name, is_reference):

conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)

conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

if (self.exits_object(conn_source, self.format_object_name(tab_name)) > 0):

pass

else:

print("-- 警告信息 --")

print("--警告:对象" + tab_name + "在源数据库中不存在")

print("-- 警告信息 --")

print()

return

exists_in_target = 0

sql_script = r'''select 1 1

from sys.table_types tp

where is_user_defined = 1

and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}' ''' \

.format((self.format_object_name(tab_name)))

如果目标服务器中存在表模式,则跳过

cursor_target.execute(sql_script)

exists_in_target = cursor_target.fetchone()

是否在目标服务器上存在

if (self.f == "Y"):

if (is_reference != "Y"):

跳过,表类型在由sp使用时不能删除

sql_script = r'''

if OBJECT_ID('{0}') is not null

drop type {0}

'''.format(self.format_object_name(tab_name))

cursor_target.execute(sql_script)

conn_target.mit()

else:

if exists_in_target:

print("-- 警告信息 --")

print("目标表类型" + tab_name + "存在,从源同步表类型跳过")

print("-- 警告信息 --")

print()

return

创建表类型的SQL脚本

sql_script = r'''

DECLARE @SQL NVARCHAR(MAX) = ''

SELECT @SQL =

'CREATE TYPE ' + '{0}' + 'AS TABLE' + CHAR(13) + '(' + CHAR(13) +

STUFF((

SELECT CHAR(13) + ' , [' + c.name + '] ' +

CASE WHEN c.is_puted = 1

THEN 'AS ' + OBJECT_DEFINITION(c.[object_id], c.column_id)

ELSE

CASE WHEN c.system_type_id != c.user_type_id

THEN '[' + SCHEMA_NAME(tp.[schema_id]) + '].[' + tp.name + ']'

ELSE '[' + UPPER(y.name) + ']'

END +

CASE

WHEN y.name IN ('varchar', 'char', 'varbinary', 'binary')

THEN '(' + CASE WHEN c.max_length = -1

THEN 'MAX'

ELSE CAST(c.max_length AS VARCHAR(5))

END + ')'

WHEN y.name IN ('nvarchar', 'nchar')

THEN '(' + CASE WHEN c.max_length = -1

THEN 'MAX'

ELSE CAST(c.max_length / 2 AS VARCHAR(5))

END + ')'

WHEN y.name IN ('datetime2', 'time2', 'datetimeoffset')

THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')'

WHEN y.name = 'decimal'

THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')'

ELSE ''

END +

CASE WHEN c.collation_name IS NOT NULL AND c.system_type_id = c.user_type_id

THEN ' COLLATE ' + c.collation_name

ELSE ''

END +

CASE WHEN c.is_nullable = 1

THEN ' NULL'

ELSE ' NOT NULL'

END +

CASE WHEN c.default_object_id != 0

THEN ' CONSTRAINT [' + OBJECT_NAME(c.default_object_id) + ']' +

' DEFAULT ' + OBJECT_DEFINITION(c.default_object_id)

ELSE ''

END

END

From sys.table_types tp

Inner join sys.columns c on c.object_id = tp.type_table_object_id

Inner join sys.types y ON y.system_type_id = c.system_type_id

WHERE tp.is_user_defined = 1 and y.name<>'sysname'

and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}'

ORDER BY c.column_id

FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 7, ' ')

+ ');'

select @SQL as script

'''.format(self.format_object_name(tab_name))

cursor_target = conn_target.cursor()

cursor_source.execute(sql_script)

row = cursor_source.fetchone()

try:

if not exists_in_target:

在目标服务器上执行脚本

cursor_target.execute(str(row[0]))

conn_target.mit()

print("表类型" + self.format_object_name(tab_name) + "已同步")

print()

except:

print("-- 错误信息 --")

print("--表类型" + self.format_object_name(tab_name) + "同步错误")

print("-- 错误信息 --")

print()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

模式同步

def sync_schema(self):

conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)

conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

arr_schema = []

获取所有模式,如果没有定义表名

schema_result = cursor_source.execute(r'''

select name from sys.schemas where schema_id>4 and schema_id<16384

''')

for row in cursor_source.fetchall():

cursor_target.execute(r''' if not exists(select from sys.schemas where name = '{0}')

begin

exec('create schema [{0}]')

end

'''.format(str(row[0])))

conn_target.mit()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

def sync_table_schema_byname(self, tab_name, is_reference):

conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)

conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

if (self.exits_object(conn_source, self.format_object_name(tab_name)) == 0):

print("-- 警告信息 --")

print("警告:对象" + tab_name + "在源数据库中不存在-")

print("-- 警告信息 --")

print()

return

如果存在对存储过程的引用表,则不再次同步表

if (self.exits_object(conn_target, self.format_object_name(tab_name)) > 0):

if (self.f != "Y"):

print("-- 警告信息 --")

print("警告:对象" + tab_name + "在目标数据库中存在-")

print("-- 警告信息 --")

print()

return

如果目标服务器中存在表模式,则跳过

sql_script = r''' select 1 1 from sys.tables

where type_desc = 'USER_TABLE'

and concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) = '{0}'

'''.format((self.format_object_name(tab_name)))

cursor_target.execute(sql_script)

exists_in_target = cursor_target.fetchone()

if exists_in_target:

if (self.f == "Y"):

if (is_reference != "Y"):

cursor_target.execute("drop table {0}".format(tab_name))

else:

print("-- 警告信息 --")

print("目标表" + tab_name + "存在,从源同步表结构跳过")

print("-- 警告信息 --")

print()

return

创建表结构的SQL脚本

sql_script = r''' DECLARE

@object_name SYSNAME

, @object_id INT

原文:

在繁华的都市中,有一个独特的角落,那里充满了艺术与文化的气息。小巷深处的画廊,是这座城市的秘密宝藏。每当夕阳西下,金色的阳光洒满画廊的外墙,与墙上的涂鸦交相辉映,仿佛诉说着古老与现代的故事。

走进画廊,你会被眼前的一切所震撼。每一个角落都充满了创意与灵感,画作、雕塑、摄影作品……琳琅满目,令人目不暇接。这些艺术品不仅仅是简单的创作,更是艺术家们灵魂的倾诉。他们用心血和汗水,将灵感融入每一件作品中,使其生动而独特。

在这里,时间仿佛凝固。你可以静静地欣赏每一件作品,感受其中的情感与故事。画廊的灯光柔和而温暖,为艺术品营造出完美的氛围。你可以在这里度过一个宁静的下午,或者一个悠长的夜晚。这里,是都市中的一片净土,让人心灵得到放松与滋养。

画廊的主人是一位热爱艺术的青年,他遍访名山大川,寻找灵感。他的作品中融入了各地的文化元素,展现出独特的魅力。他致力于推广艺术,让更多的人了解艺术、热爱艺术。他的梦想是将这个画廊打造成一个国际级的艺术殿堂,让世界各地的艺术家都能在这里展示自己的作品。

这个画廊,是都市中的一颗璀璨明珠,吸引着无数艺术爱好者前来朝圣。每一个角落都充满了故事,每一件作品都散发着独特的魅力。这里,不仅是艺术的天堂,更是心灵的港湾。

在喧嚣的都市之中,隐藏着一个艺术的圣地——画廊。它坐落于小巷深处,那里充满了神秘的艺术气息和浓厚的文化底蕴。每当夕阳西下时,金色的阳光洒在画廊的外墙上,与墙上的涂鸦相互映照,仿佛在诉说着一段古老与现代交织的故事。

踏入画廊的瞬间,你会被眼前的景象所震撼。这里是一个充满创意与灵感的殿堂,每一件艺术品都是艺术家的灵魂倾诉。从画作到雕塑,再到摄影作品,琳琅满目,令人目不暇接。艺术家们用心血和汗水将灵感融入每一件作品中,使其独具魅力且充满生命力。

在这里,时间似乎失去了意义。你可以尽情沉浸在艺术的世界里,感受每一件作品所传递的情感与故事。柔和而温暖的灯光为艺术品营造了完美的氛围,让人仿佛置身于一个梦幻的艺术王国。无论是度过一个宁静的午后还是悠长的夜晚,这里都能让人心灵得到放松与滋养。

画廊的主人是一位踏遍天下、热爱艺术的青年。他的作品中融合了各地的文化元素,展现出独特而丰富的艺术风格。他致力于艺术推广,希望通过自己的努力让更多的人了解艺术、热爱艺术。他的梦想是将这个画廊打造成一个国际级的艺术殿堂,吸引世界各地的艺术家来这里展示他们的才华和作品。

这个画廊不仅是艺术的圣地,更是心灵的港湾。每一个角落都散发着独特的魅力,吸引着无数艺术爱好者前来朝圣。这里不仅展示了艺术家的才华和创造力,更提供了一个让心灵得到滋养和放松的空间。这是一个基于Python的SQL Server数据库对象同步脚本。从代码中可以看出,它主要用于同步数据库中的表、存储过程、函数和类型等对象。以下是对代码主要功能的简要概述:

1. 连接数据库: 通过提供的源数据库和目标数据库的服务器名、实例名、端口号、数据库名、用户名和密码等信息,建立连接。

2. 验证连接: 通过`validated_connect`方法验证是否能够成功连接到源数据库和目标数据库。

3. 同步操作: 根据传入的参数(如对象类型、对象名等),执行相应的同步操作。

如果对象类型是"tab",则执行表同步,包括同步表结构(`sync_table_schema`)和表数据(`sync_table_data`)。

如果对象类型是"sp",则执行存储过程同步(`sync_procudre`)。

如果对象类型是"fn",则执行函数同步。

如果对象类型是"tp",则执行表变量同步。

4. 依赖对象同步: 在同步对象之前,会先检查是否存在依赖对象,如果存在,则先同步依赖对象(`sync_dependent_object`)。

5. 错误处理: 在执行同步操作时,如果发生错误,会捕获错误并打印错误信息。

6. 确认覆盖: 在执行覆盖操作前,会提示用户是否确认要覆盖目标对象。

7. 打印同步信息: 在同步开始和结束时,会打印相应的同步信息。

从整体来看,这是一个比较完整的数据库对象同步脚本,能够处理表、存储过程、函数和类型等对象的同步,并且考虑了依赖对象的处理。这个脚本还有很多可以优化的地方,比如错误处理、日志记录等。不过对于轻量级的需求来说,这个脚本应该是足够用的。在这浩瀚的数字世界中,您我相遇于此刻,目光聚焦于一个特殊的主题——“Cambrian”,其深邃的内涵如宇宙般广阔无垠。在这里,我们一同走进一个独特的时空,体验由“Cambrian”所渲染的奇妙世界。让我们一同这个神秘的世界,品味其深邃与独特之处。

Cambrian之神秘气息早已深深吸引了我们,它是一个时代的印记,是科技进步的象征。让我们置身于这一历史与现代交融的空间,感受那浓厚的科技氛围。随着“Cambrian”的渲染,我们的视线穿梭于过去与未来之间,目睹着世界不断变革的脉络。这是一场穿越时空的冒险之旅,也是一次深入自我与世界的奇妙旅程。

在这个充满想象力的世界里,“Cambrian”为我们呈现了一幅幅绚丽的画面。它如同一位艺术家,用独特的笔触描绘出未来的蓝图;又如同一位家,带领我们未知的领域。在这个世界里,我们不仅感受到了科技的魅力,更领略到了生活的美好。那些充满想象力的画面,让我们对未来充满期待与憧憬。

“Cambrian”还是一个充满无限可能的创新之地。在这里,各种新奇的想法如雨后春笋般涌现,为这个世界注入了源源不断的活力。这是一个充满机遇与挑战的时代,每一个人都可以成为这个时代的创造者,为这个世界贡献自己的力量。在这里,我们共同见证了一个又一个奇迹的诞生,感受到了人类智慧的无穷力量。

让我们继续在这片神奇的土地上,感受由“Cambrian”所渲染的无限魅力。这个世界充满了惊喜与机遇,让我们携手共创美好的未来。在这里,我们共同成长、共同进步,共同书写属于这个时代的辉煌篇章。

Copyright © 2016-2025 www.168986.cn 狼蚁网络 版权所有 Power by