optimize flow stat logic
This commit is contained in:
@@ -27,11 +27,19 @@ import we.plugin.auth.ApiConfig2appsService;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.AppService;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.stats.FlowStat;
|
||||
import we.stats.ResourceStat;
|
||||
import we.stats.circuitbreaker.CircuitBreakManager;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfig;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ResourceIdUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -62,6 +70,9 @@ public class CacheCheckController {
|
||||
@Resource
|
||||
private CircuitBreakManager circuitBreakManager;
|
||||
|
||||
@Resource
|
||||
private FlowStat flowStat;
|
||||
|
||||
@GetMapping("/gatewayGroups")
|
||||
public Mono<String> gatewayGroups(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(gatewayGroupService.gatewayGroupMap));
|
||||
@@ -101,4 +112,73 @@ public class CacheCheckController {
|
||||
public Mono<String> circuitBreakers(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(circuitBreakManager.getResource2circuitBreakerMap()));
|
||||
}
|
||||
|
||||
@GetMapping("/resourceStats")
|
||||
public Mono<String> resourceStats(ServerWebExchange exchange) {
|
||||
Map<String, Integer> map = new HashMap<>();
|
||||
int nodeCnt = 0, serviceDefaultCnt = 0, serviceCnt = 0, appDefaultCnt = 0, appCnt = 0, ipCnt = 0, hostCnt = 0;
|
||||
ConcurrentMap<String, ResourceStat> resourceStats = flowStat.resourceStats;
|
||||
Set<Map.Entry<String, ResourceStat>> entrySet = resourceStats.entrySet();
|
||||
for (Map.Entry<String, ResourceStat> entry : entrySet) {
|
||||
String resource = entry.getKey();
|
||||
ResourceRateLimitConfig config = resourceRateLimitConfigService.getResourceRateLimitConfig(resource);
|
||||
if (config == null) {
|
||||
String app = ResourceIdUtils.getApp(resource);
|
||||
String ip = ResourceIdUtils.getIp(resource);
|
||||
String node = ResourceIdUtils.getNode(resource);
|
||||
String service = ResourceIdUtils.getService(resource);
|
||||
if (node == null) {
|
||||
if (app != null) {
|
||||
ResourceRateLimitConfig appConfig = resourceRateLimitConfigService.getResourceRateLimitConfig(ResourceIdUtils.APP_DEFAULT_RESOURCE);
|
||||
if (appConfig != null && appConfig.isEnable()) {
|
||||
appDefaultCnt++;
|
||||
} else {
|
||||
appCnt++;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (ip != null) {
|
||||
ipCnt++;
|
||||
continue;
|
||||
}
|
||||
if (service != null) {
|
||||
serviceDefaultCnt++;
|
||||
}
|
||||
} else {
|
||||
if (node.equals(ResourceIdUtils.NODE)) {
|
||||
nodeCnt++;
|
||||
} else {
|
||||
hostCnt++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
byte t = config.type;
|
||||
if (t == ResourceRateLimitConfig.Type.NODE) {
|
||||
nodeCnt++;
|
||||
} else if (t == ResourceRateLimitConfig.Type.SERVICE_DEFAULT) {
|
||||
serviceDefaultCnt++;
|
||||
} else if (t == ResourceRateLimitConfig.Type.SERVICE) {
|
||||
serviceCnt++;
|
||||
} else if (t == ResourceRateLimitConfig.Type.APP_DEFAULT) {
|
||||
appDefaultCnt++;
|
||||
} else if (t == ResourceRateLimitConfig.Type.APP) {
|
||||
appCnt++;
|
||||
} else if (t == ResourceRateLimitConfig.Type.IP) {
|
||||
ipCnt++;
|
||||
} else {
|
||||
hostCnt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
map.put("node", nodeCnt);
|
||||
map.put("serviceDefault", serviceDefaultCnt);
|
||||
map.put("service", serviceCnt);
|
||||
map.put("appDefault", appDefaultCnt);
|
||||
map.put("app", appCnt);
|
||||
map.put("ip", ipCnt);
|
||||
map.put("host", hostCnt);
|
||||
|
||||
return Mono.just(JacksonUtils.writeValueAsString(map));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,12 +324,12 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
List<ResourceConfig> resourceConfigs = new ArrayList<>(sz);
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
|
||||
if (hasHost) {
|
||||
String resourceId = ResourceIdUtils.buildResourceId(app, ip, node, service, path);
|
||||
ResourceConfig resourceConfig = new ResourceConfig(resourceId, 0, 0);
|
||||
resourceConfigs.add(resourceConfig);
|
||||
}
|
||||
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, ResourceIdUtils.NODE, null, null, null);
|
||||
if (hasHost) {
|
||||
String resourceId = ResourceIdUtils.buildResourceId(app, ip, node, service, path);
|
||||
ResourceConfig resourceConfig = new ResourceConfig(resourceId, 0, 0);
|
||||
resourceConfigs.add(resourceConfig);
|
||||
}
|
||||
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, null, ResourceIdUtils.SERVICE_DEFAULT);
|
||||
checkRateLimitConfigAndAddTo(resourceConfigs, b, null, null, null, service, path, null);
|
||||
|
||||
@@ -396,34 +396,16 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
}
|
||||
}
|
||||
|
||||
/*if (checkDegradeRule) {
|
||||
DegradeRule degradeRule = degradeRuleService.getDegradeRule(resource);
|
||||
if (degradeRule != null && degradeRule.isEnable()) {
|
||||
if (rc == null) {
|
||||
rc = new ResourceConfig(resource, 0, 0);
|
||||
resourceConfigs.add(rc);
|
||||
}
|
||||
fillDegradeRuleData(rc, degradeRule);
|
||||
} else {
|
||||
if (defaultRateLimitConfigId != null && defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) {
|
||||
degradeRule = degradeRuleService.getDegradeRule(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
if (degradeRule != null && degradeRule.isEnable()) {
|
||||
fillDegradeRuleData(rc, degradeRule);
|
||||
}
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
if (checkDegradeRule && resourceConfigs.size() == prevSize) {
|
||||
CircuitBreaker cb = circuitBreakManager.getCircuitBreaker(resource);
|
||||
if (cb == null) {
|
||||
/*if (cb == null) {
|
||||
if (defaultRateLimitConfigId != null && defaultRateLimitConfigId.equals(ResourceIdUtils.SERVICE_DEFAULT)) {
|
||||
cb = circuitBreakManager.getCircuitBreaker(ResourceIdUtils.SERVICE_DEFAULT_RESOURCE);
|
||||
if (cb == null || !cb.serviceDefaultEnable) {
|
||||
cb = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}*/
|
||||
if (cb != null) {
|
||||
rc = new ResourceConfig(resource, 0, 0);
|
||||
resourceConfigs.add(rc);
|
||||
@@ -452,12 +434,12 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
String service = ResourceIdUtils.getService(prev);
|
||||
if (service == null) {
|
||||
something4(resourceConfigs, rateLimitConfig.app, null, rateLimitConfig.service);
|
||||
} else {
|
||||
} /*else {
|
||||
app = ResourceIdUtils.getApp(prevPrev);
|
||||
if (app == null) {
|
||||
something4(resourceConfigs, rateLimitConfig.app, null, null);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,12 +460,12 @@ public class FlowControlFilter extends FizzWebFilter {
|
||||
String service = ResourceIdUtils.getService(prev);
|
||||
if (service == null) {
|
||||
something4(resourceConfigs, null, rateLimitConfig.ip, rateLimitConfig.service);
|
||||
} else {
|
||||
} /*else {
|
||||
ip = ResourceIdUtils.getIp(prevPrev);
|
||||
if (ip == null) {
|
||||
something4(resourceConfigs, null, rateLimitConfig.ip, null);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,12 +58,12 @@ public class FlowStat {
|
||||
/**
|
||||
* A string Resource ID as key
|
||||
*/
|
||||
public ConcurrentMap<String, ResourceStat> resourceStats = new ConcurrentHashMap<>(100);
|
||||
public ConcurrentMap<String, ResourceStat> resourceStats = new ConcurrentHashMap<>(256);
|
||||
|
||||
/**
|
||||
* Retention time of statistic data
|
||||
*/
|
||||
public static long RETENTION_TIME_IN_MINUTES = 10;
|
||||
public static long RETENTION_TIME_IN_MINUTES = 5;
|
||||
|
||||
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
private Lock w = rwl.writeLock();
|
||||
@@ -564,15 +564,26 @@ public class FlowStat {
|
||||
long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
|
||||
long lastSlotId = stat.currentTimeSlotId() - n;
|
||||
while (true) {
|
||||
// log.debug("housekeeping start");
|
||||
long slotId = stat.currentTimeSlotId() - n;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("{} - {} resource stats size {}", lastSlotId, slotId, stat.resourceStats.size());
|
||||
}
|
||||
Set<Map.Entry<String, ResourceStat>> es = stat.resourceStats.entrySet();
|
||||
for (Entry<String, ResourceStat> e : es) {
|
||||
String resourceId = e.getKey();
|
||||
ConcurrentMap<Long, TimeSlot> timeSlots = e.getValue().getTimeSlots();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("{} - {} {} has {} timeslot", lastSlotId, slotId, resourceId, timeSlots.size());
|
||||
}
|
||||
}
|
||||
for (long i = lastSlotId; i < slotId;) {
|
||||
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
|
||||
for (Entry<String, ResourceStat> entry : entrys) {
|
||||
String resourceId = entry.getKey();
|
||||
ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots();
|
||||
// log.debug("housekeeping remove slot: resourceId={} slotId=={}", resourceId,
|
||||
// i);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("{} - {} {} remove {} timeslot", lastSlotId, slotId, resourceId, i);
|
||||
}
|
||||
timeSlots.remove(i);
|
||||
}
|
||||
i = i + FlowStat.INTERVAL;
|
||||
@@ -580,9 +591,9 @@ public class FlowStat {
|
||||
lastSlotId = slotId;
|
||||
// log.debug("housekeeping done");
|
||||
try {
|
||||
Thread.sleep(60 * 1000);
|
||||
Thread.sleep(10 * 1000);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
log.error("HouseKeepJob error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -623,7 +634,7 @@ public class FlowStat {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
log.error("PeakConcurrentJob error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ public class ResourceStat {
|
||||
/**
|
||||
* Request count of time slot, the beginning timestamp(timeId) as key
|
||||
*/
|
||||
private ConcurrentMap<Long, TimeSlot> timeSlots = new ConcurrentHashMap<>(100);
|
||||
private ConcurrentMap<Long, TimeSlot> timeSlots = new ConcurrentHashMap<>(256);
|
||||
|
||||
/**
|
||||
* Concurrent requests
|
||||
|
||||
Reference in New Issue
Block a user