Python MySQL 数据迁移

利用 Python(pymysql)在两个 MySQL 数据库之间迁移表数据。

import pymysql.cursors

# Connection settings for the source ("from") database that rows are read from.
connect_from_info = dict(
    host="127.0.0.1",
    port=3306,
    user="test",
    passwd="test",
    db="test",
)

# Connection settings for the target ("to") database that rows are written to.
connect_to_info = dict(
    host="127.0.0.1",
    port=3306,
    user="test",
    passwd="test",
    db="test2",
)

def _openConnection(info):
    """Open a pymysql connection from a connection-info dict.

    ``info`` needs the keys ``host``, ``port``, ``user``, ``passwd`` and
    ``db``, in the same shape as ``connect_from_info`` / ``connect_to_info``.

    The ``utf8mb4`` character set is used because MySQL's ``utf8`` is the
    3-byte ``utf8mb3`` encoding. Under ``utf8``, 4-byte characters read from
    the source (e.g. emoji) would be replaced or rejected, and so lost
    during the copy.
    """
    return pymysql.Connect(
        host=info['host'],
        port=info['port'],
        user=info['user'],
        passwd=info['passwd'],
        db=info['db'],
        charset='utf8mb4',
    )


# Connect to the source ("from") and target ("to") databases.
connect_from = _openConnection(connect_from_info)
connect_to = _openConnection(connect_to_info)

cursor_to = connect_to.cursor()
cursor_from = connect_from.cursor()

# Module-level list of row dicts; kept as a public module attribute.
objList = []


def getToFields(tableName):
    """Return the column names of ``tableName`` in the target ("to") database.

    Columns come back in table-definition order (``ORDINAL_POSITION``). A
    table that does not exist in the target schema yields an empty list.

    The schema and table name are sent as query parameters, so the driver
    escapes them. Splicing them into the SQL text would let a quote in a
    name break, or alter, the statement.
    """
    sql = (
        "select COLUMN_NAME from information_schema.`COLUMNS` "
        "where TABLE_SCHEMA = %s and TABLE_NAME = %s "
        "order by ORDINAL_POSITION"
    )
    cursor_to.execute(sql, (connect_to_info['db'], tableName))
    return [str(row[0]) for row in cursor_to.fetchall()]


def getFromFields(tableName):
    """Return the column names of ``tableName`` in the source ("from") database.

    Columns come back in table-definition order (``ORDINAL_POSITION``). A
    table that does not exist in the source schema yields an empty list.

    The schema and table name are sent as query parameters, so the driver
    escapes them. Splicing them into the SQL text would let a quote in a
    name break, or alter, the statement.
    """
    sql = (
        "select COLUMN_NAME from information_schema.`COLUMNS` "
        "where TABLE_SCHEMA = %s and TABLE_NAME = %s "
        "order by ORDINAL_POSITION"
    )
    cursor_from.execute(sql, (connect_from_info['db'], tableName))
    return [str(row[0]) for row in cursor_from.fetchall()]


def getInnerColumns(tableName):
    """Return the columns of ``tableName`` that exist in both databases.

    The result keeps the order returned by ``getFromFields``. A plain set
    intersection would yield an arbitrary order, making the generated
    INSERT column list change from run to run.
    """
    from_fields = getFromFields(tableName)
    to_fields = set(getToFields(tableName))
    return [field for field in from_fields if field in to_fields]


def _quoteIdentifier(name):
    """Return ``name`` as a backtick-quoted MySQL identifier.

    Embedded backticks are doubled (MySQL's escape inside quoted
    identifiers). ``%`` is doubled as well, so the name survives pymysql's
    ``%``-style parameter substitution in ``execute``/``mogrify``.
    """
    return "`" + str(name).replace("`", "``").replace("%", "%%") + "`"


