update: for test and bak
This commit is contained in:
@@ -36,13 +36,11 @@ import we.stats.ResourceTimeWindowStat;
|
||||
import we.stats.TimeWindowStat;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfig;
|
||||
import we.stats.ratelimit.ResourceRateLimitConfigService;
|
||||
import we.util.Constants;
|
||||
import we.util.DateTimeUtils;
|
||||
import we.util.NetworkUtils;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.math.BigDecimal;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -143,7 +141,19 @@ public class FlowStatSchedConfig extends SchedConfig {
|
||||
for (; current < toBeCollectedWins.size(); ) {
|
||||
TimeWindowStat win = toBeCollectedWins.get(current);
|
||||
Long timeSlot = win.getStartTime();
|
||||
if (DateTimeUtils.from(timeSlot).getSecond() % 10 == 9) {
|
||||
|
||||
LocalDateTime dt = null;
|
||||
try {
|
||||
dt = DateTimeUtils.from(timeSlot);
|
||||
} catch (NullPointerException n) {
|
||||
System.err.println("resource: " + resource);
|
||||
System.err.println("toBeCollectedWins: " + JacksonUtils.writeValueAsString(toBeCollectedWins));
|
||||
System.err.println("current: " + current);
|
||||
System.err.println("win: " + JacksonUtils.writeValueAsString(win));
|
||||
System.err.println("timeSlot: " + timeSlot);
|
||||
}
|
||||
|
||||
if (dt.getSecond() % 10 == 9) {
|
||||
int from = current - 9;
|
||||
if (from > 0) {
|
||||
ArrayList<TimeWindowStat> cws = ThreadContext.getArrayList(collectedWins, TimeWindowStat.class, true);
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.config;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@@ -61,13 +61,16 @@ public class FlowControlController {
|
||||
|
||||
try {
|
||||
FlowStat flowStat = flowControlFilter.getFlowStat();
|
||||
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
|
||||
result.put("concurrents", concurrents);
|
||||
|
||||
long currentTimeSlot = flowStat.currentTimeSlotId();
|
||||
List<ResourceTimeWindowStat> wins = flowStat.getResourceTimeWindowStats(ResourceRateLimitConfig.GLOBAL, currentTimeSlot - recent * 1000, currentTimeSlot, recent);
|
||||
rps = wins.get(0).getWindows().get(0).getRps().doubleValue();
|
||||
result.put("rps", rps);
|
||||
if (wins == null || wins.isEmpty()) {
|
||||
result.put("rps", 0);
|
||||
} else {
|
||||
concurrents = flowStat.getConcurrentRequests(ResourceRateLimitConfig.GLOBAL);
|
||||
result.put("concurrents", concurrents);
|
||||
rps = wins.get(0).getWindows().get(0).getRps().doubleValue();
|
||||
result.put("rps", rps);
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
log.error("get current global concurrents and rps error", t);
|
||||
|
||||
@@ -101,6 +101,10 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(WebUtils.getClientReqPath(exchange) + " apply rate limit rule: " + rlc, LogService.BIZ_ID, exchange.getRequest().getId());
|
||||
}
|
||||
|
||||
if (concurrentOrRpsExceed) {
|
||||
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
@@ -124,7 +128,7 @@ public class FlowControlFilter extends ProxyAggrFilter {
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
throwable -> {
|
||||
t -> {
|
||||
inTheEnd(start, globalConfig, rlcCopy, currentTimeSlot, false);
|
||||
}
|
||||
);
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.WebFilterChain;
|
||||
@@ -36,6 +37,7 @@ import we.plugin.auth.ApiConfig;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.AuthPluginFilter;
|
||||
import we.plugin.stat.StatPluginFilter;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ReactorUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
@@ -74,6 +76,12 @@ public class PreFilter extends ProxyAggrFilter {
|
||||
@Override
|
||||
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
|
||||
String clientReqPath = WebUtils.getClientReqPath(exchange);
|
||||
if ("/flowControl/mock".equals(clientReqPath)) {
|
||||
ServerHttpResponse resp = exchange.getResponse();
|
||||
return resp.writeWith(Mono.just(resp.bufferFactory().wrap("ok".getBytes())));
|
||||
}
|
||||
|
||||
Map<String, FilterResult> fc = new HashMap<>(6, 1.0f); fc.put(WebUtils.PREV_FILTER_RESULT, succFr);
|
||||
Map<String, String> appendHdrs = new HashMap<>(6, 1.0f);
|
||||
Map<String, Object> eas = exchange.getAttributes(); eas.put(WebUtils.FILTER_CONTEXT, fc);
|
||||
|
||||
@@ -214,7 +214,7 @@ public class FlowStat {
|
||||
* window [startTimeMilli, endTimeMilli)
|
||||
*
|
||||
* @param startTimeMilli included
|
||||
* @param endTimemilli excluded
|
||||
* @param endTimeMilli excluded
|
||||
* @return
|
||||
*/
|
||||
public TimeWindowStat getTimeWindowStat(String resourceId, long startTimeMilli, long endTimeMilli) {
|
||||
@@ -280,7 +280,7 @@ public class FlowStat {
|
||||
String rid = entry.getKey();
|
||||
ResourceTimeWindowStat resourceWin = new ResourceTimeWindowStat(rid);
|
||||
for (long i = startSlotId; i < endSlotId;) {
|
||||
TimeWindowStat tws = getTimeWindowStat(resourceId, startSlotId, endSlotId);
|
||||
TimeWindowStat tws = getTimeWindowStat(rid, startSlotId, endSlotId);
|
||||
if (tws != null) {
|
||||
resourceWin.getWindows().add(tws);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user