Điều này chưa được công bố nhưng trong nhánh chính của Alpakka, MongoSource.apply
nhận tham số kiểu:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Do đó, với bản phát hành Alpakka 0.18 sắp tới, bạn sẽ có thể thực hiện những việc sau:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Lưu ý rằng source ở đây giả định rằng todoCollection.find() trả về một Observable[TodoMongo]; điều chỉnh các loại nếu cần.
Trong thời gian chờ đợi, bạn chỉ cần thêm mã trên theo cách thủ công. Ví dụ:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Lưu ý rằng MyMongoSource được xác định là nằm trong akka.stream.alpakka.mongodb.scaladsl gói (như MongoSource ), bởi vì ObservableToPublisher
là một lớp gói-riêng. Bạn sẽ sử dụng MyMongoSource giống như cách bạn sẽ sử dụng MongoSource :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())