Skip to content
This repository has been archived by the owner on Jul 4, 2024. It is now read-only.

Commit

Permalink
invalid prepared statement handling fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
grro committed Aug 5, 2015
1 parent bcbc145 commit 89e9ad2
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ protected ExecutionSpec getExecutionSpec() {
return ctx.getExecutionSpec();
}

protected UDTValueMapper getUDTValueMapper() {
return ctx.getUDTValueMapper();
}


InterceptorRegistry getInterceptorRegistry() {
return ctx.getInterceptorRegistry();
Expand Down
50 changes: 37 additions & 13 deletions troilus-core-java7/src/main/java/net/oneandone/troilus/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Context {

private final ExecutionSpec executionSpec;
private final InterceptorRegistry interceptorRegistry;
private final UDTValueMapper udtValueMapper;
private final BeanMapper beanMapper;
private final Executor executor;
private final MetadataCatalog catalog;
Expand All @@ -60,46 +61,58 @@ private Context(Session session, BeanMapper beanMapper) {
}

private Context(Session session, BeanMapper beanMapper, Executor executor) {
this(new DBSession(session, new MetadataCatalog(session), beanMapper),
new MetadataCatalog(session),
this(session, beanMapper, executor, new MetadataCatalog(session));
}

private Context(Session session, BeanMapper beanMapper, Executor executor, MetadataCatalog catalog) {
this(session, beanMapper, executor, catalog, new DBSession(session, catalog, beanMapper));
}

private Context(Session session, BeanMapper beanMapper, Executor executor, MetadataCatalog catalog, DBSession dbSession) {
this(dbSession,
catalog,
new ExecutionSpecImpl(),
new InterceptorRegistry(),
beanMapper,
new UDTValueMapper(dbSession.getProtocolVersion(), catalog, beanMapper),
executor);
}



private static Executor newTaskExecutor() {
try {
Method commonPoolMeth = ForkJoinPool.class.getMethod("commonPool"); // Java8 method
return (Executor) commonPoolMeth.invoke(ForkJoinPool.class);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
return Executors.newCachedThreadPool();
}
}

private Context(DBSession dbSession,
MetadataCatalog catalog,
ExecutionSpec executionSpec,
InterceptorRegistry interceptorRegistry,
BeanMapper beanMapper,
UDTValueMapper udtValueMapper,
Executor executors) {
this.dbSession = dbSession;
this.catalog = catalog;
this.executionSpec = executionSpec;
this.interceptorRegistry = interceptorRegistry;
this.executor = executors;
this.beanMapper = beanMapper;
this.udtValueMapper = udtValueMapper;
}



private static Executor newTaskExecutor() {
try {
Method commonPoolMeth = ForkJoinPool.class.getMethod("commonPool"); // Java8 method
return (Executor) commonPoolMeth.invoke(ForkJoinPool.class);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
return Executors.newCachedThreadPool();
}
}


Context withInterceptor(QueryInterceptor interceptor) {
return new Context(dbSession,
catalog,
executionSpec,
interceptorRegistry.withInterceptor(interceptor),
beanMapper,
udtValueMapper,
executor);

}
Expand All @@ -110,6 +123,7 @@ Context withSerialConsistency(ConsistencyLevel consistencyLevel) {
executionSpec.withSerialConsistency(consistencyLevel),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -119,6 +133,7 @@ Context withTtl(int ttlSec) {
executionSpec.withTtl(ttlSec),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -128,6 +143,7 @@ Context withWritetime(long microsSinceEpoch) {
executionSpec.withWritetime(microsSinceEpoch),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -137,6 +153,7 @@ Context withTracking() {
executionSpec.withTracking(),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -146,6 +163,7 @@ Context withoutTracking() {
executionSpec.withoutTracking(),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -155,6 +173,7 @@ Context withRetryPolicy(RetryPolicy policy) {
executionSpec.withRetryPolicy(policy),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -164,6 +183,7 @@ Context withConsistency(ConsistencyLevel consistencyLevel) {
executionSpec.withConsistency(consistencyLevel),
interceptorRegistry,
beanMapper,
udtValueMapper,
executor);
}

Expand All @@ -178,6 +198,10 @@ MetadataCatalog getCatalog() {
return catalog;
}

UDTValueMapper getUDTValueMapper() {
return udtValueMapper;
}

ExecutionSpec getExecutionSpec() {
return executionSpec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@
package net.oneandone.troilus;



import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.ResultSet;
Expand All @@ -40,8 +34,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

Expand All @@ -54,10 +46,8 @@ public class DBSession {
private static final Logger LOG = LoggerFactory.getLogger(DBSession.class);

private final Session session;
private final MetadataCatalog catalog;
private final boolean isKeyspacenameAssigned;
private final String keyspacename;
private final UDTValueMapper udtValueMapper;
private final PreparedStatementCache preparedStatementCache;

private final AtomicLong lastCacheCleanTime = new AtomicLong(0);
Expand All @@ -73,13 +63,10 @@ public class DBSession {
*/
DBSession(Session session, MetadataCatalog catalog, BeanMapper beanMapper) {
this.session = session;
this.catalog = catalog;

this.keyspacename = session.getLoggedKeyspace();
this.isKeyspacenameAssigned = (keyspacename != null);

//this.udtValueMapper = new UDTValueMapper(session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion(), beanMapper);
this.udtValueMapper = new UDTValueMapper(session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersionEnum(), beanMapper);
this.preparedStatementCache = new PreparedStatementCache(session);
}

Expand Down Expand Up @@ -112,13 +99,6 @@ ProtocolVersion getProtocolVersion() {
return getSession().getCluster().getConfiguration().getProtocolOptions().getProtocolVersionEnum();
}

/**
* @return the udtvalue mapper
*/
UDTValueMapper getUDTValueMapper() {
return udtValueMapper;
}


/**
* @param statement the statement to prepare
Expand Down Expand Up @@ -159,65 +139,6 @@ public ListenableFuture<ResultSet> executeAsync(Statement statement) {
}


/**
* @param tablename the table name
* @param name the columnname
* @param value the value
* @return the mapped value
*/
Object toStatementValue(Tablename tablename, String name, Object value) {
if (isNullOrEmpty(value)) {
return null;
}

DataType dataType = catalog.getColumnMetadata(tablename, name).getType();

// build in
if (UDTValueMapper.isBuildInType(dataType)) {

// enum
if (DataTypes.isTextDataType(dataType) && Enum.class.isAssignableFrom(value.getClass())) {
return value.toString();
}

// byte buffer (byte[])
if (dataType.equals(DataType.blob()) && byte[].class.isAssignableFrom(value.getClass())) {
return ByteBuffer.wrap((byte[]) value);
}


return value;

// udt
} else {
return getUDTValueMapper().toUdtValue(tablename, catalog, catalog.getColumnMetadata(tablename, name).getType(), value);
}
}

/**
* @param tablename the tablename
* @param name the columnname
* @param values the vlaues
* @return the mapped values
*/
ImmutableList<Object> toStatementValues(Tablename tablename, String name, ImmutableList<Object> values) {
List<Object> result = Lists.newArrayList();

for (Object value : values) {
result.add(toStatementValue(tablename, name, value));
}

return ImmutableList.copyOf(result);
}


private boolean isNullOrEmpty(Object value) {
return (value == null) ||
(Collection.class.isAssignableFrom(value.getClass()) && ((Collection<?>) value).isEmpty()) ||
(Map.class.isAssignableFrom(value.getClass()) && ((Map<?, ?>) value).isEmpty());
}



@Override
public String toString() {
Expand Down Expand Up @@ -279,7 +200,4 @@ public String toString() {
return Joiner.on(", ").withKeyValueSeparator("=").join(preparedStatementCache.asMap());
}
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public ListenableFuture<Statement> apply(DeleteQueryData queryData) {
if (queryData == null) {
throw new NullPointerException();
}
return DeleteQueryDataImpl.toStatementAsync(queryData, getExecutionSpec(), dbSession);
return DeleteQueryDataImpl.toStatementAsync(queryData, getExecutionSpec(), getUDTValueMapper(), dbSession);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public Boolean getIfExists() {
* @param ctx the context
* @return the query data statement
*/
static ListenableFuture<Statement> toStatementAsync(DeleteQueryData data, ExecutionSpec executionSpec, DBSession dbSession) {
static ListenableFuture<Statement> toStatementAsync(DeleteQueryData data, ExecutionSpec executionSpec, UDTValueMapper udtValueMapper, DBSession dbSession) {

Delete delete = (data.getTablename().getKeyspacename() == null) ? delete().from(data.getTablename().getTablename())
: delete().from(data.getTablename().getKeyspacename(), data.getTablename().getTablename());
Expand All @@ -162,7 +162,7 @@ static ListenableFuture<Statement> toStatementAsync(DeleteQueryData data, Execut
Clause keybasedWhereClause = eq(entry.getKey(), bindMarker());
delete.where(keybasedWhereClause);

values.add(dbSession.toStatementValue(data.getTablename(), entry.getKey(), entry.getValue()));
values.add(udtValueMapper.toStatementValue(data.getTablename(), entry.getKey(), entry.getValue()));
}

ListenableFuture<PreparedStatement> preparedStatementFuture = dbSession.prepareAsync(delete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public ListenableFuture<ResultList<Record>> apply(ReadQueryData querData) {

private ListenableFuture<ResultList<Record>> executeAsync(final ReadQueryData queryData, DBSession dbSession) {
// perform query
ListenableFuture<ResultSet> resultSetFuture = performAsync(dbSession, ReadQueryDataImpl.toStatementAsync(queryData, dbSession));
ListenableFuture<ResultSet> resultSetFuture = performAsync(dbSession, ReadQueryDataImpl.toStatementAsync(queryData, getUDTValueMapper(), dbSession));

// result set to record list mapper
Function<ResultSet, ResultList<Record>> resultSetToRecordList = new Function<ResultSet, ResultList<Record>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public Boolean getDistinct() {
* @param ctx the context
* @return the query as statement
*/
static ListenableFuture<Statement> toStatementAsync(ReadQueryData data, DBSession dbSession) {
static ListenableFuture<Statement> toStatementAsync(ReadQueryData data, UDTValueMapper udtValueMapper, DBSession dbSession) {
Select.Selection selection = select();

if ((data.getDistinct() != null) && data.getDistinct()) {
Expand Down Expand Up @@ -286,10 +286,10 @@ static ListenableFuture<Statement> toStatementAsync(ReadQueryData data, DBSessio
for (Entry<String, ImmutableList<Object>> entry : data.getKeys().entrySet()) {
if (entry.getValue().size() == 1) {
select.where(eq(entry.getKey(), bindMarker()));
values.add(dbSession.toStatementValue(data.getTablename(), entry.getKey(), entry.getValue().get(0)));
values.add(udtValueMapper.toStatementValue(data.getTablename(), entry.getKey(), entry.getValue().get(0)));
} else {
select.where(in(entry.getKey(), bindMarker()));
values.add(dbSession.toStatementValues(data.getTablename(), entry.getKey(), entry.getValue()));
values.add(udtValueMapper.toStatementValues(data.getTablename(), entry.getKey(), entry.getValue()));
}

}
Expand Down
Loading

0 comments on commit 89e9ad2

Please sign in to comment.