如何使用Python将MySQL表数据迁移到MongoDB集合

发布时间:2025-05-15 16:24:31 作者:益华网络 来源:undefined 浏览量(3) 点赞(2)
摘要:【.com快译】介绍 MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大

【.com快译】介绍

MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大的连接性和数据处理能力让这一切变得轻而易举。

在本文中,将详细的讲解如何使用简单的 Python 脚本将 MySQL 表数据迁移到 MongoDB 集合所需的步骤。这些脚本是在 Windows 上使用 Python 3.9.5 开发的。但是,它应该适用于任何平台上的任何 Python 3+ 版本一起使用。

步骤 1:安装所需的模块

第一步是安装连接MySQL 和 MongoDB 数据库实例所需的模块。我们将使用mysql.connector连接到 MySQL 数据库。对于 MongoDB,使用pymongo,这是从 Python 连接到 MongoDB 的推荐模块。

如果所需模块尚未安装,请运行以下PIP命令来安装它们。

pip 安装 mysql 连接器 pip 安装 pymongo 

PIP 是 Python 包或模块的包管理器。

步骤2:从MySQL表中读取数据

第一步是从源 MySQL 表中读取数据,并以可用于将数据加载到目标 MongoDB 数据库中的格式进行准备。MongoDB 是一个 NoSQL 数据库,它将数据存储为 JSON 文档,因此最好以 JSON 格式生成源数据。值得一提的是,Python 具有强大的数据处理能力,可以轻松地将数据转换为 JSON 格式。

import mysql.connector  mysqldb = mysql.connector.connect(    host="localhost",    database="employees",    user="root",    password="" ) mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall()  print(myresult)

当脚本在没有任何错误的情况下完成时,输出的结果如下:

[    {       "id":4,       "name":"Medicine",       "description":"<p>Medicine<br></p>",       "created_at":"",       "updated_at":""    },    {       "id":6,       "name":"Food",       "description":"<p>Food</p>",       "created_at":"",       "updated_at":""    },    {       "id":8,       "name":"Groceries",       "description":"<p>Groceries<br></p>",       "created_at":"",       "updated_at":""    },    {       "id":9,       "name":"Cakes & Bakes",       "description":"<p>Cakes & Bakes<br></p>",       "created_at":d"",       "updated_at":""    } ]

请注意,输出的是一个 JSON 数组,因为我们将dictionary=True参数传递给了游标。否则,结果将采用列表格式。现在有了 JSON 格式的源数据,就可以迁移到 MongoDB 集合。

步骤3:写入 MongoDB 集合

获得 JSON 格式的源数据后,下一步是将数据插入到 MongoDB 集合中。集合是一组文档,相当于 RDBMS 中表(或关系)。我可以通过调用insert_many()集合类的方法来实现,该方法返回插入文档的对象 ID 列表。请注意,当作为参数传递空列表时,此方法将引发异常,因此在方法调用之前进行长度检查。

import pymongo mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] mycol = mydb["categories"] if len(myresult) > 0:         x = mycol.insert_many(myresult) #myresult comes from mysql cursor         print(len(x.inserted_ids))

完成此步骤后,检查一下 MongoDB 实例,以验证数据库和集合是否已创建,文档是否已插入。注意MongoDB 是无模式的,这就意味着不必定义模式来插入文档,模式是动态推断并自动创建的。MongoDB 还可以创建代码中引用的数据库和集合(如果它们还不存在的话)。

步骤4:把数据放在一起

下面是从 MySQL 中读取表并将其插入到 MongoDB 中集合的完整脚本。

import mysql.connector import pymongo delete_existing_documents = True mysql_host="localhost" mysql_database="mydatabase" mysql_schema = "myschema" mysql_user="myuser" mysql_password="********" mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" mysqldb = mysql.connector.connect(     host=mysql_host,     database=mysql_database,     user=mysql_user,     password=mysql_password ) mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall() myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] mycol = mydb["categories"] if len(myresult) > 0:         x = mycol.insert_many(myresult) #myresult comes from mysql cursor         print(len(x.inserted_ids))

