Below is the Java source of the implementation class:
package swiki.lucene.asynchronous;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
/**
* @author swiki swiki
*
*/
public class AsynchronousIndexWriter implements Runnable {
/*
* A blocking queue of documents that decouples the threads adding documents
* from the thread that writes them.
*/
private BlockingQueue<Document> documents;
/*
* Instance of core index writer which does the actual writing task.
*/
private IndexWriter writer;
/*
* Thread which makes writing asynchronous
*/
private Thread writerThread;
/*
* Set to false once document addition is complete. This does not stop the
* writing immediately, as there could still be documents in the queue; the
* writer finishes once all documents are written and the queue is empty.
* Declared volatile so that the writer thread is guaranteed to see the change.
*/
private volatile boolean keepRunning = true;
/*
* Set to false once the writer thread is done writing the queued documents.
* Declared volatile because it is written and read by different threads.
*/
private volatile boolean isRunning = true;
/*
* Duration in milliseconds for which the writer sleeps when it finds the
* queue empty and the job is not yet complete
*/
private long sleepMilisecondOnEmpty = 100;
/**
* Adds a document to the indexing queue. If the queue is full, this call
* blocks until space becomes available.
*
* @param doc the document to index
* @throws InterruptedException if interrupted while waiting for queue space
*/
public void addDocument(Document doc) throws InterruptedException {
documents.put(doc);
}
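/*
* Starts the background writer thread. It is called once from the constructor
* and is private so that a second writer thread cannot be started by accident.
*/
private void startWriting() {
writerThread = new Thread(this, "AsynchronousIndexWriter");
writerThread.start();
}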
/**
* Constructor with an IndexWriter as input. It uses an ArrayBlockingQueue
* of size 100, and sleepMilisecondOnEmpty is 100 ms.
*
* @param w the IndexWriter that does the actual writing
*/
public AsynchronousIndexWriter(IndexWriter w) {
this(w, 100, 100);
}
/**
* Constructor with an IndexWriter and a queue size as input. It uses an
* ArrayBlockingQueue of size queueSize, and sleepMilisecondOnEmpty is
* 100 ms.
*
* @param w the IndexWriter that does the actual writing
* @param queueSize capacity of the document queue
*/
public AsynchronousIndexWriter(IndexWriter w, int queueSize) {
this(w, queueSize, 100);
}
/**
* Constructor with an IndexWriter, a queue size and a sleep duration as
* input. It uses an ArrayBlockingQueue of size queueSize.
*
* @param w the IndexWriter that does the actual writing
* @param queueSize capacity of the document queue
* @param sleepMilisecondOnEmpty how long the writer sleeps when the queue is empty
*/
public AsynchronousIndexWriter(IndexWriter w, int queueSize,
long sleepMilisecondOnEmpty) {
this(w, new ArrayBlockingQueue<Document>(queueSize), sleepMilisecondOnEmpty);
}
/**
* Constructor that accepts any BlockingQueue implementation. The writer
* thread is started immediately.
*
* @param w the IndexWriter that does the actual writing
* @param queue the queue that buffers documents until they are written
* @param sleepMilisecondOnEmpty how long the writer sleeps when the queue is empty
*/
public AsynchronousIndexWriter(IndexWriter w, BlockingQueue<Document> queue,
long sleepMilisecondOnEmpty) {
writer = w;
documents = queue;
this.sleepMilisecondOnEmpty = sleepMilisecondOnEmpty;
startWriting();
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
try {
while (keepRunning || !documents.isEmpty()) {
Document d = (Document) documents.poll();
if (d != null) {
writer.addDocument(d);
} else {
/*
* Nothing in the queue, so wait before polling again
*/
Thread.sleep(sleepMilisecondOnEmpty);
}
}
} catch (InterruptedException e) {
// Restore the interrupt status before giving up
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (IOException e) {
// Also covers CorruptIndexException, which extends IOException
throw new RuntimeException(e);
} finally {
// Always clear the flag, even if the thread dies with an exception,
// so that nothing waits on it forever
isRunning = false;
}
}
/**
* Stops the thread gracefully and waits until it is done writing the
* queued documents.
*/
private void stopWriting() {
this.keepRunning = false;
try {
// join() returns once the writer thread has finished, even if it died
// with an exception, so there is no need to poll in a sleep loop
writerThread.join();
} catch (InterruptedException e) {
// Preserve the interrupt status for the caller
Thread.currentThread().interrupt();
}
}
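/**
* Optimizes the index on the caller's thread. Documents still waiting in
* the queue are not necessarily included in this optimization.
*/
public void optimize() throws CorruptIndexException, IOException {
writer.optimize();
}
/**
* Waits until all queued documents are written, then closes the
* underlying IndexWriter.
*/
public void close() throws CorruptIndexException, IOException {
stopWriting();
writer.close();
}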
}
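For comparison, here is a minimal JDK-only sketch of the same producer/consumer idea. It is a hypothetical illustration and not the class above. `AsyncWriterSketch` is an invented name, and a `java.util.function.Consumer` stands in for `IndexWriter.addDocument` so that the sketch runs without Lucene. It makes two assumed design changes: it waits with `BlockingQueue.poll(timeout, unit)` instead of calling `Thread.sleep` separately, and `close()` uses `Thread.join()` to wait for the queue to drain.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * JDK-only sketch of an asynchronous writer. The Consumer is a stand-in for
 * IndexWriter.addDocument (assumption: this is not Lucene's API).
 */
class AsyncWriterSketch<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Consumer<T> sink;
    private final Thread worker;
    private volatile boolean accepting = true;  // cleared by close()
    private volatile RuntimeException failure;  // error raised by the sink, if any

    AsyncWriterSketch(Consumer<T> sink, int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.sink = sink;
        this.worker = new Thread(this::drain, "AsyncWriterSketch");
        this.worker.start();
    }

    /** Blocks while the queue is full. All add() calls must finish before close(). */
    public void add(T item) throws InterruptedException {
        if (!accepting) {
            throw new IllegalStateException("writer is closed");
        }
        queue.put(item);
    }

    private void drain() {
        try {
            // Keep going until close() has been called AND the queue is empty.
            while (accepting || !queue.isEmpty()) {
                // Wait up to 100 ms for an item instead of a separate Thread.sleep.
                T item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    sink.accept(item);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            failure = e; // reported to the caller by close()
        }
    }

    /** Stops accepting items, waits for the queue to drain, then reports any failure. */
    @Override
    public void close() throws InterruptedException {
        accepting = false;
        worker.join();
        if (failure != null) {
            throw new IllegalStateException("writer thread failed", failure);
        }
    }
}
```

As with the original class, `close()` should only be called after every producer has finished calling `add()`. A document added at the same moment as `close()` could be left in the queue.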
Below is a sample class that demonstrates how to use this class. Note that the asynchronous writer thread starts as soon as you instantiate it with new AsynchronousIndexWriter(...).
package swiki.lucene.asynchronous;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
/**
* @author swiki swiki
*
*/
public class TestAsyncWriter {
public static void main(String[] args) {
try {
Directory fsdir = FSDirectory.getDirectory("index");
IndexWriter w = new IndexWriter(fsdir, new StandardAnalyzer(), true);
AsynchronousIndexWriter writer = new AsynchronousIndexWriter(w);
/*
* This call can be replaced by the logic of reading
* data using multiple threads
*/
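try {
addDocumentsInMultipleThreads(writer);
writer.optimize();
} finally {
// Always stop the writer thread. It is not a daemon thread, so the JVM
// would not exit if an exception skipped this call
writer.close();
}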
} catch (Exception e) {
e.printStackTrace();
}
}
private static void addDocumentsInMultipleThreads(
AsynchronousIndexWriter writer) throws InterruptedException {
// Add the code for adding documents from multiple threads here.
Document doc = new Document();
doc.add(new Field("content", "My Content", Field.Store.YES, Field.Index.UN_TOKENIZED));
// Queue the document that was just built, not a new empty one
writer.addDocument(doc);
}
}
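The sample's addDocumentsInMultipleThreads is a placeholder that adds one document from the calling thread. The pattern it stands for is several producer threads feeding one bounded queue that a single writer thread drains. The following JDK-only sketch shows that pattern under some assumptions. `MultiProducerSketch` is a hypothetical name, a `List` stands in for the IndexWriter, and the producers just generate strings. It also stops the consumer with a "poison pill" sentinel. That technique is swapped in here and is not the keepRunning flag that AsynchronousIndexWriter uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MultiProducerSketch {
    // Sentinel compared by identity; new String(...) guarantees a unique object.
    private static final String END = new String("END");

    /** Runs several producers against one consumer; returns what the consumer wrote. */
    static List<String> run(int producers, int itemsPerProducer) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> written = new ArrayList<>(); // touched only by the consumer until join()

        // Single consumer: stand-in for the thread that calls IndexWriter.addDocument.
        Thread consumer = new Thread(() -> {
            try {
                for (String item = queue.take(); item != END; item = queue.take()) {
                    written.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Several producers, e.g. threads reading different parts of the source data.
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int id = p;
            pool.execute(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        queue.put("producer" + id + "-doc" + i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Only after every producer has finished is it safe to tell the consumer to stop.
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            consumer.interrupt(); // do not leave the consumer blocked on take()
            throw new IllegalStateException("producers did not finish in time");
        }
        queue.put(END);
        consumer.join();
        return written;
    }
}
```

With the class above, each producer task would call writer.addDocument(doc) instead of queue.put. Calling writer.close() after the producers finish would take the place of the sentinel.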
5 Comments:
Good article! I just have one suggestion.
It seems that the Decorator pattern fits this case well. Couldn't your asynchronous writer extend IndexWriter? That way you could switch back to the default writer.
And one observation: shouldn't the optimize operation be asynchronous too?
Cheers
Nice idea, though I preferred the containment approach in this case, for the following reasons:
First, I did not want to restrict my implementation to the current version of IndexWriter. If a newer and better version of IndexWriter becomes available, the same implementation can be reused. I think the same can be achieved with the decorator approach as well.
Second, I did not want to use inheritance because it would mean supporting every parameterized constructor of IndexWriter, which is unnecessary code calling super in each case. I am not concerned with how the writer is initialized; I just need to make it asynchronous.
Third, I was not sure how all of IndexWriter's methods behave in a multi-threaded environment, so I didn't want to risk the implementation.
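A decorator does not actually require inheritance. It usually wraps its target by containment and shares an interface with it. This lets you combine the commenter's goal, switching back to the plain writer, with the containment approach. Lucene's IndexWriter is a concrete class rather than an interface, so the sketch below assumes a hypothetical `DocWriter` interface. `DirectDocWriter` and `AsyncDocWriter` are illustrative names, and a `List` stands in for the index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical common interface (IndexWriter itself is a concrete class).
interface DocWriter {
    void add(String doc) throws InterruptedException;
    void close() throws InterruptedException;
}

// Plain synchronous writer: writes on the caller's thread.
class DirectDocWriter implements DocWriter {
    final List<String> written = new ArrayList<>();
    public void add(String doc) { written.add(doc); }
    public void close() { }
}

// Decorator by containment: adds asynchrony to any DocWriter without subclassing it.
class AsyncDocWriter implements DocWriter {
    private static final String END = new String("END"); // sentinel, compared by identity
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    private final Thread worker;

    AsyncDocWriter(DocWriter delegate) {
        worker = new Thread(() -> {
            try {
                for (String d = queue.take(); d != END; d = queue.take()) {
                    delegate.add(d);
                }
                delegate.close(); // close the wrapped writer once the queue is drained
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    public void add(String doc) throws InterruptedException { queue.put(doc); }

    public void close() throws InterruptedException {
        queue.put(END); // documents queued earlier are written first
        worker.join();
    }
}
```

Callers depend only on `DocWriter`. To go back to synchronous writing, you pass the `DirectDocWriter` directly instead of wrapping it.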
This is useful stuff; I am using it in my implementation. Thanks!
Hi! Here you address the case where indexing is faster than the data transfer. In my case, it is the indexing that makes data processing too slow. Is it possible to launch a parallel indexing process on the same index?
@omar
1. Writing indexes in parallel can also be done, but you cannot write to the same index from a second writer, because Lucene allows only one open IndexWriter per index. Multiple threads can, however, share a single IndexWriter. I have tried the parallel option by creating multiple indexes and merging them once indexing is complete; IndexWriter's addIndexes method can do that merge. My observation with parallel index writing is that most of the time is spent on merging, as that is the intelligent part of indexing, so I am not sure how much time you would save. But it will surely let you release the data-reading resources sooner if index writing is taking longer.
2. In a typical scenario where the data comes from a database, the index-writing side is generally the faster one, because the database call goes over the network while indexing is done on the local disk. If you have a really fast network or the database is local, index writing may be slower than the fetch, but I would first measure the database fetch time before drawing any conclusion.
3. Log some timing statistics where you fetch the data and check whether it really is fast. Make sure to log the time for the complete data fetch, not just the first record, as the first-record time is always misleading.
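Point 1 describes separate indexes written in parallel and merged at the end. That pattern can be illustrated with a toy in-memory inverted index. This is a hypothetical sketch and not Lucene. `PartitionedIndexSketch` is an invented name. Each thread builds a term-to-document-id map for its own slice of the documents, and a single thread then merges the partial maps. In Lucene the merge step would instead go through IndexWriter (for example addIndexes), and in the author's observation that is where most of the time goes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionedIndexSketch {
    // Toy "index": term -> ids of the documents containing it (not Lucene's format).
    static Map<String, List<Integer>> indexSlice(List<String> docs, int firstId) {
        Map<String, List<Integer>> index = new TreeMap<>();
        for (int i = 0; i < docs.size(); i++) {
            for (String term : docs.get(i).trim().toLowerCase().split("\\s+")) {
                if (!term.isEmpty()) {
                    index.computeIfAbsent(term, k -> new ArrayList<>()).add(firstId + i);
                }
            }
        }
        return index;
    }

    /** Indexes slices in parallel, then merges the partial indexes on one thread. */
    static Map<String, List<Integer>> build(List<String> docs, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunk = Math.max(1, (docs.size() + threads - 1) / threads);
            List<Future<Map<String, List<Integer>>>> parts = new ArrayList<>();
            for (int start = 0; start < docs.size(); start += chunk) {
                final int first = start;
                final List<String> slice = docs.subList(first, Math.min(first + chunk, docs.size()));
                parts.add(pool.submit(() -> indexSlice(slice, first)));
            }
            // Merge in slice order so each term's id list stays sorted.
            Map<String, List<Integer>> merged = new TreeMap<>();
            for (Future<Map<String, List<Integer>>> part : parts) {
                part.get().forEach((term, ids) ->
                        merged.computeIfAbsent(term, k -> new ArrayList<>()).addAll(ids));
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

The parallel part only builds the partial indexes. The merge loop runs on a single thread and touches every posting again, which is why the overall saving can be smaller than the thread count suggests.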