第31章 Spring中的任务调度和线程池支持

31.2 Spring对JDKTimer的集成

31.2.1 JDK Timer小记

Timer是随JDK1.3的发布而引入的任务调度”标准装备”, 但无论是在设计与实现上,还是在整体的功能支持上,Timer都无法与Quartz相媲美。Timer没有明确的Trigger的概念,一般意义上来讲,Timer自身肩负着Scheduler和Trigger的双重使命,而且,不像Quartz可以通过cron表达式指定复杂的调度规则。Timer基本上只提供SimpleTrigger类似的调度行为支持,所以,大部分时间,我们会优先考虑使用Quartz作为任务调度框架。不过,世事无绝对,如果当前的任务调度需求十分简单,而且,在使用第三方类库有限制的情况下,Timer也不失为一种经济实惠的选择。

使用Timer进行任务调度要比使用Quartz简单许多,同样的FXNewsProvider功能调度,采用Timer实现只涉及少许的迁移。因为所有经由Timer进行调度的任务都需要继承TimerTask,所以,我们首先需要提供FXNewsProvider对应的TimerTask实现,如下方代码清单所示。

public class FXNewsTimerTask extends TimerTask {
	private FXNewsProvider fxNewsProvider;
	@Override
	public void run() {
		getFxNewsProvider().getAnaPersistNews();
  	}
	public FXNewsProvider getFxNewsProvider() {
		return fxNewsProvider;
  	}
	public void setFxNewsProvider(FXNewsProvider fxNewsProvider) {
		this.fxNewsProvider = fxNewsProvider;
  	}
}

现在,我们应该可以感受到将业务逻辑尽可能剥离到独立业务对象的好处。如果当初直接将要调度的业务逻辑写入Job实现类,那么现在或许又得重蹈覆辙,在TimerTask实现类中重新复制一遍了。不管怎样,在有了FXNewsTimerTask之后,我们就可以通过相应的Timer实例对其进行调度,如下代码所示:

FXNewsTimerTask task = new FXNewsTimerTask();

Timer timer = new Timer("schedulerName[optional]");
timer.schedule(task, 0, 60000);

喔!看起来这要比使用Quartz简单多了。不过,就跟使用编程方式演示Quartz的使用一样,这里的代码也只是原则上按照规则办事,而实际上,在FXNewsTimerTask没有获得相应的依赖注入之前,整段代码是无法正常运行的,所以,如你所愿,我们马上进入Spring对Timer的集成内容。

31.2.2 Spring集成后的JDKTimer

1. 逃离TimerTask的魔咒

原则上讲,任何将由Timer进行调度的任务都需要继承自TimerTask,说把相应的业务对象压得喘不过气来显然有点儿夸张了,但在这个处处讲求“非强制,少侵入”的时代,这一强制要求显然也不是很受人欢迎。不过,好在我们可以将这项硬性规定的实现转给其他人代为”打理”,这样,原本就是自由身的业务对象可以继续无拘无束, 而这个代替业务对象行使TimerTask职责的人,则可以是Spring提供的MethodInvokingTimerTaskFactoryBean。

实际上,将为Quartz提供的MethodInvokingJobDetailFactoryBean与MethodInvokingTimerTaskFactoryBean做对比,也就不难理解MethodInvokingTimerTaskFactoryBean的存在目的了。在我们为某一业务对象披上MethodInvokingTimerTaskFactoryBean这一外衣之后,它就真得看起来是一个TimerTask了,例如:

<bean id="fxNewsProvider" class="org.darrenstudio.books.unveilspring.news.FXNewsProvider" p:newsListener-ref="newsListener" p:newPersistener-ref="newsPersister"/>
<bean id="task" class="org.springframework.scheduling.timer.MethodInvokingTimerTaskFactoryBean">
	<property name="targetObject" ref="fxNewsProvider"/>
	<property name="targetMethod" va1ue="getAndPersistNews"/>
</bean>

现在的MethodInvokingTimerTaskFactoryBean将代替FXNewsProvider作为一个TimerTask, 只是在真正需要执行调度逻辑的时候,才会根据我们指定的信息,通过反射来调用指定实例上的指定方法。 显然,MethodInvokingTimerTaskFactoryBean加LFXNewsProvider,给出了完成FXNewsTimerTask同一功能的另一种选择,而至于最终选择哪种,那看来得因人而异了。

2. TimerTask的模块化封装——ScheduledrimerTask

传统的TimerTask对应的Trigger信息是在将TimerTask提交Timer调度的时候指定的,如下所示:

timer.schedule(task, 0, 60000);

