PostgreSQL
 sql >> Cơ Sở Dữ Liệu >  >> RDS >> PostgreSQL

Treo trong tập lệnh Python bằng cách sử dụng SQLAlchemy và đa xử lý

Tôi tin rằng TypeError đến từ multiprocessing của get .

Tôi đã loại bỏ tất cả mã DB khỏi tập lệnh của bạn. Hãy xem cái này:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Sử dụng r.wait trả về kết quả mong đợi, nhưng sử dụng r.get tăng TypeError . Như được mô tả trong tài liệu về python , sử dụng r.wait sau map_async .

Chỉnh sửa :Tôi phải sửa đổi câu trả lời trước đây của mình. Bây giờ tôi tin rằng TypeError đến từ SQLAlchemy. Tôi đã sửa đổi tập lệnh của mình để tạo lại lỗi.

Chỉnh sửa 2 :Có vẻ như vấn đề là multiprocessing.pool không hoạt động tốt nếu bất kỳ nhân viên nào tạo ra một Ngoại lệ có hàm tạo yêu cầu một tham số (xem thêm here ).

Tôi đã sửa đổi tập lệnh của mình để làm nổi bật điều này.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Trong trường hợp của bạn, do mã của bạn tạo ra một ngoại lệ SQLAlchemy, giải pháp duy nhất mà tôi có thể nghĩ đến là bắt tất cả các ngoại lệ trong do chức năng và nâng lại một Exception bình thường thay vì. Một cái gì đó như thế này:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Chỉnh sửa 3 :do đó, có vẻ như đây là một lỗi với Python , nhưng các ngoại lệ thích hợp trong SQLAlchemy sẽ giải quyết vấn đề đó:do đó, tôi đã nêu vấn đề với SQLAlchemy , quá.

Để giải quyết vấn đề, tôi nghĩ giải pháp ở cuối Chỉnh sửa 2 sẽ làm (bao gồm các lệnh gọi lại trong thử ngoại trừ và nâng cao lại).



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Postgres:trích xuất văn bản lên đến Ký tự thứ N trong một chuỗi

  2. XÓA TỪ ... báo cáo lỗi cú pháp tại hoặc gần.

  3. Postgres:Mở rộng cột JSON thành các hàng

  4. dữ liệu bổ sung sau cột dự kiến ​​cuối cùng trong khi cố gắng nhập tệp csv vào postgresql

  5. Postgresql:xóa khoảng trắng giữa các loại chữ số nhất định