首页 文章

作业队列为具有多个使用者的SQL表(PostgreSQL)

提问于
浏览
32

我有一个典型的 生产环境 者 - 消费者问题:

多个 生产环境 者应用程序将作业请求写入PostgreSQL数据库上的作业表 .

作业请求的状态字段在创建时包含QUEUED .

当 生产环境 者插入新记录时,规则会通知 multiple 个消费者应用程序:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

他们将尝试通过将其状态设置为RESERVED来保留新记录 . 当然,只有消费者才能成功 . 所有其他消费者不应该保留相同的记录 . 他们应该保留state = QUEUED的其他记录 .

示例:某个 生产环境 者将以下记录添加到表jobrecord:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

现在,两个消费者A,B想要处理它们 . 他们同时开始跑步 . 一个应该保留id 1,另一个应该保留id 2,然后完成的第一个应该保留id 3等等 .

在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可以在不同机器上运行的不同进程 . 它们只访问同一个数据库,因此所有同步都必须通过数据库进行 .

我在PostgreSQL中阅读了很多关于并发访问和锁定的文档,例如: http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Select unlocked row in Postgresql PostgreSQL and locking

从这些主题中我了解到,以下SQL语句应该能够满足我的需求:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

不幸的是,当我在多个消费者流程中运行它时,在大约50%的时间内,它们仍然保留相同的记录,处理它和一个覆盖另一个的更改 .

我错过了什么?如何编写SQL语句以便多个使用者不会保留相同的记录?

7 回答

  • 4

    在这里阅读我的帖子:

    Consistency in postgresql with locking and select for update

    如果您使用事务和LOCK TABLE,您将没有任何问题 .

  • 2

    我也使用postgres作为FIFO队列 . 我最初使用的是ACCESS EXCLUSIVE,它在高并发性方面产生了正确的结果,但是与pg_dump互斥是一个不幸的结果,pg_dump在执行期间获取了一个ACCESS SHARE锁 . 这会导致我的next()函数锁定很长时间(pg_dump的持续时间) . 这是不可接受的,因为我们是一个24x7的商店,客户不喜欢在半夜排队的死时间 .

    我认为必须有一个限制较少的锁,它仍然是并发安全的,而不是在pg_dump运行时锁定 . 我的搜索引导我到这个SO帖子 .

    然后我做了一些研究 .

    以下模式足以用于FIFO队列NEXT()函数,该函数将更新作业状态从排队到运行而不会出现任何并发失败,也不会阻止pg_dump:

    SHARE UPDATE EXCLUSIVE
    SHARE ROW EXCLUSIVE
    EXCLUSIVE
    

    查询:

    begin;
    lock table tx_test_queue in exclusive mode;
    update 
        tx_test_queue
    set 
        status='running'
    where
        job_id in (
            select
                job_id
            from
                tx_test_queue
            where
                status='queued'
            order by 
                job_id asc
            limit 1
        )
    returning job_id;
    commit;
    

    结果如下:

    UPDATE 1
     job_id
    --------
         98
    (1 row)
    

    这是一个shell脚本,它以高并发性(30)测试所有不同的锁模式 .

    #!/bin/bash
    # RESULTS, feel free to repro yourself
    #
    # noLock                    FAIL
    # accessShare               FAIL
    # rowShare                  FAIL
    # rowExclusive              FAIL
    # shareUpdateExclusive      SUCCESS
    # share                     FAIL+DEADLOCKS
    # shareRowExclusive         SUCCESS
    # exclusive                 SUCCESS
    # accessExclusive           SUCCESS, but LOCKS against pg_dump
    
    #config
    strategy="exclusive"
    
    db=postgres
    dbuser=postgres
    queuecount=100
    concurrency=30
    
    # code
    psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
    # empty queue
    psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
    echo "Simulating 10 second pg_dump with ACCESS SHARE"
    psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &
    
    echo "Starting workers..."
    # queue $queuecount items
    seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
    #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
    # process $queuecount w/concurrency of $concurrency
    case $strategy in
        "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        *) echo "Unknown strategy $strategy";;
    esac
    echo $strategySql
    seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
    #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
    psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
    psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
    

    如果您想编辑,代码也在这里:https://gist.github.com/1083936

    我正在更新我的应用程序以使用EXCLUSIVE模式,因为它是最严格的模式,a)是正确的,b)与pg_dump不冲突 . 我选择了最严格的限制因为它似乎是最不具风险的,因为在没有成为postgres锁定的超级专家的情况下,从ACCESS EXCLUSIVE更改应用程序 .

    我对我的试验台和答案背后的一般想法感到非常满意 . 我希望分享这有助于解决其他问题 .

  • 5

    无需为此执行整个表锁:\ .

    使用 for update 创建的行锁可以正常工作 .

    请参阅https://gist.github.com/mackross/a49b72ad8d24f7cefc32,了解我对apinstein的回答所做的更改,并确认它仍然有效 .

    最终的代码是

    update 
        tx_test_queue
    set 
        status='running'
    where
        job_id in (
            select
                job_id
            from
                tx_test_queue
            where
                status='queued'
            order by 
                job_id asc
            limit 1 for update
        )
    returning job_id;
    
  • 14

    那么选择呢?

    SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
    

    https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

  • 32

    您可能想看看queue_classic是如何做到的 . https://github.com/ryandotsmith/queue_classic

    代码非常简短易懂 .

  • -1

    检查PgQ而不是重新发明轮子 .

  • -1

    好的,基于jordani的链接,这是适合我的解决方案 . 由于我的一些问题是Qt-SQL的工作方式,我已经包含了Qt代码:

    QSqlDatabase db = GetDatabase();
    db.transaction();
    QSqlQuery lockQuery(db);
    bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
    QSqlQuery query(db);
    query.prepare(    
    "UPDATE jobrecord "
    "  SET \"owner\"= :owner, state = :reserved "
    "  WHERE id = ( "
    "    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
    "  ) RETURNING id;"
    );
    query.bindValue(":owner", pid);
    query.bindValue(":reserved", JobRESERVED);
    query.bindValue(":queued", JobQUEUED); 
    bool result = query.exec();
    

    要检查,如果多个使用者处理相同的作业,我添加了规则和日志表:

    CREATE TABLE serverjobrecord_log
    (
      serverjobrecord_id integer,
      oldowner text,
      newowner text
    ) WITH ( OIDS=FALSE );
    
    
    CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
    WHERE old.owner IS NOT NULL AND new.state = 1 
    DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
        VALUES (new.id, old.owner, new.owner);
    

    如果没有 LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; 语句,日志表会偶尔填充条目,如果一个消费者已经覆盖了另一个消息的值,但是使用LOCK语句,日志表仍然是空的:-)

相关问题