def back(mainsql, tableName, dry_run=False):
    """Copy the rows of ``tableName`` from the source to the target database.

    Rows are read with ``cursor_from`` in pages of 5000 until a page comes
    back empty. Only the columns present in both databases
    (``getInnerColumns``) are copied. They are written with ``cursor_to`` in
    multi-row ``insert ignore`` statements of at most 100 rows each.
    Values are passed to the driver as query parameters and are never
    spliced into the SQL text, so quotes, backslashes and binary data are
    escaped correctly.

    :param mainsql: paginated SELECT template with two ``%d`` slots that are
        filled with ``(offset, page_size)``, e.g.
        ``"select * from t order by id limit %d ,%d"``.
    :param tableName: name of the table to copy.
    :param dry_run: when true, print each INSERT exactly as it would be sent
        (via ``cursor_to.mogrify``) instead of executing it, and commit
        nothing.
    """
    columns = getInnerColumns(tableName)
    if not columns:
        # With no shared column the statement would be
        # "insert ... () values ()", which MySQL accepts and fills with
        # column defaults. Copy nothing instead.
        print("skip %s: no columns in common" % tableName)
        return

    page_size = 5000  # rows read from the source per SELECT
    batch_size = 100  # rows per multi-row INSERT statement
    insert_prefix = "insert ignore into %s (%s) values " % (
        _quoteIdentifier(tableName),
        ",".join(_quoteIdentifier(column) for column in columns),
    )
    row_placeholder = "(" + ",".join(["%s"] * len(columns)) + ")"

    start = 0
    while True:
        cursor_from.execute(mainsql % (start, page_size))
        start += page_size
        if cursor_from.rowcount <= 0:
            break

        # Position of every common column within this SELECT's result rows.
        positions = {desc[0]: index
                     for index, desc in enumerate(cursor_from.description)}
        indexes = [positions[column] for column in columns]
        rows = [[row[i] for i in indexes] for row in cursor_from.fetchall()]

        for left in range(0, len(rows), batch_size):
            batch = rows[left:left + batch_size]
            insertSql = insert_prefix + ",".join([row_placeholder] * len(batch))
            params = [value for row in batch for value in row]
            if dry_run:
                print(cursor_to.mogrify(insertSql, params))
                continue
            try:
                cursor_to.execute(insertSql, params)
            except Exception as e:
                # Best effort: report which batch failed and continue with
                # the rest of the table.
                print("insert into %s failed (page offset %d, batch offset %d): %s"
                      % (tableName, start - page_size, left, e))

        if not dry_run:
            # pymysql does not autocommit by default; persist this page.
            connect_to.commit()


def queryToAllTables():
    """Return the names of all tables in the target ("to") database.

    Each name is converted with ``str()``, the same way ``queryFromAllTables``
    does. Both lists are then plain ``str`` and can be intersected reliably.
    An empty database yields an empty list.
    """
    cursor_to.execute("show tables")
    return [str(row[0]) for row in cursor_to.fetchall()]


def queryFromAllTables():
    """List every table name in the source ("from") database as ``str``.

    Returns an empty list when ``show tables`` reports no rows.
    """
    cursor_from.execute("show tables")
    if cursor_from.rowcount <= 0:
        return []
    return [str(record[0]) for record in cursor_from.fetchall()]


# Only tables whose names start with one of these prefixes are migrated.
prefix_list = ["test__"]

if __name__ == '__main__':
    from_table_list = queryFromAllTables()
    to_table_list = queryToAllTables()
    # Tables present in both databases, sorted so every run copies them in
    # the same order.
    common_tables = sorted(set(from_table_list) & set(to_table_list))
    table_list = [table for table in common_tables
                  if table.startswith(tuple(prefix_list))]
    try:
        for table in table_list:
            # Backtick-quote the name (doubling embedded backticks) so reserved
            # words work. Double "%" so the "% (start, size)" substitution done
            # by back() only fills the two %d slots.
            quoted = "`" + table.replace("`", "``").replace("%", "%%") + "`"
            # NOTE: paging is ordered by `id`, so every migrated table is
            # assumed to have an `id` column.
            sql = "select * from " + quoted + " order by id  limit %d ,%d"
            back(sql, table)
    finally:
        connect_from.close()
        connect_to.close()

1、修改两个连接信息:from 是来源数据库,to 是目标数据库;prefix_list 指定要迁移哪些表(按表名前缀匹配,可以自定义修改)。

2、获取两个数据库所有的表,取交集,以prefix_list过滤

3、对每个表进行数据迁移:按 id 排序、每页 5000 条分页读取源表数据(因此要求表中存在 id 列)。

4、分别查出源表和目标表的字段,取交集,只迁移两边都存在的字段。

5、查询源表数据,拼接 insert ignore 插入语句(每条语句最多 100 行),再在目标库执行 insert。

6、重复以上过程,直到所有表的所有分页都处理完毕。

注意点:对目标库执行插入、更新等写操作时,需要在 cursor.execute(sql) 之后调用 connect.commit() 才能生效(pymysql 默认不自动提交)。

cursor.description 可以获取查询结果的字段名(每一项的第 0 个元素就是字段名)。
