APP下载

采用ScheduledThreadPoolExecutor执行定时重试任务时内存溢出的分析及解决

2016-05-14余志坚姜春志

科技资讯 2016年7期
关键词:调用入队队列

余志坚 姜春志

摘 要:开发JavaWeb项目中发现服务之间的调用存在超时情况,由于涉及的处理逻辑全部是异步,引入定时重试的机制,重试工具选择了JDK自带的ScheduledThreadPoolExecutor。当A服务依赖B服务,B服务由于在业务高峰期处理能力降低,导致大量A服务过来的请求超时,A加入了超时重试机制,间隔时间根据重试次数的多少来决定,次数越多,两次重试之间间隔的时间越多,此时的业务高峰也会给A带来大量请求,大量的超时会导致重试队列迅速堆积,直到内存溢出。该文从线程池工作机制、ScheduledThreadPoolExecutor实例的创建,获取重试任务的过程以及提交任务的过程角度分析,并通过源代码的剖析和测试工具MyEclipse进行演示测试内存泄露的情况,得出避免内存泄露的解决方案。

关键词:ScheduledThreadPoolExecutor 线程池 内存溢出

中图分类号:TP3 文献标识码:A 文章编号:1672-3791(2016)03(a)-0015-03

1 ScheduledThreadPoolExecutor实例的创建过程及线程池工作机制

1.1 ScheduledThreadPoolExecutor实例的创建过程

重试工具选择了JDK自带的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor实例的创建过程如下:ScheduledThreadPoolExecutor实例的创建过程如下:(1)获取当前机器上处理器的数量;(2)使用Google的ThreadFactoryBuiler创建指定格式名称的线程,以方便查看问题;(3)有需要被拒绝的任务时,抛出异常;(4)创建定时任务池;打开MyEclipse工具显示相对的代码:int corePoolSize=Runtime.getRuntime().availableProcessors();

ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();

RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();

ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);

线程池就是多个线程在一个队列中取任务执行,提交的任务会被放入队列中等待线程执行,故队列要设置一个大小。线程池同样会根据任务繁忙程度来动态调整连接数,空闲时保持最小连接数,繁忙时增加连接,但不会超过上限,具有伸缩性,线程的创建和销毁也需要消耗系统资源,线程的连接重用就可以避免这部分损失,具有重用性。

1.2 线程池工作机制

线程获取任务的策略就是如果当前线程池运行状态正常,则阻塞等待任务,否则直接返回或等待有限时间后返回。线程池中线程的主要任务就是获取任务,然后执行,然后再去获取任务,如此循环,这就实现了线程池中线程的可重用。

Worker封装了任务,同时创建了新的线程,并被添加到集合workers中,这个workers其实就是最核心的线程池。通过run方法实现重用。private final HashSet workers=new HashSet();

public void run(){

try{

Runnable task=firstTask;

firstTask=null;

while(task!=null||(task=getTask())!=null){

runTask(task);

task=null;}}

finally{

workerDone(this);

}

}

Runnable getTask(){

for(;;){

try{

int state=runState;

if(state>SHUTDOWN){return null;}

Runnable r;

if(state==SHUTDOWN){r=workQueue.poll();}

else if(poolSize>corePoolSize||allowCoreThreadTimeOut){

r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);

}else{r=workQueue.take();}

if(r!=null){return r;}

if(workerCanExit()){

if(runState>=SHUTDOWN){interruptIdleWorkers();}

return null;

}

}

catch(InterruptedException ie){}

}

}

private boolean workerCanExit(){

final RenntrantLock mainLock=this.mainLock;

mainLock.lock();

boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));

}finally{mainLock.unLock();}

return canExit;

}

如果此时线程池运行状态是终止(runState >= STOP),或者队列为空,或者允许核心线程超时并且线程池中线程数量大于最小线程数量,那么方法将返回true。再回到getTask方法,调用workerCanExit方法的前提是没有获取到任务,根据上边获取任务的过程,这几个条件都有可能成立,所以此时getTask方法可以返回null,上层Worker的run方法从while循环重返回,整个线程结束,这就实现了线程池的可伸缩。

