From 5f69deb1bda1c033eda04263dafb9efd8dd2b573 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Thu, 7 Jan 2021 15:32:28 +0800 Subject: [PATCH] update: flow control tests --- .../ratelimit/ResourceRateLimitConfig.java | 1 + .../ResourceRateLimitConfigService.java | 2 +- .../we/stats/ratelimit/RateLimitTests.java | 124 ++++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 src/test/java/we/stats/ratelimit/RateLimitTests.java diff --git a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java index 90ff2ea..596538a 100644 --- a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java +++ b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfig.java @@ -14,6 +14,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ + package we.stats.ratelimit; import we.util.JacksonUtils; diff --git a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java index e9a861d..9d6f608 100644 --- a/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java +++ b/src/main/java/we/stats/ratelimit/ResourceRateLimitConfigService.java @@ -14,6 +14,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ + package we.stats.ratelimit; import org.slf4j.Logger; @@ -24,7 +25,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.AggregateRedisConfig; import we.flume.clients.log4j2appender.LogService; -import we.plugin.auth.App; import we.util.JacksonUtils; import we.util.ReactorUtils; diff --git a/src/test/java/we/stats/ratelimit/RateLimitTests.java b/src/test/java/we/stats/ratelimit/RateLimitTests.java new file mode 100644 index 0000000..721f136 --- /dev/null +++ b/src/test/java/we/stats/ratelimit/RateLimitTests.java @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2021 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.stats.ratelimit; + +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import org.junit.jupiter.api.Test; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.reactive.ReactorResourceFactory; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import we.stats.FlowStat; +import we.stats.ResourceTimeWindowStat; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author hongqiaowei + */ + +public class RateLimitTests { + + private FlowStat stat = new FlowStat(); + + private ConnectionProvider getConnectionProvider() { + return ConnectionProvider + .builder("flow-control-cp") + .maxConnections(100) + .pendingAcquireTimeout(Duration.ofMillis(6_000)) + .maxIdleTime(Duration.ofMillis(40_000)) + .build(); + } + + private LoopResources getLoopResources() { + LoopResources lr = LoopResources.create("flow-control-el", Runtime.getRuntime().availableProcessors(), true); + lr.onServer(false); + return lr; + } + + private ReactorResourceFactory reactorResourceFactory() { + ReactorResourceFactory fact = new ReactorResourceFactory(); + fact.setUseGlobalResources(false); + fact.setConnectionProvider(getConnectionProvider()); + fact.setLoopResources(getLoopResources()); + fact.afterPropertiesSet(); + return fact; + } + + private WebClient getWebClient() { + ConnectionProvider cp = getConnectionProvider(); + LoopResources lr = getLoopResources(); + HttpClient httpClient = HttpClient.create(cp).compress(false).tcpConfiguration( + tcpClient -> { + return tcpClient.runOn(lr, false) + .doOnConnected( + connection -> { + connection.addHandlerLast(new ReadTimeoutHandler( 20_000, TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler( 20_000, TimeUnit.MILLISECONDS)); + } + ) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 20_000) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); + } + ); + return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build()) + .clientConnector(new ReactorClientHttpConnector(httpClient)).build(); + } + + @Test + public void flowControlTests() { + WebClient webClient = getWebClient(); + for (int i = 0; i < 10; i++) { + webClient + .method(HttpMethod.GET) + .uri("") + .headers(hdrs -> {}) + .body(Mono.just(""), String.class) + .exchange().name("") + .doOnRequest(l -> {}) + .doOnSuccess(r -> {}) + .doOnError(t -> { + t.printStackTrace(); + }) + .timeout(Duration.ofMillis(6_000)) + .flatMap( + remoteResp -> { + remoteResp.bodyToMono(String.class) + .doOnSuccess( + s -> { + System.out.println(s); + } + ); + return Mono.empty(); + } + ); + } + } +}