创建电子商务网站的步骤,网站开发的付款方式,网站手机版开发,灯饰 东莞网站建设
使用Python 3读取MongoDB数据，然后写入到PostgreSQL中，分别借助pymongo和psycopg2库。这两个库的安装方式如下：
pip3 install psycopg2 # 需要系统安装PG库
pip3 install pymongo
网上查询的资料提供的都是碎片化的结果，经整理和优…
使用Python 3读取MongoDB数据，然后写入到PostgreSQL中，分别借助pymongo和psycopg2库。这两个库的安装方式如下：
pip3 install psycopg2 # 需要系统安装PG库
pip3 install pymongo
网上查询的资料提供的都是碎片化的结果，经整理和优化后，觉得以分批次读取和写入的方式较适合生产场景，并能有效应对数据量比较大的情况，具体可根据自己的资源配置调整批次量级。下面是经过排错后的读取和写入步骤：
import pandas as pd
import pymongo as pm
import json
from bson import ObjectId
from datetime import date, datetime
import psycopg2 as pg
from io import StringIO

# MongoDB connection URI in the form mongodb://<user>:<password>@<host>:<port>.
# The credentials shown are placeholders.
mongo_addr = "mongodb://user_name:user_pswd@172.20.2.142:27017"

# Number of documents per batch, kept as a power of two (2 ** 12 == 4096).
# Used both as the MongoDB cursor batch size and as the write-batch size.
batch_size = 1 << 12


# Extract and process the data
def extract_and_load(dbn, coll_name, process, *args):
    """Read matching documents from MongoDB and pass them to ``process`` in batches.

    Documents are selected whose ``_id`` is at or after an ObjectId built from
    the timestamp of 2025-04-20 10:30 (naive datetime) and whose ``address``
    field exists, is not ``None``, is not the empty string, and does not match
    ``china|中国`` case-insensitively.

    Args:
        dbn: Name of the MongoDB database to read from.
        coll_name: Name of the collection inside that database.
        process: Callable invoked as ``process(batch_list, *args)`` once per
            full batch of ``batch_size`` documents and once more for any
            non-empty remainder.
        *args: Extra positional arguments forwarded unchanged to ``process``.
    """
    client = pm.MongoClient(mongo_addr)  # MongoDB client object
    try:
        db = client.get_database(dbn)
        coll = db[coll_name]

        # Build the smallest ObjectId for the start time: the timestamp in hex
        # followed by 16 zero hex digits.
        start_ts = int(datetime(2025, 4, 20, 10, 30).timestamp())
        start_id = ObjectId(f"{start_ts:x}" + "0000000000000000")

        cond = {
            "_id": {"$gte": start_id},
            "$and": [
                {"address": {"$exists": True}},
                {"address": {"$ne": None}},
                {"address": {"$ne": ""}},
                {"address": {"$not": {"$regex": "china|中国", "$options": "i"}}},
            ],
        }

        cursor = coll.find(cond).batch_size(batch_size)
        counter = 0
        try:
            batch_list = []
            for item in cursor:
                # Nested dict/list values are serialised to JSON strings.
                # Only existing keys are reassigned, so iterating over
                # item.items() while doing so is safe.
                for key, value in item.items():
                    if isinstance(value, (dict, list)):
                        item[key] = json.dumps(value, ensure_ascii=False)
                batch_list.append(item)
                counter += 1
                if counter % batch_size == 0:
                    process(batch_list, *args)
                    batch_list = []
            # Flush the final partial batch, if any.
            if batch_list:
                process(batch_list, *args)
        finally:
            # Close the cursor even when iteration or ``process`` raises.
            cursor.close()
        print(f"共加载数据 {counter} 条")
    finally:
        client.close()


# Write to the PG database
def load_to_postgres(data_list, *args):
    """Bulk-load one batch of records into PostgreSQL via ``COPY ... FROM STDIN``.

    The batch is turned into a DataFrame, serialised to an in-memory CSV
    stream delimited by the ``\\x01`` control character, and copied into the
    table ``ods.t_tmp`` using the DataFrame's column names as the column list.

    Args:
        data_list: List of dict records forming one batch. An empty batch is
            a no-op.
        *args: Accepted for interface compatibility; not used by this body.

    Raises:
        psycopg2.Error: Propagated unchanged if connecting or the COPY fails.
            The transaction is rolled back and the connection closed first.
    """
    if not data_list:
        # Nothing to write; an empty frame would yield an invalid column list.
        return

    data_src = pd.DataFrame(data_list)
    # Data validation can be added here.

    # COPY approach: format the input stream first, then write it to PG.
    delimiter = "\x01"
    buffer = StringIO()
    # noinspection PyTypeChecker
    data_src.to_csv(buffer, sep=delimiter, na_rep="", header=False, index=False)
    buffer.seek(0)  # rewind: the stream position is at the end after writing

    table_name = "ods.t_tmp"
    # NOTE(review): column names are interpolated unquoted; this presumes the
    # record keys are valid PostgreSQL identifiers matching the target table.
    table_columns = ",".join(data_src.columns)
    sql = (
        f"COPY {table_name}({table_columns}) FROM STDIN "
        f"(FORMAT CSV, DELIMITER '{delimiter}', NULL '')"
    )

    # pg_database / pg_user / pg_pwd / pg_host / pg_port are expected to be
    # defined elsewhere at module level.
    conn = pg.connect(database=pg_database, user=pg_user, password=pg_pwd, host=pg_host, port=pg_port)
    try:
        # ``with conn`` commits on success and rolls back on error;
        # ``with conn.cursor()`` closes the cursor in both cases.
        with conn, conn.cursor() as cur:
            cur.copy_expert(sql, buffer)
    finally:
        conn.close()


if __name__ == "__main__":
    # mongo_db_nm / mongo_collect_nm are expected to be defined elsewhere.
    extract_and_load(mongo_db_nm, mongo_collect_nm, load_to_postgres)

# In real use more parameters are configured; for example, the schema and
# table name can be passed through ``args``. If needed, asynchronous writes
# can also be used to optimise the read/write pipeline.