PostgreSQL
 sql >> Cơ Sở Dữ Liệu >  >> RDS >> PostgreSQL

Sử dụng bảng SAO CHÉP nhị phân FROM với psycopg2

Đây là tương đương nhị phân của SAO CHÉP TỪ cho Python 3:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()

Cập nhật

Tôi đã viết lại phương pháp ở trên để ghi tệp cho COPY. Dữ liệu của tôi bằng Python nằm trong các mảng NumPy, vì vậy bạn nên sử dụng các mảng này. Đây là một số ví dụ về data với 1 triệu hàng, 7 cột:

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

Trên cơ sở dữ liệu của tôi, tôi có hai bảng trông giống như sau:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

và một bảng tương tự khác có tên num_data_text .

Dưới đây là một số hàm trợ giúp đơn giản để chuẩn bị dữ liệu cho COPY (cả định dạng văn bản và nhị phân) bằng cách sử dụng thông tin trong mảng bản ghi NumPy:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        cpy.write('\t'.join([repr(x) for x in row]) + '\n')
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].alignment
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tostring())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)

Đây là cách tôi sử dụng các hàm trợ giúp để đánh giá hai phương pháp định dạng COPY:

def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)

Đây là kết quả từ hai time_pgcopy cuối cùng lệnh:

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

Vì vậy, cả hai bước NumPy → tệp và tệp → cơ sở dữ liệu đều nhanh hơn với cách tiếp cận nhị phân. Sự khác biệt rõ ràng là cách Python chuẩn bị tệp COPY, tệp này thực sự chậm đối với văn bản. Nói chung, định dạng nhị phân tải vào cơ sở dữ liệu trong 2/3 thời gian dưới dạng định dạng văn bản cho lược đồ này.

Cuối cùng, tôi đã so sánh các giá trị trong cả hai bảng trong cơ sở dữ liệu để xem các con số có khác nhau không. Khoảng 1,46% hàng có các giá trị khác nhau cho cột s0 và phần này tăng lên 6,17% cho s6 (có thể liên quan đến phương pháp ngẫu nhiên mà tôi đã sử dụng). Sự khác biệt tuyệt đối khác 0 giữa tất cả các giá trị float 70M 32-bit nằm trong khoảng từ 9.3132257e-010 đến 7.6293945e-006. Những khác biệt nhỏ này giữa phương pháp tải văn bản và nhị phân là do mất độ chính xác từ các chuyển đổi float → text → float cần thiết cho phương pháp định dạng văn bản.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. java.lang.ClassNotFoundException:org.postgresql.Driver

  2. Cách chuyển một mảng json thành các hàng trong postgres

  3. Tắt cảnh báo trong sqlalchemy

  4. Xóa PostgreSQL với tham gia bên trong

  5. trình tự thoát cho gạch nối (-) trong PostgreSQL là gì