Skip to content

Commit 7e9fe6c

Browse files
Charles GiardinaGitHub Enterprise
Charles Giardina
authored and
GitHub Enterprise
committed
allow handle persistor in sync waypoint (LiveRamp#12)
1 parent 0a6fa14 commit 7e9fe6c

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

src/java/com/liveramp/captain/waypoint/SyncWaypoint.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import com.liveramp.captain.config_retriever.CaptainConfigRetriever;
66
import com.liveramp.captain.config_retriever.CaptainConfigRetrieverFactory;
77
import com.liveramp.captain.config_retriever.CaptainConfigRetrieverWrapperFactory;
8+
import com.liveramp.captain.handle_persistor.CaptainHandlePersistor;
9+
import com.liveramp.captain.handle_persistor.CaptainHandlePersistorFactory;
10+
import com.liveramp.captain.handle_persistor.CaptainHandlePersistorWrapperFactory;
811
import com.liveramp.captain.request_submitter.CaptainRequestSubmitter;
912
import com.liveramp.captain.request_submitter.CaptainRequestSubmitterFactory;
1013
import com.liveramp.captain.request_submitter.CaptainRequestSubmitterWrapperFactory;
11-
import com.liveramp.captain.status_retriever.CaptainStatusRetriever;
1214
import com.liveramp.captain.status_retriever.CaptainStatusRetrieverFactory;
1315
import com.liveramp.cl_types.clr.ClStep;
1416

@@ -19,19 +21,36 @@ public class SyncWaypoint<T extends Serializable, ServiceHandle> implements Wayp
1921
private SyncWaypoint(
2022
ClStep step,
2123
CaptainConfigRetrieverFactory<T> configRetriever,
22-
CaptainRequestSubmitterFactory<T, ServiceHandle> requestSubmitter) {
24+
CaptainRequestSubmitterFactory<T, ServiceHandle> requestSubmitter,
25+
CaptainHandlePersistorFactory<ServiceHandle> handlePersistor) {
2326
this.step = step;
24-
this.waypointSubmitter = new WaypointSubmitterImpl<>(step, configRetriever, requestSubmitter, null);
27+
this.waypointSubmitter = new WaypointSubmitterImpl<>(step, configRetriever, requestSubmitter, handlePersistor);
28+
}
29+
30+
private SyncWaypoint(
31+
ClStep step,
32+
CaptainConfigRetrieverFactory<T> configRetriever,
33+
CaptainRequestSubmitterFactory<T, ServiceHandle> requestSubmitter) {
34+
this(step, configRetriever, requestSubmitter, null);
35+
}
36+
37+
public static <T extends Serializable, ServiceHandle> SyncWaypoint<T, ServiceHandle> of(ClStep step, CaptainConfigRetrieverFactory<T> configRetriever, CaptainRequestSubmitterFactory<T, ServiceHandle> requestSubmitter, CaptainHandlePersistorFactory<ServiceHandle> handlePersistor) {
38+
return new SyncWaypoint<>(step, configRetriever, requestSubmitter, handlePersistor);
2539
}
2640

2741
public static <T extends Serializable, ServiceHandle> SyncWaypoint<T, ServiceHandle> of(ClStep step, CaptainConfigRetrieverFactory<T> configRetriever, CaptainRequestSubmitterFactory<T, ServiceHandle> requestSubmitter) {
28-
return new SyncWaypoint<T, ServiceHandle>(step, configRetriever, requestSubmitter);
42+
return new SyncWaypoint<>(step, configRetriever, requestSubmitter);
43+
}
44+
45+
public static <T extends Serializable, ServiceHandle> SyncWaypoint<T, ServiceHandle> of(ClStep step, CaptainConfigRetriever<T> configRetriever, CaptainRequestSubmitter<T, ServiceHandle> requestSubmitter, CaptainHandlePersistor<ServiceHandle> handlePersistor) {
46+
return of(step, new CaptainConfigRetrieverWrapperFactory<T>(configRetriever), new CaptainRequestSubmitterWrapperFactory<T, ServiceHandle>(requestSubmitter), new CaptainHandlePersistorWrapperFactory<ServiceHandle>(handlePersistor));
2947
}
3048

3149
public static <T extends Serializable, ServiceHandle> SyncWaypoint<T, ServiceHandle> of(ClStep step, CaptainConfigRetriever<T> configRetriever, CaptainRequestSubmitter<T, ServiceHandle> requestSubmitter) {
3250
return of(step, new CaptainConfigRetrieverWrapperFactory<T>(configRetriever), new CaptainRequestSubmitterWrapperFactory<T, ServiceHandle>(requestSubmitter));
3351
}
3452

53+
3554
@Override
3655
public ClStep getStep() {
3756
return step;

test/java/com/liveramp/captain/daemon/TestClCaptainJoblet.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public void setup() {
7676

7777
testManifest = new DefaultManifestImpl(Lists.newArrayList(
7878
SyncWaypoint.of(ClStep.COLUMN_CREATION, configRetrieverStep1, requestSubmitterStep1),
79+
SyncWaypoint.of(ClStep.CLA_VALIDATION, configRetrieverStep2, requestSubmitterStep2, handlePersistor2),
7980
AsyncWaypoint.of(ClStep.PRIVACY_INQUISITOR, configRetrieverStep2, requestSubmitterStep2, handlePersistor2, captainStatusRetrieverStep2),
8081
FlowControlWaypoint.of(ClStep.PRIVACY_INQUISITOR_SENTENCING, captainStatusRetrieverStep3),
8182
AsyncWaypoint.of(ClStep.HEARTBREAKER, configRetrieverStep4, requestSubmitterStep4, handlePersistor4, captainStatusRetrieverStep4),
@@ -95,7 +96,7 @@ public void testGoToNextStep() throws DaemonException, TException {
9596

9697
new CaptainJoblet(config, alertsHandler, requestUpdater, manifestMap, true).run();
9798

98-
verify(requestUpdater, times(1)).setStepAndStatus(config.getJobId(), ClStep.COLUMN_CREATION, CaptStatus.COMPLETED, ClStep.PRIVACY_INQUISITOR, CaptStatus.READY);
99+
verify(requestUpdater, times(1)).setStepAndStatus(config.getJobId(), ClStep.COLUMN_CREATION, CaptStatus.COMPLETED, ClStep.CLA_VALIDATION, CaptStatus.READY);
99100
}
100101

101102
@Test
@@ -162,6 +163,22 @@ public void testSubmitRequestSync() throws TException, DaemonException {
162163
verify(requestUpdater, times(1)).setStatus(config.getJobId(), ClStep.COLUMN_CREATION, CaptStatus.READY, CaptStatus.COMPLETED);
163164
}
164165

166+
@Test
167+
public void testSubmitRequestSyncWithHandlerPersistor() throws TException, DaemonException {
168+
CaptainRequestConfig config = new CaptainRequestConfig(JOB_ID, CaptStatus.READY, ClStep.CLA_VALIDATION, ServiceRequestPriority.DEFAULT, ClaType.OFFLINE_INDIVIDUAL);
169+
170+
PInquisitorRequestConfig mockPinqRequest = mock(PInquisitorRequestConfig.class);
171+
when(configRetrieverStep2.retrieveConfig(JOB_ID)).thenReturn(mockPinqRequest);
172+
when(requestSubmitterStep2.submit(mockPinqRequest, new RequestOptions())).thenReturn(SERVICE_HANDLE);
173+
174+
new CaptainJoblet(config, alertsHandler, requestUpdater, manifestMap, true).run();
175+
176+
verify(configRetrieverStep2, times(1)).retrieveConfig(config.getJobId());
177+
verify(requestSubmitterStep2, times(1)).submit(mockPinqRequest, new RequestOptions());
178+
verify(handlePersistor2, times(1)).persist(JOB_ID, SERVICE_HANDLE, ClStep.CLA_VALIDATION);
179+
verify(requestUpdater, times(1)).setStatus(config.getJobId(), ClStep.CLA_VALIDATION, CaptStatus.READY, CaptStatus.COMPLETED);
180+
}
181+
165182
@Test
166183
public void testSubmitRequestFlowControl1() throws TException, DaemonException {
167184
CaptainRequestConfig config = new CaptainRequestConfig(JOB_ID, CaptStatus.READY, ClStep.PRIVACY_INQUISITOR_SENTENCING, ServiceRequestPriority.DEFAULT, ClaType.OFFLINE_INDIVIDUAL);

0 commit comments

Comments
 (0)