Tôi đã gặp vấn đề tương tự, không chắc liệu bạn có tìm ra giải pháp hay không nhưng tôi có thể thực hiện được điều gì đó tương tự bằng cách làm như sau. Đầu tiên, tôi đã thêm trình kích hoạt vào bảng của mình
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Điều này sẽ đặt một trình kích hoạt trên bảng bất cứ khi nào một hàng, được cập nhật, xóa hoặc chèn. Sau đó, nó sẽ gọi hàm kích hoạt mà tôi đã thiết lập trông giống như sau:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Điều này sẽ cho phép tôi 'lắng nghe' bất kỳ bản cập nhật nào trong số này từ dự án khởi động mùa xuân của tôi và nó sẽ gửi toàn bộ hàng dưới dạng tải trọng. Tiếp theo, trong dự án khởi động mùa xuân, tôi đã định cấu hình kết nối với db của mình.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Với tôi Autowire đó (chèn phụ thuộc) nó vào phương thức khởi tạo trong lớp dịch vụ của tôi và truyền nó đến một lớp r2dbc PostgressqlConnection như sau:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Bây giờ chúng tôi muốn 'lắng nghe' bảng của chúng tôi và nhận được thông báo khi thực hiện một số cập nhật cho bảng của chúng tôi. Để làm điều đó, chúng tôi thiết lập một phương thức khởi tạo được thực hiện sau khi chèn phụ thuộc bằng cách sử dụng chú thích @PostContruct
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Lưu ý rằng chúng ta lắng nghe bất kỳ tên nào chúng ta đặt bên trong phương thức pg_notify. Ngoài ra, chúng tôi muốn thiết lập một phương thức để đóng kết nối khi bean sắp bị loại bỏ, như sau:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Bây giờ tôi chỉ cần tạo một phương thức trả về một Flux của bất kỳ thứ gì hiện có trong bảng của tôi và tôi cũng hợp nhất nó với thông báo của mình, như tôi đã nói trước khi thông báo đến dưới dạng json, vì vậy tôi phải giải mã hóa nó và tôi quyết định sử dụng ObjectMapper. Vì vậy, nó sẽ trông giống như thế này:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Hy vọng điều này sẽ hữu ích.