批处理支持
实际业务当中,我们除了会做单条规则计算外,还有可能需要运行规则引擎来处理一大批数据,这些数据可能有几万条,几十万条,甚至更多。在这种情况下,如果我们还是采用普通的KnowledgeSession在一个线程里处理大批量数据的话,那么引擎还是只能在当前线程里运行,这样就会需要很长的时间才能可能将这几十万条甚至更多的数据处理完成,在这个时候,为了充分利用服务器较强的CPU性能,我们可以使用BatchSession利用多线程并行处理这些数据。顾名思义BatchSession是用来做批处理任务的会话对象,它是 URule Pro当中提供的多线程并行处理大批量业务数据的规则会话对象。
要得到一个BatchSession对象,我们也需要提供一个或多个KnowledgePackage对象,获取KnowledgePackage对象的方法与上面介绍的方法相同,有了KnowledgePackage对象后,就可以利用KnowledgeSessionFactory类创建一个BatchSession对象。
在com.bstek.urule.runtime.KnowledgeSessionFactory类中除上面提到的两个构建KnowledgeSession的静态方法外,还提供了另外八个可用于构建BatchSession的静态方法,其源码如下:
/**
* 创建一个用于批处理的BatchSession对象,这里默认将开启10个普通的线程池来运行提交的批处理任务,默认将每100个任务放在一个线程里处理
* @param knowledgePackage 创建BatchSession对象所需要的KnowledgePackage对象
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSession(KnowledgePackage knowledgePackage){
return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 创建一个用于批处理的BatchSession对象,第二个参数来指定线程池中可用线程个数,默认将每100个任务放在一个线程里处理
* @param knowledgePackage 创建BatchSession对象所需要的KnowledgePackage对象
* @param threadSize 线程池中可用的线程个数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSessionByThreadSize(KnowledgePackage knowledgePackage,int threadSize){
return new BatchSessionImpl(knowledgePackage,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 创建一个用于批处理的BatchSession对象,这里默认将开启10个普通的线程池来运行提交的批处理任务,第二个参数用来决定单个线程处理的任务数
* @param knowledgePackage 创建BatchSession对象所需要的KnowledgePackage对象
* @param batchSize 单个线程处理的任务数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSessionByBatchSize(KnowledgePackage knowledgePackage,int batchSize){
return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
}
/**
* 创建一个用于批处理的BatchSession对象,第二个参数来指定线程池中可用线程个数,第三个参数用来决定单个线程处理的任务数
* @param knowledgePackage 创建BatchSession对象所需要的KnowledgePackage对象
* @param threadSize 线程池中可用的线程个数
* @param batchSize 单个线程处理的任务数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSession(KnowledgePackage knowledgePackage,int threadSize,int batchSize){
return new BatchSessionImpl(knowledgePackage,threadSize,batchSize);
}
/**
* 创建一个用于批处理的BatchSession对象,这里默认将开启10个普通的线程池来运行提交的批处理任务,默认将每100个任务放在一个线程里处理
* @param knowledgePackage 创建BatchSession对象所需要的KnowledgePackage集合对象
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages){
return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 创建一个用于批处理的BatchSession对象,第二个参数来指定线程池中可用线程个数,默认将每100个任务放在一个线程里处理
* @param knowledgePackages 创建BatchSession对象所需要的KnowledgePackage集合对象
* @param threadSize 线程池中可用的线程个数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSessionByThreadSize(KnowledgePackage[] knowledgePackages,int threadSize){
return new BatchSessionImpl(knowledgePackages,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 创建一个用于批处理的BatchSession对象,这里默认将开启10个普通的线程池来运行提交的批处理任务,第二个参数用来决定单个线程处理的任务数
* @param knowledgePackages 创建BatchSession对象所需要的KnowledgePackage集合对象
* @param batchSize 单个线程处理的任务数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSessionByBatchSize(KnowledgePackage[] knowledgePackages,int batchSize){
return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
}
/**
* 创建一个用于批处理的BatchSession对象,第二个参数来指定线程池中可用线程个数,第三个参数用来决定单个线程处理的任务数
* @param knowledgePackages 创建BatchSession对象所需要的KnowledgePackage集合对象
* @param threadSize 线程池中可用的线程个数
* @param batchSize 单个线程处理的任务数
* @return 返回一个新的BatchSession对象
*/
public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages,int threadSize,int batchSize){
return new BatchSessionImpl(knowledgePackages,threadSize,batchSize);
}
前面介绍规则流中的决策节点时,了解到决策节点中支持百分比分流,这种百分比分流就要求必须是在使用BatchSession处理一批数据的时候,或者是一个用一个普通的KnowledgeSession一次性处理多条数据才有效,否则规则流只会走比例最高的那个分支。
BatchSession接口比较简单,它只定义了两个方法:
/**
* 添加一个具体要执行Business对象
* @param business Business对象实例
*/
void addBusiness(Business business);
/**
* 等待线程池中所有业务线程执行完成,在进行批处理操作时一定要以此方法作为方法调用结尾
*/
void waitForCompletion();
可以看到,它可以接收若干个名为com.bstek.urule.runtime.Business接口实例,Business接口比较简单,它只有一个方法:
package com.bstek.urule.runtime;
/**
* @author Jacky.gao
* @since 2015年9月29日
*/
public interface Business {
void execute(KnowledgeSession session);
}
在Business实现类中,我们的业务写在execute方法当中,在这个方法中,只有一个KnowledgeSession对象,这个session对象就是我们与规则引擎操作的对象,示例代码如下:
//从Spring中获取KnowledgeService接口实例
KnowledgeService service=(KnowledgeService)Utils.getApplicationContext().getBean(KnowledgeService.BEAN_ID);
//通过KnowledgeService接口获取指定的知识包ID"213"
KnowledgePackage knowledgePackage=service.getKnowledge("213");
//通过取的KnowledgePackage对象创建BatchSession对象,在这个对象中,我们将开启5个线程,每个线程最多放置10个Bussiness接口实例运行
BatchSession batchSession=KnowledgeSessionFactory.newBatchSession(knowledgePackage, 5, 10);
for(int i=0;i<100;i++){
batchSession.addBusiness(new Business(){
@Override
public void execute(KnowledgeSession session) {
Employee employee=new Employee();
employee.setSalary(11080);
//将业务数据对象Employee插入到KnowledgeSession中
session.insert(employee);
session.startProcess("demo");
}
});
}
//等待所有的线程执行完成,对于BatchSession调用来说,此行代码必不可少,否则将导致错误
batchSession.waitForCompletion();