복붙노트

[SQL] 다수의 소비자와 SQL 테이블로 작업 대기열 (PostgreSQL을)

SQL

다수의 소비자와 SQL 테이블로 작업 대기열 (PostgreSQL을)

나는 전형적인 생산자 - 소비자 문제가 있습니다 :

여러 프로듀서 응용 프로그램은 PostgreSQL 데이터베이스에 작업 테이블에 작업 요청을 작성합니다.

작업 요청이 시작 상태 필드 생성에 대기가 포함되어 있습니다.

프로듀서가 새 레코드를 삽입하는 규칙에 의해 통지 여러 소비자 응용 프로그램이 있습니다 :

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

그들은 RESERVED 그 상태를 설정하여 새 레코드를 예약하려고합니다. 물론, 단지 소비자에 성공합니다. 다른 모든 소비자는 같은 기록을 보유 할 수 없게한다. 그들은 대신 상태 = QUEUED 다른 기록을 보유해야한다.

예: 일부 생산자는 테이블 jobrecord에 다음과 같은 기록을 추가 :

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

지금,이 개 소비자 A, B 그들을 처리 할. 그들은 동시에 실행 시작합니다. 하나는 1 ID를 보유한다, 다른 하나는 ID (2)를 보유한다, 다음 마무리 등등 3 ID를 보유하고 있어야 첫번째 ..

순수한 멀티 스레드 세계에서 내가 작업 대기열에 대한 액세스를 제어하는 ​​뮤텍스을 사용하지만, 소비자들은 다른 시스템에 실행할 수 있습니다 다른 프로세스입니다. 모든 동기화가 데이터베이스를 통해 발생합니다 그래서 그들은 단지, 동일한 데이터베이스에 액세스 할 수 있습니다.

나는 동시 액세스와 PostgreSQL을에 잠금, 예를 들어,에 대한 많은 문서를 읽어 http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html PostgreSQL을에 잠금 해제 행을 선택 PostgreSQL의 및 잠금

이 주제에서, 나는 다음과 같은 SQL 문은 내가 무엇을해야한다는 것을 배웠다 :

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

내가 시간의 약 50 %에서, 다수의 소비자 프로세스에서이 프로그램을 실행할 때 불행하게도, 그들은 여전히 ​​모두를 처리하고 하나는 다른의 변경 사항을 덮어 같은 기록을 보유하고 있습니다.

나는 무엇을 놓치고? 어떻게 다수의 소비자가 같은 레코드를 보유하지 않도록 SQL 문을 작성해야합니까?

