Phần này của hướng dẫn nêu chi tiết cách triển khai hàng đợi tác vụ Redis để xử lý xử lý văn bản.
Cập nhật:
- 02/12/2020:Được nâng cấp lên phiên bản Python 3.8.1 cũng như các phiên bản mới nhất của Redis, Python Redis và RQ. Xem chi tiết bên dưới. Đề cập đến một lỗi trong phiên bản RQ mới nhất và đưa ra giải pháp. Đã khắc phục lỗi http trước https.
- 22/03/2016:Được nâng cấp lên phiên bản Python 3.5.1 cũng như các phiên bản mới nhất của Redis, Python Redis và RQ. Xem chi tiết bên dưới.
- 22/02/2015:Đã thêm hỗ trợ Python 3.
Phần thưởng miễn phí: Nhấp vào đây để có quyền truy cập vào video hướng dẫn Flask + Python miễn phí hướng dẫn bạn cách xây dựng ứng dụng web Flask theo từng bước.
Hãy nhớ:Đây là những gì chúng tôi đang xây dựng — Một ứng dụng Flask tính toán các cặp tần suất từ dựa trên văn bản từ một URL nhất định.
- Phần một:Thiết lập môi trường phát triển cục bộ, sau đó triển khai cả môi trường dàn dựng và sản xuất trên Heroku.
- Phần Hai:Thiết lập cơ sở dữ liệu PostgreSQL cùng với SQLAlchemy và Alembic để xử lý quá trình di chuyển.
- Phần ba:Thêm logic back-end để quét và sau đó xử lý số lượng từ từ một trang web bằng cách sử dụng các thư viện yêu cầu, BeautifulSoup và Bộ công cụ ngôn ngữ tự nhiên (NLTK).
- Phần Bốn:Triển khai hàng đợi tác vụ Redis để xử lý việc xử lý văn bản. ( hiện tại )
- Phần năm:Thiết lập Angular trên front-end để liên tục thăm dò ý kiến của back-end để xem liệu yêu cầu đã được xử lý xong chưa.
- Phần thứ sáu:Đẩy tới máy chủ dàn trên Heroku - thiết lập Redis và nêu chi tiết cách chạy hai quy trình (web và công nhân) trên một Dyno.
- Phần thứ bảy:Cập nhật giao diện người dùng để làm cho giao diện người dùng thân thiện hơn.
- Phần 8:Tạo Chỉ thị góc tùy chỉnh để hiển thị biểu đồ phân phối tần suất bằng JavaScript và D3.
Cần mã? Lấy nó từ repo.
Yêu cầu cài đặt
Công cụ được sử dụng:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) - một thư viện đơn giản để tạo hàng đợi tác vụ
Bắt đầu bằng cách tải xuống và cài đặt Redis từ trang web chính thức hoặc qua Homebrew (brew install redis
). Sau khi cài đặt, hãy khởi động máy chủ Redis:
$ redis-server
Tiếp theo, cài đặt Python Redis và RQ trong một cửa sổ đầu cuối mới:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Thiết lập Worker
Hãy bắt đầu bằng cách tạo một quy trình công nhân để lắng nghe các tác vụ được xếp hàng đợi. Tạo tệp mới worker.py và thêm mã này:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Ở đây, chúng tôi đã lắng nghe một hàng đợi có tên default
và thiết lập kết nối với máy chủ Redis trên localhost:6379
.
Kích hoạt điều này trong một cửa sổ đầu cuối khác:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Bây giờ chúng tôi cần cập nhật app.py của mình để gửi công việc vào hàng đợi…
Cập nhật app.py
Thêm các lần nhập sau vào app.py :
from rq import Queue
from rq.job import Job
from worker import conn
Sau đó cập nhật phần cấu hình:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
thiết lập kết nối Redis và khởi tạo hàng đợi dựa trên kết nối đó.
Di chuyển chức năng xử lý văn bản ra khỏi tuyến chỉ mục của chúng tôi và vào một hàm mới có tên là count_and_save_words()
. Hàm này chấp nhận một đối số, một URL, mà chúng tôi sẽ chuyển tới nó khi chúng tôi gọi nó từ tuyến chỉ mục của mình.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Hãy lưu ý đoạn mã sau:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Lưu ý: Chúng tôi cần nhập
count_and_save_words
trong hàmindex
của chúng tôi vì gói RQ hiện có một lỗi, nơi nó sẽ không tìm thấy các chức năng trong cùng một mô-đun.
Ở đây, chúng tôi đã sử dụng hàng đợi mà chúng tôi đã khởi tạo trước đó và gọi là enqueue_call()
hàm số. Điều này đã thêm một công việc mới vào hàng đợi và công việc đó chạy count_and_save_words()
hàm với URL làm đối số. result_ttl=5000
đối số dòng cho RQ biết thời gian giữ kết quả của công việc trong - 5.000 giây, trong trường hợp này. Sau đó, chúng tôi xuất id công việc đến thiết bị đầu cuối. Id này là cần thiết để xem công việc đã được xử lý xong chưa.
Hãy thiết lập một lộ trình mới cho điều đó…
Nhận kết quả
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Hãy kiểm tra điều này.
Khởi động máy chủ, điều hướng đến http:// localhost:5000 /, sử dụng URL https://realpython.com và lấy id công việc từ thiết bị đầu cuối. Sau đó, sử dụng id đó trong điểm cuối ‘/ results /’ - tức là http:// localhost:5000 / results / ef600206-3503-4b87-a436-ddd9438f2197.
Miễn là chưa đến 5.000 giây trôi qua trước khi bạn kiểm tra trạng thái, thì bạn sẽ thấy số id, số này được tạo khi chúng tôi thêm kết quả vào cơ sở dữ liệu:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Bây giờ, hãy cấu trúc lại tuyến đường một chút để trả về kết quả thực tế từ cơ sở dữ liệu trong JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Đảm bảo thêm nhập:
from flask import jsonify
Kiểm tra điều này một lần nữa. Nếu mọi việc suôn sẻ, bạn sẽ thấy một cái gì đó tương tự như trong trình duyệt của mình:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
Tiếp theo là gì?
Phần thưởng miễn phí: Nhấp vào đây để có quyền truy cập vào video hướng dẫn Flask + Python miễn phí hướng dẫn bạn cách xây dựng ứng dụng web Flask theo từng bước.
Trong Phần 5, chúng ta sẽ kết hợp máy khách và máy chủ bằng cách thêm Angular vào hỗn hợp để tạo một bộ thăm dò ý kiến, sẽ gửi một yêu cầu sau mỗi năm giây đến /results/<job_key>
điểm cuối yêu cầu cập nhật. Khi dữ liệu có sẵn, chúng tôi sẽ thêm dữ liệu đó vào DOM.
Chúc mừng!
Đây là tác phẩm hợp tác giữa Cam Linke, đồng sáng lập Startup Edmonton và những người tại Real Python