Bạn có thể làm điều đó với fast-csv bằng cách lấy các tiêu đề headers
từ định nghĩa lược đồ sẽ trả về các dòng được phân tích cú pháp là "đối tượng". Bạn thực sự có một số điểm không khớp, vì vậy tôi đã đánh dấu chúng bằng các bản chỉnh sửa:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Miễn là giản đồ thực sự phù hợp với CSV được cung cấp thì không sao cả. Đây là những chỉnh sửa mà tôi có thể thấy nhưng nếu bạn cần các tên trường thực tế được căn chỉnh khác nhau thì bạn cần phải điều chỉnh. Nhưng về cơ bản có một Number
ở vị trí có String
và về cơ bản là một trường bổ sung, mà tôi cho là trường trống trong CSV.
Những điều chung chung là lấy mảng tên trường từ lược đồ và chuyển nó vào các tùy chọn khi tạo phiên bản phân tích cú pháp csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Khi bạn thực sự làm điều đó thì bạn sẽ nhận được "Đối tượng" trở lại thay vì một mảng:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Đừng lo lắng về "loại" vì Mongoose sẽ truyền các giá trị theo giản đồ.
Phần còn lại xảy ra trong trình xử lý cho data
biến cố. Để đạt hiệu quả tối đa, chúng tôi đang sử dụng insertMany()
để chỉ ghi vào cơ sở dữ liệu một lần sau mỗi 10.000 dòng. Cách thức thực sự đi đến máy chủ và xử lý phụ thuộc vào phiên bản MongoDB, nhưng 10.000 trường sẽ khá hợp lý dựa trên số lượng trường trung bình mà bạn sẽ nhập cho một bộ sưu tập về mặt "đánh đổi" cho việc sử dụng bộ nhớ và ghi yêu cầu mạng hợp lý. Làm cho số nhỏ hơn nếu cần.
Các phần quan trọng là đánh dấu các cuộc gọi này là async
các chức năng và await
kết quả của insertMany()
trước khi tiếp tục. Ngoài ra, chúng ta cần pause()
luồng và resume()
trên mỗi mục nếu không, chúng tôi có nguy cơ ghi đè buffer
tài liệu cần chèn trước khi chúng thực sự được gửi đi. pause()
và resume()
là cần thiết để đặt "áp lực ngược" lên đường ống, nếu không các mục chỉ tiếp tục "thoát ra" và kích hoạt data
sự kiện.
Đương nhiên, việc kiểm soát 10.000 mục nhập yêu cầu chúng tôi kiểm tra cả trên mỗi lần lặp lại và khi hoàn thành luồng để làm trống bộ đệm và gửi bất kỳ tài liệu còn lại nào đến máy chủ.
Đó thực sự là những gì bạn muốn làm, vì bạn chắc chắn không muốn kích hoạt một yêu cầu không đồng bộ tới máy chủ trên cả hai lần lặp "mọi" thông qua data
sự kiện hoặc về cơ bản mà không cần đợi mỗi yêu cầu hoàn thành. Bạn sẽ thoát khỏi việc không kiểm tra "tệp rất nhỏ", nhưng đối với bất kỳ tải nào trong thế giới thực, bạn chắc chắn sẽ vượt quá ngăn xếp cuộc gọi do các lệnh gọi không đồng bộ "đang bay" chưa hoàn tất.
FYI - một package.json
đã sử dụng. mz
là tùy chọn vì nó chỉ là một Promise
hiện đại hóa đã kích hoạt thư viện các thư viện "tích hợp sẵn" của nút tiêu chuẩn mà tôi chỉ đơn giản là quen sử dụng. Tất nhiên, mã này hoàn toàn có thể hoán đổi cho nhau với fs
mô-đun.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Trên thực tế, với Node v8.9.x trở lên, chúng tôi thậm chí có thể làm cho việc này đơn giản hơn nhiều với việc triển khai AsyncIterator
thông qua stream-to-iterator
mô-đun. Nó vẫn ở trong Iterator<Promise<T>>
nhưng nó sẽ hoạt động cho đến khi Node v10.x trở nên ổn định LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Về cơ bản, tất cả việc xử lý "sự kiện" và tạm dừng và tiếp tục luồng được thay thế bằng một for
vòng lặp:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Dễ dàng! Điều này sẽ được dọn dẹp trong quá trình triển khai nút sau này với for..await..of
khi nó trở nên ổn định hơn. Nhưng những điều trên chạy tốt trên phiên bản được chỉ định trở lên.