Theo lỗi, bạn đã có một chuỗi, (bạn đã thực hiện df.selectExpr("CAST(value AS STRING)")
), vì vậy bạn nên thử nhận sự kiện Hàng dưới dạng String
và không phải là Array[Byte]
Bắt đầu bằng cách thay đổi
val valueStr = new String(record.getAs[Array[Byte]]("value"))
đến
val valueStr = record.getAs[String]("value")
Tôi hiểu rằng bạn có thể đã có một cụm để chạy mã Spark, nhưng tôi khuyên bạn vẫn nên xem xét Trình kết nối bồn rửa Kafka Connect Mongo để bạn không phải viết và duy trì trình viết Mongo của riêng mình trong mã Spark.
Hoặc, bạn có thể ghi trực tiếp tập dữ liệu Spark vào mongo