Skip to content

Commit

Permalink
feat(ice): Support listen TTR retry count with @IceTtrListener.
Browse files Browse the repository at this point in the history
  • Loading branch information
yizzuide committed Apr 26, 2020
1 parent ab725c1 commit c83868b
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

Expand All @@ -19,7 +18,7 @@
*
* @author yizzuide
* @since 1.15.0
* @version 3.1.3
* @version 3.2.0
* Create at 2019/11/16 17:30
*/
@Slf4j
Expand Down Expand Up @@ -125,21 +124,42 @@ public void run() {
/**
* 处理ttr的任务
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
private void processTtrJob(DelayJob delayJob, Job<?> job) {
log.warn("Ice处理TTR的Job {},当前重试次数为{}", delayJob.getJodId(), delayJob.getRetryCount() + 1);
if (delayJob.getRetryCount() > Integer.MAX_VALUE - 1) {
log.error("Ice处理TTR的Job {}, 重试次数超过Integer.MAX_VALUE,放弃重试", delayJob.getJodId());
return;
}
int currentRetryCount = delayJob.getRetryCount() + 1;
log.warn("Ice处理TTR的Job {},当前重试次数为{}", delayJob.getJodId(), currentRetryCount);
// 检测重试次数过载
boolean overload = delayJob.getRetryCount() >= job.getRetryCount();
// 过载处理标识
boolean handleFlag = false;
// 调用TTR监听器
List<HandlerMetaData> ttrMetaDataList = IceContext.getTopicTtrMap().get(job.getTopic());
if (!CollectionUtils.isEmpty(ttrMetaDataList)) {
for (HandlerMetaData handlerMetaData : ttrMetaDataList) {
try {
TtrJob<?> ttrJob = new TtrJob<>();
ttrJob.setOverload(overload);
ttrJob.setRetryCount(currentRetryCount);
ttrJob.setJob((Job) job);
ReflectUtil.invokeWithWrapperInject(handlerMetaData.getTarget(), handlerMetaData.getMethod(), Collections.singletonList(ttrJob), TtrJob.class, tj -> tj.getJob().getBody(), (tj, body) -> tj.getJob().setBody(body));
handleFlag = true;
} catch (Exception e) {
log.error("Ice invoke TTR listener error: {}", e.getMessage(), e);
}
}
}
if (overload) {
log.error("Ice检测到 Job {} 的TTR超过预设的{}次!", job.getId(), job.getRetryCount());
boolean handleFlag = false;
// 调用TTR Overload监听器
List<HandlerMetaData> handlerMetaDataList = IceContext.getTopicTtrOverloadMap().get(job.getTopic());
if (!CollectionUtils.isEmpty(handlerMetaDataList)) {
for (HandlerMetaData handlerMetaData : handlerMetaDataList) {
Method method = handlerMetaData.getMethod();
List<HandlerMetaData> ttrOverloadMetaDataList = IceContext.getTopicTtrOverloadMap().get(job.getTopic());
if (!CollectionUtils.isEmpty(ttrOverloadMetaDataList)) {
for (HandlerMetaData handlerMetaData : ttrOverloadMetaDataList) {
try {
ReflectUtil.invokeWithWrapperInject(handlerMetaData.getTarget(), method, Collections.singletonList(job), Job.class, Job::getBody, Job::setBody);
ReflectUtil.invokeWithWrapperInject(handlerMetaData.getTarget(), handlerMetaData.getMethod(), Collections.singletonList(job), Job.class, Job::getBody, Job::setBody);
handleFlag = true;
} catch (Exception e) {
log.error("Ice invoke TTR overload listener error: {}", e.getMessage(), e);
Expand Down Expand Up @@ -183,13 +203,11 @@ private void processTtrJob(DelayJob delayJob, Job<?> job) {
// 移除delayBucket中的任务
delayBucket.remove(index, delayJob);
// 设置当前重试次数
if (delayJob.getRetryCount() < Integer.MAX_VALUE - 1) {
delayJob.setRetryCount(delayJob.getRetryCount() + 1);
}
delayJob.setRetryCount(currentRetryCount);
// 重置到当前延迟
long delayDate = System.currentTimeMillis() +
(props.isEnableDelayMultiRetryCount() ?
job.getDelay() * (delayJob.getRetryCount() + 1) * props.getRetryDelayMultiFactor() :
job.getDelay() * (currentRetryCount) * props.getRetryDelayMultiFactor() :
job.getDelay());
delayJob.setDelayTime(delayDate);
// 再次添加到任务中
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ public class IceContext implements ApplicationListener<ContextRefreshedEvent> {

private static Map<String, List<HandlerMetaData>> topicMap = new HashMap<>();

private static Map<String, List<HandlerMetaData>> topicTtrMap = new HashMap<>();

private static Map<String, List<HandlerMetaData>> topicTtrOverloadMap = new HashMap<>();


@Autowired
private IceProperties props;

Expand All @@ -35,6 +38,10 @@ public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
IceListener iceListener = (IceListener) annotation;
return iceListener.value();
}, !props.isMultiTopicListenerPerHandler());
topicTtrMap = AopContextHolder.getHandlerMetaData(IceHandler.class, IceTtrListener.class, (annotation, metaData) -> {
IceTtrListener iceTtrListener = (IceTtrListener) annotation;
return iceTtrListener.value();
}, !props.isMultiTopicListenerPerHandler());
topicTtrOverloadMap = AopContextHolder.getHandlerMetaData(IceHandler.class, IceTtrOverloadListener.class, (annotation, metaData) -> {
IceTtrOverloadListener iceTtrOverloadListener = (IceTtrOverloadListener) annotation;
return iceTtrOverloadListener.value();
Expand All @@ -45,6 +52,10 @@ static Map<String, List<HandlerMetaData>> getTopicMap() {
return topicMap;
}

static Map<String, List<HandlerMetaData>> getTopicTtrMap() {
return topicTtrMap;
}

static Map<String, List<HandlerMetaData>> getTopicTtrOverloadMap() {
return topicTtrOverloadMap;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.yizzuide.milkomeda.ice;

import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.*;

/**
* IceTtrListener
* TTR重试监听器,方法参数类型固定为 {@link TtrJob}
*
* @author yizzuide
* @since 3.2.0
* Create at 2020/04/26 10:37
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface IceTtrListener {
/**
* 监听的Topic
*
* @return topic name
*/
@AliasFor("topic")
String value() default "";

/**
* 监听的Topic
*
* @return topic name
*/
@AliasFor("value")
String topic() default "";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.github.yizzuide.milkomeda.ice;

import lombok.Data;

/**
* TtrJob
* 重试任务信息
*
* @author yizzuide
* @since 3.2.0
* Create at 2020/04/26 10:52
*/
@Data
public class TtrJob<T> {
/**
* 是否重试超载
*/
private boolean isOverload;

/**
* 当前重试次数
*/
private int retryCount;

/**
* 任务
*/
private Job<T> job;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@ public String topicName() {
return "topic_product_check";
}

// TTR超载监听器
// TTR监听器(接收参数固定为TtrJob)
@IceTtrListener(topic = "#target.topicName()")
public void ttrHandle(TtrJob<Product> ttrJob) {
log.error("TTR retry with topic: {},count:{}, overload:{}, job:{}", ttrJob.getJob().getTopic(), ttrJob.getRetryCount(), ttrJob.isOverload(), ttrJob.getJob());
}

// TTR超载监听器(接收参数可以是Job<Product>、Product、Map)
@IceTtrOverloadListener(topic = "#target.topicName()")
public void ttrHandle(Product job) {
log.error("trr overload with job: {}", job);
public void ttrOverloadHandle(Product job) {
log.error("TTR overload with job: {}", job);
}
}

0 comments on commit c83868b

Please sign in to comment.