服务端集群节点之间缓存更新通知
如果您部署的服务端集群节点服务之间可以通过确定的URL地址访问,那么您可以通过3.8.集群管理进行集群节点缓存更新的管理。
如果集群节点服务之间无法通过确定的URL地址访问,比如每次节点重启IP都不确定,或者节点个数可能随时随地发生变化,那么可以参考如下实现,通过消息中间件通知的机制实现每个节点的缓存更新。
这种方式无需在URule Pro的集群管理中配置每个节点的URL地址,只需实现 ClusterPacketCacheAdapter 接口和 JarCacheAdapter 抽象类这两个URule缓存更新扩展点,并在其方法中生产(发送)消息:
package com.bstek.urule.console.cache.packet;
import java.util.List;
import java.util.Map;
/**
 * Knowledge-package cache adapter for server-side cluster nodes.
 */
public interface ClusterPacketCacheAdapter {
/** Bean id constant for this adapter. */
public static final String BEAN_ID = "urule.clusterPacketCacheAdapter";
// NOTE(review): the put/remove callbacks below are not described in this document;
// confirm their exact trigger points against the URule Pro API documentation.
void putPacket(long packetId, PacketData paramPacketData);
void putPacket(String packetCode, PacketData paramPacketData);
void remove(long packetId);
void remove(String packetCode);
/** Triggered when a knowledge-package cache is refreshed. */
List<Map<String, Object>> refreshPacket(String groupId, long packetId);
/** Triggered when all knowledge-package caches are reset. */
List<Map<String, Object>> recacheAllPackets(String groupId);
/** Triggered when a project is deleted. */
List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list);
}
/**
 * Adapter for jar cache updates.
 */
public abstract class JarCacheAdapter {
// Bean id. Declared non-final, so it is not a compile-time constant and
// cannot be used as an annotation value such as @Component(...).
public static String BEAN_ID = "urule.jarCacheAdapter";
/** Triggered when a jar is updated through hot deployment. */
public abstract List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception;
}
在接收消息的消费类中,通过 com.bstek.urule.console.cache.ServerCacheManager 里的方法更新当前节点的缓存:
方法 | 说明 |
---|---|
reloadPacket(String systemId, long packetId) | 重新加载指定id知识包 |
syncPacketForRemoveProject(String systemId, long projectId) | 删除指定项目的知识包缓存 |
recacheAllPackets(String systemId) | 重新加载所有知识包缓存 |
reloadDynamicJars(String systemId) | 重新加载所有Jar缓存 |
下面做一个参考实现,前面我们使用Redis实现了Session共享,为了方便我们可以继续使用Redis的消息通知功能,你也可以使用更专业的消息中间件来实现,比如Kafka、RabbitMQ等。
1、定义消息通知的常量类
package com.bstek.urule.sample.mq.constant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Constants used by the message-queue based cache notification sample.
 *
 * <p>The {@code QUEUE_*} values are message-type identifiers and are true
 * constants. The address and topic fields are static but are populated through
 * instance setters annotated with {@code @Value}; they therefore stay
 * {@code null} until Spring has created this bean and invoked those setters.
 * Code that reads them must run after that point.
 */
@Component
public class MQConstant {

    // Message types handled by server-side cluster nodes.
    public static final String QUEUE_CLUSTER_PACKET_REFRESHALL = "urule.cluster.knowledge.refreshall";
    public static final String QUEUE_CLUSTER_PACKET_REFRESH = "urule.cluster.knowledge.refresh";
    public static final String QUEUE_CLUSTER_PROJECT_REMOVE = "urule.cluster.project.remove";
    public static final String QUEUE_CLUSTER_JAR_SYNC = "urule.cluster.jar.sync";

    // Message types handled by clients.
    public static final String QUEUE_CLIENT_PACKET_DISABLE = "urule.client.knowledge.disable";
    public static final String QUEUE_CLIENT_PACKET_ENABLE = "urule.client.knowledge.enable";
    public static final String QUEUE_CLIENT_PACKET_REFRESH = "urule.client.knowledge.refresh";
    public static final String QUEUE_CLIENT_JAR_SYNC = "urule.client.jar.sync";

