Refactored callback to SessionFactory, Session type of behaviour

This commit is contained in:
Lassi Immonen 2010-12-28 13:39:41 +00:00
parent 40e67458c8
commit 20ce0cd486
10 changed files with 515 additions and 240 deletions

View File

@ -34,7 +34,13 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.6.9</version>
</dependency>
<!-- test -->
<dependency>

View File

@ -1,35 +1,19 @@
package com.mysema.query.lucene.session;
import org.apache.lucene.index.IndexWriter;
import com.mysema.query.lucene.LuceneQuery;
/**
* General interface on using Lucene.
*
* @author laimw
*/
public interface LuceneSession {
/**
* Creates a new LuceneQuery
*
* @return
*/
LuceneQuery createQuery();
IndexWriter createAppendWriter();
IndexWriter createOverwriteWriter();
void flush();
/**
* Creates a new index, adds updates to it and publishes the new index to
* all readers after the callback finishes.
*
* @param callback
*/
void updateNew(WriteCallback callback);
/**
* Updates the current index and publishes it to all readers after the
* callback finishes.
*
* @param callback
*/
void update(WriteCallback callback);
void close();
}

View File

@ -0,0 +1,8 @@
package com.mysema.query.lucene.session;
public interface LuceneSessionFactory {
LuceneSession getCurrentSession();
LuceneSession openSession(boolean readOnly);
}

View File

@ -0,0 +1,184 @@
package com.mysema.query.lucene.session;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysema.query.QueryException;
public class LuceneSessionFactoryImpl implements LuceneSessionFactory {
private final Logger logger = LoggerFactory.getLogger(LuceneSessionFactoryImpl.class);
private Directory directory;
private final AtomicReference<IndexSearcher> searcher = new AtomicReference<IndexSearcher>();
public LuceneSessionFactoryImpl(String indexPath) throws IOException {
File folder = new File(indexPath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException("Could not create directory: " + folder.getAbsolutePath());
}
try {
directory = new SimpleFSDirectory(folder);
} catch (IOException e) {
logger.error("Could not create lucene directory to " + folder.getAbsolutePath());
throw e;
}
}
public LuceneSessionFactoryImpl(Directory directory) {
this.directory = directory;
}
@Override
public LuceneSession getCurrentSession() {
return LuceneSessionHolder.getCurrentSession();
}
@Override
public LuceneSession openSession(boolean readOnly) {
return new LuceneSessionImpl(this, readOnly);
}
public IndexWriter getWriter(boolean createNew) {
try {
return new IndexWriter(directory, new StandardAnalyzer(Version.LUCENE_30), createNew,
MaxFieldLength.LIMITED);
} catch (IOException e) {
throw new QueryException(e);
}
}
public void flush(LuceneSessionImpl session) {
IndexWriter writer = session.getIndexWriter();
if(writer == null) {
return;
}
try {
writer.commit();
} catch (IOException e) {
throw new QueryException(e);
}
//Close the reader, so it will need to lease it again
closeReader(session);
}
public IndexSearcher leaseSearcher() {
try {
if (searcher.get() == null) {
createNewSearcher(null);
}
// Checking do we need to refresh the reader
IndexSearcher is = searcher.get();
if (!is.getIndexReader().isCurrent()) {
// Underlying index has changed
// Decreasing the reference counter so that
// count can go to zero either here or
// when final searcher has done it's job
// This pairs with createNewSearcher incRef()
try {
is.getIndexReader().decRef();
} catch (IOException e) {
logger.error("Could not release index reader", e);
}
createNewSearcher(is);
}
// Incrementing reference as we lease this out
// This pairs with closeReaders decRef()
searcher.get().getIndexReader().incRef();
return searcher.get();
} catch (IOException e) {
throw new QueryException(e);
}
}
private IndexSearcher createNewSearcher(IndexSearcher expected) throws IOException {
IndexSearcher is = new IndexSearcher(directory);
if (!searcher.compareAndSet(expected, is)) {
// Some thread already created a new one so just close this
is.close();
} else {
// Incrementing the reference count first time
// We want to keep using the same reader until the index is changed
is.getIndexReader().incRef();
}
return searcher.get();
}
public void closeSession(LuceneSessionImpl session) {
QueryException readerException = null;
try {
closeReader(session);
} catch (QueryException e) {
readerException = e;
}
closeWriter(session);
if (readerException != null) {
throw readerException;
}
}
private void closeReader(LuceneSessionImpl session) {
if (session.getIndexSearcher() == null ) {
return;
}
try {
// Decrementing the reader, if this is last reference,
// reader will be closed
session.getIndexSearcher().getIndexReader().decRef();
session.removeIndexSearcher();
} catch (IOException e) {
logger.error("Reader close failed", e);
throw new QueryException("Reader close failed", e);
}
}
private void closeWriter(LuceneSessionImpl session) {
// Always close writer
try {
IndexWriter writer = session.getIndexWriter();
if (writer != null) {
writer.close();
}
} catch (IOException e) {
logger.error("Writer close failed", e);
try {
if (IndexWriter.isLocked(directory)) {
IndexWriter.unlock(directory);
}
} catch (IOException e1) {
logger.error("Lock release failed", e1);
throw new QueryException(e1);
}
throw new QueryException(e);
}
}
}

