Python MySQL 数据迁移
利用 Python 脚本在两个 MySQL 数据库之间迁移数据
import pymysql.cursors
# Connection settings of the source database (rows are read from here).
connect_from_info = dict(
    host="127.0.0.1",
    port=3306,
    user="test",
    passwd="test",
    db="test",
)
# Connection settings of the target database (rows are written here).
connect_to_info = dict(
    host="127.0.0.1",
    port=3306,
    user="test",
    passwd="test",
    db="test2",
)
# Connect to both databases.
# utf8mb4 is requested instead of "utf8": MySQL's "utf8" is the 3-byte
# utf8mb3 subset and cannot carry 4-byte characters (emoji, some CJK
# extensions), which would be damaged while being copied.
connect_from = pymysql.Connect(
    host=connect_from_info['host'],
    port=connect_from_info['port'],
    user=connect_from_info['user'],
    passwd=connect_from_info['passwd'],
    db=connect_from_info['db'],
    charset='utf8mb4'
)
connect_to = pymysql.Connect(
    host=connect_to_info['host'],
    port=connect_to_info['port'],
    user=connect_to_info['user'],
    passwd=connect_to_info['passwd'],
    db=connect_to_info['db'],
    charset='utf8mb4'
)
# One cursor per connection, shared by every helper in this script.
cursor_to = connect_to.cursor()
cursor_from = connect_from.cursor()
# Module-level buffer that back() refills with the rows of the current page.
objList = []
def getToFields(tableName):
    """Return the column names of ``tableName`` in the target database.

    The names are read from ``information_schema.COLUMNS`` for the schema
    configured in ``connect_to_info['db']``.

    Args:
        tableName: Name of the table to inspect.

    Returns:
        A list of column names as ``str``, in table definition order; empty
        when the table has no matching rows in ``information_schema``.
    """
    # Only COLUMN_NAME is consumed, so the query selects nothing else. The
    # schema and table names are bound as parameters rather than formatted
    # into the statement, so quoting is left to the driver.
    sql = """
    select COLUMN_NAME
    from information_schema.`COLUMNS`
    where TABLE_SCHEMA = %s and TABLE_NAME = %s
    order by ORDINAL_POSITION
    """
    cursor_to.execute(sql, (connect_to_info['db'], tableName))
    return [str(row[0]) for row in cursor_to.fetchall()]
def getFromFields(tableName):
    """Return the column names of ``tableName`` in the source database.

    The names are read from ``information_schema.COLUMNS`` for the schema
    configured in ``connect_from_info['db']``.

    Args:
        tableName: Name of the table to inspect.

    Returns:
        A list of column names as ``str``, in table definition order; empty
        when the table has no matching rows in ``information_schema``.
    """
    # Only COLUMN_NAME is consumed, so the query selects nothing else. The
    # schema and table names are bound as parameters rather than formatted
    # into the statement, so quoting is left to the driver.
    sql = """
    select COLUMN_NAME
    from information_schema.`COLUMNS`
    where TABLE_SCHEMA = %s and TABLE_NAME = %s
    order by ORDINAL_POSITION
    """
    cursor_from.execute(sql, (connect_from_info['db'], tableName))
    return [str(row[0]) for row in cursor_from.fetchall()]
def getInnerColumns(tableName):
    """Return the column names ``tableName`` has in both databases.

    The source side is queried first, then the target side. The result is
    built from a set intersection, so its order is unspecified.
    """
    source_names = set(getFromFields(tableName))
    target_names = set(getToFields(tableName))
    return list(source_names & target_names)
