幂等注解优化

1、代码对比

image-20221104114830511

优化前

    @Override
    public ConsumerStatus onMessage(Message msg) {
        String key = msg.getKeys();
        String jsonStr = MessageHelper.toString(msg.getBody());
        log.info("getMsg:{}", jsonStr);
        //保证消费幂等
        if (redisService.get(key) != null) {
            log.warn("The message whose key is {} has been consumed, will ignore!", key);
            return ConsumerStatus.SUCCESS;
        }
        try {
            AccountRebindEmailVo accountRebindEmailVo = JSON.parseObject(jsonStr, AccountRebindEmailVo.class);
            coursewareShareMapper.updateReceiverPhoneByReceiverId(accountRebindEmailVo.getUid(), accountRebindEmailVo.getEmail());
            try {
                redisService.put(key, expired, "");
            } catch (Exception ex) {
                //ignore
            }
            return ConsumerStatus.SUCCESS;
        } catch (Exception ex) {
            log.error("AccountPlatformSubscriber consume fail:{}", ex.toString());
            return ConsumerStatus.FAIL;
        }
    }

优化后

    @Idempotent(value = "#msg.getUniqKey()", expireTime = 5, timeUnit = TimeUnit.HOURS)
    @Override
    public ConsumerStatus onMessage(Message msg) {
        String jsonStr = MessageHelper.toString(msg.getBody());
        log.info("getMsg:{}", jsonStr);
        AccountRebindEmailVo accountRebindEmailVo = JSON.parseObject(jsonStr, AccountRebindEmailVo.class);
        coursewareShareMapper.updateReceiverPhoneByReceiverId(accountRebindEmailVo.getUid(), accountRebindEmailVo.getEmail());
        return ConsumerStatus.SUCCESS;
    }

2、实现

幂等注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    String value();
    int expireTime() default 60;
    TimeUnit timeUnit() default TimeUnit.SECONDS;
    String info() default "重复执行,将中止执行";
}

切面处理:IdempotentAspect

/**
 * @description: 幂等注解切面
 * @author: wuqinhong
 * @date: 2022/11/4
 **/
@Slf4j
@Aspect
@Component
public class IdempotentAspect {
    @Autowired
    private BaseRedisService redisService;

    /**
     * spel
     */
    private ExpressionParser parser = new SpelExpressionParser();

    /**
     * spring下获取参数名的组件
     */
    private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();


    @Around("@annotation(com.encloud.annotation.Idempotent)")
    public void handle(ProceedingJoinPoint joinPoint) throws Throwable {
        //获取注解的属性值
        Idempotent idempotent = getDeclaredAnnotation(joinPoint);
        String exp = idempotent.value();
        int expireTime = idempotent.expireTime();
        TimeUnit timeUnit = idempotent.timeUnit();
        int seconds = (int) timeUnit.toSeconds(expireTime);
        String info = idempotent.info();

        //根据sqel表达式获取值
        Object[] args = joinPoint.getArgs();
        Method method = getMethod(joinPoint);
        EvaluationContext context = this.bindParam(method, args);
        Expression expression = parser.parseExpression(exp);
        String key = (String) expression.getValue(context);

        //redis检查是否存在key
        if (redisService.get(key) != null) {
            log.warn(info);
            return;
        }
        //执行切点方法
        joinPoint.proceed();

        //redis设置key
        redisService.put(key, seconds, null);
    }

    /**
     * 获取方法中声明的注解
     *
     * @param joinPoint
     * @return
     * @throws NoSuchMethodException
     */
    public Idempotent getDeclaredAnnotation(JoinPoint joinPoint) throws NoSuchMethodException {
        // 获取方法名
        String methodName = joinPoint.getSignature().getName();
        // 反射获取目标类
        Class<?> targetClass = joinPoint.getTarget().getClass();
        // 拿到方法对应的参数类型
        Class<?>[] parameterTypes = ((MethodSignature) joinPoint.getSignature()).getParameterTypes();
        // 根据类、方法、参数类型(重载)获取到方法的具体信息
        Method objMethod = targetClass.getMethod(methodName, parameterTypes);
        // 拿到方法定义的注解信息
        Idempotent annotation = objMethod.getDeclaredAnnotation(Idempotent.class);
        // 返回
        return annotation;
    }

    /**
     * 获取当前执行的方法
     *
     * @param pjp
     * @return
     * @throws NoSuchMethodException
     */
    private Method getMethod(ProceedingJoinPoint pjp) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        Method targetMethod = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes());
        return targetMethod;
    }

    /**
     * 将方法的参数名和参数值绑定
     *
     * @param method 方法,根据方法获取参数名
     * @param args   方法的参数值
     * @return
     */
    private EvaluationContext bindParam(Method method, Object[] args) {
        //获取方法的参数名
        String[] params = discoverer.getParameterNames(method);
        //将参数名与参数值对应起来
        EvaluationContext context = new StandardEvaluationContext();
        for (int len = 0; len < params.length; len++) {
            context.setVariable(params[len], args[len]);
        }
        return context;
    }
}

3、技术点

Spel表达式

分布式锁

反射、AOP

MQ重试