    /** Message-queue address; defaults to {@code 127.0.0.1:9092} when not configured. */
    public static String MQIP;

    @Value("${project.urule.mq.ip:127.0.0.1:9092}")
    public void setMQIP(String mqIP) {
        MQIP = mqIP;
    }

    /** Topic for server cluster notifications; defaults to {@code urule-cluster-topic}. */
    public static String CLUSTER_TOPIC;

    @Value("${project.urule.mq.clusterTopic:urule-cluster-topic}")
    public void setClusterTopic(String clusterTopic) {
        CLUSTER_TOPIC = clusterTopic;
    }

    /** Topic for client notifications; defaults to {@code urule-client-topic}. */
    public static String CLIENT_TOPIC;

    @Value("${project.urule.mq.clientTopic:urule-client-topic}")
    public void setClientTopic(String clientTopic) {
        CLIENT_TOPIC = clientTopic;
    }
}
2、yaml配置
project:
  urule:
    mq:
      clusterTopic: urule-cluster-topic
      clientTopic: urule-client-topic
3、配置RedisSessionConfig类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.session.data.redis.config.ConfigureRedisAction;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
/**
 * Redis-backed HTTP session configuration plus the {@code RedisTemplate} used
 * by the sample to publish messages.
 */
@Configuration
// Session inactivity timeout in seconds (30 minutes); per the original note,
// 1800 seconds is also the default.
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 30 * 60)
public class RedisSessionConfig {

    @Autowired
    private RedisConnectionFactory factory;

    /** Returns {@code ConfigureRedisAction.NO_OP}. */
    @Bean
    public static ConfigureRedisAction configureRedisAction() {
        return ConfigureRedisAction.NO_OP;
    }

    /**
     * Builds a template whose keys, values, hash keys and hash values are all
     * serialized as plain strings.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(stringSerializer);
        return template;
    }
}
4、配置RedisConsumerConfig消息消费类
package com.bstek.urule.sample.mq.redis.config;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.listener.RedisConsumerListener;
/**
 * Redis subscription configuration: registers the consumer listener on the
 * cluster topic.
 */
@Component
public class RedisConsumerConfig {

    /** Wraps a new {@link RedisConsumerListener} in a listener adapter. */
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        RedisConsumerListener listener = new RedisConsumerListener();
        return new MessageListenerAdapter(listener);
    }

    /**
     * Creates the listener container and subscribes the adapter to
     * {@code MQConstant.CLUSTER_TOPIC}.
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        // NOTE(review): CLUSTER_TOPIC is read when this bean method runs, so it
        // must already have been assigned by then — confirm the startup order.
        PatternTopic clusterTopic = new PatternTopic(MQConstant.CLUSTER_TOPIC);
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(messageListenerAdapter, clusterTopic);
        return listenerContainer;
    }
}
5、为了切换消息中间件方便,定义了一个生产消息的接口
package com.bstek.urule.sample.mq;
/**
 * Abstraction over the message producer, so the underlying message middleware
 * can be swapped without touching the code that sends notifications.
 */
public interface CustomProducerService {

    /** Bean id constant for this service. */
    String BEAN_ID = "urule.ext.CustomProducerService";

