Để nhận ra một hàng đợi đơn giản trong redis có thể được sử dụng để gửi lại các công việc bị lỗi, tôi sẽ thử một cái gì đó như sau:
- 1 danh sách "up_for_grabs"
- 1 danh sách "being_worked_on"
- khóa tự động hết hạn
một công nhân đang cố gắng giành lấy một công việc sẽ làm như thế này:
timeout = 3600
#wrap this in a transaction so our cleanup wont kill the task
#Move the job away from the queue so nobody else tries to claim it
job = RPOPLPUSH(up_for_grabs, being_worked_on)
#Set a lock and expire it, the value tells us when that job will time out. This can be arbitrary though
SETEX('lock:' + job, Time.now + timeout, timeout)
#our application logic
do_work(job)
#Remove the finished item from the queue.
LREM being_worked_on -1 job
#Delete the item's lock. If it crashes here, the expire will take care of it
DEL('lock:' + job)
Và thỉnh thoảng, chúng tôi có thể lấy danh sách của mình và kiểm tra xem tất cả các công việc trong đó có thực sự có khóa hay không. trường hợp này chúng tôi sẽ gửi lại.
Đây sẽ là mã giả cho điều đó:
loop do
items = LRANGE(being_worked_on, 0, -1)
items.each do |job|
if !(EXISTS("lock:" + job))
puts "We found a job that didn't have a lock, resubmitting"
LREM being_worked_on -1 job
LPUSH(up_for_grabs, job)
end
end
sleep 60
end