Bạn có thể sử dụng df.map (row => ...) để chuyển đổi khung dữ liệu thành RDD nếu bạn muốn ánh xạ một hàng thành một phần tử RDD khác.
Ví dụ:
val df = Seq(("table1",432),
("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
Vui lòng tham khảo https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html về AnalysisException:Các truy vấn có nguồn phát trực tuyến phải được thực thi bằng writeStream.start ()
Bạn cần đợi kết thúc truy vấn bằng cách sử dụng truy vấn. awaitTermination () Để ngăn quá trình thoát ra khi truy vấn đang hoạt động.