每一TimerTask与其对应的Trigger信息之间没有一种紧密的关联关系。 Spring提供了ScheduledTimerTask,对TimerTask和与之相关的rrigger信息进行模块化封装,使得我们可以用一种统一的方式来管理TimerTask及其对应的Trigger信息。

因为我们的FXNewsProvider通常是每隔一分钟调度一次,而其对应的TimerTask我们也通过MethodInvokingTimerTaskFactoryBean定义好了,现在,我们就可以通过一个ScheduledTimerTask,将其对应的TimerTask和相关的Trigger信息封装到一个可以统一管理的实体中,如下所示:

<bean id="scheduledrask" class="org.springframework.scheduling.timer.ScheduledTimerTask">
  <property name="timerTask" ref="task"/>
  <property name="period" value="3000"/>
</bean>

在有了封装了所有TimerTask和相关Trrigger信息的ScheduledTimerTask实例之后,Timer就可以根据ScheduledTimerTask所提供的信息,对相应的TimerTask进行调度了。

3. Timer的新家——TimerFactoryBean

Spring通过TimerFactoryBean对JDK的Timer进行封装。 在我们将“以ScheduledTimerTask形式封装好的调度的任务”提交给TimerFactoryBean之后,TimerFactoryBean内部将提取每一ScheduledTimerTask所包含的TimerTask以及相关的Trigger信息,从而决定调用Timer的哪个调度方法,对当前ScheduledTimerTask所包含的TimerTask进行调度。整个过程如下方代码清单所示。

ScheduledTimerTask[] stt = ...;
for(ScheduledTimerTask task : stt) {
  	if(task.isOneTimeTask()) {
		timer.schedule(task.getTimerTask(), task.getDelay());
  	}
	else {
		if(task.isFixedRate()) {
			timer.scheduleAtFixeaRate(task.getTimerTask(), task.getDelay(), task.getPeriod());
	   	}
		else {
			timer.schedule(task.getTimerTask(), task.getDelay(), task.getPeriod());
	   	}
	 }
}

所以,现在只要将我们的NewsProvider对应的ScheduledTimerTask实例提交给TimerFactoryBean,使用Timer对FXNewsProvider功能的调度就算大功告成,如下所示:

<bean id="scheduler" class="org.springframework.scheduling.timer.TimerFactoryBean">
  <property name="scheduledrimerTasks">
    <list>
      <ref bean="scheduledTask"/>
    </list>
  </property>
</bean>

我们回顾一下Spring为集成Timer而提供的各类装备,脑海中或许应该浮现如图31-4所示的一副图景。

image-20220721185515208

当然,最主要的是,在我们使用了Quartz和Timer两种任务调度方案对同一功能的调度给予实现之后,通过对比这两种方案,你又有何想法呢?

31.3 Executor的孪生兄弟TaskExecutor

Java5为我们带来了一套崭新的任务执行框架(task execution framework),该框架以Executor为首:

public interface Executor{
  void execute(Runnable task);
}

虽然该项层抽象接口Executor看起来是那么简单,但所带来的好处却并不简单。 Executor这一抽象的意义在于,可以将任务的提交(task commission)和任务的执行(task execution)策略分隔开来,解除二者之间的耦合性 ,以Runnable类型界定的任务提交之后,最终会以什么样的策略执行(什么时间执行,交给谁执行,等等),完全由不同的Executor实现类负责,提交任务的客户端完全可以忽略后者的实现细节,所以,虽然大部分时间我们会使用像ThreadPoolExecutor之类提供线程池(ThreadPool)且以异步方式执行任务的Executor实现,但Executor自身的语义却并不只限于线程池。

Executor任务执行框架提供了许多功能强劲的Executor实现类,但是,这些都是基于Java5的。 如果我们要在之前版本的Java平台上使用的话,显然就得做一些兼容性的工作,而Spring2.0之后提出的TaskExecutor,实际上就是为了这个目的而生的。 现在,我们可以在Java1.4或者Java5上使用TaskExecutor完成Executor同样的使命,因为二者的定义实际上没有什么实质上的差别,TaskExecutor如下定义:

public interface TaskExecutor{
  void execute(Runnable task);
}

当然,最主要的是,Spring为TaskExecutor提供了足够的实现类选择,而且,即使TaskExecutor没有相应的实现类与某个Executor提供的功能相对应,Spring也提供了“飞架TaskExecutor和Executor的桥”,这些应该让TaskExecutor的使用没有任何后顾之忧。

31.3.1 可用的TaskExecutor

Spring框架内部提供了一定的TaskExecutor实现类,这些实现类可以完成几乎所有使用场景的覆盖,所以,大多数情况下,我们没有必要自己去实现某个TaskExecutor。即使我们真的需要去实现新的TaskExecutor,也应该首先来了解一下Spring都为我们提供了哪些现有的TaskExecutor实现吧?

