客户端通过Redis缓存获取知识包
当服务端启用集群的部署方式后,我们可以选择将知识包放到Redis中缓存,客户端需要更新知识包时,去Redis中获取而不是去服务端获取。
服务端需要实现 ClusterPacketCacheAdapter
,在以下方法中,操作Redis中的缓存。
方法 | 说明 |
---|---|
putPacket | 第一次和重置缓存时触发 |
remove | 删除知识包缓存时触发 |
refreshPacket | 刷新知识包缓存时触发 |
removeProject | 删除项目时触发 |
recacheAllPackets | 重置全部知识包缓存时触发 |
实现参考
1、定义缓存操作接口
import com.bstek.urule.runtime.KnowledgePackage;
/**
* 自定义知识包缓存接口
*/
public interface CustomKnowledgeCacheService {
public static final String BEAN_ID = "urule.ext.customKnowledgeCache";
//缓存中放入知识包
public void putCache(String key ,KnowledgePackage value);
//删除知识包
public void removeCache(String key);
//删除所有知识包
public Long removeAllCache(String prefixKey);
}
2、知识包Reids缓存实现类
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.bstek.urule.Utils;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.runtime.KnowledgePackage;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class RedisKnowledgeCacheServiceImpl implements CustomKnowledgeCacheService {
@Resource
RedisTemplate redisTemplate;
@Value("${project.urule.packetcache.prefix:packetcache:packetcache_}")
String packetCachePrefix;
@Value("${project.urule.packetcache.timestampsuffix:_timestamp}")
String timestampSuffix;
/**
* 在Redis中存入知识包的同时,存入一个时间戳,用来更新知识包时进行比对
*/
@Override
public void putCache(String packetKey, KnowledgePackage knowledgePackage) {
String knowledgePackageTimestamp = String.valueOf(knowledgePackage.getTimestamp());
String redisKnowledgePackageTimestamp = (String) redisTemplate.opsForValue().get(packetCachePrefix + packetKey + timestampSuffix);
if (redisKnowledgePackageTimestamp == null || !knowledgePackageTimestamp.equals(redisKnowledgePackageTimestamp)) {
String knowledgePackageContent = Utils.knowledgePackageToString(knowledgePackage);
log.info("set redis id{},timestamp={},redistimestamp={}", packetCachePrefix + packetKey, knowledgePackageTimestamp,redisKnowledgePackageTimestamp);
redisTemplate.opsForValue().set(packetCachePrefix + packetKey, knowledgePackageContent);
redisTemplate.opsForValue().set(packetCachePrefix + packetKey + timestampSuffix, knowledgePackageTimestamp);
}
}
@Override
public void removeCache(String packetKey) {
redisTemplate.delete(packetCachePrefix + packetKey);
redisTemplate.delete(packetCachePrefix + packetKey + timestampSuffix);
}
@Override
public Long removeAllCache(String key) {
Set<String> keys = redisTemplate.keys(packetCachePrefix+key);
if(!CollectionUtils.isEmpty(keys)) {
return redisTemplate.delete(keys);
}
return null;
}
}
3、在 ClusterPacketCacheAdapter
实现中调用 customKnowledgeCacheService
/**
* 知识包缓存更新消息通知类
*/
@Slf4j
@Component("urule.clusterPacketCacheAdapter")
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter{
@Autowired
private CustomKnowledgeCacheService customKnowledgeCacheService;
@Autowired
private CustomProducerService customProducerService;
public List<Map<String, Object>> recacheAllPackets(String groupId) {
List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
log.info("recacheAllPackets(String groupId):"+groupId);
customKnowledgeCacheService.removeAllCache("*");
ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
msg.put("groupId", groupId);
msg.put("systemId", Utils.SystemId);
msg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
return result;
}
@Override
public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
Packet packet = PacketManager.ins.load(packetId);
String packetCode = packet.getCode();
PacketData packetData =PacketCache.ins.getPacket(packetId);
//缓存知识包
if(packetData!=null) {
KnowledgePackage knowledgePackage = packetData.getKnowledgePackageWrapper().getKnowledgePackage();
customKnowledgeCacheService.putCache(String.valueOf(packetId), knowledgePackage);
customKnowledgeCacheService.putCache(packetCode, knowledgePackage);
}
//通知服务端集群节点
log.info("refreshPacket(String groupId, long packetId):{}:{}:{}",groupId,packetId,packetCode);
ObjectNode clustermsg = JsonUtils.getObjectJsonMapper().createObjectNode();
clustermsg.put("groupId", groupId);
clustermsg.put("systemId", Utils.SystemId);
clustermsg.put("packetId", String.valueOf(packetId));
clustermsg.put("packetCode", packetCode);
clustermsg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);
customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, clustermsg.toString());
ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
clientmsg.put("groupId", groupId);
clientmsg.put("systemId", Utils.SystemId);
clientmsg.put("packetId", String.valueOf(packetId));
clientmsg.put("packetCode", packetCode);
clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
return result;
}
@Override
public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):"+projectId);
for(PacketConfig pc:list) {
disableClientsPacket(groupId,pc.getId(),pc.getCode());
customKnowledgeCacheService.removeCache(String.valueOf(pc.getId()));
customKnowledgeCacheService.removeCache(pc.getCode());
}
ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
msg.put("groupId", groupId);
msg.put("systemId", Utils.SystemId);
msg.put("projectId", String.valueOf(projectId));
msg.put("messageType", MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
return result;
}
@Override
public void putPacket(long packetId, PacketData paramPacketData) {
// TODO Auto-generated method stub
log.info("putPacket(long packetId){}", packetId);
if (paramPacketData == null) {
throw new RuleException("Put Package to Redis [id=" + packetId + "] not exist");
}
KnowledgePackage knowledgePackage = paramPacketData.getKnowledgePackageWrapper().getKnowledgePackage();
customKnowledgeCacheService.putCache(String.valueOf(packetId), knowledgePackage);
}
@Override
public void putPacket(String packetCode, PacketData paramPacketData) {
// TODO Auto-generated method stub
log.info("putPacket(String packetCode):{}", packetCode);
if (paramPacketData == null) {
throw new RuleException("Put Package to Redis [code=" + packetCode + "] not exist");
}
KnowledgePackage knowledgePackage = paramPacketData.getKnowledgePackageWrapper().getKnowledgePackage();
customKnowledgeCacheService.putCache(packetCode, knowledgePackage);
}
@Override
public void remove(long packetId) {
// TODO Auto-generated method stub
log.info("remove(String packetId):{}", packetId);
customKnowledgeCacheService.removeCache(String.valueOf(packetId));
}
@Override
public void remove(String packetCode) {
// TODO Auto-generated method stub
log.info("remove(String packetCode):{}", packetCode);
customKnowledgeCacheService.removeCache(packetCode);
}
public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId,String packetCode) {
List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}" ,groupId,packetId,packetCode);
ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
msg.put("groupId", groupId);
msg.put("systemId", Utils.SystemId);
msg.put("packetId", String.valueOf(packetId));
msg.put("packetCode", packetCode);
msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
return result;
}
}
4、客户端配置从Redis中获取知识包