View File

@ -0,0 +1,64 @@
package com.mysema.query.lucene.session;
import com.mysema.query.QueryException;
/**
* Holds the thread local session
*
* @author laimw
*/
public final class LuceneSessionHolder {
private static final ThreadLocal<LuceneSessionRef> currentSessionRef =
new ThreadLocal<LuceneSessionRef>();
private static class LuceneSessionRef {
private LuceneSession session;
private int referenceCount = 0;
LuceneSessionRef(LuceneSession session) {
this.session = session;
}
}
private LuceneSessionHolder() {
}
public static boolean hasCurrentSession() {
return currentSessionRef.get() != null;
}
public static LuceneSession getCurrentSession() {
return getSessionRef().session;
}
public static void setCurrentSession(LuceneSession session) {
LuceneSessionRef ref = new LuceneSessionRef(session);
currentSessionRef.set(ref);
}
public static void release() {
LuceneSessionRef ref = getSessionRef();
ref.referenceCount--;
if (ref.referenceCount == 0) {
try {
ref.session.close();
} finally {
currentSessionRef.remove();
}
}
}
public static void lease() {
getSessionRef().referenceCount++;
}
private static LuceneSessionRef getSessionRef() {
if (!hasCurrentSession()) {
throw new QueryException("There is no session bound to local thread");
}
return currentSessionRef.get();
}
}

View File