해결법

  1. ==============================

    1.여기 내 게시물 읽기 :

    여기 내 게시물 읽기 :

    잠금 및 업데이트에 대한 선택과 PostgreSQL의 일관성

    당신이 트랜잭션과 잠금 테이블을 사용하는 경우 아무런 문제가 없습니다.

  2. ==============================

    2.나는뿐만 아니라 FIFO 큐에 대한 포스트 그레스를 사용합니다. I 원래 높은 동시성 정확한 결과를 얻을 ACCESS EXCLUSIVE를 사용하지만, 그 실행시에 ACCESS SHARE 잠금 pg_dump의 취득과 상호 배타적 인 불행한 효과를 갖는다. 이것은 아주 오랫동안합니다 (pg_dump의 기간)에 대한 잠금 내 옆에 () 함수를 발생합니다. 우리는 24 시간 연중 무휴 가게하고 고객이 한밤중에 큐에 데드 타임과 같이하지 않았다 때문에 허용하지 않았다.

    나는뿐만 아니라 FIFO 큐에 대한 포스트 그레스를 사용합니다. I 원래 높은 동시성 정확한 결과를 얻을 ACCESS EXCLUSIVE를 사용하지만, 그 실행시에 ACCESS SHARE 잠금 pg_dump의 취득과 상호 배타적 인 불행한 효과를 갖는다. 이것은 아주 오랫동안합니다 (pg_dump의 기간)에 대한 잠금 내 옆에 () 함수를 발생합니다. 우리는 24 시간 연중 무휴 가게하고 고객이 한밤중에 큐에 데드 타임과 같이하지 않았다 때문에 허용하지 않았다.

    나는 아직도 동시 안전 및 pg_dump를 실행하는 동안 잠글 것이다 덜 제한적인 잠금 장치가 있어야한다 생각. 내 검색이 SO 포스트에 나를 이끌었다.

    그럼 몇 가지 조사를했다.

    다음 모드가 실패 동시성없이 실행하고, 또한 pg_dump의에 차단하지에 대기에서 작업의 상태를 업데이트하는 FIFO 큐 NEXT () 함수에 충분합니다 :

    SHARE UPDATE EXCLUSIVE
    SHARE ROW EXCLUSIVE
    EXCLUSIVE
    

    질문:

    begin;
    lock table tx_test_queue in exclusive mode;
    update 
        tx_test_queue
    set 
        status='running'
    where
        job_id in (
            select
                job_id
            from
                tx_test_queue
            where
                status='queued'
            order by 
                job_id asc
            limit 1
        )
    returning job_id;
    commit;
    

    같은 외모 결과 :

    UPDATE 1
     job_id
    --------
         98
    (1 row)
    

    여기서 높은 동시성 (30)에 다른 로크 모드를 모두 테스트 쉘 스크립트이다.

    #!/bin/bash
    # RESULTS, feel free to repro yourself
    #
    # noLock                    FAIL
    # accessShare               FAIL
    # rowShare                  FAIL
    # rowExclusive              FAIL
    # shareUpdateExclusive      SUCCESS
    # share                     FAIL+DEADLOCKS
    # shareRowExclusive         SUCCESS
    # exclusive                 SUCCESS
    # accessExclusive           SUCCESS, but LOCKS against pg_dump
    
    #config
    strategy="exclusive"
    
    db=postgres
    dbuser=postgres
    queuecount=100
    concurrency=30
    
    # code
    psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
    # empty queue
    psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
    echo "Simulating 10 second pg_dump with ACCESS SHARE"
    psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &
    
    echo "Starting workers..."
    # queue $queuecount items
    seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
    #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
    # process $queuecount w/concurrency of $concurrency
    case $strategy in
        "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
        *) echo "Unknown strategy $strategy";;
    esac
    echo $strategySql
    seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
    #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
    psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
    psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
    

    편집 할 경우 코드도 여기에 있습니다 : https://gist.github.com/1083936

    a)는 정확하고 B) pg_dump의과 충돌하지 않는 것이 가장 제한적인 모드가 이후 나는 EXCLUSIVE 모드를 사용하여 내 응용 프로그램을 업데이트하고 있습니다. 이 잠금 포스트 그레스의 동네 짱 - 전문가없이 ACCESS EXCLUSIVE에서 응용 프로그램을 변화의 측면에서 가장 위험한를 보이기 때문에 나는 가장 제한을 선택했다.

    나는 내 테스트 장비와 함께하고 응답 뒤에 일반 아이디어를 매우 편안한 느낌. 나는 이것을 공유하는 다른 사람이 문제를 해결하는 데 도움이되기를 바랍니다.

  3. ==============================

    3.필요가 이것에 대한 전체 테이블 잠금을 할 수 없습니다 : \.

    필요가 이것에 대한 전체 테이블 잠금을 할 수 없습니다 : \.

    업데이트로 만든 행 잠금은 잘 작동합니다.

    apinstein의 대답에 만들어 여전히 작동하는지 확인 변경 I에 대한 https://gist.github.com/mackross/a49b72ad8d24f7cefc32를 참조하십시오.

    최종 코드는

    update 
        tx_test_queue
    set 
        status='running'
    where
        job_id in (
            select
                job_id
            from
                tx_test_queue
            where
                status='queued'
            order by 
                job_id asc
            limit 1 for update
        )
    returning job_id;
    
  4. ==============================

    4.에 대한 단지 무엇을 선택?

    에 대한 단지 무엇을 선택?

    SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
    

    https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

  5. ==============================

    5.당신은 queue_classic이 어떻게하는지보고 할 수 있습니다. https://github.com/ryandotsmith/queue_classic

    당신은 queue_classic이 어떻게하는지보고 할 수 있습니다. https://github.com/ryandotsmith/queue_classic

    코드는 매우 짧고 이해하기 쉽다.

  6. ==============================

    6.좋아, jordani에서 링크를 기반으로, 나를 위해 노력 솔루션입니다. 내 문제의 일부는 Qt를-SQL의 작동 방식에 있었다, 나는 Qt의 코드를 포함 시켰습니다 :

    좋아, jordani에서 링크를 기반으로, 나를 위해 노력 솔루션입니다. 내 문제의 일부는 Qt를-SQL의 작동 방식에 있었다, 나는 Qt의 코드를 포함 시켰습니다 :

    QSqlDatabase db = GetDatabase();
    db.transaction();
    QSqlQuery lockQuery(db);
    bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
    QSqlQuery query(db);
    query.prepare(    
    "UPDATE jobrecord "
    "  SET \"owner\"= :owner, state = :reserved "
    "  WHERE id = ( "
    "    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
    "  ) RETURNING id;"
    );
    query.bindValue(":owner", pid);
    query.bindValue(":reserved", JobRESERVED);
    query.bindValue(":queued", JobQUEUED); 
    bool result = query.exec();
    

    다수의 소비자가 같은 작업을 처리 있는지 확인하려면, 나는 규칙 및 로그 테이블을 추가 :

    CREATE TABLE serverjobrecord_log
    (
      serverjobrecord_id integer,
      oldowner text,
      newowner text
    ) WITH ( OIDS=FALSE );
    
    
    CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
    WHERE old.owner IS NOT NULL AND new.state = 1 
    DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
        VALUES (new.id, old.owner, new.owner);
    

    ACCESS EXCLUSIVE 모드에서 LOCK의 표 serverjobrecord없이; 문, 로그 테이블이 항목으로 occasionaly 채우고, 한 소비자가 있었다 다른 사람의 값을 덮어 가지고 있지만 LOCK 문을 사용하여, 로그 테이블이 비어 :-)

  7. ==============================

    7.대신 바퀴를 개혁의 PgQ를 확인하십시오.

    대신 바퀴를 개혁의 PgQ를 확인하십시오.

  8. from https://stackoverflow.com/questions/6507475/job-queue-as-sql-table-with-multiple-consumers-postgresql by cc-by-sa and MIT license