Vấn đề đã được giải quyết! Tôi không thể tin rằng tôi đã dành trọn hai ngày cho việc này ... Tôi đã hoàn toàn nhìn sai hướng.
Vấn đề không phải với một số cấu hình mạng Dataflow hoặc GCP và theo như tôi có thể nói ...
là đúng.
Tất nhiên vấn đề nằm trong mã của tôi:chỉ có vấn đề chỉ được tiết lộ trong môi trường phân tán. Tôi đã mắc sai lầm khi mở đường hầm từ bộ xử lý đường ống chính, thay vì công nhân. Vì vậy, đường hầm SSH đã hoạt động nhưng không phải giữa các công nhân và máy chủ mục tiêu, chỉ giữa đường ống chính và mục tiêu!
Để khắc phục điều này, tôi đã phải thay đổi DoFn yêu cầu của mình để kết thúc việc thực thi truy vấn với đường hầm:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
như bạn thấy, tôi đã phải ghi đè một số bit của thư viện pysql_beam.
Cuối cùng, mỗi công nhân mở đường hầm riêng cho từng yêu cầu. Có thể tối ưu hóa hành vi này nhưng nó đủ cho nhu cầu của tôi.