2 ScheduledThreadPoolExecutor获取任务的过程

在getTask()中,描述了整个获取任务的过程,如果线程池运行状态已经是SHUTDOWN了,调用非阻塞方法poll,因为如果当前有任务,那么可以获取到任务并返回,如果没有任务,也没有必要阻塞在队列上等待任务,因为已经SHUTDOWN,后续不会再有任务进入。

如果当前线程数大于最小线程数,或者核心线程也可以做超时处理,意味着如果获取不到任务就可以销毁一部分线程了,所以poll方法设置了等待时间,超时后立即返回。

另一种情况是线程池还在运行状态,并且当前线程数不大于最小线程数,同时也不允许最小线程数以内的线程超时,这个时候线程就要调用阻塞方法take,等待任务进入队列以后才返回。

3 ScheduledThreadPoolExecutor提交任务的执行过程

ScheduledThreadPoolExecutor提交任务的执行过程,首先提交任务:taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);

ScheduledThreadPoolExecutor通过schedule方法提交定时任务,schedule方法源码如下:

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){

if(command==null||unit==null){throw new NullPointerException();}

if(delay<0){delay=0;}

RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask(command,null,triggerTime));

delayedExecute(t);

return t;

}

提交的任务会被封装成ScheduledFutureTask类型对象。

分析delayedExecute方法:private void delayedExecute(Runnable command){

if(isShutDown()){reject(command);return;}

if(getPoolSize()

super.getQueue().add(command);

}

如果线程的运行状态不是RUNNNING或者入队列没有成功,则采用线程池的构造方法中设置的拒绝策略来处理任务。

如果当前线程池中的线程数量poolSize小于线程池核心线程的数量corePoolSize,执行prestartCoreThread(),该方法会创建一个新的线程来执行任务,如果prestartCoreThread()创建的新线程执行任务失败或者当前线程池中的线程数量poolSize大于等于线程池核心线程数量corePoolSize,当若线程池的运行状态是RUNNING并且入队成功,由于在多线程环境下,状态随时可能会改变,此时线程池的运行状态runState不是RUNNING或者线程池中没有可用的线程(poolSize==0),要确保进入的任务被执行处理,线程池在初始化完成以后是空的,并没有线程,如果在服务器中使用线程池,服务重启后有大量请求进入,则要同时创建多个线程,而且创建过程是加锁同步的,会导致一定的竞争,解决办法就是线程池初始化时调用prestartAllCoreThreads方法启动核心线程数量的线程,这样就能在线程池中的线程就绪以后才开始接收请求。

通过getQueue方法获取任务队列,并且调用add方法向队列中添加任务,dq的定义:

private final DelayQueue dq=new DelayQueue();

public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);}

可以看出dq是阻塞队列,线程池中的线程都是在队列中取数据,ScheduledThreadPoolExecutor中的构造方法里的队列的实现使用链表结构的阻塞队列,add方法内部调用offer方法,offer源码如下:public boolean offer(E e){

final ReentrantLock lock=this.lock();

lock.lock();

try{

E first=q.peek();

q.offer(e);

if(first==null||e.compareTo(first)<0){

available.singalAll();

return true; }

}finally{lock.unlock();}}

这方法需要在多线程环境下同步执行,会用到锁Lock。锁实现的大概原理如下。

Lock实现锁的方式是通过排他性更新共享变量,更新成功的线程继续执行,没有更新成功的线程将会被阻塞。Lock的共享变量state在可重入锁中可以用来表示一个线程调用了几次lock方法,也就是有几次获取锁的行为。Lock的功能实现是通过内部聚合了抽象队列同步器(AQS),同步器有公平和非公平之分。非公平同步器对于新来的线程会尝试获取,不成功以后才会进入等待队列,而公平同步器则会首先判断是否排队。AQS中会保存获取锁的当先线程的引用。如果一次性尝试获取锁不成功,则线程会进入队列,循环尝试获取锁。

