Tôi tin rằng TypeError
đến từ multiprocessing
của get
.
Tôi đã loại bỏ tất cả mã DB khỏi tập lệnh của bạn. Hãy xem cái này:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Sử dụng r.wait
trả về kết quả mong đợi, nhưng sử dụng r.get
tăng TypeError
. Như được mô tả trong tài liệu về python
, sử dụng r.wait
sau map_async
.
Chỉnh sửa :Tôi phải sửa đổi câu trả lời trước đây của mình. Bây giờ tôi tin rằng TypeError
đến từ SQLAlchemy. Tôi đã sửa đổi tập lệnh của mình để tạo lại lỗi.
Chỉnh sửa 2 :Có vẻ như vấn đề là multiprocessing.pool
không hoạt động tốt nếu bất kỳ nhân viên nào tạo ra một Ngoại lệ có hàm tạo yêu cầu một tham số (xem thêm here
).
Tôi đã sửa đổi tập lệnh của mình để làm nổi bật điều này.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Trong trường hợp của bạn, do mã của bạn tạo ra một ngoại lệ SQLAlchemy, giải pháp duy nhất mà tôi có thể nghĩ đến là bắt tất cả các ngoại lệ trong do
chức năng và nâng lại một Exception
bình thường thay vì. Một cái gì đó như thế này:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Chỉnh sửa 3 :do đó, có vẻ như đây là một lỗi với Python , nhưng các ngoại lệ thích hợp trong SQLAlchemy sẽ giải quyết vấn đề đó:do đó, tôi đã nêu vấn đề với SQLAlchemy , quá.
Để giải quyết vấn đề, tôi nghĩ giải pháp ở cuối Chỉnh sửa 2 sẽ làm (bao gồm các lệnh gọi lại trong thử ngoại trừ và nâng cao lại).