Redis
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> Redis

Làm thế nào để triển khai một luồng tương lai cho cuộc gọi chặn bằng futures.rs và Redis PubSub?

Cảnh báo nặng nề Tôi chưa bao giờ sử dụng thư viện này trước đây, và kiến ​​thức cấp thấp của tôi về một số khái niệm hơi ... thiếu. Chủ yếu là tôi đang đọc qua hướng dẫn. Tôi khá chắc chắn rằng bất cứ ai đã làm công việc không đồng bộ sẽ đọc điều này và cười, nhưng nó có thể là một điểm khởi đầu hữu ích cho những người khác. Cảnh báo trước!

Hãy bắt đầu với một cái gì đó đơn giản hơn một chút, minh họa cách một Stream làm. Chúng tôi có thể chuyển đổi một trình lặp của Result s vào một luồng:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Điều này cho chúng ta thấy một cách để sử dụng luồng. Chúng tôi sử dụng and_then để thực hiện điều gì đó với mỗi trọng tải (ở đây chỉ cần in nó ra) và sau đó for_each để chuyển đổi Stream trở lại Future . Sau đó, chúng ta có thể điều hành tương lai bằng cách gọi tên lạ forget phương pháp.

Tiếp theo là gắn thư viện Redis vào hỗn hợp, chỉ xử lý một tin nhắn. Kể từ get_message() đang chặn, chúng tôi cần đưa một số chủ đề vào hỗn hợp. Không nên thực hiện một lượng lớn công việc trong loại hệ thống không đồng bộ này vì mọi thứ khác sẽ bị chặn. Ví dụ:

Trừ khi nó được sắp xếp theo cách khác, cần đảm bảo rằng việc triển khai chức năng này kết thúc rất nhanh .

Trong một thế giới lý tưởng, thùng redis sẽ được xây dựng trên đỉnh một thư viện giống như tương lai và phơi bày tất cả những điều này một cách nguyên bản.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Sự hiểu biết của tôi trở nên mờ nhạt hơn ở đây. Trong một chuỗi riêng biệt, chúng tôi chặn tin nhắn và đẩy nó vào kênh khi chúng tôi nhận được. Điều tôi không hiểu là tại sao chúng ta cần phải nắm vào tay cầm của luồng. Tôi mong đợi rằng foo.forget sẽ tự chặn, đợi cho đến khi luồng trống.

Trong kết nối telnet với máy chủ Redis, hãy gửi:

publish rust awesome

Và bạn sẽ thấy nó hoạt động. Việc thêm các câu lệnh in cho thấy rằng (đối với tôi) foo.forget câu lệnh được chạy trước khi luồng được tạo.

Nhiều tin nhắn phức tạp hơn. Sender tiêu thụ chính nó để ngăn không cho phía tạo ra đi quá xa so với phía tiêu thụ. Điều này được thực hiện bằng cách trả về một tương lai khác từ send ! Chúng tôi cần đưa nó ra khỏi đó để sử dụng lại cho lần lặp tiếp theo của vòng lặp:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Tôi chắc chắn rằng sẽ có nhiều hệ sinh thái hơn cho loại hình tương tác này khi thời gian trôi qua. Ví dụ:thùng tương lai-cpupool có thể có thể là được mở rộng để hỗ trợ một tiện ích tương tự cho điều này.




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Cách nhận lại cuộc gọi khi khóa hết hạn trong REDIS

  2. Redis Cluster - đã sẵn sàng sản xuất?

  3. Chèn một Danh sách vào một Danh sách khác trong Redis

  4. Xem xét việc truy cập lại các mục ở trên hoặc xác định bean thuộc loại 'org.springframework.data.redis.core.RedisTemplate' trong cấu hình của bạn

  5. Làm cách nào để xóa mọi thứ trong Redis?