20、JUC线程池(ScheduledThreadPoolExecutor)

ScheduledThreadPoolExecutor

1.1 API介绍

构造线程池

Executors使用 newScheduledThreadPool 工厂方法创建ScheduledThreadPoolExecutor:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   
     
	return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
   
     
	return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

ScheduledThreadPoolExecutor的构造器,内部其实都是调用了父类ThreadPoolExecutor的构造器,这里比较特别的是任务队列的选择——DelayedWorkQueue

public ScheduledThreadPoolExecutor(int corePoolSize) {
   
     
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
   
     
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
   
     
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
   
     
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

线程池的调度

该线程池的核心调度方法,是schedule、scheduleAtFixedRate(周期固定间隔时间调度)、scheduleWithFixedDelay(周期延迟调度),通过 schedule方法来看下整个调度流程:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
   
     
	if (command == null || unit == null)
		throw new NullPointerException();
	RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
	delayedExecute(t);
	return t;
}

上述的decorateTask方法把Runnable任务包装成ScheduledFutureTask,用户可以根据自己的需要覆写该方法(扩展Task的入口):

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
   
     
	return task;
}

ScheduledFutureTask是RunnableScheduledFuture接口的实现类,任务通过period字段来表示任务类型

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
   
     
	//任务序号, 自增唯一(如果延迟时间相同,会根据id判断先后顺序)
	private final long sequenceNumber;
	
	// 首次执行的时间点
	private long time;
	
	// 0: 非周期任务; >0: fixed-rate任务;<0: fixed-delay任务
	private final long period;
	
	//在堆中的索引
	int heapIndex;
	
	ScheduledFutureTask(Runnable r, V result, long ns) {
   
     
		super(r, result);
		this.time = ns;
		this.period = 0;
		this.sequenceNumber = sequencer.getAndIncrement();
	}
	// ...
}

ScheduledThreadPoolExecutor中的任务队列——DelayedWorkQueue,保存的元素就是ScheduledFutureTask。DelayedWorkQueue是一种堆结构,time最小的任务会排在堆顶(表示最早过期),每次出队都是取堆顶元素,这样最快到期的任务就会被先执行。如果两个ScheduledFutureTask的time相同,就比较它们的序号——sequenceNumber,序号小的代表先被提交,所以就会先执行。

1.2 核心流程源码简析

schedule的核心是其中的delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
   
     
	if (isShutdown()) // 线程池已关闭
		reject(task); // 任务拒绝策略
	else {
   
     
		super.getQueue().add(task); // 将任务入队
		
		// 如果线程池已关闭且该任务是非周期任务, 则将其从队列移除
		if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
			task.cancel(false); // 取消任务
		else
			ensurePrestart(); // 添加一个工作线程
	}
}

处理过程:

  • 任务被提交到线程池后,会判断线程池的状态,如果不是RUNNING状态会执行拒绝策略;
  • 然后,将任务添加到阻塞队列中,由于DelayedWorkQueue是无界队列,所以一定会add成功;
  • 然后,会创建一个工作线程,加入到核心线程池或者非核心线程池;
void ensurePrestart() {
     
       
	int wc = workerCountOf(ctl.get());
	if (wc < corePoolSize)//如果核心线程池未满,则新建的工作线程会被放到核心线程池中。
		addWorker(null, true);
	else if (wc == 0) //当通过setCorePoolSize方法设置核心线程池大小为0时,这里必须要保证任务能够被执行,会创建一个工作线程,放到非核心线程池中。
		addWorker(null, false);
	//如果核心线程池已经满了,不会再去创建工作线程,直接返回。
}
  • 最后,线程池中的工作线程会去任务队列获取任务并执行,当任务被执行完成后,如果该任务是周期任务,则会重置time字段,并重新插入队列中,等待下次执行。
  • 从队列中获取元素的方法:
    对于核心线程池中的工作线程来说,如果没有超时设置( allowCoreThreadTimeOut == false ),则会使用阻塞方法take获取任务(因为没有超时限制,所以会一直等待直到队列中有任务);如果设置了超时,则会使用poll方法(方法入参需要超时时间),超时还没拿到任务的话,该工作线程就会被回收。对于非工作线程来说,都是调用poll获取队列元素,超时取不到任务就会被回收。

1.3 案例演示

public class ScheduledThreadPoolExecutorTest {
   
     
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
     
       ScheduledThreadPoolExecutorTest.scheduleWithFixedDelay();
  //      ScheduledThreadPoolExecutorTest.scheduleAtFixedRate();
//        ScheduledThreadPoolExecutorTest.scheduleCaller();
//        ScheduledThreadPoolExecutorTest.scheduleRunable();
    }

    // 任务以固定时间间隔执行,延迟5s后开始执行任务,任务执行完毕后间隔5s再次执行,依次往复
    static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
   
     
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
   
     
            public void run() {
   
     

                System.out.println(System.currentTimeMillis());
            }
        }, 5000, 5000, TimeUnit.MILLISECONDS);

        // 由于是定时任务,一直不会返回
        result.get();
        System.out.println("over");

    }
    // 相对开始加入任务的时间点固定频率执行:从加入任务开始算2s后开始执行任务,2+5s开始执行,2+2*5s执行,2+n*5s开始执行;
    // 但是如果执行任务时间大于5s,则不会并发执行,后续任务将会延迟。
    static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
   
     
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
   
     
            public void run() {
   
     
                try {
   
     
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
   
     
                    e.printStackTrace();
                }
                System.out.println(System.currentTimeMillis());

            }
        }, 2000, 5000, TimeUnit.MILLISECONDS);

        // 由于是定时任务,一直不会返回
        result.get();
        System.out.println("over");
    }

    // 延迟2s后开始执行,只执行一次,没有返回值
    static void scheduleRunable() throws InterruptedException, ExecutionException {
   
     
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.schedule(new Runnable() {
   
     

            @Override
            public void run() {
   
     
                System.out.println("gh");
                try {
   
     
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
   
     
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }, 2000, TimeUnit.MILLISECONDS);

        System.out.println(result.get());

    }

    // 延迟2s后开始执行,只执行一次,有返回值
    static void scheduleCaller() throws InterruptedException, ExecutionException {
   
     
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
   
     

            @Override
            public String call() throws Exception {
   
     

                try {
   
     
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
   
     
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                return "gh";
            }

        }, 2000, TimeUnit.MILLISECONDS);

        // 阻塞,直到任务执行完成
        System.out.print(result.get());

    }
}

scheduleWithFixedDelay: