浅谈spring-boot-rabbitmq动态管理的方法

所属分类: 软件编程 / java 阅读数: 19
收藏 0 赞 0 分享

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  1. 配置属性类:RabbitProperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  2. 自动配置类:RabbitAutoConfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  3. rabbitmq监听的配置是RabbitAnnotationDrivenConfiguration,是通过RabbitAutoConfiguration引入的
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
  ...
}

RabbitAnnotationDrivenConfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化

通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂SimpleRabbitListenerContainerFactory

@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
 SimpleRabbitListenerContainerFactoryConfigurer configurer,
 ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  return factory;
}

既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializeContainer(SimpleMessageListenerContainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有创建监听容器和初始化的代码

/**
 * Create and start a new {@link MessageListenerContainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
 * @param factory the {@link RabbitListenerContainerFactory} to use.
 * @return the {@link MessageListenerContainer}.
 */
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
 RabbitListenerContainerFactory<?> factory) {
  MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  if (listenerContainer instanceof InitializingBean) {
   try {
      ((InitializingBean) listenerContainer).afterPropertiesSet();
   }
   catch (Exception ex) {
      throw new BeanInitializationException("Failed to initialize message listener container", ex);
   }
  }
  int containerPhase = listenerContainer.getPhase();
  if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
   if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
      throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
       this.phase + " vs " + containerPhase);
   }
   this.phase = listenerContainer.getPhase();
  }  
  return listenerContainer;
}

继续找调用这个方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,发现调用的地方很多了

看看afterPropertiesSet方法,是InitializingBean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里创建的实例。原来是在RabbitListenerAnnotationBeanPostProcessor中的私有属性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下

/**
 * @return the managed {@link MessageListenerContainer} instance(s).
 */
public Collection<MessageListenerContainer> getListenerContainers() {
  return Collections.unmodifiableCollection(this.listenerContainers.values());
}
 
@Override
public void start() {
  for (MessageListenerContainer listenerContainer : getListenerContainers()) {
   startIfNecessary(listenerContainer);
  }
}

/**
 * Start the specified {@link MessageListenerContainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see MessageListenerContainer#isAutoStartup()
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
  if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
   listenerContainer.start();
  }
}

@Override
public void stop() {
  for (MessageListenerContainer listenerContainer : getListenerContainers()) {
   listenerContainer.stop();
  }
}

写个controller,注入RabbitListenerEndpointRegistry,使用start()和stop()对监听进行启用停用的操作,并且RabbitListenerEndpointRegistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:

import java.util.Set;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.itopener.framework.ResultMap;
/**
 * Created by fuwei.deng on 2017年7月24日.
 */
@RestController
@RequestMapping("rabbitmq/listener")
public class RabbitMQController {

  @Resource
  private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
  
  @RequestMapping("stop")
  public ResultMap stop(){
   rabbitListenerEndpointRegistry.stop();
   return ResultMap.buildSuccess();
  }
  
  @RequestMapping("start")
  public ResultMap start(){
   rabbitListenerEndpointRegistry.start();
   return ResultMap.buildSuccess();
  }
  
  @RequestMapping("setup")
  public ResultMap setup(int consumer, int maxConsumer){
   Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
   SimpleMessageListenerContainer container = null;
   for(String id : containerIds){
   container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);
   if(container != null){
    container.setConcurrentConsumers(consumer);
    container.setMaxConcurrentConsumers(maxConsumer);
   }
   }
   return ResultMap.buildSuccess();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

更多精彩内容其他人还在看

Java基础之反射原理与用法详解

这篇文章主要介绍了Java基础之反射原理与用法,结合实例形式详细分析了java反射的相关概念、原理、使用方法与操作注意事项,需要的朋友可以参考下
收藏 0 赞 0 分享

java字符串的截取方法substring()代码解析

这篇文章主要介绍了java字符串的截取方法substring()代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
收藏 0 赞 0 分享

Mybatis一对多关联关系映射实现过程解析

这篇文章主要介绍了Mybatis一对多关联关系映射实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
收藏 0 赞 0 分享

Java整数和字符串相互转化实例详解

这篇文章主要介绍了Java整数和字符串相互转化实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
收藏 0 赞 0 分享

Spring Boot 项目设置网站图标的方法

这篇文章主要介绍了Spring Boot 项目设置网站图标的方法,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
收藏 0 赞 0 分享

java多态机制原理特点详解

在本篇文章里小编给大家分享的是关于java多态机制原理特点详解,有需要的朋友们可以跟着学习下。
收藏 0 赞 0 分享

Jmeter如何添加循环控制器

这篇文章主要介绍了Jmeter如何添加循环控制器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
收藏 0 赞 0 分享

java虚拟机创建失败的原因整理

在本篇文章里小编给大家整理了关于创建java虚拟机失败的解决方法和知识点,需要的朋友们可以参考学习下。
收藏 0 赞 0 分享

Java如何把int类型转换成byte

这篇文章主要介绍了Java如何把int类型转换成byte,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
收藏 0 赞 0 分享

linux中java获取路径的实例代码

在本篇文章里小编给大家整理的是一篇关于linux中java获取路径的实例代码以及相关知识点,有兴趣的朋友们可以学习参考下。
收藏 0 赞 0 分享
查看更多