    /**
     * Sends a message to a topic.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    void sendMessage(String topic, String message);
}
6、定义生产消息的实现类
package com.bstek.urule.sample.mq.redis.service;
import javax.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.bstek.urule.sample.mq.CustomProducerService;
/**
* Reids消息发送类
*/
@Service
public class RedisProducerServiceImpl implements CustomProducerService{
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void sendMessage(String topic, String message){
redisTemplate.convertAndSend(topic,message);
}
}
7、定义服务端某一个集群节点知识包缓存发生变化时,发送消息的通知类
package com.bstek.urule.sample.mq.adapter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties.Tomcat.Threads;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.packet.ClientPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.ClusterPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.PacketCache;
import com.bstek.urule.console.cache.packet.PacketConfig;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.console.database.manager.packet.PacketManager;
import com.bstek.urule.console.database.model.Packet;
import com.bstek.urule.exception.RuleException;
import com.bstek.urule.runtime.KnowledgePackage;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Sends a notification message whenever the knowledge-package cache of this
 * server node changes.
 *
 * <p>Each notification is a JSON object carrying {@code groupId},
 * {@code systemId}, a {@code messageType} and, where relevant, the affected
 * packet or project id. It is sent to {@code MQConstant.CLUSTER_TOPIC} through
 * {@link CustomProducerService}. Every callback returns a new, empty list.
 */
@Slf4j
@Component(ClusterPacketCacheAdapter.BEAN_ID)
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Notifies the cluster that all knowledge-package caches must be rebuilt.
     *
     * @param groupId group id copied into the message
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> recacheAllPackets(String groupId) {
        log.info("recacheAllPackets(String groupId):{}", groupId);
        ObjectNode msg = newMessage(groupId);
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return new ArrayList<>();
    }

    /**
     * Notifies the cluster that one knowledge package must be reloaded.
     *
     * @param groupId  group id copied into the message
     * @param packetId id of the refreshed knowledge package
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
        // Alternative lookup kept from the original sample:
        // PacketData packetData = PacketCache.ins.getPacket(packetId);
        // String packetCode = packetData.getPacket().getCode();
        // NOTE(review): assumes load(...) returns a non-null packet for this id — confirm.
        Packet packet = PacketManager.ins.load(packetId);
        String packetCode = packet.getCode();
        log.info("refreshPacket(String groupId, long packetId):{}:{}:{}", groupId, packetId, packetCode);

        ObjectNode clusterMsg = newPacketMessage(groupId, packetId, packetCode);
        clusterMsg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, clusterMsg.toString());

        /* If client caches are also updated through the message middleware, add the following code:
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        ObjectNode clientMsg = newPacketMessage(groupId, packetId, packetCode);
        clientMsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientMsg.toString());
        */
        return new ArrayList<>();
    }

    /**
     * Notifies the cluster that a project was deleted.
     *
     * @param groupId   group id copied into the message
     * @param projectId id of the deleted project
     * @param list      packet configurations of the project; only used by the
     *                  optional client notification shown in the comment below
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
        log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):{}", projectId);

        /* If client caches are also updated through the message middleware, add the following code:
        for (PacketConfig pc : list) {
            disableClientsPacket(groupId, pc.getId(), pc.getCode());
        }
        */
        ObjectNode msg = newMessage(groupId);
        msg.put("projectId", String.valueOf(projectId));
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return new ArrayList<>();
    }

    // The four callbacks below are intentionally left empty in this sample.

    @Override
    public void putPacket(long packetId, PacketData paramPacketData) {
        // no-op
    }

    @Override
    public void putPacket(String packetCode, PacketData paramPacketData) {
        // no-op
    }

    @Override
    public void remove(long packetId) {
        // no-op
    }

    @Override
    public void remove(String packetCode) {
        // no-op
    }

    /* If client caches are also updated through the message middleware, add the following code:
    public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId, String packetCode) {
        log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}", groupId, packetId, packetCode);
        ObjectNode msg = newPacketMessage(groupId, packetId, packetCode);
        msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
        return new ArrayList<>();
    }
    */

    /** Creates a message pre-filled with the fields every notification carries. */
    private ObjectNode newMessage(String groupId) {
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        return msg;
    }

    /** Creates a message that additionally identifies one knowledge package. */
    private ObjectNode newPacketMessage(String groupId, long packetId, String packetCode) {
        ObjectNode msg = newMessage(groupId);
        msg.put("packetId", String.valueOf(packetId));
        msg.put("packetCode", packetCode);
        return msg;
    }
}
8、定义服务端某一个集群节点Jar包缓存发生变化时,发送消息的通知类
package com.bstek.urule.sample.mq.adapter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.database.model.UrlType;
import com.bstek.urule.console.editor.jar.JarCacheAdapter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Sends a notification message when the jar cache of this server node changes.
 */