1. SyncTaskBxecutor

提交给SyncTaskExecutor的任务将直接在当前调用线程中执行 ,所以该类的定义也是出奇地简单,如下所示:

public class SyncTaskExecutor implements TaskExecutor,Serializable {
	public void execute(Runnable task) {
		Assert.notNull(task, "Runnable must not be null");
		task.run();
  	}
}

更多时候,我们只会在测试的时候想到它。

2. SimpleAsyncTaskExecutor

SimpleAsyncTaskExecutor提供的任务执行策略十分简单和直观,它提供最基础的异步执行能力,而实现的方式则是为每个任务都创建新的线程 ,所以,原型代码看起来是如下的形式:

public class SimpleAsyncTaskExecutor implements TaskExecutor {
	public void execute(Runnable task) {
		new Thread(task).start();
  	}
}

不过,SimpleAsyncTaskExecutor的实现实际上比我们的原型代码要复杂一些。起码SimpleAsyncTaskExecutor提供了相应的属性,以控制创建的线程数目上限,如下所示:

<bean id="simpleAsycTaskExecutor" class="org.springframework.core.task.Simp1eAsyncTaskExecutor">
	<property name="concurrencyLimit" value="100"/>
</bean>

如果不做明确设置,SimpleAsyncTaskExecutor对于可以创建的线程数目没有任何限制。

如果线程的创建和销毁不需要占用相应资源,我想,到SimpleAsyncTaskExecutor我们就可以打住了。不过,事实却是,为了避免线程创建和销毁带来的开销,我们需要某个使用线程池提供异步任务执行能力的TaskExecutor,所以,接着往下看吧!

3. ThreadPoolTaekEecutor

既然SimpleAsyncTaskExecutor每次都要创建新的线程的做法不可取,ThreadPoolTaskExecutor改用线程池来管理并重用处理任务异步执行的工作线程。 实际上,ThreadPoolTaskExecutor只是对标准的java.util.concurrent.ThreadPoolExecutor进行了封装。只不过可以通过类似JavaBean的风格进行配置。当然,封装后的java.util.concurrent.ThreadPoolExecutor,最终还是以TaskExecutor类型公开给客户端代码使用的。

Spring提供的ThreadPoolTaskExecutor一共有两个,一个是org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,它对Java5的java.util.concurrent.ThreadPoolExecutor进行封装;另一个是org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor,它对JSR-166 backport的ThreadPoolTaskExecutor进行封装。如果我们的应用程序需要在Java1.4平台运行,那么后者才是我们应该选用的TaskExecutor类型。

ThreadPoolTaskExecutor公开了一些配置属性允许我们对使用的ThreadPoolTaskExecutor进行定制,比如:

<bean id="threadPoolTaskExecutor" class="org.springframework,scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="10"/>
	<property name="maxPoo1Size" value="20"/>
	<property name="queueCapacity" value="20"/>
</bean>

更多定制属性及相关信息可以在使用的时候参照相应类的Javadoc。通常情况下,ThreadPoolTaskExecutor将是我们的首要选择。

4. ConcurrentTaekExecutor

ConcurrentTaskExecutor为Java5的Executor和Spring的TaskExecutor搭建了一道桥梁,使得我们可以将Executor框架下的某些实现类以TaskExecutor的形式公开出来。 如果我们感觉ThreadPoolTaskExecutor封装的java.util.concurrent.ThreadPoolExecutor不足以满足当前场景需要,那么可以构建需要的Executor实例,比如通过Executors.newXXXThreadPool(),然后以ConcurrentTaskExecutor对其进行封装,封装后获得的ConcurrentTaskExecutor即获得相应Executor的能力,但它现在是以TaskExecutor的样子示人,如下所示:

Executor executor = Executors.newScheduledThreadPool(10);
TaskExecutor taskExecutor = new ConcurrentTaskExecutor(executor);

如果我们使用Spring的IoC容器来配置使用ConcurrentTaskExecutor,那么可以直接配置它所封装的Executor实例,或者通过FactoryBean机制对它所使用的Executor实例做进一步定制。

注意:与Spring的ThreadPoolTaskExecutor一样,ConcurrentTaskExecutor也存在两个版本,一个面向Java5,一个面向JSR-166 backport,使用的时候可不要混淆哦!

5. TimerTaskExecutor、SimleThreadPoolTaskExecutor和WorkManagerTaskExecutor

之所以将这三个实现类放到一起,是因为它们实在是太相似了, 因为它们都是使用特定的调度程序(Job Scheduler)来执行提交给它们的任务。

