Tôi không phải là chuyên gia về mongodb, nhưng dựa trên các ví dụ tôi đã thấy, đây là một mẫu tôi sẽ thử.
Tôi đã bỏ qua các sự kiện khác ngoài dữ liệu, vì điều chỉnh rằng một trong những sự kiện có vẻ là mối quan tâm chính.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Tôi đang cố gắng tổng hợp một bài kiểm tra luồng Rx này mà không cần mongodb, trong thời gian chờ đợi, điều này có thể cung cấp cho bạn một số ý tưởng.