幂等注解优化
1、代码对比

优化前
/**
 * Consumes an account email-rebind message with hand-written idempotency handling.
 *
 * <p>The message key is looked up in Redis first; if it is present the message is treated as
 * already consumed and SUCCESS is returned without touching the database. Otherwise the body is
 * parsed into an {@code AccountRebindEmailVo}, the mapper update is executed, and the key is
 * written to Redis with the {@code expired} TTL.
 *
 * @param msg the incoming message; its keys are used as the idempotency key
 * @return SUCCESS when the message was processed or recognised as a duplicate, FAIL when
 *         parsing or the mapper update throws
 */
@Override
public ConsumerStatus onMessage(Message msg) {
String key = msg.getKeys();
String jsonStr = MessageHelper.toString(msg.getBody());
log.info("getMsg:{}", jsonStr);
// Guarantee idempotent consumption: a key already present in Redis means this message was handled.
if (redisService.get(key) != null) {
log.warn("The message whose key is {} has been consumed, will ignore!", key);
return ConsumerStatus.SUCCESS;
}
try {
AccountRebindEmailVo accountRebindEmailVo = JSON.parseObject(jsonStr, AccountRebindEmailVo.class);
coursewareShareMapper.updateReceiverPhoneByReceiverId(accountRebindEmailVo.getUid(), accountRebindEmailVo.getEmail());
try {
// Record the key only after the update succeeded.
redisService.put(key, expired, "");
} catch (Exception ex) {
// Deliberately ignored: a failure to record the key must not turn a successful update into FAIL.
}
return ConsumerStatus.SUCCESS;
} catch (Exception ex) {
log.error("AccountPlatformSubscriber consume fail:{}", ex.toString());
return ConsumerStatus.FAIL;
}
}
优化后
/**
 * Consumes an account email-rebind message.
 *
 * <p>Duplicate-delivery handling is delegated to {@code @Idempotent}, keyed by
 * {@code msg.getUniqKey()} with a 5-hour expiry, so the body only contains business logic:
 * parse the payload and run the mapper update.
 *
 * @param msg the incoming message whose body is the JSON form of {@code AccountRebindEmailVo}
 * @return always SUCCESS when no exception is thrown
 */
@Idempotent(value = "#msg.getUniqKey()", expireTime = 5, timeUnit = TimeUnit.HOURS)
@Override
public ConsumerStatus onMessage(Message msg) {
    final String payload = MessageHelper.toString(msg.getBody());
    log.info("getMsg:{}", payload);

    final AccountRebindEmailVo rebind = JSON.parseObject(payload, AccountRebindEmailVo.class);
    coursewareShareMapper.updateReceiverPhoneByReceiverId(rebind.getUid(), rebind.getEmail());

    return ConsumerStatus.SUCCESS;
}
2、实现
幂等注解
/**
 * Marks a method whose execution should be idempotent with respect to a key derived from its
 * arguments. The annotation is read at runtime by {@code IdempotentAspect}, which checks the key
 * in Redis before invoking the method and stores it afterwards with the configured expiry.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
/** SpEL expression evaluated against the method parameters (e.g. {@code "#msg.getUniqKey()"}) to produce the idempotency key. */
String value();
/** How long the key is kept, expressed in {@link #timeUnit()}; defaults to 60. */
int expireTime() default 60;
/** Unit of {@link #expireTime()}; defaults to seconds. */
TimeUnit timeUnit() default TimeUnit.SECONDS;
/** Message logged at warn level when a duplicate execution is detected and skipped. */
String info() default "重复执行,将中止执行";
}
切面处理:IdempotentAspect
/**
 * Aspect that enforces idempotent execution of methods annotated with {@link Idempotent}.
 *
 * <p>For each intercepted call the annotation's SpEL expression is evaluated against the method
 * arguments to obtain a key. If the key is already present in Redis the call is skipped;
 * otherwise the method runs and the key is stored with the configured expiry.
 *
 * <p>NOTE(review): the existence check and the later write are two separate Redis calls, so two
 * concurrent deliveries of the same key can both pass the check. If that matters, an atomic
 * set-if-absent is needed — confirm whether {@code BaseRedisService} offers one.
 *
 * @author wuqinhong
 * @date 2022/11/4
 */
@Slf4j
@Aspect
@Component
public class IdempotentAspect {

    @Autowired
    private BaseRedisService redisService;

    /** SpEL parser used to evaluate the key expression. */
    private final ExpressionParser parser = new SpelExpressionParser();

    /** Spring component used to resolve the parameter names of the intercepted method. */
    private final LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();

