在生产环境中,往往有在服务在运行时需要更换配置的需求。这一节的目的是介绍使用redis的发布订阅功能来实现动态配置的更新功能。
使用到了
- redis的发布订阅功能
- java反射
- bean声明周期 BeanPostProcessor
业务流程

其中标识和对象的关系是通过下文名为dccObjGroup
的HashMap来存储的。
代码实现
redis的依赖
xml
<!-- redis依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.26.0</version>
</dependency>
自定义注解
创建自定义注解DCCValue
,用于标记需要动态修改的配置项,方便后续反射获取并修改字段
java
@Retention(RetentionPolicy.RUNTIME) // 运行时注解(因为在处理业务逻辑时需要获取注解)
@Target({ElementType.FIELD}) // 作用于字段
@Documented
public @interface DCCValue {
// 格式为 配置项的名称:配置值
// 比如 downgradeSwitch:0
String value() default "";
}
使用时 在需要动态修改的字段上添加注解并配置默认值即可 需要注意格式
java
// 某个配置类中
@DCCValue("downgradeSwitch:0")
private String downgradeSwitch;
@DCCValue("cutRange:100")
private String cutRange;
扫描并注入配置项实现
扫描所有bean是通过实现BeanPostProcessor
接口的类(本项目为DCCValueBeanFactory
)并重写postProcessAfterInitialization
方法来实现的。
这个类的postProcessAfterInitialization
方法会在每个bean初始化后被调用,可以在这里扫描所有的bean并查找带有特定注解的字段。
java
@Slf4j
@Configuration
public class DCCValueBeanFactory implements BeanPostProcessor {
// 配置项的前缀
// 方便后续在Redis中查找配置项
private static final String BASE_CONFIG_PATH = "group_buy_market_dcc_";
// 用于存储配置项的对象 本文为了突出重点 没有写client的配置
private final RedissonClient redissonClient;
// 存储配置项的对象
private final Map<String, Object> dccObjGroup = new HashMap<>();
public DCCValueBeanFactory(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* Bean初始化后处理
* 每当有个bean被初始化后都会调用这个方法
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 注意;增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生变化,这样不能获得到自己的自定义注解了。
Class<?> targetBeanClass = bean.getClass();
Object targetBeanObject = bean;
if (AopUtils.isAopProxy(bean)) {
// 如果是AOP代理的对象,则获取目标类和目标对象
targetBeanClass = AopUtils.getTargetClass(bean);
targetBeanObject = AopProxyUtils.getSingletonTarget(bean);
}
// 遍历所有字段
Field[] fields = targetBeanClass.getDeclaredFields();
for (Field field : fields) {
// 仅处理带有DCCValue注解的字段
if (!field.isAnnotationPresent(DCCValue.class)) {
continue;
}
// 获取注解中的配置
DCCValue dccValue = field.getAnnotation(DCCValue.class);
String value = dccValue.value();
if (StringUtils.isBlank(value)) {
// 必须有默认值
throw new RuntimeException(field.getName() + " @DCCValue is not config value config case 「isSwitch/isSwitch:1」");
}
// 分割配置项的名称和默认值
// 格式为 配置项的名称:配置值
String[] splits = value.split(":");
String key = BASE_CONFIG_PATH.concat(splits[0]);
String defaultValue = splits.length == 2 ? splits[1] : null;
// 设置值
String setValue = defaultValue;
try {
// 如果为空则抛出异常
if (StringUtils.isBlank(defaultValue)) {
throw new RuntimeException("dcc config error " + key + " is not null - 请配置默认值!");
}
// Redis 操作,判断配置Key是否存在,不存在则创建,存在则获取最新值
RBucket<String> bucket = redissonClient.getBucket(key);
boolean exists = bucket.isExists();
if (!exists) {
bucket.set(defaultValue);
} else {
setValue = bucket.get();
}
// 注入到目标对象中
field.setAccessible(true);
field.set(targetBeanObject, setValue);
field.setAccessible(false);
} catch (Exception e) {
throw new RuntimeException(e);
}
// 将配置项存储到dccObjGroup中 用于后续订阅发布时可以通过标识获取到对应的对象
dccObjGroup.put(key, targetBeanObject);
}
return bean;
}
}
发布订阅实现
订阅处理
下文的函数是属于DCCValueBeanFactory
类的一个方法,因为使用到了dccObjGroup
java
/**
* 简单来说 这个函数就是创建了一个Redis的Topic订阅 并给其绑定了一个回调函数
* 当有消息发布到这个Topic时 就会触发这个回调函数
*/
@Bean("dccTopic")
public RTopic dccRedisTopicListener(RedissonClient redissonClient) {
RTopic topic = redissonClient.getTopic("group_buy_market_dcc");
// 绑定的回调函数 s为订阅的消息内容
// s需要是 "attribute,value" 的格式
topic.addListener(String.class, (charSequence, s) -> {
String[] split = s.split(Constants.SPLIT);
// 获取值
String attribute = split[0];
// redis中存储的格式为 BASE_CONFIG_PATH + attribute
String key = BASE_CONFIG_PATH + attribute;
String value = split[1];
// 设置值
RBucket<String> bucket = redissonClient.getBucket(key);
boolean exists = bucket.isExists();
if (!exists) return;
bucket.set(value);
Object objBean = dccObjGroup.get(key);
if (null == objBean) return;
Class<?> objBeanClass = objBean.getClass();
// 检查 objBean 是否是代理对象
if (AopUtils.isAopProxy(objBean)) {
// 获取代理对象的目标对象
objBeanClass = AopUtils.getTargetClass(objBean);
}
try {
// 1. getDeclaredField 方法用于获取指定类中声明的所有字段,包括私有字段、受保护字段和公共字段。
// 2. getField 方法用于获取指定类中的公共字段,即只能获取到公共访问修饰符(public)的字段。
Field field = objBeanClass.getDeclaredField(attribute);
field.setAccessible(true);
field.set(objBean, value);
field.setAccessible(false);
log.info("DCC 节点监听,动态设置值 {} {}", key, value);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return topic;
}
发布配置
写了一个Controller
用于发布配置到Redis中
java
// 注入上文定义的bean dccTopic
@Resource
private RTopic dccTopic;
/**
* 动态值变更
*/
@RequestMapping(value = "update_config", method = RequestMethod.GET)
@Override
public Response<Boolean> updateConfig(@RequestParam String key, @RequestParam String value) {
try {
log.info("DCC 动态配置值变更 key:{} value:{}", key, value);
// 在topic中发布消息
dccTopic.publish(key + "," + value);
return Response.<Boolean>builder()
.code(ResponseCode.SUCCESS.getCode())
.info(ResponseCode.SUCCESS.getInfo())
.build();
} catch (Exception e) {
log.error("DCC 动态配置值变更失败 key:{} value:{}", key, value, e);
return Response.<Boolean>builder()
.code(ResponseCode.UN_ERROR.getCode())
.info(ResponseCode.UN_ERROR.getInfo())
.build();
}
}