@Slf4j
@Component("urule.jarCacheAdapter")
public class MsgJarCacheAdapter extends JarCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Publishes a jar-sync notification on the cluster topic.
     *
     * @param groupId group id copied into the message
     * @param urlType not used by this implementation
     * @return an empty list
     */
    @Override
    public List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception {
        log.info("loadDynamicJars(String groupId, UrlType urlType)"+groupId);

        ObjectNode notification = JsonUtils.getObjectJsonMapper().createObjectNode();
        notification.put("groupId", groupId);
        notification.put("systemId", Utils.SystemId);
        notification.put("messageType", MQConstant.QUEUE_CLUSTER_JAR_SYNC);
        String payload = notification.toString();
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, payload);

        /* If client caches are also updated through the message middleware, add the following code:
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_JAR_SYNC);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return new ArrayList<Map<String,Object>>();
    }
}
9、定义接收到消息的监听类
package com.bstek.urule.sample.mq.redis.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.ServerCacheManager;
import com.bstek.urule.runtime.cache.ClientCacheManager;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.bstek.urule.sample.mq.constant.MQConstant;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Resource;
/**
* Redis消息监听处理类
*/
@Slf4j
public class RedisConsumerListener implements MessageListener {
private static Map<String, Consumer<String>> RULE = new HashMap<>();
{
RULE.put(MQConstant.CLUSTER_TOPIC, this::consumerClusterMessage);
// RULE.put(MQConstant.CLIENT_TOPIC, this::consumerClusterMessage);
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] b_channel = message.getChannel();
byte[] b_body = message.getBody();
try {
String channel = new String(b_channel);
String body = new String(b_body);
log.info("channel is:" + channel + " , body is: " + body);
RULE.get(channel).accept(body);
} catch (Exception e) {
}
}
public void consumerClusterMessage(String message) {
ServerCacheManager serverCacheManager = (ServerCacheManager) Utils.getApplicationContext().getBean("urule.serverCacheManager");
log.info("consumerClusterMessage exec params is :" + message);
try {
HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
String messageType = mapMessage.get("messageType");
String systemId = mapMessage.get("systemId");
String groupId = mapMessage.get("groupId");
String packetId = mapMessage.get("packetId");
String projectId = mapMessage.get("projectId");
if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESH.equals(messageType)) {
serverCacheManager.reloadPacket(systemId, Long.valueOf(packetId));
}else if(MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE.equals(messageType)) {
serverCacheManager.syncPacketForRemoveProject(systemId, Long.valueOf(projectId));
}else if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL.equals(messageType)) {
serverCacheManager.recacheAllPackets(systemId);
}else if(MQConstant.QUEUE_CLUSTER_JAR_SYNC.equals(messageType)) {
serverCacheManager.reloadDynamicJars(systemId);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*如果客户端的消息消费监听类,可以添加如下代码
public void consumerClinetMessage(String message) {
ClientCacheManager clientCacheManager = (ClientCacheManager) Utils.getApplicationContext().getBean("urule.clientCacheManager");
log.info("consumerClinetMessage exec params is :" + message);
try {
HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
String messageType = mapMessage.get("messageType");
String systemId = mapMessage.get("systemId");
String groupId = mapMessage.get("groupId");
String packetId = mapMessage.get("packetId");
String projectId = mapMessage.get("projectId");
if(MQConstant.QUEUE_CLIENT_PACKET_DISABLE.equals(messageType)) {
clientCacheManager.disableKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_PACKET_ENABLE.equals(messageType)) {
clientCacheManager.enableKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_PACKET_REFRESH.equals(messageType)) {
clientCacheManager.reloadKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_JAR_SYNC.equals(messageType)) {
clientCacheManager.reloadDynamicJars();
log.info("=====jar重新加载完成=====");
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
*/
至此,通过Redis实现的集群节点之间的缓存通知功能就完成了。