Created
June 10, 2016 16:01
-
-
Save baumandm/c10fd31701366cf6b1b8ad996312acf9 to your computer and use it in GitHub Desktop.
Streaming from MongoDB to Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Uses Mongoose to stream a large collection out of MongoDB and into Elasticsearch,
// using the Elasticsearch.js library in bulk mode.
// Bulk request size must be large enough to avoid request timeout errors (depends on the ES cluster specs)
var mongoose = require('mongoose');

// Mongo connection, schema all configured in mongo.js.
// Required for its side effects; the returned value is not used below.
var mongo = require('./mongo');

var elasticsearch = require('elasticsearch');

// Elasticsearch client used for all bulk requests.
var client = new elasticsearch.Client({
    host: 'localhost:9200',
    log: 'info'
});

// Elasticsearch rejects index names that contain uppercase letters,
// so the target index name must be all lowercase.
var indexName = 'myindex';
var typeName = 'doc';

// Model is looked up by name; it is expected to have been registered by mongo.js.
var Model = mongoose.model('myModel');

// Running total of documents read from the stream.
var count = 0;

// Wall-clock start (milliseconds since epoch), used to report the total duration.
var startTime = new Date().getTime();

// Stream every document in the collection; lean() is requested so the stream
// yields plain objects rather than full Mongoose documents.
var stream = Model.find().lean().stream();

// Pending bulk body: alternating action-metadata and document entries.
var bulk = [];
/**
 * Sends the currently queued bulk operations to Elasticsearch and resets the queue.
 *
 * The queue (`bulk`) holds two entries per document (an action line followed by
 * the document itself), so the number of documents is half the queue length.
 * Does nothing when the queue is empty, to avoid issuing an empty bulk request.
 * Errors are logged, not thrown; the request completes asynchronously.
 */
var sendAndEmptyQueue = function () {
    if (bulk.length === 0) {
        return;
    }

    // Take ownership of the queued entries and start a fresh queue right away,
    // so documents arriving while the request is in flight go into the next batch
    // and the callback below still reports on the batch that was actually sent.
    var pending = bulk;
    var documentCount = pending.length / 2;
    bulk = [];

    client.bulk({
        body: pending
    }, function (err, resp) {
        if (err) {
            console.log(err);
        } else if (resp && resp.errors) {
            // The request itself succeeded, but the bulk response flags item-level failures.
            console.log('Elasticsearch reported errors for some of the ' + documentCount + ' documents in this bulk request');
        } else {
            console.log('Sent ' + documentCount + ' documents to Elasticsearch!');
        }
    });
};
// Number of documents to accumulate before flushing a bulk request.
var batchSize = 500;

// Wire up the MongoDB stream: queue each document for bulk indexing, flush every
// `batchSize` documents, and flush whatever remains when the stream closes.
stream.on('data', function (doc) {
    count += 1;
    // Each document contributes two bulk entries: the index action, then the source.
    // NOTE(review): `doc` still carries its `_id` field in the source body; some
    // Elasticsearch versions may reject that - confirm against the target cluster.
    bulk.push({ index: { _index: indexName, _type: typeName, _id: doc._id } });
    bulk.push(doc);
    // Flush on every full batch (not after the very first document).
    if (count % batchSize === 0) {
        sendAndEmptyQueue();
    }
}).on('error', function (err) {
    // Streams emit 'error' (not 'err'); without this handler the error would be thrown.
    console.log('MongoDB Stream Error: ' + err);
}).on('close', function () {
    // Send the final partial batch, if any documents are still queued.
    if (bulk.length > 0) {
        sendAndEmptyQueue();
    }
    console.log('Document Count is: ' + count);
    // Elapsed wall-clock time in milliseconds.
    console.log('Duration is: ' + (new Date().getTime() - startTime));
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment