Session factory now supports much better thread thgrouhoutput, especially for many reads. There is optional ThreadingTest which can be used to test Lucene session under simultaneus users.

This commit is contained in:
Lassi Immonen 2011-01-03 12:51:25 +00:00
parent 51673ca67e
commit 56e9b38c3a
11 changed files with 492 additions and 84 deletions

View File

@ -49,14 +49,21 @@
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>3.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>3.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,21 @@
package com.mysema.query.lucene.session;
import com.mysema.query.QueryException;
public class WriteLockObtainFailedException extends QueryException {
private static final long serialVersionUID = 4569418223905066659L;
public WriteLockObtainFailedException(String msg) {
super(msg);
}
public WriteLockObtainFailedException(String msg, Throwable t) {
super(msg, t);
}
public WriteLockObtainFailedException(Throwable t) {
super(t);
}
}

View File

@ -11,23 +11,29 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysema.query.QueryException;
import com.mysema.query.lucene.session.LuceneWriter;
import com.mysema.query.lucene.session.WriteLockObtainFailedException;
public class LuceneWriterImpl implements LuceneWriter {
public class FileLockingWriter implements LuceneWriter {
private static final Logger logger = LoggerFactory.getLogger(LuceneWriterImpl.class);
protected static final Logger logger = LoggerFactory.getLogger(LuceneWriter.class);
private IndexWriter writer;
protected IndexWriter writer;
@Nullable
private final ReleaseListener releaseListener;
protected final ReleaseListener releaseListener;
private volatile boolean leased = false;
public LuceneWriterImpl(Directory directory, boolean createNew, ReleaseListener releaseListener) {
public FileLockingWriter(Directory directory, boolean createNew, long defaultLockTimeout,
ReleaseListener releaseListener) {
IndexWriter.setDefaultWriteLockTimeout(defaultLockTimeout);
try {
if (createNew == false) {
try {
@ -47,10 +53,16 @@ public class LuceneWriterImpl implements LuceneWriter {
MaxFieldLength.LIMITED);
}
} catch (LockObtainFailedException e) {
logger.error("Got timeout " + System.currentTimeMillis());
throw new WriteLockObtainFailedException(e);
} catch (IOException e) {
throw new QueryException(e);
}
this.releaseListener = releaseListener;
if(logger.isDebugEnabled()) {
logger.debug("Created writer " + writer);
}
}
@Override
@ -90,6 +102,9 @@ public class LuceneWriterImpl implements LuceneWriter {
// TODO What would be best way to control this?
writer.optimize();
writer.close();
if(logger.isDebugEnabled()) {
logger.debug("Closed writer " + writer);
}
} catch (IOException e) {
logger.error("Writer close failed", e);
try {
@ -107,5 +122,19 @@ public class LuceneWriterImpl implements LuceneWriter {
public IndexWriter getIndexWriter() {
return writer;
}
public void lease() {
leased = true;
}
public void release() {
leased = false;
}
public boolean isLeased() {
return leased;
}
}

View File

@ -2,7 +2,9 @@ package com.mysema.query.lucene.session.impl;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@ -22,11 +24,18 @@ public class LuceneSessionFactoryImpl implements LuceneSessionFactory {
private final Directory directory;
private final AtomicReference<LuceneSearcher> searcher = new AtomicReference<LuceneSearcher>();
@Nullable
private volatile LuceneSearcher searcher;
ExecutorService searchUpdater = Executors.newSingleThreadExecutor();
private final AtomicBoolean creatingNew = new AtomicBoolean(false);
@Nullable
private final ReleaseListener releaseListener;
private long defaultLockTimeout = 2000;
public LuceneSessionFactoryImpl(String indexPath) throws IOException {
File folder = new File(indexPath);
if (!folder.exists() && !folder.mkdirs()) {
@ -60,8 +69,8 @@ public class LuceneSessionFactoryImpl implements LuceneSessionFactory {
if (!LuceneSessionHolder.hasCurrentSession(this)) {
if (logger.isDebugEnabled()) {
logger.debug("Binding new session to thread");
if (logger.isTraceEnabled()) {
logger.trace("Binding new session to thread");
}
LuceneSession session = openSession(LuceneSessionHolder.getReadOnly());
@ -76,56 +85,58 @@ public class LuceneSessionFactoryImpl implements LuceneSessionFactory {
return new LuceneSessionImpl(this, readOnly);
}
public LuceneWriterImpl getWriter(boolean createNew) {
return new LuceneWriterImpl(directory, createNew, releaseListener);
public FileLockingWriter getWriter(boolean createNew) {
return new FileLockingWriter(directory, createNew, defaultLockTimeout, releaseListener);
}
public LuceneSearcher leaseSearcher() {
try {
if (searcher.get() == null) {
createNewSearcher(null);
if (searcher == null) {
synchronized (this) {
if (searcher == null) {
if (logger.isDebugEnabled()) {
logger.debug("Creating first searcher");
}
searcher = createNewSearcher();
}
}
}
// Checking do we need to refresh the reader
LuceneSearcher s = searcher.get();
if (!s.isCurrent()) {
// Underlying index has changed
// Decreasing the reference counter so that
// count can go to zero either here or
// when final searcher has done it's job
// This pairs with createNewSearcher incRef()
try {
s.release();
} catch (QueryException e) {
logger.error("Could not release index reader", e);
if (creatingNew.compareAndSet(false, true)) {
if (!searcher.isCurrent()) {
try {
// This release pairs with createNewSearcher lease
searcher.release();
} catch (QueryException e) {
logger.error("Could not release searcher", e);
}
searcher = createNewSearcher();
}
createNewSearcher(s);
creatingNew.set(false);
}
// Incrementing reference as we lease this out
// This pairs with closeReaders decRef()
searcher.get().lease();
return searcher.get();
// This pairs with LuceneSessions close
searcher.lease();
return searcher;
} catch (IOException e) {
throw new QueryException(e);
}
}
private LuceneSearcher createNewSearcher(@Nullable LuceneSearcher expected) throws IOException {
private LuceneSearcher createNewSearcher() throws IOException {
LuceneSearcher s = new LuceneSearcher(directory, releaseListener);
if (!searcher.compareAndSet(expected, s)) {
// Some thread already created a new one so just close this
s.release();
} else {
// Incrementing the reference count first time
// We want to keep using the same reader until the index is changed
s.lease();
if (logger.isDebugEnabled()) {
logger.debug("Created searcher " + s);
}
return searcher.get();
// Increment the first time
s.lease();
return s;
}
public void setDefaultLockTimeout(long defaultLockTimeout) {
this.defaultLockTimeout = defaultLockTimeout;
}
}

View File

@ -64,6 +64,7 @@ public final class LuceneSessionHolder {
private static Map<LuceneSessionFactory, LuceneSession> getSessions() {
if (sessions.get() == null) {
sessions.set(new HashMap<LuceneSessionFactory, LuceneSession>());
}
return sessions.get();
}
@ -74,8 +75,10 @@ public final class LuceneSessionHolder {
try {
for (LuceneSession session : getSessions().values()) {
try {
//System.out.println("session holder close");
session.close();
} catch (QueryException e) {
//System.out.println("failed to close session " + e.getCause().getMessage());
logger.error("Failed to close session", e);
}
}

View File

@ -22,7 +22,7 @@ public class LuceneSessionImpl implements LuceneSession {
private LuceneSearcher searcher;
@Nullable
private LuceneWriterImpl writer;
private FileLockingWriter writer;
private final LuceneSerializer serializer = new LuceneSerializer(true, true);
@ -83,7 +83,15 @@ public class LuceneSessionImpl implements LuceneSession {
}
if (writer != null) {
writer.close();
if (writer.isLeased()) {
try {
writer.commit();
} finally {
writer.release();
}
} else {
writer.close();
}
}
if (searcherException != null) {

View File

@ -17,14 +17,17 @@ public class LuceneTransactionHandler {
public Object transactionalMethod(ProceedingJoinPoint joinPoint, LuceneTransactional annotation)
throws Throwable {
if(logger.isDebugEnabled()) {
logger.debug("Starting LuceneTransactional method");
if (logger.isTraceEnabled()) {
logger.trace("LuceneSessionHolder.lease");
}
LuceneSessionHolder.lease(annotation.readOnly());
try {
return joinPoint.proceed();
} finally {
if (logger.isTraceEnabled()) {
logger.trace("LuceneSessionHolder.release");
}
LuceneSessionHolder.release();
}

View File

@ -1,5 +1,7 @@
package com.mysema.query.lucene.session.impl;
import com.mysema.query.lucene.session.LuceneWriter;
/**
* Helps to make sure the resources are released as they should be.
*
@ -11,6 +13,6 @@ public interface ReleaseListener {
void release(LuceneSearcher searcher);
void close(LuceneWriterImpl writer);
void close(LuceneWriter writer);
}

View File

@ -21,7 +21,6 @@ import org.junit.Test;
import com.mysema.query.lucene.LuceneQuery;
import com.mysema.query.lucene.session.impl.LuceneSearcher;
import com.mysema.query.lucene.session.impl.LuceneSessionFactoryImpl;
import com.mysema.query.lucene.session.impl.LuceneWriterImpl;
import com.mysema.query.lucene.session.impl.ReleaseListener;
import com.mysema.query.types.path.NumberPath;
import com.mysema.query.types.path.StringPath;
@ -98,7 +97,7 @@ public class LuceneSessionFactoryTest {
session.close();
}
@Test(expected=SessionNotBoundException.class)
@Test(expected = SessionNotBoundException.class)
public void CurrentSession() {
sessionFactory.getCurrentSession();
}
@ -146,10 +145,14 @@ public class LuceneSessionFactoryTest {
private class ReleaseCounter implements ReleaseListener {
List<LuceneSearcher> searchers = new ArrayList<LuceneSearcher>();
List<LuceneWriterImpl> writers = new ArrayList<LuceneWriterImpl>();
List<LuceneWriter> writers = new ArrayList<LuceneWriter>();
Map<LuceneSearcher, Integer> leases = new HashMap<LuceneSearcher, Integer>();
Map<LuceneSearcher, Integer> releases = new HashMap<LuceneSearcher, Integer>();
Map<LuceneWriterImpl, Integer> closes = new HashMap<LuceneWriterImpl, Integer>();
Map<LuceneWriter, Integer> closes = new HashMap<LuceneWriter, Integer>();
public void lease(LuceneSearcher searcher) {
if (!searchers.contains(searcher)) {
@ -168,7 +171,7 @@ public class LuceneSessionFactoryTest {
releases.put(searcher, releases.get(searcher) + 1);
}
public void close(LuceneWriterImpl writer) {
public void close(LuceneWriter writer) {
if (!writers.contains(writer)) {
writers.add(writer);
}
@ -178,7 +181,7 @@ public class LuceneSessionFactoryTest {
closes.put(writer, closes.get(writer) + 1);
}
}
@Test
public void Reset() {
addData(sessionFactory);
@ -186,15 +189,15 @@ public class LuceneSessionFactoryTest {
LuceneSession session = sessionFactory.openSession(true);
assertEquals(4, session.createQuery().count());
session.close();
session = sessionFactory.openSession(false);
assertEquals(4, session.createQuery().count());
session.beginReset().addDocument(getDocument());
session.flush();
assertEquals(1, session.createQuery().count());
session.close();
session = sessionFactory.openSession(true);
assertEquals(1, session.createQuery().count());
session.close();
@ -204,50 +207,50 @@ public class LuceneSessionFactoryTest {
public void ResourcesAreReleased() throws IOException {
ReleaseCounter counter = new ReleaseCounter();
sessionFactory = new LuceneSessionFactoryImpl(directory, counter);
LuceneSession session = sessionFactory.openSession(false);
session.beginAppend().addDocument(getDocument());
session.flush();
LuceneQuery query = session.createQuery();
assertEquals(1, query.where(year.gt(1800)).count());
session.beginAppend().addDocument(getDocument());
session.flush();
query = session.createQuery();
assertEquals(2, query.where(year.gt(1800)).count());
session.close();
//Second session
// Second session
session = sessionFactory.openSession(true);
query = session.createQuery();
assertEquals(2, query.where(year.gt(1800)).count());
session.close();
assertEquals(3, counter.leases.size());
assertEquals(3, counter.releases.size());
assertEquals(3, counter.searchers.size());
assertEquals(1, counter.closes.size());
assertEquals(1, counter.writers.size());
//First and second searchers should be released totally
assertEquals(2, (int)counter.leases.get(counter.searchers.get(0)));
assertEquals(2, (int)counter.releases.get(counter.searchers.get(0)));
assertEquals(2, (int)counter.leases.get(counter.searchers.get(1)));
assertEquals(2, (int)counter.releases.get(counter.searchers.get(1)));
// First and second searchers should be released totally
assertEquals(2, (int) counter.leases.get(counter.searchers.get(0)));
assertEquals(2, (int) counter.releases.get(counter.searchers.get(0)));
//Third searcher leaves it as current
assertEquals(2, (int)counter.leases.get(counter.searchers.get(2)));
assertEquals(1, (int)counter.releases.get(counter.searchers.get(2)));
//The writer should be closed
assertEquals(1, (int)counter.closes.get(counter.writers.get(0)));
assertEquals(2, (int) counter.leases.get(counter.searchers.get(1)));
assertEquals(2, (int) counter.releases.get(counter.searchers.get(1)));
// Third searcher leaves it as current
assertEquals(2, (int) counter.leases.get(counter.searchers.get(2)));
assertEquals(1, (int) counter.releases.get(counter.searchers.get(2)));
// The writer should be closed
assertEquals(1, (int) counter.closes.get(counter.writers.get(0)));
}
}

View File

@ -0,0 +1,312 @@
package com.mysema.query.lucene.session;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.aop.aspectj.annotation.AspectJProxyFactory;
import com.mysema.query.lucene.session.impl.LuceneSessionFactoryImpl;
import com.mysema.query.lucene.session.impl.LuceneTransactionHandler;
public class ThreadingTest {
private LuceneSessionFactoryImpl sessionFactory;
private TestDao dao;
private static class Task implements Callable<Throwable> {
TestDao dao;
String action;
Task(TestDao dao, String action) {
this.dao = dao;
this.action = action;
}
@Override
public Throwable call() throws Exception {
try {
if (action.equals("read")) {
dao.read();
}
if (action.equals("write")) {
dao.write();
}
if (action.equals("reset")) {
dao.reset();
}
return null;
} catch (Throwable t) {
return t;
}
}
}
private static interface TestDao {
void write();
void reset();
void read();
}
private static class TestDaoImpl implements TestDao {
private LuceneSessionFactory sessionFactory;
private QDocument doc = new QDocument("d");
int counter = 1;
int readCount = 0;
private final Object writeLock = new Object();
private final Object readLock = new Object();
private final Object resetLock = new Object();
TestDaoImpl(LuceneSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
@LuceneTransactional
public void write() {
LuceneSession session = sessionFactory.getCurrentSession();
LuceneWriter writer = session.beginAppend();
int numOfDocs = 1000;
for (int i = 0; i < numOfDocs; i++) {
writer.addDocument(QueryTestHelper.createDocument(
"title " + counter,
"",
"",
counter,
0));
}
synchronized (writeLock) {
counter += numOfDocs;
}
System.out.println("Added " + numOfDocs + " documents ");
}
@Override
@LuceneTransactional
public void reset() {
LuceneSession session = sessionFactory.getCurrentSession();
synchronized (resetLock) {
counter = 0;
}
session.beginReset().addDocument(
QueryTestHelper.createDocument("title " + counter, "", "", counter, 0));
System.out.println("Resetted index ");
}
@Override
@LuceneTransactional(readOnly = true)
public void read() {
LuceneSession session = sessionFactory.getCurrentSession();
int count = (int) session.createQuery().where(doc.title.startsWith("title")).count();
synchronized (readLock) {
readCount++;
if (readCount % 1000 == 0) {
System.out.println("Document count is " + count + " after " + readCount
+ " reads");
}
}
}
}
@Before
public void before() throws IOException {
String path = "target/index";
FileUtils.deleteDirectory(new File(path));
sessionFactory = new LuceneSessionFactoryImpl("target/index");
// Initialize index
QueryTestHelper.addData(sessionFactory);
AspectJProxyFactory factory = new AspectJProxyFactory(new TestDaoImpl(sessionFactory));
factory.addAspect(new LuceneTransactionHandler());
dao = factory.getProxy();
}
private static class TaskRunner implements Callable<List<Throwable>> {
private String action;
private int requestsPerMinute;
private int starDelayInSecs;
public boolean stop = false;
private int clients;
ScheduledExecutorService threads;
TestDao dao;
TaskRunner(String action, int clients, int requestsPerMinute, int startDelayInSecs,
TestDao dao) {
this.action = action;
this.clients = clients;
this.requestsPerMinute = requestsPerMinute;
this.starDelayInSecs = startDelayInSecs;
this.dao = dao;
}
@Override
public List<Throwable> call() throws Exception {
threads = Executors.newScheduledThreadPool(clients);
List<Throwable> errors = new ArrayList<Throwable>();
List<Future<Throwable>> tasks = new ArrayList<Future<Throwable>>();
// Fire tasks
for (int i = 0; i < clients; i++) {
tasks.add(threads.schedule(
new Task(dao, action),
i * starDelayInSecs,
TimeUnit.SECONDS));
}
long startTime = System.currentTimeMillis();
int numOfRequests = 0;
// Take results and fire again
while (tasks.size() > 0) {
int taskReadyIndex = -1;
for (int i = 0; i < tasks.size(); i++) {
try {
tasks.get(i).get(1, TimeUnit.MILLISECONDS);
taskReadyIndex = i;
break;
} catch (TimeoutException e) {
// Ignore
}
}
if (taskReadyIndex == -1) {
continue;
}
Throwable error = tasks.get(taskReadyIndex).get();
tasks.remove(taskReadyIndex);
numOfRequests++;
if (error != null) {
errors.add(error);
}
if (!stop) {
tasks.add(threads.schedule(
new Task(dao, action),
60 / requestsPerMinute,
TimeUnit.SECONDS));
}
}
float elapsedMins = ((float) (System.currentTimeMillis() - startTime) / 1000) / 60;
System.out.println(numOfRequests + " " + action + " requests, " + numOfRequests
/ elapsedMins + " requests/min");
threads.shutdown();
return errors;
}
}
@Test
@Ignore
public void Simultaneous() throws InterruptedException, ExecutionException {
sessionFactory.setDefaultLockTimeout(5000);
ExecutorService threads = Executors.newFixedThreadPool(3);
int simulationtimeInSecs = 60;
// Readers
TaskRunner readRunner = new TaskRunner("read", 100, 120, simulationtimeInSecs / 1000, dao);
Future<List<Throwable>> reads = threads.submit(readRunner);
// Writers
TaskRunner writeRunner = new TaskRunner("write", 1, 6, 0, dao);
Future<List<Throwable>> writes = threads.submit(writeRunner);
// Resetters
TaskRunner resetRunner = new TaskRunner("reset", 1, 2, 15, dao);
Future<List<Throwable>> resets = threads.submit(resetRunner);
Thread.sleep(simulationtimeInSecs * 1000);
readRunner.stop = true;
writeRunner.stop = true;
resetRunner.stop = true;
List<Throwable> readErrors = reads.get();
List<Throwable> writeErrors = writes.get();
List<Throwable> resetErrors = resets.get();
threads.shutdown();
System.out.println("Read errors: " + readErrors.size());
System.out.println("Write errors: " + writeErrors.size());
System.out.println("Reset errors: " + resetErrors.size());
printErrors(readErrors);
printErrors(writeErrors);
printErrors(resetErrors);
assertEquals("Read errors", 0, readErrors.size());
assertEquals("Write errors", 0, writeErrors.size());
assertEquals("Reset errors", 0, resetErrors.size());
//
// Map<Class<?>, Integer> errorTypes = new HashMap<Class<?>, Integer>();
// List<Throwable> errors = new ArrayList<Throwable>();
// for (int i = 0; i < runs; i++) {
// Throwable t = pool.take().get();
// if (t != null) {
// if (!errorTypes.containsKey(t.getClass())) {
// errorTypes.put(t.getClass(), 0);
// }
// errorTypes.put(t.getClass(), errorTypes.get(t.getClass()) + 1);
// errors.add(t);
// }
// }
}
private void printErrors(List<Throwable> errors) {
int count = 0;
for (Throwable t : errors) {
System.out.println("------------------");
System.out.println("");
t.printStackTrace(System.out);
if (count++ > 3) {
break;
}
}
}
}

View File

@ -0,0 +1,9 @@
# Configure an appender that logs to console
log4j.rootLogger=info, A1
log4j.threshold=debug
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c{1} - %m%n
# Configuration of logging levels for different packages
log4j.logger.com.mysema.query.lucene=DEBUG