Improve batch handling

This commit is contained in:
Timo Westkämper 2014-07-10 22:53:47 +03:00
parent 4e11409f28
commit f21e7b24da
10 changed files with 214 additions and 147 deletions

View File

@ -17,6 +17,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -24,7 +25,9 @@ import com.google.common.collect.ImmutableList;
import com.mysema.query.QueryMetadata;
import com.mysema.query.dml.DMLClause;
import com.mysema.query.sql.*;
import com.mysema.query.types.*;
import com.mysema.query.types.ParamExpression;
import com.mysema.query.types.ParamNotSetException;
import com.mysema.query.types.Path;
/**
* AbstractSQLClause is a superclass for SQL based DMLClause implementations
@ -49,14 +52,6 @@ public abstract class AbstractSQLClause<C extends AbstractSQLClause<C>> implemen
this.useLiterals = configuration.getUseLiterals();
}
protected abstract void assertNoTemplateExpressionsInBatch();
protected void assertNoTemplateExpressionInBatch(Expression<?> expr) {
if (expr instanceof TemplateExpression) {
throw new IllegalArgumentException("Template expressions are not allowed in batch statements");
}
}
/**
* @param listener
*/
@ -123,7 +118,7 @@ public abstract class AbstractSQLClause<C extends AbstractSQLClause<C>> implemen
}
}
protected long executeBatch(PreparedStatement stmt) throws SQLException {
private long executeBatch(PreparedStatement stmt) throws SQLException {
if (configuration.getTemplates().isBatchCountViaGetUpdateCount()) {
stmt.executeBatch();
return stmt.getUpdateCount();
@ -136,6 +131,14 @@ public abstract class AbstractSQLClause<C extends AbstractSQLClause<C>> implemen
}
}
protected long executeBatch(Collection<PreparedStatement> stmts) throws SQLException {
long rv = 0;
for (PreparedStatement stmt : stmts) {
rv += executeBatch(stmt);
}
return rv;
}
protected void close(Statement stmt) {
try {
stmt.close();
@ -144,6 +147,12 @@ public abstract class AbstractSQLClause<C extends AbstractSQLClause<C>> implemen
}
}
protected void close(Collection<? extends Statement> stmts) {
for (Statement stmt : stmts) {
close(stmt);
}
}
protected void close(ResultSet rs) {
try {
rs.close();

View File

@ -18,9 +18,12 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.mysema.query.*;
import com.mysema.query.QueryFlag.Position;
import com.mysema.query.dml.DeleteClause;
@ -99,7 +102,6 @@ public class SQLDeleteClause extends AbstractSQLClause<SQLDeleteClause> implemen
* @return
*/
public SQLDeleteClause addBatch() {
assertNoTemplateExpressionsInBatch();
batches.add(metadata);
metadata = new DefaultQueryMetadata();
metadata.addJoin(JoinType.DEFAULT, entity);
@ -107,55 +109,61 @@ public class SQLDeleteClause extends AbstractSQLClause<SQLDeleteClause> implemen
return this;
}
@Override
protected void assertNoTemplateExpressionsInBatch() {
assertNoTemplateExpressionInBatch(metadata.getWhere());
private PreparedStatement createStatement() throws SQLException{
SQLSerializer serializer = createSerializer();
serializer.serializeDelete(metadata, entity);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
PreparedStatement stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
return stmt;
}
private PreparedStatement createStatement() throws SQLException{
PreparedStatement stmt;
if (batches.isEmpty()) {
SQLSerializer serializer = createSerializer();
serializer.serializeDelete(metadata, entity);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
} else {
SQLSerializer serializer = createSerializer();
serializer.serializeDelete(batches.get(0), entity);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
private Collection<PreparedStatement> createStatements() throws SQLException {
SQLSerializer serializer = createSerializer();
serializer.serializeDelete(batches.get(0), entity);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
// add first batch
stmt = connection.prepareStatement(queryString);
Map<String, PreparedStatement> stmts = Maps.newHashMap();
// add first batch
PreparedStatement stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
stmts.put(queryString, stmt);
// add other batches
for (int i = 1; i < batches.size(); i++) {
serializer = createSerializer();
serializer.serializeDelete(batches.get(i), entity);
stmt = stmts.get(serializer.toString());
if (stmt == null) {
stmt = connection.prepareStatement(serializer.toString());
stmts.put(serializer.toString(), stmt);
}
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
// add other batches
for (int i = 1; i < batches.size(); i++) {
serializer = createSerializer();
serializer.serializeDelete(batches.get(i), entity);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
}
}
return stmt;
return stmts.values();
}
@Override
public long execute() {
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
try {
stmt = createStatement();
if (batches.isEmpty()) {
stmt = createStatement();
listeners.notifyDelete(entity, metadata);
return stmt.executeUpdate();
} else {
stmts = createStatements();
listeners.notifyDeletes(entity, batches);
return executeBatch(stmt);
return executeBatch(stmts);
}
} catch (SQLException e) {
throw configuration.translate(queryString, constants, e);
@ -163,6 +171,9 @@ public class SQLDeleteClause extends AbstractSQLClause<SQLDeleteClause> implemen
if (stmt != null) {
close(stmt);
}
if (stmts != null) {
close(stmts);
}
}
}

View File

@ -15,12 +15,10 @@ package com.mysema.query.sql.dml;
import javax.annotation.Nullable;
import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.mysema.query.DefaultQueryMetadata;
import com.mysema.query.JoinType;
import com.mysema.query.QueryFlag;
@ -121,7 +119,6 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
* @return
*/
public SQLInsertClause addBatch() {
assertNoTemplateExpressionsInBatch();
if (subQueryBuilder != null) {
subQuery = subQueryBuilder.list(values.toArray(new Expression[values.size()]));
values.clear();
@ -133,13 +130,6 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
return this;
}
@Override
protected void assertNoTemplateExpressionsInBatch() {
for (Expression<?> expr : values) {
assertNoTemplateExpressionInBatch(expr);
}
}
@Override
public SQLInsertClause columns(Path<?>... columns) {
this.columns.addAll(Arrays.asList(columns));
@ -229,30 +219,45 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
subQuery = subQueryBuilder.list(values.toArray(new Expression[values.size()]));
values.clear();
}
PreparedStatement stmt = null;
if (batches.isEmpty()) {
serializer.serializeInsert(metadata, entity, columns, values, subQuery);
stmt = prepareStatementAndSetParameters(serializer, withKeys);
} else {
serializer.serializeInsert(metadata, entity, batches.get(0).getColumns(), batches
.get(0).getValues(), batches.get(0).getSubQuery());
stmt = prepareStatementAndSetParameters(serializer, withKeys);
// add first batch
stmt.addBatch();
serializer.serializeInsert(metadata, entity, columns, values, subQuery);
return prepareStatementAndSetParameters(serializer, withKeys);
}
// add other batches
for (int i = 1; i < batches.size(); i++) {
SQLInsertBatch batch = batches.get(i);
serializer = createSerializer();
serializer.serializeInsert(metadata, entity, batch.getColumns(),
batch.getValues(), batch.getSubQuery());
private Collection<PreparedStatement> createStatements(boolean withKeys) throws SQLException {
if (subQueryBuilder != null) {
subQuery = subQueryBuilder.list(values.toArray(new Expression[values.size()]));
values.clear();
}
Map<String, PreparedStatement> stmts = Maps.newHashMap();
// add first batch
SQLSerializer serializer = createSerializer();
serializer.serializeInsert(metadata, entity, batches.get(0).getColumns(), batches
.get(0).getValues(), batches.get(0).getSubQuery());
PreparedStatement stmt = prepareStatementAndSetParameters(serializer, withKeys);
stmt.addBatch();
stmts.put(serializer.toString(), stmt);
// add other batches
for (int i = 1; i < batches.size(); i++) {
SQLInsertBatch batch = batches.get(i);
serializer = createSerializer();
serializer.serializeInsert(metadata, entity, batch.getColumns(),
batch.getValues(), batch.getSubQuery());
stmt = stmts.get(serializer.toString());
if (stmt == null) {
stmt = prepareStatementAndSetParameters(serializer, withKeys);
stmts.put(serializer.toString(), stmt);
} else {
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(),
metadata.getParams());
stmt.addBatch();
}
stmt.addBatch();
}
return stmt;
return stmts.values();
}
private PreparedStatement prepareStatementAndSetParameters(SQLSerializer serializer,
@ -288,14 +293,21 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
*/
public ResultSet executeWithKeys() {
try {
final PreparedStatement stmt = createStatement(true);
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
if (batches.isEmpty()) {
stmt = createStatement(true);
listeners.notifyInsert(entity, metadata, columns, values, subQuery);
stmt.executeUpdate();
} else {
stmts = createStatements(true);
listeners.notifyInserts(entity, metadata, batches);
stmt.executeBatch();
}
if (stmts != null && stmts.size() > 1) {
throw new IllegalStateException("executeWithKeys called with batch statement and multiple SQL strings");
}
final Statement stmt2 = stmts != null ? stmts.iterator().next() : stmt;
ResultSet rs = stmt.getGeneratedKeys();
return new ResultSetAdapter(rs) {
@Override
@ -303,7 +315,7 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
try {
super.close();
} finally {
stmt.close();
stmt2.close();
}
}
};
@ -315,14 +327,16 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
@Override
public long execute() {
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
try {
stmt = createStatement(false);
if (batches.isEmpty()) {
stmt = createStatement(false);
listeners.notifyInsert(entity, metadata, columns, values, subQuery);
return stmt.executeUpdate();
} else {
stmts = createStatements(false);
listeners.notifyInserts(entity, metadata, batches);
return executeBatch(stmt);
return executeBatch(stmts);
}
} catch (SQLException e) {
throw configuration.translate(queryString, constants, e);
@ -330,6 +344,9 @@ public class SQLInsertClause extends AbstractSQLClause<SQLInsertClause> implemen
if (stmt != null) {
close(stmt);
}
if (stmts != null) {
close(stmts);
}
}
}

View File

@ -14,15 +14,11 @@
package com.mysema.query.sql.dml;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.sql.*;
import java.util.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.mysema.query.DefaultQueryMetadata;
import com.mysema.query.JoinType;
import com.mysema.query.QueryFlag;
@ -108,7 +104,6 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
* @return
*/
public SQLMergeClause addBatch() {
assertNoTemplateExpressionsInBatch();
if (!configuration.getTemplates().isNativeMerge()) {
throw new IllegalStateException("batch only supported for databases that support native merge");
}
@ -121,19 +116,6 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
return this;
}
@Override
protected void assertNoTemplateExpressionsInBatch() {
for (Expression<?> expr : keys) {
assertNoTemplateExpressionInBatch(expr);
}
for (Expression<?> expr : columns) {
assertNoTemplateExpressionInBatch(expr);
}
for (Expression<?> expr : values) {
assertNoTemplateExpressionInBatch(expr);
}
}
public SQLMergeClause columns(Path<?>... columns) {
this.columns.addAll(Arrays.asList(columns));
return this;
@ -221,14 +203,21 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
public ResultSet executeWithKeys() {
try {
if (configuration.getTemplates().isNativeMerge()) {
final PreparedStatement stmt = createStatement(true);
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
if (batches.isEmpty()) {
stmt = createStatement(true);
listeners.notifyMerge(entity, metadata, keys, columns, values, subQuery);
stmt.executeUpdate();
} else {
stmts = createStatements(true);
listeners.notifyMerges(entity, metadata, batches);
stmt.executeBatch();
}
if (stmts != null && stmts.size() > 1) {
throw new IllegalStateException("executeWithKeys called with batch statement and multiple SQL strings");
}
final Statement stmt2 = stmts != null ? stmts.iterator().next() : stmt;
ResultSet rs = stmt.getGeneratedKeys();
return new ResultSetAdapter(rs) {
@Override
@ -236,7 +225,7 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
try {
super.close();
} finally {
stmt.close();
stmt2.close();
}
}
};
@ -324,7 +313,7 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
}
}
private PreparedStatement createStatement(boolean withKeys) throws SQLException{
private PreparedStatement createStatement(boolean withKeys) throws SQLException {
SQLSerializer serializer = createSerializer();
PreparedStatement stmt = null;
if (batches.isEmpty()) {
@ -351,6 +340,37 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
return stmt;
}
private Collection<PreparedStatement> createStatements(boolean withKeys) throws SQLException {
Map<String, PreparedStatement> stmts = Maps.newHashMap();
// add first batch
SQLSerializer serializer = createSerializer();
serializer.serializeMerge(metadata, entity,
batches.get(0).getKeys(), batches.get(0).getColumns(),
batches.get(0).getValues(), batches.get(0).getSubQuery());
PreparedStatement stmt = prepareStatementAndSetParameters(serializer, withKeys);
stmts.put(serializer.toString(), stmt);
stmt.addBatch();
// add other batches
for (int i = 1; i < batches.size(); i++) {
SQLMergeBatch batch = batches.get(i);
serializer = createSerializer();
serializer.serializeMerge(metadata, entity,
batch.getKeys(), batch.getColumns(), batch.getValues(), batch.getSubQuery());
stmt = stmts.get(serializer.toString());
if (stmt == null) {
stmt = prepareStatementAndSetParameters(serializer, withKeys);
stmts.put(serializer.toString(), stmt);
} else {
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
}
stmt.addBatch();
}
return stmts.values();
}
private PreparedStatement prepareStatementAndSetParameters(SQLSerializer serializer,
boolean withKeys) throws SQLException {
queryString = serializer.toString();
@ -372,14 +392,16 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
private long executeNativeMerge() {
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
try {
stmt = createStatement(false);
if (batches.isEmpty()) {
stmt = createStatement(false);
listeners.notifyMerge(entity, metadata, keys, columns, values, subQuery);
return stmt.executeUpdate();
} else {
stmts = createStatements(false);
listeners.notifyMerges(entity, metadata, batches);
return executeBatch(stmt);
return executeBatch(stmts);
}
} catch (SQLException e) {
throw configuration.translate(queryString, constants, e);
@ -387,6 +409,9 @@ public class SQLMergeClause extends AbstractSQLClause<SQLMergeClause> implements
if (stmt != null) {
close(stmt);
}
if (stmts != null) {
close(stmts);
}
}
}

View File

@ -20,6 +20,7 @@ import java.sql.SQLException;
import java.util.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.mysema.commons.lang.Pair;
import com.mysema.query.*;
import com.mysema.query.QueryFlag.Position;
@ -95,7 +96,6 @@ public class SQLUpdateClause extends AbstractSQLClause<SQLUpdateClause> implemen
* @return
*/
public SQLUpdateClause addBatch() {
assertNoTemplateExpressionsInBatch();
batches.add(new SQLUpdateBatch(metadata, updates));
updates = new ArrayList<Pair<Path<?>,Expression<?>>>();
metadata = new DefaultQueryMetadata();
@ -103,57 +103,61 @@ public class SQLUpdateClause extends AbstractSQLClause<SQLUpdateClause> implemen
return this;
}
protected void assertNoTemplateExpressionsInBatch() {
for (Pair<Path<?>, Expression<?>> pair : updates) {
assertNoTemplateExpressionInBatch(pair.getSecond());
}
assertNoTemplateExpressionInBatch(metadata.getWhere());
private PreparedStatement createStatement() throws SQLException{
SQLSerializer serializer = createSerializer();
serializer.serializeUpdate(metadata, entity, updates);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
PreparedStatement stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
return stmt;
}
private PreparedStatement createStatement() throws SQLException{
PreparedStatement stmt;
if (batches.isEmpty()) {
SQLSerializer serializer = createSerializer();
serializer.serializeUpdate(metadata, entity, updates);
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
} else {
SQLSerializer serializer = createSerializer();
serializer.serializeUpdate(batches.get(0).getMetadata(), entity, batches.get(0).getUpdates());
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
private Collection<PreparedStatement> createStatements() throws SQLException {
SQLSerializer serializer = createSerializer();
serializer.serializeUpdate(batches.get(0).getMetadata(), entity, batches.get(0).getUpdates());
queryString = serializer.toString();
constants = serializer.getConstants();
logger.debug(queryString);
// add first batch
stmt = connection.prepareStatement(queryString);
Map<String, PreparedStatement> stmts = Maps.newHashMap();
// add first batch
PreparedStatement stmt = connection.prepareStatement(queryString);
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
stmts.put(serializer.toString(), stmt);
// add other batches
for (int i = 1; i < batches.size(); i++) {
serializer = createSerializer();
serializer.serializeUpdate(batches.get(i).getMetadata(), entity, batches.get(i).getUpdates());
stmt = stmts.get(serializer.toString());
if (stmt == null) {
stmt = connection.prepareStatement(queryString);
stmts.put(queryString, stmt);
}
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
// add other batches
for (int i = 1; i < batches.size(); i++) {
serializer = createSerializer();
serializer.serializeUpdate(batches.get(i).getMetadata(), entity, batches.get(i).getUpdates());
setParameters(stmt, serializer.getConstants(), serializer.getConstantPaths(), metadata.getParams());
stmt.addBatch();
}
}
return stmt;
return stmts.values();
}
@Override
public long execute() {
PreparedStatement stmt = null;
Collection<PreparedStatement> stmts = null;
try {
stmt = createStatement();
if (batches.isEmpty()) {
stmt = createStatement();
listeners.notifyUpdate(entity, metadata, updates);
return stmt.executeUpdate();
} else {
stmts = createStatements();
listeners.notifyUpdates(entity, batches);
return executeBatch(stmt);
return executeBatch(stmts);
}
} catch (SQLException e) {
throw configuration.translate(queryString, constants, e);
@ -161,6 +165,9 @@ public class SQLUpdateClause extends AbstractSQLClause<SQLUpdateClause> implemen
if (stmt != null) {
close(stmt);
}
if (stmts != null) {
close(stmts);
}
}
}

View File

@ -96,11 +96,6 @@ public class SetQueryBandClause extends AbstractSQLClause<SetQueryBandClause> {
}
}
@Override
protected void assertNoTemplateExpressionsInBatch() {
// do nothing
}
@Override
public List<SQLBindings> getSQL() {
SQLBindings bindings;

View File

@ -111,7 +111,7 @@ public class DeleteBase extends AbstractBaseTest{
}
@Test(expected=IllegalArgumentException.class)
@Test
public void Delete_With_TempateExpression_In_Batch() {
delete(survey)
.where(Expressions.booleanTemplate("true"))

View File

@ -397,7 +397,7 @@ public class InsertBase extends AbstractBaseTest {
clause.execute();
}
@Test(expected=IllegalArgumentException.class)
@Test
public void Insert_With_TempateExpression_In_Batch() {
insert(survey)
.set(survey.id, 3)

View File

@ -164,13 +164,16 @@ public class MergeBase extends AbstractBaseTest{
// assertEquals(1, insert.execute());
}
@Test(expected=IllegalArgumentException.class)
@Test
@IncludeIn(H2)
public void Merge_With_TempateExpression_In_Batch() {
SQLMergeClause merge = merge(survey)
.keys(survey.id)
.set(survey.id, 5)
.set(survey.name, Expressions.stringTemplate("'5'"))
.addBatch();
merge.execute();
}

View File

@ -168,7 +168,7 @@ public class UpdateBase extends AbstractBaseTest {
update.execute();
}
@Test(expected=IllegalArgumentException.class)
@Test
public void Update_With_TempateExpression_In_Batch() {
update(survey)
.set(survey.id, 3)