详解租约机制以及在hbase中的应用

所属分类: 软件编程 / java 阅读数: 32
收藏 0 赞 0 分享

详解租约机制以及在hbase中的应用

为什么需要Lease

分布式系统中为什么需要租约机制,这是因为在分布式系统,为了保证服务的高可用,需要在服务发生故障的时候及时启动另外一个服务实例以替换故障服务。这样就需要在服务端和客户端或者服务端和控制中心维持一个心跳信息,用于服务进程向控制中心汇报当前自己的健康情况,如果控制中心在一段时间收不到服务进程上报的心跳,则会启动新的进程继续对外提供服务。

但是,由于实际网络情况的复杂性,控制中心无法收到心跳时不能准确地判断究竟是服务故障了还是服务进程和控制中心之间的网络发生了故障。这种情况下控制中心冒然地启用新进程有可能会造成“双主”这种情况出现。

为避免上述情况的发生引入了租约机制,此时服务节点持续向控制中心申请短时间租约,控制中心在已派发的租约过期之前,不会启用新服务节点,而服务节点租约过期时若还无法从控制中心申请到新租约,自己中断客户链接。

此外,租约机制还可用于客户端和服务端之间的解藕,避免客户端进程失去响应时,其占用的服务端资源长期得不到释放进而影响到服务端的稳定。

Lease的实现

在实际系统中,如果依赖一个中心结点向外发布lease存在很大的风险,那就是如果该中心结点发生宕机或者网络故障,那么服务节点由于接收不到新的租约那么会导致整个服务集群进入不可用状态。因此,在实际使用中,对外提供lease服务的往往是由多个进程实例组成的另外一套集群,该集群具有高可用性,可以对外提供lease服务,比如zookeeper集群。

HRegionServer的租约Lease管理

租约线程的初始化

在HRegionServer的run主循环里会调用preRegistrationInitialization预先初始化一些线程,包括初始化集群连接信息setupClusterConnection()、healthCheckChore、pauseMonitor、initializeZookeeper以及initializeThreads()。

其中在initializeThreads()中会初始化各类线程,这些线程包括了这台regionServer的lease线程:

this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); //检查合并请求 
this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);  //周期性地检查memstore的flush请求 
this.leases = new Leases(this.threadWakeFrequency); 

 Leases类的定义如下,它继承了HasThread这个抽象类,并定义了如下几个主要的成员变量:

public static final int MIN_WAIT_TIME = 100; 
private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>(); 
 
protected final int leaseCheckFrequency; 
protected volatile boolean stopRequested = false; 

 其中Map型成员变量leases负责管理该regionserver进程中的lease实例,我们看看lease类都定义了哪些变量:

private final String leaseName; 
private final LeaseListener listener; 
private int leaseTimeoutPeriod; 
private long expirationTime; 

 leaseTimeoutPeriod是租约时间,expirationTime会在lease被创建时被置位为系统时间与leaseTimeoutPeriod之和,用于周期性地计算该租约已经被使用多长时间,如果租约已经超过了leaseTimeoutPeriod定义的到期时间,则会触发一个expired事件,LeaseListener会监听该事件并调用leaseExpired方法,不同类型的lease都会继承LeaseListener接口并实现自己的leaseExpired方法,如下所示是scan lease对该方法的实现:

@Override 
public void leaseExpired() {    //处理租约过期 
  RegionScannerHolder rsh = scanners.remove(this.scannerName); 
  if (rsh != null) { 
   RegionScanner s = rsh.s; 
   LOG.info("Scanner " + this.scannerName + " lease expired on region " 
     + s.getRegionInfo().getRegionNameAsString()); 
   try { 
    Region region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 
    if (region != null && region.getCoprocessorHost() != null) { 
     region.getCoprocessorHost().preScannerClose(s); 
    } 
 
    s.close(); 
    if (region != null && region.getCoprocessorHost() != null) { 
     region.getCoprocessorHost().postScannerClose(s); 
    } 
   } catch (IOException e) { 
    LOG.error("Closing scanner for " 
     + s.getRegionInfo().getRegionNameAsString(), e); 
   } 
  } else { 
   LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + 
    " scanner found, hence no chance to close that related scanner!"); 
  } 
 } 

 客户端的scan请求是分解成多次RPC请求发到服务端的,分解的次数是scan的总数据量与客户端setCache两者的比值。每个scan请求发到服务端后会租用一个scanner,用于当前的scan结束后,后续的scan可以直接复用已有的资源,但是为防止scanner长期占用服务端资源,通过租约管理,关闭不再使用的scanner。

