복붙노트

[REDIS] 어떻게 futures.rs와 레디 스 PubSub를 사용하여 차단 호출을위한 선물의 스트림을 구현하는 방법?

REDIS

어떻게 futures.rs와 레디 스 PubSub를 사용하여 차단 호출을위한 선물의 스트림을 구현하는 방법?

내 응용 프로그램은 레디 스 PubSub 채널에서 스트리밍 데이터를 수신하고 처리 할 수있는 시스템을 만들려고 해요. 나는 내가 본 것을 녹 다른 모든 레디 스 드라이버와 함께 사용하고 있다는 레디 스 드라이버는,이 데이터를 수신하는 경우에만이 값을 반환하는 채널에서 데이터를 얻을 수있는 차단 작업을 사용 :

let msg = match pubsub.get_message() {
        Ok(m) => m,
        Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

I 입력을 기다리는 동안 나는 내 응용 프로그램 내에서 다른 작업을 수행 할 수 있도록 미래에이 차단 함수 호출을 래핑하는 선물-RS 라이브러리를 사용하고 싶었다.

나는 선물과이 데이터가 PubSub에 의해 수신 될 때 신호 할텐데 스트림을 만들려고에 대한 자습서를 읽고,하지만 난 그렇게하는 방법을 알아낼 수 없습니다.

어떻게 차단 pubsub.get_message () 함수에 대한 일정 및 설문 조사 기능을 만들 수 있습니까?

해결법

  1. ==============================

    1.헤비는 내가 전에이 라이브러리를 사용한 적이, 그리고 몇 가지 개념의 내 낮은 수준의 지식이 부족 ... 조금입니다주의. 대부분 내가 자습서를 읽고 있어요. 나는이와 웃음을 읽을 작업을 비동기 수행했다 확신 사람 해요,하지만 그것은 다른 사람들을위한 유용한 출발점이 될 수 있습니다. 주의의 위험 부담!

    헤비는 내가 전에이 라이브러리를 사용한 적이, 그리고 몇 가지 개념의 내 낮은 수준의 지식이 부족 ... 조금입니다주의. 대부분 내가 자습서를 읽고 있어요. 나는이와 웃음을 읽을 작업을 비동기 수행했다 확신 사람 해요,하지만 그것은 다른 사람들을위한 유용한 출발점이 될 수 있습니다. 주의의 위험 부담!

    의는 스트림이 작동하는 방법을 보여, 조금 간단한 무언가 시작하자. 우리는 스트림으로 결과의 반복자를 변환 할 수 있습니다 :

    extern crate futures;
    
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
        let payloads = stream::iter(payloads.into_iter());
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
    }
    

    하나 개의 방법이 쇼 우리는 스트림을 소비합니다. 우리는 (여기에 단지를 인쇄) 한 후 미래에 스트림 다시 변환의 for_each 각 페이로드에 뭔가를 and_then 사용합니다. 우리는 그 다음 이상하게 잊지라는 이름의 메소드를 호출하여 미래를 실행할 수 있습니다.

    다음은 하나의 메시지를 처리, 믹스로 레디 스 라이브러리를 묶어 것입니다. GET_MESSAGE () 메소드가 차단되어 있기 때문에, 우리는 혼합으로 일부 스레드를 도입 할 필요가있다. 그것은 다른 모든이 차단됩니다 비동기 이러한 유형의 시스템에서 작업의 많은 양을 수행하는 것은 좋은 생각이 아니다. 예를 들면 :

    이상적인 세계에서는, 레디 스의 상자는 선물 같은 라이브러리 꼭대기에 지어 기본적으로이 모든 것을 노출 될 것이다.

    extern crate redis;
    extern crate futures;
    
    use std::thread;
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
    
        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
    
        let (tx, payloads) = stream::channel();
    
        let redis_thread = thread::spawn(move || {
            let msg = pubsub.get_message().expect("Unable to get message");
            let payload: Result<String, _> = msg.get_payload();
            tx.send(payload).forget();
        });
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
    

    나의 이해는 여기 흐려집니다. 별도의 스레드에서, 우리는 메시지를 차단하고 우리가 그것을 얻을 때 채널에 밀어 넣습니다. 우리는 스레드의 핸들에 유지에 필요한 이유를 내가 이해하지 못하는 것은. 나는 foo.forget 스트림이 빌 때까지 기다리고, 그 자체를 차단하고있을 것이라고 기대.

    레디 스 서버에 telnet 연결,이를 보내 :

    publish rust awesome
    

    그리고 당신은 작동 표시됩니다. 스레드가 양산되기 전에 (나를 위해)가 foo.forget 문이 실행될 때 인쇄 문 쇼를 추가.

    여러 메시지가 까다 롭습니다. 보낸 사람은 너무 멀리 앞서 소비 측면의 점점에서 생성 측을 방지하기 위해 자체를 소비한다. 이것은 전송에서 또 다른 미래를 반환하여 수행됩니다! 우리는 루프의 다음 반복을 재사용 거기에서 다시 셔틀 필요 :

    extern crate redis;
    extern crate futures;
    
    use std::thread;
    use std::sync::mpsc;
    
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
    
        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
    
        let (tx, payloads) = stream::channel();
    
        let redis_thread = thread::spawn(move || {
            let mut tx = tx;
    
            while let Ok(msg) = pubsub.get_message() {
                let payload: Result<String, _> = msg.get_payload();
    
                let (next_tx_tx, next_tx_rx) = mpsc::channel();
    
                tx.send(payload).and_then(move |new_tx| {
                    next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                    futures::finished(())
                }).forget();
    
                tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
            }
        });
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
    

    나는 시간이 지남에 따라 상호 운용이 유형의 더 많은 에코 시스템이 될 것이라고 확신합니다. 예를 들어, 선물-cpupool의 상자는 아마이 유사한 쓰임새를 지원하도록 확장 할 수있다.

  2. from https://stackoverflow.com/questions/38906680/how-to-implement-a-stream-of-futures-for-a-blocking-call-using-futures-rs-and-re by cc-by-sa and MIT license