当前位置:网站首页>数据库SqlServer迁移PostgreSql实践
数据库SqlServer迁移PostgreSql实践
2022-08-04 18:00:00 【51CTO】
背景
公司某内部系统属于商业产品,数据库性能已出现明显问题,服务经常卡死,员工经常反馈数据无法查询或不能及时查询,该系统所使用的数据库为SqlServer,SqlServer数据库属于商业数据库,依赖厂商的维护,且维护成本高,效率低,且存在版权等问题,考虑将该系统的数据库,迁移至PostGresql数据库,属于BSD的开源数据库,不存在版本问题,公司也有部分系统采用pg,维护成本也将大大减低。
迁移原理
SqlServer属于商业数据库,不可能像Mysql等数据库一样,去解析相关的数据库binlog,从而实现增量数据的回放,结合应用属性,最后确定采用离线迁移方式,从SqlServer中将表数据全部读出,然后将数据写入到pg中,采用此种方案的弊病就是程序端需停止写入(应用可将部分数据缓存到本地),等待数据库迁移完成后,程序端再迁移至PostGresql,迁移方法如下:

表结构迁移原理
表结构主要包含字段,索引,主键,外键等信息组成,主要采用开源工具sqlserver2pg进行表结构的转换
表结构转换
从SqlServer中读写表结构的字段信息,并对字段类型进行转换,转换核心代码如下
sub convert_type
{
my ($sqlstype, $sqlqual, $colname, $tablename, $typname, $schemaname) =
@_;
my $rettype;
if (defined $types{$sqlstype})
{
if ((defined $sqlqual and defined($unqual{$types{$sqlstype}}))
or not defined $sqlqual)
{
# This is one of the few types that have to be unqualified (binary type)
$rettype = $types{$sqlstype};
# but we might add a check constraint for binary data
if ($sqlstype =~ 'binary' and defined $sqlqual) {
print STDERR "convert_type: $sqlstype, $sqlqual, $colname\n";
my $constraint;
$constraint->{TYPE} = 'CHECK_BINARY_LENGTH';
$constraint->{TABLE} = $tablename;
$constraint->{TEXT} = "octet_length(" . format_identifier($colname) . ") <= $sqlqual";
push @{$objects->{SCHEMAS}->{$schemaname}->{TABLES}->{$tablename}
->{CONSTRAINTS}}, ($constraint);
}
}
elsif (defined $sqlqual)
{
$rettype = ($types{$sqlstype} . "($sqlqual)");
}
}
# A few special cases
elsif ($sqlstype eq 'bit' and not defined $sqlqual)
{
$rettype = "boolean";
}
elsif ($sqlstype eq 'ntext' and not defined $sqlqual)
{
$rettype = "text";
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
外键,索引,唯一键转换
主要是从sqlserver导出的表结构数据中,对相关的索引,外键等语句进行转换,转换核心代码如下
while (my ($schema, $refschema) = each %{$objects->{SCHEMAS}})
{
# Indexes
# They don't have a schema qualifier. But their table has, and they are in the same schema as their table
foreach my $table (sort keys %{$refschema->{TABLES}})
{
foreach
my $index (
sort keys %{$refschema->{TABLES}->{$table}->{INDEXES}})
{
my $index_created = 0;
my $idxref =
$refschema->{TABLES}->{$table}->{INDEXES}->{$index};
my $idxdef .= "";
if ($idxref->{DISABLE})
{
$idxdef .= "-- ";
}
$idxdef .= "CREATE";
if ($idxref->{UNIQUE})
{
$idxdef .= " UNIQUE";
}
if (defined $idxref->{COLS})
{
$idxdef .= " INDEX " . format_identifier($index) . " ON " . format_identifier($schema) . '.' . format_identifier($table) . " ("
. join(",", map{format_identifier_cols_index($_)} @{$idxref->{COLS}}) . ")";
if (defined $idxref->{INCLUDE}) {
$idxdef .= " INCLUDE (" .
join(",", map{format_identifier_cols_index($_)} @{$idxref->{INCLUDE}})
. ")";
}
if (not defined $idxref->{WHERE} and not defined $idxref->{DISABLE}) {
$idxdef .= ";\n";
print AFTER $idxdef;
# the possible comment would go to after file
$index_created = 1;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
数据类型转换原理
数据类型转换

函数类型转换

存储过程
视图部分需手动改造
迁移方法
表结构转换
表结构导入pg
数据迁移
数据比对
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author:jacker_zhou
@create_time: 2017-04-07
@overview: mssql pg
"""
__author__ = 'jacker_zhou'
__version__ = '0.1'
import psycopg2,pymssql
import types
import time
TableSpace='public.'
class CompareDataBase():
def __init__(self):
self.pgcnotallow=psycopg2.connect(database="test",host="127.0.0.1",port=15432,user="postgres",password="test")
self.mscnotallow=pymssql.connect(host="192.168.1.1",user="test",password="test",database="test")
def commit(self):
self.pgconn.commit()
def close(self):
self.pgconn.close()
self.msconn.close()
def rollback(self):
self.pgconn.rollback()
def exesyncdb(self):
mscursor=self.msconn.cursor()
sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ")
mscursor.execute(sql)
table=mscursor.fetchall()
print ("total table %d"%len(table))
if(table is None or len(table)<=0):
return
else:
for row in table:
self.executeTable(row[1],row[0])
print ("%s is execute success"%row[1])
def comparedb(self):
mscursor=self.msconn.cursor()
sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ")
mscursor.execute(sql)
table=mscursor.fetchall()
print ("total table %d"%len(table))
if(table is None or len(table)<=0):
return
else:
for row in table:
self.compareTable(row[1])
def executeTable(self,tablename,count):
#print tablename
sql1="SELECT * FROM %s"%tablename
print (sql1)
mscursor=self.msconn.cursor()
mscursor.execute(sql1)
table=mscursor.fetchall()
if(table is None or len(table)<=0):
mscursor.close()
return
lst_result=self.initColumn(table)
#print "column"
mscursor.close()
print ("execute sync %s data to postgresql"%tablename)
sql2=self.initPgSql(tablename,count)
pgcursor=self.pgconn.cursor()
pgcursor.executemany(sql2,lst_result)
pgcursor.close()
def compareTable(self,tablename):
#print tablename
sql1="SELECT count(*) FROM %s"%tablename
mscursor=self.msconn.cursor()
mscursor.execute(sql1)
ms_res=mscursor.fetchall()
mscursor.close()
pgcursor=self.pgconn.cursor()
pgcursor.execute(sql1)
pg_res=pgcursor.fetchall()
pgcursor.close()
res =""
if ms_res[0][0] == pg_res[0][0]:
res ="ok"
else:
res = "fail"
print ("execute compare table %s data postgresql: %s mssql:%s result: %s"%(tablename,pg_res[0][0],ms_res[0][0],res))
if __name__=="__main__":
sdb= CompareDataBase()
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print ("task start time %s"%start_time)
try:
sdb.comparedb()
except Exception as e:
print (e)
sdb.rollback()
else:
sdb.commit()
sdb.close()
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print ("task end time %s"%end_time)
print ("ok........" )
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
参考
边栏推荐
猜你喜欢
随机推荐
How to recruit programmers
力扣学习---0804
不论你是大众,科班和非科班,我这边整理很久,总结出的学习路线,还不快卷起来
R语言ggpubr包的ggtexttable函数可视化表格数据(直接绘制表格图或者在图像中添加表格数据)、使用ggarrange函数将表格数据和可视化图像组合起来(表格数据在可视化图像下方)
【技术积累】JS事件循环,Promise,async/await的运行顺序
【日记】UPNP功能会允许自动给光猫追加端口映射
Literature Review on Involution of College Students
小程序笔记1
DHCP&OSPF组合实验演示(Huawei路由交换设备配置)
FE01_OneHot-Scala应用
LeetCode 899. 有序队列
字节二面被问到mysql事务与锁问题,我蚌埠住了
2022年五一数学建模C题讲解
公司自用的国产API管理神器
Google Earth Engine APP——一键在线查看全球1984-至今年的影像同时加载一个影像分析
2018读书记
网页端IM即时通讯开发:短轮询、长轮询、SSE、WebSocket
开发那些事儿:如何通过EasyCVR平台获取监控现场的人流量统计数据?
mysql cdc 为什么需要RELOAD 这个权限?这个权限在采集数据的过程中的作用是什么?有哪
JWT主动校验Token是否过期









