Skip to content

Commit edc1ca4

Browse files
committed
Adding bulk insert feature for sales force using the batched updates through jdbc
1 parent 1e53b93 commit edc1ca4

File tree

15 files changed

+520
-97
lines changed

15 files changed

+520
-97
lines changed

build/kits/jboss-as7/modules/org/jboss/teiid/translator/salesforce/api/main/module.xml

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
<module xmlns="urn:jboss:module:1.0" name="org.jboss.teiid.translator.salesforce.api">
33
<resources>
44
<resource-root path="salesforce-api-${project.version}.jar" />
5+
<resource-root path="force-wsc-${version.com.force.api}.jar" />
56
<!-- Insert resources here -->
67
</resources>
78

connectors/connector-salesforce/src/main/java/org/teiid/resource/adapter/salesforce/SalesforceConnectionImpl.java

+83
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,29 @@
3535
import org.teiid.logging.LogConstants;
3636
import org.teiid.logging.LogManager;
3737
import org.teiid.resource.spi.BasicConnection;
38+
import org.teiid.translator.DataNotAvailableException;
3839
import org.teiid.translator.salesforce.SalesforceConnection;
3940
import org.teiid.translator.salesforce.execution.DataPayload;
4041
import org.teiid.translator.salesforce.execution.DeletedObject;
4142
import org.teiid.translator.salesforce.execution.DeletedResult;
4243
import org.teiid.translator.salesforce.execution.UpdatedResult;
4344

45+
import com.sforce.async.AsyncApiException;
46+
import com.sforce.async.BatchInfo;
47+
import com.sforce.async.BatchInfoList;
48+
import com.sforce.async.BatchRequest;
49+
import com.sforce.async.BatchResult;
50+
import com.sforce.async.BulkConnection;
51+
import com.sforce.async.ContentType;
52+
import com.sforce.async.JobInfo;
53+
import com.sforce.async.OperationEnum;
4454
import com.sforce.soap.partner.*;
4555
import com.sforce.soap.partner.sobject.SObject;
56+
import com.sforce.ws.ConnectorConfig;
4657

