Tôi đã viết một công cụ sao lưu gia tăng cho MongoDB một thời gian trước, bằng Python. Công cụ giám sát các thay đổi dữ liệu bằng cách gắn thẻ oplog
. Đây là phần có liên quan của mã.
Câu trả lời được cập nhật, MongDB 3.6+
Như datdinhquoc đã khéo léo chỉ ra trong phần nhận xét bên dưới, đối với MongoDB 3.6 trở lên có Luồng thay đổi .
Câu trả lời được cập nhật, pymongo 3
from time import sleep
from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect
# Time to wait for data or connection.
_SLEEP = 1.0
if __name__ == '__main__':
oplog = MongoClient().local.oplog.rs
stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']
while True:
kw = {}
kw['filter'] = {'ts': {'$gt': stamp}}
kw['cursor_type'] = CursorType.TAILABLE_AWAIT
kw['oplog_replay'] = True
cursor = oplog.find(**kw)
try:
while cursor.alive:
for doc in cursor:
stamp = doc['ts']
print(doc) # Do something with doc.
sleep(_SLEEP)
except AutoReconnect:
sleep(_SLEEP)
Cũng xem http://api.mongodb.com/python/current/examples /tailable.html .
Câu trả lời ban đầu, pymongo 2
from time import sleep
from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp
# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}
# Time to wait for data or connection.
_SLEEP = 10
if __name__ == '__main__':
db = MongoClient().local
while True:
query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}} # Replace with your query.
cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
try:
while cursor.alive:
try:
doc = next(cursor)
# Do something with doc.
except (AutoReconnect, StopIteration):
sleep(_SLEEP)
finally:
cursor.close()