OK,回到前面的Leases类,看看它是如何管理regionServer进程中的各个lease的,这部分逻辑在它覆写的run方法中:

public void run() { 
  long toWait = leaseCheckFrequency; 
  Lease nextLease = null; 
  long nextLeaseDelay = Long.MAX_VALUE; 
 
  while (!stopRequested || (stopRequested && !leases.isEmpty()) ) { 
 
   //睡眠一段时间 
 
   nextLease = null; 
   nextLeaseDelay = Long.MAX_VALUE; 
   for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) { 
    Map.Entry<String, Lease> entry = it.next(); 
    Lease lease = entry.getValue(); 
    long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); 
    if ( thisLeaseDelay > 0) { 
     if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { 
      nextLease = lease; 
      nextLeaseDelay = thisLeaseDelay; 
     } 
    } else { 
     // A lease expired. Run the expired code before removing from map 
     // since its presence in map is used to see if lease exists still. 
     if (lease.getListener() == null) { 
      LOG.error("lease listener is null for lease " + lease.getLeaseName()); 
     } else { 
      lease.getListener().leaseExpired(); 
     } 
     it.remove(); 
    } 
   } 
  } 
  close(); 
 } 

 我们省略掉一些异常处理,在while的循环周期中会逐一便利map中管理的lease,计算每个lease的thisLeaseDelay以检查改lease是否已经过期。判断lease是否过期的方法很简单,就是取出当前时间与lease中定义的expirationTime做差,如果差值小于0,则说明该租约已经到期,则调用lease中定义的leaseExpired方法,这与上面我们讲过的关联上了。其中thisLeaseDelay决定了下一次的lease检查在多久之后发生,thisLeaseDelay的计算依据是选择选取所有未过期lease中leaseDelay最短的,通过thisLeaseDelay计算toWait时间,用于决定前面的睡眠时间。

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

更多精彩内容其他人还在看

Java的面向对象编程基本概念学习笔记整理

这篇文章主要介绍了Java的面向对象编程基本概念学习笔记整理,包括类与方法以及多态等支持面向对象语言中的重要特点,需要的朋友可以参考下
收藏 0 赞 0 分享

Eclipse下编写java程序突然不会自动生成R.java文件和包的解决办法

这篇文章主要介绍了Eclipse下编写java程序突然不会自动生成R.java文件和包的解决办法 的相关资料,需要的朋友可以参考下
收藏 0 赞 0 分享

基于Java实现杨辉三角 LeetCode Pascal's Triangle

这篇文章主要介绍了基于Java实现杨辉三角 LeetCode Pascal's Triangle的相关资料,需要的朋友可以参考下
收藏 0 赞 0 分享

Java中Spring获取bean方法小结

Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架,如何在程序中获取Spring配置的bean呢?下面通过本文给大家介绍Java中Spring获取bean方法小结,对spring获取bean方法相关知识感兴趣的朋友一起学习吧
收藏 0 赞 0 分享

如何计算Java对象占用了多少空间?

在Java中没有sizeof运算符,所以没办法知道一个对象到底占用了多大的空间,但是在分配对象的时候会有一些基本的规则,我们根据这些规则大致能判断出来对象大小,需要的朋友可以参考下
收藏 0 赞 0 分享

剖析Java中的事件处理与异常处理机制

这篇文章主要介绍了Java中的事件处理与异常处理机制,讲解Java是如何对事件或者异常作出响应以及定义异常的一些方法,需要的朋友可以参考下
收藏 0 赞 0 分享

详解Java的Struts2框架的结构及其数据转移方式

这篇文章主要介绍了详解Java的Struts2框架的结构及其数据转移方式,Struts框架是Java的SSH三大web开发框架之一,需要的朋友可以参考下
收藏 0 赞 0 分享

Java封装好的mail包发送电子邮件的类

本文给大家分享了2个java封装好的mail包发送电子邮件的类,并附上使用方法,小伙伴们可以根据自己的需求自由选择。
收藏 0 赞 0 分享

在Java的Struts中判断是否调用AJAX及用拦截器对其优化

这篇文章主要介绍了在Java的Struts中判断是否调用AJAX及用拦截器对其优化的方法,Struts框架是Java的SSH三大web开发框架之一,需要的朋友可以参考下
收藏 0 赞 0 分享

java多线程Future和Callable类示例分享

JAVA多线程实现方式主要有三种:继承Thread类、实现Runnable接口、使用ExecutorService、Callable、Future实现有返回结果的多线程。其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的。今天我们就来研究下Future和Callab
收藏 0 赞 0 分享
查看更多