Skip to content

Commit 9bfb2b0

Browse files
committed
TEIID-2322 switching reusable executions to a pool paradigm rather than
relying on partid
1 parent 22d025a commit 9bfb2b0

File tree

4 files changed

+50
-21
lines changed

4 files changed

+50
-21
lines changed

engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorManager.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
import org.teiid.dqp.message.AtomicRequestID;
3434
import org.teiid.dqp.message.AtomicRequestMessage;
3535
import org.teiid.logging.CommandLogMessage;
36+
import org.teiid.logging.CommandLogMessage.Event;
3637
import org.teiid.logging.LogConstants;
3738
import org.teiid.logging.LogManager;
3839
import org.teiid.logging.MessageLevel;
39-
import org.teiid.logging.CommandLogMessage.Event;
4040
import org.teiid.metadata.FunctionMethod;
4141
import org.teiid.query.QueryPlugin;
4242
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
@@ -57,6 +57,7 @@ public class ConnectorManager {
5757

5858
private String translatorName;
5959
private String connectionName;
60+
private List<String> id;
6061

6162
// known requests
6263
private ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem> requestStates = new ConcurrentHashMap<AtomicRequestID, ConnectorWorkItem>();
@@ -69,6 +70,7 @@ public class ConnectorManager {
6970
public ConnectorManager(String translatorName, String connectionName) {
7071
this.translatorName = translatorName;
7172
this.connectionName = connectionName;
73+
this.id = Arrays.asList(translatorName, connectionName);
7274
}
7375

7476
public String getStausMessage() {
@@ -107,7 +109,7 @@ public SourceCapabilities getCapabilities() throws TeiidComponentException {
107109

108110
checkStatus();
109111
ExecutionFactory<Object, Object> translator = getExecutionFactory();
110-
BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, Arrays.asList(translatorName, connectionName));
112+
BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, id);
111113
cachedCapabilities = resultCaps;
112114
return resultCaps;
113115
}
@@ -245,4 +247,8 @@ public String getConnectionName() {
245247
return this.connectionName;
246248
}
247249

250+
public List<String> getId() {
251+
return id;
252+
}
253+
248254
}

engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ public void close() {
156156
try {
157157
if (execution != null) {
158158
execution.close();
159-
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed execution"}); //$NON-NLS-1$
159+
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed execution"}); //$NON-NLS-1$
160+
if (execution instanceof ReusableExecution<?>) {
161+
this.requestMsg.getCommandContext().putReusableExecution(this.manager.getId(), (ReusableExecution<?>) execution);
162+
}
160163
}
161164
} catch (Throwable e) {
162165
LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
@@ -238,14 +241,11 @@ public void execute() throws TranslatorException {
238241
this.expectedColumns = ((StoredProcedure)command).getResultSetColumns().size();
239242
}
240243

241-
Execution exec = this.requestMsg.getCommandContext().getReusableExecution(this.securityContext.getPartIdentifier());
244+
Execution exec = this.requestMsg.getCommandContext().getReusableExecution(this.manager.getId());
242245
if (exec != null) {
243246
((ReusableExecution)exec).reset(translatedCommand, this.securityContext, connection);
244247
} else {
245248
exec = connector.createExecution(translatedCommand, this.securityContext, queryMetadata, (unwrapped == null) ? this.connection:unwrapped);
246-
if (exec instanceof ReusableExecution<?>) {
247-
this.requestMsg.getCommandContext().putReusableExecution(this.securityContext.getPartIdentifier(), (ReusableExecution<?>) exec);
248-
}
249249
}
250250
setExecution(command, translatedCommand, exec);
251251

engine/src/main/java/org/teiid/query/util/CommandContext.java

+22-12
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.text.DecimalFormat;
2727
import java.text.SimpleDateFormat;
2828
import java.util.*;
29-
import java.util.concurrent.ConcurrentHashMap;
3029
import java.util.concurrent.Executor;
3130
import java.util.concurrent.atomic.AtomicLong;
3231

@@ -136,7 +135,7 @@ private static class GlobalState {
136135
private TransactionService transactionService;
137136
private SourceHint sourceHint;
138137
private Executor executor = ExecutorUtils.getDirectExecutor();
139-
Map<String, ReusableExecution<?>> reusableExecutions;
138+
Map<Object, List<ReusableExecution<?>>> reusableExecutions;
140139
Set<CommandListener> commandListeners = null;
141140
private LRUCache<String, DecimalFormat> decimalFormatCache;
142141
private LRUCache<String, SimpleDateFormat> dateFormatCache;
@@ -647,32 +646,43 @@ public void setExecutor(Executor e) {
647646
this.globalState.executor = e;
648647
}
649648

650-
public ReusableExecution<?> getReusableExecution(String nodeId) {
649+
public ReusableExecution<?> getReusableExecution(Object key) {
651650
synchronized (this.globalState) {
652651
if (this.globalState.reusableExecutions == null) {
653652
return null;
654653
}
655-
return this.globalState.reusableExecutions.get(nodeId);
654+
List<ReusableExecution<?>> reusableExecutions = this.globalState.reusableExecutions.get(key);
655+
if (reusableExecutions != null && !reusableExecutions.isEmpty()) {
656+
return reusableExecutions.remove(0);
657+
}
658+
return null;
656659
}
657660
}
658661

659-
public void putReusableExecution(String nodeId, ReusableExecution<?> execution) {
662+
public void putReusableExecution(Object key, ReusableExecution<?> execution) {
660663
synchronized (this.globalState) {
661664
if (this.globalState.reusableExecutions == null) {
662-
this.globalState.reusableExecutions = new ConcurrentHashMap<String, ReusableExecution<?>>();
665+
this.globalState.reusableExecutions = new HashMap<Object, List<ReusableExecution<?>>>();
666+
}
667+
List<ReusableExecution<?>> reusableExecutions = this.globalState.reusableExecutions.get(key);
668+
if (reusableExecutions == null) {
669+
reusableExecutions = new LinkedList<ReusableExecution<?>>();
670+
this.globalState.reusableExecutions.put(key, reusableExecutions);
663671
}
664-
this.globalState.reusableExecutions.put(nodeId, execution);
672+
reusableExecutions.add(execution);
665673
}
666674
}
667675

668676
public void close() {
669677
synchronized (this.globalState) {
670678
if (this.globalState.reusableExecutions != null) {
671-
for (ReusableExecution<?> reusableExecution : this.globalState.reusableExecutions.values()) {
672-
try {
673-
reusableExecution.dispose();
674-
} catch (Exception e) {
675-
LogManager.logWarning(LogConstants.CTX_DQP, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30030));
679+
for (List<ReusableExecution<?>> reusableExecutions : this.globalState.reusableExecutions.values()) {
680+
for (ReusableExecution<?> reusableExecution : reusableExecutions) {
681+
try {
682+
reusableExecution.dispose();
683+
} catch (Exception e) {
684+
LogManager.logWarning(LogConstants.CTX_DQP, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30030));
685+
}
676686
}
677687
}
678688
this.globalState.reusableExecutions.clear();

test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestExecutionReuse.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
*/
2222
package org.teiid.dqp.internal.process;
2323

24-
import static org.junit.Assert.assertEquals;
25-
import static org.junit.Assert.assertTrue;
24+
import static org.junit.Assert.*;
2625

2726
import java.sql.Connection;
2827
import java.sql.ResultSet;
@@ -50,6 +49,7 @@
5049
import org.teiid.language.Command;
5150
import org.teiid.language.QueryExpression;
5251
import org.teiid.metadata.RuntimeMetadata;
52+
import org.teiid.query.util.CommandContext;
5353
import org.teiid.runtime.EmbeddedConfiguration;
5454
import org.teiid.translator.DataNotAvailableException;
5555
import org.teiid.translator.ExecutionContext;
@@ -203,5 +203,18 @@ public void onComplete(Statement stmt) {
203203
Mockito.verify(execution, Mockito.times(EXEC_COUNT)).close();
204204
Mockito.verify(execution, Mockito.times(EXEC_COUNT - 1)).reset((Command)Mockito.anyObject(), (ExecutionContext)Mockito.anyObject(), Mockito.anyObject());
205205
}
206+
207+
@Test public void testCommandContext() {
208+
CommandContext cc = new CommandContext();
209+
FakeReusableExecution fe = new FakeReusableExecution();
210+
cc.putReusableExecution("a", fe);
211+
cc.putReusableExecution("a", new FakeReusableExecution());
212+
213+
ReusableExecution<?> re = cc.getReusableExecution("a");
214+
ReusableExecution<?> re1 = cc.getReusableExecution("a");
215+
assertSame(fe, re);
216+
assertNotSame(fe, re1);
217+
assertNull(cc.getReusableExecution("a"));
218+
}
206219

207220
}

0 commit comments

Comments
 (0)