Trong Spark, các hàm trên RDD
s (như map
tại đây) được đánh số thứ tự và gửi đến những người thực thi để xử lý. Điều này ngụ ý rằng tất cả các phần tử có trong các hoạt động đó phải có thể tuần tự hóa.
Kết nối Redis ở đây không thể tuần tự hóa được vì nó mở các kết nối TCP tới DB đích được liên kết với máy nơi nó được tạo.
Giải pháp là tạo các kết nối đó trên các trình thực thi, trong bối cảnh thực thi cục bộ. Có một số cách để làm điều đó. Hai điều khiến bạn nhớ đến là:
-
rdd.mapPartitions
:cho phép bạn xử lý toàn bộ phân vùng cùng một lúc và do đó khấu hao chi phí tạo kết nối) - Trình quản lý kết nối singleton:Tạo kết nối một lần cho mỗi người thực thi
mapPartitions
dễ dàng hơn vì tất cả những gì nó yêu cầu là một thay đổi nhỏ đối với cấu trúc chương trình:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Một trình quản lý kết nối singleton có thể được mô hình hóa với một đối tượng chứa tham chiếu lười biếng đến một kết nối (lưu ý:một tham chiếu có thể thay đổi cũng sẽ hoạt động).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Đối tượng này sau đó có thể được sử dụng để khởi tạo 1 kết nối cho mỗi JVM worker và được sử dụng như một Serializable
đối tượng trong một đóng hoạt động.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Ưu điểm của việc sử dụng đối tượng singleton là ít chi phí hơn vì các kết nối chỉ được tạo một lần bởi JVM (trái ngược với 1 trên mỗi phân vùng RDD)
Cũng có một số nhược điểm:
- việc dọn dẹp các kết nối rất phức tạp (ngắt kết nối / bộ hẹn giờ)
- một người phải đảm bảo an toàn chuỗi của các tài nguyên được chia sẻ
(*) mã cung cấp cho mục đích minh họa. Không được biên dịch hoặc thử nghiệm.