@ -1,150 +1,101 @@
package com.mysema.query.lucene.session;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysema.query.QueryException;
import com.mysema.query.lucene.LuceneQuery;
import com.mysema.query.lucene.LuceneSerializer;
/**
* Lucene session implementation
*
* @author laimw
*
*/
public class LuceneSessionImpl implements LuceneSession {
private final Logger logger = LoggerFactory.getLogger(LuceneSessionImpl.class);
private final Directory directory;
private final AtomicReference<IndexSearcher> searcher = new AtomicReference<IndexSearcher>();
private final LuceneSerializer serializer = new LuceneSerializer(true, true);
public LuceneSessionImpl(Directory directory) {
this.directory = directory;
}
public LuceneSessionImpl(String indexPath) throws IOException {
File folder = new File(indexPath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException("Could not create directory: "
+ folder.getAbsolutePath());
}
try {
directory = new SimpleFSDirectory(folder);
} catch (IOException e) {
logger.error("Could not create lucene directory to "
+ folder.getAbsolutePath());
throw e;
}
}
private IndexSearcher createNewSearcher(IndexSearcher expected) throws IOException {
IndexSearcher is = new IndexSearcher(directory);
if (!searcher.compareAndSet(expected, is)) {
// Some thread already created a new one so just close this
is.close();
} else {
// Incrementing the reference count first time
// We want to keep using the same reader until the index is changed
is.getIndexReader().incRef();
}
return searcher.get();
}
private IndexSearcher getSearcher() throws IOException {
if (searcher.get() == null) {
createNewSearcher(null);
}
// Checking do we need to refresh the reader
IndexSearcher is = searcher.get();
if (!is.getIndexReader().isCurrent()) {
// Underlying index has changed
// Decreasing the reference counter so that
// count can go to zero either here or
// when final searcher has done it's job
is.getIndexReader().decRef();
createNewSearcher(is);
}
return searcher.get();
}
private boolean readOnly;
private boolean closed = false;
private LuceneSessionFactoryImpl sessionFactory;
private IndexSearcher searcher;
private IndexWriter writer;
private LuceneSerializer serializer = new LuceneSerializer(true, true);
public LuceneSessionImpl(LuceneSessionFactoryImpl sessionFactory,
boolean readOnly) {
this.sessionFactory = sessionFactory;
this.readOnly = readOnly;
}
@Override
public LuceneQuery createQuery() {
try {
final IndexSearcher is = getSearcher();
is.getIndexReader().incRef();
return new LuceneQuery(serializer, is){
@Override
public void close(){
try {
is.getIndexReader().decRef();
} catch (IOException e) {
throw new QueryException(e);
}
}
};
} catch (IOException e) {
throw new QueryException(e);
}
checkClosed();
return new LuceneQuery(serializer, getSearcher());
}
@Override
public void update(WriteCallback callback) {
try {
update(callback, false);
} catch (IOException e) {
throw new QueryException(e);
}
}
@Override
public void updateNew(WriteCallback callback) {
try {
update(callback, true);
} catch (IOException e) {
throw new QueryException(e);
private IndexSearcher getSearcher() {
if(searcher == null) {
searcher = sessionFactory.leaseSearcher();
}
return searcher;
}
private void update(WriteCallback callback, boolean create) throws IOException {
IndexWriter writer = new IndexWriter(directory, new StandardAnalyzer(
Version.LUCENE_CURRENT), create, MaxFieldLength.LIMITED);
try {
callback.write(writer);
} finally {
try {
writer.close();
} catch (IOException e) {
logger.error("Writer close failed", e);
try {
if (IndexWriter.isLocked(directory)) {
IndexWriter.unlock(directory);
}
} catch (IOException e1) {
logger.error("Lock release failed", e1);
}
}
@Override
public IndexWriter createAppendWriter() {
checkClosed();
return createWriter(false);
}
@Override
public IndexWriter createOverwriteWriter() {
checkClosed();
return createWriter(true);
}
private IndexWriter createWriter(boolean createNew) {
if (readOnly) {
throw new QueryException("Read only session, cannot create writer");
}
if (writer == null) {
writer = sessionFactory.getWriter(createNew);
}
return writer;
}
@Override
public void close() {
checkClosed();
sessionFactory.closeSession(this);
closed = true;
}
@Override
public void flush() {
checkClosed();
sessionFactory.flush(this);
}
public IndexSearcher getIndexSearcher() {
return searcher;
}
public void removeIndexSearcher() {
searcher = null;
}
public IndexWriter getIndexWriter() {
return writer;
}
private void checkClosed() {
if (closed) {
throw new QueryException("Session is closed");
}
}
}

View File

@ -0,0 +1,14 @@
package com.mysema.query.lucene.session;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LuceneTransaction {
boolean readOnly() default false;
}

View File

@ -0,0 +1,43 @@
package com.mysema.query.lucene.session;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Aspect
public class LuceneTransactionHandler {
private final Logger logger = LoggerFactory.getLogger(LuceneTransactionHandler.class);
private LuceneSessionFactoryImpl sessionFactory;
public LuceneTransactionHandler(LuceneSessionFactoryImpl sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Around("@annotation(luceneTransactionAnnotation)")
public Object transactionalMethod(ProceedingJoinPoint joinPoint, LuceneTransaction annotation)
throws Throwable {
if (!LuceneSessionHolder.hasCurrentSession()) {
if (logger.isDebugEnabled()) {
logger.debug("Binding new session to thread");
}
LuceneSession session = sessionFactory.openSession(annotation.readOnly());
LuceneSessionHolder.setCurrentSession(session);
}
LuceneSessionHolder.lease();
try {
return joinPoint.proceed();
} finally {
LuceneSessionHolder.release();
}
}
}

View File

@ -0,0 +1,108 @@
package com.mysema.query.lucene.session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.junit.Before;
import org.junit.Test;
import com.mysema.query.lucene.LuceneQuery;
import com.mysema.query.types.path.StringPath;
public class LuceneSessionFactoryTest {
private LuceneSessionFactory sessionFactory;
private Directory directory;
private StringPath title;
@Before
public void before() throws IOException {
directory = new RAMDirectory();
sessionFactory = new LuceneSessionFactoryImpl(directory);
final QDocument entityPath = new QDocument("doc");
title = entityPath.title;
}
@Test
public void testCreate() throws IOException {
LuceneSession session = sessionFactory.openSession(false);
IndexWriter writer = session.createOverwriteWriter();
writer.addDocument(createDocument(
"Jurassic Park",
"Michael Crichton",
"It's a UNIX system! I know this!",
1990,
90.00));
writer.addDocument(createDocument(
"Nummisuutarit",
"Aleksis Kivi",
"ESKO. Ja iloitset ja riemuitset?",
1864,
10.00));
session.flush();
//Testing the write
IndexSearcher searcher = new IndexSearcher(directory);
Document doc1 = searcher.doc(0);
Document doc2 = searcher.doc(1);
assertNotNull(doc1);
assertNotNull(doc2);
assertEquals("Jurassic Park", doc1.getField("title").stringValue());
assertEquals("Nummisuutarit", doc2.getField("title").stringValue());
LuceneQuery query = session.createQuery();
List<Document> results = query.where(title.eq("Jurassic Park")).list();
assertEquals(1, results.size());
assertEquals("Jurassic Park", results.get(0).getField("title").stringValue());
//TODO Kysely ei toimi, jos ei ota uutta LuceneQuery objektia
query = session.createQuery();
long count = query.where(title.startsWith("Nummi")).count();
assertEquals(1, count);
// TODO Tästä tulee 0 eikä 2!!
// query.where(title.ne("AA")).count();
session.close();
}
private Document createDocument(
final String docTitle,
final String docAuthor,
final String docText,
final int docYear,
final double docGross) {
final Document doc = new Document();
doc.add(new Field("title", docTitle, Store.YES, Index.ANALYZED));
doc.add(new Field("author", docAuthor, Store.YES, Index.ANALYZED));
doc.add(new Field("text", docText, Store.YES, Index.ANALYZED));
doc.add(new NumericField("year", Store.YES, true).setIntValue(docYear));
doc.add(new NumericField("gross", Store.YES, true).setDoubleValue(docGross));
return doc;
}
}

View File

@ -1,87 +0,0 @@
package com.mysema.query.lucene.session;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericField;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.junit.Before;
import org.junit.Test;
public class LuceneSessionImplTest {
private LuceneSession session;
private Directory directory;
private final QDocument document = new QDocument("doc");
@Before
public void before() throws IOException {
directory = new RAMDirectory();
session = new LuceneSessionImpl(directory);
}
@Test
public void Create() throws IOException {
session.updateNew(new WriteCallback() {
public void write(IndexWriter writer){
try {
writer.addDocument(createDocument(
"Jurassic Park",
"Michael Crichton",
"It's a UNIX system! I know this!",
1990,
90.00));
writer.addDocument(createDocument(
"Nummisuutarit",
"Aleksis Kivi",
"ESKO. Ja iloitset ja riemuitset?",
1864,
10.00));
} catch (CorruptIndexException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
List<Document> results = session.createQuery().where(document.title.eq("Jurassic Park")).list();
assertEquals(1, results.size());
assertEquals("Jurassic Park", results.get(0).getField("title").stringValue());
Long count = session.createQuery().where(document.title.startsWith("Nummi")).count();
assertEquals(1, (long) count);
}
private Document createDocument(
String docTitle,
String docAuthor,
String docText,
int docYear,
double docGross) {
Document doc = new Document();
doc.add(new Field("title", docTitle, Store.YES, Index.ANALYZED));
doc.add(new Field("author", docAuthor, Store.YES, Index.ANALYZED));
doc.add(new Field("text", docText, Store.YES, Index.ANALYZED));
doc.add(new NumericField("year", Store.YES, true).setIntValue(docYear));
doc.add(new NumericField("gross", Store.YES, true).setDoubleValue(docGross));
return doc;
}
}