diff --git a/fizz-bootstrap/src/main/resources/application.yml b/fizz-bootstrap/src/main/resources/application.yml index d2d0d80..745acc4 100644 --- a/fizz-bootstrap/src/main/resources/application.yml +++ b/fizz-bootstrap/src/main/resources/application.yml @@ -103,3 +103,16 @@ gateway: aggr: # set headers when calling the backend API proxy_set_headers: host,X-Real-IP,X-Forwarded-Proto,X-Forwarded-For + +refresh-local-cache: + # initial delay 5 minutes + initial-delay-millis: 300000 + # fixed rate 5 minutes + fixed-rate-millis: 300000 + api-config-enabled: true + api-config-2-apps-enabled: true + aggregate-config-enabled: true + gateway-group-enabled: true + app-auth-enabled: true + flow-control-rule-enabled: true + rpc-service-enabled: true \ No newline at end of file diff --git a/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java b/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java new file mode 100644 index 0000000..5058cbd --- /dev/null +++ b/fizz-core/src/main/java/we/config/RefreshLocalCacheConfig.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2021 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package we.config; + +import com.alibaba.nacos.api.config.annotation.NacosValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; +import we.fizz.ConfigLoader; +import we.plugin.auth.ApiConfigService; +import we.plugin.auth.ApiConifg2appsService; +import we.plugin.auth.AppService; +import we.plugin.auth.GatewayGroupService; +import we.proxy.RpcInstanceService; +import we.stats.ratelimit.ResourceRateLimitConfigService; + +import javax.annotation.Resource; + +/** + * refresh config local cache config + * @see ApiConfigService#refreshLocalCache() refresh api config local cache + * @see ApiConifg2appsService#refreshLocalCache() refresh api config to apps local cache + * @see ConfigLoader#refreshLocalCache() refresh aggregate config local cache + * @see GatewayGroupService#refreshLocalCache() refresh gateway group local cache + * @see AppService#refreshLocalCache() refresh app local cache + * @see ResourceRateLimitConfigService#refreshLocalCache() refresh flow control rule local cache + * @see RpcInstanceService#refreshLocalCache() refresh rpc service local cache + * + * @author zhongjie + */ +@Configuration +public class RefreshLocalCacheConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshLocalCacheConfig.class); + + @NacosValue(value = "${refresh-local-cache.api-config-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.api-config-enabled:false}") + private boolean apiConfigCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.api-config-2-apps-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.api-config-2-apps-enabled:false}") + private boolean apiConfig2AppsCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.aggregate-config-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.aggregate-config-enabled:false}") + private boolean aggregateConfigCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.gateway-group-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.gateway-group-enabled:false}") + private boolean gatewayGroupCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.app-auth-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.app-auth-enabled:false}") + private boolean appAuthCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.flow-control-rule-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.flow-control-rule-enabled:false}") + private boolean flowControlRuleCacheRefreshEnabled; + @NacosValue(value = "${refresh-local-cache.rpc-service-enabled:false}", autoRefreshed = true) + @Value("${refresh-local-cache.rpc-service-enabled:false}") + private boolean rpcServiceCacheRefreshEnabled; + + @Resource + private ConfigLoader configLoader; + + @Resource + private ApiConfigService apiConfigService; + + @Resource + private ApiConifg2appsService apiConifg2appsService; + + @Resource + private GatewayGroupService gatewayGroupService; + + @Resource + private AppService appService; + + @Resource + private ResourceRateLimitConfigService resourceRateLimitConfigService; + + @Resource + private RpcInstanceService rpcInstanceService; + + @Scheduled(initialDelayString = "${refresh-local-cache.initial-delay-millis:300000}", + fixedRateString = "${refresh-local-cache.fixed-rate-millis:300000}") + public void refreshLocalCache() { + if (apiConfigCacheRefreshEnabled) { + LOGGER.debug("refresh api config local cache"); + try { + apiConfigService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh api config local cache exception", t); + } + } + + if (apiConfig2AppsCacheRefreshEnabled) { + LOGGER.debug("refresh api config to apps local cache"); + try { + apiConifg2appsService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh api config to apps local cache exception", t); + } + } + + if (aggregateConfigCacheRefreshEnabled) { + LOGGER.debug("refresh aggregate config local cache"); + try { + configLoader.refreshLocalCache(); + } catch (Exception e) { + LOGGER.warn("refresh aggregate config local cache exception", e); + } + } + + if (gatewayGroupCacheRefreshEnabled) { + LOGGER.debug("refresh gateway group local cache"); + try { + gatewayGroupService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh gateway group local cache exception", t); + } + } + + if (appAuthCacheRefreshEnabled) { + LOGGER.debug("refresh app auth local cache"); + try { + appService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh app auth local cache exception", t); + } + } + + if (flowControlRuleCacheRefreshEnabled) { + LOGGER.debug("refresh flow control rule local cache"); + try { + resourceRateLimitConfigService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh flow control rule local cache exception", t); + } + } + + if (rpcServiceCacheRefreshEnabled) { + LOGGER.debug("refresh rpc service local cache"); + try { + rpcInstanceService.refreshLocalCache(); + } catch (Throwable t) { + LOGGER.warn("refresh rpc service local cache exception", t); + } + } + } +} diff --git a/fizz-core/src/main/java/we/fizz/ConfigLoader.java b/fizz-core/src/main/java/we/fizz/ConfigLoader.java index b7f87d5..22a68fc 100644 --- a/fizz-core/src/main/java/we/fizz/ConfigLoader.java +++ b/fizz-core/src/main/java/we/fizz/ConfigLoader.java @@ -24,6 +24,8 @@ import com.alibaba.nacos.api.config.annotation.NacosValue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ConfigurableApplicationContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import we.config.AppConfigProperties; import we.fizz.input.*; @@ -40,6 +42,9 @@ import we.fizz.input.extension.grpc.GrpcInput; import we.fizz.input.extension.dubbo.DubboInput; import we.fizz.input.extension.mysql.MySQLInput; import we.fizz.input.extension.request.RequestInput; +import we.flume.clients.log4j2appender.LogService; +import we.util.Constants; +import we.util.ReactorUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -52,6 +57,7 @@ import java.io.IOException; import java.io.Serializable; import java.lang.ref.SoftReference; import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -209,16 +215,18 @@ public class ConfigLoader { @PostConstruct public synchronized void init() throws Exception { + this.refreshLocalCache(); + } + + public synchronized void refreshLocalCache() throws Exception { if (formalPathPrefix == null) { formalPathPrefix = appContext.getEnvironment().getProperty("gateway.prefix", "/proxy"); formalPathServiceNameStartIndex = formalPathPrefix.length() + 1; } - if (aggregateResources == null) { - aggregateResources = new ConcurrentHashMap<>(1024); - resourceKey2ConfigInfoMap = new ConcurrentHashMap<>(1024); - aggregateId2ResourceKeyMap = new ConcurrentHashMap<>(1024); - } + Map aggregateResourcesTmp = new ConcurrentHashMap<>(1024); + Map resourceKey2ConfigInfoMapTmp = new ConcurrentHashMap<>(1024); + Map aggregateId2ResourceKeyMapTmp = new ConcurrentHashMap<>(1024); if (readLocalConfigFlag) { File dir = new File("json"); @@ -230,28 +238,64 @@ public class ConfigLoader { throw new IOException("File not found"); } String configStr = FileUtils.readFileToString(file, StandardCharsets.UTF_8); - this.addConfig(configStr); + this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp); } } } } else { // 从Redis缓存中获取配置 - reactiveStringRedisTemplate.opsForHash().scan(AGGREGATE_HASH_KEY).subscribe(entry -> { - String configStr = (String) entry.getValue(); - this.addConfig(configStr); - }); - } - } + final Throwable[] throwable = new Throwable[1]; + Throwable error = Mono.just(Objects.requireNonNull(reactiveStringRedisTemplate.opsForHash().entries(AGGREGATE_HASH_KEY) + .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> LOGGER.info(null, t)) + .concatMap(entry -> { + Object k = entry.getKey(); + if (k == ReactorUtils.OBJ) { + return Flux.just(entry); + } + String configStr = (String) entry.getValue(); + LOGGER.info("aggregate config: " + k.toString() + Constants.Symbol.COLON + configStr, LogService.BIZ_ID, k.toString()); - public synchronized void addConfig(String configStr) { - if (aggregateResources == null) { - try { - this.init(); - } catch (Exception e) { - e.printStackTrace(); + try { + this.addConfig(configStr, aggregateResourcesTmp, resourceKey2ConfigInfoMapTmp, aggregateId2ResourceKeyMapTmp); + return Flux.just(entry); + } catch (Throwable t) { + throwable[0] = t; + LOGGER.info(configStr, t); + return Flux.error(t); + } + }).blockLast())).flatMap( + e -> { + if (throwable[0] != null) { + return Mono.error(throwable[0]); + } + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } + ).block(); + if (error != ReactorUtils.EMPTY_THROWABLE) { + assert error != null; + throw new RuntimeException(error); } } + aggregateResources = aggregateResourcesTmp; + resourceKey2ConfigInfoMap = resourceKey2ConfigInfoMapTmp; + aggregateId2ResourceKeyMap = aggregateId2ResourceKeyMapTmp; + } + + public synchronized void addConfig(String configStr) { + if (aggregateResources == null) { + try { + this.init(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + this.addConfig(configStr, aggregateResources, resourceKey2ConfigInfoMap, aggregateId2ResourceKeyMap); + } + + private void addConfig(String configStr, Map aggregateResources, + Map resourceKey2ConfigInfoMap, Map aggregateId2ResourceKeyMap) { ONode cfgNode = ONode.loadStr(configStr); boolean needReGenConfigStr = false; diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java index 7df0d68..b25b625 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java @@ -40,6 +40,7 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * @author hongqiaowei @@ -88,12 +89,19 @@ public class ApiConfigService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnApiConfigChange); + } + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Supplier> doAfterLoadCache) throws Throwable { + Map apiConfigMapTmp = new HashMap<>(128); + Map serviceConfigMapTmp = new HashMap<>(128); final Throwable[] throwable = new Throwable[1]; Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzApiConfig) - .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> { - log.info(null, t); - }) + .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> log.info(null, t)) .concatMap(e -> { Object k = e.getKey(); if (k == ReactorUtils.OBJ) { @@ -104,8 +112,8 @@ public class ApiConfigService { String json = (String) v; try { ApiConfig ac = JacksonUtils.readValue(json, ApiConfig.class); - apiConfigMap.put(ac.id, ac); - updateServiceConfigMap(ac); + apiConfigMapTmp.put(ac.id, ac); + updateServiceConfigMap(ac, serviceConfigMapTmp); return Flux.just(e); } catch (Throwable t) { throwable[0] = t; @@ -117,15 +125,23 @@ public class ApiConfigService { if (throwable[0] != null) { return Mono.error(throwable[0]); } - return lsnApiConfigChange(); + + if (doAfterLoadCache != null) { + return doAfterLoadCache.get(); + } else { + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } } ).block(); if (error != ReactorUtils.EMPTY_THROWABLE) { throw error; } + + this.apiConfigMap = apiConfigMapTmp; + this.serviceConfigMap = serviceConfigMapTmp; } - public Mono lsnApiConfigChange() { + private Mono lsnApiConfigChange() { final Throwable[] throwable = new Throwable[1]; final boolean[] b = {false}; rt.listenToChannel(fizzApiConfigChannel).doOnError(t -> { @@ -145,9 +161,9 @@ public class ApiConfigService { ApiConfig r = apiConfigMap.remove(ac.id); if (ac.isDeleted != ApiConfig.DELETED && r != null) { r.isDeleted = ApiConfig.DELETED; - updateServiceConfigMap(r); + updateServiceConfigMap(r, serviceConfigMap); } - updateServiceConfigMap(ac); + updateServiceConfigMap(ac, serviceConfigMap); if (ac.isDeleted != ApiConfig.DELETED) { apiConfigMap.put(ac.id, ac); } else { @@ -172,7 +188,7 @@ public class ApiConfigService { return Mono.just(ReactorUtils.EMPTY_THROWABLE); } - private void updateServiceConfigMap(ApiConfig ac) { + private void updateServiceConfigMap(ApiConfig ac, Map serviceConfigMap) { ServiceConfig sc = serviceConfigMap.get(ac.service); if (ac.isDeleted == ApiConfig.DELETED) { if (sc == null) { diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConifg2appsService.java b/fizz-core/src/main/java/we/plugin/auth/ApiConifg2appsService.java index 23ded6f..8967282 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConifg2appsService.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConifg2appsService.java @@ -55,6 +55,15 @@ public class ApiConifg2appsService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnChannel); + } + + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Runnable doAfterLoadCache) throws Throwable { + Map> apiConfig2appsMapTmp = new HashMap<>(128); rt.opsForHash().entries(fizzApiConfigAppSetSize) .collectList() .map( @@ -73,7 +82,7 @@ public class ApiConifg2appsService { .collectList() .map( as -> { - save(apiConfigId, as); + save(apiConfigId, as, apiConfig2appsMapTmp); return ReactorUtils.NULL; } ) @@ -89,7 +98,10 @@ public class ApiConifg2appsService { m -> { m.subscribe( e -> { - lsnChannel(); + apiConfig2appsMap = apiConfig2appsMapTmp; + if (doAfterLoadCache != null) { + doAfterLoadCache.run(); + } } ); } @@ -107,7 +119,7 @@ public class ApiConifg2appsService { log.info(b.toString()); } - private void save(Integer apiConfigId, List as) { + private void save(Integer apiConfigId, List as, Map> apiConfig2appsMap) { Set appSet = apiConfig2appsMap.get(apiConfigId); if (appSet == null) { appSet = new HashSet<>(); diff --git a/fizz-core/src/main/java/we/plugin/auth/AppService.java b/fizz-core/src/main/java/we/plugin/auth/AppService.java index 0ed0e4a..58aba42 100644 --- a/fizz-core/src/main/java/we/plugin/auth/AppService.java +++ b/fizz-core/src/main/java/we/plugin/auth/AppService.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * @author hongqiaowei @@ -59,6 +60,16 @@ public class AppService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnAppChange); + } + + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Supplier> doAfterLoadCache) throws Throwable { + Map appMapTmp = new HashMap<>(32); + Map oldAppMapTmp = new HashMap<>(32); final Throwable[] throwable = new Throwable[1]; Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzApp) .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> { @@ -74,8 +85,8 @@ public class AppService { String json = (String) v; try { App app = JacksonUtils.readValue(json, App.class); - oldAppMap.put(app.id, app); - updateAppMap(app); + oldAppMapTmp.put(app.id, app); + updateAppMap(app, appMapTmp); return Flux.just(e); } catch (Throwable t) { throwable[0] = t; @@ -87,12 +98,19 @@ public class AppService { if (throwable[0] != null) { return Mono.error(throwable[0]); } - return lsnAppChange(); + if (doAfterLoadCache != null) { + return doAfterLoadCache.get(); + } else { + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } } ).block(); if (error != ReactorUtils.EMPTY_THROWABLE) { throw error; } + + appMap = appMapTmp; + oldAppMap = oldAppMapTmp; } private Mono lsnAppChange() { @@ -116,7 +134,7 @@ public class AppService { if (app.isDeleted != App.DELETED && r != null) { appMap.remove(r.app); } - updateAppMap(app); + updateAppMap(app, appMap); if (app.isDeleted != App.DELETED) { oldAppMap.put(app.id, app); } @@ -139,7 +157,7 @@ public class AppService { return Mono.just(ReactorUtils.EMPTY_THROWABLE); } - private void updateAppMap(App app) { + private void updateAppMap(App app, Map appMap) { if (app.isDeleted == App.DELETED) { App removedApp = appMap.remove(app.app); log.info("remove " + removedApp); diff --git a/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java b/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java index 2fb2034..c060bfa 100644 --- a/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java +++ b/fizz-core/src/main/java/we/plugin/auth/GatewayGroupService.java @@ -34,6 +34,7 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -61,6 +62,17 @@ public class GatewayGroupService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnGatewayGroupChange); + } + + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Supplier> doAfterLoadCache) throws Throwable { + Map gatewayGroupMapTmp = new HashMap<>(6); + Map oldGatewayGroupMapTmp = new HashMap<>(6); + Set currentGatewayGroupSetTmp = Stream.of(GatewayGroup.DEFAULT).collect(Collectors.toSet()); final Throwable[] throwable = new Throwable[1]; Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzGatewayGroup) .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> { @@ -76,8 +88,8 @@ public class GatewayGroupService { String json = (String) v; try { GatewayGroup gg = JacksonUtils.readValue(json, GatewayGroup.class); - oldGatewayGroupMap.put(gg.id, gg); - updateGatewayGroupMap(gg); + oldGatewayGroupMapTmp.put(gg.id, gg); + updateGatewayGroupMap(gg, gatewayGroupMapTmp, currentGatewayGroupSetTmp); return Flux.just(e); } catch (Throwable t) { throwable[0] = t; @@ -89,12 +101,20 @@ public class GatewayGroupService { if (throwable[0] != null) { return Mono.error(throwable[0]); } - return lsnGatewayGroupChange(); + if (doAfterLoadCache != null) { + return doAfterLoadCache.get(); + } else { + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } } ).block(); if (error != ReactorUtils.EMPTY_THROWABLE) { throw error; } + + gatewayGroupMap = gatewayGroupMapTmp; + oldGatewayGroupMap = oldGatewayGroupMapTmp; + currentGatewayGroupSet = currentGatewayGroupSetTmp; } private Mono lsnGatewayGroupChange() { @@ -118,7 +138,7 @@ public class GatewayGroupService { if (gg.isDeleted != GatewayGroup.DELETED && r != null) { gatewayGroupMap.remove(r.group); } - updateGatewayGroupMap(gg); + updateGatewayGroupMap(gg, gatewayGroupMap, currentGatewayGroupSet); if (gg.isDeleted != GatewayGroup.DELETED) { oldGatewayGroupMap.put(gg.id, gg); } @@ -141,7 +161,8 @@ public class GatewayGroupService { return Mono.just(ReactorUtils.EMPTY_THROWABLE); } - private void updateGatewayGroupMap(GatewayGroup gg) { + private void updateGatewayGroupMap(GatewayGroup gg, Map gatewayGroupMap, + Set currentGatewayGroupSet) { if (gg.isDeleted == GatewayGroup.DELETED) { GatewayGroup r = gatewayGroupMap.remove(gg.group); log.info("remove " + r); @@ -154,10 +175,11 @@ public class GatewayGroupService { log.info("update " + existGatewayGroup + " with " + gg); } } - updateCurrentGatewayGroupSet(); + updateCurrentGatewayGroupSet(currentGatewayGroupSet, gatewayGroupMap); } - private void updateCurrentGatewayGroupSet() { + private void updateCurrentGatewayGroupSet(Set currentGatewayGroupSet, Map gatewayGroupMap) { String ip = NetworkUtils.getServerIp(); currentGatewayGroupSet.clear(); gatewayGroupMap.forEach( diff --git a/fizz-core/src/main/java/we/proxy/RpcInstanceService.java b/fizz-core/src/main/java/we/proxy/RpcInstanceService.java index 89fcedd..a43a82b 100644 --- a/fizz-core/src/main/java/we/proxy/RpcInstanceService.java +++ b/fizz-core/src/main/java/we/proxy/RpcInstanceService.java @@ -50,4 +50,10 @@ public interface RpcInstanceService { * @return instance, {@code null} if instance not-exist */ String getInstance(RpcTypeEnum rpcTypeEnum, String service); + + /** + * refresh local cache + * @throws Throwable any error + */ + void refreshLocalCache() throws Throwable; } diff --git a/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java b/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java index 5b25f1f..af3c648 100644 --- a/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java +++ b/fizz-core/src/main/java/we/proxy/RpcInstanceServiceImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * RPC instance service implementation, get all config from redis cache when init and listen on redis channel for change @@ -78,6 +79,20 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnRpcServiceChange); + } + + @Override + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Supplier> doAfterLoadCache) throws Throwable { + Map> serviceToInstancesMapTmp = new ConcurrentHashMap<>(32); + Map serviceToLoadBalanceTypeMapTmp = new ConcurrentHashMap<>(32); + Map idToRpcServiceMapTmp = new ConcurrentHashMap<>(32); + Map serviceToCountMapTmp = new ConcurrentHashMap<>(32); + final Throwable[] throwable = new Throwable[1]; Throwable error = Mono.just(Objects.requireNonNull(redisTemplate.opsForHash().entries(RPC_SERVICE_HASH_KEY) .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> LOGGER.info(null, t)) @@ -91,7 +106,8 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { String json = (String) v; try { RpcService rpcService = JacksonUtils.readValue(json, RpcService.class); - this.updateLocalCache(rpcService); + this.updateLocalCache(rpcService, serviceToInstancesMapTmp, serviceToLoadBalanceTypeMapTmp, + idToRpcServiceMapTmp, serviceToCountMapTmp); return Flux.just(e); } catch (Throwable t) { throwable[0] = t; @@ -103,13 +119,23 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { if (throwable[0] != null) { return Mono.error(throwable[0]); } - return lsnRpcServiceChange(); + + if (doAfterLoadCache != null) { + return doAfterLoadCache.get(); + } else { + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } } ).block(); if (error != ReactorUtils.EMPTY_THROWABLE) { assert error != null; throw error; } + + serviceToInstancesMap = serviceToInstancesMapTmp; + serviceToLoadBalanceTypeMap = serviceToLoadBalanceTypeMapTmp; + idToRpcServiceMap = idToRpcServiceMapTmp; + serviceToCountMap = serviceToCountMapTmp; } @Override @@ -171,7 +197,8 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { LOGGER.info(json, LogService.BIZ_ID, "rpc" + System.currentTimeMillis()); try { RpcService rpcService = JacksonUtils.readValue(json, RpcService.class); - this.updateLocalCache(rpcService); + this.updateLocalCache(rpcService, serviceToInstancesMap, serviceToLoadBalanceTypeMap, idToRpcServiceMap, + serviceToCountMap); } catch (Throwable t) { LOGGER.info(json, t); } @@ -191,7 +218,9 @@ public class RpcInstanceServiceImpl implements RpcInstanceService { return Mono.just(ReactorUtils.EMPTY_THROWABLE); } - private void updateLocalCache(RpcService rpcService) { + private void updateLocalCache(RpcService rpcService, Map> serviceToInstancesMap, + Map serviceToLoadBalanceTypeMap, Map idToRpcServiceMap, + Map serviceToCountMap) { if (rpcService.getType() == null) { // historical gRPC data type and loadBalanceType is null, here set default value rpcService.setType(RpcTypeEnum.gRPC.getType()); diff --git a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java index 1289a9c..9d84f9b 100644 --- a/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java +++ b/fizz-core/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * @author hongqiaowei @@ -58,6 +59,16 @@ public class ResourceRateLimitConfigService { @PostConstruct public void init() throws Throwable { + this.init(this::lsnResourceRateLimitConfigChange); + } + + public void refreshLocalCache() throws Throwable { + this.init(null); + } + + private void init(Supplier> doAfterLoadCache) throws Throwable { + Map resourceRateLimitConfigMapTmp = new HashMap<>(32); + Map oldResourceRateLimitConfigMapTmp = new HashMap<>(32); final Throwable[] throwable = new Throwable[1]; Throwable error = Mono.just(Objects.requireNonNull(rt.opsForHash().entries(fizzRateLimit) .defaultIfEmpty(new AbstractMap.SimpleEntry<>(ReactorUtils.OBJ, ReactorUtils.OBJ)).onErrorStop().doOnError(t -> { @@ -73,8 +84,8 @@ public class ResourceRateLimitConfigService { String json = (String) v; try { ResourceRateLimitConfig rrlc = JacksonUtils.readValue(json, ResourceRateLimitConfig.class); - oldResourceRateLimitConfigMap.put(rrlc.id, rrlc); - updateResourceRateLimitConfigMap(rrlc); + oldResourceRateLimitConfigMapTmp.put(rrlc.id, rrlc); + updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMapTmp); return Flux.just(e); } catch (Throwable t) { throwable[0] = t; @@ -86,12 +97,18 @@ public class ResourceRateLimitConfigService { if (throwable[0] != null) { return Mono.error(throwable[0]); } - return lsnResourceRateLimitConfigChange(); + if (doAfterLoadCache != null) { + return doAfterLoadCache.get(); + } else { + return Mono.just(ReactorUtils.EMPTY_THROWABLE); + } } ).block(); if (error != ReactorUtils.EMPTY_THROWABLE) { throw error; } + resourceRateLimitConfigMap = resourceRateLimitConfigMapTmp; + oldResourceRateLimitConfigMap = oldResourceRateLimitConfigMapTmp; } private Mono lsnResourceRateLimitConfigChange() { @@ -115,7 +132,7 @@ public class ResourceRateLimitConfigService { if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED && r != null) { resourceRateLimitConfigMap.remove(r.resource); } - updateResourceRateLimitConfigMap(rrlc); + updateResourceRateLimitConfigMap(rrlc, resourceRateLimitConfigMap); if (rrlc.isDeleted != ResourceRateLimitConfig.DELETED) { oldResourceRateLimitConfigMap.put(rrlc.id, rrlc); } @@ -138,7 +155,8 @@ public class ResourceRateLimitConfigService { return Mono.just(ReactorUtils.EMPTY_THROWABLE); } - private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc) { + private void updateResourceRateLimitConfigMap(ResourceRateLimitConfig rrlc, + Map resourceRateLimitConfigMap) { if (rrlc.isDeleted == ResourceRateLimitConfig.DELETED) { ResourceRateLimitConfig removedRrlc = resourceRateLimitConfigMap.remove(rrlc.resource); log.info("remove " + removedRrlc); diff --git a/fizz-spring-boot-starter/src/main/resources/META-INF/spring.factories b/fizz-spring-boot-starter/src/main/resources/META-INF/spring.factories index ce878cc..454a869 100644 --- a/fizz-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/fizz-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -6,6 +6,7 @@ we.config.AppConfigProperties,\ we.config.FlowControlConfig,\ we.config.FlowStatSchedConfig,\ we.config.ProxyWebClientConfig,\ +we.config.RefreshLocalCacheConfig,\ we.config.SystemConfig,\ we.config.WebFluxConfig,\ we.controller.HealthCheckController,\