Skip to content

Commit 6821c85

Browse files
sb-keaneKeane McGough
and
Keane McGough
authored
Feature/ab2 d 5289 detect ab2d bfd hpms coverage (#1181)
* Send coverage counts to coverage count lambda when coverage update runs --------- Co-authored-by: Keane McGough <[email protected]>
1 parent e4fd227 commit 6821c85

File tree

13 files changed

+418
-94
lines changed

13 files changed

+418
-94
lines changed

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ services:
2323
environment:
2424
- AWS_DEFAULT_REGION=us-east-1
2525
- EDGE_PORT=4566
26-
- SERVICES=sqs
26+
- SERVICES=sqs,sns
2727
ports:
2828
- '4566:4566'
2929
volumes:

e2e-bfd-test/src/test/java/gov/cms/ab2d/testjobs/EndToEndBfdTests.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import java.util.concurrent.BlockingQueue;
5757
import java.util.stream.Collectors;
5858
import java.util.stream.Stream;
59+
60+
import gov.cms.ab2d.worker.service.coveragesnapshot.CoverageSnapshotService;
5961
import lombok.extern.slf4j.Slf4j;
6062
import org.hl7.fhir.instance.model.api.IBaseBundle;
6163
import org.hl7.fhir.instance.model.api.IDomainResource;
@@ -158,6 +160,9 @@ public class EndToEndBfdTests {
158160
@Autowired
159161
private ContractToContractCoverageMapping contractToContractCoverageMapping;
160162

163+
@Autowired
164+
private CoverageSnapshotService snapshotService;
165+
161166
@TempDir
162167
File path;
163168

@@ -178,7 +183,7 @@ void setUp() {
178183
propertiesService.updateProperty(PCP_SCALE_TO_MAX_TIME, "10");
179184

180185
coverageDriver = new CoverageDriverImpl(coverageSearchRepository, pdpClientService, coverageService,
181-
propertiesService, coverageProcessor, coverageLockWrapper, contractToContractCoverageMapping);
186+
propertiesService, coverageProcessor, coverageLockWrapper, contractToContractCoverageMapping, snapshotService);
182187

183188
// Instantiate the job processors
184189
jobService = new JobServiceImpl(jobRepository, jobOutputService, logManager, path.getAbsolutePath());

src/main/resources/checkstyle.xml

+5
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
<!-- </module> -->
7777
<module name="SuppressWarningsFilter"/>
7878

79+
<module name="SuppressWithPlainTextCommentFilter">
80+
<property name="offCommentFormat" value="CHECKSTYLE.OFF"/>
81+
<property name="onCommentFormat" value="CHECKSTYLE.ON"/>
82+
</module>
83+
7984
<module name="TreeWalker">
8085

8186
<!-- Checks for Naming Conventions. -->

worker/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@
114114
<version>0.0.1-SNAPSHOT</version>
115115
</dependency>
116116

117+
<dependency>
118+
<groupId>gov.cms.ab2d</groupId>
119+
<artifactId>ab2d-sns-client</artifactId>
120+
<version>0.0.1</version>
121+
</dependency>
122+
117123
<dependency>
118124
<groupId>gov.cms.ab2d</groupId>
119125
<artifactId>coverage</artifactId>

worker/src/main/java/gov/cms/ab2d/worker/SpringBootApp.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"gov.cms.ab2d.coverage",
1919
"gov.cms.ab2d.worker",
2020
"gov.cms.ab2d.bfd.client",
21-
"gov.cms.ab2d.eventclient.clients"
21+
"gov.cms.ab2d.eventclient.clients",
22+
"gov.cms.ab2d.snsclient.clients"
2223
})
2324
@EntityScan(basePackages = {"gov.cms.ab2d.common.model", "gov.cms.ab2d.job.model", "gov.cms.ab2d.coverage.model", "gov.cms.ab2d.properties.model", "gov.cms.ab2d.contracts"})
2425
@EnableJpaRepositories(basePackages = {"gov.cms.ab2d.common.repository", "gov.cms.ab2d.job.repository",

worker/src/main/java/gov/cms/ab2d/worker/processor/coverage/CoverageDriverImpl.java

+93-68
Large diffs are not rendered by default.

worker/src/main/java/gov/cms/ab2d/worker/processor/coverage/CoverageProcessorImpl.java

+21-13
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,18 @@
55
import gov.cms.ab2d.coverage.model.CoverageMapping;
66
import gov.cms.ab2d.coverage.model.CoveragePeriod;
77
import gov.cms.ab2d.coverage.service.CoverageService;
8+
import gov.cms.ab2d.snsclient.messages.AB2DServices;
89
import gov.cms.ab2d.worker.config.ContractToContractCoverageMapping;
910
import gov.cms.ab2d.worker.service.ContractWorkerClient;
11+
import gov.cms.ab2d.worker.service.coveragesnapshot.CoverageSnapshotService;
12+
import lombok.extern.slf4j.Slf4j;
13+
import org.springframework.beans.factory.annotation.Qualifier;
14+
import org.springframework.beans.factory.annotation.Value;
15+
import org.springframework.scheduling.annotation.Scheduled;
16+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
17+
import org.springframework.stereotype.Service;
18+
19+
import javax.annotation.PreDestroy;
1020
import java.util.ArrayList;
1121
import java.util.Collection;
1222
import java.util.Iterator;
@@ -15,14 +25,6 @@
1525
import java.util.concurrent.LinkedBlockingQueue;
1626
import java.util.concurrent.TimeUnit;
1727
import java.util.concurrent.atomic.AtomicBoolean;
18-
import javax.annotation.PreDestroy;
19-
import lombok.extern.slf4j.Slf4j;
20-
import org.springframework.beans.factory.annotation.Qualifier;
21-
import org.springframework.beans.factory.annotation.Value;
22-
import org.springframework.scheduling.annotation.Scheduled;
23-
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24-
import org.springframework.stereotype.Service;
25-
2628

2729
import static gov.cms.ab2d.fhir.FhirVersion.STU3;
2830

@@ -84,24 +86,28 @@ public class CoverageProcessorImpl implements CoverageProcessor {
8486

8587
private final ContractToContractCoverageMapping contractCoverageMapping = new ContractToContractCoverageMapping();
8688

89+
private final CoverageSnapshotService coverageSnapshotService;
90+
8791
/**
8892
* Coverage processor needs an interface to the database, client for BFD, and thread pool to concurrently execute
8993
* searches.
9094
*
91-
* @param coverageService interface with the database for querying, saving, and inserting searches
92-
* @param bfdClient REST client for specific calls to BFD
93-
* @param executor thread pool to execute enrollment updates within
94-
* @param maxAttempts max number of retries to make for updating enrollment for a specific month before failing outright
95+
* @param coverageService interface with the database for querying, saving, and inserting searches
96+
* @param bfdClient REST client for specific calls to BFD
97+
* @param executor thread pool to execute enrollment updates within
98+
* @param maxAttempts max number of retries to make for updating enrollment for a specific month before failing outright
99+
* @param coverageSnapshotService
95100
*/
96101
public CoverageProcessorImpl(CoverageService coverageService, BFDClient bfdClient,
97102
@Qualifier("patientCoverageThreadPool") ThreadPoolTaskExecutor executor,
98103
@Value("${coverage.update.max.attempts}") int maxAttempts,
99-
ContractWorkerClient contractWorkerClient) {
104+
ContractWorkerClient contractWorkerClient, CoverageSnapshotService coverageSnapshotService) {
100105
this.coverageService = coverageService;
101106
this.bfdClient = bfdClient;
102107
this.executor = executor;
103108
this.maxAttempts = maxAttempts;
104109
this.contractWorkerClient = contractWorkerClient;
110+
this.coverageSnapshotService = coverageSnapshotService;
105111
}
106112

107113
@Override
@@ -352,6 +358,8 @@ private void attemptCoverageInsertion(CoverageMapping result) {
352358
coverageService.completeSearch(periodId, "successfully inserted all data for in progress search");
353359

354360
log.info("marked search as completed {}-{}-{}", contractNumber, month, year);
361+
coverageSnapshotService.sendCoverageCounts(AB2DServices.BFD, contractNumber, result.getBeneficiaryIds()
362+
.size(), year, month);
355363
} catch (Exception exception) {
356364
log.error("inserting the coverage data failed for {}-{}-{}", result.getContractNumber(),
357365
result.getPeriod().getMonth(), result.getPeriod().getYear());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package gov.cms.ab2d.worker.service.coveragesnapshot;
2+
3+
import gov.cms.ab2d.snsclient.messages.AB2DServices;
4+
5+
import java.util.Set;
6+
7+
public interface CoverageSnapshotService {
8+
9+
void sendCoverageCounts(AB2DServices services, Set<String> contracts);
10+
11+
void sendCoverageCounts(AB2DServices services, String contract, int count, int year, int month);
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package gov.cms.ab2d.worker.service.coveragesnapshot;
2+
3+
import gov.cms.ab2d.common.service.PdpClientService;
4+
import gov.cms.ab2d.contracts.model.Contract;
5+
import gov.cms.ab2d.contracts.model.ContractDTO;
6+
import gov.cms.ab2d.coverage.model.CoverageCount;
7+
import gov.cms.ab2d.coverage.service.CoverageService;
8+
import gov.cms.ab2d.snsclient.clients.SNSClient;
9+
import gov.cms.ab2d.snsclient.messages.CoverageCountDTO;
10+
import gov.cms.ab2d.snsclient.messages.AB2DServices;
11+
import gov.cms.ab2d.worker.config.ContractToContractCoverageMapping;
12+
import lombok.extern.slf4j.Slf4j;
13+
import org.springframework.stereotype.Service;
14+
15+
import java.sql.Timestamp;
16+
import java.time.Instant;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Set;
20+
21+
import static gov.cms.ab2d.snsclient.messages.Topics.COVERAGE_COUNTS;
22+
import static java.util.stream.Collectors.groupingBy;
23+
24+
@Service
25+
@Slf4j
26+
public class CoverageSnapshotServiceImpl implements CoverageSnapshotService {
27+
28+
private final PdpClientService pdpClientService;
29+
private final CoverageService coverageService;
30+
private final ContractToContractCoverageMapping mapping;
31+
32+
private final SNSClient snsClient;
33+
34+
public CoverageSnapshotServiceImpl(PdpClientService pdpClientService, CoverageService coverageService, ContractToContractCoverageMapping mapping, SNSClient snsClient) {
35+
this.pdpClientService = pdpClientService;
36+
this.coverageService = coverageService;
37+
this.mapping = mapping;
38+
this.snsClient = snsClient;
39+
}
40+
41+
@Override
42+
public void sendCoverageCounts(AB2DServices services, Set<String> contracts) {
43+
44+
try {
45+
List<ContractDTO> enabledContracts = pdpClientService.getAllEnabledContracts()
46+
.stream()
47+
.filter(contract -> contracts.contains(contract.getContractNumber()))
48+
.map(Contract::toDTO)
49+
.toList();
50+
Map<String, List<CoverageCount>> coverageCounts = coverageService.countBeneficiariesForContracts(enabledContracts.stream()
51+
.map(mapping::map)
52+
.toList())
53+
.stream()
54+
.collect(groupingBy(CoverageCount::getContractNumber));
55+
56+
Timestamp time = Timestamp.from(Instant.now());
57+
58+
List<CoverageCountDTO> coverageCountDTOS = coverageCounts.entrySet()
59+
.stream()
60+
.map(count -> count.getValue()
61+
.stream()
62+
.map(c -> new CoverageCountDTO(count.getKey(), services.toString(),
63+
c.getBeneficiaryCount(), c.getYear(), c.getMonth(), time))
64+
.toList())
65+
.flatMap(List::stream)
66+
.toList();
67+
68+
snsClient.sendMessage(COVERAGE_COUNTS.getValue(), coverageCountDTOS);
69+
} catch (Exception e) {
70+
log.error("Sending coverage count snapshot failed, swallowing all exceptions to protect coverage update", e);
71+
}
72+
}
73+
74+
@Override
75+
public void sendCoverageCounts(AB2DServices services, String contract, int count, int year, int month) {
76+
try {
77+
snsClient.sendMessage(COVERAGE_COUNTS.getValue(), List.of(new CoverageCountDTO(contract, services.toString(),
78+
count, year, month, Timestamp.from(Instant.now()))));
79+
} catch (Exception e) {
80+
log.error("Sending coverage count snapshot failed, swallowing all exceptions to protect coverage update", e);
81+
}
82+
}
83+
84+
}

worker/src/test/java/gov/cms/ab2d/worker/processor/coverage/CoverageDriverTest.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.Optional;
4747
import java.util.Set;
4848
import javax.annotation.Nullable;
49+
50+
import gov.cms.ab2d.worker.service.coveragesnapshot.CoverageSnapshotService;
4951
import org.junit.jupiter.api.AfterEach;
5052
import org.junit.jupiter.api.BeforeEach;
5153
import org.junit.jupiter.api.DisplayName;
@@ -133,6 +135,9 @@ class CoverageDriverTest extends JobCleanup {
133135
@Autowired
134136
private ContractToContractCoverageMapping contractToContractCoverageMapping;
135137

138+
@Autowired
139+
private CoverageSnapshotService snapshotService;
140+
136141
private Contract contract;
137142
private Contract contract1;
138143
private ContractForCoverageDTO contractForCoverageDTO;
@@ -191,8 +196,8 @@ void before() {
191196
taskExecutor.setCorePoolSize(3);
192197
taskExecutor.initialize();
193198

194-
processor = new CoverageProcessorImpl(coverageService, bfdClient, taskExecutor, MAX_ATTEMPTS, contractWorkerClient);
195-
driver = new CoverageDriverImpl(coverageSearchRepo, pdpClientService, coverageService, propertiesService, processor, searchLock, contractToContractCoverageMapping);
199+
processor = new CoverageProcessorImpl(coverageService, bfdClient, taskExecutor, MAX_ATTEMPTS, contractWorkerClient, snapshotService);
200+
driver = new CoverageDriverImpl(coverageSearchRepo, pdpClientService, coverageService, propertiesService, processor, searchLock, contractToContractCoverageMapping, snapshotService);
196201
}
197202

198203
@AfterEach

worker/src/test/java/gov/cms/ab2d/worker/processor/coverage/CoverageDriverUnitTest.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.locks.Condition;
2626
import java.util.concurrent.locks.Lock;
2727
import javax.persistence.EntityNotFoundException;
28+
29+
import gov.cms.ab2d.worker.service.coveragesnapshot.CoverageSnapshotService;
2830
import org.jetbrains.annotations.NotNull;
2931
import org.junit.jupiter.api.AfterEach;
3032
import org.junit.jupiter.api.BeforeEach;
@@ -33,6 +35,7 @@
3335
import org.junit.jupiter.api.extension.ExtendWith;
3436
import org.mockito.Mock;
3537
import org.mockito.junit.jupiter.MockitoExtension;
38+
import org.springframework.beans.factory.annotation.Autowired;
3639
import org.springframework.test.util.ReflectionTestUtils;
3740

3841

@@ -71,6 +74,9 @@ class CoverageDriverUnitTest {
7174

7275
private PropertiesService propertiesService = new PropertyServiceStub();
7376

77+
@Autowired
78+
private CoverageSnapshotService snapshotService;
79+
7480
@Mock
7581
private ContractToContractCoverageMapping mapping;
7682

@@ -144,7 +150,7 @@ public Condition newCondition() {
144150

145151
@BeforeEach
146152
void before() {
147-
driver = new CoverageDriverImpl(null, null, coverageService, null, null, null,mapping);
153+
driver = new CoverageDriverImpl(null, null, coverageService, null, null, null,mapping, snapshotService);
148154
}
149155

150156
@AfterEach
@@ -298,7 +304,7 @@ void failureToLockCausesExceptions() {
298304

299305
when(lockWrapper.getCoverageLock()).thenReturn(tryLockFalse);
300306

301-
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, propertiesService, null, lockWrapper,null);
307+
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, propertiesService, null, lockWrapper,null, snapshotService);
302308

303309
CoverageDriverException exception = assertThrows(CoverageDriverException.class, driver::discoverCoveragePeriods);
304310
assertTrue(exception.getMessage().contains("could not retrieve lock"));
@@ -318,7 +324,7 @@ void whenLockInterruptedPropagateException() {
318324
Job job = new Job();
319325
job.setContractNumber(contract.getContractNumber());
320326

321-
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, propertiesService, null, lockWrapper,null);
327+
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, propertiesService, null, lockWrapper,null, snapshotService);
322328

323329
assertThrows(InterruptedException.class, driver::discoverCoveragePeriods);
324330
assertThrows(InterruptedException.class, driver::queueStaleCoveragePeriods);
@@ -333,7 +339,7 @@ void failureToLockCoverageAvailableFailsQuietly() {
333339
when(coverageService.coveragePeriodStuckJobs(any())).thenReturn(Collections.emptyList());
334340
when(coverageService.coveragePeriodNotUpdatedSince(anyInt(), anyInt(), any())).thenReturn(Collections.emptyList());
335341

336-
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, null, null, lockWrapper,null);
342+
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, null, null, lockWrapper,null, snapshotService);
337343

338344
ContractDTO contract = new ContractDTO(null, "contractNum", null, null, null);
339345
Job job = new Job();
@@ -351,7 +357,7 @@ void failureToLockCoverageAvailableFailsQuietly() {
351357
void failureToPageCausesExceptions() {
352358
when(coverageService.pageCoverage(any())).thenThrow(RuntimeException.class);
353359

354-
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, null, null, null,null);
360+
CoverageDriver driver = new CoverageDriverImpl(null, null, coverageService, null, null, null,null, snapshotService);
355361

356362
ContractForCoverageDTO contract = new ContractForCoverageDTO();
357363
contract.setContractNumber("contractNum");
@@ -364,7 +370,7 @@ void failureToPageCausesExceptions() {
364370
@Test
365371
void loadMappingFailsQuietly() {
366372
CoverageDriverImpl driver = spy(new CoverageDriverImpl(null, null,
367-
coverageService, propertiesService, coverageProcessor, lockWrapper,null)
373+
coverageService, propertiesService, coverageProcessor, lockWrapper,null, snapshotService)
368374
);
369375

370376
propertiesService.updateProperty(MAINTENANCE_MODE, "true");

worker/src/test/java/gov/cms/ab2d/worker/processor/coverage/CoverageUpdateAndProcessorTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import gov.cms.ab2d.common.properties.PropertyServiceStub;
2323
import gov.cms.ab2d.worker.config.ContractToContractCoverageMapping;
2424
import gov.cms.ab2d.worker.service.ContractWorkerClient;
25+
import gov.cms.ab2d.worker.service.coveragesnapshot.CoverageSnapshotService;
2526
import gov.cms.ab2d.worker.util.WorkerDataSetup;
2627
import java.time.DayOfWeek;
2728
import java.time.OffsetDateTime;
@@ -118,6 +119,9 @@ class CoverageUpdateAndProcessorTest {
118119
@Autowired
119120
private ContractToContractCoverageMapping mapping;
120121

122+
@Autowired
123+
private CoverageSnapshotService snapshotService;
124+
121125
private Contract contract;
122126
private CoveragePeriod january;
123127
private CoveragePeriod february;
@@ -155,8 +159,8 @@ void before() {
155159
taskExecutor.setCorePoolSize(3);
156160
taskExecutor.initialize();
157161

158-
processor = new CoverageProcessorImpl(coverageService, bfdClient, taskExecutor, MAX_ATTEMPTS, contractWorkerClient);
159-
driver = new CoverageDriverImpl(coverageSearchRepo, pdpClientService, coverageService, propertiesService, processor, searchLock, mapping);
162+
processor = new CoverageProcessorImpl(coverageService, bfdClient, taskExecutor, MAX_ATTEMPTS, contractWorkerClient, snapshotService);
163+
driver = new CoverageDriverImpl(coverageSearchRepo, pdpClientService, coverageService, propertiesService, processor, searchLock, mapping, snapshotService);
160164
}
161165

162166
@AfterEach

0 commit comments

Comments
 (0)