MongoDB
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> MongoDB

Nhập CSV bằng lược đồ Mongoose

Bạn có thể làm điều đó với fast-csv bằng cách lấy các tiêu đề headers từ định nghĩa lược đồ sẽ trả về các dòng được phân tích cú pháp là "đối tượng". Bạn thực sự có một số điểm không khớp, vì vậy tôi đã đánh dấu chúng bằng các bản chỉnh sửa:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Miễn là giản đồ thực sự phù hợp với CSV được cung cấp thì không sao cả. Đây là những chỉnh sửa mà tôi có thể thấy nhưng nếu bạn cần các tên trường thực tế được căn chỉnh khác nhau thì bạn cần phải điều chỉnh. Nhưng về cơ bản có một Number ở vị trí có String và về cơ bản là một trường bổ sung, mà tôi cho là trường trống trong CSV.

Những điều chung chung là lấy mảng tên trường từ lược đồ và chuyển nó vào các tùy chọn khi tạo phiên bản phân tích cú pháp csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Khi bạn thực sự làm điều đó thì bạn sẽ nhận được "Đối tượng" trở lại thay vì một mảng:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Đừng lo lắng về "loại" vì Mongoose sẽ truyền các giá trị theo giản đồ.

Phần còn lại xảy ra trong trình xử lý cho data biến cố. Để đạt hiệu quả tối đa, chúng tôi đang sử dụng insertMany() để chỉ ghi vào cơ sở dữ liệu một lần sau mỗi 10.000 dòng. Cách thức thực sự đi đến máy chủ và xử lý phụ thuộc vào phiên bản MongoDB, nhưng 10.000 trường sẽ khá hợp lý dựa trên số lượng trường trung bình mà bạn sẽ nhập cho một bộ sưu tập về mặt "đánh đổi" cho việc sử dụng bộ nhớ và ghi yêu cầu mạng hợp lý. Làm cho số nhỏ hơn nếu cần.

Các phần quan trọng là đánh dấu các cuộc gọi này là async các chức năng và await kết quả của insertMany() trước khi tiếp tục. Ngoài ra, chúng ta cần pause() luồng và resume() trên mỗi mục nếu không, chúng tôi có nguy cơ ghi đè buffer tài liệu cần chèn trước khi chúng thực sự được gửi đi. pause()resume() là cần thiết để đặt "áp lực ngược" lên đường ống, nếu không các mục chỉ tiếp tục "thoát ra" và kích hoạt data sự kiện.

Đương nhiên, việc kiểm soát 10.000 mục nhập yêu cầu chúng tôi kiểm tra cả trên mỗi lần lặp lại và khi hoàn thành luồng để làm trống bộ đệm và gửi bất kỳ tài liệu còn lại nào đến máy chủ.

Đó thực sự là những gì bạn muốn làm, vì bạn chắc chắn không muốn kích hoạt một yêu cầu không đồng bộ tới máy chủ trên cả hai lần lặp "mọi" thông qua data sự kiện hoặc về cơ bản mà không cần đợi mỗi yêu cầu hoàn thành. Bạn sẽ thoát khỏi việc không kiểm tra "tệp rất nhỏ", nhưng đối với bất kỳ tải nào trong thế giới thực, bạn chắc chắn sẽ vượt quá ngăn xếp cuộc gọi do các lệnh gọi không đồng bộ "đang bay" chưa hoàn tất.

FYI - một package.json đã sử dụng. mz là tùy chọn vì nó chỉ là một Promise hiện đại hóa đã kích hoạt thư viện các thư viện "tích hợp sẵn" của nút tiêu chuẩn mà tôi chỉ đơn giản là quen sử dụng. Tất nhiên, mã này hoàn toàn có thể hoán đổi cho nhau với fs mô-đun.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Trên thực tế, với Node v8.9.x trở lên, chúng tôi thậm chí có thể làm cho việc này đơn giản hơn nhiều với việc triển khai AsyncIterator thông qua stream-to-iterator mô-đun. Nó vẫn ở trong Iterator<Promise<T>> nhưng nó sẽ hoạt động cho đến khi Node v10.x trở nên ổn định LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Về cơ bản, tất cả việc xử lý "sự kiện" và tạm dừng và tiếp tục luồng được thay thế bằng một for vòng lặp:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Dễ dàng! Điều này sẽ được dọn dẹp trong quá trình triển khai nút sau này với for..await..of khi nó trở nên ổn định hơn. Nhưng những điều trên chạy tốt trên phiên bản được chỉ định trở lên.



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Cách viết truy vấn union trong mongoDB

  2. Cách kết nối mongodb từ xa với pymongo

  3. Làm cách nào để xóa phần tử mảng trong mongodb?

  4. Hình ảnh trả về từ API REST luôn hiển thị bị hỏng

  5. Làm cách nào để lưu trữ Máy chủ phân tích cú pháp của riêng tôi trên Heroku bằng MongoDB?