refresh config local cache (#173)
This commit is contained in:
160
fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java
Normal file
160
fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java
Normal file
@@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Copyright (C) 2021 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package we.config;
|
||||
|
||||
import com.alibaba.nacos.api.config.annotation.NacosValue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import we.fizz.ConfigLoader;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.ApiConifg2appsService;
|
||||
import we.plugin.auth.AppService;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.proxy.RpcInstanceService;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* refresh config local cache config
|
||||
* @see ApiConfigService#refreshLocalCache() refresh api config local cache
|
||||
* @see ApiConifg2appsService#refreshLocalCache() refresh api config to apps local cache
|
||||
* @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache
|
||||
* @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache
|
||||
* @see AppService#refreshLocalCache() refresh app local cache
|
||||
* @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache
|
||||
* @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache
|
||||
*
|
||||
* @author zhongjie
|
||||
*/
|
||||
@Configuration
|
||||
public class RefreshLocalCacheConfig {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class);
|
||||
|
||||
@NacosValue(value = "${refresh-local-cache.api-config-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.api-config-enabled:false}")
|
||||
private boolean apiConfigCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.api-config-2-apps-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.api-config-2-apps-enabled:false}")
|
||||
private boolean apiConfig2AppsCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.aggregate-config-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.aggregate-config-enabled:false}")
|
||||
private boolean aggregateConfigCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.gateway-group-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.gateway-group-enabled:false}")
|
||||
private boolean gatewayGroupCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.app-auth-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.app-auth-enabled:false}")
|
||||
private boolean appAuthCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.flow-control-rule-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.flow-control-rule-enabled:false}")
|
||||
private boolean flowControlRuleCacheRefreshEnabled;
|
||||
@NacosValue(value = "${refresh-local-cache.rpc-service-enabled:false}", autoRefreshed = true)
|
||||
@Value("${refresh-local-cache.rpc-service-enabled:false}")
|
||||
private boolean rpcServiceCacheRefreshEnabled;
|
||||
|
||||
@Resource
|
||||
private ConfigLoader configLoader;
|
||||
|
||||
@Resource
|
||||
private ApiConfigService apiConfigService;
|
||||
|
||||
@Resource
|
||||
private ApiConifg2appsService apiConifg2appsService;
|
||||
|
||||
@Resource
|
||||
private GatewayGroupService gatewayGroupService;
|
||||
|
||||
@Resource
|
||||
private AppService appService;
|
||||
|
||||
@Resource
|
||||
private ResourceRateLimitConfigService resourceRateLimitConfigService;
|
||||
|
||||
@Resource
|
||||
private RpcInstanceService rpcInstanceService;
|
||||
|
||||
@Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}",
|
||||
fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}")
|
||||
public void refreshLocalCache() {
|
||||
if (apiConfigCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh api config local cache");
|
||||
try {
|
||||
apiConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (apiConfig2AppsCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh api config to apps local cache");
|
||||
try {
|
||||
apiConifg2appsService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh api config to apps local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregateConfigCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh aggregate config local cache");
|
||||
try {
|
||||
configLoader.refreshLocalCache();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("refresh aggregate config local cache exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (gatewayGroupCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh gateway group local cache");
|
||||
try {
|
||||
gatewayGroupService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh gateway group local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (appAuthCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh app auth local cache");
|
||||
try {
|
||||
appService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh app auth local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (flowControlRuleCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh flow control rule local cache");
|
||||
try {
|
||||
resourceRateLimitConfigService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh flow control rule local cache exception", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (rpcServiceCacheRefreshEnabled) {
|
||||
LOGGER.debug("refresh rpc service local cache");
|
||||
try {
|
||||
rpcInstanceService.refreshLocalCache();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("refresh rpc service local cache exception", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,8 @@ import com.alibaba.nacos.api.config.annotation.NacosValue;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.config.AppConfigProperties;
|
||||
import we.fizz.input.*;
|
||||
|
||||
@@ -40,6 +42,9 @@ import we.fizz.input.extension.grpc.GrpcInput;
|
||||
import we.fizz.input.extension.dubbo.DubboInput;
|
||||
import we.fizz.input.extension.mysql.MySQLInput;
|
||||
import we.fizz.input.extension.request.RequestInput;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.util.Constants;
|
||||
import we.util.ReactorUtils;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
@@ -52,6 +57,7 @@ import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -209,16 +215,18 @@ public class ConfigLoader {
|
||||
|
||||
@PostConstruct
|
||||
public synchronized void init() throws Exception {
|
||||
this.refreshLocalCache();
|
||||
}
|
||||
|
||||
public synchronized void refreshLocalCache() throws Exception {
|
||||
if (formalPathPrefix == null) {
|
||||
formalPathPrefix = appContext.getEnvironment().getProperty("gateway.prefix", "/proxy");
|
||||
formalPathServiceNameStartIndex = formalPathPrefix.length() + 1;
|
||||
}
|
||||
|
||||
if (aggregateResources == null) {
|
||||
aggregateResources = new ConcurrentHashMap<>(1024);
|
||||
resourceKey2ConfigInfoMap = new ConcurrentHashMap<>(1024);
|
||||
aggregateId2ResourceKeyMap = new ConcurrentHashMap<>(1024);
|
||||
}
|
||||
Map<String, String> aggregateResourcesTmp = new ConcurrentHashMap<>(1024);
|
||||
Map<String, ConfigInfo> resourceKey2ConfigInfoMapTmp = new ConcurrentHashMap<>(1024);
|
||||
Map<String, String> aggregateId2ResourceKeyMapTmp = new ConcurrentHashMap<>(1024);
|
||||
|
||||
if (readLocalConfigFlag) {
|
||||
File dir = new File("json");
|
||||
@@ -230,28 +238,64 @@ public class ConfigLoader {
|
||||
throw new IOException("File not found");
|
||||
}
|
||||
String configStr = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
|
||||
this.addConfig(configStr);
|
||||
this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 从Redis缓存中获取配置
|
||||
reactiveStringRedisTemplate.opsForHash().scan(AGGREGATE_HASH_KEY).subscribe(entry -> {
|
||||
String configStr = (String) entry.getValue();
|
||||
this.addConfig(configStr);
|
||||
});
|
||||
}
|
||||
}
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(reactiveStringRedisTemplate.opsForHash().entries(AGGREGATE_HASH_KEY)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> LOGGER.info(null, t))
|
||||
.concatMap(entry -> {
|
||||
Object k = entry.getKey();
|
||||
if (k == ReactorUtils.OBJ) {
|
||||
return Flux.just(entry);
|
||||
}
|
||||
String configStr = (String) entry.getValue();
|
||||
LOGGER.info("aggregate config: " + k.toString() + Constants.Symbol.COLON + configStr, LogService.BIZ_ID, k.toString());
|
||||
|
||||
public synchronized void addConfig(String configStr) {
|
||||
if (aggregateResources == null) {
|
||||
try {
|
||||
this.init();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
try {
|
||||
this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp);
|
||||
return Flux.just(entry);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
LOGGER.info(configStr, t);
|
||||
return Flux.error(t);
|
||||
}
|
||||
}).blockLast())).flatMap(
|
||||
e -> {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
assert error != null;
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
}
|
||||
|
||||
aggregateResources = aggregateResourcesTmp;
|
||||
resourceKey2ConfigInfoMap = resourceKey2ConfigInfoMapTmp;
|
||||
aggregateId2ResourceKeyMap = aggregateId2ResourceKeyMapTmp;
|
||||
}
|
||||
|
||||
public synchronized void addConfig(String configStr) {
|
||||
if (aggregateResources == null) {
|
||||
try {
|
||||
this.init();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
this.addConfig(configStr, aggregateResources, resourceKey2ConfigInfoMap, aggregateId2ResourceKeyMap);
|
||||
}
|
||||
|
||||
private void addConfig(String configStr, Map<String, String> aggregateResources,
|
||||
Map<String, ConfigInfo> resourceKey2ConfigInfoMap, Map<String, String> aggregateId2ResourceKeyMap) {
|
||||
ONode cfgNode = ONode.loadStr(configStr);
|
||||
|
||||
boolean needReGenConfigStr = false;
|
||||
|
||||
@@ -40,6 +40,7 @@ import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -88,12 +89,19 @@ public class ApiConfigService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnApiConfigChange);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Supplier<Mono<Throwable>> doAfterLoadCache) throws Throwable {
|
||||
Map<Integer, ApiConfig> apiConfigMapTmp = new HashMap<>(128);
|
||||
Map<String, ServiceConfig> serviceConfigMapTmp = new HashMap<>(128);
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzApiConfig)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
|
||||
log.info(null, t);
|
||||
})
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> log.info(null, t))
|
||||
.concatMap(e -> {
|
||||
Object k = e.getKey();
|
||||
if (k == ReactorUtils.OBJ) {
|
||||
@@ -104,8 +112,8 @@ public class ApiConfigService {
|
||||
String json = (String) v;
|
||||
try {
|
||||
ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class);
|
||||
apiConfigMap.put(ac.id, ac);
|
||||
updateServiceConfigMap(ac);
|
||||
apiConfigMapTmp.put(ac.id, ac);
|
||||
updateServiceConfigMap(ac, serviceConfigMapTmp);
|
||||
return Flux.just(e);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
@@ -117,15 +125,23 @@ public class ApiConfigService {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return lsnApiConfigChange();
|
||||
|
||||
if (doAfterLoadCache != null) {
|
||||
return doAfterLoadCache.get();
|
||||
} else {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
this.apiConfigMap = apiConfigMapTmp;
|
||||
this.serviceConfigMap = serviceConfigMapTmp;
|
||||
}
|
||||
|
||||
public Mono<Throwable> lsnApiConfigChange() {
|
||||
private Mono<Throwable> lsnApiConfigChange() {
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
final boolean[] b = {false};
|
||||
rt.listenToChannel(fizzApiConfigChannel).doOnError(t -> {
|
||||
@@ -145,9 +161,9 @@ public class ApiConfigService {
|
||||
ApiConfig r = apiConfigMap.remove(ac.id);
|
||||
if (ac.isDeleted != ApiConfig.DELETED && r != null) {
|
||||
r.isDeleted = ApiConfig.DELETED;
|
||||
updateServiceConfigMap(r);
|
||||
updateServiceConfigMap(r, serviceConfigMap);
|
||||
}
|
||||
updateServiceConfigMap(ac);
|
||||
updateServiceConfigMap(ac, serviceConfigMap);
|
||||
if (ac.isDeleted != ApiConfig.DELETED) {
|
||||
apiConfigMap.put(ac.id, ac);
|
||||
} else {
|
||||
@@ -172,7 +188,7 @@ public class ApiConfigService {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
|
||||
private void updateServiceConfigMap(ApiConfig ac) {
|
||||
private void updateServiceConfigMap(ApiConfig ac, Map<String, ServiceConfig> serviceConfigMap) {
|
||||
ServiceConfig sc = serviceConfigMap.get(ac.service);
|
||||
if (ac.isDeleted == ApiConfig.DELETED) {
|
||||
if (sc == null) {
|
||||
|
||||
@@ -55,6 +55,15 @@ public class ApiConifg2appsService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnChannel);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Runnable doAfterLoadCache) throws Throwable {
|
||||
Map<Integer, Set<String>> apiConfig2appsMapTmp = new HashMap<>(128);
|
||||
rt.opsForHash().entries(fizzApiConfigAppSetSize)
|
||||
.collectList()
|
||||
.map(
|
||||
@@ -73,7 +82,7 @@ public class ApiConifg2appsService {
|
||||
.collectList()
|
||||
.map(
|
||||
as -> {
|
||||
save(apiConfigId, as);
|
||||
save(apiConfigId, as, apiConfig2appsMapTmp);
|
||||
return ReactorUtils.NULL;
|
||||
}
|
||||
)
|
||||
@@ -89,7 +98,10 @@ public class ApiConifg2appsService {
|
||||
m -> {
|
||||
m.subscribe(
|
||||
e -> {
|
||||
lsnChannel();
|
||||
apiConfig2appsMap = apiConfig2appsMapTmp;
|
||||
if (doAfterLoadCache != null) {
|
||||
doAfterLoadCache.run();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -107,7 +119,7 @@ public class ApiConifg2appsService {
|
||||
log.info(b.toString());
|
||||
}
|
||||
|
||||
private void save(Integer apiConfigId, List<String> as) {
|
||||
private void save(Integer apiConfigId, List<String> as, Map<Integer, Set<String>> apiConfig2appsMap) {
|
||||
Set<String> appSet = apiConfig2appsMap.get(apiConfigId);
|
||||
if (appSet == null) {
|
||||
appSet = new HashSet<>();
|
||||
|
||||
@@ -36,6 +36,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -59,6 +60,16 @@ public class AppService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnAppChange);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Supplier<Mono<Throwable>> doAfterLoadCache) throws Throwable {
|
||||
Map<String, App> appMapTmp = new HashMap<>(32);
|
||||
Map<Integer, App> oldAppMapTmp = new HashMap<>(32);
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzApp)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
|
||||
@@ -74,8 +85,8 @@ public class AppService {
|
||||
String json = (String) v;
|
||||
try {
|
||||
App app = JacksonUtils.readValue(json, App.class);
|
||||
oldAppMap.put(app.id, app);
|
||||
updateAppMap(app);
|
||||
oldAppMapTmp.put(app.id, app);
|
||||
updateAppMap(app, appMapTmp);
|
||||
return Flux.just(e);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
@@ -87,12 +98,19 @@ public class AppService {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return lsnAppChange();
|
||||
if (doAfterLoadCache != null) {
|
||||
return doAfterLoadCache.get();
|
||||
} else {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
appMap = appMapTmp;
|
||||
oldAppMap = oldAppMapTmp;
|
||||
}
|
||||
|
||||
private Mono<Throwable> lsnAppChange() {
|
||||
@@ -116,7 +134,7 @@ public class AppService {
|
||||
if (app.isDeleted != App.DELETED && r != null) {
|
||||
appMap.remove(r.app);
|
||||
}
|
||||
updateAppMap(app);
|
||||
updateAppMap(app, appMap);
|
||||
if (app.isDeleted != App.DELETED) {
|
||||
oldAppMap.put(app.id, app);
|
||||
}
|
||||
@@ -139,7 +157,7 @@ public class AppService {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
|
||||
private void updateAppMap(App app) {
|
||||
private void updateAppMap(App app, Map<String, App> appMap) {
|
||||
if (app.isDeleted == App.DELETED) {
|
||||
App removedApp = appMap.remove(app.app);
|
||||
log.info("remove " + removedApp);
|
||||
|
||||
@@ -34,6 +34,7 @@ import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -61,6 +62,17 @@ public class GatewayGroupService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnGatewayGroupChange);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Supplier<Mono<Throwable>> doAfterLoadCache) throws Throwable {
|
||||
Map<String, GatewayGroup> gatewayGroupMapTmp = new HashMap<>(6);
|
||||
Map<Integer, GatewayGroup> oldGatewayGroupMapTmp = new HashMap<>(6);
|
||||
Set<String> currentGatewayGroupSetTmp = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet());
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzGatewayGroup)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
|
||||
@@ -76,8 +88,8 @@ public class GatewayGroupService {
|
||||
String json = (String) v;
|
||||
try {
|
||||
GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class);
|
||||
oldGatewayGroupMap.put(gg.id, gg);
|
||||
updateGatewayGroupMap(gg);
|
||||
oldGatewayGroupMapTmp.put(gg.id, gg);
|
||||
updateGatewayGroupMap(gg, gatewayGroupMapTmp, currentGatewayGroupSetTmp);
|
||||
return Flux.just(e);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
@@ -89,12 +101,20 @@ public class GatewayGroupService {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return lsnGatewayGroupChange();
|
||||
if (doAfterLoadCache != null) {
|
||||
return doAfterLoadCache.get();
|
||||
} else {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
gatewayGroupMap = gatewayGroupMapTmp;
|
||||
oldGatewayGroupMap = oldGatewayGroupMapTmp;
|
||||
currentGatewayGroupSet = currentGatewayGroupSetTmp;
|
||||
}
|
||||
|
||||
private Mono<Throwable> lsnGatewayGroupChange() {
|
||||
@@ -118,7 +138,7 @@ public class GatewayGroupService {
|
||||
if (gg.isDeleted != GatewayGroup.DELETED && r != null) {
|
||||
gatewayGroupMap.remove(r.group);
|
||||
}
|
||||
updateGatewayGroupMap(gg);
|
||||
updateGatewayGroupMap(gg, gatewayGroupMap, currentGatewayGroupSet);
|
||||
if (gg.isDeleted != GatewayGroup.DELETED) {
|
||||
oldGatewayGroupMap.put(gg.id, gg);
|
||||
}
|
||||
@@ -141,7 +161,8 @@ public class GatewayGroupService {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
|
||||
private void updateGatewayGroupMap(GatewayGroup gg) {
|
||||
private void updateGatewayGroupMap(GatewayGroup gg, Map<String, GatewayGroup> gatewayGroupMap,
|
||||
Set<String> currentGatewayGroupSet) {
|
||||
if (gg.isDeleted == GatewayGroup.DELETED) {
|
||||
GatewayGroup r = gatewayGroupMap.remove(gg.group);
|
||||
log.info("remove " + r);
|
||||
@@ -154,10 +175,11 @@ public class GatewayGroupService {
|
||||
log.info("update " + existGatewayGroup + " with " + gg);
|
||||
}
|
||||
}
|
||||
updateCurrentGatewayGroupSet();
|
||||
updateCurrentGatewayGroupSet(currentGatewayGroupSet, gatewayGroupMap);
|
||||
}
|
||||
|
||||
private void updateCurrentGatewayGroupSet() {
|
||||
private void updateCurrentGatewayGroupSet(Set<String> currentGatewayGroupSet, Map<String,
|
||||
GatewayGroup> gatewayGroupMap) {
|
||||
String ip = NetworkUtils.getServerIp();
|
||||
currentGatewayGroupSet.clear();
|
||||
gatewayGroupMap.forEach(
|
||||
|
||||
@@ -50,4 +50,10 @@ public interface RpcInstanceService {
|
||||
* @return instance, {@code null} if instance not-exist
|
||||
*/
|
||||
String getInstance(RpcTypeEnum rpcTypeEnum, String service);
|
||||
|
||||
/**
|
||||
* refresh local cache
|
||||
* @throws Throwable any error
|
||||
*/
|
||||
void refreshLocalCache() throws Throwable;
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* RPC instance service implementation, get all config from redis cache when init and listen on redis channel for change
|
||||
@@ -78,6 +79,20 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnRpcServiceChange);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Supplier<Mono<Throwable>> doAfterLoadCache) throws Throwable {
|
||||
Map<String, List<String>> serviceToInstancesMapTmp = new ConcurrentHashMap<>(32);
|
||||
Map<String, Byte> serviceToLoadBalanceTypeMapTmp = new ConcurrentHashMap<>(32);
|
||||
Map<Long, RpcService> idToRpcServiceMapTmp = new ConcurrentHashMap<>(32);
|
||||
Map<String, AtomicLong> serviceToCountMapTmp = new ConcurrentHashMap<>(32);
|
||||
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(redisTemplate.opsForHash().entries(RPC_SERVICE_HASH_KEY)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> LOGGER.info(null, t))
|
||||
@@ -91,7 +106,8 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
|
||||
String json = (String) v;
|
||||
try {
|
||||
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
|
||||
this.updateLocalCache(rpcService);
|
||||
this.updateLocalCache(rpcService, serviceToInstancesMapTmp, serviceToLoadBalanceTypeMapTmp,
|
||||
idToRpcServiceMapTmp, serviceToCountMapTmp);
|
||||
return Flux.just(e);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
@@ -103,13 +119,23 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return lsnRpcServiceChange();
|
||||
|
||||
if (doAfterLoadCache != null) {
|
||||
return doAfterLoadCache.get();
|
||||
} else {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
assert error != null;
|
||||
throw error;
|
||||
}
|
||||
|
||||
serviceToInstancesMap = serviceToInstancesMapTmp;
|
||||
serviceToLoadBalanceTypeMap = serviceToLoadBalanceTypeMapTmp;
|
||||
idToRpcServiceMap = idToRpcServiceMapTmp;
|
||||
serviceToCountMap = serviceToCountMapTmp;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -171,7 +197,8 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
|
||||
LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis());
|
||||
try {
|
||||
RpcService rpcService = JacksonUtils.readValue(json, RpcService.class);
|
||||
this.updateLocalCache(rpcService);
|
||||
this.updateLocalCache(rpcService, serviceToInstancesMap, serviceToLoadBalanceTypeMap, idToRpcServiceMap,
|
||||
serviceToCountMap);
|
||||
} catch (Throwable t) {
|
||||
LOGGER.info(json, t);
|
||||
}
|
||||
@@ -191,7 +218,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
|
||||
private void updateLocalCache(RpcService rpcService) {
|
||||
private void updateLocalCache(RpcService rpcService, Map<String, List<String>> serviceToInstancesMap,
|
||||
Map<String, Byte> serviceToLoadBalanceTypeMap, Map<Long, RpcService> idToRpcServiceMap,
|
||||
Map<String, AtomicLong> serviceToCountMap) {
|
||||
if (rpcService.getType() == null) {
|
||||
// historical gRPC data type and loadBalanceType is null, here set default value
|
||||
rpcService.setType(RpcTypeEnum.gRPC.getType());
|
||||
|
||||
@@ -35,6 +35,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -58,6 +59,16 @@ public class ResourceRateLimitConfigService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
this.init(this::lsnResourceRateLimitConfigChange);
|
||||
}
|
||||
|
||||
public void refreshLocalCache() throws Throwable {
|
||||
this.init(null);
|
||||
}
|
||||
|
||||
private void init(Supplier<Mono<Throwable>> doAfterLoadCache) throws Throwable {
|
||||
Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMapTmp = new HashMap<>(32);
|
||||
Map<Integer, ResourceRateLimitConfig> oldResourceRateLimitConfigMapTmp = new HashMap<>(32);
|
||||
final Throwable[] throwable = new Throwable[1];
|
||||
Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzRateLimit)
|
||||
.defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> {
|
||||
@@ -73,8 +84,8 @@ public class ResourceRateLimitConfigService {
|
||||
String json = (String) v;
|
||||
try {
|
||||
ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class);
|
||||
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
|
||||
updateResourceRateLimitConfigMap(rrlc);
|
||||
oldResourceRateLimitConfigMapTmp.put(rrlc.id, rrlc);
|
||||
updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMapTmp);
|
||||
return Flux.just(e);
|
||||
} catch (Throwable t) {
|
||||
throwable[0] = t;
|
||||
@@ -86,12 +97,18 @@ public class ResourceRateLimitConfigService {
|
||||
if (throwable[0] != null) {
|
||||
return Mono.error(throwable[0]);
|
||||
}
|
||||
return lsnResourceRateLimitConfigChange();
|
||||
if (doAfterLoadCache != null) {
|
||||
return doAfterLoadCache.get();
|
||||
} else {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
}
|
||||
).block();
|
||||
if (error != ReactorUtils.EMPTY_THROWABLE) {
|
||||
throw error;
|
||||
}
|
||||
resourceRateLimitConfigMap = resourceRateLimitConfigMapTmp;
|
||||
oldResourceRateLimitConfigMap = oldResourceRateLimitConfigMapTmp;
|
||||
}
|
||||
|
||||
private Mono<Throwable> lsnResourceRateLimitConfigChange() {
|
||||
@@ -115,7 +132,7 @@ public class ResourceRateLimitConfigService {
|
||||
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) {
|
||||
resourceRateLimitConfigMap.remove(r.resource);
|
||||
}
|
||||
updateResourceRateLimitConfigMap(rrlc);
|
||||
updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMap);
|
||||
if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) {
|
||||
oldResourceRateLimitConfigMap.put(rrlc.id, rrlc);
|
||||
}
|
||||
@@ -138,7 +155,8 @@ public class ResourceRateLimitConfigService {
|
||||
return Mono.just(ReactorUtils.EMPTY_THROWABLE);
|
||||
}
|
||||
|
||||
private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc) {
|
||||
private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc,
|
||||
Map<String, ResourceRateLimitConfig> resourceRateLimitConfigMap) {
|
||||
if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) {
|
||||
ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource);
|
||||
log.info("remove " + removedRrlc);
|
||||
|
||||
Reference in New Issue
Block a user