PostgreSQL
 sql >> Cơ Sở Dữ Liệu >  >> RDS >> PostgreSQL

PostgreSQL - triển khai một hàng đợi đáng tin cậy

Xem xét nhiều người đọc, cần phải kiểm soát bản ghi mà mỗi người đọc đã nhận được.

Ngoài ra, người ta nói rằng đơn đặt hàng cũng là một điều kiện để gửi hồ sơ đến người đọc. Vì vậy, nếu một số giao dịch tiếp theo đã được thực hiện trước một giao dịch trước đó, chúng tôi phải "dừng" và chỉ gửi lại hồ sơ khi nó đã cam kết, để duy trì thứ tự của hồ sơ được gửi đến người đọc.

Điều đó nói rằng, hãy kiểm tra việc triển khai:

-- lets create our queue table 
drop table if exists queue_records cascade;
create table if not exists queue_records 
(
  cod serial primary key,
  date_posted timestamp default timeofday()::timestamp,
  message text
);


-- lets create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint 
(
  reader_id text primary key,
  last_checkpoint numeric
);



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
    vLAST_CHECKPOINT    numeric;
    vCHECKPOINT_EXISTS  integer;
    vRECORD         queue_records%rowtype;
BEGIN

    -- let's get the last record sent to the reader 
    SELECT  last_checkpoint
    INTO    vLAST_CHECKPOINT
    FROM    queue_reader_checkpoint
    WHERE   reader_id = pREADER_ID;

    -- if vLAST_CHECKPOINT is null (this is the very first time of reader_id), 
    -- sets it to the last cod from queue. It means that reader will get records from now on.
    if (vLAST_CHECKPOINT is null) then
        -- sets the flag indicating the reader does not have any checkpoint recorded
        vCHECKPOINT_EXISTS = 0;
        -- gets the very last commited record
        SELECT  MAX(cod)
        INTO    vLAST_CHECKPOINT
        FROM    queue_records;
    else
        -- sets the flag indicating the reader already have a checkpoint recorded
        vCHECKPOINT_EXISTS = 1; 
    end if;

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
            SELECT  *
            FROM    queue_records
            WHERE   COD > vLAST_CHECKPOINT 
            ORDER   BY COD
    LOOP

        -- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order
        if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then

            -- let's save the last record read
            vLAST_CHECKPOINT = vRECORD.COD;

            -- and return it
            RETURN NEXT vRECORD;

        -- but, if it is not, then is out of order
        else
            -- the reason is some transaction did not commit yet, but there's another further transaction that alread did.
            -- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already;
            exit;
        end if;
    END LOOP;


    -- now we have to persist the last record read to be retrieved on next call
    if (vCHECKPOINT_EXISTS = 0) then
        INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
    else        
        UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
    end if; 
end;
$BODY$ LANGUAGE plpgsql VOLATILE;



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Làm thế nào để cấu hình Postgresql với dự án rails?

  2. gitlab Lỗi 500 sau khi cập nhật khi xem các dự án

  3. Postgres Query JSON Array chứa một cái gì đó

  4. Có thể tạo bảng với tên biến trong PostgreSQL không?

  5. Tôi có thể sử dụng lệnh \ copy vào một hàm của postgresql không?