RocketMq事务消息发送代码流程详解

所属分类: 软件编程 / java 阅读数: 55
收藏 0 赞 0 分享

一、RocketMq事务消息流程:

1、首先会向broker发送一个预请求消息,消费者不可见

2、回调执行本地事务(比如操作数据库)

3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例:

1、引入rocketMq相关的依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2、创建一个TransactionProducer类:

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //创建生产者并制定组名
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、指定消息监听对象用于执行本地事务和消息回查
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、启动producer
    producer.start();

    //6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //主题
        "Tags", //主要用于消息过滤
        "Key_1", //消息唯一值
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、发送事务消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}

3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {
  //存储事务状态信息 key:事务id value:当前事务执行的状态
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //执行本地事务
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //事务id
    String transactionId = message.getTransactionId();
    //0:执行中,状态未知 1:执行成功 2:执行失败
    localTrans.put(transactionId, 0);
    //业务执行,本地事务,service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("正在执行本地事务---");
      Thread.sleep(60000*2);
      System.out.println("本地事务执行成功---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //消息回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //获取对应事务的状态信息
    String transactionId = messageExt.getTransactionId();
    //获取对应事务id执行状态
    Integer status = localTrans.get(transactionId);
    //消息回查
    System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

更多精彩内容其他人还在看

SpringBoot中使用Ehcache的详细教程

EhCache 是一个纯 Java 的进程内缓存框架,具有快速、精干等特点,是 Hibernate 中默认的 CacheProvider。这篇文章主要介绍了SpringBoot中使用Ehcache的相关知识,需要的朋友可以参考下
收藏 0 赞 0 分享

在idea 中添加和删除模块Module操作

这篇文章主要介绍了在idea 中添加和删除模块Module操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
收藏 0 赞 0 分享

java spring整合junit操作(有详细的分析过程)

这篇文章主要介绍了java spring整合junit操作(有详细的分析过程),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
收藏 0 赞 0 分享

详解JAVA 弱引用

这篇文章主要介绍了 JAVA 弱引用的相关资料,帮助大家更好的理解和学习java引用对象,感兴趣的朋友可以了解下
收藏 0 赞 0 分享

深入了解JAVA 虚引用

这篇文章主要介绍了JAVA 虚引用的相关资料,帮助大家更好的理解和学习JAVA,感兴趣的朋友可以了解下
收藏 0 赞 0 分享

详解JAVA 强引用

这篇文章主要介绍了JAVA 强引用的相关资料,帮助大家更好的理解和学习,感兴趣的朋友可以了解下
收藏 0 赞 0 分享

java中的按位与(&)用法说明

这篇文章主要介绍了java中的按位与(&)用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
收藏 0 赞 0 分享

深入了解JAVA 软引用

这篇文章主要介绍了JAVA 软引用的相关资料,帮助大家更好的理解和学习,感兴趣的朋友可以了解下
收藏 0 赞 0 分享

利用MyBatis实现条件查询的方法汇总

这篇文章主要给大家介绍了关于利用MyBatis实现条件查询的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者使用MyBatis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
收藏 0 赞 0 分享

Intellij IDEA 与maven 版本不符 Unable to import maven project See logs for details: No implementation for org.apache.maven.model.path.PathTranslator was bound

这篇文章主要介绍了Intellij IDEA 与maven 版本不符 Unable to import maven project See logs for details: No implementation for org.apache.maven.model.path.Pa
收藏 0 赞 0 分享
查看更多