Degrade rule service init and listen to channel

This commit is contained in:
zhongjie
2022-01-05 16:25:36 +08:00
parent 394b9b06f8
commit ca7ecaa674
5 changed files with 323 additions and 0 deletions

View File

@@ -103,6 +103,7 @@ refresh-local-cache:
app-auth-enabled: true
flow-control-rule-enabled: true
rpc-service-enabled: true
degrade-rule-enabled: true
fizz:

View File

@@ -26,6 +26,7 @@ import we.plugin.auth.ApiConfig2appsService;
import we.plugin.auth.AppService;
import we.plugin.auth.GatewayGroupService;
import we.proxy.RpcInstanceService;
import we.stats.degrade.DegradeRuleService;
import we.stats.ratelimit.ResourceRateLimitConfigService;
import javax.annotation.Resource;
@@ -39,6 +40,7 @@ import javax.annotation.Resource;
* @see AppService#refreshLocalCache() refresh app local cache
* @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
* @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
* @see DegradeRuleService#refreshLocalCache() refresh degrade rule local cache
*
* @author zhongjie
*/
@@ -73,6 +75,9 @@ public class RefreshLocalCacheConfig {
@Resource
private FizzMangerConfig fizzMangerConfig;
@Resource
private DegradeRuleService degradeRuleService;
@Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
public void refreshLocalCache() {
@@ -139,6 +144,15 @@ public class RefreshLocalCacheConfig {
}
}
if (refreshLocalCacheConfigProperties.isDegradeRuleCacheRefreshEnabled()) {
LOGGER.debug("refresh degrade rule local cache");
try {
degradeRuleService.refreshLocalCache();
} catch (Throwable t) {
LOGGER.warn("refresh degrade rule local cache exception", t);
}
}
fizzMangerConfig.updateMangerUrl();
}
}

View File

