Merge branch 'feature/flow'

This commit is contained in:
Francis Dong
2021-01-08 18:19:01 +08:00
6 changed files with 42 additions and 21 deletions

View File

@@ -117,7 +117,7 @@ public class FlowStatSchedConfig extends SchedConfig {
}
List<ResourceTimeWindowStat> resourceTimeWindowStats = flowStat.getResourceTimeWindowStats(null, startTimeSlot, currentTimeSlot);
if (resourceTimeWindowStats == null || resourceTimeWindowStats.isEmpty()) {
log.info(DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19) + " -> " + DateTimeUtils.toDate(currentTimeSlot, Constants.DatetimePattern.DP19) + " no flow stat data");
log.info(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " no flow stat data");
startTimeSlot = currentTimeSlot;
return;
}
@@ -141,19 +141,8 @@ public class FlowStatSchedConfig extends SchedConfig {
for (; current < toBeCollectedWins.size(); ) {
TimeWindowStat win = toBeCollectedWins.get(current);
Long timeSlot = win.getStartTime();
LocalDateTime dt = null;
try {
dt = DateTimeUtils.from(timeSlot);
} catch (NullPointerException n) {
System.err.println("resource: " + resource);
System.err.println("toBeCollectedWins: " + JacksonUtils.writeValueAsString(toBeCollectedWins));
System.err.println("current: " + current);
System.err.println("win: " + JacksonUtils.writeValueAsString(win));
System.err.println("timeSlot: " + timeSlot);
}
if (dt.getSecond() % 10 == 9) {
int second = DateTimeUtils.from(timeSlot).getSecond();
if (second % 10 == 9) {
int from = current - 9;
if (from > 0) {
ArrayList<TimeWindowStat> cws = ThreadContext.getArrayList(collectedWins, TimeWindowStat.class, true);
@@ -187,6 +176,10 @@ public class FlowStatSchedConfig extends SchedConfig {
startTimeSlot = currentTimeSlot;
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
private void calcAndRpt(String resource, List<TimeWindowStat> cws) {
String ip = NetworkUtils.getServerIp();
int id = resourceRateLimitConfigService.getResourceRateLimitConfig(resource).id;
@@ -255,10 +248,14 @@ public class FlowStatSchedConfig extends SchedConfig {
b.append(_minRespTime); b.append(minRespTime);
b.append(Constants.Symbol.RIGHT_BRACE);
String msg = b.toString();
if ("kafka".equals(dest)) {
log.info(b.toString(), LogService.HANDLE_STGY, LogService.toKF(queue));
log.info(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
} else {
rt.convertAndSend(queue, b.toString()).subscribe();
rt.convertAndSend(queue, msg).subscribe();
}
if (log.isDebugEnabled()) {
log.debug(toDP19(start) + " rpt " + msg);
}
}

View File

@@ -29,10 +29,14 @@ import reactor.core.publisher.Mono;
import we.filter.FlowControlFilter;
import we.stats.FlowStat;
import we.stats.ResourceTimeWindowStat;
import we.stats.TimeWindowStat;
import we.stats.ratelimit.ResourceRateLimitConfig;
import we.util.Constants;
import we.util.DateTimeUtils;
import we.util.JacksonUtils;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -62,14 +66,24 @@ public class FlowControlController {
try {
FlowStat flowStat = flowControlFilter.getFlowStat();
long currentTimeSlot = flowStat.currentTimeSlotId();
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, currentTimeSlot - recent * 1000, currentTimeSlot, recent);
long startTimeSlot = currentTimeSlot - recent * 1000;
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, startTimeSlot, currentTimeSlot, recent);
if (wins == null || wins.isEmpty()) {
result.put("rps", 0);
} else {
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
result.put("concurrents", concurrents);
rps = wins.get(0).getWindows().get(0).getRps().doubleValue();
TimeWindowStat timeWindowStat = wins.get(0).getWindows().get(0);
BigDecimal winrps = timeWindowStat.getRps();
if (winrps == null) {
rps = 0;
} else {
rps = winrps.doubleValue();
}
result.put("rps", rps);
// if (log.isDebugEnabled()) {
// log.debug(toDP19(startTimeSlot) + " - " + toDP19(currentTimeSlot) + " global completes " + timeWindowStat.getCompReqs() + " concurrents " + concurrents + " rps " + rps);
// }
}
} catch (Throwable t) {
@@ -77,4 +91,8 @@ public class FlowControlController {
}
return Mono.just(JacksonUtils.writeValueAsString(result));
}
private String toDP19(long startTimeSlot) {
return DateTimeUtils.toDate(startTimeSlot, Constants.DatetimePattern.DP19);
}
}

View File

@@ -102,7 +102,7 @@ public class FlowControlFilter extends ProxyAggrFilter {
}
if (log.isDebugEnabled()) {
log.debug(WebUtils.getClientReqPath(exchange) + " apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
log.debug(WebUtils.getClientReqPath(exchange) + " already apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
}
if (concurrentOrRpsExceed) {
@@ -143,7 +143,7 @@ public class FlowControlFilter extends ProxyAggrFilter {
flowStat.decrConcurrentRequest(globalConfig.resource, currentTimeSlot);
flowStat.addRequestRT(globalConfig.resource, currentTimeSlot, spend, success);
}
if (globalConfig != apiOrServiceConfig) {
if (apiOrServiceConfig.type != ResourceRateLimitConfig.Type.GLOBAL && apiOrServiceConfig.type != ResourceRateLimitConfig.Type.SERVICE_DEFAULT) {
flowStat.decrConcurrentRequest(apiOrServiceConfig.resource, currentTimeSlot);
flowStat.addRequestRT(apiOrServiceConfig.resource, currentTimeSlot, spend, success);
}

View File

@@ -113,6 +113,9 @@ public class FlowStat {
if (success) {
success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS);
}
if (log.isDebugEnabled()) {
log.debug(resourceId + " incr req for current time slot " + curTimeSlotId + " with max con " + maxCon + " and max rps " + maxRPS);
}
return success;
}

View File

@@ -181,6 +181,9 @@ public class ResourceStat {
public TimeWindowStat getTimeWindowStat(long startSlotId, long endSlotId) {
TimeWindowStat tws = new TimeWindowStat();
tws.setStartTime(startSlotId);
tws.setEndTime(endSlotId);
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
long totalReqs = 0;

View File

@@ -204,7 +204,7 @@ public class FlowStatTests {
}
}
System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
//System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
List<ResourceTimeWindowStat> list = stat.getResourceTimeWindowStats("resource-" + 1, start, end + 3 * 1000,
10);