복붙노트

[REDIS] 마지막 stream.on에서 비동기 함수의 콜백 ( '데이터') 이벤트 기다립니다

REDIS

마지막 stream.on에서 비동기 함수의 콜백 ( '데이터') 이벤트 기다립니다

나는 스트림을 사용하여 CSV 파일을 반복하는 빠른 CSV를 사용하고 있습니다. CSV 파일의 각 행에 대해, 나는 KUE 사용되는 레디 스에서 일자리를 만들려고합니다. 행을 파싱하는 동기 함수이다. 모든 것은 다음과 같습니다 :

var csvStream = fastCsv(_config.csvOptions)
  .on('data', function(data) {
    var stream = this;
    stream.pause();

    var payload = parseRow(data, _config);
    console.log(payload); // the payload is always printed to the console

    var job = kue.create('csv-row', {
        payload: payload
      })
      .save(function(err) {
        if (!err) console.log('Enqueued at ' + job.id);
        else console.log('Redis error ' + JSON.stringify(err));

        stream.resume();
      });
  })
  .on('end', function() {
    callback(); // this ends my parsing
  });

간단한의 CONSOLE.LOG (페이로드); 작업이 생성되지 않습니다하지만 내 CSV 파일의 모든 행에 대해 보여줍니다. 즉, 저장의 콜백의 출력 중 어느 것도 인쇄되지되고 있으며, 작업이 내 레디 스에 있지 않습니다.

이 CSV 파일의 마지막 행의 마지막 kue.create ()는 프로세스가 종료되기 전에 실행되지 않을 수 있으므로, 스트림이 이미 끝을 방출하기 때문에 나는 것 같은데요?

KUE이 완료 될 때까지, 스트림의 끝을 중단 할 수있는 방법이 있습니까?

해결법

  1. ==============================

    1.당신은 비동기 라이브러리를 사용하여이 문제를 해결할 수 있습니다. 당신은 어떤 스트림에 대한 아래의 패턴을 사용할 수 있습니다.

    당신은 비동기 라이브러리를 사용하여이 문제를 해결할 수 있습니다. 당신은 어떤 스트림에 대한 아래의 패턴을 사용할 수 있습니다.

    var AsyncLib = require('async');
    
    var worker = function (payload, cb) {
        //do something with payload and call callback
        return cb();
    };
    
    var concurrency = 5;
    var streamQueue = AsyncLib.queue(worker, concurrency);
    
    var stream = //some readable stream;
    
    stream.on('data', function(data) {
        //no need to pause and resume
        var payload = '//some payload';
        streamQueue.push(payload);
    })
    .on('end', function() {
        //register drain event on end and callback
        streamQueue.drain = function () {
            callback();
        };
    });
    
  2. ==============================

    2.난 그냥 같은 상황에서 자신을 발견했다. 그리고 이동 언어와 sync.WaitGroup과 유사한 패턴을 결심했다.

    난 그냥 같은 상황에서 자신을 발견했다. 그리고 이동 언어와 sync.WaitGroup과 유사한 패턴을 결심했다.

    가장 단순한 형태의이 (함수가 약속을 반환합니다)과 같이 표시됩니다

    function process() {
        const locker = {
            items: 0,
            resolve: null,
            lock: function() { this.items++ },
            unlock: function() {
                if (this.items > 0) this.items--;
                if (this.items === 0) {
                    this.resolve()
                    this.resolve = null
                }
            },
        };
        return new Promise(function(resolve, reject) {
            locker.resolve = resolve;
            locker.lock();
    
            fastCsv(_config.csvOptions)
            .on('end', function() { locker.unlock(); })
            .on('data', function(data) {
                locker.lock();
                const payload = parseRow(data, _config);
                kue.create('csv-row', { payload: payload })
                    .save(function(err) {
                        if (!err) console.log('Enqueued at ' + job.id);
                        else console.log('Redis error ' + JSON.stringify(err));
                        locker.unlock();
                    });
            });
    
        });
    }
    
    process().then(function () {
        console.log('Now ALL data processed!');
    });
    

    실제 응용 프로그램에서 당신은 등 별개의 클래스 / 모듈, 추가 오류 처리에 사물함을 추출 할 수 있습니다 ...하지만 원칙은 동일합니다 -뿐만 아니라 스트림이 비어 있지만 모든 것이 완료에서 만든 작업을 블로킹 할 때까지 기다립니다.

  3. from https://stackoverflow.com/questions/34464354/wait-for-callback-of-async-function-in-last-stream-ondata-event by cc-by-sa and MIT license