CHỈNH SỬA 2018-01-27:
Nó chỉ ra rằng vấn đề này có liên quan đến DirectRunner. Nếu bạn chạy cùng một đường dẫn bằng DataflowRunner, bạn sẽ nhận được các lô thực sự lên đến 1.000 bản ghi. DirectRunner luôn tạo các gói có kích thước 1 sau một thao tác nhóm.
Câu trả lời ban đầu:
Tôi đã gặp phải vấn đề tương tự khi ghi vào cơ sở dữ liệu đám mây bằng JdbcIO của Apache Beam. Vấn đề là mặc dù JdbcIO hỗ trợ ghi lên đến 1.000 bản ghi trong một lô, nhưng tôi chưa bao giờ thực sự thấy nó ghi nhiều hơn 1 hàng cùng một lúc (tôi phải thừa nhận:Điều này luôn sử dụng DirectRunner trong môi trường phát triển).
Do đó, tôi đã thêm một tính năng vào JdbcIO nơi bạn có thể tự mình kiểm soát kích thước của các lô bằng cách nhóm dữ liệu của bạn lại với nhau và viết mỗi nhóm thành một lô. Dưới đây là ví dụ về cách sử dụng tính năng này dựa trên ví dụ WordCount ban đầu của Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Sự khác biệt với phương thức ghi thông thường của JdbcIO là phương thức mới writeIterable()
cần một PCollection<Iterable<RowT>>
làm đầu vào thay vì PCollection<RowT>
. Mỗi Lặp lại được ghi thành một lô vào cơ sở dữ liệu.
Bạn có thể tìm thấy phiên bản JdbcIO với phần bổ sung này tại đây: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Toàn bộ dự án ví dụ chứa ví dụ trên có thể được tìm thấy tại đây: https://github.com/ olavloite / cờ lê-dầm-ví dụ
(Cũng có một yêu cầu kéo đang chờ xử lý trên Apache Beam để đưa nó vào dự án)