[SQL] 다수의 소비자와 SQL 테이블로 작업 대기열 (PostgreSQL을)
SQL다수의 소비자와 SQL 테이블로 작업 대기열 (PostgreSQL을)
나는 전형적인 생산자 - 소비자 문제가 있습니다 :
여러 프로듀서 응용 프로그램은 PostgreSQL 데이터베이스에 작업 테이블에 작업 요청을 작성합니다.
작업 요청이 시작 상태 필드 생성에 대기가 포함되어 있습니다.
프로듀서가 새 레코드를 삽입하는 규칙에 의해 통지 여러 소비자 응용 프로그램이 있습니다 :
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
그들은 RESERVED 그 상태를 설정하여 새 레코드를 예약하려고합니다. 물론, 단지 소비자에 성공합니다. 다른 모든 소비자는 같은 기록을 보유 할 수 없게한다. 그들은 대신 상태 = QUEUED 다른 기록을 보유해야한다.
예: 일부 생산자는 테이블 jobrecord에 다음과 같은 기록을 추가 :
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
지금,이 개 소비자 A, B 그들을 처리 할. 그들은 동시에 실행 시작합니다. 하나는 1 ID를 보유한다, 다른 하나는 ID (2)를 보유한다, 다음 마무리 등등 3 ID를 보유하고 있어야 첫번째 ..
순수한 멀티 스레드 세계에서 내가 작업 대기열에 대한 액세스를 제어하는 뮤텍스을 사용하지만, 소비자들은 다른 시스템에 실행할 수 있습니다 다른 프로세스입니다. 모든 동기화가 데이터베이스를 통해 발생합니다 그래서 그들은 단지, 동일한 데이터베이스에 액세스 할 수 있습니다.
나는 동시 액세스와 PostgreSQL을에 잠금, 예를 들어,에 대한 많은 문서를 읽어 http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html PostgreSQL을에 잠금 해제 행을 선택 PostgreSQL의 및 잠금
이 주제에서, 나는 다음과 같은 SQL 문은 내가 무엇을해야한다는 것을 배웠다 :
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord WHERE state = :queued
ORDER BY id LIMIT 1
)
RETURNING id; // will only return an id when they reserved it successfully
내가 시간의 약 50 %에서, 다수의 소비자 프로세스에서이 프로그램을 실행할 때 불행하게도, 그들은 여전히 모두를 처리하고 하나는 다른의 변경 사항을 덮어 같은 기록을 보유하고 있습니다.
나는 무엇을 놓치고? 어떻게 다수의 소비자가 같은 레코드를 보유하지 않도록 SQL 문을 작성해야합니까?
해결법
-
==============================
1.여기 내 게시물 읽기 :
여기 내 게시물 읽기 :
잠금 및 업데이트에 대한 선택과 PostgreSQL의 일관성
당신이 트랜잭션과 잠금 테이블을 사용하는 경우 아무런 문제가 없습니다.
-
==============================
2.나는뿐만 아니라 FIFO 큐에 대한 포스트 그레스를 사용합니다. I 원래 높은 동시성 정확한 결과를 얻을 ACCESS EXCLUSIVE를 사용하지만, 그 실행시에 ACCESS SHARE 잠금 pg_dump의 취득과 상호 배타적 인 불행한 효과를 갖는다. 이것은 아주 오랫동안합니다 (pg_dump의 기간)에 대한 잠금 내 옆에 () 함수를 발생합니다. 우리는 24 시간 연중 무휴 가게하고 고객이 한밤중에 큐에 데드 타임과 같이하지 않았다 때문에 허용하지 않았다.
나는뿐만 아니라 FIFO 큐에 대한 포스트 그레스를 사용합니다. I 원래 높은 동시성 정확한 결과를 얻을 ACCESS EXCLUSIVE를 사용하지만, 그 실행시에 ACCESS SHARE 잠금 pg_dump의 취득과 상호 배타적 인 불행한 효과를 갖는다. 이것은 아주 오랫동안합니다 (pg_dump의 기간)에 대한 잠금 내 옆에 () 함수를 발생합니다. 우리는 24 시간 연중 무휴 가게하고 고객이 한밤중에 큐에 데드 타임과 같이하지 않았다 때문에 허용하지 않았다.
나는 아직도 동시 안전 및 pg_dump를 실행하는 동안 잠글 것이다 덜 제한적인 잠금 장치가 있어야한다 생각. 내 검색이 SO 포스트에 나를 이끌었다.
그럼 몇 가지 조사를했다.
다음 모드가 실패 동시성없이 실행하고, 또한 pg_dump의에 차단하지에 대기에서 작업의 상태를 업데이트하는 FIFO 큐 NEXT () 함수에 충분합니다 :
SHARE UPDATE EXCLUSIVE SHARE ROW EXCLUSIVE EXCLUSIVE
질문:
begin; lock table tx_test_queue in exclusive mode; update tx_test_queue set status='running' where job_id in ( select job_id from tx_test_queue where status='queued' order by job_id asc limit 1 ) returning job_id; commit;
같은 외모 결과 :
UPDATE 1 job_id -------- 98 (1 row)
여기서 높은 동시성 (30)에 다른 로크 모드를 모두 테스트 쉘 스크립트이다.
#!/bin/bash # RESULTS, feel free to repro yourself # # noLock FAIL # accessShare FAIL # rowShare FAIL # rowExclusive FAIL # shareUpdateExclusive SUCCESS # share FAIL+DEADLOCKS # shareRowExclusive SUCCESS # exclusive SUCCESS # accessExclusive SUCCESS, but LOCKS against pg_dump #config strategy="exclusive" db=postgres dbuser=postgres queuecount=100 concurrency=30 # code psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);" # empty queue psql84 -t -U $dbuser $db -c "truncate tx_test_queue;"; echo "Simulating 10 second pg_dump with ACCESS SHARE" psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" & echo "Starting workers..." # queue $queuecount items seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');" #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;" # process $queuecount w/concurrency of $concurrency case $strategy in "noLock") strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; "accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";; *) echo "Unknown strategy $strategy";; esac echo $strategySql seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql" #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;" psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;" psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
편집 할 경우 코드도 여기에 있습니다 : https://gist.github.com/1083936
a)는 정확하고 B) pg_dump의과 충돌하지 않는 것이 가장 제한적인 모드가 이후 나는 EXCLUSIVE 모드를 사용하여 내 응용 프로그램을 업데이트하고 있습니다. 이 잠금 포스트 그레스의 동네 짱 - 전문가없이 ACCESS EXCLUSIVE에서 응용 프로그램을 변화의 측면에서 가장 위험한를 보이기 때문에 나는 가장 제한을 선택했다.
나는 내 테스트 장비와 함께하고 응답 뒤에 일반 아이디어를 매우 편안한 느낌. 나는 이것을 공유하는 다른 사람이 문제를 해결하는 데 도움이되기를 바랍니다.
-
==============================
3.필요가 이것에 대한 전체 테이블 잠금을 할 수 없습니다 : \.
필요가 이것에 대한 전체 테이블 잠금을 할 수 없습니다 : \.
업데이트로 만든 행 잠금은 잘 작동합니다.
apinstein의 대답에 만들어 여전히 작동하는지 확인 변경 I에 대한 https://gist.github.com/mackross/a49b72ad8d24f7cefc32를 참조하십시오.
최종 코드는
update tx_test_queue set status='running' where job_id in ( select job_id from tx_test_queue where status='queued' order by job_id asc limit 1 for update ) returning job_id;
-
==============================
4.에 대한 단지 무엇을 선택?
에 대한 단지 무엇을 선택?
SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE
-
==============================
5.당신은 queue_classic이 어떻게하는지보고 할 수 있습니다. https://github.com/ryandotsmith/queue_classic
당신은 queue_classic이 어떻게하는지보고 할 수 있습니다. https://github.com/ryandotsmith/queue_classic
코드는 매우 짧고 이해하기 쉽다.
-
==============================
6.좋아, jordani에서 링크를 기반으로, 나를 위해 노력 솔루션입니다. 내 문제의 일부는 Qt를-SQL의 작동 방식에 있었다, 나는 Qt의 코드를 포함 시켰습니다 :
좋아, jordani에서 링크를 기반으로, 나를 위해 노력 솔루션입니다. 내 문제의 일부는 Qt를-SQL의 작동 방식에 있었다, 나는 Qt의 코드를 포함 시켰습니다 :
QSqlDatabase db = GetDatabase(); db.transaction(); QSqlQuery lockQuery(db); bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; "); QSqlQuery query(db); query.prepare( "UPDATE jobrecord " " SET \"owner\"= :owner, state = :reserved " " WHERE id = ( " " SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 " " ) RETURNING id;" ); query.bindValue(":owner", pid); query.bindValue(":reserved", JobRESERVED); query.bindValue(":queued", JobQUEUED); bool result = query.exec();
다수의 소비자가 같은 작업을 처리 있는지 확인하려면, 나는 규칙 및 로그 테이블을 추가 :
CREATE TABLE serverjobrecord_log ( serverjobrecord_id integer, oldowner text, newowner text ) WITH ( OIDS=FALSE ); CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord WHERE old.owner IS NOT NULL AND new.state = 1 DO INSERT INTO jobrecord_log (id, oldowner, newowner) VALUES (new.id, old.owner, new.owner);
ACCESS EXCLUSIVE 모드에서 LOCK의 표 serverjobrecord없이; 문, 로그 테이블이 항목으로 occasionaly 채우고, 한 소비자가 있었다 다른 사람의 값을 덮어 가지고 있지만 LOCK 문을 사용하여, 로그 테이블이 비어 :-)
-
==============================
7.대신 바퀴를 개혁의 PgQ를 확인하십시오.
대신 바퀴를 개혁의 PgQ를 확인하십시오.
from https://stackoverflow.com/questions/6507475/job-queue-as-sql-table-with-multiple-consumers-postgresql by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] Sequelize 단수 테이블 이름을 사용하도록하는 방법 (0) | 2020.06.21 |
---|---|
[SQL] 그것은 MySQL을 사용 여러 열 BY GROUP 할 수 있습니까? (0) | 2020.06.21 |
[SQL] 테이블 스캔 및 클러스터 된 인덱스 스캔의 차이점은 무엇입니까? (0) | 2020.06.21 |
[SQL] SQL 서버 - 여기서 "sys.functions"인가? (0) | 2020.06.21 |
[SQL] MySQL은 '스키마를 생성'과 '데이터베이스를 만드는'- 어떤 차이가 있나요 (0) | 2020.06.21 |