TimerTaskExecutor内部持有一个Timer的引用,在任务提交给TimerTaskExecutor执行之后,TimerTaskExecutor将对Runnable类型提交的任务以TimerTask形式进行包装,然后就转交给它所持有的那个Timer来调度执行了。所以,简单点儿来看,TimerTaskExecutor的原型代码可以如下构建:

public class PrototypeTimerTaskExecutor implements TaskExecutor {
	private Timer timer = new Timer();
	public void execute(final Runnable task) {
		timer.schedule(new TimerTask() {
			@Override
			public void run() {
				task.run();
      		}
    	}, newDate());
  	}
}

实际的TimerTaskExecutor实现做了更多的关注点分离,并提供了更详细的设定选项,以Runnable提交的task实际上是以org.springframework.scheduling.timer.DelegatingTimerTask的形式单独封装的,而不是匿名内部类,而像执行延时(delay)之类属性也是可配置的。

SimpleThreadPoolTaskExecutor扩展自Quarz的SimpleThreadPool,它使用SimpleThreadPool的runInThread(task)方法来执行提交的任务。我想,如果不在意绑定到Quartz,或者想将SimpleThreadPoolTaskExecutor提供给Quartz以替换默认的SimpleThreadPool使用,通常应该不会考虑使用SimpleThreadPoolTaskExecutor吧!

CommonJ是BEA和IBM联手搞得一套应用服务器端的任务调度实现规范。WorkManagerTaskExecutor就是使用CommonJ的WorkManager构建的。至于什么场合下使用它,我想你应该可以猜得到。

31.3.2 TaskExecutor使用实例

假设我们的FXNewsProvider在通过FTP接收到新闻文件之后,想对接收到的文件进行并行处理,以提高处理速度,那么,我们可以考虑将单一新闻文件的处理逻辑进行Runnable形式的任务封装,然后提交给相应的TaskExecutor进行处理。 为了达到这一目的,我们首先要对新闻文件解析和持久化的逻辑进行封装,封装后的代码如下方代码清单所示。

public class NewsProcessingTask implements Runnable {
	private IFXNewsListener listener;
	private IFXNewsPersister persister;
	private String newsId;

  	public NewsProcessingTask(IFXNewsListener listener, IFXNewsPersister persister, String newsId) {
		this.listener = listener;
		this.persister = persister;
		this.newsId = newsId;
  	}
	public void run() {
		FXNewsBean newsBean = getlistener().getNewsByPK(newsID);
		getPersister().persistNews(newsBean);
		getListener().postProcessIfNecessary(newsId);
  	}
}

然后,为FXNewsProvider提供一个TaskExecutor以执行相继提交的NewsProcessingTask。 为了区别于之前的FXNewsProvider,我们称现在的实现为RefactoredFXNewsProvider,其定义如下方代码清单所示。

public class RefactoredFXNewsProvider {
	private IFXNewsListener newsListener;
	private IFXNewsPersister newPersistener;
  
	private TaskExecutor taskExecutor;
  
	public void process() {
		String[] newsIds = newsListener.getAvailableNewsIds();
		if(ArrayUtils.isEmpty(newsIds)) {
			return;
    	}
    
		for(String newsId : newsIds) {
			NewsProcessingTask task = new NewsProcessingTask(getNewsListener(), getNewPersistener(), newsId);
			getTaskExecutor().execute(task);
    	}
  	}
	// getter和setter方法定义
}

现在,我们只要为RefactoredFXNewsProvider提供必需的依赖就可使之开始工作。假设我们为其提供ThreadPoolTaskExecutor作为使用的TaskExecutor,那么对应的配置内容如下方代码清单所示。

// 其他可能依赖的bean定义.....
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="10"/>
	<property name="maxPoolSize" value="20"/>
	<property name="queueCapacity" value="20"/>
</bean>

<bean id="rFxNewsProvider" class="..RefactoredFXNewsProvider">
	<property name="newsListener" ref="newsListener"/>
	<property name="newPersistener" ref="newPersistener"/>
	<property name="taskExecutor" ref="taskExecutor"/>
</bean>

实际上,只要声明依赖的是TaskExecutor接口,最终使用哪一种TaskExecutor实现(实际上也正是选择任务的执行策略)完全可以在部署的时候决定,无非只是配置文件的些许改动而已。

31.4 小结

本章我们对Spring框架提供集成支持的几种任务调度程序进行了“身世”分析,这包括Quartz、JDKTimer以及Spring提供的TaskExecutor。在阅读完本章内容之后,你应该可以根据自身应用的情况,做出最终的选择。