Tôi đã có thể sử dụng Change Streams và TTL để mô phỏng một cronjob. Tôi đã xuất bản một bài đăng giải thích chi tiết những gì tôi đã làm và ghi công tại: https:// www. patreon.com/posts/17697287
Tuy nhiên, về cơ bản, bất cứ lúc nào tôi cần lên lịch "sự kiện" cho một tài liệu, khi tạo tài liệu, tôi cũng tạo một tài liệu sự kiện song song. Tài liệu sự kiện này sẽ có _id giống như id của tài liệu đầu tiên.
Ngoài ra, đối với tài liệu sự kiện này, tôi sẽ thiết lập một TTL.
Khi TTL hết hạn, tôi sẽ ghi lại sự thay đổi "xóa" của nó với Change Streams. Và sau đó, tôi sẽ sử dụng documentKey của sự thay đổi (vì nó giống với id của tài liệu tôi muốn kích hoạt) để tìm tài liệu đích trong bộ sưu tập đầu tiên và làm bất cứ điều gì tôi muốn với tài liệu.
Tôi đang sử dụng Node.js với Express và Mongoose để truy cập MongoDB. Đây là phần liên quan sẽ được thêm vào App.js:
const { ReplSet } = require('mongodb-topology-manager');
run().catch(error => console.error(error));
async function run() {
console.log(new Date(), 'start');
const bind_ip = 'localhost';
// Starts a 3-node replica set on ports 31000, 31001, 31002, replica set
// name is "rs0".
const replSet = new ReplSet('mongod', [
{ options: { port: 31000, dbpath: `${__dirname}/data/db/31000`, bind_ip } },
{ options: { port: 31001, dbpath: `${__dirname}/data/db/31001`, bind_ip } },
{ options: { port: 31002, dbpath: `${__dirname}/data/db/31002`, bind_ip } }
], { replSet: 'rs0' });
// Initialize the replica set
await replSet.purge();
await replSet.start();
console.log(new Date(), 'Replica set started...');
// Connect to the replica set
const uri = 'mongodb://localhost:31000,localhost:31001,localhost:31002/' + 'test?replicaSet=rs0';
await mongoose.connect(uri);
var db = mongoose.connection;
db.on('error', console.error.bind(console, 'connection error:'));
db.once('open', function () {
console.log("Connected correctly to server");
});
// To work around "MongoError: cannot open $changeStream for non-existent database: test" for this example
await mongoose.connection.createCollection('test');
// *** we will add our scheduler here *** //
var Item = require('./models/item');
var ItemExpiredEvent = require('./models/scheduledWithin');
let deleteOps = {
$match: {
operationType: "delete"
}
};
ItemExpiredEvent.watch([deleteOps]).
on('change', data => {
// *** treat the event here *** //
console.log(new Date(), data.documentKey);
Item.findById(data.documentKey, function(err, item) {
console.log(item);
});
});
// The TTL set in ItemExpiredEvent will trigger the change stream handler above
console.log(new Date(), 'Inserting item');
Item.create({foo:"foo", bar: "bar"}, function(err, cupom) {
ItemExpiredEvent.create({_id : item._id}, function(err, event) {
if (err) console.log("error: " + err);
console.log('event inserted');
});
});
}
Và đây là mã cho model / SchedisedWithin:
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
var ScheduledWithin = new Schema({
_id: mongoose.Schema.Types.ObjectId,
}, {timestamps: true});
// timestamps: true will automatically create a "createdAt" Date field
ScheduledWithin.index({createdAt: 1}, {expireAfterSeconds: 90});
module.exports = mongoose.model('ScheduledWithin', ScheduledWithin);