完美解决,RocketMQ如何支持多事务消息?

发布时间:2025-05-18 00:10:15 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:来源:JAVA日知录 1. 问题背景 在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消

来源:JAVA日知录

1. 问题背景

在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。

为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。

@Component@Slf

4j

//处理订单支付的事务监听器public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener 

{

  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 

{

    ......

    //处理订单支付逻辑

   }

  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) 

{

      ......

      //检查订单处理逻辑

   }

}

@Component@Slf

4j

//处理订单收货的事务监听器public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener 

{

  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 

{

    ......

   }

  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) 

{

      ......

   }

}

然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener

在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。然而,从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。

2. 如何解决

为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,我们可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。

在生产者发送事务消息时指定对应的事务处理器 ,并将事务处理器放置在消息头上发送出去,在 RocketMQTransactionListener 中根据消息头选择具体的事务处理器来实现业务逻辑。

具体实现如下:

2.1 定义事务消息处理接口

首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener。

public interface TransactionMessageHandler 

{

    

/**

    * 执行本地事务

    * @param

 payload 消息体

    * @param

 arg 参数

    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg)

;

    

/**

     * 检查本地执行状态

     * @param

 payload 消息体

     * @return

 执行结果

     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload)

;

}

2.2 修改事务消息发送工具类,指定消息处理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) 

{  

  if(transactionMessageListener == null

){

    throw new IllegalArgumentException("transactionMessageListener must not null"

);

  }

  String destination = buildDestination(topic, tag);

  Message sendMessage = MessageBuilder.withPayload(message)

    .setHeader(RocketMQHeaders.KEYS, message.getKey())

    .setHeader(SOURCE_HEADER, message.getSource())

    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())

    .build();

  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null

);

  log.info("[{}]事务消息[{}]发送结果[{}]"

, destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));

  return

 sendResult;

}

2.3 修改RocketMQ事务消息监听器

@Slf

4j

@RocketMQTransactionListenerpublic class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener 

{

    private final

 Map transactionMessageHandlerMap;

    public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) 

{

        this

.transactionMessageHandlerMap = transactionMessageHandlerMap;

    }

    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) 

{

        log.info("消费者收到事务消息[{}]"

, JSONObject.toJSON(message));

        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);

        if (null

 == listenerName) {

            throw new RuntimeException("not params transactionMessageListener"

);

        }

        RocketMQLocalTransactionState state;

        Object payload = message.getPayload();

        try

 {

            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);

            if (null

 == messageHandler) {

                throw new RuntimeException("not match condition TransactionMessageHandler"

);

            }

            state = messageHandler.executeLocalTransaction(payload, arg);

        } catch

 (Exception e) {

            log.error("rocket transaction message executeLocal error:{}"

, e.getMessage());

            return

 RocketMQLocalTransactionState.ROLLBACK;

        }

        return

 state;

    }

    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) 

{

        log.info("消费者收到事务回查消息[{}]"

, JsonUtils.obj2String(message.getHeaders()));

        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);

        if (null

 == listenerName) {

            throw new RuntimeException("not params transactionMessageListener"

);

        }

        RocketMQLocalTransactionState state;

        try

 {

            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);

            if (null

 == messageHandler) {

                throw new RuntimeException("not match condition TransactionMessageHandler"

);

            }

            state = messageHandler.checkLocalTransaction(message.getPayload());

        } catch

 (Exception e) {

            log.error("rocket transaction message executeLocal error:{}"

, e.getMessage());

            return

 RocketMQLocalTransactionState.ROLLBACK;

        }

        return

 state;

    }

}

在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。

在 DailyMart 中有一个公共组件 dailymart-rocketmq-spring-boot-starter 专门用于 RocketMQ 消息发送监听的封装,因此我们也将事务消息的处理逻辑封装到了此组件中。

2.4 修改事务消息处理逻辑

所有的事务消息处理逻辑都实现 TransactionMessageHandler 接口,以订单支付的处理逻辑为例:

@Component@Slf

4j

public class OrderPaidTransactionConsumer implements TransactionMessageHandler 

{

    @Resource    private

 TransactionTemplate transactionTemplate;

    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) 

{

        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class)

;

        ...

    }

    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) 

{

        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class)

;

        ...

    }

}

2.5 修改事务消息发送逻辑,指定事务处理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER""ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class)

;

小结

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。本文中涉及的代码都已经上传到github,需要的同学可以通过文末方式进行获取。

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!