4758
public class SalesforceConnectionImpl extends BasicConnection implements SalesforceConnection {
4859
private Soap sfSoap;
60+
private BulkConnection bulkConnection;
4961

5062
private ObjectFactory partnerFactory = new ObjectFactory();
5163

@@ -99,12 +111,15 @@ private void login(String username, String password, URL url, SalesForceManagedC
99111

100112
// Set the SessionId after login, for subsequent calls
101113
sh.setSessionId(loginResult.getSessionId());
114+
this.bulkConnection = getBulkConnection(loginResult.getServerUrl(), loginResult.getSessionId());
102115
} catch (LoginFault e) {
103116
throw new ResourceException(e);
104117
} catch (InvalidIdFault e) {
105118
throw new ResourceException(e);
106119
} catch (com.sforce.soap.partner.UnexpectedErrorFault e) {
107120
throw new ResourceException(e);
121+
} catch(AsyncApiException e) {
122+
throw new ResourceException(e);
108123
} finally {
109124
BusFactory.setThreadDefaultBus(bus);
110125
}
@@ -375,4 +390,72 @@ public void close() throws ResourceException {
375390
public boolean isAlive() {
376391
return isValid();
377392
}
393+
394+
private JobInfo createBulkJob(String objectName) throws ResourceException {
395+
try {
396+
JobInfo job = new JobInfo();
397+
job.setObject(objectName);
398+
job.setOperation(OperationEnum.insert);
399+
job.setContentType(ContentType.XML);
400+
return this.bulkConnection.createJob(job);
401+
} catch (AsyncApiException e) {
402+
throw new ResourceException(e);
403+
}
404+
}
405+
406+
@Override
407+
public JobInfo executeBulkJob(String objectName, List<com.sforce.async.SObject> payload) throws ResourceException {
408+
try {
409+
JobInfo job = createBulkJob(objectName);
410+
BatchRequest request = this.bulkConnection.createBatch(job);
411+
request.addSObjects(payload.toArray(new com.sforce.async.SObject[payload.size()]));
412+
request.completeRequest();
413+
return this.bulkConnection.closeJob(job.getId());
414+
} catch (AsyncApiException e) {
415+
throw new ResourceException(e);
416+
}
417+
}
418+
419+
@Override
420+
public BatchResult getBulkResults(JobInfo job) throws ResourceException {
421+
try {
422+
BatchInfoList batchInfo = this.bulkConnection.getBatchInfoList(job.getId());
423+
BatchInfo[] batches = batchInfo.getBatchInfo();
424+
if (batches.length > 0) {
425+
BatchResult batchResult = this.bulkConnection.getBatchResult(job.getId(), batches[0].getId());
426+
if (batchResult.isPartialResult()) {
427+
throw new DataNotAvailableException(500);
428+
}
429+
return batchResult;
430+
}
431+
throw new DataNotAvailableException(500);
432+
} catch (AsyncApiException e) {
433+
throw new ResourceException(e);
434+
}
435+
}
436+
437+
private BulkConnection getBulkConnection(String endpoint, String sessionid) throws AsyncApiException {
438+
ConnectorConfig config = new ConnectorConfig();
439+
config.setSessionId(sessionid);
440+
// The endpoint for the Bulk API service is the same as for the normal
441+
// SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
442+
int index = endpoint.indexOf("Soap/u/"); //$NON-NLS-1$
443+
int endIndex = endpoint.indexOf('/', index+7);
444+
String apiVersion = endpoint.substring(index+7,endIndex);
445+
String restEndpoint = endpoint.substring(0, endpoint.indexOf("Soap/"))+ "async/" + apiVersion;//$NON-NLS-1$ //$NON-NLS-2$
446+
config.setRestEndpoint(restEndpoint);
447+
config.setCompression(true);
448+
config.setTraceMessage(false);
449+
BulkConnection connection = new BulkConnection(config);
450+
return connection;
451+
}
452+
453+
@Override
454+
public void cancelBulkJob(JobInfo job) throws ResourceException {
455+
try {
456+
this.bulkConnection.abortJob(job.getId());
457+
} catch (AsyncApiException e) {
458+
throw new ResourceException(e);
459+
}
460+
}
378461
}

connectors/salesforce-api/pom.xml

+16-12
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2-
<parent>
3-
<artifactId>connectors</artifactId>
4-
<groupId>org.jboss.teiid</groupId>
5-
<version>8.3.0.Beta3-SNAPSHOT</version>
6-
</parent>
7-
<modelVersion>4.0.0</modelVersion>
8-
<artifactId>salesforce-api</artifactId>
9-
<groupId>org.jboss.teiid.connectors</groupId>
10-
<name>Salesforce API</name>
11-
<description>The java API for the Salesforce.com partner web service API</description>
12-
<dependencies>
13-
</dependencies>
2+
<parent>
3+
<artifactId>connectors</artifactId>
4+
<groupId>org.jboss.teiid</groupId>
5+
<version>8.3.0.Beta3-SNAPSHOT</version>
6+
</parent>
7+
<modelVersion>4.0.0</modelVersion>
8+
<artifactId>salesforce-api</artifactId>
9+
<groupId>org.jboss.teiid.connectors</groupId>
10+
<name>Salesforce API</name>
11+
<description>The java API for the Salesforce.com partner web service API</description>
12+
<dependencies>
13+
<dependency>
14+
<groupId>com.force.api</groupId>
15+
<artifactId>force-wsc</artifactId>
16+
</dependency>
17+
</dependencies>
1418
</project>

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/MetadataProcessor.java

+9
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ public void processMetadata() throws TranslatorException {
7474
addTable(object);
7575
}
7676
addRelationships();
77+
78+
// Mark id fields are auto increment values, as they are not allowed to be updated
79+
for (Table table:this.metadataFactory.getSchema().getTables().values()) {
80+
for (Column column:table.getPrimaryKey().getColumns()) {
81+
if (!column.isUpdatable()) {
82+
column.setAutoIncremented(true);
83+
}
84+
}
85+
}
7786
} catch (ResourceException e) {
7887
throw new TranslatorException(e);
7988
}

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/SalesForceExecutionFactory.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class SalesForceExecutionFactory extends ExecutionFactory<ConnectionFacto
6363
private static final String EXCLUDES = "excludes";//$NON-NLS-1$
6464
private static final String INCLUDES = "includes";//$NON-NLS-1$
6565
private boolean auditModelFields = false;
66+
private int maxInsertBatchSize = 2048;
6667

