diff --git a/goodchoice-batch/build.gradle b/goodchoice-batch/build.gradle index 986b952..b7f11b5 100644 --- a/goodchoice-batch/build.gradle +++ b/goodchoice-batch/build.gradle @@ -1,6 +1,13 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-batch' + implementation 'org.springframework.batch:spring-batch-integration' + implementation 'org.springframework.kafka:spring-kafka' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + testImplementation 'org.springframework.batch:spring-batch-test' -} + + runtimeOnly 'com.h2database:h2' + runtimeOnly 'com.mysql:mysql-connector-j' -tasks.register("prepareKotlinBuildScriptModel") {} + implementation project(':goodchoice-point') +} diff --git a/goodchoice-batch/src/main/java/com/flab/GoodchoiceBatchApplication.java b/goodchoice-batch/src/main/java/com/flab/GoodchoiceBatchApplication.java new file mode 100644 index 0000000..abe8005 --- /dev/null +++ b/goodchoice-batch/src/main/java/com/flab/GoodchoiceBatchApplication.java @@ -0,0 +1,15 @@ +package com.flab; + +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@EnableBatchProcessing +@SpringBootApplication +public class GoodchoiceBatchApplication { + + public static void main(String[] args) { + SpringApplication.run(GoodchoiceBatchApplication.class, args); + } + +} diff --git a/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/jobs/ExpiredPointJobConfig.java b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/jobs/ExpiredPointJobConfig.java new file mode 100644 index 0000000..5f9aac8 --- /dev/null +++ b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/jobs/ExpiredPointJobConfig.java @@ -0,0 +1,151 @@ +package com.flab.goodchoicebatch.jobs; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.JpaItemWriter; +import org.springframework.batch.item.database.JpaPagingItemReader; +import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.batch.item.kafka.KafkaItemWriter; +import org.springframework.batch.integration.async.AsyncItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import com.flab.goodchoicebatch.listner.ExpiredChunkListener; +import com.flab.goodchoicebatch.listner.ExpiredIJobListener; +import com.flab.goodchoicebatch.listner.ExpiredStepListener; +import com.flab.goodchoicepoint.domain.Point; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +import javax.batch.api.chunk.ItemReader; +import javax.persistence.EntityManagerFactory; +import javax.sql.DataSource; + +@Slf4j +@RequiredArgsConstructor +@Configuration +public class ExpiredPointJobConfig { + + private int chunkSize = 10; + + static final String JOB_NAME = "expiredPointJob"; + + static final String STEP_NAME = "expiredJobStep"; + + private final DataSource dataSource; + + private final KafkaTemplate salesInfoKafkaTemplate; + + @Bean + public Job expiredPointJob(JobBuilderFactory jobBuilderFactory, ExpiredIJobListener ExpiredIJobListener, + Step expiredJobStep) { + return jobBuilderFactory.get(JOB_NAME) + .preventRestart() + .incrementer(new RunIdIncrementer()) + .start(expiredJobStep) + .listener(ExpiredIJobListener) + .build(); + } + + @Bean + @JobScope + public Step expiredPointJobStep(StepBuilderFactory stepBuilderFactory, ExpiredChunkListener inactiveChunkListener, + ExpiredStepListener inativeStepListener, TaskExecutor taskExecutor) + throws Exception { + return stepBuilderFactory.get(STEP_NAME) + .>chunk(chunkSize) + .reader(expiredPointReader()) + .writer(asyncItemWriter()) + .listener(inativeStepListener) + .taskExecutor(taskExecutor) + .throttleLimit(2) + .allowStartIfComplete(true) + .taskExecutor(taskExecutor()) + .build(); + } + + @Bean + public TaskExecutor taskExecutor() { + var executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(15); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setThreadNamePrefix("Thread N-> :"); + return executor; + } + + @Bean + @StepScope + public JdbcCursorItemReader expiredPointReader() throws Exception { + String sql = "SELECT * FROM point WHERE expire_at >= date_add(now(), interval 1 day)"; + + return new JdbcCursorItemReaderBuilder() + .name("expiredPointReader") + .fetchSize(chunkSize) + .dataSource(dataSource) + .verifyCursorPosition(false) + .sql(sql) + .rowMapper(new BeanPropertyRowMapper<>(Point.class)) + .build(); + } + + @Bean + @StepScope + public ItemProcessor expiredPointProcessor() { + return new org.springframework.batch.item.ItemProcessor() { + @Override + public Point process(Point point) throws Exception { + log.info("Current Point={}", point.getIdx()); + point.setInactive(); + return point; + } + }; + } + + @Bean + public AsyncItemWriter asyncItemWriter() { + var asyncWriter = new AsyncItemWriter(); + asyncWriter.setDelegate(salesInfoKafkaItemWriter()); + return asyncWriter; + } + + @Bean + @SneakyThrows + public KafkaItemWriter salesInfoKafkaItemWriter() { + var kafkaItemWriter = new KafkaItemWriter(); + kafkaItemWriter.setKafkaTemplate(salesInfoKafkaTemplate); + kafkaItemWriter.setItemKeyMapper(salesInfo -> String.valueOf(salesInfo.getIdx())); + kafkaItemWriter.setDelete(Boolean.FALSE); + kafkaItemWriter.afterPropertiesSet(); + return kafkaItemWriter; + } + +} diff --git a/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredIJobListener.java b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredIJobListener.java new file mode 100644 index 0000000..2eef01f --- /dev/null +++ b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredIJobListener.java @@ -0,0 +1,22 @@ +package com.flab.goodchoicebatch.listner; + +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class ExpiredIJobListener implements JobExecutionListener { + @Override + public void beforeJob(JobExecution jobExecution) { + log.info("---------------> Before job execution"); + } + + @Override + public void afterJob(JobExecution jobExecution) { + log.info("---------------> After Job"); + } + +} diff --git a/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredStepListener.java b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredStepListener.java new file mode 100644 index 0000000..ef9a9b6 --- /dev/null +++ b/goodchoice-batch/src/main/java/com/flab/goodchoicebatch/listner/ExpiredStepListener.java @@ -0,0 +1,24 @@ +package com.flab.goodchoicebatch.listner; + +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class ExpiredStepListener { + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + log.info("Before Step"); + } + + @AfterStep + public void afterStep(StepExecution stepExecution) { + log.info("After Step"); + } + +} diff --git a/goodchoice-batch/src/main/resources/application.yml b/goodchoice-batch/src/main/resources/application.yml new file mode 100644 index 0000000..89c7619 --- /dev/null +++ b/goodchoice-batch/src/main/resources/application.yml @@ -0,0 +1,50 @@ +server: + port: 8081 + +spring: + profiles: + active: live + +# spring.batch.job.names: ${job.name:NONE} + +--- +spring: + h2: + console: + path: /h2-console + enabled: true + datasource: + url: jdbc:h2:mem:testdb; + username: sa + password: + driver-class-name: org.h2.Driver + config: + activate: + on-profile: local +--- +spring: + batch: + job: + enabled: true + datasource: + url: jdbc:mysql://localhost:3306/good_choice + username: root + password: 1234 + driver-class-name: com.mysql.cj.jdbc.Driver + jpa: + show-sql: true + hibernate: + ddl-auto: none + kafka: + template: + default-topic: point + producer: + bootstrap-servers: localhost:9092 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + logging: + level: + org.hibernate.SQL: debug + config: + activate: + on-profile: live diff --git a/goodchoice-consumer/src/test/java/com/flab/goodchoicebatch/ExpiredPointTests.java b/goodchoice-consumer/src/test/java/com/flab/goodchoicebatch/ExpiredPointTests.java new file mode 100644 index 0000000..7ff2b29 --- /dev/null +++ b/goodchoice-consumer/src/test/java/com/flab/goodchoicebatch/ExpiredPointTests.java @@ -0,0 +1,48 @@ +package com.flab.goodchoicebatch; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import com.flab.goodchoicepoint.domain.Point; +import com.flab.goodchoicepoint.infrastructure.PointRepository; + +@RunWith(SpringRunner.class) +@SpringBatchTest +@SpringBootTest(classes = { ExpiredPoint.class, TestBatchConfig.class }) +public class ExpiredPointTests { + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + private PointRepository pointRepository; + + @Test + public void 포인트의_상태가_변경된다() throws Exception { + // given + for (long i = 0; i < 50; i++) { + pointRepository.save(Point.builder() + .idx(i) + .memberId(1L) + .itemId(94L) + .amount("3000") + .build()); + } + + // when + JobExecution jobExecution = jobLauncherTestUtils.launchJob(); + + // then + Assertions.assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + Assertions.assertThat(pointRepository.findExpirPoints().size()).isEqualTo(50); + + } +} diff --git a/goodchoice-point/build.gradle b/goodchoice-point/build.gradle new file mode 100644 index 0000000..04a15b8 --- /dev/null +++ b/goodchoice-point/build.gradle @@ -0,0 +1,14 @@ + +bootJar { enabled = false } +jar { enabled = true } + +group = 'com.flab' +version = '0.0.1-SNAPSHOT' +sourceCompatibility = '17' + +dependencies { + api 'org.springframework.boot:spring-boot-starter-data-jpa' +} +tasks.register("prepareKotlinBuildScriptModel") {} + + diff --git a/goodchoice-point/gradle/wrapper/gradle-wrapper.jar b/goodchoice-point/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..c1962a7 Binary files /dev/null and b/goodchoice-point/gradle/wrapper/gradle-wrapper.jar differ diff --git a/goodchoice-point/gradle/wrapper/gradle-wrapper.properties b/goodchoice-point/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..37aef8d --- /dev/null +++ b/goodchoice-point/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +networkTimeout=10000 +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/goodchoice-point/src/main/java/com/flab/goodchoicepoint/domain/Point.java b/goodchoice-point/src/main/java/com/flab/goodchoicepoint/domain/Point.java new file mode 100644 index 0000000..ad61e7f --- /dev/null +++ b/goodchoice-point/src/main/java/com/flab/goodchoicepoint/domain/Point.java @@ -0,0 +1,71 @@ +package com.flab.goodchoicepoint.domain; + +import java.time.LocalDateTime; + +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; + +import org.hibernate.annotations.CreationTimestamp; +import org.hibernate.annotations.UpdateTimestamp; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@ToString +@Entity +@Getter +@Setter +@NoArgsConstructor +public class Point { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long idx; + + private Long memberId; + + private Long itemId; + + @Enumerated(EnumType.STRING) + private Status status; + + private String amount; + + @CreationTimestamp + private LocalDateTime createdAt; + + @UpdateTimestamp + private LocalDateTime expireAt; + + @Getter + @RequiredArgsConstructor + public enum Status { + VALID("활성화"), INVALID("비화성화"); + + private final String description; + } + + @Builder + public Point(Long memberId, Long itemId, String amount) { + this.memberId = memberId; + this.itemId = itemId; + this.amount = amount; + this.status = Status.VALID; + this.expireAt = LocalDateTime.now().plusYears(1); + } + + public Point setInactive() { + status = Status.INVALID; + return this; + } + +} diff --git a/goodchoice-point/src/main/java/com/flab/goodchoicepoint/infrastructure/PointRepository.java b/goodchoice-point/src/main/java/com/flab/goodchoicepoint/infrastructure/PointRepository.java new file mode 100644 index 0000000..abbb83e --- /dev/null +++ b/goodchoice-point/src/main/java/com/flab/goodchoicepoint/infrastructure/PointRepository.java @@ -0,0 +1,17 @@ +package com.flab.goodchoicepoint.infrastructure; + +import java.util.List; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.stereotype.Repository; + +import com.flab.goodchoicepoint.domain.Point; + +@Repository +public interface PointRepository extends JpaRepository { + + @Query(value = "select * from point where status = 'VALID'", nativeQuery = true) + List findExpirPoints(); + +} diff --git a/settings.gradle b/settings.gradle index 9075b58..314c664 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,6 +12,7 @@ include 'goodchoice-member' include 'goodchoice-coupon' include 'goodchoice-order' include 'goodchoice-item' +include 'goodchoice-point' include 'goodchoice-batch' include 'goodchoice-redis' include 'goodchoice-mysql'