步骤 5:增强脚本以加载 MySQL 架构中的所有表

该脚本从 MySQL 中读取一个表,并将结果加载到 MongoDB 集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到新的 MySQL 集合中。我们可以通过查询information_schema.tables元数据表来实现这一点,该表提供给定模式中的表列表。然后可以遍历结果并调用上面的脚本来迁移每个表的数据。

#Iterate through the list of tables in the schema table_list_cursor = mysqldb.cursor() table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,)) tables = table_list_cursor.fetchall() for table in tables:     #Execute the migration script for table

也可以通过将迁移逻辑抽象为一个函数来实现这一点。

#Function migrate_table  def migrate_table(db, col_name):     mycursor = db.cursor(dictionary=True)     mycursor.execute("SELECT * FROM " + col_name + ";")     myresult = mycursor.fetchall()     mycol = mydb[col_name]     if delete_existing_documents:         #delete all documents in the collection         mycol.delete_many({})     #insert the documents     if len(myresult) > 0:         x = mycol.insert_many(myresult)         return len(x.inserted_ids)     else:         return 0

步骤6:输出脚本进度并使其可读

脚本的进度是通过使用print 语句来传达的。通过颜色编码使输出易于阅读。例如,以将成功语句打印为绿色,将失败语句打印为红色。

class bcolors:     HEADER = \033[95m     OKBLUE = \033[94m     OKCYAN = \033[96m     OKGREEN = \033[92m     WARNING = \033[93m     FAIL = \033[91m     ENDC = \033[0m     BOLD = \033[1m     UNDERLINE = \033[4m print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}") print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}") print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")

最终结果

源 MySQL 数据库 迁移后的目标 MongoDB 数据库 在VSCode 中的 Python 脚本和输出

完整的脚本

import mysql.connector import pymongo import datetime class bcolors:     HEADER = \033[95m     OKBLUE = \033[94m     OKCYAN = \033[96m     OKGREEN = \033[92m     WARNING = \033[93m     FAIL = \033[91m     ENDC = \033[0m     BOLD = \033[1m     UNDERLINE = \033[4m begin_time = datetime.datetime.now() print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}") delete_existing_documents = True; mysql_host="localhost" mysql_database="mydatabase" mysql_schema = "myschhema" mysql_user="root" mysql_password="" mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}") print("") #MySQL connection print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}") mysqldb = mysql.connector.connect(     host=mysql_host,     database=mysql_database,     user=mysql_user,     password=mysql_password ) print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}") #MongoDB connection print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}") myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}") print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}") #Start migration print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}") dblist = myclient.list_database_names() if mongodb_dbname in dblist:     print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}") else:     print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}") #Function migrate_table  def migrate_table(db, col_name):     mycursor = db.cursor(dictionary=True)     mycursor.execute("SELECT * FROM " + col_name + ";")     myresult = mycursor.fetchall()     mycol = mydb[col_name]     if delete_existing_documents:         #delete all documents in the collection         mycol.delete_many({})     #insert the documents     if len(myresult) > 0:         x = mycol.insert_many(myresult)         return len(x.inserted_ids)     else:         return 0 #Iterate through the list of tables in the schema table_list_cursor = mysqldb.cursor() table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,)) tables = table_list_cursor.fetchall() total_count = len(tables) success_count = 0 fail_count = 0 for table in tables:     try:         print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}")         inserted_count = migrate_table(mysqldb, table[0])         print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}")         success_count += 1     except Exception as e:         print(f"{bcolors.FAIL} {e} {bcolors.ENDC}")         fail_count += 1 print("") print("Migration completed.") print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}") if fail_count > 0:     print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}") end_time = datetime.datetime.now() print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}") print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")

警告

该脚本适用于中小型 MySQL 数据库,它有几百个表,每个表有几千行。对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,请在表列表查询和实际表select查询上使用 LIMIT 关键字对有限行进行检测。

下载 

带点击此处从 GitHub 下载整个脚本:

https://github.com/zshameel/MySQL2MongoDB

【译稿,合作站点转载请注明原文译者和出处为.com】

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!