-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat : 배치 포인트 만료 (1) #61
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,13 @@ | ||
dependencies { | ||
implementation 'org.springframework.boot:spring-boot-starter-batch' | ||
implementation 'org.springframework.batch:spring-batch-integration' | ||
implementation 'org.springframework.kafka:spring-kafka' | ||
implementation 'org.springframework.boot:spring-boot-starter-data-jpa' | ||
|
||
testImplementation 'org.springframework.batch:spring-batch-test' | ||
} | ||
|
||
runtimeOnly 'com.h2database:h2' | ||
runtimeOnly 'com.mysql:mysql-connector-j' | ||
|
||
tasks.register("prepareKotlinBuildScriptModel") {} | ||
implementation project(':goodchoice-point') | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.flab; | ||
|
||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | ||
import org.springframework.boot.SpringApplication; | ||
import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
|
||
@EnableBatchProcessing | ||
@SpringBootApplication | ||
public class GoodchoiceBatchApplication { | ||
|
||
public static void main(String[] args) { | ||
SpringApplication.run(GoodchoiceBatchApplication.class, args); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package com.flab.goodchoicebatch.jobs; | ||
|
||
import org.springframework.batch.core.Job; | ||
import org.springframework.batch.core.Step; | ||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; | ||
import org.springframework.batch.core.configuration.annotation.JobScope; | ||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; | ||
import org.springframework.batch.core.configuration.annotation.StepScope; | ||
import org.springframework.batch.core.launch.support.RunIdIncrementer; | ||
import org.springframework.batch.item.ItemProcessor; | ||
import org.springframework.batch.item.ItemWriter; | ||
import org.springframework.batch.item.database.JdbcBatchItemWriter; | ||
import org.springframework.batch.item.database.JdbcCursorItemReader; | ||
import org.springframework.batch.item.database.JpaItemWriter; | ||
import org.springframework.batch.item.database.JpaPagingItemReader; | ||
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; | ||
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; | ||
import org.springframework.batch.item.kafka.KafkaItemWriter; | ||
import org.springframework.batch.integration.async.AsyncItemWriter; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.core.task.SimpleAsyncTaskExecutor; | ||
import org.springframework.core.task.TaskExecutor; | ||
import org.springframework.jdbc.core.BeanPropertyRowMapper; | ||
import org.springframework.jdbc.core.JdbcTemplate; | ||
import org.springframework.kafka.core.KafkaTemplate; | ||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | ||
|
||
import com.flab.goodchoicebatch.listner.ExpiredChunkListener; | ||
import com.flab.goodchoicebatch.listner.ExpiredIJobListener; | ||
import com.flab.goodchoicebatch.listner.ExpiredStepListener; | ||
import com.flab.goodchoicepoint.domain.Point; | ||
|
||
import lombok.RequiredArgsConstructor; | ||
import lombok.SneakyThrows; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.sql.Connection; | ||
import java.sql.DriverManager; | ||
import java.sql.PreparedStatement; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.util.List; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
|
||
import javax.batch.api.chunk.ItemReader; | ||
import javax.persistence.EntityManagerFactory; | ||
import javax.sql.DataSource; | ||
|
||
@Slf4j | ||
@RequiredArgsConstructor | ||
@Configuration | ||
public class ExpiredPointJobConfig { | ||
|
||
private int chunkSize = 10; | ||
|
||
static final String JOB_NAME = "expiredPointJob"; | ||
|
||
static final String STEP_NAME = "expiredJobStep"; | ||
|
||
private final DataSource dataSource; | ||
|
||
private final KafkaTemplate<String, Point> salesInfoKafkaTemplate; | ||
|
||
@Bean | ||
public Job expiredPointJob(JobBuilderFactory jobBuilderFactory, ExpiredIJobListener ExpiredIJobListener, | ||
Step expiredJobStep) { | ||
return jobBuilderFactory.get(JOB_NAME) | ||
.preventRestart() | ||
.incrementer(new RunIdIncrementer()) | ||
.start(expiredJobStep) | ||
.listener(ExpiredIJobListener) | ||
.build(); | ||
} | ||
|
||
@Bean | ||
@JobScope | ||
public Step expiredPointJobStep(StepBuilderFactory stepBuilderFactory, ExpiredChunkListener inactiveChunkListener, | ||
ExpiredStepListener inativeStepListener, TaskExecutor taskExecutor) | ||
throws Exception { | ||
return stepBuilderFactory.get(STEP_NAME) | ||
.<Point, Future<Point>>chunk(chunkSize) | ||
.reader(expiredPointReader()) | ||
.writer(asyncItemWriter()) | ||
.listener(inativeStepListener) | ||
.taskExecutor(taskExecutor) | ||
.throttleLimit(2) | ||
.allowStartIfComplete(true) | ||
.taskExecutor(taskExecutor()) | ||
.build(); | ||
} | ||
|
||
@Bean | ||
public TaskExecutor taskExecutor() { | ||
var executor = new ThreadPoolTaskExecutor(); | ||
executor.setCorePoolSize(10); | ||
executor.setMaxPoolSize(10); | ||
executor.setQueueCapacity(15); | ||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); | ||
executor.setThreadNamePrefix("Thread N-> :"); | ||
return executor; | ||
} | ||
|
||
@Bean | ||
@StepScope | ||
public JdbcCursorItemReader<Point> expiredPointReader() throws Exception { | ||
String sql = "SELECT * FROM point WHERE expire_at >= date_add(now(), interval 1 day)"; | ||
|
||
return new JdbcCursorItemReaderBuilder<Point>() | ||
.name("expiredPointReader") | ||
.fetchSize(chunkSize) | ||
.dataSource(dataSource) | ||
.verifyCursorPosition(false) | ||
.sql(sql) | ||
.rowMapper(new BeanPropertyRowMapper<>(Point.class)) | ||
.build(); | ||
} | ||
|
||
@Bean | ||
@StepScope | ||
public ItemProcessor<Point, Point> expiredPointProcessor() { | ||
return new org.springframework.batch.item.ItemProcessor<Point, Point>() { | ||
@Override | ||
public Point process(Point point) throws Exception { | ||
log.info("Current Point={}", point.getIdx()); | ||
point.setInactive(); | ||
return point; | ||
} | ||
}; | ||
} | ||
|
||
@Bean | ||
public AsyncItemWriter<Point> asyncItemWriter() { | ||
var asyncWriter = new AsyncItemWriter<Point>(); | ||
asyncWriter.setDelegate(salesInfoKafkaItemWriter()); | ||
return asyncWriter; | ||
} | ||
|
||
@Bean | ||
@SneakyThrows | ||
public KafkaItemWriter<String, Point> salesInfoKafkaItemWriter() { | ||
var kafkaItemWriter = new KafkaItemWriter<String, Point>(); | ||
kafkaItemWriter.setKafkaTemplate(salesInfoKafkaTemplate); | ||
kafkaItemWriter.setItemKeyMapper(salesInfo -> String.valueOf(salesInfo.getIdx())); | ||
kafkaItemWriter.setDelete(Boolean.FALSE); | ||
kafkaItemWriter.afterPropertiesSet(); | ||
return kafkaItemWriter; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package com.flab.goodchoicebatch.listner; | ||
|
||
import org.springframework.batch.core.JobExecution; | ||
import org.springframework.batch.core.JobExecutionListener; | ||
import org.springframework.stereotype.Component; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
@Component | ||
public class ExpiredIJobListener implements JobExecutionListener { | ||
@Override | ||
public void beforeJob(JobExecution jobExecution) { | ||
log.info("---------------> Before job execution"); | ||
} | ||
|
||
@Override | ||
public void afterJob(JobExecution jobExecution) { | ||
log.info("---------------> After Job"); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package com.flab.goodchoicebatch.listner; | ||
|
||
import org.springframework.batch.core.StepExecution; | ||
import org.springframework.batch.core.annotation.AfterStep; | ||
import org.springframework.batch.core.annotation.BeforeStep; | ||
import org.springframework.stereotype.Component; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
@Component | ||
public class ExpiredStepListener { | ||
|
||
@BeforeStep | ||
public void beforeStep(StepExecution stepExecution) { | ||
log.info("Before Step"); | ||
} | ||
|
||
@AfterStep | ||
public void afterStep(StepExecution stepExecution) { | ||
log.info("After Step"); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
server: | ||
port: 8081 | ||
|
||
spring: | ||
profiles: | ||
active: live | ||
|
||
# spring.batch.job.names: ${job.name:NONE} | ||
|
||
--- | ||
spring: | ||
h2: | ||
console: | ||
path: /h2-console | ||
enabled: true | ||
datasource: | ||
url: jdbc:h2:mem:testdb; | ||
username: sa | ||
password: | ||
driver-class-name: org.h2.Driver | ||
config: | ||
activate: | ||
on-profile: local | ||
--- | ||
spring: | ||
batch: | ||
job: | ||
enabled: true | ||
datasource: | ||
url: jdbc:mysql://localhost:3306/good_choice | ||
username: root | ||
password: 1234 | ||
driver-class-name: com.mysql.cj.jdbc.Driver | ||
jpa: | ||
show-sql: true | ||
hibernate: | ||
ddl-auto: none | ||
kafka: | ||
template: | ||
default-topic: point | ||
producer: | ||
bootstrap-servers: localhost:9092 | ||
key-serializer: org.apache.kafka.common.serialization.StringSerializer | ||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer | ||
logging: | ||
level: | ||
org.hibernate.SQL: debug | ||
config: | ||
activate: | ||
on-profile: live |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package com.flab.goodchoicebatch; | ||
|
||
import org.assertj.core.api.Assertions; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.springframework.batch.core.BatchStatus; | ||
import org.springframework.batch.core.JobExecution; | ||
import org.springframework.batch.test.JobLauncherTestUtils; | ||
import org.springframework.batch.test.context.SpringBatchTest; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.test.context.SpringBootTest; | ||
import org.springframework.test.context.junit4.SpringRunner; | ||
|
||
import com.flab.goodchoicepoint.domain.Point; | ||
import com.flab.goodchoicepoint.infrastructure.PointRepository; | ||
|
||
@RunWith(SpringRunner.class) | ||
@SpringBatchTest | ||
@SpringBootTest(classes = { ExpiredPoint.class, TestBatchConfig.class }) | ||
public class ExpiredPointTests { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
@Autowired | ||
private JobLauncherTestUtils jobLauncherTestUtils; | ||
|
||
@Autowired | ||
private PointRepository pointRepository; | ||
|
||
@Test | ||
public void 포인트의_상태가_변경된다() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 해피 케이스만 있네요 테스트 코드는 코드에 변경사항을 감지하기도 하지만 또 다른 기능도 있는데 뭐가 있을까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
// given | ||
for (long i = 0; i < 50; i++) { | ||
pointRepository.save(Point.builder() | ||
.idx(i) | ||
.memberId(1L) | ||
.itemId(94L) | ||
.amount("3000") | ||
.build()); | ||
} | ||
|
||
// when | ||
JobExecution jobExecution = jobLauncherTestUtils.launchJob(); | ||
|
||
// then | ||
Assertions.assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); | ||
Assertions.assertThat(pointRepository.findExpirPoints().size()).isEqualTo(50); | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
|
||
bootJar { enabled = false } | ||
jar { enabled = true } | ||
|
||
group = 'com.flab' | ||
version = '0.0.1-SNAPSHOT' | ||
sourceCompatibility = '17' | ||
|
||
dependencies { | ||
api 'org.springframework.boot:spring-boot-starter-data-jpa' | ||
} | ||
tasks.register("prepareKotlinBuildScriptModel") {} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip | ||
networkTimeout=10000 | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
live 인데 local이 맞나요?