PostgreSQL
 sql >> Cơ Sở Dữ Liệu >  >> RDS >> PostgreSQL

CHÈN hoặc CẬP NHẬT dữ liệu hàng loạt từ dataframe / CSV vào cơ sở dữ liệu PostgreSQL

Trong trường hợp cụ thể này, tốt hơn là nên giảm xuống cấp DB-API, vì bạn cần một số công cụ không được tiếp xúc trực tiếp với SQLAlchemy Core, chẳng hạn như copy_expert() . Điều đó có thể được thực hiện bằng cách sử dụng raw_connection() . Nếu dữ liệu nguồn của bạn là tệp CSV, bạn không cần gấu trúc trong trường hợp này. Bắt đầu bằng cách tạo một bảng tạm thời, sao chép dữ liệu vào bảng tạm thời và chèn vào bảng đích có xử lý xung đột:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Mặt khác, nếu dữ liệu nguồn của bạn là DataFrame , bạn vẫn có thể sử dụng COPY bởi truyền một hàm dưới dạng phương thức method= thành to_sql() . Hàm thậm chí có thể ẩn tất cả logic ở trên:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

Sau đó, bạn sẽ chèn DataFrame mới sử dụng

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Sử dụng phương pháp này để nâng cấp ~ 1.000.000 hàng mất khoảng 16 giây trên máy này với cơ sở dữ liệu cục bộ.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Điều kiện ở đâu cho bảng đã tham gia trong ORM theo thứ tự

  2. Xuất dữ liệu có chứa nguồn cấp dữ liệu dòng dưới dạng CSV từ PostgreSQL

  3. Buộc SSL cho kết nối Django Postgres

  4. Thêm tháng vào một ngày trong PostgreSQL

  5. Postgresql tạo tiện ích mở rộng không thành công