    /**
     * Around advice applied to every method annotated with {@link Idempotent}.
     *
     * <p>The advice returns {@code Object} so that the intercepted method's result reaches the
     * caller; a {@code void} around advice makes Spring AOP hand {@code null} to the caller
     * regardless of what the method returned.
     *
     * @param joinPoint the intercepted invocation
     * @return the intercepted method's return value, or {@code null} when the call was skipped
     *         as a duplicate (callers with a non-void return type must tolerate {@code null})
     * @throws Throwable whatever the intercepted method throws, or a key-resolution failure
     */
    @Around("@annotation(com.encloud.annotation.Idempotent)")
    public Object handle(ProceedingJoinPoint joinPoint) throws Throwable {
        Idempotent idempotent = getDeclaredAnnotation(joinPoint);
        if (idempotent == null) {
            // The annotation is not declared on the resolved target method; nothing to enforce.
            return joinPoint.proceed();
        }

        // Clamp instead of blindly casting so a very large TTL cannot wrap to a negative int.
        int seconds = (int) Math.min(
                idempotent.timeUnit().toSeconds(idempotent.expireTime()), Integer.MAX_VALUE);

        String key = resolveKey(joinPoint, idempotent.value());
        if (key == null || key.isEmpty()) {
            log.warn("Idempotent expression '{}' produced no key, executing without idempotency check",
                    idempotent.value());
            return joinPoint.proceed();
        }

        // Key already recorded: this invocation is a duplicate, skip it.
        if (redisService.get(key) != null) {
            log.warn(idempotent.info());
            return null;
        }

        Object result = joinPoint.proceed();
        markExecuted(key, seconds);
        return result;
    }

    /**
     * Returns the {@link Idempotent} annotation declared on the intercepted method of the target class.
     *
     * @param joinPoint the intercepted invocation
     * @return the annotation, or {@code null} if the target method does not declare it
     * @throws NoSuchMethodException if the target class has no public method with the intercepted signature
     */
    public Idempotent getDeclaredAnnotation(JoinPoint joinPoint) throws NoSuchMethodException {
        return getMethod(joinPoint).getDeclaredAnnotation(Idempotent.class);
    }

    /**
     * Resolves the intercepted method on the target object's class, using the name and parameter
     * types from the join point signature (so overloads are distinguished).
     *
     * @param joinPoint the intercepted invocation
     * @return the method as declared on (or inherited by) the target class
     * @throws NoSuchMethodException if no matching public method exists on the target class
     */
    private Method getMethod(JoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Class<?> targetClass = joinPoint.getTarget().getClass();
        return targetClass.getMethod(signature.getName(), signature.getParameterTypes());
    }

    /**
     * Evaluates the SpEL key expression against the arguments of the intercepted call.
     *
     * @param joinPoint the intercepted invocation
     * @param exp the SpEL expression taken from {@link Idempotent#value()}
     * @return the expression result converted to a String, possibly {@code null}
     * @throws NoSuchMethodException if the intercepted method cannot be resolved on the target class
     */
    private String resolveKey(ProceedingJoinPoint joinPoint, String exp) throws NoSuchMethodException {
        Method method = getMethod(joinPoint);
        EvaluationContext context = bindParam(method, joinPoint.getArgs());
        Expression expression = parser.parseExpression(exp);
        // Ask SpEL for a String so non-String results are converted rather than failing a cast.
        return expression.getValue(context, String.class);
    }

    /**
     * Records the key after a successful execution. A failure here is logged and not rethrown,
     * matching the hand-written consumer logic, so that a Redis hiccup does not turn an already
     * completed business operation into a failure.
     *
     * @param key the idempotency key
     * @param seconds the expiry in seconds
     */
    private void markExecuted(String key, int seconds) {
        try {
            // Store an empty string rather than null so the "get(key) != null" check can match.
            redisService.put(key, seconds, "");
        } catch (Exception ex) {
            log.warn("Failed to record idempotent key {}, a redelivery may be executed again", key, ex);
        }
    }

    /**
     * Binds each parameter name of the method to the corresponding argument value.
     *
     * @param method the method whose parameter names are looked up
     * @param args the argument values of the current invocation
     * @return an evaluation context exposing the arguments as SpEL variables; empty when the
     *         parameter names cannot be discovered
     */
    private EvaluationContext bindParam(Method method, Object[] args) {
        EvaluationContext context = new StandardEvaluationContext();
        String[] params = discoverer.getParameterNames(method);
        if (params == null || args == null) {
            // Names are unavailable (the discoverer found none); leave the context empty.
            return context;
        }
        int count = Math.min(params.length, args.length);
        for (int i = 0; i < count; i++) {
            context.setVariable(params[i], args[i]);
        }
        return context;
    }
}
3、技术点
SpEL 表达式
分布式锁
反射、AOP
MQ重试