Indexing with the Elasticsearch Bulk API

If you’ve ever worked with the elasticsearch-rails gem, you’ve likely used the import method to populate your local Elasticsearch indexes. This is great for development because it lets you index documents quickly and get on with the important stuff, like refining your search queries.
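
For example, a full local reindex is only a couple of lines (a minimal sketch, assuming an Article model that includes Elasticsearch::Model):

Article.__elasticsearch__.create_index! force: true  # drop and recreate the index
Article.import                                       # index every record, in batches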

However, this technique doesn’t hold up when you need to index a large number of documents in a production or staging environment. You should always exercise Elasticsearch on a staging environment first to see whether your search queries behave as expected against several thousand documents before pushing to production. Likewise, you want a plan for indexing several million documents quickly.

The best strategy is to use Elasticsearch’s built-in Bulk API, which enables you to index tens of thousands of documents in one request! I elected to use a Sidekiq worker to build each bulk request from 1,000 documents. This way, I can iterate over an entire table and batch my indexing into requests of 1,000 documents each.

class ElasticsearchBulkIndexWorker
  include Sidekiq::Worker
  sidekiq_options retry: 5, queue: 'low'

  # Memoize the client so each worker instantiates it only once.
  def client
    @client ||= Elasticsearch::Client.new host: CONFIG['ELASTICSEARCH_URL']
  end

  def perform(model, starting_index)
    klass = model.capitalize.constantize
    batch_for_bulk = []
    # Gather up to 1,000 records into a single bulk payload, skipping archived ones.
    klass.where(id: starting_index..(starting_index + 999)).each do |record|
      batch_for_bulk.push({ index: { _id: record.id, data: record.as_indexed_json } }) unless record.try(:archived)
    end
    client.bulk(
      index: "#{model.pluralize}_v1",
      type: model,
      body: batch_for_bulk
    )
  end
end
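
The data key in each bulk action comes from as_indexed_json, which elasticsearch-model provides for any model that includes it. A minimal model setup might look like this (a sketch; the attribute names are assumptions):

class Article < ActiveRecord::Base
  include Elasticsearch::Model

  # Controls the JSON document sent to Elasticsearch for this record.
  def as_indexed_json(options = {})
    as_json(only: [:id, :title, :body])
  end
end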

To iterate over an entire table, I created a rake task that steps through the table’s ID range and feeds each starting point to the worker. Notice that ElasticsearchBulkIndexWorker takes a starting_index argument to handle batching, along with a string naming the model, which is used both to find the appropriate records and to match them with the similarly named index.

namespace :elasticsearch do
  # Enqueue one bulk-index job per 1,000-ID slice of each table.
  task build_article_index: :environment do
    (1..Article.last.id).step(1000).each do |starting_index|
      ElasticsearchBulkIndexWorker.perform_async('article', starting_index)
    end
  end

  task build_comment_index: :environment do
    (1..Comment.last.id).step(1000).each do |starting_index|
      ElasticsearchBulkIndexWorker.perform_async('comment', starting_index)
    end
  end
end
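
With the tasks in place, kicking off a full reindex is just:

bundle exec rake elasticsearch:build_article_index
bundle exec rake elasticsearch:build_comment_index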

This should help you index a large number of documents quickly. For more information, check out the Elasticsearch Bulk API documentation, which covers details like tuning write consistency during bulk indexing. Good luck!
