Trong trường hợp cụ thể này, tốt hơn là nên giảm xuống cấp DB-API, vì bạn cần một số công cụ không được tiếp xúc trực tiếp với SQLAlchemy Core, chẳng hạn như copy_expert()
. Điều đó có thể được thực hiện bằng cách sử dụng raw_connection()
. Nếu dữ liệu nguồn của bạn là tệp CSV, bạn không cần gấu trúc trong trường hợp này. Bắt đầu bằng cách tạo một bảng tạm thời, sao chép dữ liệu vào bảng tạm thời và chèn vào bảng đích có xử lý xung đột:
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
ON COMMIT DROP""")
with open("your_source.csv") as data:
cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
FROM STDIN WITH CSV""", data)
cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
SELECT itemid, title, street, pincode
FROM TEST_STAGING
ON CONFLICT ( itemid )
DO UPDATE SET title = EXCLUDED.title
, street = EXCLUDED.street
, pincode = EXCLUDED.pincode""")
except:
conn.rollback()
raise
else:
conn.commit()
finally:
conn.close()
Mặt khác, nếu dữ liệu nguồn của bạn là DataFrame
, bạn vẫn có thể sử dụng COPY
bởi truyền một hàm dưới dạng phương thức method=
thành to_sql()
. Hàm thậm chí có thể ẩn tất cả logic ở trên:
import csv
from io import StringIO
from psycopg2 import sql
def psql_upsert_copy(table, conn, keys, data_iter):
dbapi_conn = conn.connection
buf = StringIO()
writer = csv.writer(buf)
writer.writerows(data_iter)
buf.seek(0)
if table.schema:
table_name = sql.SQL("{}.{}").format(
sql.Identifier(table.schema), sql.Identifier(table.name))
else:
table_name = sql.Identifier(table.name)
tmp_table_name = sql.Identifier(table.name + "_staging")
columns = sql.SQL(", ").join(map(sql.Identifier, keys))
with dbapi_conn.cursor() as cur:
# Create the staging table
stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
cur.execute(stmt)
# Populate the staging table
stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
stmt = sql.SQL(stmt).format(tmp_table_name, columns)
cur.copy_expert(stmt, buf)
# Upsert from the staging table to the destination. First find
# out what the primary key columns are.
stmt = """
SELECT kcu.column_name
FROM information_schema.table_constraints tco
JOIN information_schema.key_column_usage kcu
ON kcu.constraint_name = tco.constraint_name
AND kcu.constraint_schema = tco.constraint_schema
WHERE tco.constraint_type = 'PRIMARY KEY'
AND tco.table_name = %s
"""
args = (table.name,)
if table.schema:
stmt += "AND tco.table_schema = %s"
args += (table.schema,)
cur.execute(stmt, args)
pk_columns = {row[0] for row in cur.fetchall()}
# Separate "data" columns from (primary) key columns
data_columns = [k for k in keys if k not in pk_columns]
# Build conflict_target
pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))
set_ = sql.SQL(", ").join([
sql.SQL("{} = EXCLUDED.{}").format(k, k)
for k in map(sql.Identifier, data_columns)])
stmt = """
INSERT INTO {} ( {} )
SELECT {}
FROM {}
ON CONFLICT ( {} )
DO UPDATE SET {}
"""
stmt = sql.SQL(stmt).format(
table_name, columns, columns, tmp_table_name, pk_columns, set_)
cur.execute(stmt)
Sau đó, bạn sẽ chèn DataFrame
mới sử dụng
df.to_sql("test_table", engine,
method=psql_upsert_copy,
index=False,
if_exists="append")
Sử dụng phương pháp này để nâng cấp ~ 1.000.000 hàng mất khoảng 16 giây trên máy này với cơ sở dữ liệu cục bộ.