diff --git a/src/main/java/we/config/FlowStatSchedConfig.java b/src/main/java/we/config/FlowStatSchedConfig.java index bb28fc5..8a260a3 100644 --- a/src/main/java/we/config/FlowStatSchedConfig.java +++ b/src/main/java/we/config/FlowStatSchedConfig.java @@ -36,13 +36,11 @@ import we.stats.ResourceTimeWindowStat; import we.stats.TimeWindowStat; import we.stats.ratelimit.ResourceRateLimitConfig; import we.stats.ratelimit.ResourceRateLimitConfigService; -import we.util.Constants; -import we.util.DateTimeUtils; -import we.util.NetworkUtils; -import we.util.ThreadContext; +import we.util.*; import javax.annotation.Resource; import java.math.BigDecimal; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -143,7 +141,19 @@ public class FlowStatSchedConfig extends SchedConfig { for (; current < toBeCollectedWins.size(); ) { TimeWindowStat win = toBeCollectedWins.get(current); Long timeSlot = win.getStartTime(); - if (DateTimeUtils.from(timeSlot).getSecond() % 10 == 9) { + + LocalDateTime dt = null; + try { + dt = DateTimeUtils.from(timeSlot); + } catch (NullPointerException n) { + System.err.println("resource: " + resource); + System.err.println("toBeCollectedWins: " + JacksonUtils.writeValueAsString(toBeCollectedWins)); + System.err.println("current: " + current); + System.err.println("win: " + JacksonUtils.writeValueAsString(win)); + System.err.println("timeSlot: " + timeSlot); + } + + if (dt.getSecond() % 10 == 9) { int from = current - 9; if (from > 0) { ArrayList cws = ThreadContext.getArrayList(collectedWins, TimeWindowStat.class, true); diff --git a/src/main/java/we/config/SchedConfig.java b/src/main/java/we/config/SchedConfig.java index e7beca8..49b310a 100644 --- a/src/main/java/we/config/SchedConfig.java +++ b/src/main/java/we/config/SchedConfig.java @@ -14,6 +14,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ + package we.config; import java.util.Date; diff --git a/src/main/java/we/controller/FlowControlController.java b/src/main/java/we/controller/FlowControlController.java index 653b5d7..3bb6bb5 100644 --- a/src/main/java/we/controller/FlowControlController.java +++ b/src/main/java/we/controller/FlowControlController.java @@ -61,13 +61,16 @@ public class FlowControlController { try { FlowStat flowStat = flowControlFilter.getFlowStat(); - concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL); - result.put("concurrents", concurrents); - long currentTimeSlot = flowStat.currentTimeSlotId(); List wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, currentTimeSlot - recent * 1000, currentTimeSlot, recent); - rps = wins.get(0).getWindows().get(0).getRps().doubleValue(); - result.put("rps", rps); + if (wins == null || wins.isEmpty()) { + result.put("rps", 0); + } else { + concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL); + result.put("concurrents", concurrents); + rps = wins.get(0).getWindows().get(0).getRps().doubleValue(); + result.put("rps", rps); + } } catch (Throwable t) { log.error("get current global concurrents and rps error", t); diff --git a/src/main/java/we/filter/FlowControlFilter.java b/src/main/java/we/filter/FlowControlFilter.java index 9579268..85da3af 100644 --- a/src/main/java/we/filter/FlowControlFilter.java +++ b/src/main/java/we/filter/FlowControlFilter.java @@ -101,6 +101,10 @@ public class FlowControlFilter extends ProxyAggrFilter { } } + if (log.isDebugEnabled()) { + log.debug(WebUtils.getClientReqPath(exchange) + " apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId()); + } + if (concurrentOrRpsExceed) { StringBuilder b = ThreadContext.getStringBuilder(); @@ -124,7 +128,7 @@ public class FlowControlFilter extends ProxyAggrFilter { } ) .doOnError( - throwable -> { + t -> { inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, false); } ); diff --git a/src/main/java/we/filter/PreFilter.java b/src/main/java/we/filter/PreFilter.java index 423d138..a6aef4d 100644 --- a/src/main/java/we/filter/PreFilter.java +++ b/src/main/java/we/filter/PreFilter.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.core.annotation.Order; import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; @@ -36,6 +37,7 @@ import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfigService; import we.plugin.auth.AuthPluginFilter; import we.plugin.stat.StatPluginFilter; +import we.util.JacksonUtils; import we.util.ReactorUtils; import we.util.WebUtils; @@ -74,6 +76,12 @@ public class PreFilter extends ProxyAggrFilter { @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { + String clientReqPath = WebUtils.getClientReqPath(exchange); + if ("/flowControl/mock".equals(clientReqPath)) { + ServerHttpResponse resp = exchange.getResponse(); + return resp.writeWith(Mono.just(resp.bufferFactory().wrap("ok".getBytes()))); + } + Map fc = new HashMap<>(6, 1.0f); fc.put(WebUtils.PREV_FILTER_RESULT, succFr); Map appendHdrs = new HashMap<>(6, 1.0f); Map eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc); diff --git a/src/main/java/we/stats/FlowStat.java b/src/main/java/we/stats/FlowStat.java index e7c9289..f2be710 100644 --- a/src/main/java/we/stats/FlowStat.java +++ b/src/main/java/we/stats/FlowStat.java @@ -214,7 +214,7 @@ public class FlowStat { * window [startTimeMilli, endTimeMilli) * * @param startTimeMilli included - * @param endTimemilli excluded + * @param endTimeMilli excluded * @return */ public TimeWindowStat getTimeWindowStat(String resourceId, long startTimeMilli, long endTimeMilli) { @@ -280,7 +280,7 @@ public class FlowStat { String rid = entry.getKey(); ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid); for (long i = startSlotId; i < endSlotId;) { - TimeWindowStat tws = getTimeWindowStat(resourceId, startSlotId, endSlotId); + TimeWindowStat tws = getTimeWindowStat(rid, startSlotId, endSlotId); if (tws != null) { resourceWin.getWindows().add(tws); }