[REDIS] 마지막 stream.on에서 비동기 함수의 콜백 ( '데이터') 이벤트 기다립니다
REDIS마지막 stream.on에서 비동기 함수의 콜백 ( '데이터') 이벤트 기다립니다
나는 스트림을 사용하여 CSV 파일을 반복하는 빠른 CSV를 사용하고 있습니다. CSV 파일의 각 행에 대해, 나는 KUE 사용되는 레디 스에서 일자리를 만들려고합니다. 행을 파싱하는 동기 함수이다. 모든 것은 다음과 같습니다 :
var csvStream = fastCsv(_config.csvOptions)
.on('data', function(data) {
var stream = this;
stream.pause();
var payload = parseRow(data, _config);
console.log(payload); // the payload is always printed to the console
var job = kue.create('csv-row', {
payload: payload
})
.save(function(err) {
if (!err) console.log('Enqueued at ' + job.id);
else console.log('Redis error ' + JSON.stringify(err));
stream.resume();
});
})
.on('end', function() {
callback(); // this ends my parsing
});
간단한의 CONSOLE.LOG (페이로드); 작업이 생성되지 않습니다하지만 내 CSV 파일의 모든 행에 대해 보여줍니다. 즉, 저장의 콜백의 출력 중 어느 것도 인쇄되지되고 있으며, 작업이 내 레디 스에 있지 않습니다.
이 CSV 파일의 마지막 행의 마지막 kue.create ()는 프로세스가 종료되기 전에 실행되지 않을 수 있으므로, 스트림이 이미 끝을 방출하기 때문에 나는 것 같은데요?
KUE이 완료 될 때까지, 스트림의 끝을 중단 할 수있는 방법이 있습니까?
해결법
-
==============================
1.당신은 비동기 라이브러리를 사용하여이 문제를 해결할 수 있습니다. 당신은 어떤 스트림에 대한 아래의 패턴을 사용할 수 있습니다.
당신은 비동기 라이브러리를 사용하여이 문제를 해결할 수 있습니다. 당신은 어떤 스트림에 대한 아래의 패턴을 사용할 수 있습니다.
var AsyncLib = require('async'); var worker = function (payload, cb) { //do something with payload and call callback return cb(); }; var concurrency = 5; var streamQueue = AsyncLib.queue(worker, concurrency); var stream = //some readable stream; stream.on('data', function(data) { //no need to pause and resume var payload = '//some payload'; streamQueue.push(payload); }) .on('end', function() { //register drain event on end and callback streamQueue.drain = function () { callback(); }; });
-
==============================
2.난 그냥 같은 상황에서 자신을 발견했다. 그리고 이동 언어와 sync.WaitGroup과 유사한 패턴을 결심했다.
난 그냥 같은 상황에서 자신을 발견했다. 그리고 이동 언어와 sync.WaitGroup과 유사한 패턴을 결심했다.
가장 단순한 형태의이 (함수가 약속을 반환합니다)과 같이 표시됩니다
function process() { const locker = { items: 0, resolve: null, lock: function() { this.items++ }, unlock: function() { if (this.items > 0) this.items--; if (this.items === 0) { this.resolve() this.resolve = null } }, }; return new Promise(function(resolve, reject) { locker.resolve = resolve; locker.lock(); fastCsv(_config.csvOptions) .on('end', function() { locker.unlock(); }) .on('data', function(data) { locker.lock(); const payload = parseRow(data, _config); kue.create('csv-row', { payload: payload }) .save(function(err) { if (!err) console.log('Enqueued at ' + job.id); else console.log('Redis error ' + JSON.stringify(err)); locker.unlock(); }); }); }); } process().then(function () { console.log('Now ALL data processed!'); });
실제 응용 프로그램에서 당신은 등 별개의 클래스 / 모듈, 추가 오류 처리에 사물함을 추출 할 수 있습니다 ...하지만 원칙은 동일합니다 -뿐만 아니라 스트림이 비어 있지만 모든 것이 완료에서 만든 작업을 블로킹 할 때까지 기다립니다.
from https://stackoverflow.com/questions/34464354/wait-for-callback-of-async-function-in-last-stream-ondata-event by cc-by-sa and MIT license
'REDIS' 카테고리의 다른 글
[REDIS] 레디 스에서 네임 스페이스? (0) | 2020.01.15 |
---|---|
[REDIS] ServiceStack.Redis.Sentinel 사용 (0) | 2020.01.15 |
[REDIS] 2 레디 스 온라인 사용자를 추적하기위한 접근한다. 어느 것이 더 빠르다? (0) | 2020.01.15 |
[REDIS] 어떻게 레디 스 서버의 CPU 사용량을 개선하기 위해? (0) | 2020.01.15 |
[REDIS] PooledRedisClientManager는 연결을 해제하지 (0) | 2020.01.15 |