Điều này chưa được công bố nhưng trong nhánh chính của Alpakka, MongoSource.apply
nhận tham số kiểu:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Do đó, với bản phát hành Alpakka 0.18 sắp tới, bạn sẽ có thể thực hiện những việc sau:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Lưu ý rằng source
ở đây giả định rằng todoCollection.find()
trả về một Observable[TodoMongo]
; điều chỉnh các loại nếu cần.
Trong thời gian chờ đợi, bạn chỉ cần thêm mã trên theo cách thủ công. Ví dụ:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Lưu ý rằng MyMongoSource
được xác định là nằm trong akka.stream.alpakka.mongodb.scaladsl
gói (như MongoSource
), bởi vì ObservableToPublisher
là một lớp gói-riêng. Bạn sẽ sử dụng MyMongoSource
giống như cách bạn sẽ sử dụng MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())