peek方法会获取队列的第一个元素,只是获取,并没有出队列。接着调用优先级队列PriorityQueue类型变量q的offer方法将队列入队,优先级队列会对任务进行排序,距离执行时间越近,位置越靠前。下边的if判断可以这样理解,first是在当前任务入队之前获取的,也就是队列中原有的第一个任务,compareTo的这段比较是说当前任务的执行时间比队列中第一个任务执行时间还要早,如果first是null,那么当前任务入队后将是第一个元素,如果当前任务的执行时间比队列中第一个任务的执行时间早,那么当前入队后也将是第一个元素,只要这两个条件有一个成立了,这个if的判断条件就为true,就要执行Condition类型的available变量的signalAll方法,唤醒等待的线程工作。

4 队列的大小判断

队列的大小是决定内存溢出最直观的因素,首先来看看优先级队列PriorityQueue的offer方法:public boolean offer(E e){

if(e==null){throw new NullPointerException();}

modCount++;

int i=size;

if(i>=queue.length){grow(i+1);}

size=i+1;

if(i==0){queue[0]=e;}

else{siftUp(i,e);}

return true;

上述代码表示如果队列中元素的个数(size)大于等于队列的长度,将要通过grow方法扩容,如下:private void grow(int minCapacity){

if(minCapacity<0){throw new OutOfMemoryError();}

int oldCapacity=queue.length;

int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));

if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}

if(newCapacity

queue=Arrays.copyof(queue,newCapacity);

}

若队列容量小于64,那就在原有基础上加1然后扩大2倍,这种情况绝对不会造成内存的溢出问题。如果大于等于64呢?直接扩容一半,然后将值赋给一个int型变量,当某种情况如果超过int类型的最大值了,JDK的处理是赋值成Integer的MAX_VALUE为2147483647,也就是最大的队列长度是2 G多,如果一个对象的大小按照50个字节来算,将会占用100 G的内存必定溢出。

5 模拟内存溢出代码测试

当业务高峰给服务器带来大量请求,大量的超时会导致重试队列迅速堆积,直到内存溢出,下面就通过代码来测试一下:模拟大量的添加任务,并且任务在调度队列中堆积,推迟一天执行。

while(true){

taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS);

}

虚拟机启动参数-:

-Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/

运行输出:

java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof...

Heap dump file created [44940425 bytes in 0.618 secs]

内存溢出了,来看看内存快照(见表1)。

6 解决方案及措施

编译好的java程序需要运行在JVM中,而JVM为程序提供并管理所需要的内存空间,JVM自带一个用于回收没有任何引用指向的对象的线程机制(垃圾回收器),但针对于ScheduledThreadPoolExecutor提交的任务会被封装成ScheduledFutureTask类型对象且每个对象中又有Sync成员变量。解决的办法可以是手动判断队列的大小,通过taskService.getQueue().size()方法,通过Jmap内存分析工具估算每个对象的大小,Jmap是一个可以输出所有内存中对象的工具,甚至可以将JVM 中的heap,以二进制输出成文本。打印出某个Java进程内存内的所有‘对象的情况,结合能够为队列分配的内存大小,计算出队列容纳任务的最大数量,以避免内存溢出。

参考文献

[1] 逯昌浩.浅析多核处理器条件下的Java编程[J].中国科技信息,2009(12):128,130.

[2] 张复兴,曾新洲.扩展线程池模型及性能分析[J].计算技术与自动化,2007(4):110-112.

[3] (美)Bruce Eckel.Java编程思想[M].陈昊鹏,译.北京:机械工业出版社,2007.

猜你喜欢

调用入队队列
自从马上入队了
队列队形体育教案
我入队了
入队
帮孩子扣好人生第一粒扣子
缓存淘汰算法研究
基于Android Broadcast的短信安全监听系统的设计和实现
青春的头屑
利用RFC技术实现SAP系统接口通信
队列操练