Cách tiếp cận như đã nêu ở trên, có trình kích hoạt sự kiện S3 và công việc lambda lắng nghe trên vị trí thùng / đối tượng s3. Ngay sau khi tệp được tải lên vị trí s3, lệnh lambda sẽ chạy và trong lambda, bạn có thể định cấu hình để gọi lệnh AWS Glue. Điều này chính xác là chúng tôi đã làm và đã phát trực tiếp thành công. Lambda có tuổi thọ 15 phút và sẽ mất ít hơn một phút để kích hoạt / bắt đầu công việc Keo.
Vui lòng tìm nguồn mẫu ở đây để tham khảo.
from __future__ import print_function
import json
import boto3
import time
import urllib
print('Loading function')
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="your-glue-job-name here"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
Để tạo một hàm Lambda, hãy đi tới AWS Lambdra-> Tạo một hàm mới từ Scratch-> Chọn S3 cho sự kiện, sau đó định cấu hình các vị trí nhóm S3, các tiền tố theo yêu cầu. Sau đó, sao chép, dán mẫu mã ở trên, vùng mã nội tuyến và định cấu hình tên lệnh dán nếu cần. Hãy đảm bảo rằng bạn đã thiết lập tất cả các vai trò / quyền truy cập IAM bắt buộc.
Công việc kết dính phải có điều khoản để kết nối với Aurora của bạn và sau đó bạn có thể sử dụng lệnh "TẢI TỪ S3 ....." do Aurora cung cấp. Đảm bảo rằng tất cả các cài đặt / cấu hình nhóm thông số đều được thực hiện khi cần thiết.
Hãy cho tôi biết nếu có bất kỳ vấn đề nào.
CẬP NHẬT:Đoạn mã SAMPLE cho TẢI TỪ S3:
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, [email protected];"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()