@@ -51,4 +51,7 @@ public class RefreshLocalCacheConfigProperties {
@Value("${refresh-local-cache.rpc-service-enabled:false}")
private boolean rpcServiceCacheRefreshEnabled;
@Value("${refresh-local-cache.degrade-rule-enabled:false}")
private boolean degradeRuleCacheRefreshEnabled;
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats.degrade;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import we.util.JacksonUtils;
import static we.util.ResourceIdUtils.SERVICE_DEFAULT;
/**
* Degrade rule entity
*
* @author zhongjie
*/
@Data
public class DegradeRule {
/**
* 整型自增主键
*/
private Long id;
/**
* 熔断类型 1-服务默认配置 2-服务 3-接口
*/
private Byte type;
/**
* 前端服务名
*/
private String service;
/**
* 前端API路径
*/
private String path;
/**
* 熔断策略 1-异常比例 2-异常数
*/
private Byte strategy;
/**
* 比例阈值,当熔断策略为 1-异常比例 时该字段有值
*/
private Float ratioThreshold;
/**
* 异常数,当熔断策略为 2-异常数 时该字段有值
*/
private Long exceptionCount;
/**
* 最小请求数
*/
private Long minRequestCount;
/**
* 熔断时长(秒)
*/
private Integer timeWindow;
/**
* 统计时长(秒)
*/
private Integer statInterval;
/**
* 恢复策略 1-尝试恢复 2-逐步恢复 3-立即恢复
*/
private Byte recoveryStrategy;
/**
* 恢复时长(秒),当恢复策略为 2-逐步恢复 时该字段有值
*/
private Integer recoveryTimeWindow;
/**
* 熔断响应ContentType
*/
private String responseContentType;
/**
* 熔断响应报文
*/
private String responseContent;
/**
* 当type为1时1启用0反之
*/
private Integer enable;
/**
* 是否删除 1-是 2-否
*/
private Integer isDeleted;
private String resourceId;
public boolean isDeleted() {
return isDeleted == 1;
}
public boolean isEnable() {
return enable == 1;
}
public void setType(Byte type) {
this.type = type;
if (type == 1) {
service = SERVICE_DEFAULT;
}
}
public void setService(String s) {
if (StringUtils.isNotBlank(s)) {
service = s;
}
}
public void setPath(String p) {
if (StringUtils.isNotBlank(p)) {
path = p;
}
}
@JsonIgnore
public String getResourceId() {
if (resourceId == null) {
resourceId = "^^^" + (service == null ? "" : service) + '^' + (path == null ? "" : path);
}
return resourceId;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -0,0 +1,164 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats.degrade;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.util.JacksonUtils;
import we.util.Result;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Degrade rule service
*
* @author zhongjie
*/
@Service
@Slf4j
public class DegradeRuleService {
/**
* Redis degrade rule change channel
*/
private static final String DEGRADE_RULE_CHANNEL = "fizz_degrade_rule_channel";
/**
* redis degrade rule info hash key
*/
private static final String DEGRADE_RULE_HASH_KEY = "fizz_degrade_rule";
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
private Map<String, DegradeRule> resourceId2DegradeRuleMap = new ConcurrentHashMap<>(32);
private Map<Long, DegradeRule> id2DegradeRuleMap = new ConcurrentHashMap<>(32);
@PostConstruct
public void init() {
Result<?> result = initDegradeRule();
if (result.code == Result.FAIL) {
throw new RuntimeException(result.msg, result.t);
}
result = lsnDegradeRuleChange();
if (result.code == Result.FAIL) {
throw new RuntimeException(result.msg, result.t);
}
}
public void refreshLocalCache() throws Throwable {
this.initDegradeRule();
}
private Result<?> initDegradeRule() {
Result<?> result = Result.succ();
Map<String, DegradeRule> resourceId2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
Map<Long, DegradeRule> id2DegradeRuleMapTmp = new ConcurrentHashMap<>(32);
Flux<Map.Entry<Object, Object>> degradeRuleEntries = rt.opsForHash().entries(DEGRADE_RULE_HASH_KEY);
degradeRuleEntries.collectList()
.defaultIfEmpty(Collections.emptyList())
.flatMap(es -> {
if (!es.isEmpty()) {
String json = null;
try {
for (Map.Entry<Object, Object> e : es) {
json = (String) e.getValue();
DegradeRule degradeRule = JacksonUtils.readValue(json, DegradeRule.class);
resourceId2DegradeRuleMapTmp.put(degradeRule.getResourceId(), degradeRule);
id2DegradeRuleMapTmp.put(degradeRule.getId(), degradeRule);
log.info("init degrade rule: {}", json);
}
} catch (Throwable t) {
result.code = Result.FAIL;
result.msg = "init degrade rule error, json: " + json;
result.t = t;
}
} else {
log.info("no degrade rule");
}
return Mono.empty();
}
)
.onErrorReturn(
throwable -> {
result.code = Result.FAIL;
result.msg = "init degrade rule error";
result.t = throwable;
return true;
},
result
)
.block();
resourceId2DegradeRuleMap = resourceId2DegradeRuleMapTmp;
id2DegradeRuleMap = id2DegradeRuleMapTmp;
return result;
}
private Result<?> lsnDegradeRuleChange() {
Result<?> result = Result.succ();
rt.listenToChannel(DEGRADE_RULE_CHANNEL)
.doOnError(
t -> {
result.code = Result.FAIL;
result.msg = "lsn error, channel: " + DEGRADE_RULE_CHANNEL;
result.t = t;
log.error("lsn channel {} error", DEGRADE_RULE_CHANNEL, t);
}
)
.doOnSubscribe(
s -> log.info("success to lsn on {}", DEGRADE_RULE_CHANNEL)
)
.doOnNext(
msg -> {
String message = msg.getMessage();
try {
DegradeRule degradeRule = JacksonUtils.readValue(message, DegradeRule.class);
if (degradeRule.isDeleted()) {
DegradeRule remove = id2DegradeRuleMap.remove(degradeRule.getId());
if (remove != null) {
resourceId2DegradeRuleMap.remove(remove.getResourceId());
}
log.info("remove degrade rule {}", message);
} else {
DegradeRule previous = id2DegradeRuleMap.put(degradeRule.getId(), degradeRule);
if (previous != null) {
if (!previous.getResourceId().equals(degradeRule.getResourceId())) {
resourceId2DegradeRuleMap.remove(previous.getResourceId());
}
}
resourceId2DegradeRuleMap.put(degradeRule.getResourceId(), degradeRule);
log.info("update degrade rule {}", message);
}
} catch (Throwable t) {
log.error("update degrade rule error, {}", message, t);
}
}
)
.subscribe();
return result;
}
}