def back(mainsql, tableName, *, dry_run=True):
    """Copy the rows of ``tableName`` from the source to the target database.

    Rows are read page by page with ``mainsql`` and turned into multi-row
    ``insert ignore`` statements limited to the columns both tables share
    (see ``getInnerColumns``).

    Args:
        mainsql: SELECT template containing two ``%d`` placeholders, filled
            with the row offset and the page size, in that order.
        tableName: Table to insert into. It is placed into the statement
            as-is (identifiers cannot be bound), so it must be trusted.
        dry_run: When true (the default) each statement is only printed,
            once per batch. When false the statement is executed on the
            target connection and committed after every page.
    """
    columns = getInnerColumns(tableName)
    if not columns:
        # No shared column: "insert ... () values ()" would be invalid SQL.
        return

    page_size = 5000   # rows fetched from the source per SELECT
    batch_size = 100   # rows written per INSERT statement

    # Column names are backtick-quoted so reserved words stay valid. "%" is
    # doubled because the statement goes through %-style parameter binding.
    column_sql = ",".join(
        "`" + str(name).replace("`", "``") + "`" for name in columns
    ).replace("%", "%%")
    insert_head = (
        "insert ignore into " + tableName.replace("%", "%%")
        + "(" + column_sql + ") values"
    )
    row_placeholder = "(" + ",".join(["%s"] * len(columns)) + ")"

    start = 0
    while True:
        cursor_from.execute(mainsql % (start, page_size))
        start += page_size
        if cursor_from.rowcount <= 0:
            break

        # Map every fetched row to {column name: value}; objList is the
        # module-level buffer and always holds the page being processed.
        names = [column[0] for column in cursor_from.description]
        objList.clear()
        objList.extend(dict(zip(names, row)) for row in cursor_from.fetchall())

        for left in range(0, len(objList), batch_size):
            batch = objList[left:left + batch_size]
            insert_sql = insert_head + ",".join([row_placeholder] * len(batch))
            # Values are bound, never concatenated: the driver escapes
            # quotes and renders None as NULL.
            params = [obj[name] for obj in batch for name in columns]
            try:
                if dry_run:
                    print(cursor_to.mogrify(insert_sql, params))
                else:
                    cursor_to.execute(insert_sql, params)
            except Exception as e:
                # Deliberate best effort: report the failing batch and
                # continue with the next one.
                print(e)

        if not dry_run:
            connect_to.commit()
def queryToAllTables():
    """List the tables of the target database.

    Returns:
        The first column of every ``show tables`` row, unconverted; an
        empty list when the cursor reports no rows.
    """
    cursor_to.execute("show tables")
    if cursor_to.rowcount <= 0:
        return []
    return [record[0] for record in cursor_to.fetchall()]
def queryFromAllTables():
    """List the tables of the source database.

    Returns:
        The first column of every ``show tables`` row converted with
        ``str``; an empty list when the cursor reports no rows.
    """
    cursor_from.execute("show tables")
    if cursor_from.rowcount <= 0:
        return []
    return [str(record[0]) for record in cursor_from.fetchall()]
# Only tables whose name starts with one of these prefixes are migrated.
prefix_list = ["test__"]
if __name__ == '__main__':
    try:
        from_table_list = queryFromAllTables()
        to_table_list = queryToAllTables()
        # Tables that exist on both sides, sorted so that every run handles
        # them in the same order (set iteration order is not stable).
        shared_tables = sorted(set(from_table_list).intersection(to_table_list))
        prefixes = tuple(prefix_list)
        table_list = [
            table for table in shared_tables if table.startswith(prefixes)
        ]
        for table in table_list:
            # back() fills the two %d placeholders with offset and page size.
            # NOTE: paging orders by `id`, so each table needs that column.
            sql = "select * from " + str(table) + " order by id limit %d ,%d"
            back(sql, table)
    finally:
        # Release cursors and connections even when a query fails.
        cursor_from.close()
        cursor_to.close()
        connect_from.close()
        connect_to.close()
1、修改两个连接信息:from 是来源数据库,to 是目标数据库;prefix_list 指定要迁移哪些表(按表名前缀过滤,可以自行修改)
2、获取两个数据库所有的表,取交集,以prefix_list过滤
3、对每个表进行数据迁移
4、查出两个表的表字段,取交集
5、查询,拼接插入sql语句,再insert
6、重复以上过程
注意点:如果执行的是写操作(insert/update/delete),需要在 cursor.execute(sql) 之后调用 connect.commit() 才能生效。
cursor.description 可以获取查询结果集的字段信息,每个元素的第 0 项就是字段名。