[REDIS] 어떻게 futures.rs와 레디 스 PubSub를 사용하여 차단 호출을위한 선물의 스트림을 구현하는 방법?
REDIS어떻게 futures.rs와 레디 스 PubSub를 사용하여 차단 호출을위한 선물의 스트림을 구현하는 방법?
내 응용 프로그램은 레디 스 PubSub 채널에서 스트리밍 데이터를 수신하고 처리 할 수있는 시스템을 만들려고 해요. 나는 내가 본 것을 녹 다른 모든 레디 스 드라이버와 함께 사용하고 있다는 레디 스 드라이버는,이 데이터를 수신하는 경우에만이 값을 반환하는 채널에서 데이터를 얻을 수있는 차단 작업을 사용 :
let msg = match pubsub.get_message() {
Ok(m) => m,
Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
Ok(s) => s,
Err(_) => panic!("Could not convert redis message to string!")
};
I 입력을 기다리는 동안 나는 내 응용 프로그램 내에서 다른 작업을 수행 할 수 있도록 미래에이 차단 함수 호출을 래핑하는 선물-RS 라이브러리를 사용하고 싶었다.
나는 선물과이 데이터가 PubSub에 의해 수신 될 때 신호 할텐데 스트림을 만들려고에 대한 자습서를 읽고,하지만 난 그렇게하는 방법을 알아낼 수 없습니다.
어떻게 차단 pubsub.get_message () 함수에 대한 일정 및 설문 조사 기능을 만들 수 있습니까?
해결법
-
==============================
1.헤비는 내가 전에이 라이브러리를 사용한 적이, 그리고 몇 가지 개념의 내 낮은 수준의 지식이 부족 ... 조금입니다주의. 대부분 내가 자습서를 읽고 있어요. 나는이와 웃음을 읽을 작업을 비동기 수행했다 확신 사람 해요,하지만 그것은 다른 사람들을위한 유용한 출발점이 될 수 있습니다. 주의의 위험 부담!
헤비는 내가 전에이 라이브러리를 사용한 적이, 그리고 몇 가지 개념의 내 낮은 수준의 지식이 부족 ... 조금입니다주의. 대부분 내가 자습서를 읽고 있어요. 나는이와 웃음을 읽을 작업을 비동기 수행했다 확신 사람 해요,하지만 그것은 다른 사람들을위한 유용한 출발점이 될 수 있습니다. 주의의 위험 부담!
의는 스트림이 작동하는 방법을 보여, 조금 간단한 무언가 시작하자. 우리는 스트림으로 결과의 반복자를 변환 할 수 있습니다 :
extern crate futures; use futures::Future; use futures::stream::{self, Stream}; fn main() { let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())]; let payloads = stream::iter(payloads.into_iter()); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); }
하나 개의 방법이 쇼 우리는 스트림을 소비합니다. 우리는 (여기에 단지를 인쇄) 한 후 미래에 스트림 다시 변환의 for_each 각 페이로드에 뭔가를 and_then 사용합니다. 우리는 그 다음 이상하게 잊지라는 이름의 메소드를 호출하여 미래를 실행할 수 있습니다.
다음은 하나의 메시지를 처리, 믹스로 레디 스 라이브러리를 묶어 것입니다. GET_MESSAGE () 메소드가 차단되어 있기 때문에, 우리는 혼합으로 일부 스레드를 도입 할 필요가있다. 그것은 다른 모든이 차단됩니다 비동기 이러한 유형의 시스템에서 작업의 많은 양을 수행하는 것은 좋은 생각이 아니다. 예를 들면 :
이상적인 세계에서는, 레디 스의 상자는 선물 같은 라이브러리 꼭대기에 지어 기본적으로이 모든 것을 노출 될 것이다.
extern crate redis; extern crate futures; use std::thread; use futures::Future; use futures::stream::{self, Stream}; fn main() { let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); let (tx, payloads) = stream::channel(); let redis_thread = thread::spawn(move || { let msg = pubsub.get_message().expect("Unable to get message"); let payload: Result<String, _> = msg.get_payload(); tx.send(payload).forget(); }); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); redis_thread.join().expect("unable to join to thread"); }
나의 이해는 여기 흐려집니다. 별도의 스레드에서, 우리는 메시지를 차단하고 우리가 그것을 얻을 때 채널에 밀어 넣습니다. 우리는 스레드의 핸들에 유지에 필요한 이유를 내가 이해하지 못하는 것은. 나는 foo.forget 스트림이 빌 때까지 기다리고, 그 자체를 차단하고있을 것이라고 기대.
레디 스 서버에 telnet 연결,이를 보내 :
publish rust awesome
그리고 당신은 작동 표시됩니다. 스레드가 양산되기 전에 (나를 위해)가 foo.forget 문이 실행될 때 인쇄 문 쇼를 추가.
여러 메시지가 까다 롭습니다. 보낸 사람은 너무 멀리 앞서 소비 측면의 점점에서 생성 측을 방지하기 위해 자체를 소비한다. 이것은 전송에서 또 다른 미래를 반환하여 수행됩니다! 우리는 루프의 다음 반복을 재사용 거기에서 다시 셔틀 필요 :
extern crate redis; extern crate futures; use std::thread; use std::sync::mpsc; use futures::Future; use futures::stream::{self, Stream}; fn main() { let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); let (tx, payloads) = stream::channel(); let redis_thread = thread::spawn(move || { let mut tx = tx; while let Ok(msg) = pubsub.get_message() { let payload: Result<String, _> = msg.get_payload(); let (next_tx_tx, next_tx_rx) = mpsc::channel(); tx.send(payload).and_then(move |new_tx| { next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); futures::finished(()) }).forget(); tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); } }); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); redis_thread.join().expect("unable to join to thread"); }
나는 시간이 지남에 따라 상호 운용이 유형의 더 많은 에코 시스템이 될 것이라고 확신합니다. 예를 들어, 선물-cpupool의 상자는 아마이 유사한 쓰임새를 지원하도록 확장 할 수있다.
from https://stackoverflow.com/questions/38906680/how-to-implement-a-stream-of-futures-for-a-blocking-call-using-futures-rs-and-re by cc-by-sa and MIT license
'REDIS' 카테고리의 다른 글
[REDIS] 때문에 설정 파일 오류로 레디 스 서버를 시작할 수 없습니다 (0) | 2020.01.16 |
---|---|
[REDIS] Heroku가와 RedisToGo에 Resque, Resque 서버, (0) | 2020.01.16 |
[REDIS] 애드온없이 가능한에게 Heroku에 레디 스인가? (0) | 2020.01.16 |
[REDIS] 레디 스 - 반환 값으로 루아 테이블 - 왜이 작동하지 않습니다 (0) | 2020.01.16 |
[REDIS] PHP - 레디 스 -를 직렬화없이 레디 스에서 PHP의 객체를 저장할 수있는 방법이 있습니까? (0) | 2020.01.16 |