6768
public SalesForceExecutionFactory() {
6869
// http://jira.jboss.org/jira/browse/JBEDSP-306
@@ -100,11 +101,11 @@ public ResultSetExecution createResultSetExecution(QueryExpression command, Exec
100101
public UpdateExecution createUpdateExecution(Command command, ExecutionContext executionContext, RuntimeMetadata metadata, SalesforceConnection connection) throws TranslatorException {
101102
UpdateExecution result = null;
102103
if(command instanceof org.teiid.language.Delete) {
103-
result = new DeleteExecutionImpl(command, connection, metadata, executionContext);
104+
result = new DeleteExecutionImpl(this, command, connection, metadata, executionContext);
104105
} else if (command instanceof org.teiid.language.Insert) {
105-
result = new InsertExecutionImpl(command, connection, metadata, executionContext);
106+
result = new InsertExecutionImpl(this, command, connection, metadata, executionContext);
106107
} else if (command instanceof org.teiid.language.Update) {
107-
result = new UpdateExecutionImpl(command, connection, metadata, executionContext);
108+
result = new UpdateExecutionImpl(this, command, connection, metadata, executionContext);
108109
}
109110
return result;
110111

@@ -266,4 +267,20 @@ public boolean useAnsiJoin() {
266267
return true;
267268
}
268269

270+
@Override
271+
public boolean supportsBulkUpdate() {
272+
return true;
273+
}
274+
275+
@TranslatorProperty(display="Max Bulk Insert Batch Size", description="The max size of a bulk insert batch. Default 2048.", advanced=true)
276+
public int getMaxBulkInsertBatchSize() {
277+
return maxInsertBatchSize;
278+
}
279+
280+
public void setMaxBulkInsertBatchSize(int maxInsertBatchSize) {
281+
if (maxInsertBatchSize < 1) {
282+
throw new AssertionError("Max bulk insert batch size must be greater than 0"); //$NON-NLS-1$
283+
}
284+
this.maxInsertBatchSize = maxInsertBatchSize;
285+
}
269286
}

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/SalesForcePlugin.java

+3
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,8 @@ public static enum Event implements BundleUtil.Event{
4747
TEIID13002,
4848
TEIID13004,
4949
TEIID13005,
50+
TEIID13006,
51+
TEIID13007,
52+
TEIID13008,
5053
}
5154
}

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/SalesforceConnection.java

+7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import org.teiid.translator.salesforce.execution.DeletedResult;
3232
import org.teiid.translator.salesforce.execution.UpdatedResult;
3333

34+
import com.sforce.async.BatchResult;
35+
import com.sforce.async.JobInfo;
36+
import com.sforce.async.SObject;
3437
import com.sforce.soap.partner.DescribeGlobalResult;
3538
import com.sforce.soap.partner.DescribeSObjectResult;
3639
import com.sforce.soap.partner.QueryResult;
@@ -59,5 +62,9 @@ public interface SalesforceConnection extends Connection {
5962

6063
public DescribeSObjectResult getObjectMetaData(String objectName) throws ResourceException;
6164

65+
public JobInfo executeBulkJob(String objectName, List<SObject> payload) throws ResourceException;
6266

67+
public BatchResult getBulkResults(JobInfo job) throws ResourceException;
68+
69+
public void cancelBulkJob(JobInfo job) throws ResourceException;
6370
}

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/execution/AbstractUpdateExecution.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.teiid.translator.ExecutionContext;
3434
import org.teiid.translator.TranslatorException;
3535
import org.teiid.translator.UpdateExecution;
36+
import org.teiid.translator.salesforce.SalesForceExecutionFactory;
37+
import org.teiid.translator.salesforce.SalesForcePlugin;
3638
import org.teiid.translator.salesforce.SalesforceConnection;
3739
import org.teiid.translator.salesforce.Util;
3840
import org.teiid.translator.salesforce.execution.visitors.IQueryProvidingVisitor;
@@ -48,16 +50,17 @@
4850
*
4951
*/
5052
public abstract class AbstractUpdateExecution implements UpdateExecution {
51-
53+
protected SalesForceExecutionFactory executionFactory;
5254
protected SalesforceConnection connection;
5355
protected RuntimeMetadata metadata;
5456
protected ExecutionContext context;
5557
protected Command command;
5658
protected int result;
5759

58-
public AbstractUpdateExecution(Command command,
60+
public AbstractUpdateExecution(SalesForceExecutionFactory ef, Command command,
5961
SalesforceConnection salesforceConnection,
6062
RuntimeMetadata metadata, ExecutionContext context) {
63+
this.executionFactory = ef;
6164
this.connection = salesforceConnection;
6265
this.metadata = metadata;
6366
this.context = context;
@@ -94,17 +97,15 @@ String[] getIDs(Condition criteria, IQueryProvidingVisitor visitor) throws Trans
9497
Id = Util.stripQutes(Id);
9598
Ids = new String[] { Id };
9699
} catch (ClassCastException cce) {
97-
throw new RuntimeException(
98-
"Error: The delete criteria is not a CompareCriteria");
100+
throw new RuntimeException(SalesForcePlugin.Util.gs(SalesForcePlugin.Event.TEIID13008));
99101
}
100102

101103
} else if (visitor.hasCriteria()) {
102104
try {
103105
String query = visitor.getQuery();
104106
QueryResult results = getConnection().query(query, context.getBatchSize(), Boolean.FALSE);
105-
if (null != results && results.getSize() > 0) {
106-
ArrayList<String> idList = new ArrayList<String>(results
107-
.getRecords().size());
107+
if (results != null && results.getSize() > 0) {
108+
ArrayList<String> idList = new ArrayList<String>(results.getRecords().size());
108109
for (int i = 0; i < results.getRecords().size(); i++) {
109110
SObject sObject = results.getRecords().get(i);
110111
idList.add(sObject.getId());

connectors/translator-salesforce/src/main/java/org/teiid/translator/salesforce/execution/DeleteExecutionImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,18 @@
2828
import org.teiid.metadata.RuntimeMetadata;
2929
import org.teiid.translator.TranslatorException;
3030
import org.teiid.translator.ExecutionContext;
31+
import org.teiid.translator.salesforce.SalesForceExecutionFactory;
3132
import org.teiid.translator.salesforce.SalesforceConnection;
3233
import org.teiid.translator.salesforce.execution.visitors.DeleteVisitor;
3334

3435

3536
public class DeleteExecutionImpl extends AbstractUpdateExecution {
3637

3738

38-
public DeleteExecutionImpl(Command command,
39+
public DeleteExecutionImpl(SalesForceExecutionFactory ef, Command command,
3940
SalesforceConnection salesforceConnection,
4041
RuntimeMetadata metadata, ExecutionContext context) {
41-
super(command, salesforceConnection, metadata, context);
42+
super(ef, command, salesforceConnection, metadata, context);
4243
}
4344

4445
@Override

0 commit comments

Comments
 (0)