如何使用Python将MySQL表数据迁移到MongoDB集合
【.com快译】介绍
MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大的连接性和数据处理能力让这一切变得轻而易举。
在本文中,将详细的讲解如何使用简单的 Python 脚本将 MySQL 表数据迁移到 MongoDB 集合所需的步骤。这些脚本是在 Windows 上使用 Python 3.9.5 开发的。但是,它应该适用于任何平台上的任何 Python 3+ 版本一起使用。
步骤 1:安装所需的模块
第一步是安装连接MySQL 和 MongoDB 数据库实例所需的模块。我们将使用mysql.connector连接到 MySQL 数据库。对于 MongoDB,使用pymongo,这是从 Python 连接到 MongoDB 的推荐模块。
如果所需模块尚未安装,请运行以下PIP命令来安装它们。
pip 安装 mysql 连接器 pip 安装 pymongoPIP 是 Python 包或模块的包管理器。
步骤2:从MySQL表中读取数据
第一步是从源 MySQL 表中读取数据,并以可用于将数据加载到目标 MongoDB 数据库中的格式进行准备。MongoDB 是一个 NoSQL 数据库,它将数据存储为 JSON 文档,因此最好以 JSON 格式生成源数据。值得一提的是,Python 具有强大的数据处理能力,可以轻松地将数据转换为 JSON 格式。
import mysql.connector mysqldb = mysql.connector.connect( host="localhost", database="employees", user="root", password="" ) mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall() print(myresult)当脚本在没有任何错误的情况下完成时,输出的结果如下:
[ { "id":4, "name":"Medicine", "description":"<p>Medicine<br></p>", "created_at":"", "updated_at":"" }, { "id":6, "name":"Food", "description":"<p>Food</p>", "created_at":"", "updated_at":"" }, { "id":8, "name":"Groceries", "description":"<p>Groceries<br></p>", "created_at":"", "updated_at":"" }, { "id":9, "name":"Cakes & Bakes", "description":"<p>Cakes & Bakes<br></p>", "created_at":d"", "updated_at":"" } ]请注意,输出的是一个 JSON 数组,因为我们将dictionary=True参数传递给了游标。否则,结果将采用列表格式。现在有了 JSON 格式的源数据,就可以迁移到 MongoDB 集合。
步骤3:写入 MongoDB 集合
获得 JSON 格式的源数据后,下一步是将数据插入到 MongoDB 集合中。集合是一组文档,相当于 RDBMS 中表(或关系)。我可以通过调用insert_many()集合类的方法来实现,该方法返回插入文档的对象 ID 列表。请注意,当作为参数传递空列表时,此方法将引发异常,因此在方法调用之前进行长度检查。
import pymongo mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] mycol = mydb["categories"] if len(myresult) > 0: x = mycol.insert_many(myresult) #myresult comes from mysql cursor print(len(x.inserted_ids))完成此步骤后,检查一下 MongoDB 实例,以验证数据库和集合是否已创建,文档是否已插入。注意MongoDB 是无模式的,这就意味着不必定义模式来插入文档,模式是动态推断并自动创建的。MongoDB 还可以创建代码中引用的数据库和集合(如果它们还不存在的话)。
步骤4:把数据放在一起
下面是从 MySQL 中读取表并将其插入到 MongoDB 中集合的完整脚本。
import mysql.connector import pymongo delete_existing_documents = True mysql_host="localhost" mysql_database="mydatabase" mysql_schema = "myschema" mysql_user="myuser" mysql_password="********" mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" mysqldb = mysql.connector.connect( host=mysql_host, database=mysql_database, user=mysql_user, password=mysql_password ) mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall() myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] mycol = mydb["categories"] if len(myresult) > 0: x = mycol.insert_many(myresult) #myresult comes from mysql cursor print(len(x.inserted_ids))步骤 5:增强脚本以加载 MySQL 架构中的所有表
该脚本从 MySQL 中读取一个表,并将结果加载到 MongoDB 集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到新的 MySQL 集合中。我们可以通过查询information_schema.tables元数据表来实现这一点,该表提供给定模式中的表列表。然后可以遍历结果并调用上面的脚本来迁移每个表的数据。
#Iterate through the list of tables in the schema table_list_cursor = mysqldb.cursor() table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,)) tables = table_list_cursor.fetchall() for table in tables: #Execute the migration script for table也可以通过将迁移逻辑抽象为一个函数来实现这一点。
#Function migrate_table def migrate_table(db, col_name): mycursor = db.cursor(dictionary=True) mycursor.execute("SELECT * FROM " + col_name + ";") myresult = mycursor.fetchall() mycol = mydb[col_name] if delete_existing_documents: #delete all documents in the collection mycol.delete_many({}) #insert the documents if len(myresult) > 0: x = mycol.insert_many(myresult) return len(x.inserted_ids) else: return 0步骤6:输出脚本进度并使其可读
脚本的进度是通过使用print 语句来传达的。通过颜色编码使输出易于阅读。例如,以将成功语句打印为绿色,将失败语句打印为红色。
class bcolors: HEADER = \033[95m OKBLUE = \033[94m OKCYAN = \033[96m OKGREEN = \033[92m WARNING = \033[93m FAIL = \033[91m ENDC = \033[0m BOLD = \033[1m UNDERLINE = \033[4m print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}") print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}") print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")最终结果
源 MySQL 数据库 迁移后的目标 MongoDB 数据库 在VSCode 中的 Python 脚本和输出完整的脚本
import mysql.connector import pymongo import datetime class bcolors: HEADER = \033[95m OKBLUE = \033[94m OKCYAN = \033[96m OKGREEN = \033[92m WARNING = \033[93m FAIL = \033[91m ENDC = \033[0m BOLD = \033[1m UNDERLINE = \033[4m begin_time = datetime.datetime.now() print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}") delete_existing_documents = True; mysql_host="localhost" mysql_database="mydatabase" mysql_schema = "myschhema" mysql_user="root" mysql_password="" mongodb_host = "mongodb://localhost:27017/" mongodb_dbname = "mymongodb" print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}") print("") #MySQL connection print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}") mysqldb = mysql.connector.connect( host=mysql_host, database=mysql_database, user=mysql_user, password=mysql_password ) print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}") #MongoDB connection print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}") myclient = pymongo.MongoClient(mongodb_host) mydb = myclient[mongodb_dbname] print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}") print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}") #Start migration print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}") dblist = myclient.list_database_names() if mongodb_dbname in dblist: print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}") else: print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}") #Function migrate_table def migrate_table(db, col_name): mycursor = db.cursor(dictionary=True) mycursor.execute("SELECT * FROM " + col_name + ";") myresult = mycursor.fetchall() mycol = mydb[col_name] if delete_existing_documents: #delete all documents in the collection mycol.delete_many({}) #insert the documents if len(myresult) > 0: x = mycol.insert_many(myresult) return len(x.inserted_ids) else: return 0 #Iterate through the list of tables in the schema table_list_cursor = mysqldb.cursor() table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,)) tables = table_list_cursor.fetchall() total_count = len(tables) success_count = 0 fail_count = 0 for table in tables: try: print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}") inserted_count = migrate_table(mysqldb, table[0]) print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}") success_count += 1 except Exception as e: print(f"{bcolors.FAIL} {e} {bcolors.ENDC}") fail_count += 1 print("") print("Migration completed.") print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}") if fail_count > 0: print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}") end_time = datetime.datetime.now() print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}") print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")警告
该脚本适用于中小型 MySQL 数据库,它有几百个表,每个表有几千行。对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,请在表列表查询和实际表select查询上使用 LIMIT 关键字对有限行进行检测。
下载
带点击此处从 GitHub 下载整个脚本:
https://github.com/zshameel/MySQL2MongoDB【译稿,合作站点转载请注明原文译者和出处为.com】
扫一扫,关注我们