Python rất phổ biến ngày nay. Vì Python là một ngôn ngữ lập trình có mục đích chung, nó cũng có thể được sử dụng để thực hiện quá trình Trích xuất, Biến đổi, Tải (ETL). Các mô-đun ETL khác nhau có sẵn, nhưng hôm nay chúng tôi sẽ gắn bó với sự kết hợp của Python và MySQL. Chúng tôi sẽ sử dụng Python để gọi các thủ tục được lưu trữ, chuẩn bị và thực thi các câu lệnh SQL.
Chúng tôi sẽ sử dụng hai cách tiếp cận tương tự nhưng khác nhau. Đầu tiên, chúng tôi sẽ gọi các thủ tục được lưu trữ sẽ thực hiện toàn bộ công việc và sau đó, chúng tôi sẽ phân tích cách chúng tôi có thể thực hiện cùng một quy trình mà không cần các thủ tục được lưu trữ bằng cách sử dụng mã MySQL trong Python.
Sẵn sàng? Trước khi chúng ta đi sâu vào tìm hiểu, hãy xem xét mô hình dữ liệu - hoặc các mô hình dữ liệu, vì có hai mô hình trong số đó trong bài viết này.
Mô hình dữ liệu
Chúng tôi sẽ cần hai mô hình dữ liệu, một mô hình để lưu trữ dữ liệu hoạt động và mô hình còn lại để lưu trữ dữ liệu báo cáo của chúng tôi.
Mô hình đầu tiên được hiển thị trong hình trên. Mô hình này được sử dụng để lưu trữ dữ liệu hoạt động (trực tiếp) cho một doanh nghiệp dựa trên đăng ký. Để biết thêm thông tin chi tiết về mô hình này, vui lòng xem bài viết trước của chúng tôi, Tạo DWH, Phần thứ nhất:Mô hình dữ liệu kinh doanh đăng ký.
Tách dữ liệu hoạt động và báo cáo thường là một quyết định rất khôn ngoan. Để đạt được sự tách biệt đó, chúng tôi sẽ cần tạo một kho dữ liệu (DWH). Chúng tôi đã làm điều đó; bạn có thể thấy mô hình trong hình trên. Mô hình này cũng được mô tả chi tiết trong bài đăng Tạo DWH, Phần thứ hai:Mô hình dữ liệu kinh doanh đăng ký.
Cuối cùng, chúng ta cần trích xuất dữ liệu từ cơ sở dữ liệu trực tiếp, chuyển đổi nó và tải nó vào DWH của chúng ta. Chúng tôi đã thực hiện việc này bằng cách sử dụng các thủ tục được lưu trữ trong SQL. Bạn có thể tìm thấy mô tả về những gì chúng tôi muốn đạt được cùng với một số ví dụ về mã trong Tạo kho dữ liệu, Phần 3:Mô hình dữ liệu kinh doanh đăng ký.
Nếu bạn cần thêm thông tin về DWH, chúng tôi khuyên bạn nên đọc các bài viết sau:
- Giản đồ Ngôi sao
- Lược đồ bông tuyết
- Lược đồ ngôi sao so với Lược đồ bông tuyết.
Nhiệm vụ của chúng ta hôm nay là thay thế các thủ tục được lưu trữ trong SQL bằng mã Python. Chúng tôi đã sẵn sàng để thực hiện một số phép thuật Python. Hãy bắt đầu với việc chỉ sử dụng các thủ tục được lưu trữ trong Python.
Phương pháp 1:ETL sử dụng các thủ tục được lưu trữ
Trước khi bắt đầu mô tả quy trình, điều quan trọng cần đề cập là chúng tôi có hai cơ sở dữ liệu trên máy chủ của mình.
subscription_live
cơ sở dữ liệu được sử dụng để lưu trữ dữ liệu giao dịch / trực tiếp, trong khi subscription_dwh
là cơ sở dữ liệu báo cáo của chúng tôi (DWH).
Chúng tôi đã mô tả các quy trình được lưu trữ được sử dụng để cập nhật bảng thứ nguyên và dữ kiện. Họ sẽ đọc dữ liệu từ subscription_live
cơ sở dữ liệu, kết hợp nó với dữ liệu trong subscription_dwh
cơ sở dữ liệu và chèn dữ liệu mới vào subscription_dwh
cơ sở dữ liệu. Hai thủ tục này là:
-
p_update_dimensions
- Cập nhật bảng thứ nguyêndim_time
vàdim_city
. -
p_update_facts
- Cập nhật hai bảng dữ kiện,fact_customer_subscribed
vàfact_subscription_status
.
Nếu bạn muốn xem mã hoàn chỉnh cho các quy trình này, hãy đọc Tạo kho dữ liệu, Phần 3:Mô hình dữ liệu kinh doanh đăng ký.
Bây giờ chúng tôi đã sẵn sàng để viết một tập lệnh Python đơn giản sẽ kết nối với máy chủ và thực hiện quy trình ETL. Đầu tiên chúng ta hãy xem toàn bộ tập lệnh ( etl_procedures.py ). Sau đó, chúng tôi sẽ giải thích những phần quan trọng nhất.
# import MySQL connector import mysql.connector # connect to server connection = mysql.connector.connect(user='', password=' ', host='127.0.0.1') print('Connected to database.') cursor = connection.cursor() # I update dimensions cursor.callproc('subscription_dwh.p_update_dimensions') print('Dimension tables updated.') # II update facts cursor.callproc('subscription_dwh.p_update_facts') print('Fact tables updated.') # commit & close connection cursor.close() connection.commit() connection.close() print('Disconnected from database.')
etl_procedures.py
Nhập mô-đun và kết nối với cơ sở dữ liệu
Python sử dụng các mô-đun để lưu trữ các định nghĩa và câu lệnh. Bạn có thể sử dụng một mô-đun hiện có hoặc viết mô-đun của riêng bạn. Việc sử dụng các mô-đun hiện có sẽ đơn giản hóa cuộc sống của bạn vì bạn đang sử dụng mã viết sẵn, nhưng việc viết mô-đun của riêng bạn cũng rất hữu ích. Khi thoát trình thông dịch Python và chạy lại, bạn sẽ mất các hàm và biến mà bạn đã xác định trước đó. Tất nhiên, bạn không muốn nhập đi nhập lại cùng một mã. Để tránh điều đó, bạn có thể lưu trữ các định nghĩa của mình trong một mô-đun và nhập nó vào Python.
Quay lại etl_procedures.py . Trong chương trình của chúng tôi, chúng tôi bắt đầu với việc nhập Trình kết nối MySQL:
# import MySQL connector import mysql.connector
MySQL Connector cho Python được sử dụng làm trình điều khiển chuẩn hóa kết nối với máy chủ / cơ sở dữ liệu MySQL. Bạn sẽ cần tải xuống và cài đặt nếu trước đây bạn chưa làm điều đó. Bên cạnh việc kết nối với cơ sở dữ liệu, nó cung cấp một số phương thức và thuộc tính để làm việc với cơ sở dữ liệu. Chúng tôi sẽ sử dụng một số trong số chúng, nhưng bạn có thể kiểm tra tài liệu đầy đủ tại đây.
Tiếp theo, chúng tôi sẽ cần kết nối với cơ sở dữ liệu của mình:
# connect to server connection = mysql.connector.connect(user='', password=' ', host='127.0.0.1') print('Connected to database.') cursor = connection.cursor()
Dòng đầu tiên sẽ kết nối với máy chủ (trong trường hợp này, tôi đang kết nối với máy cục bộ của mình) bằng thông tin đăng nhập của bạn (thay thế
và
với các giá trị thực tế). Trong khi thiết lập kết nối, bạn cũng có thể chỉ định cơ sở dữ liệu bạn muốn kết nối, như được hiển thị bên dưới:
connection = mysql.connector.connect(user='', password=' ', host='127.0.0.1', database=' ')
Tôi đã cố ý chỉ kết nối với một máy chủ chứ không phải với một cơ sở dữ liệu cụ thể vì tôi sẽ sử dụng hai cơ sở dữ liệu nằm trên cùng một máy chủ.
Lệnh tiếp theo - print
- đây chỉ là một thông báo rằng chúng tôi đã kết nối thành công. Mặc dù nó không có ý nghĩa lập trình, nhưng nó có thể được sử dụng để gỡ lỗi mã nếu có gì đó sai trong tập lệnh.
Dòng cuối cùng trong phần này là:
cursor =connect.cursor ()
Cursors are the handler structure used to work with the data. We’ll use them for retrieving data from the database (SELECT), but also to modify the data (INSERT, UPDATE, DELETE). Before using a cursor, we need to create it. And that is what this line does.
Thủ tục Gọi điện
Phần trước là chung và có thể được sử dụng cho các nhiệm vụ khác liên quan đến cơ sở dữ liệu. Phần sau của mã dành riêng cho ETL:gọi các thủ tục được lưu trữ của chúng tôi bằng cursor.callproc
yêu cầu. Nó trông như thế này:
# 1. update dimensions cursor.callproc('subscription_dwh.p_update_dimensions') print('Dimension tables updated.') # 2. update facts cursor.callproc('subscription_dwh.p_update_facts') print('Fact tables updated.')
Quy trình gọi điện khá tự giải thích. Sau mỗi cuộc gọi, một lệnh in đã được thêm vào. Một lần nữa, điều này chỉ cung cấp cho chúng tôi một thông báo rằng mọi thứ vẫn ổn.
Cam kết và Đóng
Phần cuối cùng của tập lệnh cam kết thay đổi cơ sở dữ liệu và đóng tất cả các đối tượng được sử dụng:
# commit & close connection cursor.close() connection.commit() connection.close() print('Disconnected from database.')
Quy trình gọi điện khá tự giải thích. Sau mỗi cuộc gọi, một lệnh in đã được thêm vào. Một lần nữa, điều này chỉ cung cấp cho chúng tôi một thông báo rằng mọi thứ vẫn ổn.
Cam kết là điều cần thiết ở đây; nếu không có nó, sẽ không có thay đổi nào đối với cơ sở dữ liệu, ngay cả khi bạn đã gọi một thủ tục hoặc thực thi một câu lệnh SQL.
Chạy tập lệnh
Điều cuối cùng chúng ta cần làm là chạy tập lệnh của mình. Chúng tôi sẽ sử dụng các lệnh sau trong Python Shell để đạt được điều đó:
import osfile_path ='D://python_scripts'os.chdir (file_path) thi (open ("etl_procedures.py") .read ())Tập lệnh được thực thi và tất cả các thay đổi được thực hiện trong cơ sở dữ liệu tương ứng. Kết quả có thể được nhìn thấy trong hình dưới đây.
Phương pháp 2:ETL Sử dụng Python và MySQL
Cách tiếp cận được trình bày ở trên không khác nhiều so với cách tiếp cận gọi các thủ tục được lưu trữ trực tiếp trong MySQL. Sự khác biệt duy nhất là bây giờ chúng tôi có một kịch bản sẽ thực hiện toàn bộ công việc cho chúng tôi.
Chúng ta có thể sử dụng một cách tiếp cận khác:đưa mọi thứ vào bên trong tập lệnh Python. Chúng tôi sẽ bao gồm các câu lệnh Python, nhưng chúng tôi cũng sẽ chuẩn bị các truy vấn SQL và thực thi chúng trên cơ sở dữ liệu. Cơ sở dữ liệu nguồn (trực tiếp) và cơ sở dữ liệu đích (DWH) giống như trong ví dụ với các thủ tục được lưu trữ.
Trước khi đi sâu vào vấn đề này, hãy xem qua tập lệnh hoàn chỉnh ( etl_queries.py ):
from datetime import date # import MySQL connector import mysql.connector # connect to server connection = mysql.connector.connect(user='', password=' ', host='127.0.0.1') print('Connected to database.') # 1. update dimensions # 1.1 update dim_time # date - yesterday yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = '"' + str(yesterday) + '"' # test if date is already in the table cursor = connection.cursor() query = ( "SELECT COUNT(*) " "FROM subscription_dwh.dim_time " "WHERE time_date = " + yesterday_str) cursor.execute(query) result = cursor.fetchall() yesterday_subscription_count = int(result[0][0]) if yesterday_subscription_count == 0: yesterday_year = 'YEAR("' + str(yesterday) + '")' yesterday_month = 'MONTH("' + str(yesterday) + '")' yesterday_week = 'WEEK("' + str(yesterday) + '")' yesterday_weekday = 'WEEKDAY("' + str(yesterday) + '")' query = ( "INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) " " VALUES (" + yesterday_str + ", " + yesterday_year + ", " + yesterday_month + ", " + yesterday_week + ", " + yesterday_weekday + ", Now())") cursor.execute(query) # 1.2 update dim_city query = ( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() " "FROM subscription_live.city city_live " "INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id " "LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name " "WHERE city_dwh.id IS NULL") cursor.execute(query) print('Dimension tables updated.') # 2. update facts # 2.1 update customers subscribed # delete old data for the same date query = ( "DELETE subscription_dwh.`fact_customer_subscribed`.* " "FROM subscription_dwh.`fact_customer_subscribed` " "INNER JOIN subscription_dwh.`dim_time` ON subscription_dwh.`fact_customer_subscribed`.`dim_time_id` = subscription_dwh.`dim_time`.`id` " "WHERE subscription_dwh.`dim_time`.`time_date` = " + yesterday_str) cursor.execute(query) # insert new data query = ( "INSERT INTO subscription_dwh.`fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) " " SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts " "FROM subscription_live.`customer` customer_live " "INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id " "INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id " "INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name " "INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = " + yesterday_str + " " "GROUP BY city_dwh.id, time_dwh.id") cursor.execute(query) # 2.2 update subscription statuses # delete old data for the same date query = ( "DELETE subscription_dwh.`fact_subscription_status`.* " "FROM subscription_dwh.`fact_subscription_status` " "INNER JOIN subscription_dwh.`dim_time` ON subscription_dwh.`fact_subscription_status`.`dim_time_id` = subscription_dwh.`dim_time`.`id` " "WHERE subscription_dwh.`dim_time`.`time_date` = " + yesterday_str) cursor.execute(query) # insert new data query = ( "INSERT INTO subscription_dwh.`fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) " "SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts " "FROM subscription_live.`customer` customer_live " "INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id " "INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id " "INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id " "INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name " "INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = " + yesterday_str + " " "GROUP BY city_dwh.id, time_dwh.id") cursor.execute(query) print('Fact tables updated.') # commit & close connection cursor.close() connection.commit() connection.close() print('Disconnected from database.')
etl_queries.py
Nhập mô-đun và kết nối với cơ sở dữ liệu
Một lần nữa, chúng tôi sẽ cần nhập MySQL bằng mã sau:
import mysql.connector
Chúng tôi cũng sẽ nhập mô-đun datetime, như được hiển thị bên dưới. Chúng tôi cần điều này cho các hoạt động liên quan đến ngày tháng trong Python:
from datetime import date
Quá trình kết nối với cơ sở dữ liệu giống như trong ví dụ trước.
Cập nhật Thứ nguyên dim_time
Để cập nhật dim_time
, chúng tôi sẽ cần kiểm tra xem giá trị (cho ngày hôm qua) đã có trong bảng hay chưa. Chúng tôi sẽ phải sử dụng các hàm ngày của Python (thay vì của SQL) để thực hiện việc này:
# date - yesterday yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = '"' + str(yesterday) + '"'
Dòng đầu tiên của mã sẽ trả về ngày hôm qua trong biến ngày, trong khi dòng thứ hai sẽ lưu trữ giá trị này dưới dạng một chuỗi. Chúng tôi sẽ cần chuỗi này dưới dạng một chuỗi vì chúng tôi sẽ nối nó với một chuỗi khác khi chúng tôi tạo truy vấn SQL.
Tiếp theo, chúng tôi sẽ cần kiểm tra xem ngày này đã có trong dim_time
bàn. Sau khi khai báo con trỏ, chúng tôi sẽ chuẩn bị truy vấn SQL. Để thực hiện truy vấn, chúng tôi sẽ sử dụng cursor.execute
lệnh:
# test if date is already in the table cursor = connection.cursor() query = ( "SELECT COUNT(*) " "FROM subscription_dwh.dim_time " "WHERE time_date = " + yesterday_str) cursor.execute(query) '"'
Chúng tôi sẽ lưu trữ kết quả truy vấn trong kết quả Biến đổi. Kết quả sẽ có 0 hoặc 1 hàng, vì vậy chúng ta có thể kiểm tra cột đầu tiên của hàng đầu tiên. Nó sẽ chứa số 0 hoặc số 1. (Hãy nhớ rằng chúng ta chỉ có thể có cùng một ngày trong một bảng thứ nguyên.)
Nếu ngày chưa có trong bảng, chúng tôi sẽ chuẩn bị các chuỗi sẽ là một phần của truy vấn SQL:
result = cursor.fetchall() yesterday_subscription_count = int(result[0][0]) if yesterday_subscription_count == 0: yesterday_year = 'YEAR("' + str(yesterday) + '")' yesterday_month = 'MONTH("' + str(yesterday) + '")' yesterday_week = 'WEEK("' + str(yesterday) + '")' yesterday_weekday = 'WEEKDAY("' + str(yesterday) + '")'
Cuối cùng, chúng tôi sẽ tạo một truy vấn và thực thi nó. Thao tác này sẽ cập nhật dim_time
bảng sau khi nó được cam kết. Xin lưu ý rằng tôi đã sử dụng đường dẫn đầy đủ đến bảng, bao gồm cả tên cơ sở dữ liệu (subscription_dwh
).
query = ( "INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) " " VALUES (" + yesterday_str + ", " + yesterday_year + ", " + yesterday_month + ", " + yesterday_week + ", " + yesterday_weekday + ", Now())") cursor.execute(query)
Cập nhật Thứ nguyên dim_city
Đang cập nhật dim_city
bảng thậm chí còn đơn giản hơn vì chúng tôi không cần phải kiểm tra bất kỳ thứ gì trước khi chèn. Chúng tôi thực sự sẽ đưa bài kiểm tra đó vào truy vấn SQL.
# 1.2 update dim_city query = ( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() " "FROM subscription_live.city city_live " "INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id " "LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name " "WHERE city_dwh.id IS NULL") cursor.execute(query)
Ở đây chúng tôi chuẩn bị thực thi truy vấn SQL. Lưu ý rằng tôi đã sử dụng lại đường dẫn đầy đủ đến các bảng, bao gồm cả tên của cả hai cơ sở dữ liệu (subscription_live
và subscription_dwh
).
Cập nhật Bảng Dữ kiện
Điều cuối cùng chúng ta cần làm là cập nhật bảng dữ kiện của chúng ta. Quá trình này gần giống như cập nhật bảng thứ nguyên:chúng tôi chuẩn bị các truy vấn và thực thi chúng. Các truy vấn này phức tạp hơn nhiều, nhưng chúng cũng giống như các truy vấn được sử dụng trong các thủ tục được lưu trữ.
Chúng tôi đã thêm một cải tiến so với các quy trình được lưu trữ:xóa dữ liệu hiện có cho cùng một ngày trong bảng dữ kiện. Điều này sẽ cho phép chúng tôi chạy một tập lệnh nhiều lần trong cùng một ngày. Cuối cùng, chúng tôi sẽ cần thực hiện giao dịch và đóng tất cả các đối tượng và kết nối.
Chạy tập lệnh
Chúng tôi có một thay đổi nhỏ trong phần này, đó là gọi một tập lệnh khác:
- import os - file_path = 'D://python_scripts' - os.chdir(file_path) - exec(open("etl_queries.py").read())
Vì chúng tôi đã sử dụng các thông báo giống nhau và tập lệnh đã hoàn thành thành công nên kết quả là giống nhau:
Bạn sẽ sử dụng Python trong ETL như thế nào?
Hôm nay chúng ta đã thấy một ví dụ về việc thực hiện quy trình ETL với một tập lệnh Python. Có những cách khác để làm điều này, ví dụ:một số giải pháp mã nguồn mở sử dụng thư viện Python để làm việc với cơ sở dữ liệu và thực hiện quy trình ETL. Trong bài viết tiếp theo, chúng ta sẽ chơi với một trong số chúng. Trong thời gian chờ đợi, vui lòng chia sẻ kinh nghiệm của bạn với Python và ETL.